use std::{path::Path, time::Duration}; use chrono::{DateTime, TimeZone, Utc}; use sqlx::{ Row, SqlitePool, sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions}, }; use thiserror::Error; #[derive(Debug, Error)] pub enum DbError { #[error("sqlite: {0}")] Sqlx(#[from] sqlx::Error), #[error("create database directory: {0}")] CreateDir(#[from] std::io::Error), } #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] pub struct RouteEntry { pub store_path: String, pub upstream_url: String, pub latency_ms: f64, pub latency_ema: f64, pub last_verified: DateTime, pub query_count: u32, pub failure_count: u32, pub ttl: DateTime, pub nar_hash: String, pub nar_size: u64, pub nar_url: String, } impl RouteEntry { #[must_use] pub fn is_valid(&self) -> bool { Utc::now() < self.ttl } } #[derive(Debug, Clone)] pub struct HealthRow { pub url: String, pub ema_latency: f64, pub consecutive_fails: i64, pub total_queries: i64, } #[derive(Clone)] pub struct Db { pool: SqlitePool, max_entries: i64, } impl Db { pub async fn open(path: &str, max_entries: i64) -> Result { if path != ":memory:" && let Some(parent) = Path::new(path).parent() { tokio::fs::create_dir_all(parent).await?; } let options = if path == ":memory:" { SqliteConnectOptions::new().filename(path) } else { SqliteConnectOptions::new() .filename(path) .create_if_missing(true) } .journal_mode(SqliteJournalMode::Wal) .busy_timeout(Duration::from_secs(5)); let pool = SqlitePoolOptions::new() .max_connections(1) .connect_with(options) .await?; migrate(&pool).await?; Ok(Self { pool, max_entries }) } pub async fn get_route( &self, store_path: &str, ) -> Result, DbError> { let row = sqlx::query( r"SELECT store_path, upstream_url, latency_ms, latency_ema, query_count, failure_count, last_verified, ttl, nar_hash, nar_size, nar_url FROM routes WHERE store_path = ?", ) .bind(store_path) .fetch_optional(&self.pool) .await?; Ok(row.as_ref().map(row_to_route)) } pub async fn get_route_by_nar_url( &self, nar_url: &str, ) -> Result, DbError> { let row = sqlx::query( r"SELECT store_path, upstream_url, latency_ms, latency_ema, query_count, failure_count, last_verified, ttl, nar_hash, nar_size, nar_url FROM routes WHERE nar_url = ? AND ttl > ?", ) .bind(nar_url) .bind(Utc::now().timestamp()) .fetch_optional(&self.pool) .await?; Ok(row.as_ref().map(row_to_route)) } pub async fn set_route(&self, entry: &RouteEntry) -> Result<(), DbError> { sqlx::query( r"INSERT INTO routes (store_path, upstream_url, latency_ms, latency_ema, query_count, failure_count, last_verified, ttl, nar_hash, nar_size, nar_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(store_path) DO UPDATE SET upstream_url = excluded.upstream_url, latency_ms = excluded.latency_ms, latency_ema = excluded.latency_ema, query_count = excluded.query_count, failure_count = excluded.failure_count, last_verified = excluded.last_verified, ttl = excluded.ttl, nar_hash = excluded.nar_hash, nar_size = excluded.nar_size, nar_url = excluded.nar_url", ) .bind(&entry.store_path) .bind(&entry.upstream_url) .bind(entry.latency_ms) .bind(entry.latency_ema) .bind(i64::from(entry.query_count)) .bind(i64::from(entry.failure_count)) .bind(entry.last_verified.timestamp()) .bind(entry.ttl.timestamp()) .bind(&entry.nar_hash) .bind(i64::try_from(entry.nar_size).unwrap_or(i64::MAX)) .bind(&entry.nar_url) .execute(&self.pool) .await?; self.evict_if_needed().await } pub async fn expire_old_routes(&self) -> Result<(), DbError> { sqlx::query("DELETE FROM routes WHERE ttl < ?") .bind(Utc::now().timestamp()) .execute(&self.pool) .await?; Ok(()) } pub async fn list_recent_routes( &self, n: i64, ) -> Result, DbError> { let rows = sqlx::query( r"SELECT store_path, upstream_url, latency_ms, latency_ema, query_count, failure_count, last_verified, ttl, nar_hash, nar_size, nar_url FROM routes WHERE ttl > ? ORDER BY last_verified DESC LIMIT ?", ) .bind(Utc::now().timestamp()) .bind(n) .fetch_all(&self.pool) .await?; Ok(rows.iter().map(row_to_route).collect()) } pub async fn route_count(&self) -> Result { Ok( sqlx::query("SELECT COUNT(*) FROM routes") .fetch_one(&self.pool) .await? .get::(0), ) } pub async fn set_negative( &self, store_path: &str, ttl: Duration, ) -> Result<(), DbError> { sqlx::query( r"INSERT INTO negative_cache (store_path, expires_at) VALUES (?, ?) ON CONFLICT(store_path) DO UPDATE SET expires_at = excluded.expires_at", ) .bind(store_path) .bind((Utc::now() + chrono::Duration::from_std(ttl).unwrap_or_default()).timestamp()) .execute(&self.pool) .await?; Ok(()) } pub async fn is_negative(&self, store_path: &str) -> Result { Ok( sqlx::query( "SELECT EXISTS(SELECT 1 FROM negative_cache WHERE store_path = ? AND \ expires_at > ?)", ) .bind(store_path) .bind(Utc::now().timestamp()) .fetch_one(&self.pool) .await? .get::(0) != 0, ) } pub async fn expire_negatives(&self) -> Result<(), DbError> { sqlx::query("DELETE FROM negative_cache WHERE expires_at < ?") .bind(Utc::now().timestamp()) .execute(&self.pool) .await?; Ok(()) } pub async fn save_health( &self, url: &str, ema: f64, consecutive_fails: i64, total_queries: i64, ) -> Result<(), DbError> { sqlx::query( r"INSERT INTO upstream_health (url, ema_latency, consecutive_fails, total_queries) VALUES (?, ?, ?, ?) ON CONFLICT(url) DO UPDATE SET ema_latency = excluded.ema_latency, consecutive_fails = excluded.consecutive_fails, total_queries = excluded.total_queries", ) .bind(url) .bind(ema) .bind(consecutive_fails) .bind(total_queries) .execute(&self.pool) .await?; Ok(()) } pub async fn load_all_health(&self) -> Result, DbError> { let rows = sqlx::query( "SELECT url, ema_latency, consecutive_fails, total_queries FROM \ upstream_health", ) .fetch_all(&self.pool) .await?; Ok( rows .into_iter() .map(|row| { HealthRow { url: row.get("url"), ema_latency: row.get("ema_latency"), consecutive_fails: row.get("consecutive_fails"), total_queries: row.get("total_queries"), } }) .collect(), ) } async fn evict_if_needed(&self) -> Result<(), DbError> { sqlx::query( r"DELETE FROM routes WHERE store_path IN ( SELECT store_path FROM routes ORDER BY last_verified ASC LIMIT MAX(0, (SELECT COUNT(*) FROM routes) - ?) )", ) .bind(self.max_entries) .execute(&self.pool) .await?; Ok(()) } } async fn migrate(pool: &SqlitePool) -> Result<(), DbError> { sqlx::query( r"CREATE TABLE IF NOT EXISTS routes ( store_path TEXT PRIMARY KEY, upstream_url TEXT NOT NULL, latency_ms REAL NOT NULL DEFAULT 0, latency_ema REAL NOT NULL DEFAULT 0, query_count INTEGER NOT NULL DEFAULT 1, failure_count INTEGER NOT NULL DEFAULT 0, last_verified INTEGER NOT NULL DEFAULT 0, ttl INTEGER NOT NULL, nar_hash TEXT NOT NULL DEFAULT '', nar_size INTEGER NOT NULL DEFAULT 0, nar_url TEXT NOT NULL DEFAULT '', created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')) )", ) .execute(pool) .await?; sqlx::query("CREATE INDEX IF NOT EXISTS idx_routes_ttl ON routes(ttl)") .execute(pool) .await?; sqlx::query( "CREATE INDEX IF NOT EXISTS idx_routes_last_verified ON \ routes(last_verified)", ) .execute(pool) .await?; sqlx::query( "CREATE INDEX IF NOT EXISTS idx_routes_nar_url ON routes(nar_url)", ) .execute(pool) .await?; sqlx::query( r"CREATE TABLE IF NOT EXISTS upstream_health ( url TEXT PRIMARY KEY, ema_latency REAL NOT NULL DEFAULT 0, consecutive_fails INTEGER NOT NULL DEFAULT 0, total_queries INTEGER NOT NULL DEFAULT 0 )", ) .execute(pool) .await?; sqlx::query( r"CREATE TABLE IF NOT EXISTS negative_cache ( store_path TEXT PRIMARY KEY, expires_at INTEGER NOT NULL )", ) .execute(pool) .await?; sqlx::query( "CREATE INDEX IF NOT EXISTS idx_negative_expires ON \ negative_cache(expires_at)", ) .execute(pool) .await?; Ok(()) } fn row_to_route(row: &sqlx::sqlite::SqliteRow) -> RouteEntry { let query_count = row.get::("query_count"); let failure_count = row.get::("failure_count"); let nar_size = row.get::("nar_size"); RouteEntry { store_path: row.get("store_path"), upstream_url: row.get("upstream_url"), latency_ms: row.get("latency_ms"), latency_ema: row.get("latency_ema"), query_count: u32::try_from(query_count).unwrap_or_default(), failure_count: u32::try_from(failure_count).unwrap_or_default(), last_verified: Utc .timestamp_opt(row.get("last_verified"), 0) .single() .unwrap_or_else(Utc::now), ttl: Utc .timestamp_opt(row.get("ttl"), 0) .single() .unwrap_or_else(Utc::now), nar_hash: row.get("nar_hash"), nar_size: u64::try_from(nar_size).unwrap_or_default(), nar_url: row.get("nar_url"), } } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn route_roundtrip_and_negative_cache() -> Result<(), DbError> { let db = Db::open(":memory:", 100).await?; let now = Utc::now(); let entry = RouteEntry { store_path: "abc123".into(), upstream_url: "https://cache.nixos.org".into(), latency_ms: 10.0, latency_ema: 10.0, last_verified: now, query_count: 1, failure_count: 0, ttl: now + chrono::Duration::hours(1), nar_hash: "sha256:abc".into(), nar_size: 42, nar_url: "nar/abc.nar.xz".into(), }; db.set_route(&entry).await?; let got = db .get_route("abc123") .await? .ok_or(sqlx::Error::RowNotFound)?; assert_eq!(got.upstream_url, entry.upstream_url); assert!(db.get_route_by_nar_url("nar/abc.nar.xz").await?.is_some()); db.set_negative("missing", Duration::from_secs(60)).await?; assert!(db.is_negative("missing").await?); Ok(()) } }