Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I02ee54baee048c58a480522ce79159eb6a6a6964
395 lines
12 KiB
Rust
395 lines
12 KiB
Rust
use std::{path::Path, time::Duration};
|
|
|
|
use chrono::{DateTime, TimeZone, Utc};
|
|
use sqlx::{
|
|
Row,
|
|
SqlitePool,
|
|
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
|
|
};
|
|
use thiserror::Error;
|
|
|
|
#[derive(Debug, Error)]
|
|
pub enum DbError {
|
|
#[error("sqlite: {0}")]
|
|
Sqlx(#[from] sqlx::Error),
|
|
#[error("create database directory: {0}")]
|
|
CreateDir(#[from] std::io::Error),
|
|
}
|
|
|
|
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
|
|
pub struct RouteEntry {
|
|
pub store_path: String,
|
|
pub upstream_url: String,
|
|
pub latency_ms: f64,
|
|
pub latency_ema: f64,
|
|
pub last_verified: DateTime<Utc>,
|
|
pub query_count: u32,
|
|
pub failure_count: u32,
|
|
pub ttl: DateTime<Utc>,
|
|
pub nar_hash: String,
|
|
pub nar_size: u64,
|
|
pub nar_url: String,
|
|
}
|
|
|
|
impl RouteEntry {
|
|
#[must_use]
|
|
pub fn is_valid(&self) -> bool {
|
|
Utc::now() < self.ttl
|
|
}
|
|
}
|
|
|
|
/// One row of the `upstream_health` table, as returned by
/// `Db::load_all_health`.
#[derive(Debug, Clone)]
pub struct HealthRow {
    /// Upstream URL; the primary key of the table.
    pub url: String,
    /// Stored latency average for this upstream.
    pub ema_latency: f64,
    /// Stored count of consecutive failures.
    pub consecutive_fails: i64,
    /// Stored count of total queries.
    pub total_queries: i64,
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct Db {
|
|
pool: SqlitePool,
|
|
max_entries: i64,
|
|
}
|
|
|
|
impl Db {
|
|
pub async fn open(path: &str, max_entries: i64) -> Result<Self, DbError> {
|
|
if path != ":memory:"
|
|
&& let Some(parent) = Path::new(path).parent()
|
|
{
|
|
tokio::fs::create_dir_all(parent).await?;
|
|
}
|
|
|
|
let options = if path == ":memory:" {
|
|
SqliteConnectOptions::new().filename(path)
|
|
} else {
|
|
SqliteConnectOptions::new()
|
|
.filename(path)
|
|
.create_if_missing(true)
|
|
}
|
|
.journal_mode(SqliteJournalMode::Wal)
|
|
.busy_timeout(Duration::from_secs(5));
|
|
|
|
let pool = SqlitePoolOptions::new()
|
|
.max_connections(1)
|
|
.connect_with(options)
|
|
.await?;
|
|
migrate(&pool).await?;
|
|
Ok(Self { pool, max_entries })
|
|
}
|
|
|
|
pub async fn get_route(
|
|
&self,
|
|
store_path: &str,
|
|
) -> Result<Option<RouteEntry>, DbError> {
|
|
let row = sqlx::query(
|
|
r"SELECT store_path, upstream_url, latency_ms, latency_ema, query_count, failure_count,
|
|
last_verified, ttl, nar_hash, nar_size, nar_url
|
|
FROM routes WHERE store_path = ?",
|
|
)
|
|
.bind(store_path)
|
|
.fetch_optional(&self.pool)
|
|
.await?;
|
|
Ok(row.as_ref().map(row_to_route))
|
|
}
|
|
|
|
pub async fn get_route_by_nar_url(
|
|
&self,
|
|
nar_url: &str,
|
|
) -> Result<Option<RouteEntry>, DbError> {
|
|
let row = sqlx::query(
|
|
r"SELECT store_path, upstream_url, latency_ms, latency_ema, query_count, failure_count,
|
|
last_verified, ttl, nar_hash, nar_size, nar_url
|
|
FROM routes WHERE nar_url = ? AND ttl > ?",
|
|
)
|
|
.bind(nar_url)
|
|
.bind(Utc::now().timestamp())
|
|
.fetch_optional(&self.pool)
|
|
.await?;
|
|
Ok(row.as_ref().map(row_to_route))
|
|
}
|
|
|
|
pub async fn set_route(&self, entry: &RouteEntry) -> Result<(), DbError> {
|
|
sqlx::query(
|
|
r"INSERT INTO routes
|
|
(store_path, upstream_url, latency_ms, latency_ema, query_count, failure_count,
|
|
last_verified, ttl, nar_hash, nar_size, nar_url)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(store_path) DO UPDATE SET
|
|
upstream_url = excluded.upstream_url,
|
|
latency_ms = excluded.latency_ms,
|
|
latency_ema = excluded.latency_ema,
|
|
query_count = excluded.query_count,
|
|
failure_count = excluded.failure_count,
|
|
last_verified = excluded.last_verified,
|
|
ttl = excluded.ttl,
|
|
nar_hash = excluded.nar_hash,
|
|
nar_size = excluded.nar_size,
|
|
nar_url = excluded.nar_url",
|
|
)
|
|
.bind(&entry.store_path)
|
|
.bind(&entry.upstream_url)
|
|
.bind(entry.latency_ms)
|
|
.bind(entry.latency_ema)
|
|
.bind(i64::from(entry.query_count))
|
|
.bind(i64::from(entry.failure_count))
|
|
.bind(entry.last_verified.timestamp())
|
|
.bind(entry.ttl.timestamp())
|
|
.bind(&entry.nar_hash)
|
|
.bind(i64::try_from(entry.nar_size).unwrap_or(i64::MAX))
|
|
.bind(&entry.nar_url)
|
|
.execute(&self.pool)
|
|
.await?;
|
|
self.evict_if_needed().await
|
|
}
|
|
|
|
pub async fn expire_old_routes(&self) -> Result<(), DbError> {
|
|
sqlx::query("DELETE FROM routes WHERE ttl < ?")
|
|
.bind(Utc::now().timestamp())
|
|
.execute(&self.pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn list_recent_routes(
|
|
&self,
|
|
n: i64,
|
|
) -> Result<Vec<RouteEntry>, DbError> {
|
|
let rows = sqlx::query(
|
|
r"SELECT store_path, upstream_url, latency_ms, latency_ema, query_count, failure_count,
|
|
last_verified, ttl, nar_hash, nar_size, nar_url
|
|
FROM routes WHERE ttl > ? ORDER BY last_verified DESC LIMIT ?",
|
|
)
|
|
.bind(Utc::now().timestamp())
|
|
.bind(n)
|
|
.fetch_all(&self.pool)
|
|
.await?;
|
|
Ok(rows.iter().map(row_to_route).collect())
|
|
}
|
|
|
|
pub async fn route_count(&self) -> Result<i64, DbError> {
|
|
Ok(
|
|
sqlx::query("SELECT COUNT(*) FROM routes")
|
|
.fetch_one(&self.pool)
|
|
.await?
|
|
.get::<i64, _>(0),
|
|
)
|
|
}
|
|
|
|
pub async fn set_negative(
|
|
&self,
|
|
store_path: &str,
|
|
ttl: Duration,
|
|
) -> Result<(), DbError> {
|
|
sqlx::query(
|
|
r"INSERT INTO negative_cache (store_path, expires_at) VALUES (?, ?)
|
|
ON CONFLICT(store_path) DO UPDATE SET expires_at = excluded.expires_at",
|
|
)
|
|
.bind(store_path)
|
|
.bind((Utc::now() + chrono::Duration::from_std(ttl).unwrap_or_default()).timestamp())
|
|
.execute(&self.pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn is_negative(&self, store_path: &str) -> Result<bool, DbError> {
|
|
Ok(
|
|
sqlx::query(
|
|
"SELECT EXISTS(SELECT 1 FROM negative_cache WHERE store_path = ? AND \
|
|
expires_at > ?)",
|
|
)
|
|
.bind(store_path)
|
|
.bind(Utc::now().timestamp())
|
|
.fetch_one(&self.pool)
|
|
.await?
|
|
.get::<i64, _>(0)
|
|
!= 0,
|
|
)
|
|
}
|
|
|
|
pub async fn expire_negatives(&self) -> Result<(), DbError> {
|
|
sqlx::query("DELETE FROM negative_cache WHERE expires_at < ?")
|
|
.bind(Utc::now().timestamp())
|
|
.execute(&self.pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn save_health(
|
|
&self,
|
|
url: &str,
|
|
ema: f64,
|
|
consecutive_fails: i64,
|
|
total_queries: i64,
|
|
) -> Result<(), DbError> {
|
|
sqlx::query(
|
|
r"INSERT INTO upstream_health (url, ema_latency, consecutive_fails, total_queries)
|
|
VALUES (?, ?, ?, ?)
|
|
ON CONFLICT(url) DO UPDATE SET
|
|
ema_latency = excluded.ema_latency,
|
|
consecutive_fails = excluded.consecutive_fails,
|
|
total_queries = excluded.total_queries",
|
|
)
|
|
.bind(url)
|
|
.bind(ema)
|
|
.bind(consecutive_fails)
|
|
.bind(total_queries)
|
|
.execute(&self.pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn load_all_health(&self) -> Result<Vec<HealthRow>, DbError> {
|
|
let rows = sqlx::query(
|
|
"SELECT url, ema_latency, consecutive_fails, total_queries FROM \
|
|
upstream_health",
|
|
)
|
|
.fetch_all(&self.pool)
|
|
.await?;
|
|
Ok(
|
|
rows
|
|
.into_iter()
|
|
.map(|row| {
|
|
HealthRow {
|
|
url: row.get("url"),
|
|
ema_latency: row.get("ema_latency"),
|
|
consecutive_fails: row.get("consecutive_fails"),
|
|
total_queries: row.get("total_queries"),
|
|
}
|
|
})
|
|
.collect(),
|
|
)
|
|
}
|
|
|
|
async fn evict_if_needed(&self) -> Result<(), DbError> {
|
|
sqlx::query(
|
|
r"DELETE FROM routes WHERE store_path IN (
|
|
SELECT store_path FROM routes ORDER BY last_verified ASC
|
|
LIMIT MAX(0, (SELECT COUNT(*) FROM routes) - ?)
|
|
)",
|
|
)
|
|
.bind(self.max_entries)
|
|
.execute(&self.pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
async fn migrate(pool: &SqlitePool) -> Result<(), DbError> {
|
|
sqlx::query(
|
|
r"CREATE TABLE IF NOT EXISTS routes (
|
|
store_path TEXT PRIMARY KEY,
|
|
upstream_url TEXT NOT NULL,
|
|
latency_ms REAL NOT NULL DEFAULT 0,
|
|
latency_ema REAL NOT NULL DEFAULT 0,
|
|
query_count INTEGER NOT NULL DEFAULT 1,
|
|
failure_count INTEGER NOT NULL DEFAULT 0,
|
|
last_verified INTEGER NOT NULL DEFAULT 0,
|
|
ttl INTEGER NOT NULL,
|
|
nar_hash TEXT NOT NULL DEFAULT '',
|
|
nar_size INTEGER NOT NULL DEFAULT 0,
|
|
nar_url TEXT NOT NULL DEFAULT '',
|
|
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
|
|
)",
|
|
)
|
|
.execute(pool)
|
|
.await?;
|
|
sqlx::query("CREATE INDEX IF NOT EXISTS idx_routes_ttl ON routes(ttl)")
|
|
.execute(pool)
|
|
.await?;
|
|
sqlx::query(
|
|
"CREATE INDEX IF NOT EXISTS idx_routes_last_verified ON \
|
|
routes(last_verified)",
|
|
)
|
|
.execute(pool)
|
|
.await?;
|
|
sqlx::query(
|
|
"CREATE INDEX IF NOT EXISTS idx_routes_nar_url ON routes(nar_url)",
|
|
)
|
|
.execute(pool)
|
|
.await?;
|
|
sqlx::query(
|
|
r"CREATE TABLE IF NOT EXISTS upstream_health (
|
|
url TEXT PRIMARY KEY,
|
|
ema_latency REAL NOT NULL DEFAULT 0,
|
|
consecutive_fails INTEGER NOT NULL DEFAULT 0,
|
|
total_queries INTEGER NOT NULL DEFAULT 0
|
|
)",
|
|
)
|
|
.execute(pool)
|
|
.await?;
|
|
sqlx::query(
|
|
r"CREATE TABLE IF NOT EXISTS negative_cache (
|
|
store_path TEXT PRIMARY KEY,
|
|
expires_at INTEGER NOT NULL
|
|
)",
|
|
)
|
|
.execute(pool)
|
|
.await?;
|
|
sqlx::query(
|
|
"CREATE INDEX IF NOT EXISTS idx_negative_expires ON \
|
|
negative_cache(expires_at)",
|
|
)
|
|
.execute(pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
fn row_to_route(row: &sqlx::sqlite::SqliteRow) -> RouteEntry {
|
|
let query_count = row.get::<i64, _>("query_count");
|
|
let failure_count = row.get::<i64, _>("failure_count");
|
|
let nar_size = row.get::<i64, _>("nar_size");
|
|
RouteEntry {
|
|
store_path: row.get("store_path"),
|
|
upstream_url: row.get("upstream_url"),
|
|
latency_ms: row.get("latency_ms"),
|
|
latency_ema: row.get("latency_ema"),
|
|
query_count: u32::try_from(query_count).unwrap_or_default(),
|
|
failure_count: u32::try_from(failure_count).unwrap_or_default(),
|
|
last_verified: Utc
|
|
.timestamp_opt(row.get("last_verified"), 0)
|
|
.single()
|
|
.unwrap_or_else(Utc::now),
|
|
ttl: Utc
|
|
.timestamp_opt(row.get("ttl"), 0)
|
|
.single()
|
|
.unwrap_or_else(Utc::now),
|
|
nar_hash: row.get("nar_hash"),
|
|
nar_size: u64::try_from(nar_size).unwrap_or_default(),
|
|
nar_url: row.get("nar_url"),
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Stores one route and one negative-cache entry in an in-memory
    /// database and checks that both can be read back.
    #[tokio::test]
    async fn route_roundtrip_and_negative_cache() -> Result<(), DbError> {
        let db = Db::open(":memory:", 100).await?;

        let verified_at = Utc::now();
        let expires_at = verified_at + chrono::Duration::hours(1);
        let entry = RouteEntry {
            store_path: "abc123".into(),
            upstream_url: "https://cache.nixos.org".into(),
            latency_ms: 10.0,
            latency_ema: 10.0,
            last_verified: verified_at,
            query_count: 1,
            failure_count: 0,
            ttl: expires_at,
            nar_hash: "sha256:abc".into(),
            nar_size: 42,
            nar_url: "nar/abc.nar.xz".into(),
        };
        db.set_route(&entry).await?;

        // Lookup by store path must find the row that was just written.
        let Some(got) = db.get_route("abc123").await? else {
            return Err(sqlx::Error::RowNotFound.into());
        };
        assert_eq!(got.upstream_url, entry.upstream_url);

        // The same row must be reachable through its NAR URL.
        let by_nar_url = db.get_route_by_nar_url("nar/abc.nar.xz").await?;
        assert!(by_nar_url.is_some());

        // A fresh negative entry is reported while its TTL is running.
        db.set_negative("missing", Duration::from_secs(60)).await?;
        let is_cached = db.is_negative("missing").await?;
        assert!(is_cached);

        Ok(())
    }
}
|