meta: revise project structure; pin deps
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I2133dbb2ae6c7bd27cd94638d61af2686a6a6964
This commit is contained in:
parent
a8262ab7a6
commit
4d205723f6
9 changed files with 390 additions and 150 deletions
137
flipoff.py
137
flipoff.py
|
|
@ -1,137 +0,0 @@
|
|||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
|
||||
import cv2
|
||||
from dbus_next.aio.message_bus import MessageBus
|
||||
from dbus_next.constants import BusType
|
||||
import mediapipe as mp # type: ignore[import-untyped]
|
||||
from mediapipe.tasks import python # type: ignore[import-untyped]
|
||||
from mediapipe.tasks.python import vision # type: ignore[import-untyped]
|
||||
import numpy as np
|
||||
|
||||
|
||||
from mediapipe.tasks.python.components.containers.landmark import ( # type: ignore[import-untyped] # isort: skip
|
||||
NormalizedLandmark,
|
||||
)
|
||||
|
||||
MODEL_PATH = os.environ.get("FLIPOFF_MODEL_PATH")
|
||||
DEBUG = os.environ.get("FLIPOFF_DRYRUN", "0") == "1"
|
||||
|
||||
|
||||
async def poweroff() -> None:
|
||||
bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
|
||||
proxy = bus.get_proxy_object(
|
||||
"org.freedesktop.login1",
|
||||
"/org/freedesktop/login1",
|
||||
None,
|
||||
)
|
||||
manager = proxy.get_interface("org.freedesktop.login1.Manager")
|
||||
await manager.call_power_off(False)
|
||||
|
||||
|
||||
def is_flipping_off(hand: list[NormalizedLandmark]) -> bool:
|
||||
y_12 = hand[12].y
|
||||
y_10 = hand[10].y
|
||||
y_8 = hand[8].y
|
||||
y_6 = hand[6].y
|
||||
y_16 = hand[16].y
|
||||
y_14 = hand[14].y
|
||||
y_20 = hand[20].y
|
||||
y_18 = hand[18].y
|
||||
if (
|
||||
y_12 is None
|
||||
or y_10 is None
|
||||
or y_8 is None
|
||||
or y_6 is None
|
||||
or y_16 is None
|
||||
or y_14 is None
|
||||
or y_20 is None
|
||||
or y_18 is None
|
||||
):
|
||||
return False
|
||||
return bool(y_12 < y_10 and y_8 > y_6 and y_16 > y_14 and y_20 > y_18)
|
||||
|
||||
|
||||
async def async_poweroff() -> None:
|
||||
if DEBUG:
|
||||
print("DRYRUN: Would power off")
|
||||
return
|
||||
try:
|
||||
await poweroff()
|
||||
except Exception as e:
|
||||
print(f"Poweroff failed: {e}")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Hand gesture poweroff utility")
|
||||
parser.add_argument(
|
||||
"--headless",
|
||||
action="store_true",
|
||||
help="Hide GUI window and run in headless mode",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if not MODEL_PATH:
|
||||
raise RuntimeError("FLIPOFF_MODEL_PATH environment variable not set")
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
base_options = python.BaseOptions(model_asset_path=MODEL_PATH)
|
||||
options = vision.HandLandmarkerOptions(
|
||||
base_options=base_options,
|
||||
num_hands=1,
|
||||
running_mode=vision.RunningMode.VIDEO,
|
||||
)
|
||||
|
||||
detector = vision.HandLandmarker.create_from_options(options)
|
||||
cap: cv2.VideoCapture = cv2.VideoCapture(0)
|
||||
last_trigger: float = 0.0
|
||||
|
||||
while True:
|
||||
ret: bool
|
||||
frame: np.ndarray
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
break
|
||||
|
||||
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
|
||||
result = detector.detect_for_video(mp_image, int(time.time() * 1000))
|
||||
|
||||
if result.hand_landmarks:
|
||||
hand: list[NormalizedLandmark] = result.hand_landmarks[0]
|
||||
|
||||
if DEBUG:
|
||||
for landmark in hand:
|
||||
x = int(landmark.x * frame.shape[1])
|
||||
y = int(landmark.y * frame.shape[0])
|
||||
cv2.circle(frame, (x, y), 5, (0, 255, 0), -1)
|
||||
|
||||
flipping = is_flipping_off(hand)
|
||||
if DEBUG:
|
||||
text = "FLIPPING OFF DETECTED" if flipping else "Waiting for gesture..."
|
||||
color = (0, 0, 255) if flipping else (0, 255, 0)
|
||||
cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
|
||||
|
||||
if flipping:
|
||||
now: float = time.time()
|
||||
if now - last_trigger > 2:
|
||||
last_trigger = now
|
||||
loop.run_until_complete(async_poweroff())
|
||||
|
||||
if not args.headless:
|
||||
cv2.imshow("Gesture Poweroff", frame)
|
||||
if cv2.waitKey(1) & 0xFF == 27:
|
||||
break
|
||||
|
||||
cap.release()
|
||||
if not args.headless:
|
||||
cv2.destroyAllWindows()
|
||||
detector.close()
|
||||
loop.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -2,12 +2,31 @@
|
|||
name = "flipoff"
|
||||
version = "0.1.0"
|
||||
description = "Begone, clanker"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
dependencies = ["mediapipe", "opencv-contrib-python", "dbus-next", "numpy"]
|
||||
dependencies = [
|
||||
"mediapipe>=0.10.33",
|
||||
"opencv-contrib-python>=4.13.0",
|
||||
"dbus-next>=0.2.3",
|
||||
"numpy>=2.4.4",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = ["black>=26.3.1", "isort>=8.0.1", "mypy>=1.20.0"]
|
||||
|
||||
[project.scripts]
|
||||
flipoff = "flipoff.cli:main"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[dependency-groups]
|
||||
dev = ["black>=26.3.1", "isort>=8.0.1", "mypy>=1.20.0"]
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/flipoff"]
|
||||
|
||||
[tool.black]
|
||||
line-length = 100
|
||||
target-version = ["py311"]
|
||||
|
|
@ -17,7 +36,6 @@ force_single_line = true
|
|||
force_sort_within_sections = true
|
||||
known_first_party = []
|
||||
known_third_party = ["cv2", "dbus_next", "mediapipe"]
|
||||
lines_after_imports = 2
|
||||
profile = "black"
|
||||
|
||||
[tool.mypy]
|
||||
|
|
@ -33,11 +51,6 @@ no_implicit_optional = true
|
|||
warn_redundant_casts = true
|
||||
warn_unused_ignores = true
|
||||
warn_no_return = true
|
||||
show_error_codes = true
|
||||
|
||||
[tool.basedpyright]
|
||||
typeCheckingMode = "off"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
typeCheckingMode = "off" # too overzealous
|
||||
|
|
|
|||
13
src/flipoff/__init__.py
Normal file
13
src/flipoff/__init__.py
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
from flipoff.events import EventRegistry
|
||||
from flipoff.events import PoweroffEvent
|
||||
from flipoff.gesture import FlippingOffGesture
|
||||
from flipoff.gesture import Gesture
|
||||
from flipoff.gesture import GestureRegistry
|
||||
|
||||
__all__ = [
|
||||
"EventRegistry",
|
||||
"FlippingOffGesture",
|
||||
"Gesture",
|
||||
"GestureRegistry",
|
||||
"PoweroffEvent",
|
||||
]
|
||||
4
src/flipoff/__main__.py
Normal file
4
src/flipoff/__main__.py
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
from flipoff.cli import main
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
147
src/flipoff/cli.py
Normal file
147
src/flipoff/cli.py
Normal file
|
|
@ -0,0 +1,147 @@
|
|||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
|
||||
import cv2
|
||||
|
||||
from flipoff.detector import Camera
|
||||
from flipoff.detector import HandDetector
|
||||
from flipoff.events import EventRegistry
|
||||
from flipoff.gesture import Gesture
|
||||
from flipoff.gesture import GestureRegistry
|
||||
|
||||
|
||||
def _get_callback(
|
||||
gesture_cls: type[Gesture],
|
||||
event_instance: object,
|
||||
cooldown: float,
|
||||
last_trigger: list[float],
|
||||
) -> callable:
|
||||
def callback(hand: object) -> bool:
|
||||
gesture_detected = gesture_cls().detect(hand)
|
||||
if gesture_detected:
|
||||
now = time.time()
|
||||
if now - last_trigger[0] > cooldown:
|
||||
last_trigger[0] = now
|
||||
asyncio.create_task(event_instance.trigger()) # type: ignore[attr-defined]
|
||||
return gesture_detected
|
||||
|
||||
return callback
|
||||
|
||||
|
||||
def run(
|
||||
gesture_name: str,
|
||||
event_name: str,
|
||||
headless: bool,
|
||||
camera_index: int,
|
||||
cooldown: float,
|
||||
debug: bool,
|
||||
) -> None:
|
||||
model_path = os.environ.get("FLIPOFF_MODEL_PATH")
|
||||
if not model_path:
|
||||
raise RuntimeError("FLIPOFF_MODEL_PATH environment variable not set")
|
||||
|
||||
gesture_cls = GestureRegistry.get(gesture_name)
|
||||
if not gesture_cls:
|
||||
raise ValueError(f"Unknown gesture: {gesture_name}")
|
||||
event_cls = EventRegistry.get(event_name)
|
||||
if not event_cls:
|
||||
raise ValueError(f"Unknown event: {event_name}")
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
detector = HandDetector(model_path)
|
||||
camera = Camera(camera_index)
|
||||
event_instance = event_cls()
|
||||
|
||||
gesture_instance = gesture_cls()
|
||||
last_trigger = [0.0]
|
||||
|
||||
callback = _get_callback(gesture_cls, event_instance, cooldown, last_trigger)
|
||||
|
||||
while True:
|
||||
ret, frame = camera.read()
|
||||
if not ret:
|
||||
break
|
||||
|
||||
hands = detector.detect(frame)
|
||||
if hands:
|
||||
callback(hands[0])
|
||||
|
||||
if debug:
|
||||
for landmark in hands[0]:
|
||||
x = int(landmark.x * frame.shape[1])
|
||||
y = int(landmark.y * frame.shape[0])
|
||||
cv2.circle(frame, (x, y), 5, (0, 255, 0), -1)
|
||||
|
||||
flipping = gesture_instance.detect(hands[0])
|
||||
text = f"{gesture_name.upper()} DETECTED" if flipping else "Waiting for gesture..."
|
||||
color = (0, 0, 255) if flipping else (0, 255, 0)
|
||||
cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)
|
||||
|
||||
if not headless:
|
||||
cv2.imshow("Gesture Poweroff", frame)
|
||||
if cv2.waitKey(1) & 0xFF == 27:
|
||||
break
|
||||
|
||||
camera.release()
|
||||
if not headless:
|
||||
cv2.destroyAllWindows()
|
||||
detector.close()
|
||||
loop.close()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Hand gesture event utility")
|
||||
parser.add_argument(
|
||||
"--gesture",
|
||||
type=str,
|
||||
default="flipping_off",
|
||||
choices=list(GestureRegistry.all().keys()),
|
||||
help="Gesture to detect",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--event",
|
||||
type=str,
|
||||
default="poweroff",
|
||||
choices=list(EventRegistry.all().keys()),
|
||||
help="Event to trigger on gesture",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--headless",
|
||||
action="store_true",
|
||||
help="Hide GUI window and run in headless mode",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--camera",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Camera index to use",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cooldown",
|
||||
type=float,
|
||||
default=2.0,
|
||||
help="Cooldown between event triggers in seconds",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--debug",
|
||||
action="store_true",
|
||||
help="Show debug visualizations",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
run(
|
||||
gesture_name=args.gesture,
|
||||
event_name=args.event,
|
||||
headless=args.headless,
|
||||
camera_index=args.camera,
|
||||
cooldown=args.cooldown,
|
||||
debug=args.debug or os.environ.get("FLIPOFF_DRYRUN", "0") == "1",
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
69
src/flipoff/detector.py
Normal file
69
src/flipoff/detector.py
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
import time
|
||||
from typing import Callable
|
||||
|
||||
import cv2
|
||||
import mediapipe as mp
|
||||
from mediapipe.tasks import python
|
||||
from mediapipe.tasks.python import vision
|
||||
from mediapipe.tasks.python.components.containers.landmark import NormalizedLandmark
|
||||
import numpy as np
|
||||
|
||||
|
||||
class HandDetector:
|
||||
def __init__(
|
||||
self,
|
||||
model_path: str,
|
||||
num_hands: int = 1,
|
||||
) -> None:
|
||||
base_options = python.BaseOptions(model_asset_path=model_path)
|
||||
options = vision.HandLandmarkerOptions(
|
||||
base_options=base_options,
|
||||
num_hands=num_hands,
|
||||
running_mode=vision.RunningMode.VIDEO,
|
||||
)
|
||||
self._detector = vision.HandLandmarker.create_from_options(options)
|
||||
|
||||
def detect(
|
||||
self,
|
||||
frame: np.ndarray,
|
||||
timestamp_ms: int | None = None,
|
||||
) -> list[list[NormalizedLandmark]]:
|
||||
if timestamp_ms is None:
|
||||
timestamp_ms = int(time.time() * 1000)
|
||||
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
|
||||
result = self._detector.detect_for_video(mp_image, timestamp_ms)
|
||||
return result.hand_landmarks if result.hand_landmarks else []
|
||||
|
||||
def close(self) -> None:
|
||||
self._detector.close()
|
||||
|
||||
|
||||
class Camera:
|
||||
def __init__(self, index: int = 0) -> None:
|
||||
self._cap = cv2.VideoCapture(index)
|
||||
|
||||
def read(self) -> tuple[bool, np.ndarray]:
|
||||
ret, frame = self._cap.read()
|
||||
return ret, frame
|
||||
|
||||
def release(self) -> None:
|
||||
self._cap.release()
|
||||
|
||||
|
||||
class GestureDetector:
|
||||
def __init__(
|
||||
self,
|
||||
detector: HandDetector,
|
||||
gesture_callback: Callable[[list[NormalizedLandmark]], bool],
|
||||
) -> None:
|
||||
self._detector = detector
|
||||
self._callback = gesture_callback
|
||||
|
||||
def process_frame(self, frame: np.ndarray) -> bool | None:
|
||||
hands = self._detector.detect(frame)
|
||||
if hands:
|
||||
return self._callback(hands[0])
|
||||
return None
|
||||
|
||||
def close(self) -> None:
|
||||
self._detector.close()
|
||||
60
src/flipoff/events.py
Normal file
60
src/flipoff/events.py
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
from abc import ABC
|
||||
from abc import abstractmethod
|
||||
import asyncio
|
||||
import os
|
||||
from typing import ClassVar
|
||||
|
||||
|
||||
class Event(ABC):
|
||||
name: ClassVar[str] = "unknown"
|
||||
|
||||
@abstractmethod
|
||||
async def trigger(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class PoweroffEvent(Event):
|
||||
name = "poweroff"
|
||||
DEBUG: bool = os.environ.get("FLIPOFF_DRYRUN", "0") == "1"
|
||||
|
||||
async def trigger(self) -> None:
|
||||
if self.DEBUG:
|
||||
print("DRYRUN: Would power off")
|
||||
return
|
||||
try:
|
||||
await self._poweroff()
|
||||
except Exception as e:
|
||||
print(f"Poweroff failed: {e}")
|
||||
|
||||
async def _poweroff(self) -> None:
|
||||
from dbus_next.aio.message_bus import MessageBus
|
||||
from dbus_next.constants import BusType
|
||||
|
||||
bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
|
||||
proxy = bus.get_proxy_object(
|
||||
"org.freedesktop.login1",
|
||||
"/org/freedesktop/login1",
|
||||
None,
|
||||
)
|
||||
manager = proxy.get_interface("org.freedesktop.login1.Manager")
|
||||
await manager.call_power_off(False)
|
||||
|
||||
|
||||
class EventRegistry:
|
||||
_events: dict[str, type[Event]] = {}
|
||||
|
||||
@classmethod
|
||||
def register(cls, event_class: type[Event]) -> type[Event]:
|
||||
cls._events[event_class.name] = event_class
|
||||
return event_class
|
||||
|
||||
@classmethod
|
||||
def get(cls, name: str) -> type[Event] | None:
|
||||
return cls._events.get(name)
|
||||
|
||||
@classmethod
|
||||
def all(cls) -> dict[str, type[Event]]:
|
||||
return cls._events.copy()
|
||||
|
||||
|
||||
EventRegistry.register(PoweroffEvent)
|
||||
60
src/flipoff/gesture.py
Normal file
60
src/flipoff/gesture.py
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
from dataclasses import dataclass
|
||||
from typing import Callable
|
||||
from typing import ClassVar
|
||||
|
||||
from mediapipe.tasks.python.components.containers.landmark import NormalizedLandmark
|
||||
|
||||
|
||||
@dataclass
|
||||
class GestureResult:
|
||||
name: str
|
||||
detected: bool
|
||||
|
||||
|
||||
class Gesture:
|
||||
name: ClassVar[str] = "unknown"
|
||||
|
||||
def detect(self, hand: list[NormalizedLandmark]) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class GestureRegistry:
|
||||
_gestures: dict[str, type[Gesture]] = {}
|
||||
|
||||
@classmethod
|
||||
def register(cls, gesture_class: type[Gesture]) -> type[Gesture]:
|
||||
cls._gestures[gesture_class.name] = gesture_class
|
||||
return gesture_class
|
||||
|
||||
@classmethod
|
||||
def get(cls, name: str) -> type[Gesture] | None:
|
||||
return cls._gestures.get(name)
|
||||
|
||||
@classmethod
|
||||
def all(cls) -> dict[str, type[Gesture]]:
|
||||
return cls._gestures.copy()
|
||||
|
||||
|
||||
def _check_y_values(
|
||||
hand: list[NormalizedLandmark],
|
||||
*indices: int,
|
||||
) -> bool:
|
||||
for idx in indices:
|
||||
if hand[idx].y is None:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@GestureRegistry.register
|
||||
class FlippingOffGesture(Gesture):
|
||||
name = "flipping_off"
|
||||
|
||||
def detect(self, hand: list[NormalizedLandmark]) -> bool:
|
||||
if not _check_y_values(hand, 12, 10, 8, 6, 16, 14, 20, 18):
|
||||
return False
|
||||
return bool(
|
||||
hand[12].y < hand[10].y
|
||||
and hand[8].y > hand[6].y
|
||||
and hand[16].y > hand[14].y
|
||||
and hand[20].y > hand[18].y
|
||||
)
|
||||
19
uv.lock
generated
19
uv.lock
generated
|
|
@ -258,6 +258,13 @@ dependencies = [
|
|||
{ name = "opencv-contrib-python" },
|
||||
]
|
||||
|
||||
[package.optional-dependencies]
|
||||
dev = [
|
||||
{ name = "black" },
|
||||
{ name = "isort" },
|
||||
{ name = "mypy" },
|
||||
]
|
||||
|
||||
[package.dev-dependencies]
|
||||
dev = [
|
||||
{ name = "black" },
|
||||
|
|
@ -267,11 +274,15 @@ dev = [
|
|||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "dbus-next" },
|
||||
{ name = "mediapipe" },
|
||||
{ name = "numpy" },
|
||||
{ name = "opencv-contrib-python" },
|
||||
{ name = "black", marker = "extra == 'dev'", specifier = ">=26.3.1" },
|
||||
{ name = "dbus-next", specifier = ">=0.2.3" },
|
||||
{ name = "isort", marker = "extra == 'dev'", specifier = ">=8.0.1" },
|
||||
{ name = "mediapipe", specifier = ">=0.10.33" },
|
||||
{ name = "mypy", marker = "extra == 'dev'", specifier = ">=1.20.0" },
|
||||
{ name = "numpy", specifier = ">=2.4.4" },
|
||||
{ name = "opencv-contrib-python", specifier = ">=4.13.0" },
|
||||
]
|
||||
provides-extras = ["dev"]
|
||||
|
||||
[package.metadata.requires-dev]
|
||||
dev = [
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue