diff --git a/flipoff.py b/flipoff.py deleted file mode 100644 index ca297aa..0000000 --- a/flipoff.py +++ /dev/null @@ -1,137 +0,0 @@ -import argparse -import asyncio -import os -import time - -import cv2 -from dbus_next.aio.message_bus import MessageBus -from dbus_next.constants import BusType -import mediapipe as mp # type: ignore[import-untyped] -from mediapipe.tasks import python # type: ignore[import-untyped] -from mediapipe.tasks.python import vision # type: ignore[import-untyped] -import numpy as np - - -from mediapipe.tasks.python.components.containers.landmark import ( # type: ignore[import-untyped] # isort: skip - NormalizedLandmark, -) - -MODEL_PATH = os.environ.get("FLIPOFF_MODEL_PATH") -DEBUG = os.environ.get("FLIPOFF_DRYRUN", "0") == "1" - - -async def poweroff() -> None: - bus = await MessageBus(bus_type=BusType.SYSTEM).connect() - proxy = bus.get_proxy_object( - "org.freedesktop.login1", - "/org/freedesktop/login1", - None, - ) - manager = proxy.get_interface("org.freedesktop.login1.Manager") - await manager.call_power_off(False) - - -def is_flipping_off(hand: list[NormalizedLandmark]) -> bool: - y_12 = hand[12].y - y_10 = hand[10].y - y_8 = hand[8].y - y_6 = hand[6].y - y_16 = hand[16].y - y_14 = hand[14].y - y_20 = hand[20].y - y_18 = hand[18].y - if ( - y_12 is None - or y_10 is None - or y_8 is None - or y_6 is None - or y_16 is None - or y_14 is None - or y_20 is None - or y_18 is None - ): - return False - return bool(y_12 < y_10 and y_8 > y_6 and y_16 > y_14 and y_20 > y_18) - - -async def async_poweroff() -> None: - if DEBUG: - print("DRYRUN: Would power off") - return - try: - await poweroff() - except Exception as e: - print(f"Poweroff failed: {e}") - - -def main() -> None: - parser = argparse.ArgumentParser(description="Hand gesture poweroff utility") - parser.add_argument( - "--headless", - action="store_true", - help="Hide GUI window and run in headless mode", - ) - args = parser.parse_args() - - if not MODEL_PATH: - raise RuntimeError("FLIPOFF_MODEL_PATH environment variable not set") - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - base_options = python.BaseOptions(model_asset_path=MODEL_PATH) - options = vision.HandLandmarkerOptions( - base_options=base_options, - num_hands=1, - running_mode=vision.RunningMode.VIDEO, - ) - - detector = vision.HandLandmarker.create_from_options(options) - cap: cv2.VideoCapture = cv2.VideoCapture(0) - last_trigger: float = 0.0 - - while True: - ret: bool - frame: np.ndarray - ret, frame = cap.read() - if not ret: - break - - mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame) - result = detector.detect_for_video(mp_image, int(time.time() * 1000)) - - if result.hand_landmarks: - hand: list[NormalizedLandmark] = result.hand_landmarks[0] - - if DEBUG: - for landmark in hand: - x = int(landmark.x * frame.shape[1]) - y = int(landmark.y * frame.shape[0]) - cv2.circle(frame, (x, y), 5, (0, 255, 0), -1) - - flipping = is_flipping_off(hand) - if DEBUG: - text = "FLIPPING OFF DETECTED" if flipping else "Waiting for gesture..." - color = (0, 0, 255) if flipping else (0, 255, 0) - cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2) - - if flipping: - now: float = time.time() - if now - last_trigger > 2: - last_trigger = now - loop.run_until_complete(async_poweroff()) - - if not args.headless: - cv2.imshow("Gesture Poweroff", frame) - if cv2.waitKey(1) & 0xFF == 27: - break - - cap.release() - if not args.headless: - cv2.destroyAllWindows() - detector.close() - loop.close() - - -if __name__ == "__main__": - main() diff --git a/pyproject.toml b/pyproject.toml index 6c82145..c1e26ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,12 +2,31 @@ name = "flipoff" version = "0.1.0" description = "Begone, clanker" +readme = "README.md" requires-python = ">=3.11" -dependencies = ["mediapipe", "opencv-contrib-python", "dbus-next", "numpy"] +dependencies = [ + "mediapipe>=0.10.33", + "opencv-contrib-python>=4.13.0", + "dbus-next>=0.2.3", + "numpy>=2.4.4", +] + +[project.optional-dependencies] +dev = ["black>=26.3.1", "isort>=8.0.1", "mypy>=1.20.0"] + +[project.scripts] +flipoff = "flipoff.cli:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" [dependency-groups] dev = ["black>=26.3.1", "isort>=8.0.1", "mypy>=1.20.0"] +[tool.hatch.build.targets.wheel] +packages = ["src/flipoff"] + [tool.black] line-length = 100 target-version = ["py311"] @@ -17,7 +36,6 @@ force_single_line = true force_sort_within_sections = true known_first_party = [] known_third_party = ["cv2", "dbus_next", "mediapipe"] -lines_after_imports = 2 profile = "black" [tool.mypy] @@ -33,11 +51,6 @@ no_implicit_optional = true warn_redundant_casts = true warn_unused_ignores = true warn_no_return = true -show_error_codes = true [tool.basedpyright] -typeCheckingMode = "off" - -[build-system] -requires = ["poetry-core"] -build-backend = "poetry.core.masonry.api" +typeCheckingMode = "off" # too overzealous diff --git a/src/flipoff/__init__.py b/src/flipoff/__init__.py new file mode 100644 index 0000000..865f1e6 --- /dev/null +++ b/src/flipoff/__init__.py @@ -0,0 +1,13 @@ +from flipoff.events import EventRegistry +from flipoff.events import PoweroffEvent +from flipoff.gesture import FlippingOffGesture +from flipoff.gesture import Gesture +from flipoff.gesture import GestureRegistry + +__all__ = [ + "EventRegistry", + "FlippingOffGesture", + "Gesture", + "GestureRegistry", + "PoweroffEvent", +] diff --git a/src/flipoff/__main__.py b/src/flipoff/__main__.py new file mode 100644 index 0000000..4b071ec --- /dev/null +++ b/src/flipoff/__main__.py @@ -0,0 +1,4 @@ +from flipoff.cli import main + +if __name__ == "__main__": + main() diff --git a/src/flipoff/cli.py b/src/flipoff/cli.py new file mode 100644 index 0000000..e99bffb --- /dev/null +++ b/src/flipoff/cli.py @@ -0,0 +1,147 @@ +import argparse +import asyncio +import os +import time + +import cv2 + +from flipoff.detector import Camera +from flipoff.detector import HandDetector +from flipoff.events import EventRegistry +from flipoff.gesture import Gesture +from flipoff.gesture import GestureRegistry + + +def _get_callback( + gesture_cls: type[Gesture], + event_instance: object, + cooldown: float, + last_trigger: list[float], +) -> callable: + def callback(hand: object) -> bool: + gesture_detected = gesture_cls().detect(hand) + if gesture_detected: + now = time.time() + if now - last_trigger[0] > cooldown: + last_trigger[0] = now + asyncio.create_task(event_instance.trigger()) # type: ignore[attr-defined] + return gesture_detected + + return callback + + +def run( + gesture_name: str, + event_name: str, + headless: bool, + camera_index: int, + cooldown: float, + debug: bool, +) -> None: + model_path = os.environ.get("FLIPOFF_MODEL_PATH") + if not model_path: + raise RuntimeError("FLIPOFF_MODEL_PATH environment variable not set") + + gesture_cls = GestureRegistry.get(gesture_name) + if not gesture_cls: + raise ValueError(f"Unknown gesture: {gesture_name}") + event_cls = EventRegistry.get(event_name) + if not event_cls: + raise ValueError(f"Unknown event: {event_name}") + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + detector = HandDetector(model_path) + camera = Camera(camera_index) + event_instance = event_cls() + + gesture_instance = gesture_cls() + last_trigger = [0.0] + + callback = _get_callback(gesture_cls, event_instance, cooldown, last_trigger) + + while True: + ret, frame = camera.read() + if not ret: + break + + hands = detector.detect(frame) + if hands: + callback(hands[0]) + + if debug: + for landmark in hands[0]: + x = int(landmark.x * frame.shape[1]) + y = int(landmark.y * frame.shape[0]) + cv2.circle(frame, (x, y), 5, (0, 255, 0), -1) + + flipping = gesture_instance.detect(hands[0]) + text = f"{gesture_name.upper()} DETECTED" if flipping else "Waiting for gesture..." + color = (0, 0, 255) if flipping else (0, 255, 0) + cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2) + + if not headless: + cv2.imshow("Gesture Poweroff", frame) + if cv2.waitKey(1) & 0xFF == 27: + break + + camera.release() + if not headless: + cv2.destroyAllWindows() + detector.close() + loop.close() + + +def main() -> None: + parser = argparse.ArgumentParser(description="Hand gesture event utility") + parser.add_argument( + "--gesture", + type=str, + default="flipping_off", + choices=list(GestureRegistry.all().keys()), + help="Gesture to detect", + ) + parser.add_argument( + "--event", + type=str, + default="poweroff", + choices=list(EventRegistry.all().keys()), + help="Event to trigger on gesture", + ) + parser.add_argument( + "--headless", + action="store_true", + help="Hide GUI window and run in headless mode", + ) + parser.add_argument( + "--camera", + type=int, + default=0, + help="Camera index to use", + ) + parser.add_argument( + "--cooldown", + type=float, + default=2.0, + help="Cooldown between event triggers in seconds", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Show debug visualizations", + ) + args = parser.parse_args() + + run( + gesture_name=args.gesture, + event_name=args.event, + headless=args.headless, + camera_index=args.camera, + cooldown=args.cooldown, + debug=args.debug or os.environ.get("FLIPOFF_DRYRUN", "0") == "1", + ) + + +if __name__ == "__main__": + main() diff --git a/src/flipoff/detector.py b/src/flipoff/detector.py new file mode 100644 index 0000000..1cc8eb5 --- /dev/null +++ b/src/flipoff/detector.py @@ -0,0 +1,69 @@ +import time +from typing import Callable + +import cv2 +import mediapipe as mp +from mediapipe.tasks import python +from mediapipe.tasks.python import vision +from mediapipe.tasks.python.components.containers.landmark import NormalizedLandmark +import numpy as np + + +class HandDetector: + def __init__( + self, + model_path: str, + num_hands: int = 1, + ) -> None: + base_options = python.BaseOptions(model_asset_path=model_path) + options = vision.HandLandmarkerOptions( + base_options=base_options, + num_hands=num_hands, + running_mode=vision.RunningMode.VIDEO, + ) + self._detector = vision.HandLandmarker.create_from_options(options) + + def detect( + self, + frame: np.ndarray, + timestamp_ms: int | None = None, + ) -> list[list[NormalizedLandmark]]: + if timestamp_ms is None: + timestamp_ms = int(time.time() * 1000) + mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame) + result = self._detector.detect_for_video(mp_image, timestamp_ms) + return result.hand_landmarks if result.hand_landmarks else [] + + def close(self) -> None: + self._detector.close() + + +class Camera: + def __init__(self, index: int = 0) -> None: + self._cap = cv2.VideoCapture(index) + + def read(self) -> tuple[bool, np.ndarray]: + ret, frame = self._cap.read() + return ret, frame + + def release(self) -> None: + self._cap.release() + + +class GestureDetector: + def __init__( + self, + detector: HandDetector, + gesture_callback: Callable[[list[NormalizedLandmark]], bool], + ) -> None: + self._detector = detector + self._callback = gesture_callback + + def process_frame(self, frame: np.ndarray) -> bool | None: + hands = self._detector.detect(frame) + if hands: + return self._callback(hands[0]) + return None + + def close(self) -> None: + self._detector.close() diff --git a/src/flipoff/events.py b/src/flipoff/events.py new file mode 100644 index 0000000..f416ec0 --- /dev/null +++ b/src/flipoff/events.py @@ -0,0 +1,60 @@ +from abc import ABC +from abc import abstractmethod +import asyncio +import os +from typing import ClassVar + + +class Event(ABC): + name: ClassVar[str] = "unknown" + + @abstractmethod + async def trigger(self) -> None: + raise NotImplementedError + + +class PoweroffEvent(Event): + name = "poweroff" + DEBUG: bool = os.environ.get("FLIPOFF_DRYRUN", "0") == "1" + + async def trigger(self) -> None: + if self.DEBUG: + print("DRYRUN: Would power off") + return + try: + await self._poweroff() + except Exception as e: + print(f"Poweroff failed: {e}") + + async def _poweroff(self) -> None: + from dbus_next.aio.message_bus import MessageBus + from dbus_next.constants import BusType + + bus = await MessageBus(bus_type=BusType.SYSTEM).connect() + proxy = bus.get_proxy_object( + "org.freedesktop.login1", + "/org/freedesktop/login1", + None, + ) + manager = proxy.get_interface("org.freedesktop.login1.Manager") + await manager.call_power_off(False) + + +class EventRegistry: + _events: dict[str, type[Event]] = {} + + @classmethod + def register(cls, event_class: type[Event]) -> type[Event]: + cls._events[event_class.name] = event_class + return event_class + + @classmethod + def get(cls, name: str) -> type[Event] | None: + return cls._events.get(name) + + @classmethod + def all(cls) -> dict[str, type[Event]]: + return cls._events.copy() + + +EventRegistry.register(PoweroffEvent) diff --git a/src/flipoff/gesture.py b/src/flipoff/gesture.py new file mode 100644 index 0000000..f008d9d --- /dev/null +++ b/src/flipoff/gesture.py @@ -0,0 +1,60 @@ +from dataclasses import dataclass +from typing import Callable +from typing import ClassVar + +from mediapipe.tasks.python.components.containers.landmark import NormalizedLandmark + + +@dataclass +class GestureResult: + name: str + detected: bool + + +class Gesture: + name: ClassVar[str] = "unknown" + + def detect(self, hand: list[NormalizedLandmark]) -> bool: + raise NotImplementedError + + +class GestureRegistry: + _gestures: dict[str, type[Gesture]] = {} + + @classmethod + def register(cls, gesture_class: type[Gesture]) -> type[Gesture]: + cls._gestures[gesture_class.name] = gesture_class + return gesture_class + + @classmethod + def get(cls, name: str) -> type[Gesture] | None: + return cls._gestures.get(name) + + @classmethod + def all(cls) -> dict[str, type[Gesture]]: + return cls._gestures.copy() + + +def _check_y_values( + hand: list[NormalizedLandmark], + *indices: int, +) -> bool: + for idx in indices: + if hand[idx].y is None: + return False + return True + + +@GestureRegistry.register +class FlippingOffGesture(Gesture): + name = "flipping_off" + + def detect(self, hand: list[NormalizedLandmark]) -> bool: + if not _check_y_values(hand, 12, 10, 8, 6, 16, 14, 20, 18): + return False + return bool( + hand[12].y < hand[10].y + and hand[8].y > hand[6].y + and hand[16].y > hand[14].y + and hand[20].y > hand[18].y + ) diff --git a/uv.lock b/uv.lock index 4967014..ef1b554 100644 --- a/uv.lock +++ b/uv.lock @@ -258,6 +258,13 @@ dependencies = [ { name = "opencv-contrib-python" }, ] +[package.optional-dependencies] +dev = [ + { name = "black" }, + { name = "isort" }, + { name = "mypy" }, +] + [package.dev-dependencies] dev = [ { name = "black" }, @@ -267,11 +274,15 @@ dev = [ [package.metadata] requires-dist = [ - { name = "dbus-next" }, - { name = "mediapipe" }, - { name = "numpy" }, - { name = "opencv-contrib-python" }, + { name = "black", marker = "extra == 'dev'", specifier = ">=26.3.1" }, + { name = "dbus-next", specifier = ">=0.2.3" }, + { name = "isort", marker = "extra == 'dev'", specifier = ">=8.0.1" }, + { name = "mediapipe", specifier = ">=0.10.33" }, + { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.20.0" }, + { name = "numpy", specifier = ">=2.4.4" }, + { name = "opencv-contrib-python", specifier = ">=4.13.0" }, ] +provides-extras = ["dev"] [package.metadata.requires-dev] dev = [