use std::{ collections::HashMap, sync::Arc, time::{Duration, Instant}, }; use chrono::Utc; use futures_util::{StreamExt, stream::FuturesUnordered}; use ncro_db::{Db, DbError, RouteEntry}; use ncro_health::{Prober, Status}; use ncro_narinfo::{NarInfo, NarInfoError, parse_public_key}; use thiserror::Error; use tokio::sync::{Mutex, RwLock}; #[derive(Debug, Error)] pub enum RouterError { #[error("not found in any upstream")] NotFound, #[error("all upstreams unavailable")] UpstreamUnavailable, #[error("no candidates for {0:?}")] NoCandidates(String), #[error("narinfo signature verification failed")] SignatureVerificationFailed, #[error(transparent)] Db(#[from] DbError), } #[derive(Debug, Clone)] pub struct ResolveResult { pub url: String, pub latency_ms: f64, pub cache_hit: bool, pub narinfo_bytes: Option>, } #[derive(Clone)] pub struct Router { inner: Arc, } struct RouterInner { db: Db, prober: Prober, route_ttl: Duration, race_timeout: Duration, negative_ttl: Duration, client: reqwest::Client, upstream_keys: RwLock>, inflight: Mutex>>>, } #[derive(Debug)] struct RaceResult { url: String, latency_ms: f64, } impl Router { #[must_use] pub fn new( db: Db, prober: Prober, route_ttl: Duration, race_timeout: Duration, negative_ttl: Duration, ) -> Self { Self { inner: Arc::new(RouterInner { db, prober, route_ttl, race_timeout, negative_ttl, client: reqwest::Client::builder() .timeout(race_timeout) .build() .unwrap_or_else(|_| reqwest::Client::new()), upstream_keys: RwLock::new(HashMap::new()), inflight: Mutex::new(HashMap::new()), }), } } pub async fn set_upstream_key( &self, url: String, public_key: String, ) -> Result<(), NarInfoError> { parse_public_key(&public_key)?; self .inner .upstream_keys .write() .await .insert(url, public_key); Ok(()) } pub async fn resolve( &self, store_hash: &str, candidates: &[String], ) -> Result { if self.inner.db.is_negative(store_hash).await? { return Err(RouterError::NotFound); } if let Some(result) = self.valid_cached_route(store_hash).await? { return Ok(result); } ncro_metrics::get().narinfo_cache_misses.inc(); let lock = { let mut inflight = self.inner.inflight.lock().await; Arc::clone( inflight .entry(store_hash.to_string()) .or_insert_with(|| Arc::new(Mutex::new(()))), ) }; let _guard = lock.lock().await; if let Some(result) = self.valid_cached_route(store_hash).await? { self.inner.inflight.lock().await.remove(store_hash); return Ok(result); } let result = self.race(store_hash, candidates).await; if matches!(result, Err(RouterError::NotFound)) { let _ = self .inner .db .set_negative(store_hash, self.inner.negative_ttl) .await; } self.inner.inflight.lock().await.remove(store_hash); result } async fn valid_cached_route( &self, store_hash: &str, ) -> Result, RouterError> { let Some(entry) = self.inner.db.get_route(store_hash).await? else { return Ok(None); }; if !entry.is_valid() { return Ok(None); } let health = self.inner.prober.get_health(&entry.upstream_url).await; if !health.as_ref().is_none_or(|h| h.status == Status::Active) { return Ok(None); } ncro_metrics::get().narinfo_cache_hits.inc(); Ok(Some(ResolveResult { url: entry.upstream_url, latency_ms: entry.latency_ema, cache_hit: true, narinfo_bytes: None, })) } async fn race( &self, store_hash: &str, candidates: &[String], ) -> Result { if candidates.is_empty() { return Err(RouterError::NoCandidates(store_hash.to_string())); } let mut handles = FuturesUnordered::new(); for upstream in candidates { let upstream = upstream.clone(); let store_hash = store_hash.to_string(); let client = self.inner.client.clone(); handles.push(tokio::spawn(async move { let start = Instant::now(); let res = client .head(format!("{upstream}/{store_hash}.narinfo")) .send() .await; match res { Ok(resp) if resp.status().is_success() => { Ok(RaceResult { url: upstream, latency_ms: start.elapsed().as_secs_f64() * 1000.0, }) }, Ok(_) => Err(false), Err(_) => Err(true), } })); } let mut net_errs = 0; let mut not_founds = 0; let mut winner: Option = None; let deadline = tokio::time::sleep(self.inner.race_timeout); tokio::pin!(deadline); while !handles.is_empty() { tokio::select! { () = &mut deadline => break, joined = handles.next() => { match joined { Some(Ok(Ok(res))) => if winner.as_ref().is_none_or(|w| res.latency_ms < w.latency_ms) { winner = Some(res); }, Some(Ok(Err(true)) | Err(_)) => net_errs += 1, Some(Ok(Err(false))) => not_founds += 1, None => break, } } } } let Some(winner) = winner else { return if net_errs > 0 && not_founds == 0 { Err(RouterError::UpstreamUnavailable) } else { Err(RouterError::NotFound) }; }; ncro_metrics::get() .upstream_race_wins .with_label_values(&[&winner.url]) .inc(); ncro_metrics::get() .upstream_latency .with_label_values(&[&winner.url]) .observe(winner.latency_ms / 1000.0); let (body, nar_url, nar_hash, nar_size) = self.fetch_narinfo(&winner.url, store_hash).await?; let ema = self .inner .prober .get_health(&winner.url) .await .map_or(winner.latency_ms, |h| { 0.3f64.mul_add(winner.latency_ms, 0.7 * h.ema_latency) }); self .inner .prober .record_latency(&winner.url, winner.latency_ms) .await; let now = Utc::now(); self .inner .db .set_route(&RouteEntry { store_path: store_hash.to_string(), upstream_url: winner.url.clone(), latency_ms: winner.latency_ms, latency_ema: ema, last_verified: now, query_count: 1, failure_count: 0, ttl: now + chrono::Duration::from_std(self.inner.route_ttl) .unwrap_or_default(), nar_hash, nar_size, nar_url, }) .await?; Ok(ResolveResult { url: winner.url, latency_ms: winner.latency_ms, cache_hit: false, narinfo_bytes: body, }) } async fn fetch_narinfo( &self, upstream: &str, store_hash: &str, ) -> Result<(Option>, String, String, u64), RouterError> { let Ok(resp) = self .inner .client .get(format!("{upstream}/{store_hash}.narinfo")) .send() .await else { return Ok((None, String::new(), String::new(), 0)); }; if !resp.status().is_success() { return Ok((None, String::new(), String::new(), 0)); } let Ok(bytes) = resp.bytes().await else { return Ok((None, String::new(), String::new(), 0)); }; let body = bytes.to_vec(); let Ok(parsed) = NarInfo::parse(body.as_slice()) else { return Ok((Some(body), String::new(), String::new(), 0)); }; if let Some(pubkey) = self.inner.upstream_keys.read().await.get(upstream) && !parsed.verify(pubkey).unwrap_or(false) { tracing::warn!( upstream, store = store_hash, "narinfo signature verification failed" ); return Err(RouterError::SignatureVerificationFailed); } Ok((Some(body), parsed.url, parsed.nar_hash, parsed.nar_size)) } }