From d599ef02a79b5e4014b1a7dae58baae96b44a6a0 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Fri, 6 Mar 2026 22:20:58 +0300 Subject: [PATCH] prober: persist health across restarts via callback; seed from DB on startup Signed-off-by: NotAShelf Change-Id: Ica467d6db76c495a6ccadbad89276b536a6a6964 --- cmd/ncro/main.go | 17 +++++++++++ internal/cache/db.go | 40 +++++++++++++++++++++++++ internal/prober/prober.go | 55 +++++++++++++++++++++++++++++++--- internal/prober/prober_test.go | 48 +++++++++++++++++++++++++++-- 4 files changed, 154 insertions(+), 6 deletions(-) diff --git a/cmd/ncro/main.go b/cmd/ncro/main.go index fe8b86a..5b49192 100644 --- a/cmd/ncro/main.go +++ b/cmd/ncro/main.go @@ -86,6 +86,23 @@ func main() { p := prober.New(cfg.Cache.LatencyAlpha) p.InitUpstreams(cfg.Upstreams) + + // Seed prober with persisted health data from the previous run. + if rows, err := db.LoadAllHealth(); err == nil { + for _, row := range rows { + p.Seed(row.URL, row.EMALatency, row.ConsecutiveFails, int64(row.TotalQueries)) + } + } else { + slog.Warn("failed to load persisted health data", "error", err) + } + + // Persist health updates to SQLite. + p.SetHealthPersistence(func(url string, ema float64, cf uint32, tq uint64) { + if err := db.SaveHealth(url, ema, int(cf), int64(tq)); err != nil { + slog.Warn("failed to save health", "url", url, "error", err) + } + }) + for _, u := range cfg.Upstreams { go p.ProbeUpstream(u.URL) } diff --git a/internal/cache/db.go b/internal/cache/db.go index ae9c8b1..de4dd35 100644 --- a/internal/cache/db.go +++ b/internal/cache/db.go @@ -254,6 +254,46 @@ func (d *DB) ExpireNegatives() error { return err } +// Persisted snapshot of one upstream's health metrics. +type HealthRow struct { + URL string + EMALatency float64 + ConsecutiveFails int + TotalQueries int64 +} + +// Upserts the health metrics for the given upstream URL. 
+func (d *DB) SaveHealth(url string, ema float64, consecutiveFails int, totalQueries int64) error { + _, err := d.db.Exec(` + INSERT INTO upstream_health (url, ema_latency, consecutive_fails, total_queries) + VALUES (?, ?, ?, ?) + ON CONFLICT(url) DO UPDATE SET + ema_latency = excluded.ema_latency, + consecutive_fails = excluded.consecutive_fails, + total_queries = excluded.total_queries`, + url, ema, consecutiveFails, totalQueries, + ) + return err +} + +// Returns all rows from the upstream_health table. +func (d *DB) LoadAllHealth() ([]HealthRow, error) { + rows, err := d.db.Query(`SELECT url, ema_latency, consecutive_fails, total_queries FROM upstream_health`) + if err != nil { + return nil, err + } + defer rows.Close() + var result []HealthRow + for rows.Next() { + var r HealthRow + if err := rows.Scan(&r.URL, &r.EMALatency, &r.ConsecutiveFails, &r.TotalQueries); err != nil { + return nil, err + } + result = append(result, r) + } + return result, rows.Err() +} + // Deletes the oldest routes (by last_verified) when over capacity. func (d *DB) evictIfNeeded() error { count, err := d.RouteCount() diff --git a/internal/prober/prober.go b/internal/prober/prober.go index 52ad156..f6dc6f1 100644 --- a/internal/prober/prober.go +++ b/internal/prober/prober.go @@ -43,10 +43,11 @@ type UpstreamHealth struct { // Tracks latency and health for a set of upstreams. type Prober struct { - mu sync.RWMutex - alpha float64 - table map[string]*UpstreamHealth - client *http.Client + mu sync.RWMutex + alpha float64 + table map[string]*UpstreamHealth + client *http.Client + persistHealth func(url string, ema float64, consecutiveFails uint32, totalQueries uint64) } // Creates a Prober with the given EMA alpha coefficient. @@ -71,6 +72,42 @@ func (p *Prober) InitUpstreams(upstreams []config.UpstreamConfig) { } } +// Derives Status from the number of consecutive failures, matching the logic +// in RecordFailure. 
+func computeStatus(consecutiveFails uint32) Status {
+	switch {
+	case consecutiveFails >= 10:
+		return StatusDown
+	case consecutiveFails >= 3:
+		return StatusDegraded
+	default:
+		return StatusActive
+	}
+}
+
+// Seeds an upstream's health state from persisted data. Should be called
+// after InitUpstreams; unknown URLs and rows with negative counters are skipped.
+func (p *Prober) Seed(url string, emaLatency float64, consecutiveFails int, totalQueries int64) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	h, ok := p.table[url]
+	if !ok || consecutiveFails < 0 || totalQueries < 0 {
+		return // a negative counter would wrap to a huge unsigned value below
+	}
+	h.EMALatency = emaLatency
+	h.TotalQueries = uint64(totalQueries)
+	h.ConsecutiveFails = uint32(consecutiveFails)
+	h.Status = computeStatus(h.ConsecutiveFails)
+}
+
+// Registers a callback invoked after each RecordLatency or RecordFailure call.
+// Every call runs in its own goroutine: fn must be concurrency-safe and may run out of order.
+func (p *Prober) SetHealthPersistence(fn func(url string, ema float64, consecutiveFails uint32, totalQueries uint64)) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.persistHealth = fn
+}
+
 // Records a successful latency measurement and updates the EMA.
 func (p *Prober) RecordLatency(url string, ms float64) {
 	p.mu.Lock()
@@ -85,6 +122,11 @@ func (p *Prober) RecordLatency(url string, ms float64) {
 	h.TotalQueries++
 	h.Status = StatusActive
 	h.LastProbe = time.Now()
+	if fn := p.persistHealth; fn != nil {
+		// The go statement evaluates its arguments in this goroutine, so the
+		// callback receives a snapshot of the values as they are right now.
+		go fn(h.URL, h.EMALatency, h.ConsecutiveFails, h.TotalQueries)
+	}
 }
 
 // Records a probe failure.
@@ -99,6 +141,11 @@ func (p *Prober) RecordFailure(url string) {
 	case h.ConsecutiveFails >= 3:
 		h.Status = StatusDegraded
 	}
+	if fn := p.persistHealth; fn != nil {
+		// The go statement evaluates its arguments in this goroutine, so the
+		// callback receives a snapshot of the values as they are right now.
+		go fn(h.URL, h.EMALatency, h.ConsecutiveFails, h.TotalQueries)
+	}
 }
 
 // Returns a copy of the health entry for url, or nil if unknown.
