prober: persist health across restarts via callback; seed from DB on startup
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Ica467d6db76c495a6ccadbad89276b536a6a6964
This commit is contained in:
parent
049e4202dd
commit
d599ef02a7
4 changed files with 154 additions and 6 deletions
|
|
@@ -86,6 +86,23 @@ func main() {
|
|||
|
||||
p := prober.New(cfg.Cache.LatencyAlpha)
|
||||
p.InitUpstreams(cfg.Upstreams)
|
||||
|
||||
// Seed prober with persisted health data from the previous run.
|
||||
if rows, err := db.LoadAllHealth(); err == nil {
|
||||
for _, row := range rows {
|
||||
p.Seed(row.URL, row.EMALatency, row.ConsecutiveFails, int64(row.TotalQueries))
|
||||
}
|
||||
} else {
|
||||
slog.Warn("failed to load persisted health data", "error", err)
|
||||
}
|
||||
|
||||
// Persist health updates to SQLite.
|
||||
p.SetHealthPersistence(func(url string, ema float64, cf uint32, tq uint64) {
|
||||
if err := db.SaveHealth(url, ema, int(cf), int64(tq)); err != nil {
|
||||
slog.Warn("failed to save health", "url", url, "error", err)
|
||||
}
|
||||
})
|
||||
|
||||
for _, u := range cfg.Upstreams {
|
||||
go p.ProbeUpstream(u.URL)
|
||||
}
|
||||
|
|
|
|||
40
internal/cache/db.go
vendored
40
internal/cache/db.go
vendored
|
|
@@ -254,6 +254,46 @@ func (d *DB) ExpireNegatives() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// HealthRow is a persisted snapshot of one upstream's health metrics,
// mirroring a single row of the upstream_health table.
type HealthRow struct {
	URL              string  // upstream URL; unique key in the table
	EMALatency       float64 // exponential moving average of probe latency
	ConsecutiveFails int     // consecutive failed probes at save time
	TotalQueries     int64   // lifetime query counter for the upstream
}
|
||||
|
||||
// Upserts the health metrics for the given upstream URL.
|
||||
func (d *DB) SaveHealth(url string, ema float64, consecutiveFails int, totalQueries int64) error {
|
||||
_, err := d.db.Exec(`
|
||||
INSERT INTO upstream_health (url, ema_latency, consecutive_fails, total_queries)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(url) DO UPDATE SET
|
||||
ema_latency = excluded.ema_latency,
|
||||
consecutive_fails = excluded.consecutive_fails,
|
||||
total_queries = excluded.total_queries`,
|
||||
url, ema, consecutiveFails, totalQueries,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// Returns all rows from the upstream_health table.
|
||||
func (d *DB) LoadAllHealth() ([]HealthRow, error) {
|
||||
rows, err := d.db.Query(`SELECT url, ema_latency, consecutive_fails, total_queries FROM upstream_health`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var result []HealthRow
|
||||
for rows.Next() {
|
||||
var r HealthRow
|
||||
if err := rows.Scan(&r.URL, &r.EMALatency, &r.ConsecutiveFails, &r.TotalQueries); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, r)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
// Deletes the oldest routes (by last_verified) when over capacity.
|
||||
func (d *DB) evictIfNeeded() error {
|
||||
count, err := d.RouteCount()
|
||||
|
|
|
|||
|
|
@@ -43,10 +43,11 @@ type UpstreamHealth struct {
|
|||
|
||||
// Tracks latency and health for a set of upstreams.
|
||||
type Prober struct {
|
||||
mu sync.RWMutex
|
||||
alpha float64
|
||||
table map[string]*UpstreamHealth
|
||||
client *http.Client
|
||||
mu sync.RWMutex
|
||||
alpha float64
|
||||
table map[string]*UpstreamHealth
|
||||
client *http.Client
|
||||
persistHealth func(url string, ema float64, consecutiveFails uint32, totalQueries uint64)
|
||||
}
|
||||
|
||||
// Creates a Prober with the given EMA alpha coefficient.
|
||||
|
|
@@ -71,6 +72,42 @@ func (p *Prober) InitUpstreams(upstreams []config.UpstreamConfig) {
|
|||
}
|
||||
}
|
||||
|
||||
// Derives Status from the number of consecutive failures, matching the logic
|
||||
// in RecordFailure.
|
||||
func computeStatus(consecutiveFails uint32) Status {
|
||||
switch {
|
||||
case consecutiveFails >= 10:
|
||||
return StatusDown
|
||||
case consecutiveFails >= 3:
|
||||
return StatusDegraded
|
||||
default:
|
||||
return StatusActive
|
||||
}
|
||||
}
|
||||
|
||||
// Seeds an upstream's health state from persisted data. Should be called
|
||||
// after InitUpstreams to restore state from the previous run.
|
||||
func (p *Prober) Seed(url string, emaLatency float64, consecutiveFails int, totalQueries int64) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
h, ok := p.table[url]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
h.EMALatency = emaLatency
|
||||
h.TotalQueries = uint64(totalQueries)
|
||||
h.ConsecutiveFails = uint32(consecutiveFails)
|
||||
h.Status = computeStatus(uint32(consecutiveFails))
|
||||
}
|
||||
|
||||
// Registers a callback invoked after each RecordLatency or RecordFailure call.
|
||||
// The callback runs in a separate goroutine and must be safe for concurrent use.
|
||||
func (p *Prober) SetHealthPersistence(fn func(url string, ema float64, consecutiveFails uint32, totalQueries uint64)) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.persistHealth = fn
|
||||
}
|
||||
|
||||
// Records a successful latency measurement and updates the EMA.
|
||||
func (p *Prober) RecordLatency(url string, ms float64) {
|
||||
p.mu.Lock()
|
||||
|
|
@@ -85,6 +122,11 @@ func (p *Prober) RecordLatency(url string, ms float64) {
|
|||
h.TotalQueries++
|
||||
h.Status = StatusActive
|
||||
h.LastProbe = time.Now()
|
||||
if p.persistHealth != nil {
|
||||
u, ema, cf, tq := h.URL, h.EMALatency, h.ConsecutiveFails, h.TotalQueries
|
||||
fn := p.persistHealth
|
||||
go fn(u, ema, cf, tq)
|
||||
}
|
||||
}
|
||||
|
||||
// Records a probe failure.
|
||||
|
|
@@ -99,6 +141,11 @@ func (p *Prober) RecordFailure(url string) {
|
|||
case h.ConsecutiveFails >= 3:
|
||||
h.Status = StatusDegraded
|
||||
}
|
||||
if p.persistHealth != nil {
|
||||
u, ema, cf, tq := h.URL, h.EMALatency, h.ConsecutiveFails, h.TotalQueries
|
||||
fn := p.persistHealth
|
||||
go fn(u, ema, cf, tq)
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a copy of the health entry for url, or nil if unknown.
|
||||
|
|
|
|||
|
|
@@ -4,7 +4,9 @@ import (
|
|||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"notashelf.dev/ncro/internal/config"
|
||||
"notashelf.dev/ncro/internal/prober"
|
||||
)
|
||||
|
||||
|
|
@@ -104,7 +106,7 @@ func TestSortedByLatencyWithPriority(t *testing.T) {
|
|||
t.Fatalf("expected 2, got %d", len(sorted))
|
||||
}
|
||||
// The 100ms upstream should be first (lower latency wins when not within 10% tie).
|
||||
// 100 vs 102: diff=2, 2/102=1.96% < 10%, so priority decides (both priority=0, tie → latency).
|
||||
// 100 vs 102: diff=2, 2/102=1.96% < 10%, so priority decides (both priority=0, tie --> latency).
|
||||
// Actually 100 < 102 still wins on latency when priority is equal.
|
||||
if sorted[0].EMALatency > sorted[1].EMALatency {
|
||||
t.Errorf("expected lower latency first, got %.2f then %.2f", sorted[0].EMALatency, sorted[1].EMALatency)
|
||||
|
|
@@ -113,10 +115,52 @@ func TestSortedByLatencyWithPriority(t *testing.T) {
|
|||
|
||||
func TestProbeUpstreamFailure(t *testing.T) {
|
||||
p := prober.New(0.3)
|
||||
p.ProbeUpstream("http://127.0.0.1:1") // nothing listening
|
||||
p.ProbeUpstream("http://127.0.0.1:1") // nothing listening, maybe except for Makima
|
||||
|
||||
h := p.GetHealth("http://127.0.0.1:1")
|
||||
if h == nil || h.ConsecutiveFails == 0 {
|
||||
t.Error("expected failure recorded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeedRestoresStatus(t *testing.T) {
|
||||
p := prober.New(0.3)
|
||||
p.InitUpstreams([]config.UpstreamConfig{{URL: "https://down.example.com"}})
|
||||
|
||||
// Seed with 10 consecutive fails -> should be StatusDown
|
||||
p.Seed("https://down.example.com", 200.0, 10, 50)
|
||||
|
||||
h := p.GetHealth("https://down.example.com")
|
||||
if h == nil {
|
||||
t.Fatal("expected health entry")
|
||||
}
|
||||
if h.Status != prober.StatusDown {
|
||||
t.Errorf("Status = %v, want StatusDown", h.Status)
|
||||
}
|
||||
if h.EMALatency != 200.0 {
|
||||
t.Errorf("EMALatency = %f, want 200.0", h.EMALatency)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPersistenceCallbackFired(t *testing.T) {
|
||||
p := prober.New(0.3)
|
||||
p.InitUpstreams([]config.UpstreamConfig{{URL: "https://up.example.com"}})
|
||||
|
||||
var savedURL string
|
||||
var savedCF uint32
|
||||
p.SetHealthPersistence(func(url string, ema float64, consecutiveFails uint32, totalQueries uint64) {
|
||||
savedURL = url
|
||||
savedCF = consecutiveFails
|
||||
})
|
||||
|
||||
p.RecordLatency("https://up.example.com", 50.0)
|
||||
// The callback is called in a goroutine; give it a moment.
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
if savedURL != "https://up.example.com" {
|
||||
t.Errorf("savedURL = %q, want https://up.example.com", savedURL)
|
||||
}
|
||||
if savedCF != 0 {
|
||||
t.Errorf("consecutiveFails = %d, want 0", savedCF)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue