diff --git a/.envrc b/.envrc index 3550a30..df0bb43 100644 --- a/.envrc +++ b/.envrc @@ -1 +1,2 @@ use flake +export CGO_ENABLED=0 diff --git a/go.mod b/go.mod index b87bd26..6e4ad38 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,20 @@ module notashelf.dev/ncro go 1.25.7 -require gopkg.in/yaml.v3 v3.0.1 +require ( + gopkg.in/yaml.v3 v3.0.1 + modernc.org/sqlite v1.46.1 +) + +require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v1.0.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect + golang.org/x/sys v0.37.0 // indirect + modernc.org/libc v1.67.6 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect +) diff --git a/go.sum b/go.sum index a62c313..b2791d1 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,57 @@ +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= +github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= +golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= +golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= +golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= +modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc= +modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM= +modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA= +modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/gc/v3 v3.1.1 h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE= +modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY= +modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks= +modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI= +modernc.org/libc v1.67.6 h1:eVOQvpModVLKOdT+LvBPjdQqfrZq+pC39BygcT+E7OI= +modernc.org/libc v1.67.6/go.mod h1:JAhxUVlolfYDErnwiqaLvUqc8nfb2r6S6slAgZOnaiE= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.46.1 h1:eFJ2ShBLIEnUWlLy12raN0Z1plqmFX9Qe3rjQTKt6sU= +modernc.org/sqlite v1.46.1/go.mod h1:CzbrU2lSB1DKUusvwGz7rqEKIq+NUd8GWuBBZDs9/nA= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/internal/cache/db.go b/internal/cache/db.go index 08bf029..1eaaac9 100644 --- a/internal/cache/db.go +++ b/internal/cache/db.go @@ -1 +1,169 @@ package cache + +import ( + "database/sql" + "fmt" + "time" + + _ "modernc.org/sqlite" +) + +// Core routing decision persisted per store path. +type RouteEntry struct { + StorePath string + UpstreamURL string + LatencyMs float64 + LatencyEMA float64 + LastVerified time.Time + QueryCount uint32 + FailureCount uint32 + TTL time.Time + NarHash string + NarSize uint64 +} + +// Returns true if the entry exists and hasn't expired. +func (r *RouteEntry) IsValid() bool { + return r != nil && time.Now().Before(r.TTL) +} + +// SQLite-backed store for route persistence. +type DB struct { + db *sql.DB + maxEntries int +} + +// Opens or creates the SQLite database at path with WAL mode. +func Open(path string, maxEntries int) (*DB, error) { + db, err := sql.Open("sqlite", path+"?_journal=WAL&_busy_timeout=5000") + if err != nil { + return nil, fmt.Errorf("open sqlite: %w", err) + } + db.SetMaxOpenConns(1) // SQLite WAL allows 1 writer + + if err := migrate(db); err != nil { + db.Close() + return nil, fmt.Errorf("migrate: %w", err) + } + + return &DB{db: db, maxEntries: maxEntries}, nil +} + +// Closes the database. +func (d *DB) Close() error { + return d.db.Close() +} + +func migrate(db *sql.DB) error { + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS routes ( + store_path TEXT PRIMARY KEY, + upstream_url TEXT NOT NULL, + latency_ms REAL DEFAULT 0, + latency_ema REAL DEFAULT 0, + query_count INTEGER DEFAULT 1, + failure_count INTEGER DEFAULT 0, + last_verified INTEGER DEFAULT 0, + ttl INTEGER NOT NULL, + nar_hash TEXT DEFAULT '', + nar_size INTEGER DEFAULT 0, + created_at INTEGER DEFAULT (strftime('%s', 'now')) + ); + CREATE INDEX IF NOT EXISTS idx_routes_ttl ON routes(ttl); + CREATE INDEX IF NOT EXISTS idx_routes_last_verified ON routes(last_verified); + + CREATE TABLE IF NOT EXISTS upstream_health ( + url TEXT PRIMARY KEY, + ema_latency REAL DEFAULT 0, + last_probe INTEGER DEFAULT 0, + consecutive_fails INTEGER DEFAULT 0, + total_queries INTEGER DEFAULT 0, + success_rate REAL DEFAULT 1.0 + ); + `) + return err +} + +// Returns the route for storePath, or nil if not found. +func (d *DB) GetRoute(storePath string) (*RouteEntry, error) { + row := d.db.QueryRow(` + SELECT store_path, upstream_url, latency_ms, latency_ema, + query_count, failure_count, last_verified, ttl, nar_hash, nar_size + FROM routes WHERE store_path = ?`, storePath) + + var e RouteEntry + var lastVerifiedUnix, ttlUnix int64 + err := row.Scan( + &e.StorePath, &e.UpstreamURL, &e.LatencyMs, &e.LatencyEMA, + &e.QueryCount, &e.FailureCount, &lastVerifiedUnix, &ttlUnix, + &e.NarHash, &e.NarSize, + ) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + e.LastVerified = time.Unix(lastVerifiedUnix, 0).UTC() + e.TTL = time.Unix(ttlUnix, 0).UTC() + return &e, nil +} + +// Inserts or updates a route entry. +func (d *DB) SetRoute(entry *RouteEntry) error { + _, err := d.db.Exec(` + INSERT INTO routes + (store_path, upstream_url, latency_ms, latency_ema, + query_count, failure_count, last_verified, ttl, nar_hash, nar_size) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(store_path) DO UPDATE SET + upstream_url = excluded.upstream_url, + latency_ms = excluded.latency_ms, + latency_ema = excluded.latency_ema, + query_count = excluded.query_count, + failure_count = excluded.failure_count, + last_verified = excluded.last_verified, + ttl = excluded.ttl, + nar_hash = excluded.nar_hash, + nar_size = excluded.nar_size`, + entry.StorePath, entry.UpstreamURL, + entry.LatencyMs, entry.LatencyEMA, + entry.QueryCount, entry.FailureCount, + entry.LastVerified.Unix(), entry.TTL.Unix(), + entry.NarHash, entry.NarSize, + ) + if err != nil { + return err + } + return d.evictIfNeeded() +} + +// Deletes routes whose TTL has passed. +func (d *DB) ExpireOldRoutes() error { + _, err := d.db.Exec(`DELETE FROM routes WHERE ttl < ?`, time.Now().Unix()) + return err +} + +// Returns the total number of stored routes. +func (d *DB) RouteCount() (int, error) { + var count int + err := d.db.QueryRow(`SELECT COUNT(*) FROM routes`).Scan(&count) + return count, err +} + +// Deletes the oldest routes (by last_verified) when over capacity. +func (d *DB) evictIfNeeded() error { + count, err := d.RouteCount() + if err != nil { + return err + } + if count <= d.maxEntries { + return nil + } + excess := count - d.maxEntries + _, err = d.db.Exec(` + DELETE FROM routes WHERE store_path IN ( + SELECT store_path FROM routes ORDER BY last_verified ASC LIMIT ? + )`, excess) + return err +} diff --git a/internal/cache/db_test.go b/internal/cache/db_test.go new file mode 100644 index 0000000..abf393d --- /dev/null +++ b/internal/cache/db_test.go @@ -0,0 +1,157 @@ +package cache_test + +import ( + "os" + "testing" + "time" + + "notashelf.dev/ncro/internal/cache" +) + +func newTestDB(t *testing.T) *cache.DB { + t.Helper() + f, err := os.CreateTemp("", "ncro-test-*.db") + if err != nil { + t.Fatal(err) + } + f.Close() + t.Cleanup(func() { os.Remove(f.Name()) }) + + db, err := cache.Open(f.Name(), 1000) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { db.Close() }) + return db +} + +func TestGetSetRoute(t *testing.T) { + db := newTestDB(t) + + entry := &cache.RouteEntry{ + StorePath: "abc123xyz-hello-2.12", + UpstreamURL: "https://cache.nixos.org", + LatencyMs: 12.5, + LatencyEMA: 12.5, + LastVerified: time.Now().UTC().Truncate(time.Second), + QueryCount: 1, + TTL: time.Now().Add(time.Hour).UTC().Truncate(time.Second), + } + + if err := db.SetRoute(entry); err != nil { + t.Fatalf("SetRoute: %v", err) + } + + got, err := db.GetRoute("abc123xyz-hello-2.12") + if err != nil { + t.Fatalf("GetRoute: %v", err) + } + if got == nil { + t.Fatal("GetRoute returned nil") + } + if got.UpstreamURL != entry.UpstreamURL { + t.Errorf("upstream = %q, want %q", got.UpstreamURL, entry.UpstreamURL) + } + if got.QueryCount != 1 { + t.Errorf("query_count = %d, want 1", got.QueryCount) + } +} + +func TestGetRouteNotFound(t *testing.T) { + db := newTestDB(t) + got, err := db.GetRoute("nonexistent") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != nil { + t.Errorf("expected nil, got %+v", got) + } +} + +func TestSetRouteUpsert(t *testing.T) { + db := newTestDB(t) + + entry := &cache.RouteEntry{ + StorePath: "abc123-pkg", + UpstreamURL: "https://cache.nixos.org", + LatencyMs: 20.0, + LatencyEMA: 20.0, + QueryCount: 1, + TTL: time.Now().Add(time.Hour), + } + db.SetRoute(entry) + + entry.LatencyEMA = 18.0 + entry.QueryCount = 2 + if err := db.SetRoute(entry); err != nil { + t.Fatalf("upsert: %v", err) + } + + got, _ := db.GetRoute("abc123-pkg") + if got.LatencyEMA != 18.0 { + t.Errorf("ema = %f, want 18.0", got.LatencyEMA) + } + if got.QueryCount != 2 { + t.Errorf("query_count = %d, want 2", got.QueryCount) + } +} + +func TestExpireOldRoutes(t *testing.T) { + db := newTestDB(t) + + // Insert expired route + expired := &cache.RouteEntry{ + StorePath: "expired-pkg", + UpstreamURL: "https://cache.nixos.org", + TTL: time.Now().Add(-time.Minute), // already expired + } + db.SetRoute(expired) + + // Insert valid route + valid := &cache.RouteEntry{ + StorePath: "valid-pkg", + UpstreamURL: "https://cache.nixos.org", + TTL: time.Now().Add(time.Hour), + } + db.SetRoute(valid) + + if err := db.ExpireOldRoutes(); err != nil { + t.Fatalf("ExpireOldRoutes: %v", err) + } + + got, _ := db.GetRoute("expired-pkg") + if got != nil { + t.Error("expired route should have been deleted") + } + got2, _ := db.GetRoute("valid-pkg") + if got2 == nil { + t.Error("valid route should still exist") + } +} + +func TestLRUEviction(t *testing.T) { + // Use maxEntries=3 to trigger eviction easily + f, _ := os.CreateTemp("", "ncro-lru-*.db") + f.Close() + defer os.Remove(f.Name()) + + db, _ := cache.Open(f.Name(), 3) + defer db.Close() + + for i := range 4 { + db.SetRoute(&cache.RouteEntry{ + StorePath: "pkg-" + string(rune('a'+i)), + UpstreamURL: "https://cache.nixos.org", + LastVerified: time.Now().Add(time.Duration(i) * time.Second), + TTL: time.Now().Add(time.Hour), + }) + } + + count, err := db.RouteCount() + if err != nil { + t.Fatal(err) + } + if count > 3 { + t.Errorf("expected count <= 3 after LRU eviction, got %d", count) + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 087dfa4..5a5cfe2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -8,9 +8,8 @@ import ( "gopkg.in/yaml.v3" ) -// Duration is a wrapper around time.Duration that supports YAML unmarshaling -// from Go duration strings (e.g., "30s", "1h"). yaml.v3 cannot unmarshal -// duration strings directly into time.Duration (int64), so we handle it here. +// Wrapper around time.Duration supporting YAML duration strings ("30s", "1h"). +// yaml.v3 cannot unmarshal duration strings directly into time.Duration (int64). type Duration struct { time.Duration } @@ -101,7 +100,7 @@ func defaults() Config { } } -// Load loads config from file (if non-empty) and applies env overrides. +// Loads config from file (if non-empty) and applies env overrides. func Load(path string) (*Config, error) { cfg := defaults() diff --git a/internal/prober/prober.go b/internal/prober/prober.go index 8219d21..93f518d 100644 --- a/internal/prober/prober.go +++ b/internal/prober/prober.go @@ -1 +1,176 @@ package prober + +import ( + "net/http" + "sort" + "sync" + "time" +) + +// Upstream health status. +type Status int + +const ( + StatusActive Status = iota + StatusDegraded // 3+ consecutive failures + StatusDown // 10+ consecutive failures +) + +func (s Status) String() string { + switch s { + case StatusActive: + return "ACTIVE" + case StatusDegraded: + return "DEGRADED" + default: + return "DOWN" + } +} + +// In-memory metrics for one upstream. +type UpstreamHealth struct { + URL string + EMALatency float64 + LastProbe time.Time + ConsecutiveFails uint32 + TotalQueries uint64 + Status Status +} + +// Tracks latency and health for a set of upstreams. +type Prober struct { + mu sync.RWMutex + alpha float64 + table map[string]*UpstreamHealth + client *http.Client +} + +// Creates a Prober with the given EMA alpha coefficient. +func New(alpha float64) *Prober { + return &Prober{ + alpha: alpha, + table: make(map[string]*UpstreamHealth), + client: &http.Client{ + Timeout: 10 * time.Second, + }, + } +} + +// Seeds the prober with upstream URLs (no measurements yet). +func (p *Prober) InitUpstreams(urls []string) { + p.mu.Lock() + defer p.mu.Unlock() + for _, u := range urls { + if _, ok := p.table[u]; !ok { + p.table[u] = &UpstreamHealth{URL: u, Status: StatusActive} + } + } +} + +// Records a successful latency measurement and updates the EMA. +func (p *Prober) RecordLatency(url string, ms float64) { + p.mu.Lock() + defer p.mu.Unlock() + h := p.getOrCreate(url) + if h.TotalQueries == 0 { + h.EMALatency = ms + } else { + h.EMALatency = p.alpha*ms + (1-p.alpha)*h.EMALatency + } + h.ConsecutiveFails = 0 + h.TotalQueries++ + h.Status = StatusActive + h.LastProbe = time.Now() +} + +// Records a probe failure. +func (p *Prober) RecordFailure(url string) { + p.mu.Lock() + defer p.mu.Unlock() + h := p.getOrCreate(url) + h.ConsecutiveFails++ + switch { + case h.ConsecutiveFails >= 10: + h.Status = StatusDown + case h.ConsecutiveFails >= 3: + h.Status = StatusDegraded + } +} + +// Returns a copy of the health entry for url, or nil if unknown. +func (p *Prober) GetHealth(url string) *UpstreamHealth { + p.mu.RLock() + defer p.mu.RUnlock() + h, ok := p.table[url] + if !ok { + return nil + } + cp := *h + return &cp +} + +// Returns all known upstreams sorted by EMA latency (ascending). DOWN upstreams last. +func (p *Prober) SortedByLatency() []*UpstreamHealth { + p.mu.RLock() + defer p.mu.RUnlock() + result := make([]*UpstreamHealth, 0, len(p.table)) + for _, h := range p.table { + cp := *h + result = append(result, &cp) + } + sort.Slice(result, func(i, j int) bool { + if result[i].Status == StatusDown && result[j].Status != StatusDown { + return false + } + if result[j].Status == StatusDown && result[i].Status != StatusDown { + return true + } + return result[i].EMALatency < result[j].EMALatency + }) + return result +} + +// Performs a HEAD /nix-cache-info against url and updates health. +func (p *Prober) ProbeUpstream(url string) { + start := time.Now() + resp, err := p.client.Head(url + "/nix-cache-info") + elapsed := float64(time.Since(start).Nanoseconds()) / 1e6 + + if err != nil || resp.StatusCode != 200 { + p.RecordFailure(url) + return + } + resp.Body.Close() + p.RecordLatency(url, elapsed) +} + +// Probes all known upstreams on interval until stop is closed. +func (p *Prober) RunProbeLoop(interval time.Duration, stop <-chan struct{}) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-stop: + return + case <-ticker.C: + p.mu.RLock() + urls := make([]string, 0, len(p.table)) + for u := range p.table { + urls = append(urls, u) + } + p.mu.RUnlock() + for _, u := range urls { + go p.ProbeUpstream(u) + } + } + } +} + +func (p *Prober) getOrCreate(url string) *UpstreamHealth { + h, ok := p.table[url] + if !ok { + h = &UpstreamHealth{URL: url, Status: StatusActive} + p.table[url] = h + } + return h +} diff --git a/internal/prober/prober_test.go b/internal/prober/prober_test.go new file mode 100644 index 0000000..59761e4 --- /dev/null +++ b/internal/prober/prober_test.go @@ -0,0 +1,100 @@ +package prober_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "notashelf.dev/ncro/internal/prober" +) + +func TestEMACalculation(t *testing.T) { + p := prober.New(0.3) + p.RecordLatency("https://example.com", 100) + p.RecordLatency("https://example.com", 50) + + // EMA after 2 measurements: first=100, second = 0.3*50 + 0.7*100 = 85 + health := p.GetHealth("https://example.com") + if health == nil { + t.Fatal("expected health entry") + } + if health.EMALatency < 84 || health.EMALatency > 86 { + t.Errorf("EMA = %.2f, want ~85", health.EMALatency) + } +} + +func TestStatusProgression(t *testing.T) { + p := prober.New(0.3) + p.RecordLatency("https://example.com", 10) + + for range 3 { + p.RecordFailure("https://example.com") + } + h := p.GetHealth("https://example.com") + if h.Status != prober.StatusDegraded { + t.Errorf("status = %v, want Degraded after 3 failures", h.Status) + } + + for range 7 { + p.RecordFailure("https://example.com") + } + h = p.GetHealth("https://example.com") + if h.Status != prober.StatusDown { + t.Errorf("status = %v, want Down after 10 failures", h.Status) + } +} + +func TestRecoveryAfterSuccess(t *testing.T) { + p := prober.New(0.3) + for range 10 { + p.RecordFailure("https://example.com") + } + p.RecordLatency("https://example.com", 20) + h := p.GetHealth("https://example.com") + if h.Status != prober.StatusActive { + t.Errorf("status = %v, want Active after recovery", h.Status) + } + if h.ConsecutiveFails != 0 { + t.Errorf("ConsecutiveFails = %d, want 0", h.ConsecutiveFails) + } +} + +func TestSortedByLatency(t *testing.T) { + p := prober.New(0.3) + p.RecordLatency("https://slow.example.com", 200) + p.RecordLatency("https://fast.example.com", 10) + p.RecordLatency("https://medium.example.com", 50) + + sorted := p.SortedByLatency() + if len(sorted) != 3 { + t.Fatalf("expected 3, got %d", len(sorted)) + } + if sorted[0].URL != "https://fast.example.com" { + t.Errorf("first = %q, want fast", sorted[0].URL) + } +} + +func TestProbeUpstream(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + })) + defer srv.Close() + + p := prober.New(0.3) + p.ProbeUpstream(srv.URL) + + h := p.GetHealth(srv.URL) + if h == nil || h.Status != prober.StatusActive { + t.Errorf("expected Active after successful probe, got %v", h) + } +} + +func TestProbeUpstreamFailure(t *testing.T) { + p := prober.New(0.3) + p.ProbeUpstream("http://127.0.0.1:1") // nothing listening + + h := p.GetHealth("http://127.0.0.1:1") + if h == nil || h.ConsecutiveFails == 0 { + t.Error("expected failure recorded") + } +}