cache: add SQLite route persistence; initial TTL and LRU eviction implementation

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I0370d6c114d5490634905c1a831a31526a6a6964
This commit is contained in:
raf 2026-03-05 23:58:11 +03:00
commit 663f9995b2
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
8 changed files with 674 additions and 5 deletions

1
.envrc
View file

@ -1 +1,2 @@
use flake
export CGO_ENABLED=0

18
go.mod
View file

@ -2,4 +2,20 @@ module notashelf.dev/ncro
go 1.25.7
require gopkg.in/yaml.v3 v3.0.1
require (
gopkg.in/yaml.v3 v3.0.1
modernc.org/sqlite v1.46.1
)
require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/ncruces/go-strftime v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect
golang.org/x/sys v0.37.0 // indirect
modernc.org/libc v1.67.6 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.11.0 // indirect
)

53
go.sum
View file

@ -1,4 +1,57 @@
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY=
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70=
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ=
golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis=
modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc=
modernc.org/ccgo/v4 v4.30.1/go.mod h1:bIOeI1JL54Utlxn+LwrFyjCx2n2RDiYEaJVSrgdrRfM=
modernc.org/fileutil v1.3.40 h1:ZGMswMNc9JOCrcrakF1HrvmergNLAmxOPjizirpfqBA=
modernc.org/fileutil v1.3.40/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc=
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
modernc.org/gc/v3 v3.1.1 h1:k8T3gkXWY9sEiytKhcgyiZ2L0DTyCQ/nvX+LoCljoRE=
modernc.org/gc/v3 v3.1.1/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY=
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
modernc.org/libc v1.67.6 h1:eVOQvpModVLKOdT+LvBPjdQqfrZq+pC39BygcT+E7OI=
modernc.org/libc v1.67.6/go.mod h1:JAhxUVlolfYDErnwiqaLvUqc8nfb2r6S6slAgZOnaiE=
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
modernc.org/sqlite v1.46.1 h1:eFJ2ShBLIEnUWlLy12raN0Z1plqmFX9Qe3rjQTKt6sU=
modernc.org/sqlite v1.46.1/go.mod h1:CzbrU2lSB1DKUusvwGz7rqEKIq+NUd8GWuBBZDs9/nA=
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=

168
internal/cache/db.go vendored
View file

