pinakes-core: add configurable rate limits and cors; add webhook dispatcher; bound job history
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Ib0d34cd7878eb9e8d019497234a092466a6a6964
This commit is contained in:
parent
d5be5026a7
commit
672e11b592
4 changed files with 582 additions and 106 deletions
|
|
@ -81,7 +81,14 @@ impl JobQueue {
|
|||
///
|
||||
/// The `executor` callback is invoked for each job; it receives the job kind,
|
||||
/// a progress-reporting callback, and a cancellation token.
|
||||
pub fn new<F>(worker_count: usize, executor: F) -> Arc<Self>
|
||||
///
|
||||
/// `job_timeout_secs` sets the maximum time a job can run before being
|
||||
/// cancelled. Set to 0 to disable the timeout.
|
||||
pub fn new<F>(
|
||||
worker_count: usize,
|
||||
job_timeout_secs: u64,
|
||||
executor: F,
|
||||
) -> Arc<Self>
|
||||
where
|
||||
F: Fn(
|
||||
Uuid,
|
||||
|
|
@ -103,10 +110,10 @@ impl JobQueue {
|
|||
let executor = Arc::new(executor);
|
||||
|
||||
for _ in 0..worker_count {
|
||||
let rx = rx.clone();
|
||||
let jobs = jobs.clone();
|
||||
let cancellations = cancellations.clone();
|
||||
let executor = executor.clone();
|
||||
let rx = Arc::clone(&rx);
|
||||
let jobs = Arc::clone(&jobs);
|
||||
let cancellations = Arc::clone(&cancellations);
|
||||
let executor = Arc::clone(&executor);
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
|
|
@ -128,9 +135,26 @@ impl JobQueue {
|
|||
}
|
||||
}
|
||||
|
||||
let cancel_token = item.cancel.clone();
|
||||
let handle =
|
||||
executor(item.job_id, item.kind, item.cancel, jobs.clone());
|
||||
let _ = handle.await;
|
||||
executor(item.job_id, item.kind, item.cancel, Arc::clone(&jobs));
|
||||
|
||||
if job_timeout_secs > 0 {
|
||||
let timeout = std::time::Duration::from_secs(job_timeout_secs);
|
||||
if tokio::time::timeout(timeout, handle).await.is_err() {
|
||||
// Timeout: cancel the job and mark as failed
|
||||
cancel_token.cancel();
|
||||
let mut map = jobs.write().await;
|
||||
if let Some(job) = map.get_mut(&item.job_id) {
|
||||
job.status = JobStatus::Failed {
|
||||
error: format!("job timed out after {job_timeout_secs}s"),
|
||||
};
|
||||
job.updated_at = Utc::now();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let _ = handle.await;
|
||||
}
|
||||
|
||||
// Clean up cancellation token
|
||||
cancellations.write().await.remove(&item.job_id);
|
||||
|
|
@ -159,7 +183,33 @@ impl JobQueue {
|
|||
updated_at: now,
|
||||
};
|
||||
|
||||
self.jobs.write().await.insert(id, job);
|
||||
{
|
||||
let mut map = self.jobs.write().await;
|
||||
map.insert(id, job);
|
||||
// Prune old terminal jobs to prevent unbounded memory growth.
|
||||
// Keep at most 500 completed/failed/cancelled entries, removing the
|
||||
// oldest.
|
||||
const MAX_TERMINAL_JOBS: usize = 500;
|
||||
let mut terminal: Vec<(Uuid, chrono::DateTime<Utc>)> = map
|
||||
.iter()
|
||||
.filter(|(_, j)| {
|
||||
matches!(
|
||||
j.status,
|
||||
JobStatus::Completed { .. }
|
||||
| JobStatus::Failed { .. }
|
||||
| JobStatus::Cancelled
|
||||
)
|
||||
})
|
||||
.map(|(k, j)| (*k, j.updated_at))
|
||||
.collect();
|
||||
if terminal.len() > MAX_TERMINAL_JOBS {
|
||||
terminal.sort_by_key(|(_, t)| *t);
|
||||
let to_remove = terminal.len() - MAX_TERMINAL_JOBS;
|
||||
for (stale_id, _) in terminal.into_iter().take(to_remove) {
|
||||
map.remove(&stale_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.cancellations.write().await.insert(id, cancel.clone());
|
||||
|
||||
let item = WorkerItem {
|
||||
|
|
@ -180,20 +230,28 @@ impl JobQueue {
|
|||
|
||||
/// List all jobs, most recent first.
|
||||
pub async fn list(&self) -> Vec<Job> {
|
||||
let map = self.jobs.read().await;
|
||||
let mut jobs: Vec<Job> = map.values().cloned().collect();
|
||||
let mut jobs: Vec<Job> = {
|
||||
let map = self.jobs.read().await;
|
||||
map.values().cloned().collect()
|
||||
};
|
||||
jobs.sort_by_key(|job| std::cmp::Reverse(job.created_at));
|
||||
jobs
|
||||
}
|
||||
|
||||
/// Cancel a running or pending job.
|
||||
pub async fn cancel(&self, id: Uuid) -> bool {
|
||||
if let Some(token) = self.cancellations.read().await.get(&id) {
|
||||
let token = {
|
||||
let guard = self.cancellations.read().await;
|
||||
guard.get(&id).cloned()
|
||||
};
|
||||
if let Some(token) = token {
|
||||
token.cancel();
|
||||
let mut map = self.jobs.write().await;
|
||||
if let Some(job) = map.get_mut(&id) {
|
||||
job.status = JobStatus::Cancelled;
|
||||
job.updated_at = Utc::now();
|
||||
{
|
||||
let mut map = self.jobs.write().await;
|
||||
if let Some(job) = map.get_mut(&id) {
|
||||
job.status = JobStatus::Cancelled;
|
||||
job.updated_at = Utc::now();
|
||||
}
|
||||
}
|
||||
true
|
||||
} else {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue