pinakes-server: fix api key timing, notification scoping, and validate progress inputs

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Ieb342b4b48034de0a2184cdf89d068316a6a6964
This commit is contained in:
raf 2026-03-08 00:42:17 +03:00
commit 2b2c1830a1
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
7 changed files with 334 additions and 179 deletions

View file

@ -25,6 +25,7 @@ use pinakes_core::{
update_device_cursor,
},
};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio_util::io::ReaderStream;
use uuid::Uuid;
@ -99,7 +100,7 @@ pub async fn register_device(
.register_device(&device, &token_hash)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to register device: {}", e))
ApiError::internal(format!("Failed to register device: {e}"))
})?;
Ok(Json(DeviceRegistrationResponse {
@ -115,14 +116,11 @@ pub async fn list_devices(
Extension(username): Extension<String>,
) -> ApiResult<Json<Vec<DeviceResponse>>> {
let user_id = resolve_user_id(&state.storage, &username).await?;
let devices =
state
.storage
.list_user_devices(user_id)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to list devices: {}", e))
})?;
let devices = state
.storage
.list_user_devices(user_id)
.await
.map_err(|e| ApiError::internal(format!("Failed to list devices: {e}")))?;
Ok(Json(devices.into_iter().map(Into::into).collect()))
}
@ -139,7 +137,7 @@ pub async fn get_device(
.storage
.get_device(DeviceId(id))
.await
.map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?;
.map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?;
// Verify ownership
if device.user_id != user_id {
@ -162,7 +160,7 @@ pub async fn update_device(
.storage
.get_device(DeviceId(id))
.await
.map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?;
.map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?;
// Verify ownership
if device.user_id != user_id {
@ -176,9 +174,11 @@ pub async fn update_device(
device.enabled = enabled;
}
state.storage.update_device(&device).await.map_err(|e| {
ApiError::internal(format!("Failed to update device: {}", e))
})?;
state
.storage
.update_device(&device)
.await
.map_err(|e| ApiError::internal(format!("Failed to update device: {e}")))?;
Ok(Json(device.into()))
}
@ -195,7 +195,7 @@ pub async fn delete_device(
.storage
.get_device(DeviceId(id))
.await
.map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?;
.map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?;
// Verify ownership
if device.user_id != user_id {
@ -206,9 +206,7 @@ pub async fn delete_device(
.storage
.delete_device(DeviceId(id))
.await
.map_err(|e| {
ApiError::internal(format!("Failed to delete device: {}", e))
})?;
.map_err(|e| ApiError::internal(format!("Failed to delete device: {e}")))?;
Ok(StatusCode::NO_CONTENT)
}
@ -225,7 +223,7 @@ pub async fn regenerate_token(
.storage
.get_device(DeviceId(id))
.await
.map_err(|e| ApiError::not_found(format!("Device not found: {}", e)))?;
.map_err(|e| ApiError::not_found(format!("Device not found: {e}")))?;
// Verify ownership
if device.user_id != user_id {
@ -244,7 +242,7 @@ pub async fn regenerate_token(
.register_device(&device, &token_hash)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to regenerate token: {}", e))
ApiError::internal(format!("Failed to regenerate token: {e}"))
})?;
Ok(Json(DeviceRegistrationResponse {
@ -272,7 +270,7 @@ pub async fn get_changes(
.storage
.get_changes_since(cursor, limit + 1)
.await
.map_err(|e| ApiError::internal(format!("Failed to get changes: {}", e)))?;
.map_err(|e| ApiError::internal(format!("Failed to get changes: {e}")))?;
let has_more = changes.len() > limit as usize;
let changes: Vec<SyncChangeResponse> = changes
@ -281,7 +279,7 @@ pub async fn get_changes(
.map(Into::into)
.collect();
let new_cursor = changes.last().map(|c| c.sequence).unwrap_or(cursor);
let new_cursor = changes.last().map_or(cursor, |c| c.sequence);
Ok(Json(ChangesResponse {
changes,
@ -357,12 +355,8 @@ pub async fn report_changes(
// No conflict, check if upload is needed
match change.change_type.as_str() {
"created" | "modified" => {
if change.content_hash.is_some() {
upload_required.push(change.path);
} else {
accepted.push(change.path);
}
"created" | "modified" if change.content_hash.is_some() => {
upload_required.push(change.path);
},
"deleted" => {
// Record deletion
@ -415,15 +409,13 @@ pub async fn acknowledge_changes(
.storage
.get_device_by_token(&token_hash)
.await
.map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))?
.map_err(|e| ApiError::internal(format!("Failed to get device: {e}")))?
.ok_or_else(|| ApiError::unauthorized("Invalid device token"))?;
// Update device cursor
update_device_cursor(&state.storage, device.id, req.cursor)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to update cursor: {}", e))
})?;
.map_err(|e| ApiError::internal(format!("Failed to update cursor: {e}")))?;
Ok(StatusCode::OK)
}
@ -445,16 +437,14 @@ pub async fn list_conflicts(
.storage
.get_device_by_token(&token_hash)
.await
.map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))?
.map_err(|e| ApiError::internal(format!("Failed to get device: {e}")))?
.ok_or_else(|| ApiError::unauthorized("Invalid device token"))?;
let conflicts = state
.storage
.get_unresolved_conflicts(device.id)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to get conflicts: {}", e))
})?;
.map_err(|e| ApiError::internal(format!("Failed to get conflicts: {e}")))?;
Ok(Json(conflicts.into_iter().map(Into::into).collect()))
}
@ -479,7 +469,7 @@ pub async fn resolve_conflict(
.resolve_conflict(id, resolution)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to resolve conflict: {}", e))
ApiError::internal(format!("Failed to resolve conflict: {e}"))
})?;
Ok(StatusCode::OK)
@ -510,7 +500,7 @@ pub async fn create_upload(
.storage
.get_device_by_token(&token_hash)
.await
.map_err(|e| ApiError::internal(format!("Failed to get device: {}", e)))?
.map_err(|e| ApiError::internal(format!("Failed to get device: {e}")))?
.ok_or_else(|| ApiError::unauthorized("Invalid device token"))?;
let chunk_size = req.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
@ -536,13 +526,13 @@ pub async fn create_upload(
.create_upload_session(&session)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to create upload session: {}", e))
ApiError::internal(format!("Failed to create upload session: {e}"))
})?;
// Create temp file for chunked upload if manager is available
if let Some(ref manager) = state.chunked_upload_manager {
manager.create_temp_file(&session).await.map_err(|e| {
ApiError::internal(format!("Failed to create temp file: {}", e))
ApiError::internal(format!("Failed to create temp file: {e}"))
})?;
}
@ -563,7 +553,7 @@ pub async fn upload_chunk(
.get_upload_session(session_id)
.await
.map_err(|e| {
ApiError::not_found(format!("Upload session not found: {}", e))
ApiError::not_found(format!("Upload session not found: {e}"))
})?;
if session.status == UploadStatus::Expired {
@ -583,16 +573,14 @@ pub async fn upload_chunk(
let chunk = manager
.write_chunk(&session, chunk_index, body.as_ref())
.await
.map_err(|e| ApiError::internal(format!("Failed to write chunk: {}", e)))?;
.map_err(|e| ApiError::internal(format!("Failed to write chunk: {e}")))?;
// Record chunk metadata in database
state
.storage
.record_chunk(session_id, &chunk)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to record chunk: {}", e))
})?;
.map_err(|e| ApiError::internal(format!("Failed to record chunk: {e}")))?;
Ok(Json(ChunkUploadedResponse {
chunk_index,
@ -607,7 +595,7 @@ pub async fn get_upload_status(
Path(id): Path<Uuid>,
) -> ApiResult<Json<UploadSessionResponse>> {
let session = state.storage.get_upload_session(id).await.map_err(|e| {
ApiError::not_found(format!("Upload session not found: {}", e))
ApiError::not_found(format!("Upload session not found: {e}"))
})?;
Ok(Json(session.into()))
@ -621,14 +609,15 @@ pub async fn complete_upload(
) -> ApiResult<StatusCode> {
let mut session =
state.storage.get_upload_session(id).await.map_err(|e| {
ApiError::not_found(format!("Upload session not found: {}", e))
ApiError::not_found(format!("Upload session not found: {e}"))
})?;
// Verify all chunks received
let chunks =
state.storage.get_upload_chunks(id).await.map_err(|e| {
ApiError::internal(format!("Failed to get chunks: {}", e))
})?;
let chunks = state
.storage
.get_upload_chunks(id)
.await
.map_err(|e| ApiError::internal(format!("Failed to get chunks: {e}")))?;
if chunks.len() != session.chunk_count as usize {
return Err(ApiError::bad_request(format!(
@ -645,7 +634,7 @@ pub async fn complete_upload(
// Verify and finalize the temp file
let temp_path = manager.finalize(&session, &chunks).await.map_err(|e| {
ApiError::internal(format!("Failed to finalize upload: {}", e))
ApiError::internal(format!("Failed to finalize upload: {e}"))
})?;
// Validate and resolve target path securely
@ -676,13 +665,13 @@ pub async fn complete_upload(
// Canonicalize root to resolve symlinks and get absolute path
let root_canon = tokio::fs::canonicalize(&root).await.map_err(|e| {
ApiError::internal(format!("Failed to canonicalize root: {}", e))
ApiError::internal(format!("Failed to canonicalize root: {e}"))
})?;
// Ensure parent directory exists before canonicalizing candidate
if let Some(parent) = candidate.parent() {
tokio::fs::create_dir_all(parent).await.map_err(|e| {
ApiError::internal(format!("Failed to create directory: {}", e))
ApiError::internal(format!("Failed to create directory: {e}"))
})?;
}
@ -694,7 +683,7 @@ pub async fn complete_upload(
canon
} else if let Some(parent) = candidate.parent() {
let parent_canon = tokio::fs::canonicalize(parent).await.map_err(|e| {
ApiError::internal(format!("Failed to canonicalize parent: {}", e))
ApiError::internal(format!("Failed to canonicalize parent: {e}"))
})?;
if let Some(filename) = candidate.file_name() {
@ -721,13 +710,11 @@ pub async fn complete_upload(
// Fallback: copy then remove
tokio::fs::copy(&temp_path, &final_path)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to copy file: {}", e))
})?;
.map_err(|e| ApiError::internal(format!("Failed to copy file: {e}")))?;
let _ = tokio::fs::remove_file(&temp_path).await;
} else {
return Err(ApiError::internal(format!("Failed to move file: {}", e)));
return Err(ApiError::internal(format!("Failed to move file: {e}")));
}
}
@ -744,7 +731,7 @@ pub async fn complete_upload(
.update_upload_session(&session)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to update session: {}", e))
ApiError::internal(format!("Failed to update session: {e}"))
})?;
// Record the sync change
@ -765,9 +752,7 @@ pub async fn complete_upload(
.storage
.record_sync_change(&entry)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to record change: {}", e))
})?;
.map_err(|e| ApiError::internal(format!("Failed to record change: {e}")))?;
Ok(StatusCode::OK)
}
@ -780,7 +765,7 @@ pub async fn cancel_upload(
) -> ApiResult<StatusCode> {
let mut session =
state.storage.get_upload_session(id).await.map_err(|e| {
ApiError::not_found(format!("Upload session not found: {}", e))
ApiError::not_found(format!("Upload session not found: {e}"))
})?;
// Clean up temp file if manager is available
@ -796,7 +781,7 @@ pub async fn cancel_upload(
.update_upload_session(&session)
.await
.map_err(|e| {
ApiError::internal(format!("Failed to cancel session: {}", e))
ApiError::internal(format!("Failed to cancel session: {e}"))
})?;
Ok(StatusCode::NO_CONTENT)
@ -813,16 +798,17 @@ pub async fn download_file(
.storage
.get_media_by_path(FilePath::new(&path))
.await
.map_err(|e| ApiError::internal(format!("Failed to get media: {}", e)))?
.map_err(|e| ApiError::internal(format!("Failed to get media: {e}")))?
.ok_or_else(|| ApiError::not_found("File not found"))?;
let file = tokio::fs::File::open(&item.path)
.await
.map_err(|e| ApiError::not_found(format!("File not found: {}", e)))?;
.map_err(|e| ApiError::not_found(format!("File not found: {e}")))?;
let metadata = file.metadata().await.map_err(|e| {
ApiError::internal(format!("Failed to get metadata: {}", e))
})?;
let metadata = file
.metadata()
.await
.map_err(|e| ApiError::internal(format!("Failed to get metadata: {e}")))?;
let file_size = metadata.len();
@ -835,11 +821,15 @@ pub async fn download_file(
let (start, end) = range;
let length = end - start + 1;
let file = tokio::fs::File::open(&item.path).await.map_err(|e| {
ApiError::internal(format!("Failed to reopen file: {}", e))
})?;
let mut file = tokio::fs::File::open(&item.path)
.await
.map_err(|e| ApiError::internal(format!("Failed to reopen file: {e}")))?;
file
.seek(std::io::SeekFrom::Start(start))
.await
.map_err(|e| ApiError::internal(format!("Failed to seek file: {e}")))?;
let stream = ReaderStream::new(file);
let stream = ReaderStream::new(file.take(length));
let body = Body::from_stream(stream);
return Ok(
@ -850,7 +840,7 @@ pub async fn download_file(
(header::CONTENT_LENGTH, length.to_string()),
(
header::CONTENT_RANGE,
format!("bytes {}-{}/{}", start, end, file_size),
format!("bytes {start}-{end}/{file_size}"),
),
(header::ACCEPT_RANGES, "bytes".to_string()),
],