diff --git a/memory_lib/board.py b/memory_lib/board.py index 94e3329912e44f81a575f77aa721ac844e2c866e..4c45bac728f31700127af97fd5eb9637cc7a74c1 100644 --- a/memory_lib/board.py +++ b/memory_lib/board.py @@ -1,3 +1,5 @@ +from pprint import pprint + import cv2 as cv import numpy as np @@ -340,9 +342,13 @@ class ArucoFullPieceVisibilityDetector: This detector directly send a board with updated visibility """ + MARKER_ON_BOARD_ID = 930 + MARKER_OFF_BOARD_ID = 190 + def __init__(self): - self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_250) + self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_1000) self.aruco_params = cv.aruco.DetectorParameters_create() + self.ready = False self.top_left: Point = Point(-1, -1) self.top_right: Point = Point(-1, -1) self.bottom_left: Point = Point(-1, -1) @@ -357,21 +363,23 @@ class ArucoFullPieceVisibilityDetector: if ids is not None: all_markers = [] - for i in range(len(ids)): - r = ((int(x), int(y)) for x, y in corners[i][0].tolist()) - rect = Rect.from_corners(*(Point.from_tuple(t) for t in r)) - center = get_rect_center(rect) - all_markers.append(center) - - # we have the 4 corners - if len(all_markers) >= 4: + for idx, marker_id in enumerate(ids): + if marker_id in (self.MARKER_ON_BOARD_ID, self.MARKER_OFF_BOARD_ID): + r = ((int(x), int(y)) for x, y in corners[idx][0].tolist()) + rect = Rect.from_corners(*(Point.from_tuple(t) for t in r)) + center = get_rect_center(rect) + all_markers.append(center) + print(len(all_markers)) + + if len(all_markers) == 16: vertical_sort = sorted(all_markers, key=lambda point: point.y) - top_markers = vertical_sort[:2] - bottom_markers = vertical_sort[2:] + top_markers = vertical_sort[:4] + bottom_markers = vertical_sort[-4:] self.top_left = min(top_markers, key=lambda point: point.x) self.top_right = max(top_markers, key=lambda point: point.x) self.bottom_left = min(bottom_markers, key=lambda point: point.x) self.bottom_right = max(bottom_markers, key=lambda point: point.x) + self.ready = True def get_board_with_visibility(self, img: np.ndarray) -> Board: """Compute a new board from the 4 corners stored during training, with all pieces visibility""" @@ -381,19 +389,27 @@ class ArucoFullPieceVisibilityDetector: img, self.aruco_dict, parameters=self.aruco_params ) + # DEBUG + cv.aruco.drawDetectedMarkers(img, corners) + if ids is not None: all_markers = [] - for i in range(len(ids)): - r = ((int(x), int(y)) for x, y in corners[i][0].tolist()) + for idx, marker_ids in enumerate(ids): + r = ((int(x), int(y)) for x, y in corners[idx][0].tolist()) rect = Rect.from_corners(*(Point.from_tuple(t) for t in r)) center = get_rect_center(rect) - all_markers.append(center) + all_markers.append((marker_ids[0], center)) - for marker in all_markers: + for marker_id, center in all_markers: closest_piece_idx = np.argmin( - [get_segment_size(marker, piece.postion) for piece in board.pieces] + [get_segment_size(center, piece.postion) for piece in board.pieces] ) - board.pieces[closest_piece_idx].state = PieceState.OFF_BOARD + if marker_id == self.MARKER_ON_BOARD_ID: + board.pieces[closest_piece_idx].state = PieceState.ON_BOARD + elif marker_id == self.MARKER_OFF_BOARD_ID: + board.pieces[closest_piece_idx].state = PieceState.OFF_BOARD + else: + board.pieces[closest_piece_idx].state = PieceState.UNKNOWN return board @@ -583,45 +599,50 @@ class PieceTakenDetectionEdgesTriggerTestOnArucoFull: TODO: replace this average impl with a counter to increase perf """ - def __init__(self, trigger: Callable[[int], None], rising_count: int = 5, falling_count: int = 5): + def __init__(self, trigger: Callable[[int], None]): self.trigger = trigger - self.rising_count = rising_count - self.falling_count = falling_count + self.fifo_size = 10 self.last_boards: List[Board] = [] + self.last_state_change = [0]*16 self.out_board: Optional[Board] = None def add_board(self, board: Board) -> None: - if len(self.last_boards) > max(self.rising_count, self.falling_count) + 1: # FIFO size + if len(self.last_boards) > self.fifo_size: self.last_boards.pop(0) self.last_boards.append(board) self.out_board = self._average_board() + @staticmethod + def _average_state(states: List[PieceState]) -> PieceState: + u = states.count(PieceState.UNKNOWN), PieceState.UNKNOWN + on = states.count(PieceState.ON_BOARD), PieceState.ON_BOARD + off = states.count(PieceState.OFF_BOARD), PieceState.OFF_BOARD + return max(u, on, off, key=lambda t: t[0])[1] + + def _states_by_pieces(self): + boards_pieces = [board.pieces for board in self.last_boards] + transposed = [elem for elem in zip(*boards_pieces)] + res = [] + for last_pieces_for_one_piece in transposed: + construct = [] + for piece in last_pieces_for_one_piece: + construct.append(piece.state) + res.append(construct) + return res + def _average_board(self) -> Board: next_out_board = Board.from_board(self.last_boards[-1]) - for idx in range(16): - current_count = 0 - for board in self.last_boards: - current_count += int(board.pieces[idx].state == PieceState.OFF_BOARD) - - if self.out_board: - if self.out_board.pieces[idx].state == PieceState.OFF_BOARD: - if current_count < len(self.last_boards) - self.falling_count: - next_out_board.pieces[idx].state = PieceState.ON_BOARD - else: - next_out_board.pieces[idx].state = self.out_board.pieces[ - idx - ].state - else: - if current_count > self.rising_count: - next_out_board.pieces[idx].state = PieceState.OFF_BOARD - self.trigger(idx) - else: - next_out_board.pieces[idx].state = self.out_board.pieces[ - idx - ].state - - # if idx == 5: - # print(avg, next_out_board.pieces[idx].is_visible, self.last_board.pieces[idx].is_visible) - # print([int(b.pieces[idx].is_visible) for b in self.last_boards]) + states_by_pieces = self._states_by_pieces() + average_state_by_pieces = [self._average_state(states) for states in states_by_pieces] + for idx, piece in enumerate(next_out_board.pieces): + + # TODO: make it work for state ON->UNKNOWN->OFF + if average_state_by_pieces[idx] == PieceState.ON_BOARD and piece.state == PieceState.OFF_BOARD: + if self.last_state_change[idx] > 10: + self.last_state_change[idx] = 0 + self.trigger(idx) + + piece.state = average_state_by_pieces[idx] + self.last_state_change[idx] += 1 return next_out_board diff --git a/memory_lib/memory.py b/memory_lib/memory.py index 702cfa1920d660a1a79702d1e45c24dc0bb98b2d..676dddaadb75f1612a2d98f9e563cca4cdb95c83 100644 --- a/memory_lib/memory.py +++ b/memory_lib/memory.py @@ -43,6 +43,7 @@ class Memory(ABC, threading.Thread, PiecesObserver): PiecesObserver.__init__(self) self.video_caputre = video_capture self.running = False + self.frame = None def run(self): self.running = True @@ -52,6 +53,7 @@ class Memory(ABC, threading.Thread, PiecesObserver): time.sleep(0.04) # cv.imshow("vid", frame) self.process(frame) + self.frame = frame # key = cv.waitKey(1) & 0xFF # if key == ord("r"): # self.reset() @@ -77,20 +79,20 @@ class MemoryArucoFull(Memory): def __init__( self, video_capture: cv.VideoCapture, - visibility_detector: ArucoPieceVisibilityDetector = ArucoFullPieceVisibilityDetector(), + visibility_detector: ArucoFullPieceVisibilityDetector = ArucoFullPieceVisibilityDetector(), ): Memory.__init__(self, video_capture) self.visibility_detector = visibility_detector self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_1000) self.aruco_params = cv.aruco.DetectorParameters_create() self.average_trigger = PieceTakenDetectionEdgesTriggerTestOnArucoFull( - self.piece_trigger, 10, 40 + self.piece_trigger ) self.frame_counter = 0 def process(self, img: np.ndarray) -> None: # For the 5 first frames, store the corners - if self.frame_counter < 10: + if not self.visibility_detector.ready: self.visibility_detector.train(img) self.frame_counter += 1 return @@ -103,7 +105,7 @@ class MemoryArucoFull(Memory): # cv.aruco.drawDetectedMarkers(img, corners) self.average_trigger.out_board.draw(img) self.board_trigger(board, img) - cv.imshow("marker", img) + # cv.imshow("marker", img) def reset(self) -> None: """Reset board position""" diff --git a/memory_lib/tests/run_memory.py b/memory_lib/tests/run_memory.py index 0fa94a919c5053ad00b4c1f9ffd583554b6ca5aa..3ba4fd2f0c608932515acbbf03903ac4b971069c 100644 --- a/memory_lib/tests/run_memory.py +++ b/memory_lib/tests/run_memory.py @@ -42,6 +42,13 @@ def main(): memory.bind_pieces(hello_piece) memory.start() + while True: + frame = memory.frame + if frame is not None: + cv.imshow('webcam', frame) + if cv.waitKey(1) == 27: + break + # process_vid("res/webcam_03_shadow.avi", memory) # process_cam()