diff --git a/internal/router/router.go b/internal/router/router.go
index 7ef135b..4825637 100644
--- a/internal/router/router.go
+++ b/internal/router/router.go
@@ -1 +1,158 @@
 package router
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"sync"
+	"time"
+
+	"notashelf.dev/ncro/internal/cache"
+	"notashelf.dev/ncro/internal/prober"
+)
+
+// Result is the outcome of a Resolve call.
+type Result struct {
+	URL       string  // upstream base URL chosen for the store hash
+	LatencyMs float64 // latency in milliseconds reported for URL
+	CacheHit  bool    // true when the route came from the route cache
+}
+
+// Router resolves store paths to the best upstream via cache lookup or
+// parallel racing.
+type Router struct {
+	db          *cache.DB
+	prober      *prober.Prober
+	routeTTL    time.Duration // lifetime given to newly stored routes
+	raceTimeout time.Duration // upper bound on a single race
+	client      *http.Client
+}
+
+// New creates a Router. routeTTL is how long a raced route stays cached and
+// raceTimeout bounds each race, both through the race context and through the
+// HTTP client's own timeout.
+func New(db *cache.DB, p *prober.Prober, routeTTL, raceTimeout time.Duration) *Router {
+	return &Router{
+		db:          db,
+		prober:      p,
+		routeTTL:    routeTTL,
+		raceTimeout: raceTimeout,
+		client:      &http.Client{Timeout: raceTimeout},
+	}
+}
+
+// Resolve returns the best upstream for the given store hash.
+//
+// The route cache is checked first: a valid entry is returned as a cache hit
+// when the prober has no health record for its upstream or reports it active.
+// On a miss, a lookup error, or a non-active upstream, the provided
+// candidates are raced instead.
+func (r *Router) Resolve(storeHash string, candidates []string) (*Result, error) {
+	entry, err := r.db.GetRoute(storeHash)
+	if err == nil && entry != nil && entry.IsValid() {
+		h := r.prober.GetHealth(entry.UpstreamURL)
+		if h == nil || h.Status == prober.StatusActive {
+			return &Result{
+				URL:       entry.UpstreamURL,
+				LatencyMs: entry.LatencyEMA,
+				CacheHit:  true,
+			}, nil
+		}
+	}
+	return r.race(storeHash, candidates)
+}
+
+// raceResult is one successful probe reported by a race goroutine.
+type raceResult struct {
+	url       string
+	latencyMs float64
+}
+
+// race sends a HEAD request for the hash's .narinfo to every candidate in
+// parallel, picks the lowest-latency 200 response, records it with the prober
+// and stores it in the route cache. It returns an error when there are no
+// candidates or when no candidate answers with 200.
+func (r *Router) race(storeHash string, candidates []string) (*Result, error) {
+	if len(candidates) == 0 {
+		return nil, fmt.Errorf("no candidates for %q", storeHash)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), r.raceTimeout)
+	defer cancel()
+
+	// Buffered for one result per candidate, so senders never block even
+	// after the receiver has stopped waiting.
+	ch := make(chan raceResult, len(candidates))
+	var wg sync.WaitGroup
+
+	for _, u := range candidates {
+		wg.Add(1)
+		go func(upstream string) {
+			defer wg.Done()
+			start := time.Now()
+			req, err := http.NewRequestWithContext(ctx, http.MethodHead,
+				upstream+"/"+storeHash+".narinfo", nil)
+			if err != nil {
+				// An unparsable URL yields a nil request, and passing
+				// that to Do would panic; count it as a failed upstream.
+				return
+			}
+			resp, err := r.client.Do(req)
+			if err != nil {
+				return
+			}
+			resp.Body.Close()
+			if resp.StatusCode != http.StatusOK {
+				return
+			}
+			ms := float64(time.Since(start).Nanoseconds()) / 1e6
+			ch <- raceResult{url: upstream, latencyMs: ms}
+		}(u)
+	}
+
+	// Close the channel once every probe has finished so the receives below
+	// terminate.
+	go func() {
+		wg.Wait()
+		close(ch)
+	}()
+
+	winner, ok := <-ch
+	if !ok {
+		return nil, fmt.Errorf("all upstreams failed for %q", storeHash)
+	}
+	// A winner exists; abort the requests that are still in flight.
+	cancel()
+
+	// Results that completed before the cancel may still be queued; keep the
+	// one with the lowest measured latency.
+	for res := range ch {
+		if res.latencyMs < winner.latencyMs {
+			winner = res
+		}
+	}
+
+	health := r.prober.GetHealth(winner.url)
+	ema := winner.latencyMs
+	if health != nil {
+		// NOTE(review): these weights are hard-coded here; confirm they are
+		// meant to match the smoothing factor the prober is built with.
+		ema = 0.3*winner.latencyMs + 0.7*health.EMALatency
+	}
+	r.prober.RecordLatency(winner.url, winner.latencyMs)
+
+	now := time.Now()
+	// The cache write is best-effort: the race result is still valid if it
+	// cannot be persisted, so the error is deliberately dropped.
+	_ = r.db.SetRoute(&cache.RouteEntry{
+		StorePath:    storeHash,
+		UpstreamURL:  winner.url,
+		LatencyMs:    winner.latencyMs,
+		LatencyEMA:   ema,
+		LastVerified: now,
+		QueryCount:   1,
+		TTL:          now.Add(r.routeTTL),
+	})
+
+	return &Result{URL: winner.url, LatencyMs: winner.latencyMs}, nil
+}
diff --git a/internal/router/router_test.go b/internal/router/router_test.go
new file mode 100644
index 0000000..e9cadab
--- /dev/null
+++ b/internal/router/router_test.go
@@ -0,0 +1,135 @@
+package router_test
+
+import (
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"testing"
+	"time"
+
+	"notashelf.dev/ncro/internal/cache"
+	"notashelf.dev/ncro/internal/prober"
+	"notashelf.dev/ncro/internal/router"
+)
+
+// newTestRouter builds a Router backed by a throwaway cache database and a
+// prober pre-seeded with a latency sample for each upstream. The returned
+// function closes the database and removes its file.
+func newTestRouter(t *testing.T, upstreams ...string) (*router.Router, func()) {
+	t.Helper()
+	f, err := os.CreateTemp("", "ncro-router-*.db")
+	if err != nil {
+		t.Fatal(err)
+	}
+	f.Close()
+	db, err := cache.Open(f.Name(), 1000)
+	if err != nil {
+		os.Remove(f.Name())
+		t.Fatal(err)
+	}
+	p := prober.New(0.3)
+	for _, u := range upstreams {
+		p.RecordLatency(u, 10)
+	}
+	r := router.New(db, p, time.Hour, 5*time.Second)
+	return r, func() {
+		db.Close()
+		os.Remove(f.Name())
+	}
+}
+
+// TestRouteHit checks that a single healthy upstream is resolved with a
+// positive latency.
+func TestRouteHit(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		fmt.Fprintln(w, "StorePath: /nix/store/abc123-hello")
+	}))
+	defer srv.Close()
+
+	r, cleanup := newTestRouter(t, srv.URL)
+	defer cleanup()
+
+	result, err := r.Resolve("abc123", []string{srv.URL})
+	if err != nil {
+		t.Fatalf("Resolve: %v", err)
+	}
+	if result.URL != srv.URL {
+		t.Errorf("url = %q, want %q", result.URL, srv.URL)
+	}
+	if result.LatencyMs <= 0 {
+		t.Error("expected positive latency")
+	}
+}
+
+// TestRouteRacePicksFastest checks that the quicker of two upstreams wins.
+func TestRouteRacePicksFastest(t *testing.T) {
+	fast := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer fast.Close()
+
+	slow := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		time.Sleep(100 * time.Millisecond)
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer slow.Close()
+
+	r, cleanup := newTestRouter(t, fast.URL, slow.URL)
+	defer cleanup()
+
+	result, err := r.Resolve("somehash", []string{slow.URL, fast.URL})
+	if err != nil {
+		t.Fatalf("Resolve: %v", err)
+	}
+	if result.URL != fast.URL {
+		t.Errorf("expected fast server to win, got %q", result.URL)
+	}
+}
+
+// TestRouteAllFail checks that Resolve errors when no upstream answers.
+func TestRouteAllFail(t *testing.T) {
+	r, cleanup := newTestRouter(t)
+	defer cleanup()
+
+	_, err := r.Resolve("somehash", []string{"http://127.0.0.1:1"})
+	if err == nil {
+		t.Error("expected error when all upstreams fail")
+	}
+}
+
+// TestRouteBadRequestURL checks that a hash producing an unparsable URL is
+// reported as a failure instead of crashing the race goroutine.
+func TestRouteBadRequestURL(t *testing.T) {
+	r, cleanup := newTestRouter(t)
+	defer cleanup()
+
+	_, err := r.Resolve("%zz", []string{"http://127.0.0.1:1"})
+	if err == nil {
+		t.Error("expected error for unparsable request URL")
+	}
+}
+
+// TestCacheHit checks that a second Resolve for the same hash is served from
+// the route cache.
+func TestCacheHit(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer srv.Close()
+
+	r, cleanup := newTestRouter(t, srv.URL)
+	defer cleanup()
+
+	if _, err := r.Resolve("abc123", []string{srv.URL}); err != nil {
+		t.Fatalf("first Resolve: %v", err)
+	}
+
+	result, err := r.Resolve("abc123", []string{srv.URL})
+	if err != nil {
+		t.Fatalf("second Resolve: %v", err)
+	}
+	if !result.CacheHit {
+		t.Error("expected cache hit on second resolve")
+	}
+}