treewide: better cross-device sync capabilities; in-database storage

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Id99798df6f7e4470caae8a193c2654aa6a6a6964
This commit is contained in:
raf 2026-02-05 08:28:50 +03:00
commit f34c78b238
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
41 changed files with 8806 additions and 138 deletions

View file

@ -1,6 +1,6 @@
use axum::{
Json, Router,
extract::{Path, Query, State},
extract::{Extension, Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{get, put},
@ -13,7 +13,7 @@ use pinakes_core::{
model::{AuthorInfo, BookMetadata, MediaId, Pagination, ReadingProgress, ReadingStatus},
};
use crate::{dto::MediaResponse, error::ApiError, state::AppState};
use crate::{auth::resolve_user_id, dto::MediaResponse, error::ApiError, state::AppState};
/// Book metadata response DTO
#[derive(Debug, Serialize, Deserialize)]
@ -240,15 +240,15 @@ pub async fn get_author_books(
/// Get reading progress for a book
pub async fn get_reading_progress(
State(state): State<AppState>,
Extension(username): Extension<String>,
Path(media_id): Path<Uuid>,
) -> Result<impl IntoResponse, ApiError> {
// TODO: Get user_id from auth context
let user_id = Uuid::new_v4(); // Placeholder
let user_id = resolve_user_id(&state.storage, &username).await?;
let media_id = MediaId(media_id);
let progress = state
.storage
.get_reading_progress(user_id, media_id)
.get_reading_progress(user_id.0, media_id)
.await?
.ok_or(ApiError(PinakesError::NotFound(
"Reading progress not found".to_string(),
@ -260,16 +260,16 @@ pub async fn get_reading_progress(
/// Update reading progress for a book
pub async fn update_reading_progress(
State(state): State<AppState>,
Extension(username): Extension<String>,
Path(media_id): Path<Uuid>,
Json(req): Json<UpdateProgressRequest>,
) -> Result<impl IntoResponse, ApiError> {
// TODO: Get user_id from auth context
let user_id = Uuid::new_v4(); // Placeholder
let user_id = resolve_user_id(&state.storage, &username).await?;
let media_id = MediaId(media_id);
state
.storage
.update_reading_progress(user_id, media_id, req.current_page)
.update_reading_progress(user_id.0, media_id, req.current_page)
.await?;
Ok(StatusCode::NO_CONTENT)
@ -278,14 +278,14 @@ pub async fn update_reading_progress(
/// Get user's reading list
pub async fn get_reading_list(
State(state): State<AppState>,
Extension(username): Extension<String>,
Query(params): Query<ReadingListQuery>,
) -> Result<impl IntoResponse, ApiError> {
// TODO: Get user_id from auth context
let user_id = Uuid::new_v4(); // Placeholder
let user_id = resolve_user_id(&state.storage, &username).await?;
let items = state
.storage
.get_reading_list(user_id, params.status)
.get_reading_list(user_id.0, params.status)
.await?;
let response: Vec<MediaResponse> = items.into_iter().map(MediaResponse::from).collect();

View file

@ -19,11 +19,14 @@ pub mod saved_searches;
pub mod scan;
pub mod scheduled_tasks;
pub mod search;
pub mod shares;
pub mod social;
pub mod statistics;
pub mod streaming;
pub mod subtitles;
pub mod sync;
pub mod tags;
pub mod transcode;
pub mod upload;
pub mod users;
pub mod webhooks;

View file

@ -27,6 +27,12 @@ pub struct TimelineQuery {
pub group_by: GroupBy,
pub year: Option<i32>,
pub month: Option<u32>,
#[serde(default = "default_timeline_limit")]
pub limit: u64,
}
fn default_timeline_limit() -> u64 {
10000
}
/// Timeline group response
@ -62,12 +68,12 @@ pub async fn get_timeline(
State(state): State<AppState>,
Query(query): Query<TimelineQuery>,
) -> Result<impl IntoResponse, ApiError> {
// Query photos with date_taken
// Query photos with date_taken (limit is configurable, defaults to 10000)
let all_media = state
.storage
.list_media(&pinakes_core::model::Pagination {
offset: 0,
limit: 10000, // TODO: Make this more efficient with streaming
limit: query.limit.min(50000), // Cap at 50000 for safety
sort: Some("date_taken DESC".to_string()),
})
.await?;

View file

@ -0,0 +1,543 @@
use axum::{
Json,
extract::{ConnectInfo, Extension, Path, Query, State},
http::StatusCode,
};
use chrono::Utc;
use std::net::SocketAddr;
use uuid::Uuid;
use crate::auth::resolve_user_id;
use crate::dto::{
AccessSharedRequest, BatchDeleteSharesRequest, CreateShareRequest, MediaResponse,
PaginationParams, ShareActivityResponse, ShareNotificationResponse, ShareResponse,
UpdateShareRequest,
};
use crate::error::{ApiError, ApiResult};
use crate::state::AppState;
use pinakes_core::model::MediaId;
use pinakes_core::model::Pagination;
use pinakes_core::sharing::{
Share, ShareActivity, ShareActivityAction, ShareId, ShareNotification, ShareNotificationType,
SharePermissions, ShareRecipient, ShareTarget, generate_share_token, hash_share_password,
verify_share_password,
};
use pinakes_core::users::UserId;
/// Create a new share
/// POST /api/shares
pub async fn create_share(
State(state): State<AppState>,
Extension(username): Extension<String>,
Json(req): Json<CreateShareRequest>,
) -> ApiResult<Json<ShareResponse>> {
let config = state.config.read().await;
if !config.sharing.enabled {
return Err(ApiError::bad_request("Sharing is not enabled"));
}
// Validate public links are allowed
if req.recipient_type == "public_link" && !config.sharing.allow_public_links {
return Err(ApiError::bad_request("Public links are not allowed"));
}
drop(config);
let owner_id = resolve_user_id(&state.storage, &username).await?;
// Parse target
let target_id: Uuid = req
.target_id
.parse()
.map_err(|_| ApiError::bad_request("Invalid target_id"))?;
let target = match req.target_type.as_str() {
"media" => ShareTarget::Media {
media_id: MediaId(target_id),
},
"collection" => ShareTarget::Collection {
collection_id: target_id,
},
"tag" => ShareTarget::Tag { tag_id: target_id },
"saved_search" => ShareTarget::SavedSearch {
search_id: target_id,
},
_ => return Err(ApiError::bad_request("Invalid target_type")),
};
// Parse recipient
let recipient = match req.recipient_type.as_str() {
"public_link" => {
let token = generate_share_token();
let password_hash = req.password.as_ref().map(|p| hash_share_password(p));
ShareRecipient::PublicLink {
token,
password_hash,
}
}
"user" => {
let recipient_user_id = req.recipient_user_id.ok_or_else(|| {
ApiError::bad_request("recipient_user_id required for user share")
})?;
ShareRecipient::User {
user_id: UserId(recipient_user_id),
}
}
"group" => {
let group_id = req.recipient_group_id.ok_or_else(|| {
ApiError::bad_request("recipient_group_id required for group share")
})?;
ShareRecipient::Group { group_id }
}
_ => return Err(ApiError::bad_request("Invalid recipient_type")),
};
// Parse permissions
let permissions = if let Some(perms) = req.permissions {
SharePermissions {
can_view: perms.can_view.unwrap_or(true),
can_download: perms.can_download.unwrap_or(false),
can_edit: perms.can_edit.unwrap_or(false),
can_delete: perms.can_delete.unwrap_or(false),
can_reshare: perms.can_reshare.unwrap_or(false),
can_add: perms.can_add.unwrap_or(false),
}
} else {
SharePermissions::view_only()
};
// Calculate expiration
let expires_at = req
.expires_in_hours
.map(|hours| Utc::now() + chrono::Duration::hours(hours as i64));
let share = Share {
id: ShareId(Uuid::now_v7()),
target,
owner_id,
recipient,
permissions,
note: req.note,
expires_at,
access_count: 0,
last_accessed: None,
inherit_to_children: req.inherit_to_children.unwrap_or(true),
parent_share_id: None,
created_at: Utc::now(),
updated_at: Utc::now(),
};
let created = state
.storage
.create_share(&share)
.await
.map_err(|e| ApiError::internal(format!("Failed to create share: {}", e)))?;
// Send notification to recipient if it's a user share
if let ShareRecipient::User { user_id } = &created.recipient {
let notification = ShareNotification {
id: Uuid::now_v7(),
user_id: *user_id,
share_id: created.id,
notification_type: ShareNotificationType::NewShare,
is_read: false,
created_at: Utc::now(),
};
// Ignore notification errors
let _ = state.storage.create_share_notification(&notification).await;
}
Ok(Json(created.into()))
}
/// List outgoing shares (shares I created)
/// GET /api/shares/outgoing
pub async fn list_outgoing(
State(state): State<AppState>,
Extension(username): Extension<String>,
Query(params): Query<PaginationParams>,
) -> ApiResult<Json<Vec<ShareResponse>>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let pagination = Pagination {
offset: params.offset.unwrap_or(0),
limit: params.limit.unwrap_or(50),
sort: params.sort,
};
let shares = state
.storage
.list_shares_by_owner(user_id, &pagination)
.await
.map_err(|e| ApiError::internal(format!("Failed to list shares: {}", e)))?;
Ok(Json(shares.into_iter().map(Into::into).collect()))
}
/// List incoming shares (shares shared with me)
/// GET /api/shares/incoming
pub async fn list_incoming(
State(state): State<AppState>,
Extension(username): Extension<String>,
Query(params): Query<PaginationParams>,
) -> ApiResult<Json<Vec<ShareResponse>>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let pagination = Pagination {
offset: params.offset.unwrap_or(0),
limit: params.limit.unwrap_or(50),
sort: params.sort,
};
let shares = state
.storage
.list_shares_for_user(user_id, &pagination)
.await
.map_err(|e| ApiError::internal(format!("Failed to list shares: {}", e)))?;
Ok(Json(shares.into_iter().map(Into::into).collect()))
}
/// Get share details
/// GET /api/shares/{id}
pub async fn get_share(
State(state): State<AppState>,
Extension(username): Extension<String>,
Path(id): Path<Uuid>,
) -> ApiResult<Json<ShareResponse>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let share = state
.storage
.get_share(ShareId(id))
.await
.map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?;
// Check authorization
let is_owner = share.owner_id == user_id;
let is_recipient = match &share.recipient {
ShareRecipient::User {
user_id: recipient_id,
} => *recipient_id == user_id,
_ => false,
};
if !is_owner && !is_recipient {
return Err(ApiError::forbidden("Not authorized to view this share"));
}
Ok(Json(share.into()))
}
/// Update a share
/// PATCH /api/shares/{id}
pub async fn update_share(
State(state): State<AppState>,
Extension(username): Extension<String>,
Path(id): Path<Uuid>,
Json(req): Json<UpdateShareRequest>,
) -> ApiResult<Json<ShareResponse>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let mut share = state
.storage
.get_share(ShareId(id))
.await
.map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?;
// Only owner can update
if share.owner_id != user_id {
return Err(ApiError::forbidden("Only the owner can update this share"));
}
// Update fields
if let Some(perms) = req.permissions {
share.permissions = SharePermissions {
can_view: perms.can_view.unwrap_or(share.permissions.can_view),
can_download: perms.can_download.unwrap_or(share.permissions.can_download),
can_edit: perms.can_edit.unwrap_or(share.permissions.can_edit),
can_delete: perms.can_delete.unwrap_or(share.permissions.can_delete),
can_reshare: perms.can_reshare.unwrap_or(share.permissions.can_reshare),
can_add: perms.can_add.unwrap_or(share.permissions.can_add),
};
}
if let Some(note) = req.note {
share.note = Some(note);
}
if let Some(expires_at) = req.expires_at {
share.expires_at = Some(expires_at);
}
if let Some(inherit) = req.inherit_to_children {
share.inherit_to_children = inherit;
}
share.updated_at = Utc::now();
let updated = state
.storage
.update_share(&share)
.await
.map_err(|e| ApiError::internal(format!("Failed to update share: {}", e)))?;
// Notify recipient of update
if let ShareRecipient::User { user_id } = &updated.recipient {
let notification = ShareNotification {
id: Uuid::now_v7(),
user_id: *user_id,
share_id: updated.id,
notification_type: ShareNotificationType::ShareUpdated,
is_read: false,
created_at: Utc::now(),
};
let _ = state.storage.create_share_notification(&notification).await;
}
Ok(Json(updated.into()))
}
/// Delete (revoke) a share
/// DELETE /api/shares/{id}
pub async fn delete_share(
State(state): State<AppState>,
Extension(username): Extension<String>,
Path(id): Path<Uuid>,
) -> ApiResult<StatusCode> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let share = state
.storage
.get_share(ShareId(id))
.await
.map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?;
// Only owner can delete
if share.owner_id != user_id {
return Err(ApiError::forbidden("Only the owner can revoke this share"));
}
// Notify recipient before deletion
if let ShareRecipient::User { user_id } = &share.recipient {
let notification = ShareNotification {
id: Uuid::now_v7(),
user_id: *user_id,
share_id: share.id,
notification_type: ShareNotificationType::ShareRevoked,
is_read: false,
created_at: Utc::now(),
};
let _ = state.storage.create_share_notification(&notification).await;
}
state
.storage
.delete_share(ShareId(id))
.await
.map_err(|e| ApiError::internal(format!("Failed to delete share: {}", e)))?;
Ok(StatusCode::NO_CONTENT)
}
/// Batch delete shares
/// POST /api/shares/batch/delete
pub async fn batch_delete(
State(state): State<AppState>,
Extension(username): Extension<String>,
Json(req): Json<BatchDeleteSharesRequest>,
) -> ApiResult<Json<serde_json::Value>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let share_ids: Vec<ShareId> = req.share_ids.into_iter().map(ShareId).collect();
// Verify ownership of all shares
for share_id in &share_ids {
let share = state
.storage
.get_share(*share_id)
.await
.map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?;
if share.owner_id != user_id {
return Err(ApiError::forbidden(format!(
"Not authorized to delete share {}",
share_id.0
)));
}
}
let deleted = state
.storage
.batch_delete_shares(&share_ids)
.await
.map_err(|e| ApiError::internal(format!("Failed to batch delete: {}", e)))?;
Ok(Json(serde_json::json!({ "deleted": deleted })))
}
/// Access a public shared resource
/// GET /api/shared/{token}
pub async fn access_shared(
State(state): State<AppState>,
Path(token): Path<String>,
Query(params): Query<AccessSharedRequest>,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
) -> ApiResult<Json<MediaResponse>> {
let share = state
.storage
.get_share_by_token(&token)
.await
.map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?;
// Check expiration
if let Some(expires_at) = share.expires_at {
if Utc::now() > expires_at {
return Err(ApiError::not_found("Share has expired"));
}
}
// Check password if required
if let ShareRecipient::PublicLink { password_hash, .. } = &share.recipient {
if let Some(hash) = password_hash {
let provided_password = params
.password
.as_ref()
.ok_or_else(|| ApiError::unauthorized("Password required"))?;
if !verify_share_password(provided_password, hash) {
// Log failed attempt
let activity = ShareActivity {
id: Uuid::now_v7(),
share_id: share.id,
actor_id: None,
actor_ip: Some(addr.ip().to_string()),
action: ShareActivityAction::PasswordFailed,
details: None,
timestamp: Utc::now(),
};
let _ = state.storage.record_share_activity(&activity).await;
return Err(ApiError::unauthorized("Invalid password"));
}
}
}
// Record access
state
.storage
.record_share_access(share.id)
.await
.map_err(|e| ApiError::internal(format!("Failed to record access: {}", e)))?;
// Log the access
let activity = ShareActivity {
id: Uuid::now_v7(),
share_id: share.id,
actor_id: None,
actor_ip: Some(addr.ip().to_string()),
action: ShareActivityAction::Accessed,
details: None,
timestamp: Utc::now(),
};
let _ = state.storage.record_share_activity(&activity).await;
// Return the shared content
match &share.target {
ShareTarget::Media { media_id } => {
let item = state
.storage
.get_media(*media_id)
.await
.map_err(|e| ApiError::not_found(format!("Media not found: {}", e)))?;
Ok(Json(item.into()))
}
_ => {
// For collections/tags, return a placeholder
// Full implementation would return the collection contents
Err(ApiError::bad_request(
"Collection/tag sharing not yet fully implemented",
))
}
}
}
/// Get share activity log
/// GET /api/shares/{id}/activity
pub async fn get_activity(
State(state): State<AppState>,
Extension(username): Extension<String>,
Path(id): Path<Uuid>,
Query(params): Query<PaginationParams>,
) -> ApiResult<Json<Vec<ShareActivityResponse>>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let share = state
.storage
.get_share(ShareId(id))
.await
.map_err(|e| ApiError::not_found(format!("Share not found: {}", e)))?;
// Only owner can view activity
if share.owner_id != user_id {
return Err(ApiError::forbidden(
"Only the owner can view share activity",
));
}
let pagination = Pagination {
offset: params.offset.unwrap_or(0),
limit: params.limit.unwrap_or(50),
sort: params.sort,
};
let activity = state
.storage
.get_share_activity(ShareId(id), &pagination)
.await
.map_err(|e| ApiError::internal(format!("Failed to get activity: {}", e)))?;
Ok(Json(activity.into_iter().map(Into::into).collect()))
}
/// Get unread share notifications
/// GET /api/notifications/shares
pub async fn get_notifications(
State(state): State<AppState>,
Extension(username): Extension<String>,
) -> ApiResult<Json<Vec<ShareNotificationResponse>>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let notifications = state
.storage
.get_unread_notifications(user_id)
.await
.map_err(|e| ApiError::internal(format!("Failed to get notifications: {}", e)))?;
Ok(Json(notifications.into_iter().map(Into::into).collect()))
}
/// Mark a notification as read
/// POST /api/notifications/shares/{id}/read
pub async fn mark_notification_read(
State(state): State<AppState>,
Extension(_username): Extension<String>,
Path(id): Path<Uuid>,
) -> ApiResult<StatusCode> {
state
.storage
.mark_notification_read(id)
.await
.map_err(|e| ApiError::internal(format!("Failed to mark as read: {}", e)))?;
Ok(StatusCode::OK)
}
/// Mark all notifications as read
/// POST /api/notifications/shares/read-all
pub async fn mark_all_read(
State(state): State<AppState>,
Extension(username): Extension<String>,
) -> ApiResult<StatusCode> {
let user_id = resolve_user_id(&state.storage, &username).await?;
state
.storage
.mark_all_notifications_read(user_id)
.await
.map_err(|e| ApiError::internal(format!("Failed to mark all as read: {}", e)))?;
Ok(StatusCode::OK)
}

View file

@ -0,0 +1,743 @@
use axum::{
Json,
body::Body,
extract::{Extension, Path, Query, State},
http::{HeaderMap, StatusCode, header},
response::IntoResponse,
};
use chrono::Utc;
use tokio_util::io::ReaderStream;
use uuid::Uuid;
use crate::auth::resolve_user_id;
use crate::dto::{
AcknowledgeChangesRequest, ChangesResponse, ChunkUploadedResponse, ConflictResponse,
CreateUploadSessionRequest, DeviceRegistrationResponse, DeviceResponse, GetChangesParams,
RegisterDeviceRequest, ReportChangesRequest, ReportChangesResponse, ResolveConflictRequest,
SyncChangeResponse, UpdateDeviceRequest, UploadSessionResponse,
};
use crate::error::{ApiError, ApiResult};
use crate::state::AppState;
use pinakes_core::config::ConflictResolution;
use pinakes_core::model::ContentHash;
use pinakes_core::sync::{
ChunkInfo, DeviceId, DeviceType, SyncChangeType, SyncConflict, SyncDevice, SyncLogEntry,
UploadSession, UploadStatus, generate_device_token, hash_device_token, update_device_cursor,
};
use std::path::Path as FilePath;
const DEFAULT_CHUNK_SIZE: u64 = 4 * 1024 * 1024; // 4MB
const DEFAULT_CHANGES_LIMIT: u64 = 100;
/// Register a new sync device
/// POST /api/sync/devices
pub async fn register_device(
State(state): State<AppState>,
Extension(username): Extension<String>,
Json(req): Json<RegisterDeviceRequest>,
) -> ApiResult<Json<DeviceRegistrationResponse>> {
let config = state.config.read().await;
if !config.sync.enabled {
return Err(ApiError::bad_request("Sync is not enabled"));
}
drop(config);
let user_id = resolve_user_id(&state.storage, &username).await?;
let device_type = req
.device_type
.parse::<DeviceType>()
.map_err(|_| ApiError::bad_request("Invalid device type"))?;
// Generate device token
let device_token = generate_device_token();
let token_hash = hash_device_token(&device_token);
let now = Utc::now();
let device = SyncDevice {
id: DeviceId(Uuid::now_v7()),
user_id,
name: req.name,
device_type,
client_version: req.client_version,
os_info: req.os_info,
last_sync_at: None,
last_seen_at: now,
sync_cursor: Some(0),
enabled: true,
created_at: now,
updated_at: now,
};
let registered = state
.storage
.register_device(&device, &token_hash)
.await
.map_err(|e| ApiError::internal(format!("Failed to register device: {}", e)))?;
Ok(Json(DeviceRegistrationResponse {
device: registered.into(),
device_token,
}))
}
/// List user's sync devices
/// GET /api/sync/devices
pub async fn list_devices(
State(state): State<AppState>,
Extension(username): Extension<String>,
) -> ApiResult<Json<Vec<DeviceResponse>>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let devices = state
.storage
.list_user_devices(user_id)
.await
.map_err(|e| ApiError::internal(format!("Failed to list devices: {}", e)))?;
Ok(Json(devices.into_iter().map(Into::into).collect()))
}
/// Get device details
/// GET /api/sync/devices/{id}
pub async fn get_device(
State(state): State<AppState>,
Extension(username): Extension<String>,
Path(id): Path<Uuid>,
) -> ApiResult<Json<DeviceResponse>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let device = state
.storage
.get_device(DeviceId(id))
.await
.map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?;
// Verify ownership
if device.user_id != user_id {
return Err(ApiError::forbidden("Not authorized to access this device"));
}
Ok(Json(device.into()))
}
/// Update a device
/// PUT /api/sync/devices/{id}
pub async fn update_device(
State(state): State<AppState>,
Extension(username): Extension<String>,
Path(id): Path<Uuid>,
Json(req): Json<UpdateDeviceRequest>,
) -> ApiResult<Json<DeviceResponse>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let mut device = state
.storage
.get_device(DeviceId(id))
.await
.map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?;
// Verify ownership
if device.user_id != user_id {
return Err(ApiError::forbidden("Not authorized to update this device"));
}
if let Some(name) = req.name {
device.name = name;
}
if let Some(enabled) = req.enabled {
device.enabled = enabled;
}
state
.storage
.update_device(&device)
.await
.map_err(|e| ApiError::internal(format!("Failed to update device: {}", e)))?;
Ok(Json(device.into()))
}
/// Delete a device
/// DELETE /api/sync/devices/{id}
pub async fn delete_device(
State(state): State<AppState>,
Extension(username): Extension<String>,
Path(id): Path<Uuid>,
) -> ApiResult<StatusCode> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let device = state
.storage
.get_device(DeviceId(id))
.await
.map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?;
// Verify ownership
if device.user_id != user_id {
return Err(ApiError::forbidden("Not authorized to delete this device"));
}
state
.storage
.delete_device(DeviceId(id))
.await
.map_err(|e| ApiError::internal(format!("Failed to delete device: {}", e)))?;
Ok(StatusCode::NO_CONTENT)
}
/// Regenerate device token
/// POST /api/sync/devices/{id}/token
pub async fn regenerate_token(
State(state): State<AppState>,
Extension(username): Extension<String>,
Path(id): Path<Uuid>,
) -> ApiResult<Json<DeviceRegistrationResponse>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let device = state
.storage
.get_device(DeviceId(id))
.await
.map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?;
// Verify ownership
if device.user_id != user_id {
return Err(ApiError::forbidden(
"Not authorized to regenerate token for this device",
));
}
// Generate new token
let new_token = generate_device_token();
let token_hash = hash_device_token(&new_token);
// Re-register with new token (this updates the token hash)
let updated = state
.storage
.register_device(&device, &token_hash)
.await
.map_err(|e| ApiError::internal(format!("Failed to regenerate token: {}", e)))?;
Ok(Json(DeviceRegistrationResponse {
device: updated.into(),
device_token: new_token,
}))
}
/// Get changes since cursor
/// GET /api/sync/changes
pub async fn get_changes(
State(state): State<AppState>,
Query(params): Query<GetChangesParams>,
) -> ApiResult<Json<ChangesResponse>> {
let config = state.config.read().await;
if !config.sync.enabled {
return Err(ApiError::bad_request("Sync is not enabled"));
}
drop(config);
let cursor = params.cursor.unwrap_or(0);
let limit = params.limit.unwrap_or(DEFAULT_CHANGES_LIMIT);
let changes = state
.storage
.get_changes_since(cursor, limit + 1)
.await
.map_err(|e| ApiError::internal(format!("Failed to get changes: {}", e)))?;
let has_more = changes.len() > limit as usize;
let changes: Vec<SyncChangeResponse> = changes
.into_iter()
.take(limit as usize)
.map(Into::into)
.collect();
let new_cursor = changes.last().map(|c| c.sequence).unwrap_or(cursor);
Ok(Json(ChangesResponse {
changes,
cursor: new_cursor,
has_more,
}))
}
/// Report local changes from client
/// POST /api/sync/report
pub async fn report_changes(
State(state): State<AppState>,
Extension(_username): Extension<String>,
Json(req): Json<ReportChangesRequest>,
) -> ApiResult<Json<ReportChangesResponse>> {
let config = state.config.read().await;
if !config.sync.enabled {
return Err(ApiError::bad_request("Sync is not enabled"));
}
let conflict_resolution = config.sync.default_conflict_resolution.clone();
drop(config);
let mut accepted = Vec::new();
let mut conflicts = Vec::new();
let mut upload_required = Vec::new();
for change in req.changes {
// Check for conflicts
if let Some(content_hash) = &change.content_hash {
let server_state = state
.storage
.get_media_by_path(FilePath::new(&change.path))
.await
.ok()
.flatten();
if let Some(server_item) = server_state {
let client_hash = ContentHash(content_hash.clone());
if server_item.content_hash != client_hash {
// Conflict detected
let conflict = SyncConflict {
id: Uuid::now_v7(),
device_id: DeviceId(Uuid::nil()), // Will be set by device context
path: change.path.clone(),
local_hash: content_hash.clone(),
local_mtime: change.local_mtime.unwrap_or(0),
server_hash: server_item.content_hash.to_string(),
server_mtime: server_item.updated_at.timestamp(),
detected_at: Utc::now(),
resolved_at: None,
resolution: None,
};
// Auto-resolve if configured
match conflict_resolution {
ConflictResolution::ServerWins => {
// Client should download server version
accepted.push(change.path);
}
ConflictResolution::ClientWins => {
// Client should upload
upload_required.push(change.path);
}
ConflictResolution::KeepBoth | ConflictResolution::Manual => {
conflicts.push(conflict.into());
}
}
continue;
}
}
}
// No conflict, check if upload is needed
match change.change_type.as_str() {
"created" | "modified" => {
if change.content_hash.is_some() {
upload_required.push(change.path);
} else {
accepted.push(change.path);
}
}
"deleted" => {
// Record deletion
let entry = SyncLogEntry {
id: Uuid::now_v7(),
sequence: 0, // Will be assigned by storage
change_type: SyncChangeType::Deleted,
media_id: None,
path: change.path.clone(),
content_hash: None,
file_size: None,
metadata_json: None,
changed_by_device: None,
timestamp: Utc::now(),
};
if state.storage.record_sync_change(&entry).await.is_ok() {
accepted.push(change.path);
}
}
_ => {
accepted.push(change.path);
}
}
}
Ok(Json(ReportChangesResponse {
accepted,
conflicts,
upload_required,
}))
}
/// Acknowledge processed changes
/// POST /api/sync/ack
pub async fn acknowledge_changes(
State(state): State<AppState>,
Extension(_username): Extension<String>,
headers: HeaderMap,
Json(req): Json<AcknowledgeChangesRequest>,
) -> ApiResult<StatusCode> {
// Get device from header or context
let device_token = headers
.get("X-Device-Token")
.and_then(|v| v.to_str().ok())
.ok_or_else(|| ApiError::bad_request("Missing X-Device-Token header"))?;
let token_hash = hash_device_token(device_token);
let device = state
.storage
.get_device_by_token(&token_hash)
.await
.map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))?
.ok_or_else(|| ApiError::unauthorized("Invalid device token"))?;
// Update device cursor
update_device_cursor(&state.storage, device.id, req.cursor)
.await
.map_err(|e| ApiError::internal(format!("Failed to update cursor: {}", e)))?;
Ok(StatusCode::OK)
}
/// List unresolved conflicts
/// GET /api/sync/conflicts
pub async fn list_conflicts(
State(state): State<AppState>,
Extension(_username): Extension<String>,
headers: HeaderMap,
) -> ApiResult<Json<Vec<ConflictResponse>>> {
let device_token = headers
.get("X-Device-Token")
.and_then(|v| v.to_str().ok())
.ok_or_else(|| ApiError::bad_request("Missing X-Device-Token header"))?;
let token_hash = hash_device_token(device_token);
let device = state
.storage
.get_device_by_token(&token_hash)
.await
.map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))?
.ok_or_else(|| ApiError::unauthorized("Invalid device token"))?;
let conflicts = state
.storage
.get_unresolved_conflicts(device.id)
.await
.map_err(|e| ApiError::internal(format!("Failed to get conflicts: {}", e)))?;
Ok(Json(conflicts.into_iter().map(Into::into).collect()))
}
/// Resolve a sync conflict
/// POST /api/sync/conflicts/{id}/resolve
pub async fn resolve_conflict(
State(state): State<AppState>,
Extension(_username): Extension<String>,
Path(id): Path<Uuid>,
Json(req): Json<ResolveConflictRequest>,
) -> ApiResult<StatusCode> {
let resolution = match req.resolution.as_str() {
"server_wins" => ConflictResolution::ServerWins,
"client_wins" => ConflictResolution::ClientWins,
"keep_both" => ConflictResolution::KeepBoth,
_ => return Err(ApiError::bad_request("Invalid resolution type")),
};
state
.storage
.resolve_conflict(id, resolution)
.await
.map_err(|e| ApiError::internal(format!("Failed to resolve conflict: {}", e)))?;
Ok(StatusCode::OK)
}
/// Create an upload session for chunked upload
/// POST /api/sync/upload
pub async fn create_upload(
State(state): State<AppState>,
Extension(_username): Extension<String>,
headers: HeaderMap,
Json(req): Json<CreateUploadSessionRequest>,
) -> ApiResult<Json<UploadSessionResponse>> {
let config = state.config.read().await;
if !config.sync.enabled {
return Err(ApiError::bad_request("Sync is not enabled"));
}
let upload_timeout_hours = config.sync.upload_timeout_hours;
drop(config);
let device_token = headers
.get("X-Device-Token")
.and_then(|v| v.to_str().ok())
.ok_or_else(|| ApiError::bad_request("Missing X-Device-Token header"))?;
let token_hash = hash_device_token(device_token);
let device = state
.storage
.get_device_by_token(&token_hash)
.await
.map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))?
.ok_or_else(|| ApiError::unauthorized("Invalid device token"))?;
let chunk_size = req.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
let chunk_count = (req.expected_size + chunk_size - 1) / chunk_size;
let now = Utc::now();
let session = UploadSession {
id: Uuid::now_v7(),
device_id: device.id,
target_path: req.target_path,
expected_hash: ContentHash(req.expected_hash),
expected_size: req.expected_size,
chunk_size,
chunk_count,
status: UploadStatus::Pending,
created_at: now,
expires_at: now + chrono::Duration::hours(upload_timeout_hours as i64),
last_activity: now,
};
state
.storage
.create_upload_session(&session)
.await
.map_err(|e| ApiError::internal(format!("Failed to create upload session: {}", e)))?;
Ok(Json(session.into()))
}
/// Upload a chunk
/// PUT /api/sync/upload/{id}/chunks/{index}
pub async fn upload_chunk(
State(state): State<AppState>,
Path((session_id, chunk_index)): Path<(Uuid, u64)>,
_headers: HeaderMap,
body: axum::body::Bytes,
) -> ApiResult<Json<ChunkUploadedResponse>> {
let session = state
.storage
.get_upload_session(session_id)
.await
.map_err(|e| ApiError::not_found(format!("Upload session not found: {}", e)))?;
if session.status == UploadStatus::Expired {
return Err(ApiError::bad_request("Upload session has expired"));
}
if chunk_index >= session.chunk_count {
return Err(ApiError::bad_request("Invalid chunk index"));
}
// Calculate chunk hash
let hash = blake3::hash(&body);
let chunk_hash = hash.to_hex().to_string();
let chunk = ChunkInfo {
upload_id: session_id,
chunk_index,
offset: chunk_index * session.chunk_size,
size: body.len() as u64,
hash: chunk_hash,
received_at: Utc::now(),
};
state
.storage
.record_chunk(session_id, &chunk)
.await
.map_err(|e| ApiError::internal(format!("Failed to record chunk: {}", e)))?;
// Store the chunk data (would integrate with managed storage)
// For now, this is a placeholder - actual implementation would write to temp storage
Ok(Json(ChunkUploadedResponse {
chunk_index,
received: true,
}))
}
/// Get upload session status
/// GET /api/sync/upload/{id}
pub async fn get_upload_status(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> ApiResult<Json<UploadSessionResponse>> {
let session = state
.storage
.get_upload_session(id)
.await
.map_err(|e| ApiError::not_found(format!("Upload session not found: {}", e)))?;
Ok(Json(session.into()))
}
/// Complete an upload session
/// POST /api/sync/upload/{id}/complete
pub async fn complete_upload(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> ApiResult<StatusCode> {
let mut session = state
.storage
.get_upload_session(id)
.await
.map_err(|e| ApiError::not_found(format!("Upload session not found: {}", e)))?;
// Verify all chunks received
let chunks = state
.storage
.get_upload_chunks(id)
.await
.map_err(|e| ApiError::internal(format!("Failed to get chunks: {}", e)))?;
if chunks.len() != session.chunk_count as usize {
return Err(ApiError::bad_request(format!(
"Missing chunks: expected {}, got {}",
session.chunk_count,
chunks.len()
)));
}
// Mark session as completed
session.status = UploadStatus::Completed;
state
.storage
.update_upload_session(&session)
.await
.map_err(|e| ApiError::internal(format!("Failed to update session: {}", e)))?;
// Record the sync change
let entry = SyncLogEntry {
id: Uuid::now_v7(),
sequence: 0,
change_type: SyncChangeType::Created,
media_id: None,
path: session.target_path,
content_hash: Some(session.expected_hash),
file_size: Some(session.expected_size),
metadata_json: None,
changed_by_device: Some(session.device_id),
timestamp: Utc::now(),
};
state
.storage
.record_sync_change(&entry)
.await
.map_err(|e| ApiError::internal(format!("Failed to record change: {}", e)))?;
Ok(StatusCode::OK)
}
/// Cancel an upload session
/// DELETE /api/sync/upload/{id}
pub async fn cancel_upload(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> ApiResult<StatusCode> {
let mut session = state
.storage
.get_upload_session(id)
.await
.map_err(|e| ApiError::not_found(format!("Upload session not found: {}", e)))?;
session.status = UploadStatus::Cancelled;
state
.storage
.update_upload_session(&session)
.await
.map_err(|e| ApiError::internal(format!("Failed to cancel session: {}", e)))?;
Ok(StatusCode::NO_CONTENT)
}
/// Download a file for sync (supports Range header)
/// GET /api/sync/download/{*path}
pub async fn download_file(
State(state): State<AppState>,
Path(path): Path<String>,
headers: HeaderMap,
) -> ApiResult<impl IntoResponse> {
let item = state
.storage
.get_media_by_path(FilePath::new(&path))
.await
.map_err(|e| ApiError::internal(format!("Failed to get media: {}", e)))?
.ok_or_else(|| ApiError::not_found("File not found"))?;
let file = tokio::fs::File::open(&item.path)
.await
.map_err(|e| ApiError::not_found(format!("File not found: {}", e)))?;
let metadata = file
.metadata()
.await
.map_err(|e| ApiError::internal(format!("Failed to get metadata: {}", e)))?;
let file_size = metadata.len();
// Check for Range header
if let Some(range_header) = headers.get(header::RANGE) {
if let Ok(range_str) = range_header.to_str() {
if let Some(range) = parse_range_header(range_str, file_size) {
// Partial content response
let (start, end) = range;
let length = end - start + 1;
let file = tokio::fs::File::open(&item.path)
.await
.map_err(|e| ApiError::internal(format!("Failed to reopen file: {}", e)))?;
let stream = ReaderStream::new(file);
let body = Body::from_stream(stream);
return Ok((
StatusCode::PARTIAL_CONTENT,
[
(header::CONTENT_TYPE, item.media_type.mime_type()),
(header::CONTENT_LENGTH, length.to_string()),
(
header::CONTENT_RANGE,
format!("bytes {}-{}/{}", start, end, file_size),
),
(header::ACCEPT_RANGES, "bytes".to_string()),
],
body,
)
.into_response());
}
}
}
// Full content response
let stream = ReaderStream::new(file);
let body = Body::from_stream(stream);
Ok((
StatusCode::OK,
[
(header::CONTENT_TYPE, item.media_type.mime_type()),
(header::CONTENT_LENGTH, file_size.to_string()),
(header::ACCEPT_RANGES, "bytes".to_string()),
],
body,
)
.into_response())
}
/// Parse HTTP Range header
fn parse_range_header(range: &str, file_size: u64) -> Option<(u64, u64)> {
let range = range.strip_prefix("bytes=")?;
let parts: Vec<&str> = range.split('-').collect();
if parts.len() != 2 {
return None;
}
let start: u64 = parts[0].parse().ok()?;
let end: u64 = if parts[1].is_empty() {
file_size - 1
} else {
parts[1].parse().ok()?
};
if start > end || end >= file_size {
return None;
}
Some((start, end))
}

View file

@ -0,0 +1,169 @@
use axum::{
Json,
extract::{Multipart, Path, State},
http::{StatusCode, header},
response::IntoResponse,
};
use tokio_util::io::ReaderStream;
use uuid::Uuid;
use crate::dto::{ManagedStorageStatsResponse, UploadResponse};
use crate::error::{ApiError, ApiResult};
use crate::state::AppState;
use pinakes_core::model::MediaId;
use pinakes_core::upload;
/// Upload a file to managed storage
/// POST /api/upload
pub async fn upload_file(
State(state): State<AppState>,
mut multipart: Multipart,
) -> ApiResult<Json<UploadResponse>> {
let managed_storage = state
.managed_storage
.as_ref()
.ok_or_else(|| ApiError::bad_request("Managed storage is not enabled"))?;
let config = state.config.read().await;
if !config.managed_storage.enabled {
return Err(ApiError::bad_request("Managed storage is not enabled"));
}
drop(config);
// Extract file from multipart
let field = multipart
.next_field()
.await
.map_err(|e| ApiError::bad_request(format!("Failed to read multipart field: {}", e)))?
.ok_or_else(|| ApiError::bad_request("No file provided"))?;
let original_filename = field
.file_name()
.map(|s| s.to_string())
.unwrap_or_else(|| "unknown".to_string());
let content_type = field
.content_type()
.map(|s| s.to_string())
.unwrap_or_else(|| "application/octet-stream".to_string());
let data = field
.bytes()
.await
.map_err(|e| ApiError::bad_request(format!("Failed to read file data: {}", e)))?;
// Process the upload
let result = upload::process_upload_bytes(
&state.storage,
managed_storage.as_ref(),
&data,
&original_filename,
Some(&content_type),
)
.await
.map_err(|e| ApiError::internal(format!("Upload failed: {}", e)))?;
Ok(Json(result.into()))
}
/// Download a managed file
/// GET /api/media/{id}/download
pub async fn download_file(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> ApiResult<impl IntoResponse> {
let media_id = MediaId(id);
let item = state
.storage
.get_media(media_id)
.await
.map_err(|e| ApiError::not_found(format!("Media not found: {}", e)))?;
let managed_storage = state
.managed_storage
.as_ref()
.ok_or_else(|| ApiError::bad_request("Managed storage is not enabled"))?;
// Check if this is a managed file
if item.storage_mode != pinakes_core::model::StorageMode::Managed {
// For external files, stream from their original path
let file = tokio::fs::File::open(&item.path)
.await
.map_err(|e| ApiError::not_found(format!("File not found: {}", e)))?;
let stream = ReaderStream::new(file);
let body = axum::body::Body::from_stream(stream);
let content_type = item.media_type.mime_type();
let filename = item.original_filename.unwrap_or(item.file_name);
return Ok((
[
(header::CONTENT_TYPE, content_type),
(
header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", filename),
),
],
body,
));
}
// For managed files, stream from content-addressable storage
let file = managed_storage
.open(&item.content_hash)
.await
.map_err(|e| ApiError::not_found(format!("Blob not found: {}", e)))?;
let stream = ReaderStream::new(file);
let body = axum::body::Body::from_stream(stream);
let content_type = item.media_type.mime_type();
let filename = item.original_filename.unwrap_or(item.file_name);
Ok((
[
(header::CONTENT_TYPE, content_type),
(
header::CONTENT_DISPOSITION,
format!("attachment; filename=\"{}\"", filename),
),
],
body,
))
}
/// Migrate an external file to managed storage
/// POST /api/media/{id}/move-to-managed
pub async fn move_to_managed(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> ApiResult<StatusCode> {
let managed_storage = state
.managed_storage
.as_ref()
.ok_or_else(|| ApiError::bad_request("Managed storage is not enabled"))?;
let media_id = MediaId(id);
upload::migrate_to_managed(&state.storage, managed_storage.as_ref(), media_id)
.await
.map_err(|e| ApiError::internal(format!("Migration failed: {}", e)))?;
Ok(StatusCode::NO_CONTENT)
}
/// Get managed storage statistics
/// GET /api/managed/stats
pub async fn managed_stats(
State(state): State<AppState>,
) -> ApiResult<Json<ManagedStorageStatsResponse>> {
let stats = state
.storage
.managed_storage_stats()
.await
.map_err(|e| ApiError::internal(format!("Failed to get stats: {}", e)))?;
Ok(Json(stats.into()))
}