@ -1 +1,169 @@
package cache
import (
"database/sql"
"fmt"
"time"
_ "modernc.org/sqlite"
)
// Core routing decision persisted per store path.
type RouteEntry struct {
StorePath string
UpstreamURL string
LatencyMs float64
LatencyEMA float64
LastVerified time.Time
QueryCount uint32
FailureCount uint32
TTL time.Time
NarHash string
NarSize uint64
}
// Returns true if the entry exists and hasn't expired.
func (r *RouteEntry) IsValid() bool {
return r != nil && time.Now().Before(r.TTL)
}
// SQLite-backed store for route persistence.
type DB struct {
db *sql.DB
maxEntries int
}
// Opens or creates the SQLite database at path with WAL mode.
func Open(path string, maxEntries int) (*DB, error) {
db, err := sql.Open("sqlite", path+"?_journal=WAL&_busy_timeout=5000")
if err != nil {
return nil, fmt.Errorf("open sqlite: %w", err)
}
db.SetMaxOpenConns(1) // SQLite WAL allows 1 writer
if err := migrate(db); err != nil {
db.Close()
return nil, fmt.Errorf("migrate: %w", err)
}
return &DB{db: db, maxEntries: maxEntries}, nil
}
// Closes the database.
func (d *DB) Close() error {
return d.db.Close()
}
func migrate(db *sql.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS routes (
store_path TEXT PRIMARY KEY,
upstream_url TEXT NOT NULL,
latency_ms REAL DEFAULT 0,
latency_ema REAL DEFAULT 0,
query_count INTEGER DEFAULT 1,
failure_count INTEGER DEFAULT 0,
last_verified INTEGER DEFAULT 0,
ttl INTEGER NOT NULL,
nar_hash TEXT DEFAULT '',
nar_size INTEGER DEFAULT 0,
created_at INTEGER DEFAULT (strftime('%s', 'now'))
);
CREATE INDEX IF NOT EXISTS idx_routes_ttl ON routes(ttl);
CREATE INDEX IF NOT EXISTS idx_routes_last_verified ON routes(last_verified);
CREATE TABLE IF NOT EXISTS upstream_health (
url TEXT PRIMARY KEY,
ema_latency REAL DEFAULT 0,
last_probe INTEGER DEFAULT 0,
consecutive_fails INTEGER DEFAULT 0,
total_queries INTEGER DEFAULT 0,
success_rate REAL DEFAULT 1.0
);
`)
return err
}
// Returns the route for storePath, or nil if not found.
func (d *DB) GetRoute(storePath string) (*RouteEntry, error) {
row := d.db.QueryRow(`
SELECT store_path, upstream_url, latency_ms, latency_ema,
query_count, failure_count, last_verified, ttl, nar_hash, nar_size
FROM routes WHERE store_path = ?`, storePath)
var e RouteEntry
var lastVerifiedUnix, ttlUnix int64
err := row.Scan(
&e.StorePath, &e.UpstreamURL, &e.LatencyMs, &e.LatencyEMA,
&e.QueryCount, &e.FailureCount, &lastVerifiedUnix, &ttlUnix,
&e.NarHash, &e.NarSize,
)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
e.LastVerified = time.Unix(lastVerifiedUnix, 0).UTC()
e.TTL = time.Unix(ttlUnix, 0).UTC()
return &e, nil
}
// Inserts or updates a route entry.
func (d *DB) SetRoute(entry *RouteEntry) error {
_, err := d.db.Exec(`
INSERT INTO routes
(store_path, upstream_url, latency_ms, latency_ema,
query_count, failure_count, last_verified, ttl, nar_hash, nar_size)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(store_path) DO UPDATE SET
upstream_url = excluded.upstream_url,
latency_ms = excluded.latency_ms,
latency_ema = excluded.latency_ema,
query_count = excluded.query_count,
failure_count = excluded.failure_count,
last_verified = excluded.last_verified,
ttl = excluded.ttl,
nar_hash = excluded.nar_hash,
nar_size = excluded.nar_size`,
entry.StorePath, entry.UpstreamURL,
entry.LatencyMs, entry.LatencyEMA,
entry.QueryCount, entry.FailureCount,
entry.LastVerified.Unix(), entry.TTL.Unix(),
entry.NarHash, entry.NarSize,
)
if err != nil {
return err
}
return d.evictIfNeeded()
}
// Deletes routes whose TTL has passed.
func (d *DB) ExpireOldRoutes() error {
_, err := d.db.Exec(`DELETE FROM routes WHERE ttl < ?`, time.Now().Unix())
return err
}
// Returns the total number of stored routes.
func (d *DB) RouteCount() (int, error) {
var count int
err := d.db.QueryRow(`SELECT COUNT(*) FROM routes`).Scan(&count)
return count, err
}
// Deletes the oldest routes (by last_verified) when over capacity.
func (d *DB) evictIfNeeded() error {
count, err := d.RouteCount()
if err != nil {
return err
}
if count <= d.maxEntries {
return nil
}
excess := count - d.maxEntries
_, err = d.db.Exec(`
DELETE FROM routes WHERE store_path IN (
SELECT store_path FROM routes ORDER BY last_verified ASC LIMIT ?
)`, excess)
return err
}

157
internal/cache/db_test.go vendored Normal file
View file

@ -0,0 +1,157 @@
package cache_test
import (
"os"
"testing"
"time"
"notashelf.dev/ncro/internal/cache"
)
func newTestDB(t *testing.T) *cache.DB {
t.Helper()
f, err := os.CreateTemp("", "ncro-test-*.db")
if err != nil {
t.Fatal(err)
}
f.Close()
t.Cleanup(func() { os.Remove(f.Name()) })
db, err := cache.Open(f.Name(), 1000)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { db.Close() })
return db
}
func TestGetSetRoute(t *testing.T) {
db := newTestDB(t)
entry := &cache.RouteEntry{
StorePath: "abc123xyz-hello-2.12",
UpstreamURL: "https://cache.nixos.org",
LatencyMs: 12.5,
LatencyEMA: 12.5,
LastVerified: time.Now().UTC().Truncate(time.Second),
QueryCount: 1,
TTL: time.Now().Add(time.Hour).UTC().Truncate(time.Second),
}
if err := db.SetRoute(entry); err != nil {
t.Fatalf("SetRoute: %v", err)
}
got, err := db.GetRoute("abc123xyz-hello-2.12")
if err != nil {
t.Fatalf("GetRoute: %v", err)
}
if got == nil {
t.Fatal("GetRoute returned nil")
}
if got.UpstreamURL != entry.UpstreamURL {
t.Errorf("upstream = %q, want %q", got.UpstreamURL, entry.UpstreamURL)
}
if got.QueryCount != 1 {
t.Errorf("query_count = %d, want 1", got.QueryCount)
}
}
func TestGetRouteNotFound(t *testing.T) {
db := newTestDB(t)
got, err := db.GetRoute("nonexistent")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got != nil {
t.Errorf("expected nil, got %+v", got)
}
}
func TestSetRouteUpsert(t *testing.T) {
db := newTestDB(t)
entry := &cache.RouteEntry{
StorePath: "abc123-pkg",
UpstreamURL: "https://cache.nixos.org",
LatencyMs: 20.0,
LatencyEMA: 20.0,
QueryCount: 1,
TTL: time.Now().Add(time.Hour),
}
db.SetRoute(entry)
entry.LatencyEMA = 18.0
entry.QueryCount = 2
if err := db.SetRoute(entry); err != nil {
t.Fatalf("upsert: %v", err)
}
got, _ := db.GetRoute("abc123-pkg")
if got.LatencyEMA != 18.0 {
t.Errorf("ema = %f, want 18.0", got.LatencyEMA)
}
if got.QueryCount != 2 {
t.Errorf("query_count = %d, want 2", got.QueryCount)
}
}
func TestExpireOldRoutes(t *testing.T) {
db := newTestDB(t)
// Insert expired route
expired := &cache.RouteEntry{
StorePath: "expired-pkg",
UpstreamURL: "https://cache.nixos.org",
TTL: time.Now().Add(-time.Minute), // already expired
}
db.SetRoute(expired)
// Insert valid route
valid := &cache.RouteEntry{
StorePath: "valid-pkg",
UpstreamURL: "https://cache.nixos.org",
TTL: time.Now().Add(time.Hour),
}
db.SetRoute(valid)
if err := db.ExpireOldRoutes(); err != nil {
t.Fatalf("ExpireOldRoutes: %v", err)
}
got, _ := db.GetRoute("expired-pkg")
if got != nil {
t.Error("expired route should have been deleted")
}
got2, _ := db.GetRoute("valid-pkg")
if got2 == nil {
t.Error("valid route should still exist")
}
}
func TestLRUEviction(t *testing.T) {
// Use maxEntries=3 to trigger eviction easily
f, _ := os.CreateTemp("", "ncro-lru-*.db")
f.Close()
defer os.Remove(f.Name())
db, _ := cache.Open(f.Name(), 3)
defer db.Close()
for i := range 4 {
db.SetRoute(&cache.RouteEntry{
StorePath: "pkg-" + string(rune('a'+i)),
UpstreamURL: "https://cache.nixos.org",
LastVerified: time.Now().Add(time.Duration(i) * time.Second),
TTL: time.Now().Add(time.Hour),
})
}
count, err := db.RouteCount()
if err != nil {
t.Fatal(err)
}
if count > 3 {
t.Errorf("expected count <= 3 after LRU eviction, got %d", count)
}
}

View file

@ -8,9 +8,8 @@ import (
"gopkg.in/yaml.v3"
)
// Duration is a wrapper around time.Duration that supports YAML unmarshaling
// from Go duration strings (e.g., "30s", "1h"). yaml.v3 cannot unmarshal
// duration strings directly into time.Duration (int64), so we handle it here.
// Wrapper around time.Duration supporting YAML duration strings ("30s", "1h").
// yaml.v3 cannot unmarshal duration strings directly into time.Duration (int64).
type Duration struct {
time.Duration
}
@ -101,7 +100,7 @@ func defaults() Config {
}
}
// Load loads config from file (if non-empty) and applies env overrides.
// Loads config from file (if non-empty) and applies env overrides.
func Load(path string) (*Config, error) {
cfg := defaults()

View file

@ -1 +1,176 @@
package prober
import (
"net/http"
"sort"
"sync"
"time"
)
// Upstream health status.
type Status int
const (
StatusActive Status = iota
StatusDegraded // 3+ consecutive failures
StatusDown // 10+ consecutive failures
)
func (s Status) String() string {
switch s {
case StatusActive:
return "ACTIVE"
case StatusDegraded:
return "DEGRADED"
default:
return "DOWN"
}
}
// In-memory metrics for one upstream.
type UpstreamHealth struct {
URL string
EMALatency float64
LastProbe time.Time
ConsecutiveFails uint32
TotalQueries uint64
Status Status
}
// Tracks latency and health for a set of upstreams.
type Prober struct {
mu sync.RWMutex
alpha float64
table map[string]*UpstreamHealth
client *http.Client
}
// Creates a Prober with the given EMA alpha coefficient.
func New(alpha float64) *Prober {
return &Prober{
alpha: alpha,
table: make(map[string]*UpstreamHealth),
client: &http.Client{
Timeout: 10 * time.Second,
},
}
}
// Seeds the prober with upstream URLs (no measurements yet).
func (p *Prober) InitUpstreams(urls []string) {
p.mu.Lock()
defer p.mu.Unlock()
for _, u := range urls {
if _, ok := p.table[u]; !ok {
p.table[u] = &UpstreamHealth{URL: u, Status: StatusActive}
}
}
}
// Records a successful latency measurement and updates the EMA.
func (p *Prober) RecordLatency(url string, ms float64) {
p.mu.Lock()
defer p.mu.Unlock()
h := p.getOrCreate(url)
if h.TotalQueries == 0 {
h.EMALatency = ms
} else {
h.EMALatency = p.alpha*ms + (1-p.alpha)*h.EMALatency
}
h.ConsecutiveFails = 0
h.TotalQueries++
h.Status = StatusActive
h.LastProbe = time.Now()
}
// Records a probe failure.
func (p *Prober) RecordFailure(url string) {
p.mu.Lock()
defer p.mu.Unlock()
h := p.getOrCreate(url)
h.ConsecutiveFails++
switch {
case h.ConsecutiveFails >= 10:
h.Status = StatusDown
case h.ConsecutiveFails >= 3:
h.Status = StatusDegraded
}
}
// Returns a copy of the health entry for url, or nil if unknown.
func (p *Prober) GetHealth(url string) *UpstreamHealth {
p.mu.RLock()
defer p.mu.RUnlock()
h, ok := p.table[url]
if !ok {
return nil
}
cp := *h
return &cp
}
// Returns all known upstreams sorted by EMA latency (ascending). DOWN upstreams last.
func (p *Prober) SortedByLatency() []*UpstreamHealth {
p.mu.RLock()
defer p.mu.RUnlock()
result := make([]*UpstreamHealth, 0, len(p.table))
for _, h := range p.table {
cp := *h
result = append(result, &cp)
}
sort.Slice(result, func(i, j int) bool {
if result[i].Status == StatusDown && result[j].Status != StatusDown {
return false
}
if result[j].Status == StatusDown && result[i].Status != StatusDown {
return true
}
return result[i].EMALatency < result[j].EMALatency
})
return result
}
// Performs a HEAD /nix-cache-info against url and updates health.
func (p *Prober) ProbeUpstream(url string) {
start := time.Now()
resp, err := p.client.Head(url + "/nix-cache-info")
elapsed := float64(time.Since(start).Nanoseconds()) / 1e6
if err != nil || resp.StatusCode != 200 {
p.RecordFailure(url)
return
}
resp.Body.Close()
p.RecordLatency(url, elapsed)
}
// Probes all known upstreams on interval until stop is closed.
func (p *Prober) RunProbeLoop(interval time.Duration, stop <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-stop:
return
case <-ticker.C:
p.mu.RLock()
urls := make([]string, 0, len(p.table))
for u := range p.table {
urls = append(urls, u)
}
p.mu.RUnlock()
for _, u := range urls {
go p.ProbeUpstream(u)
}
}
}
}
func (p *Prober) getOrCreate(url string) *UpstreamHealth {
h, ok := p.table[url]
if !ok {
h = &UpstreamHealth{URL: url, Status: StatusActive}
p.table[url] = h
}
return h
}

View file

@ -0,0 +1,100 @@
package prober_test
import (
"net/http"
"net/http/httptest"
"testing"
"notashelf.dev/ncro/internal/prober"
)
func TestEMACalculation(t *testing.T) {
p := prober.New(0.3)
p.RecordLatency("https://example.com", 100)
p.RecordLatency("https://example.com", 50)
// EMA after 2 measurements: first=100, second = 0.3*50 + 0.7*100 = 85
health := p.GetHealth("https://example.com")
if health == nil {
t.Fatal("expected health entry")
}
if health.EMALatency < 84 || health.EMALatency > 86 {
t.Errorf("EMA = %.2f, want ~85", health.EMALatency)
}
}
func TestStatusProgression(t *testing.T) {
p := prober.New(0.3)
p.RecordLatency("https://example.com", 10)
for range 3 {
p.RecordFailure("https://example.com")
}
h := p.GetHealth("https://example.com")
if h.Status != prober.StatusDegraded {
t.Errorf("status = %v, want Degraded after 3 failures", h.Status)
}
for range 7 {
p.RecordFailure("https://example.com")
}
h = p.GetHealth("https://example.com")
if h.Status != prober.StatusDown {
t.Errorf("status = %v, want Down after 10 failures", h.Status)
}
}
func TestRecoveryAfterSuccess(t *testing.T) {
p := prober.New(0.3)
for range 10 {
p.RecordFailure("https://example.com")
}
p.RecordLatency("https://example.com", 20)
h := p.GetHealth("https://example.com")
if h.Status != prober.StatusActive {
t.Errorf("status = %v, want Active after recovery", h.Status)
}
if h.ConsecutiveFails != 0 {
t.Errorf("ConsecutiveFails = %d, want 0", h.ConsecutiveFails)
}
}
func TestSortedByLatency(t *testing.T) {
p := prober.New(0.3)
p.RecordLatency("https://slow.example.com", 200)
p.RecordLatency("https://fast.example.com", 10)
p.RecordLatency("https://medium.example.com", 50)
sorted := p.SortedByLatency()
if len(sorted) != 3 {
t.Fatalf("expected 3, got %d", len(sorted))
}
if sorted[0].URL != "https://fast.example.com" {
t.Errorf("first = %q, want fast", sorted[0].URL)
}
}
func TestProbeUpstream(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}))
defer srv.Close()
p := prober.New(0.3)
p.ProbeUpstream(srv.URL)
h := p.GetHealth(srv.URL)
if h == nil || h.Status != prober.StatusActive {
t.Errorf("expected Active after successful probe, got %v", h)
}
}
func TestProbeUpstreamFailure(t *testing.T) {
p := prober.New(0.3)
p.ProbeUpstream("http://127.0.0.1:1") // nothing listening
h := p.GetHealth("http://127.0.0.1:1")
if h == nil || h.ConsecutiveFails == 0 {
t.Error("expected failure recorded")
}
}