diff --git a/memory_lib/__init__.py b/memory_lib/__init__.py index c7babfc00274337fe20518c585ccc14bcea19713..bf6ae896df9a13a83cf2eda8a2d4b81a5fef2201 100644 --- a/memory_lib/__init__.py +++ b/memory_lib/__init__.py @@ -1 +1 @@ -from .memory import MemoryABM, MemoryAruco, MemoryArucoFull +from .memory import MemoryABM, MemoryArucoHalf, MemoryArucoFull diff --git a/memory_lib/board.py b/memory_lib/board.py index d6d5b414e90f2a9f2daca77c13625e01f39a782d..70b4e7288b4f4b59a6535430145a419eeb9dba8e 100644 --- a/memory_lib/board.py +++ b/memory_lib/board.py @@ -1,552 +1,75 @@ -from pprint import pprint - -import cv2 as cv -import numpy as np - -from abc import ABC, abstractmethod +import sys from typing import Tuple, Optional, List, Callable -from .cv_utils import ( - draw_line, - draw_point, - average_color, - get_roi_around_point, - MotionDetector, -) -from .geometry import ( - get_segment_size, - line_eq_from_points, - get_parallel_eq, - get_line_intersection_point, - get_rect_center, - get_segment_center, - get_segement_point_at_dist, -) from .model import Board, Point, LineEq, Piece, Rect, PieceState from .piece_state import PiecesStatesTracker -class BoardFactory(ABC): - @abstractmethod - def get_board(self) -> Board: - ... - - -class ABMExtractor(ABC): - @abstractmethod - def get_a_b_m(self, img: np.ndarray) -> Optional[Tuple[Point, Point, Point]]: - ... - - -class PieceVisibilityDetector(ABC): - @abstractmethod - def is_visible(self, img, piece_pos: Point, piece_index) -> bool: - ... - - @abstractmethod - def train(self, board: Board, img: np.ndarray) -> None: - ... - - -class ABMBoardFactory(BoardFactory): - def __init__(self, a: Point, b: Point, m: Point, img: np.ndarray): - self.a = a - self.b = b - self.m = m - # TODO: no need img here, only for debug - self.img = img - - def get_board(self) -> Board: - """ Create a Board object from from A, B and M borad points """ - a = self.a - b = self.b - m = self.m - - # F is [AB] center - f = Point((a.x + b.x) // 2, (a.y + b.y) // 2) - ab_eq = line_eq_from_points(a, b) - - # TODO: check - # if ab_m == math.nan or ab_b == math.nan: - # return +class PieceTakenTrigger: + """Call 'trigger' function when after a piece is taken""" - # TODO: debug only - # draw_line(self.img, ab_eq) - - # FM line - fm_eq = line_eq_from_points(f, m) - - # TODO: check - # if fm_m == math.inf or fm_b == math.inf: - # return - - # TODO: debug only - # draw_line(self.img, fm_eq) - fm_s = get_segment_size(f, m) - - # Approximate E, based on FM size. - # First approximate DC line, based on FM size - dc_eq = get_parallel_eq(ab_eq, Point(0, int(a.y - fm_s * 1.5))) - - # TODO: debug only - # draw_line(self.img, dc_eq) - - # Then e is on DC, FM intersection - e = get_line_intersection_point(fm_eq, dc_eq) - - # Compute the 4 parallels: A1B1, A2B2, A3B3, A4B4 - a1, b1 = self._compute_parallel(a, b, ab_eq, e, f, 0.75, 1 * (fm_s / 10)) - a2, b2 = self._compute_parallel(a, b, ab_eq, e, f, 0.48, 3 * (fm_s / 10)) - a3, b3 = self._compute_parallel(a, b, ab_eq, e, f, 0.25, 4.5 * (fm_s / 10)) - a4, b4 = self._compute_parallel(a, b, ab_eq, e, f, 0.07, 6 * (fm_s / 10)) + def __init__(self, trigger: Callable[[int], None]): + self.trigger = trigger + self.averager = BoardStateAverager() + self.piece_state_tracker = PiecesStatesTracker(trigger) - # Compoute the pieces position from the above lines - pieces = ( - *self._compute_piece(a1, b1), - *self._compute_piece(a2, b2), - *self._compute_piece(a3, b3), - *self._compute_piece(a4, b4), - ) + def add_board(self, board: Board) -> Board: + new_board = self.averager.add_board(board) + self.piece_state_tracker.update(new_board) + return new_board - # TODO: debug only - debut_print = ( - (a, "A"), - (b, "B"), - (f, "F"), - (e, "E"), - (a1, "A1"), - (b1, "B1"), - (a2, "A2"), - (b2, "B2"), - (a3, "A3"), - (b3, "B3"), - (a4, "A4"), - (b4, "B4"), - ) - # for point, name in debut_print: - # draw_point(self.img, point, name, (0, 255, 0)) - # for idx, piece in enumerate(pieces): - # draw_point(img, piece, f"P{idx}", (0, 0, 255)) - # cv.imshow("IMG", img) - return Board(pieces) +class BoardStateAverager: + """Moving average for board states""" + def __init__(self, window_size: int = 15) -> None: + self.window_size = window_size + self.last_boards: List[Board] = [] - @staticmethod - def _compute_parallel( - a: Point, - b: Point, - ab_eq: LineEq, - e: Point, - f: Point, - height_reduce_ratio: float, - width_reduce_ratio: float, - ) -> Tuple[Point, Point]: - """ used to find A1B1, A2B2, A3B3, A4B4 from AB and EF""" - m1 = Point( - int(e.x + height_reduce_ratio * (f.x - e.x)), - int(e.y + height_reduce_ratio * (f.y - e.y)), - ) - a1b1_eq = get_parallel_eq(ab_eq, m1) - left_point = Point( - int(a.x + width_reduce_ratio), - int(a1b1_eq.m * (a.x + width_reduce_ratio) + a1b1_eq.b), - ) - right_point = Point( - int(b.x - width_reduce_ratio), - int(a1b1_eq.m * (b.x - width_reduce_ratio) + a1b1_eq.b), - ) - return left_point, right_point + def add_board(self, board: Board) -> Board: + if len(self.last_boards) > self.window_size: + self.last_boards.pop(0) + self.last_boards.append(board) + states_by_pieces = self._states_by_pieces(self.last_boards) + average_state_by_pieces = [self._average_state(states) for states in states_by_pieces] + out_board = Board.from_board(board) + for idx, _ in enumerate(out_board.pieces): + out_board.pieces[idx].state = average_state_by_pieces[idx] + return out_board @staticmethod - def _compute_piece(left_point: Point, right_point: Point) -> Tuple[Piece, ...]: - """ find the 4 pieces from a horizontal line """ - return ( - Piece( - Point( - int(left_point.x + 0.1 * (right_point.x - left_point.x)), - int(left_point.y + 0.1 * (right_point.y - left_point.y)), - ) - ), - Piece( - Point( - int(left_point.x + 0.35 * (right_point.x - left_point.x)), - int(left_point.y + 0.35 * (right_point.y - left_point.y)), - ) - ), - Piece( - Point( - int(left_point.x + 0.65 * (right_point.x - left_point.x)), - int(left_point.y + 0.65 * (right_point.y - left_point.y)), - ) - ), - Piece( - Point( - int(left_point.x + 0.9 * (right_point.x - left_point.x)), - int(left_point.y + 0.9 * (right_point.y - left_point.y)), - ) - ), - ) - - -class ArucoABMExtractor(ABMExtractor): - """Uses Aruco marker to extract the board A, B and M points - - Aruco marker ids: - 0 for point A - 1 for point B - 2 for point M - """ - - def __init__(self): - self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_50) - self.aruco_params = cv.aruco.DetectorParameters_create() - - def get_a_b_m(self, img: np.ndarray) -> Optional[Tuple[Point, Point, Point]]: - """ If all 3 markers are found, returns the 3 Points A, B and M """ - corners, ids, rejected = cv.aruco.detectMarkers( - img, self.aruco_dict, parameters=self.aruco_params - ) - - # TODO: debug - # cv.aruco.drawDetectedMarkers(img, corners) - - markers = {} - if ids is None: - return None - - for i in range(len(ids)): - rect_val = ((int(x), int(y)) for x, y in corners[i][0].tolist()) - rect = Rect.from_corners(*(Point.from_tuple(t) for t in rect_val)) - markers[ids[i][0]] = get_rect_center(rect) - - a = markers.get(0) - b = markers.get(1) - m = markers.get(2) - - if a and b and m and (a != b): - return a, b, m - return None - - -class LightnessPieceVisibilityDetector(PieceVisibilityDetector): - """ Primitive detector based on lightness of average pixels around the piece""" - - def __init__(self, threshold=80): - self.threshold = threshold - - def is_visible(self, img: np.ndarray, piece_pos: Point, piece_index) -> bool: - roi = get_roi_around_point(img, piece_pos, 3) - if roi.shape != (6, 6, 3): - return False - avg_color = average_color(roi) - return avg_color[2] > self.threshold - - def train(self, board: Board, img: np.ndarray) -> None: - ... - - -class AverageColorPieceVisibilityDetector(PieceVisibilityDetector): - """ Piece detector based on average pixels color around the piece""" - - def __init__(self): - self.pieces_colors: List[Tuple[int, int, int]] = [] - self.board_color: Tuple[int, int, int] = (0, 0, 0) - - def train(self, board: Board, img: np.ndarray) -> None: - """ Store an avraged color value for every pieces in the board """ - if not self.pieces_colors: - self.pieces_colors = [ - average_color(get_roi_around_point(img, piece.postion)) - for piece in board.pieces - ] - # get the socle color from a point below M - middle = get_segment_center( - board.pieces[5].postion, board.pieces[6].postion - ) - self.board_color = average_color(get_roi_around_point(img, middle)) - else: - for idx, piece in enumerate(board.pieces): - c1 = self.pieces_colors[idx] - c2 = average_color(get_roi_around_point(img, piece.postion)) - self.pieces_colors[idx] = self._avg_colors(c1, c2) - middle = get_segment_center( - board.pieces[5].postion, board.pieces[6].postion - ) - self.board_color = self._avg_colors( - self.board_color, average_color(get_roi_around_point(img, middle)) - ) - - def is_visible(self, img: np.ndarray, piece_pos: Point, piece_index: int) -> bool: - # naive color distance detection, TODO: uses device independent color spaces? in_range? - - # Test if piece color is far from piece and close to board_color - c1 = self.pieces_colors[piece_index] - c2 = average_color(get_roi_around_point(img, piece_pos)) - m1 = self.board_color - m2 = average_color(get_roi_around_point(img, piece_pos)) - return ( - abs(m1[0] - m2[0]) < 100 - and abs(m1[1] - m2[1]) < 100 - and abs(m1[2] - m2[2]) < 100 - ) and ( - abs(c1[0] - c2[0]) > 60 - or abs(c1[1] - c2[1]) > 60 - or abs(c1[2] - c2[2]) > 60 - ) + def _average_state(states: List[PieceState]) -> PieceState: + """Get the most present state in a list""" + u = states.count(PieceState.UNKNOWN), PieceState.UNKNOWN + on = states.count(PieceState.ON_BOARD), PieceState.ON_BOARD + off = states.count(PieceState.OFF_BOARD), PieceState.OFF_BOARD + return max(u, on, off, key=lambda t: t[0])[1] @staticmethod - def _avg_colors( - c1: Tuple[int, int, int], c2: Tuple[int, int, int] - ) -> Tuple[int, int, int]: - return (c1[0] + c2[0]) // 2, (c1[1] + c2[1]) // 2, (c1[2] + c2[2]) // 2 - - -class AverageColorAndMovementPieceVisibilityDetector(PieceVisibilityDetector): - """ Piece detector based on average pixels color around the piece, compute only no moving objects""" + def _states_by_pieces(boards: List[Board]) -> list[list[PieceState]]: + # TODO: Improve perf by storing directly the states, not the boards in the fifo + """For every pieces, get the last X states - def __init__(self, same_frame_counter_max: int = 5): - """ - :param same_frame_counter_max: how many different frames before considering a movement + res = [ + [...state_at_t-3, state_at_t-2, state_at_t-1], # piece 0 + [...state_at_t-3, state_at_t-2, state_at_t-1], # piece 1 + ... + ] """ - self.average_color_detector = AverageColorPieceVisibilityDetector() - self.same_frame_counter_max = same_frame_counter_max - self.motion_detectors = [MotionDetector() for _ in range(16)] - - def train(self, board: Board, img: np.ndarray) -> None: - self.average_color_detector.train(board, img) - - def is_visible(self, img: np.ndarray, piece_pos: Point, piece_index: int) -> bool: - roi = cv.cvtColor( - get_roi_around_point(img, piece_pos, radius=10), cv.COLOR_BGR2GRAY - ) - - # TODO: debug only - # if piece_index == 5: - # print(self.motion_detectors[piece_index].same_frame_counter) - # cv.imshow("roi", roi) - # if (self.motion_detectors[piece_index]._last_frame is not None): - # cv.imshow("last", self.motion_detectors[piece_index]._last_frame) - - self.motion_detectors[piece_index].add(roi) - if ( - self.motion_detectors[piece_index].same_frame_counter - > self.same_frame_counter_max - ): - return self.average_color_detector.is_visible(img, piece_pos, piece_index) - return False - - -class ArucoFullPieceVisibilityDetector: - """ Piece detector based on Aruco markers above and below every piece - - TODO: - for now this detector does not work as a PieceVisibilityDetector - This detector directly send a board with updated visibility - """ - - MARKER_ON_BOARD_ID = 930 - MARKER_OFF_BOARD_ID = 190 - - def __init__(self): - self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_1000) - self.aruco_params = cv.aruco.DetectorParameters_create() - self.ready = False - self.top_left: Point = Point(-1, -1) - self.top_right: Point = Point(-1, -1) - self.bottom_left: Point = Point(-1, -1) - self.bottom_right: Point = Point(-1, -1) - self.board = None - - def train(self, img: np.ndarray) -> None: - """ Store the position of the 4 markers below the the 4 pieces in the corners""" - corners, ids, rejected = cv.aruco.detectMarkers( - img, self.aruco_dict, parameters=self.aruco_params - ) - - if ids is not None: - all_markers = [] - for idx, marker_id in enumerate(ids): - if marker_id in (self.MARKER_ON_BOARD_ID, self.MARKER_OFF_BOARD_ID): - r = ((int(x), int(y)) for x, y in corners[idx][0].tolist()) - rect = Rect.from_corners(*(Point.from_tuple(t) for t in r)) - center = get_rect_center(rect) - all_markers.append(center) - - if len(all_markers) == 16: - vertical_sort = sorted(all_markers, key=lambda point: point.y) - top_markers = vertical_sort[:4] - bottom_markers = vertical_sort[-4:] - self.top_left = min(top_markers, key=lambda point: point.x) - self.top_right = max(top_markers, key=lambda point: point.x) - self.bottom_left = min(bottom_markers, key=lambda point: point.x) - self.bottom_right = max(bottom_markers, key=lambda point: point.x) - self.ready = True - - def get_board_with_visibility(self, img: np.ndarray) -> Board: - """Compute a new board from the 4 corners stored during training, with all pieces visibility""" - board = self._board_from_corners() - - corners, ids, rejected = cv.aruco.detectMarkers( - img, self.aruco_dict, parameters=self.aruco_params - ) - - # DEBUG - cv.aruco.drawDetectedMarkers(img, corners) - - if ids is not None: - all_markers = [] - for idx, marker_ids in enumerate(ids): - r = ((int(x), int(y)) for x, y in corners[idx][0].tolist()) - rect = Rect.from_corners(*(Point.from_tuple(t) for t in r)) - center = get_rect_center(rect) - all_markers.append((marker_ids[0], center)) - - for marker_id, center in all_markers: - closest_piece_idx = np.argmin( - [get_segment_size(center, piece.postion) for piece in board.pieces] - ) - if marker_id == self.MARKER_ON_BOARD_ID: - board.pieces[closest_piece_idx].state = PieceState.ON_BOARD - elif marker_id == self.MARKER_OFF_BOARD_ID: - board.pieces[closest_piece_idx].state = PieceState.OFF_BOARD - else: - board.pieces[closest_piece_idx].state = PieceState.UNKNOWN - - return board - - def _board_from_corners(self) -> Board: - left_pieces = self._four_pieces_from_two_ends_vertical(self.bottom_left, self.top_left) - right_pieces = self._four_pieces_from_two_ends_vertical(self.bottom_right, self.top_right) - row_0 = self._four_pieces_from_two_ends_horizontal(left_pieces[0].postion, right_pieces[0].postion) - row_1 = self._four_pieces_from_two_ends_horizontal(left_pieces[1].postion, right_pieces[1].postion) - row_2 = self._four_pieces_from_two_ends_horizontal(left_pieces[2].postion, right_pieces[2].postion) - row_3 = self._four_pieces_from_two_ends_horizontal(left_pieces[3].postion, right_pieces[3].postion) - return Board(row_0 + row_1 + row_2 + row_3) - - @staticmethod - def _four_pieces_from_two_ends_vertical(p1: Point, p2: Point) -> Tuple[Piece, ...]: - """Get 4 pieces from 2 ends pieces""" - return ( - Piece(p1), - Piece(get_segement_point_at_dist(p1, p2, 0.45)), - Piece(get_segement_point_at_dist(p2, p1, 0.20)), - Piece(p2), - ) - - @staticmethod - def _four_pieces_from_two_ends_horizontal(p1: Point, p2: Point) -> Tuple[Piece, ...]: - """Get 4 pieces from 2 ends pieces""" - return ( - Piece(p1), - Piece(get_segement_point_at_dist(p1, p2, 0.33)), - Piece(get_segement_point_at_dist(p2, p1, 0.33)), - Piece(p2), - ) - - -class ArucoPieceVisibilityDetector: - """ Piece detector based on Aruco marker below every piece - - TODO: - for now this detector does not work as a PieceVisibilityDetector - This detector directly send a board with updated visibility - """ - - def __init__(self): - self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_250) - self.aruco_params = cv.aruco.DetectorParameters_create() - self.top_left: Point = Point(-1, -1) - self.top_right: Point = Point(-1, -1) - self.bottom_left: Point = Point(-1, -1) - self.bottom_right: Point = Point(-1, -1) - - def train(self, img: np.ndarray) -> None: - """ Store the position of the 4 markers below the the 4 pieces in the corners""" - corners, ids, rejected = cv.aruco.detectMarkers( - img, self.aruco_dict, parameters=self.aruco_params - ) - - if ids is not None: - all_markers = [] - for i in range(len(ids)): - r = ((int(x), int(y)) for x, y in corners[i][0].tolist()) - rect = Rect.from_corners(*(Point.from_tuple(t) for t in r)) - center = get_rect_center(rect) - all_markers.append(center) - - # we have the 4 corners - if len(all_markers) == 4: - vertical_sort = sorted(all_markers, key=lambda point: point.y) - top_markers = vertical_sort[:2] - bottom_markers = vertical_sort[2:] - self.top_left = min(top_markers, key=lambda point: point.x) - self.top_right = max(top_markers, key=lambda point: point.x) - self.bottom_left = min(bottom_markers, key=lambda point: point.x) - self.bottom_right = max(bottom_markers, key=lambda point: point.x) - - def get_board_with_visibility(self, img: np.ndarray) -> Board: - """Compute a new board from the 4 corners stored during training, with all pieces visibility""" - board = self._board_from_corners() - - corners, ids, rejected = cv.aruco.detectMarkers( - img, self.aruco_dict, parameters=self.aruco_params - ) - - if ids is not None: - all_markers = [] - for i in range(len(ids)): - r = ((int(x), int(y)) for x, y in corners[i][0].tolist()) - rect = Rect.from_corners(*(Point.from_tuple(t) for t in r)) - center = get_rect_center(rect) - all_markers.append(center) - - for marker in all_markers: - closest_piece_idx = np.argmin( - [get_segment_size(marker, piece.postion) for piece in board.pieces] - ) - board.pieces[closest_piece_idx].is_visible = True - - return board - - def _board_from_corners(self) -> Board: - left_pieces = self._four_pieces_from_two_ends_vertical(self.bottom_left, self.top_left) - right_pieces = self._four_pieces_from_two_ends_vertical(self.bottom_right, self.top_right) - row_0 = self._four_pieces_from_two_ends_horizontal(left_pieces[0].postion, right_pieces[0].postion) - row_1 = self._four_pieces_from_two_ends_horizontal(left_pieces[1].postion, right_pieces[1].postion) - row_2 = self._four_pieces_from_two_ends_horizontal(left_pieces[2].postion, right_pieces[2].postion) - row_3 = self._four_pieces_from_two_ends_horizontal(left_pieces[3].postion, right_pieces[3].postion) - return Board(row_0 + row_1 + row_2 + row_3) - - @staticmethod - def _four_pieces_from_two_ends_vertical(p1: Point, p2: Point) -> Tuple[Piece, ...]: - """Get 4 pieces from 2 ends pieces""" - return ( - Piece(p1), - Piece(get_segement_point_at_dist(p1, p2, 0.45)), - Piece(get_segement_point_at_dist(p2, p1, 0.20)), - Piece(p2), - ) - - @staticmethod - def _four_pieces_from_two_ends_horizontal(p1: Point, p2: Point) -> Tuple[Piece, ...]: - """Get 4 pieces from 2 ends pieces""" - return ( - Piece(p1), - Piece(get_segement_point_at_dist(p1, p2, 0.33)), - Piece(get_segement_point_at_dist(p2, p1, 0.33)), - Piece(p2), - ) + boards_pieces = [board.pieces for board in boards] + transposed = [elem for elem in zip(*boards_pieces)] + res = [] + for last_pieces_for_one_piece in transposed: + construct = [] + for piece in last_pieces_for_one_piece: + construct.append(piece.state) + res.append(construct) + return res class PieceTakenDetectionEdgesTrigger: - """Call 'trigger' function when after a piece is taken - - It waits for 'rising_count' times the same piece detection state before triggering when we take a piece - It waits for 'falling_count' times the same piece detection state before triggering when we put back a piece - - TODO: replace this average impl with a counter to increase perf - """ + """ WARNING: deprectated: replaced with PieceTakenTrigger""" def __init__(self, trigger: Callable[[int], None], rising_count: int = 5, falling_count: int = 5): + print(self.__doc__, file=sys.stderr) self.trigger = trigger self.rising_count = rising_count self.falling_count = falling_count @@ -590,62 +113,3 @@ class PieceTakenDetectionEdgesTrigger: return next_out_board -class PieceTakenDetectionEdgesTriggerTestOnArucoFull: - """Call 'trigger' function when after a piece is taken - """ - - def __init__(self, trigger: Callable[[int], None]): - self.trigger = trigger - self.averager = BoardAverager() - self.piece_state_tracker = PiecesStatesTracker(trigger) - - def add_board(self, board: Board) -> Board: - new_board = self.averager.add_board(board) - self.piece_state_tracker.update(new_board) - return new_board - - -class BoardAverager: - def __init__(self, window_size: int = 15) -> None: - self.window_size = window_size - self.last_boards: List[Board] = [] - - def add_board(self, board: Board) -> Board: - if len(self.last_boards) > self.window_size: - self.last_boards.pop(0) - self.last_boards.append(board) - states_by_pieces = self._states_by_pieces(self.last_boards) - average_state_by_pieces = [self._average_state(states) for states in states_by_pieces] - out_board = Board.from_board(board) - for idx, _ in enumerate(out_board.pieces): - out_board.pieces[idx].state = average_state_by_pieces[idx] - return out_board - - @staticmethod - def _average_state(states: List[PieceState]) -> PieceState: - """Get the most present state in a list""" - u = states.count(PieceState.UNKNOWN), PieceState.UNKNOWN - on = states.count(PieceState.ON_BOARD), PieceState.ON_BOARD - off = states.count(PieceState.OFF_BOARD), PieceState.OFF_BOARD - return max(u, on, off, key=lambda t: t[0])[1] - - @staticmethod - def _states_by_pieces(boards: List[Board]) -> list[list[PieceState]]: - # TODO: Improve perf by storing directly the states, not the boards in the fifo - """For every pieces, get the last X states - - res = [ - [...state_at_t-3, state_at_t-2, state_at_t-1], # piece 0 - [...state_at_t-3, state_at_t-2, state_at_t-1], # piece 1 - ... - ] - """ - boards_pieces = [board.pieces for board in boards] - transposed = [elem for elem in zip(*boards_pieces)] - res = [] - for last_pieces_for_one_piece in transposed: - construct = [] - for piece in last_pieces_for_one_piece: - construct.append(piece.state) - res.append(construct) - return res diff --git a/memory_lib/cv_utils.py b/memory_lib/cv_utils.py index afcf5fd418d10feaf91af46a02222f41a99f6012..b6ec89386b457acd5b3c10610d549d12b655642f 100644 --- a/memory_lib/cv_utils.py +++ b/memory_lib/cv_utils.py @@ -93,32 +93,3 @@ def draw_rect(img: np.ndarray, rect: Rect, color=(255, 0, 0)): """ Draw rectangle rect on img """ x, y, w, h = rect.to_tuple() cv.rectangle(img, (x, y), (x + w, y + h), color) - - -class MotionDetector: - """ Compute how many frames are the same """ - - def __init__(self, pixels_count_to_diff: int = 3): - """ - :param pixels_count_to_diff: When you diff to frame, how many pixels can change to consider same/different frame - """ - self.pixels_count_to_diff = pixels_count_to_diff - self._same_frame_counter = 0 - self._last_frame = None - self._dbg_last_frame_img = None - - def add(self, img: np.ndarray) -> None: - self._dbg_last_frame_img = img.copy() - - canny = cv.Canny(img, 0, 150) - if self._last_frame is not None: - diff = cv.absdiff(canny, self._last_frame) - if cv.countNonZero(diff) < self.pixels_count_to_diff: - self._same_frame_counter += 1 - else: - self._same_frame_counter = 0 - self._last_frame = canny.copy() - - @property - def same_frame_counter(self): - return self._same_frame_counter diff --git a/memory_lib/detectors/board_detectors.py b/memory_lib/detectors/board_detectors.py new file mode 100644 index 0000000000000000000000000000000000000000..f09ab23e7cfad75e093ea1bfd08d23129624ff36 --- /dev/null +++ b/memory_lib/detectors/board_detectors.py @@ -0,0 +1,54 @@ +import cv2 as cv +import numpy as np + +from abc import ABC, abstractmethod +from typing import Tuple, Optional + +from memory_lib.geometry import get_rect_center +from memory_lib.model import Point, Rect + + +class ABMExtractor(ABC): + @abstractmethod + def get_a_b_m(self, img: np.ndarray) -> Optional[Tuple[Point, Point, Point]]: + ... + + +class ArucoABMExtractor(ABMExtractor): + """Uses Aruco marker to extract the board A, B and M points + + Aruco marker ids: + 0 for point A + 1 for point B + 2 for point M + """ + + def __init__(self): + self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_50) + self.aruco_params = cv.aruco.DetectorParameters_create() + + def get_a_b_m(self, img: np.ndarray) -> Optional[Tuple[Point, Point, Point]]: + """ If all 3 markers are found, returns the 3 Points A, B and M """ + corners, ids, rejected = cv.aruco.detectMarkers( + img, self.aruco_dict, parameters=self.aruco_params + ) + + # TODO: debug + # cv.aruco.drawDetectedMarkers(img, corners) + + markers = {} + if ids is None: + return None + + for i in range(len(ids)): + rect_val = ((int(x), int(y)) for x, y in corners[i][0].tolist()) + rect = Rect.from_corners(*(Point.from_tuple(t) for t in rect_val)) + markers[ids[i][0]] = get_rect_center(rect) + + a = markers.get(0) + b = markers.get(1) + m = markers.get(2) + + if a and b and m and (a != b): + return a, b, m + return None diff --git a/memory_lib/detectors/board_factory.py b/memory_lib/detectors/board_factory.py new file mode 100644 index 0000000000000000000000000000000000000000..fc4ca5e50529a955afeeb8b170827d4b8a1656f3 --- /dev/null +++ b/memory_lib/detectors/board_factory.py @@ -0,0 +1,153 @@ +from abc import ABC, abstractmethod +from typing import Tuple + +import numpy as np + +from memory_lib.geometry import line_eq_from_points, get_segment_size, get_parallel_eq, get_line_intersection_point +from memory_lib.model import Board, Point, LineEq, Piece + + +class BoardFactory(ABC): + @abstractmethod + def get_board(self) -> Board: + ... + + +class ABMBoardFactory(BoardFactory): + def __init__(self, a: Point, b: Point, m: Point, img: np.ndarray): + self.a = a + self.b = b + self.m = m + # TODO: no need img here, only for debug + self.img = img + + def get_board(self) -> Board: + """ Create a Board object from from A, B and M borad points """ + a = self.a + b = self.b + m = self.m + + # F is [AB] center + f = Point((a.x + b.x) // 2, (a.y + b.y) // 2) + ab_eq = line_eq_from_points(a, b) + + # TODO: check + # if ab_m == math.nan or ab_b == math.nan: + # return + + # TODO: debug only + # draw_line(self.img, ab_eq) + + # FM line + fm_eq = line_eq_from_points(f, m) + + # TODO: check + # if fm_m == math.inf or fm_b == math.inf: + # return + + # TODO: debug only + # draw_line(self.img, fm_eq) + fm_s = get_segment_size(f, m) + + # Approximate E, based on FM size. + # First approximate DC line, based on FM size + dc_eq = get_parallel_eq(ab_eq, Point(0, int(a.y - fm_s * 1.5))) + + # TODO: debug only + # draw_line(self.img, dc_eq) + + # Then e is on DC, FM intersection + e = get_line_intersection_point(fm_eq, dc_eq) + + # Compute the 4 parallels: A1B1, A2B2, A3B3, A4B4 + a1, b1 = self._compute_parallel(a, b, ab_eq, e, f, 0.75, 1 * (fm_s / 10)) + a2, b2 = self._compute_parallel(a, b, ab_eq, e, f, 0.48, 3 * (fm_s / 10)) + a3, b3 = self._compute_parallel(a, b, ab_eq, e, f, 0.25, 4.5 * (fm_s / 10)) + a4, b4 = self._compute_parallel(a, b, ab_eq, e, f, 0.07, 6 * (fm_s / 10)) + + # Compoute the pieces position from the above lines + pieces = ( + *self._compute_piece(a1, b1), + *self._compute_piece(a2, b2), + *self._compute_piece(a3, b3), + *self._compute_piece(a4, b4), + ) + + # TODO: debug only + debut_print = ( + (a, "A"), + (b, "B"), + (f, "F"), + (e, "E"), + (a1, "A1"), + (b1, "B1"), + (a2, "A2"), + (b2, "B2"), + (a3, "A3"), + (b3, "B3"), + (a4, "A4"), + (b4, "B4"), + ) + # for point, name in debut_print: + # draw_point(self.img, point, name, (0, 255, 0)) + # for idx, piece in enumerate(pieces): + # draw_point(img, piece, f"P{idx}", (0, 0, 255)) + # cv.imshow("IMG", img) + + return Board(pieces) + + @staticmethod + def _compute_parallel( + a: Point, + b: Point, + ab_eq: LineEq, + e: Point, + f: Point, + height_reduce_ratio: float, + width_reduce_ratio: float, + ) -> Tuple[Point, Point]: + """ used to find A1B1, A2B2, A3B3, A4B4 from AB and EF""" + m1 = Point( + int(e.x + height_reduce_ratio * (f.x - e.x)), + int(e.y + height_reduce_ratio * (f.y - e.y)), + ) + a1b1_eq = get_parallel_eq(ab_eq, m1) + left_point = Point( + int(a.x + width_reduce_ratio), + int(a1b1_eq.m * (a.x + width_reduce_ratio) + a1b1_eq.b), + ) + right_point = Point( + int(b.x - width_reduce_ratio), + int(a1b1_eq.m * (b.x - width_reduce_ratio) + a1b1_eq.b), + ) + return left_point, right_point + + @staticmethod + def _compute_piece(left_point: Point, right_point: Point) -> Tuple[Piece, ...]: + """ find the 4 pieces from a horizontal line """ + return ( + Piece( + Point( + int(left_point.x + 0.1 * (right_point.x - left_point.x)), + int(left_point.y + 0.1 * (right_point.y - left_point.y)), + ) + ), + Piece( + Point( + int(left_point.x + 0.35 * (right_point.x - left_point.x)), + int(left_point.y + 0.35 * (right_point.y - left_point.y)), + ) + ), + Piece( + Point( + int(left_point.x + 0.65 * (right_point.x - left_point.x)), + int(left_point.y + 0.65 * (right_point.y - left_point.y)), + ) + ), + Piece( + Point( + int(left_point.x + 0.9 * (right_point.x - left_point.x)), + int(left_point.y + 0.9 * (right_point.y - left_point.y)), + ) + ), + ) diff --git a/memory_lib/detectors/misc_detectors.py b/memory_lib/detectors/misc_detectors.py new file mode 100644 index 0000000000000000000000000000000000000000..b017bed0115c8a3d4a0b4bb14ef464d26cc4faf8 --- /dev/null +++ b/memory_lib/detectors/misc_detectors.py @@ -0,0 +1,31 @@ +import cv2 as cv +import numpy as np + + +class MotionDetector: + """ Compute how many frames are the same """ + + def __init__(self, pixels_count_to_diff: int = 3): + """ + :param pixels_count_to_diff: When you diff to frame, how many pixels can change to consider same/different frame + """ + self.pixels_count_to_diff = pixels_count_to_diff + self._same_frame_counter = 0 + self._last_frame = None + self._dbg_last_frame_img = None + + def add(self, img: np.ndarray) -> None: + self._dbg_last_frame_img = img.copy() + + canny = cv.Canny(img, 0, 150) + if self._last_frame is not None: + diff = cv.absdiff(canny, self._last_frame) + if cv.countNonZero(diff) < self.pixels_count_to_diff: + self._same_frame_counter += 1 + else: + self._same_frame_counter = 0 + self._last_frame = canny.copy() + + @property + def same_frame_counter(self): + return self._same_frame_counter diff --git a/memory_lib/detectors/piece_state_detectors.py b/memory_lib/detectors/piece_state_detectors.py new file mode 100644 index 0000000000000000000000000000000000000000..a2855dd28ba382705765d427c56687c21566b43c --- /dev/null +++ b/memory_lib/detectors/piece_state_detectors.py @@ -0,0 +1,340 @@ +from abc import ABC, abstractmethod +from typing import List, Tuple +import cv2 as cv + +import numpy as np + +from memory_lib.cv_utils import get_roi_around_point, average_color +from memory_lib.detectors.misc_detectors import MotionDetector +from memory_lib.geometry import get_segment_center, get_rect_center, get_segement_point_at_dist, get_segment_size +from memory_lib.model import Point, Board, Rect, Piece, PieceState + + +class PieceStateDetector(ABC): + """ Use to extract if a piece is visible or not""" + @abstractmethod + def is_visible(self, img, piece_pos: Point, piece_index) -> bool: + ... + + @abstractmethod + def train(self, board: Board, img: np.ndarray) -> None: + ... + + +class PieceStateExtractor(ABC): + """ Use to get a board with updated pieces states""" + @abstractmethod + def get_board_with_visibility(self, img: np.ndarray) -> Board: + ... + + @abstractmethod + def train(self, img: np.ndarray) -> None: + ... + + +class LightnessPieceStateDetector(PieceStateDetector): + """ Primitive detector based on lightness of average pixels around the piece""" + + def __init__(self, threshold=80): + self.threshold = threshold + + def is_visible(self, img: np.ndarray, piece_pos: Point, piece_index) -> bool: + roi = get_roi_around_point(img, piece_pos, 3) + if roi.shape != (6, 6, 3): + return False + avg_color = average_color(roi) + return avg_color[2] > self.threshold + + def train(self, board: Board, img: np.ndarray) -> None: + ... + + +class AverageColorPieceStateDetector(PieceStateDetector): + """ Piece detector based on average pixels color around the piece""" + + def __init__(self): + self.pieces_colors: List[Tuple[int, int, int]] = [] + self.board_color: Tuple[int, int, int] = (0, 0, 0) + + def train(self, board: Board, img: np.ndarray) -> None: + """ Store an avraged color value for every pieces in the board """ + if not self.pieces_colors: + self.pieces_colors = [ + average_color(get_roi_around_point(img, piece.postion)) + for piece in board.pieces + ] + # get the socle color from a point below M + middle = get_segment_center( + board.pieces[5].postion, board.pieces[6].postion + ) + self.board_color = average_color(get_roi_around_point(img, middle)) + else: + for idx, piece in enumerate(board.pieces): + c1 = self.pieces_colors[idx] + c2 = average_color(get_roi_around_point(img, piece.postion)) + self.pieces_colors[idx] = self._avg_colors(c1, c2) + middle = get_segment_center( + board.pieces[5].postion, board.pieces[6].postion + ) + self.board_color = self._avg_colors( + self.board_color, average_color(get_roi_around_point(img, middle)) + ) + + def is_visible(self, img: np.ndarray, piece_pos: Point, piece_index: int) -> bool: + # naive color distance detection, TODO: uses device independent color spaces? in_range? + + # Test if piece color is far from piece and close to board_color + c1 = self.pieces_colors[piece_index] + c2 = average_color(get_roi_around_point(img, piece_pos)) + m1 = self.board_color + m2 = average_color(get_roi_around_point(img, piece_pos)) + return ( + abs(m1[0] - m2[0]) < 100 + and abs(m1[1] - m2[1]) < 100 + and abs(m1[2] - m2[2]) < 100 + ) and ( + abs(c1[0] - c2[0]) > 60 + or abs(c1[1] - c2[1]) > 60 + or abs(c1[2] - c2[2]) > 60 + ) + + @staticmethod + def _avg_colors( + c1: Tuple[int, int, int], c2: Tuple[int, int, int] + ) -> Tuple[int, int, int]: + return (c1[0] + c2[0]) // 2, (c1[1] + c2[1]) // 2, (c1[2] + c2[2]) // 2 + + +class AverageColorAndMovementPieceVisibilityDetector(PieceStateDetector): + """ Piece detector based on average pixels color around the piece, compute only no moving objects""" + + def __init__(self, same_frame_counter_max: int = 5): + """ + :param same_frame_counter_max: how many different frames before considering a movement + """ + self.average_color_detector = AverageColorPieceStateDetector() + self.same_frame_counter_max = same_frame_counter_max + self.motion_detectors = [MotionDetector() for _ in range(16)] + + def train(self, board: Board, img: np.ndarray) -> None: + self.average_color_detector.train(board, img) + + def is_visible(self, img: np.ndarray, piece_pos: Point, piece_index: int) -> bool: + roi = cv.cvtColor( + get_roi_around_point(img, piece_pos, radius=10), cv.COLOR_BGR2GRAY + ) + + # TODO: debug only + # if piece_index == 5: + # print(self.motion_detectors[piece_index].same_frame_counter) + # cv.imshow("roi", roi) + # if (self.motion_detectors[piece_index]._last_frame is not None): + # cv.imshow("last", self.motion_detectors[piece_index]._last_frame) + + self.motion_detectors[piece_index].add(roi) + if ( + self.motion_detectors[piece_index].same_frame_counter + > self.same_frame_counter_max + ): + return self.average_color_detector.is_visible(img, piece_pos, piece_index) + return False + + +class ArucoHalfPieceStateExtractor(PieceStateExtractor): + """ Piece detector based on Aruco marker below every piece + + TODO: + for now this detector does not work as a PieceVisibilityDetector + This detector directly send a board with updated visibility + """ + + def __init__(self): + self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_250) + self.aruco_params = cv.aruco.DetectorParameters_create() + self.top_left: Point = Point(-1, -1) + self.top_right: Point = Point(-1, -1) + self.bottom_left: Point = Point(-1, -1) + self.bottom_right: Point = Point(-1, -1) + + def train(self, img: np.ndarray) -> None: + """ Store the position of the 4 markers below the the 4 pieces in the corners""" + corners, ids, rejected = cv.aruco.detectMarkers( + img, self.aruco_dict, parameters=self.aruco_params + ) + + if ids is not None: + all_markers = [] + for i in range(len(ids)): + r = ((int(x), int(y)) for x, y in corners[i][0].tolist()) + rect = Rect.from_corners(*(Point.from_tuple(t) for t in r)) + center = get_rect_center(rect) + all_markers.append(center) + + # we have the 4 corners + if len(all_markers) == 4: + vertical_sort = sorted(all_markers, key=lambda point: point.y) + top_markers = vertical_sort[:2] + bottom_markers = vertical_sort[2:] + self.top_left = min(top_markers, key=lambda point: point.x) + self.top_right = max(top_markers, key=lambda point: point.x) + self.bottom_left = min(bottom_markers, key=lambda point: point.x) + self.bottom_right = max(bottom_markers, key=lambda point: point.x) + + def get_board_with_visibility(self, img: np.ndarray) -> Board: + """Compute a new board from the 4 corners stored during training, with all pieces visibility""" + board = self._board_from_corners() + + corners, ids, rejected = cv.aruco.detectMarkers( + img, self.aruco_dict, parameters=self.aruco_params + ) + + if ids is not None: + all_markers = [] + for i in range(len(ids)): + r = ((int(x), int(y)) for x, y in corners[i][0].tolist()) + rect = Rect.from_corners(*(Point.from_tuple(t) for t in r)) + center = get_rect_center(rect) + all_markers.append(center) + + for marker in all_markers: + closest_piece_idx = np.argmin( + [get_segment_size(marker, piece.postion) for piece in board.pieces] + ) + board.pieces[closest_piece_idx].is_visible = True + + return board + + def _board_from_corners(self) -> Board: + left_pieces = self._four_pieces_from_two_ends_vertical(self.bottom_left, self.top_left) + right_pieces = self._four_pieces_from_two_ends_vertical(self.bottom_right, self.top_right) + row_0 = self._four_pieces_from_two_ends_horizontal(left_pieces[0].postion, right_pieces[0].postion) + row_1 = self._four_pieces_from_two_ends_horizontal(left_pieces[1].postion, right_pieces[1].postion) + row_2 = self._four_pieces_from_two_ends_horizontal(left_pieces[2].postion, right_pieces[2].postion) + row_3 = self._four_pieces_from_two_ends_horizontal(left_pieces[3].postion, right_pieces[3].postion) + return Board(row_0 + row_1 + row_2 + row_3) + + @staticmethod + def _four_pieces_from_two_ends_vertical(p1: Point, p2: Point) -> Tuple[Piece, ...]: + """Get 4 pieces from 2 ends pieces""" + return ( + Piece(p1), + Piece(get_segement_point_at_dist(p1, p2, 0.45)), + Piece(get_segement_point_at_dist(p2, p1, 0.20)), + Piece(p2), + ) + + @staticmethod + def _four_pieces_from_two_ends_horizontal(p1: Point, p2: Point) -> Tuple[Piece, ...]: + """Get 4 pieces from 2 ends pieces""" + return ( + Piece(p1), + Piece(get_segement_point_at_dist(p1, p2, 0.33)), + Piece(get_segement_point_at_dist(p2, p1, 0.33)), + Piece(p2), + ) + + +class ArucoFullPieceStateExtractor(PieceStateExtractor): + """ Piece detector based on Aruco markers above and below every piece""" + + MARKER_ON_BOARD_ID = 930 + MARKER_OFF_BOARD_ID = 190 + + def __init__(self): + self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_1000) + self.aruco_params = cv.aruco.DetectorParameters_create() + + # TODO: to PieceStateExtractor property + self.ready = False + + self.top_left: Point = Point(-1, -1) + self.top_right: Point = Point(-1, -1) + self.bottom_left: Point = Point(-1, -1) + self.bottom_right: Point = Point(-1, -1) + self.board = None + + def train(self, img: np.ndarray) -> None: + """ Store the position of the 4 markers below the the 4 pieces in the corners""" + corners, ids, rejected = cv.aruco.detectMarkers( + img, self.aruco_dict, parameters=self.aruco_params + ) + + if ids is not None: + all_markers = [] + for idx, marker_id in enumerate(ids): + if marker_id in (self.MARKER_ON_BOARD_ID, self.MARKER_OFF_BOARD_ID): + r = ((int(x), int(y)) for x, y in corners[idx][0].tolist()) + rect = Rect.from_corners(*(Point.from_tuple(t) for t in r)) + center = get_rect_center(rect) + all_markers.append(center) + + if len(all_markers) == 16: + vertical_sort = sorted(all_markers, key=lambda point: point.y) + top_markers = vertical_sort[:4] + bottom_markers = vertical_sort[-4:] + self.top_left = min(top_markers, key=lambda point: point.x) + self.top_right = max(top_markers, key=lambda point: point.x) + self.bottom_left = min(bottom_markers, key=lambda point: point.x) + self.bottom_right = max(bottom_markers, key=lambda point: point.x) + self.ready = True + + def get_board_with_visibility(self, img: np.ndarray) -> Board: + """Compute a new board from the 4 corners stored during training, with all pieces visibility""" + board = self._board_from_corners() + + corners, ids, rejected = cv.aruco.detectMarkers( + img, self.aruco_dict, parameters=self.aruco_params + ) + + # DEBUG + cv.aruco.drawDetectedMarkers(img, corners) + + if ids is not None: + all_markers = [] + for idx, marker_ids in enumerate(ids): + r = ((int(x), int(y)) for x, y in corners[idx][0].tolist()) + rect = Rect.from_corners(*(Point.from_tuple(t) for t in r)) + center = get_rect_center(rect) + all_markers.append((marker_ids[0], center)) + + for marker_id, center in all_markers: + closest_piece_idx = np.argmin( + [get_segment_size(center, piece.postion) for piece in board.pieces] + ) + if marker_id == self.MARKER_ON_BOARD_ID: + board.pieces[closest_piece_idx].state = PieceState.ON_BOARD + elif marker_id == self.MARKER_OFF_BOARD_ID: + board.pieces[closest_piece_idx].state = PieceState.OFF_BOARD + else: + board.pieces[closest_piece_idx].state = PieceState.UNKNOWN + + return board + + def _board_from_corners(self) -> Board: + left_pieces = self._four_pieces_from_two_ends_vertical(self.bottom_left, self.top_left) + right_pieces = self._four_pieces_from_two_ends_vertical(self.bottom_right, self.top_right) + row_0 = self._four_pieces_from_two_ends_horizontal(left_pieces[0].postion, right_pieces[0].postion) + row_1 = self._four_pieces_from_two_ends_horizontal(left_pieces[1].postion, right_pieces[1].postion) + row_2 = self._four_pieces_from_two_ends_horizontal(left_pieces[2].postion, right_pieces[2].postion) + row_3 = self._four_pieces_from_two_ends_horizontal(left_pieces[3].postion, right_pieces[3].postion) + return Board(row_0 + row_1 + row_2 + row_3) + + @staticmethod + def _four_pieces_from_two_ends_vertical(p1: Point, p2: Point) -> Tuple[Piece, ...]: + """Get 4 pieces from 2 ends pieces""" + return ( + Piece(p1), + Piece(get_segement_point_at_dist(p1, p2, 0.45)), + Piece(get_segement_point_at_dist(p2, p1, 0.20)), + Piece(p2), + ) + + @staticmethod + def _four_pieces_from_two_ends_horizontal(p1: Point, p2: Point) -> Tuple[Piece, ...]: + """Get 4 pieces from 2 ends pieces""" + return ( + Piece(p1), + Piece(get_segement_point_at_dist(p1, p2, 0.33)), + Piece(get_segement_point_at_dist(p2, p1, 0.33)), + Piece(p2), + ) diff --git a/memory_lib/memory.py b/memory_lib/memory.py index 018b508cdb43f3d326c1bcb1b1a3417d8932c55f..e83d152480fbc4961af8156aaf00580cfb8bb37c 100644 --- a/memory_lib/memory.py +++ b/memory_lib/memory.py @@ -5,21 +5,13 @@ import numpy as np from abc import ABC, abstractmethod -from memory_lib.geometry import get_rect_center - -from memory_lib.cv_utils import draw_rect, draw_point +from memory_lib.detectors.board_detectors import ABMExtractor, ArucoABMExtractor +from memory_lib.detectors.board_factory import ABMBoardFactory +from memory_lib.detectors.piece_state_detectors import ArucoFullPieceStateExtractor, ArucoHalfPieceStateExtractor, \ + AverageColorAndMovementPieceVisibilityDetector, PieceStateDetector, PieceStateExtractor from .board import ( - ArucoABMExtractor, - ABMBoardFactory, - PieceVisibilityDetector, - LightnessPieceVisibilityDetector, - AverageColorPieceVisibilityDetector, - ArucoPieceVisibilityDetector, - ArucoFullPieceVisibilityDetector, - ABMExtractor, - AverageColorAndMovementPieceVisibilityDetector, - PieceTakenDetectionEdgesTrigger, PieceTakenDetectionEdgesTriggerTestOnArucoFull, + PieceTakenDetectionEdgesTrigger, PieceTakenTrigger, ) from .model import Board, PiecesObserver, Rect, Point @@ -79,25 +71,23 @@ class MemoryArucoFull(Memory): def __init__( self, video_capture: cv.VideoCapture, - visibility_detector: ArucoFullPieceVisibilityDetector = ArucoFullPieceVisibilityDetector(), + piece_state_extractor: PieceStateExtractor = ArucoFullPieceStateExtractor(), ): Memory.__init__(self, video_capture) - self.visibility_detector = visibility_detector + self.piece_state_extractor = piece_state_extractor self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_1000) self.aruco_params = cv.aruco.DetectorParameters_create() - self.average_trigger = PieceTakenDetectionEdgesTriggerTestOnArucoFull( - self.piece_trigger - ) + self.average_trigger = PieceTakenTrigger(self.piece_trigger) self.frame_counter = 0 def process(self, img: np.ndarray) -> None: # For the 5 first frames, store the corners - if not self.visibility_detector.ready: - self.visibility_detector.train(img) + if not self.piece_state_extractor.ready: + self.piece_state_extractor.train(img) self.frame_counter += 1 return - board = self.visibility_detector.get_board_with_visibility(img) + board = self.piece_state_extractor.get_board_with_visibility(img) # self.board_trigger(board, img) out_board = self.average_trigger.add_board(board) @@ -113,19 +103,19 @@ class MemoryArucoFull(Memory): self.frame_counter = 0 -class MemoryAruco(Memory): +class MemoryArucoHalf(Memory): """Memory pieces detection based on Aruco below every piece""" def __init__( self, video_capture: cv.VideoCapture, - visibility_detector: ArucoPieceVisibilityDetector = ArucoPieceVisibilityDetector(), + piece_state_extractor: PieceStateExtractor = ArucoFullPieceStateExtractor(), ): Memory.__init__(self, video_capture) - self.visibility_detector = visibility_detector + self.piece_extractor = piece_state_extractor self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_250) self.aruco_params = cv.aruco.DetectorParameters_create() - self.average_trigger = PieceTakenDetectionEdgesTrigger( + self.average_trigger = PieceTakenDetectionEdgesTrigger( # Deprecated self.piece_trigger, 10, 40 ) self.frame_counter = 0 @@ -133,11 +123,11 @@ class MemoryAruco(Memory): def process(self, img: np.ndarray) -> None: # For the 5 first frames, store the corners if self.frame_counter < 10: - self.visibility_detector.train(img) + self.piece_extractor.train(img) self.frame_counter += 1 return - board = self.visibility_detector.get_board_with_visibility(img) + board = self.piece_extractor.get_board_with_visibility(img) # self.board_trigger(board, img) self.average_trigger.add_board(board) @@ -165,13 +155,13 @@ class MemoryABM(Memory): self, video_capture: cv.VideoCapture, abm_extractor: ABMExtractor = ArucoABMExtractor(), - visibility_detector: PieceVisibilityDetector = AverageColorAndMovementPieceVisibilityDetector(), + visibility_detector: PieceStateDetector = AverageColorAndMovementPieceVisibilityDetector(), ): Memory.__init__(self, video_capture) self.abm_extractor = abm_extractor self.visibility_detector = visibility_detector self.frame_counter = 0 - self.average_trigger = PieceTakenDetectionEdgesTrigger(self.piece_trigger, 5) + self.average_trigger = PieceTakenDetectionEdgesTrigger(self.piece_trigger, 5) # Deprecated self.last_board = None def process(self, img: np.ndarray) -> None: