prober: latency EMA tracking and upstream health monitoring

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I0ef237a1e6db3ac9c47bdaa72101e4d86a6a6964
This commit is contained in:
raf 2026-03-06 00:18:13 +03:00
commit 65ddeb48f6
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
2 changed files with 240 additions and 0 deletions

View file

@ -1 +1,133 @@
package router
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"notashelf.dev/ncro/internal/cache"
"notashelf.dev/ncro/internal/prober"
)
// Result of a Resolve call.
type Result struct {
URL string
LatencyMs float64
CacheHit bool
}
// Resolves store paths to the best upstream via cache lookup or parallel racing.
type Router struct {
db *cache.DB
prober *prober.Prober
routeTTL time.Duration
raceTimeout time.Duration
client *http.Client
}
// Creates a Router.
func New(db *cache.DB, p *prober.Prober, routeTTL, raceTimeout time.Duration) *Router {
return &Router{
db: db,
prober: p,
routeTTL: routeTTL,
raceTimeout: raceTimeout,
client: &http.Client{Timeout: raceTimeout},
}
}
// Returns the best upstream for the given store hash.
// Checks the route cache first; on miss races the provided candidates.
func (r *Router) Resolve(storeHash string, candidates []string) (*Result, error) {
entry, err := r.db.GetRoute(storeHash)
if err == nil && entry != nil && entry.IsValid() {
h := r.prober.GetHealth(entry.UpstreamURL)
if h == nil || h.Status == prober.StatusActive {
return &Result{
URL: entry.UpstreamURL,
LatencyMs: entry.LatencyEMA,
CacheHit: true,
}, nil
}
}
return r.race(storeHash, candidates)
}
type raceResult struct {
url string
latencyMs float64
}
func (r *Router) race(storeHash string, candidates []string) (*Result, error) {
if len(candidates) == 0 {
return nil, fmt.Errorf("no candidates for %q", storeHash)
}
ctx, cancel := context.WithTimeout(context.Background(), r.raceTimeout)
defer cancel()
ch := make(chan raceResult, len(candidates))
var wg sync.WaitGroup
for _, u := range candidates {
wg.Add(1)
go func(upstream string) {
defer wg.Done()
start := time.Now()
req, _ := http.NewRequestWithContext(ctx, http.MethodHead,
upstream+"/"+storeHash+".narinfo", nil)
resp, err := r.client.Do(req)
if err != nil {
return
}
resp.Body.Close()
if resp.StatusCode != 200 {
return
}
ms := float64(time.Since(start).Nanoseconds()) / 1e6
select {
case ch <- raceResult{url: upstream, latencyMs: ms}:
default:
}
}(u)
}
go func() {
wg.Wait()
close(ch)
}()
winner, ok := <-ch
if !ok {
return nil, fmt.Errorf("all upstreams failed for %q", storeHash)
}
cancel()
for res := range ch {
if res.latencyMs < winner.latencyMs {
winner = res
}
}
health := r.prober.GetHealth(winner.url)
ema := winner.latencyMs
if health != nil {
ema = 0.3*winner.latencyMs + 0.7*health.EMALatency
}
r.prober.RecordLatency(winner.url, winner.latencyMs)
now := time.Now()
_ = r.db.SetRoute(&cache.RouteEntry{
StorePath: storeHash,
UpstreamURL: winner.url,
LatencyMs: winner.latencyMs,
LatencyEMA: ema,
LastVerified: now,
QueryCount: 1,
TTL: now.Add(r.routeTTL),
})
return &Result{URL: winner.url, LatencyMs: winner.latencyMs}, nil
}

View file

@ -0,0 +1,108 @@
package router_test
import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"
"notashelf.dev/ncro/internal/cache"
"notashelf.dev/ncro/internal/prober"
"notashelf.dev/ncro/internal/router"
)
func newTestRouter(t *testing.T, upstreams ...string) (*router.Router, func()) {
t.Helper()
f, _ := os.CreateTemp("", "ncro-router-*.db")
f.Close()
db, err := cache.Open(f.Name(), 1000)
if err != nil {
t.Fatal(err)
}
p := prober.New(0.3)
for _, u := range upstreams {
p.RecordLatency(u, 10)
}
r := router.New(db, p, time.Hour, 5*time.Second)
return r, func() {
db.Close()
os.Remove(f.Name())
}
}
func TestRouteHit(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "StorePath: /nix/store/abc123-hello")
}))
defer srv.Close()
r, cleanup := newTestRouter(t, srv.URL)
defer cleanup()
result, err := r.Resolve("abc123", []string{srv.URL})
if err != nil {
t.Fatalf("Resolve: %v", err)
}
if result.URL != srv.URL {
t.Errorf("url = %q, want %q", result.URL, srv.URL)
}
if result.LatencyMs <= 0 {
t.Error("expected positive latency")
}
}
func TestRouteRacePicksFastest(t *testing.T) {
fast := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}))
defer fast.Close()
slow := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
w.WriteHeader(200)
}))
defer slow.Close()
r, cleanup := newTestRouter(t, fast.URL, slow.URL)
defer cleanup()
result, err := r.Resolve("somehash", []string{slow.URL, fast.URL})
if err != nil {
t.Fatalf("Resolve: %v", err)
}
if result.URL != fast.URL {
t.Errorf("expected fast server to win, got %q", result.URL)
}
}
func TestRouteAllFail(t *testing.T) {
r, cleanup := newTestRouter(t)
defer cleanup()
_, err := r.Resolve("somehash", []string{"http://127.0.0.1:1"})
if err == nil {
t.Error("expected error when all upstreams fail")
}
}
func TestCacheHit(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}))
defer srv.Close()
r, cleanup := newTestRouter(t, srv.URL)
defer cleanup()
r.Resolve("abc123", []string{srv.URL})
result, err := r.Resolve("abc123", []string{srv.URL})
if err != nil {
t.Fatalf("second Resolve: %v", err)
}
if !result.CacheHit {
t.Error("expected cache hit on second resolve")
}
}