diff --git a/internal/prober/prober_test.go b/internal/prober/prober_test.go index 8bf8f2d..2c17f2c 100644 --- a/internal/prober/prober_test.go +++ b/internal/prober/prober_test.go @@ -4,7 +4,9 @@ import ( "net/http" "net/http/httptest" "testing" + "time" + "notashelf.dev/ncro/internal/config" "notashelf.dev/ncro/internal/prober" ) @@ -104,7 +106,7 @@ func TestSortedByLatencyWithPriority(t *testing.T) { t.Fatalf("expected 2, got %d", len(sorted)) } // The 100ms upstream should be first (lower latency wins when not within 10% tie). - // 100 vs 102: diff=2, 2/102=1.96% < 10%, so priority decides (both priority=0, tie → latency). + // 100 vs 102: diff=2, 2/102=1.96% < 10%, so priority decides (both priority=0, tie --> latency). // Actually 100 < 102 still wins on latency when priority is equal. if sorted[0].EMALatency > sorted[1].EMALatency { t.Errorf("expected lower latency first, got %.2f then %.2f", sorted[0].EMALatency, sorted[1].EMALatency) @@ -113,10 +115,52 @@ func TestSortedByLatencyWithPriority(t *testing.T) { func TestProbeUpstreamFailure(t *testing.T) { p := prober.New(0.3) - p.ProbeUpstream("http://127.0.0.1:1") // nothing listening + p.ProbeUpstream("http://127.0.0.1:1") // nothing listening, maybe except for Makima h := p.GetHealth("http://127.0.0.1:1") if h == nil || h.ConsecutiveFails == 0 { t.Error("expected failure recorded") } } + +func TestSeedRestoresStatus(t *testing.T) { + p := prober.New(0.3) + p.InitUpstreams([]config.UpstreamConfig{{URL: "https://down.example.com"}}) + + // Seed with 10 consecutive fails -> should be StatusDown + p.Seed("https://down.example.com", 200.0, 10, 50) + + h := p.GetHealth("https://down.example.com") + if h == nil { + t.Fatal("expected health entry") + } + if h.Status != prober.StatusDown { + t.Errorf("Status = %v, want StatusDown", h.Status) + } + if h.EMALatency != 200.0 { + t.Errorf("EMALatency = %f, want 200.0", h.EMALatency) + } +} + +func TestPersistenceCallbackFired(t *testing.T) { + p := prober.New(0.3) + 
p.InitUpstreams([]config.UpstreamConfig{{URL: "https://up.example.com"}})
+
+	var savedURL string
+	var savedCF uint32
+	done := make(chan struct{})
+	p.SetHealthPersistence(func(url string, ema float64, consecutiveFails uint32, totalQueries uint64) {
+		savedURL, savedCF = url, consecutiveFails
+		close(done) // closing the channel publishes the two writes above to the receiver
+	})
+
+	p.RecordLatency("https://up.example.com", 50.0)
+	select {
+	case <-done: // the callback runs in its own goroutine; wait for it instead of sleeping
+	case <-time.After(time.Second):
+		t.Fatal("persistence callback was not invoked")
+	}
+	if savedURL != "https://up.example.com" || savedCF != 0 {
+		t.Errorf("callback got (%q, %d), want (https://up.example.com, 0)", savedURL, savedCF)
+	}
+}