diff --git a/src/flipoff/cli.py b/src/flipoff/cli.py index f089842..48d0e7a 100644 --- a/src/flipoff/cli.py +++ b/src/flipoff/cli.py @@ -1,12 +1,16 @@ import argparse import asyncio +from collections.abc import Callable import os +import threading import time import cv2 +from mediapipe.tasks.python.components.containers.landmark import NormalizedLandmark from flipoff.detector import Camera from flipoff.detector import HandDetector +from flipoff.events import Event from flipoff.events import EventRegistry from flipoff.gesture import Gesture from flipoff.gesture import GestureRegistry @@ -15,17 +19,17 @@ from flipoff.gesture import GestureRegistry def _get_callback( loop: asyncio.AbstractEventLoop, gesture_instance: Gesture, - event_instance: object, + event_instance: Event, cooldown: float, last_trigger: list[float], -) -> callable: - def callback(hand: object) -> bool: +) -> Callable[[list[NormalizedLandmark]], bool]: + def callback(hand: list[NormalizedLandmark]) -> bool: gesture_detected = gesture_instance.detect(hand) if gesture_detected: now = time.time() if now - last_trigger[0] > cooldown: last_trigger[0] = now - loop.create_task(event_instance.trigger()) # type: ignore[attr-defined] + asyncio.run_coroutine_threadsafe(event_instance.trigger(), loop) return gesture_detected return callback @@ -51,7 +55,8 @@ def run( raise ValueError(f"Unknown event: {event_name}") loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + loop_thread = threading.Thread(target=loop.run_forever, daemon=True) + loop_thread.start() detector = HandDetector(model_path) camera = Camera(camera_index) @@ -69,7 +74,7 @@ def run( hands = detector.detect(frame) if hands: - callback(hands[0]) + gesture_detected = callback(hands[0]) if debug: for landmark in hands[0]: @@ -77,9 +82,12 @@ def run( y = int(landmark.y * frame.shape[0]) cv2.circle(frame, (x, y), 5, (0, 255, 0), -1) - flipping = gesture_instance.detect(hands[0]) - text = f"{gesture_name.upper()} DETECTED" if flipping else "Waiting for gesture..." - color = (0, 0, 255) if flipping else (0, 255, 0) + text = ( + f"{gesture_name.upper()} DETECTED" + if gesture_detected + else "Waiting for gesture..." + ) + color = (0, 0, 255) if gesture_detected else (0, 255, 0) cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2) if not headless: @@ -92,11 +100,8 @@ def run( cv2.destroyAllWindows() detector.close() - pending = asyncio.all_tasks(loop) - for task in pending: - task.cancel() - loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) - + loop.call_soon_threadsafe(loop.stop) + loop_thread.join() loop.close()