pinakes-core: improve media management features; various configuration improvements
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I2d1f04f13970d21c36067f30bc04a9176a6a6964
This commit is contained in:
parent
cfdc3d0622
commit
e02c15490e
31 changed files with 1167 additions and 197 deletions
|
|
@ -1,132 +1,205 @@
|
|||
use std::sync::Arc;
|
||||
//! Auto-detection of photo events and albums based on time and location proximity
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::warn;
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
use crate::config::WebhookConfig;
|
||||
use crate::error::Result;
|
||||
use crate::model::{MediaId, MediaItem};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum PinakesEvent {
|
||||
MediaImported {
|
||||
media_id: String,
|
||||
},
|
||||
MediaUpdated {
|
||||
media_id: String,
|
||||
},
|
||||
MediaDeleted {
|
||||
media_id: String,
|
||||
},
|
||||
ScanCompleted {
|
||||
files_found: usize,
|
||||
files_processed: usize,
|
||||
},
|
||||
IntegrityMismatch {
|
||||
media_id: String,
|
||||
expected: String,
|
||||
actual: String,
|
||||
},
|
||||
MediaRated {
|
||||
media_id: String,
|
||||
user_id: String,
|
||||
stars: u8,
|
||||
},
|
||||
MediaCommented {
|
||||
media_id: String,
|
||||
user_id: String,
|
||||
},
|
||||
PlaylistCreated {
|
||||
playlist_id: String,
|
||||
owner_id: String,
|
||||
},
|
||||
TranscodeStarted {
|
||||
media_id: String,
|
||||
profile: String,
|
||||
},
|
||||
TranscodeCompleted {
|
||||
media_id: String,
|
||||
profile: String,
|
||||
},
|
||||
/// Configuration for event detection
#[derive(Debug, Clone)]
pub struct EventDetectionConfig {
    /// Maximum time gap between photos in the same event (in seconds)
    pub max_time_gap_secs: i64,
    /// Minimum number of photos to form an event
    pub min_photos: usize,
    /// Maximum distance between photos in the same event (in kilometers).
    /// `None` means location is not considered.
    pub max_distance_km: Option<f64>,
    /// When true, photos taken on the same calendar day may be grouped
    /// into one event even if the time gap exceeds `max_time_gap_secs`.
    pub same_day_threshold: bool,
}
|
||||
|
||||
impl PinakesEvent {
|
||||
pub fn event_name(&self) -> &'static str {
|
||||
match self {
|
||||
Self::MediaImported { .. } => "media_imported",
|
||||
Self::MediaUpdated { .. } => "media_updated",
|
||||
Self::MediaDeleted { .. } => "media_deleted",
|
||||
Self::ScanCompleted { .. } => "scan_completed",
|
||||
Self::IntegrityMismatch { .. } => "integrity_mismatch",
|
||||
Self::MediaRated { .. } => "media_rated",
|
||||
Self::MediaCommented { .. } => "media_commented",
|
||||
Self::PlaylistCreated { .. } => "playlist_created",
|
||||
Self::TranscodeStarted { .. } => "transcode_started",
|
||||
Self::TranscodeCompleted { .. } => "transcode_completed",
|
||||
impl Default for EventDetectionConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_time_gap_secs: 2 * 60 * 60, // 2 hours
|
||||
min_photos: 5,
|
||||
max_distance_km: Some(1.0), // 1km
|
||||
same_day_threshold: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EventBus {
|
||||
tx: broadcast::Sender<PinakesEvent>,
|
||||
/// A detected photo event/album
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DetectedEvent {
|
||||
/// Suggested name for the event (e.g., "Photos from 2024-01-15")
|
||||
pub suggested_name: String,
|
||||
/// Start time of the event
|
||||
pub start_time: DateTime<Utc>,
|
||||
/// End time of the event
|
||||
pub end_time: DateTime<Utc>,
|
||||
/// Media items in this event
|
||||
pub items: Vec<MediaId>,
|
||||
/// Representative location (if available)
|
||||
pub location: Option<(f64, f64)>, // (latitude, longitude)
|
||||
}
|
||||
|
||||
impl EventBus {
|
||||
pub fn new(webhooks: Vec<WebhookConfig>) -> Arc<Self> {
|
||||
let (tx, _) = broadcast::channel(256);
|
||||
/// Calculate Haversine distance between two GPS coordinates in kilometers
|
||||
fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
|
||||
const EARTH_RADIUS_KM: f64 = 6371.0;
|
||||
|
||||
// Spawn webhook delivery task
|
||||
if !webhooks.is_empty() {
|
||||
let mut rx: broadcast::Receiver<PinakesEvent> = tx.subscribe();
|
||||
let webhooks = Arc::new(webhooks);
|
||||
tokio::spawn(async move {
|
||||
while let Ok(event) = rx.recv().await {
|
||||
let event_name = event.event_name();
|
||||
for hook in webhooks.iter() {
|
||||
if hook.events.iter().any(|e| e == event_name || e == "*") {
|
||||
let url = hook.url.clone();
|
||||
let event_clone = event.clone();
|
||||
let secret = hook.secret.clone();
|
||||
tokio::spawn(async move {
|
||||
deliver_webhook(&url, &event_clone, secret.as_deref()).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
let dlat = (lat2 - lat1).to_radians();
|
||||
let dlon = (lon2 - lon1).to_radians();
|
||||
|
||||
Arc::new(Self { tx })
|
||||
}
|
||||
let a = (dlat / 2.0).sin().powi(2)
|
||||
+ lat1.to_radians().cos() * lat2.to_radians().cos() * (dlon / 2.0).sin().powi(2);
|
||||
|
||||
pub fn emit(&self, event: PinakesEvent) {
|
||||
// Ignore send errors (no receivers)
|
||||
let _ = self.tx.send(event);
|
||||
}
|
||||
let c = 2.0 * a.sqrt().atan2((1.0 - a).sqrt());
|
||||
|
||||
EARTH_RADIUS_KM * c
|
||||
}
|
||||
|
||||
async fn deliver_webhook(url: &str, event: &PinakesEvent, _secret: Option<&str>) {
|
||||
let client = reqwest::Client::new();
|
||||
let body = serde_json::to_string(event).unwrap_or_default();
|
||||
/// Detect photo events from a list of media items
|
||||
pub fn detect_events(
|
||||
mut items: Vec<MediaItem>,
|
||||
config: &EventDetectionConfig,
|
||||
) -> Result<Vec<DetectedEvent>> {
|
||||
// Filter to only photos with date_taken
|
||||
items.retain(|item| item.date_taken.is_some());
|
||||
|
||||
for attempt in 0..3 {
|
||||
match client
|
||||
.post(url)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(body.clone())
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(resp) if resp.status().is_success() => return,
|
||||
Ok(resp) => {
|
||||
warn!(url, status = %resp.status(), attempt, "webhook delivery failed");
|
||||
if items.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
// Sort by date_taken
|
||||
items.sort_by(|a, b| a.date_taken.unwrap().cmp(&b.date_taken.unwrap()));
|
||||
|
||||
let mut events: Vec<DetectedEvent> = Vec::new();
|
||||
let mut current_event_items: Vec<MediaId> = vec![items[0].id];
|
||||
let mut current_start_time = items[0].date_taken.unwrap();
|
||||
let mut current_last_time = items[0].date_taken.unwrap();
|
||||
let mut current_location = items[0].latitude.zip(items[0].longitude);
|
||||
|
||||
for item in items.iter().skip(1) {
|
||||
let item_time = item.date_taken.unwrap();
|
||||
let time_gap = (item_time - current_last_time).num_seconds();
|
||||
|
||||
// Check time gap
|
||||
let time_ok = if config.same_day_threshold {
|
||||
// Same day or within time gap
|
||||
item_time.date_naive() == current_last_time.date_naive()
|
||||
|| time_gap <= config.max_time_gap_secs
|
||||
} else {
|
||||
time_gap <= config.max_time_gap_secs
|
||||
};
|
||||
|
||||
// Check location proximity if both have GPS data
|
||||
let location_ok = match (
|
||||
config.max_distance_km,
|
||||
current_location,
|
||||
item.latitude.zip(item.longitude),
|
||||
) {
|
||||
(Some(max_dist), Some((lat1, lon1)), Some((lat2, lon2))) => {
|
||||
let dist = haversine_distance(lat1, lon1, lat2, lon2);
|
||||
dist <= max_dist
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(url, error = %e, attempt, "webhook delivery error");
|
||||
// If no location constraint or missing GPS, consider location OK
|
||||
_ => true,
|
||||
};
|
||||
|
||||
if time_ok && location_ok {
|
||||
// Add to current event
|
||||
current_event_items.push(item.id);
|
||||
current_last_time = item_time;
|
||||
|
||||
// Update location to average if available
|
||||
if let (Some((lat1, lon1)), Some((lat2, lon2))) =
|
||||
(current_location, item.latitude.zip(item.longitude))
|
||||
{
|
||||
current_location = Some(((lat1 + lat2) / 2.0, (lon1 + lon2) / 2.0));
|
||||
} else if item.latitude.is_some() && item.longitude.is_some() {
|
||||
current_location = item.latitude.zip(item.longitude);
|
||||
}
|
||||
} else {
|
||||
// Start new event if current has enough photos
|
||||
if current_event_items.len() >= config.min_photos {
|
||||
let event_name = format!("Event on {}", current_start_time.format("%Y-%m-%d"));
|
||||
|
||||
events.push(DetectedEvent {
|
||||
suggested_name: event_name,
|
||||
start_time: current_start_time,
|
||||
end_time: current_last_time,
|
||||
items: current_event_items.clone(),
|
||||
location: current_location,
|
||||
});
|
||||
}
|
||||
|
||||
// Reset for new event
|
||||
current_event_items = vec![item.id];
|
||||
current_start_time = item_time;
|
||||
current_last_time = item_time;
|
||||
current_location = item.latitude.zip(item.longitude);
|
||||
}
|
||||
}
|
||||
|
||||
// Don't forget the last event
|
||||
if current_event_items.len() >= config.min_photos {
|
||||
let event_name = format!("Event on {}", current_start_time.format("%Y-%m-%d"));
|
||||
|
||||
events.push(DetectedEvent {
|
||||
suggested_name: event_name,
|
||||
start_time: current_start_time,
|
||||
end_time: current_last_time,
|
||||
items: current_event_items,
|
||||
location: current_location,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
/// Detect photo bursts (rapid sequences of photos)
|
||||
/// Returns groups of media IDs that are likely burst sequences
|
||||
pub fn detect_bursts(
|
||||
mut items: Vec<MediaItem>,
|
||||
max_gap_secs: i64,
|
||||
min_burst_size: usize,
|
||||
) -> Result<Vec<Vec<MediaId>>> {
|
||||
// Filter to only photos with date_taken
|
||||
items.retain(|item| item.date_taken.is_some());
|
||||
|
||||
if items.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
// Sort by date_taken
|
||||
items.sort_by(|a, b| a.date_taken.unwrap().cmp(&b.date_taken.unwrap()));
|
||||
|
||||
let mut bursts: Vec<Vec<MediaId>> = Vec::new();
|
||||
let mut current_burst: Vec<MediaId> = vec![items[0].id];
|
||||
let mut last_time = items[0].date_taken.unwrap();
|
||||
|
||||
for item in items.iter().skip(1) {
|
||||
let item_time = item.date_taken.unwrap();
|
||||
let gap = (item_time - last_time).num_seconds();
|
||||
|
||||
if gap <= max_gap_secs {
|
||||
current_burst.push(item.id);
|
||||
} else {
|
||||
if current_burst.len() >= min_burst_size {
|
||||
bursts.push(current_burst.clone());
|
||||
}
|
||||
current_burst = vec![item.id];
|
||||
}
|
||||
|
||||
// Exponential backoff
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1 << attempt)).await;
|
||||
last_time = item_time;
|
||||
}
|
||||
|
||||
// Don't forget the last burst
|
||||
if current_burst.len() >= min_burst_size {
|
||||
bursts.push(current_burst);
|
||||
}
|
||||
|
||||
Ok(bursts)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue