Skip to content
Snippets Groups Projects
Commit 511498a6 authored by Adrien Lescourt's avatar Adrien Lescourt
Browse files

Make lib works with aruco full

Still missing the states ON->UNKNOWN->OFF trigger
parent d42547ed
No related branches found
No related tags found
No related merge requests found
from pprint import pprint
import cv2 as cv
import numpy as np
......@@ -340,9 +342,13 @@ class ArucoFullPieceVisibilityDetector:
This detector directly send a board with updated visibility
"""
MARKER_ON_BOARD_ID = 930
MARKER_OFF_BOARD_ID = 190
def __init__(self):
self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_250)
self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_1000)
self.aruco_params = cv.aruco.DetectorParameters_create()
self.ready = False
self.top_left: Point = Point(-1, -1)
self.top_right: Point = Point(-1, -1)
self.bottom_left: Point = Point(-1, -1)
......@@ -357,21 +363,23 @@ class ArucoFullPieceVisibilityDetector:
if ids is not None:
all_markers = []
for i in range(len(ids)):
r = ((int(x), int(y)) for x, y in corners[i][0].tolist())
for idx, marker_id in enumerate(ids):
if marker_id in (self.MARKER_ON_BOARD_ID, self.MARKER_OFF_BOARD_ID):
r = ((int(x), int(y)) for x, y in corners[idx][0].tolist())
rect = Rect.from_corners(*(Point.from_tuple(t) for t in r))
center = get_rect_center(rect)
all_markers.append(center)
print(len(all_markers))
# we have the 4 corners
if len(all_markers) >= 4:
if len(all_markers) == 16:
vertical_sort = sorted(all_markers, key=lambda point: point.y)
top_markers = vertical_sort[:2]
bottom_markers = vertical_sort[2:]
top_markers = vertical_sort[:4]
bottom_markers = vertical_sort[-4:]
self.top_left = min(top_markers, key=lambda point: point.x)
self.top_right = max(top_markers, key=lambda point: point.x)
self.bottom_left = min(bottom_markers, key=lambda point: point.x)
self.bottom_right = max(bottom_markers, key=lambda point: point.x)
self.ready = True
def get_board_with_visibility(self, img: np.ndarray) -> Board:
"""Compute a new board from the 4 corners stored during training, with all pieces visibility"""
......@@ -381,19 +389,27 @@ class ArucoFullPieceVisibilityDetector:
img, self.aruco_dict, parameters=self.aruco_params
)
# DEBUG
cv.aruco.drawDetectedMarkers(img, corners)
if ids is not None:
all_markers = []
for i in range(len(ids)):
r = ((int(x), int(y)) for x, y in corners[i][0].tolist())
for idx, marker_ids in enumerate(ids):
r = ((int(x), int(y)) for x, y in corners[idx][0].tolist())
rect = Rect.from_corners(*(Point.from_tuple(t) for t in r))
center = get_rect_center(rect)
all_markers.append(center)
all_markers.append((marker_ids[0], center))
for marker in all_markers:
for marker_id, center in all_markers:
closest_piece_idx = np.argmin(
[get_segment_size(marker, piece.postion) for piece in board.pieces]
[get_segment_size(center, piece.postion) for piece in board.pieces]
)
if marker_id == self.MARKER_ON_BOARD_ID:
board.pieces[closest_piece_idx].state = PieceState.ON_BOARD
elif marker_id == self.MARKER_OFF_BOARD_ID:
board.pieces[closest_piece_idx].state = PieceState.OFF_BOARD
else:
board.pieces[closest_piece_idx].state = PieceState.UNKNOWN
return board
......@@ -583,45 +599,50 @@ class PieceTakenDetectionEdgesTriggerTestOnArucoFull:
TODO: replace this average impl with a counter to increase perf
"""
def __init__(self, trigger: Callable[[int], None], rising_count: int = 5, falling_count: int = 5):
def __init__(self, trigger: Callable[[int], None]):
self.trigger = trigger
self.rising_count = rising_count
self.falling_count = falling_count
self.fifo_size = 10
self.last_boards: List[Board] = []
self.last_state_change = [0]*16
self.out_board: Optional[Board] = None
def add_board(self, board: Board) -> None:
if len(self.last_boards) > max(self.rising_count, self.falling_count) + 1: # FIFO size
if len(self.last_boards) > self.fifo_size:
self.last_boards.pop(0)
self.last_boards.append(board)
self.out_board = self._average_board()
@staticmethod
def _average_state(states: List[PieceState]) -> PieceState:
u = states.count(PieceState.UNKNOWN), PieceState.UNKNOWN
on = states.count(PieceState.ON_BOARD), PieceState.ON_BOARD
off = states.count(PieceState.OFF_BOARD), PieceState.OFF_BOARD
return max(u, on, off, key=lambda t: t[0])[1]
def _states_by_pieces(self):
boards_pieces = [board.pieces for board in self.last_boards]
transposed = [elem for elem in zip(*boards_pieces)]
res = []
for last_pieces_for_one_piece in transposed:
construct = []
for piece in last_pieces_for_one_piece:
construct.append(piece.state)
res.append(construct)
return res
def _average_board(self) -> Board:
next_out_board = Board.from_board(self.last_boards[-1])
for idx in range(16):
current_count = 0
for board in self.last_boards:
current_count += int(board.pieces[idx].state == PieceState.OFF_BOARD)
if self.out_board:
if self.out_board.pieces[idx].state == PieceState.OFF_BOARD:
if current_count < len(self.last_boards) - self.falling_count:
next_out_board.pieces[idx].state = PieceState.ON_BOARD
else:
next_out_board.pieces[idx].state = self.out_board.pieces[
idx
].state
else:
if current_count > self.rising_count:
next_out_board.pieces[idx].state = PieceState.OFF_BOARD
states_by_pieces = self._states_by_pieces()
average_state_by_pieces = [self._average_state(states) for states in states_by_pieces]
for idx, piece in enumerate(next_out_board.pieces):
# TODO: make it work for state ON->UNKNOWN->OFF
if average_state_by_pieces[idx] == PieceState.ON_BOARD and piece.state == PieceState.OFF_BOARD:
if self.last_state_change[idx] > 10:
self.last_state_change[idx] = 0
self.trigger(idx)
else:
next_out_board.pieces[idx].state = self.out_board.pieces[
idx
].state
# if idx == 5:
# print(avg, next_out_board.pieces[idx].is_visible, self.last_board.pieces[idx].is_visible)
# print([int(b.pieces[idx].is_visible) for b in self.last_boards])
piece.state = average_state_by_pieces[idx]
self.last_state_change[idx] += 1
return next_out_board
......@@ -43,6 +43,7 @@ class Memory(ABC, threading.Thread, PiecesObserver):
PiecesObserver.__init__(self)
self.video_caputre = video_capture
self.running = False
self.frame = None
def run(self):
self.running = True
......@@ -52,6 +53,7 @@ class Memory(ABC, threading.Thread, PiecesObserver):
time.sleep(0.04)
# cv.imshow("vid", frame)
self.process(frame)
self.frame = frame
# key = cv.waitKey(1) & 0xFF
# if key == ord("r"):
# self.reset()
......@@ -77,20 +79,20 @@ class MemoryArucoFull(Memory):
def __init__(
self,
video_capture: cv.VideoCapture,
visibility_detector: ArucoPieceVisibilityDetector = ArucoFullPieceVisibilityDetector(),
visibility_detector: ArucoFullPieceVisibilityDetector = ArucoFullPieceVisibilityDetector(),
):
Memory.__init__(self, video_capture)
self.visibility_detector = visibility_detector
self.aruco_dict = cv.aruco.Dictionary_get(cv.aruco.DICT_4X4_1000)
self.aruco_params = cv.aruco.DetectorParameters_create()
self.average_trigger = PieceTakenDetectionEdgesTriggerTestOnArucoFull(
self.piece_trigger, 10, 40
self.piece_trigger
)
self.frame_counter = 0
def process(self, img: np.ndarray) -> None:
# For the 5 first frames, store the corners
if self.frame_counter < 10:
if not self.visibility_detector.ready:
self.visibility_detector.train(img)
self.frame_counter += 1
return
......@@ -103,7 +105,7 @@ class MemoryArucoFull(Memory):
# cv.aruco.drawDetectedMarkers(img, corners)
self.average_trigger.out_board.draw(img)
self.board_trigger(board, img)
cv.imshow("marker", img)
# cv.imshow("marker", img)
def reset(self) -> None:
"""Reset board position"""
......
......@@ -42,6 +42,13 @@ def main():
memory.bind_pieces(hello_piece)
memory.start()
while True:
frame = memory.frame
if frame is not None:
cv.imshow('webcam', frame)
if cv.waitKey(1) == 27:
break
# process_vid("res/webcam_03_shadow.avi", memory)
# process_cam()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment