diff --git a/cmd/ncro/main.go b/cmd/ncro/main.go index f96c0a0..fe8b86a 100644 --- a/cmd/ncro/main.go +++ b/cmd/ncro/main.go @@ -140,7 +140,7 @@ func main() { srv := &http.Server{ Addr: cfg.Server.Listen, - Handler: server.New(r, p, cfg.Upstreams), + Handler: server.New(r, p, db, cfg.Upstreams, 30), ReadTimeout: cfg.Server.ReadTimeout.Duration, WriteTimeout: cfg.Server.WriteTimeout.Duration, } diff --git a/internal/server/integration_test.go b/internal/server/integration_test.go index 13684a5..e28b33c 100644 --- a/internal/server/integration_test.go +++ b/internal/server/integration_test.go @@ -39,7 +39,7 @@ func TestRouteReuseOnSecondRequest(t *testing.T) { p := prober.New(0.3) p.RecordLatency(upstream.URL, 10) r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) - ts := httptest.NewServer(server.New(r, p, []config.UpstreamConfig{{URL: upstream.URL}})) + ts := httptest.NewServer(server.New(r, p, db, []config.UpstreamConfig{{URL: upstream.URL}}, 30)) defer ts.Close() resp1, _ := http.Get(ts.URL + "/deadbeef00000000.narinfo") @@ -79,10 +79,10 @@ func TestUpstreamFailoverFallback(t *testing.T) { p.RecordLatency(good.URL, 50) r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) - ts := httptest.NewServer(server.New(r, p, []config.UpstreamConfig{ + ts := httptest.NewServer(server.New(r, p, db, []config.UpstreamConfig{ {URL: bad.URL}, {URL: good.URL}, - })) + }, 30)) defer ts.Close() resp, err := http.Get(ts.URL + "/cafebabe00000000.narinfo") diff --git a/internal/server/server.go b/internal/server/server.go index 26ac8b1..19cfaf6 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -10,6 +10,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus/promhttp" + "notashelf.dev/ncro/internal/cache" "notashelf.dev/ncro/internal/config" "notashelf.dev/ncro/internal/metrics" "notashelf.dev/ncro/internal/prober" @@ -18,19 +19,23 @@ import ( // HTTP handler implementing the Nix binary cache protocol. type Server struct { - router *router.Router - prober *prober.Prober - upstreams []config.UpstreamConfig - client *http.Client + router *router.Router + prober *prober.Prober + db *cache.DB + upstreams []config.UpstreamConfig + client *http.Client + cachePriority int } // Creates a Server. -func New(r *router.Router, p *prober.Prober, upstreams []config.UpstreamConfig) *Server { +func New(r *router.Router, p *prober.Prober, db *cache.DB, upstreams []config.UpstreamConfig, cachePriority int) *Server { return &Server{ - router: r, - prober: p, - upstreams: upstreams, - client: &http.Client{Timeout: 60 * time.Second}, + router: r, + prober: p, + db: db, + upstreams: upstreams, + client: &http.Client{Timeout: 60 * time.Second}, + cachePriority: cachePriority, } } @@ -56,7 +61,7 @@ func (s *Server) handleCacheInfo(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "text/plain") fmt.Fprintln(w, "StoreDir: /nix/store") fmt.Fprintln(w, "WantMassQuery: 1") - fmt.Fprintln(w, "Priority: 30") + fmt.Fprintf(w, "Priority: %d\n", s.cachePriority) } func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { @@ -94,42 +99,57 @@ func (s *Server) handleNarinfo(w http.ResponseWriter, r *http.Request) { func (s *Server) handleNAR(w http.ResponseWriter, r *http.Request) { metrics.NARRequests.Inc() - sorted := s.prober.SortedByLatency() - if len(sorted) == 0 { - http.Error(w, "no upstreams available", http.StatusServiceUnavailable) - return + + // Consult route cache: the narURL is the path without the leading slash. + narURL := strings.TrimPrefix(r.URL.Path, "/") + var tried string + if entry, err := s.db.GetRouteByNarURL(narURL); err == nil && entry != nil && entry.IsValid() { + tried = entry.UpstreamURL + if s.tryNARUpstream(w, r, entry.UpstreamURL) { + return + } } - for _, h := range sorted { - if h.Status == prober.StatusDown { + + // Fall back through all upstreams sorted by latency. + for _, h := range s.prober.SortedByLatency() { + if h.Status == prober.StatusDown || h.URL == tried { continue } - targetURL := h.URL + r.URL.Path - req, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body) - if err != nil { - continue + if s.tryNARUpstream(w, r, h.URL) { + return } - for _, hdr := range []string{"Accept", "Accept-Encoding", "Range"} { - if v := r.Header.Get(hdr); v != "" { - req.Header.Set(hdr, v) - } - } - resp, err := s.client.Do(req) - if err != nil { - slog.Warn("NAR upstream failed", "upstream", h.URL, "error", err) - continue - } - if resp.StatusCode == http.StatusNotFound { - resp.Body.Close() - continue - } - defer resp.Body.Close() - slog.Debug("proxying NAR", "path", r.URL.Path, "upstream", h.URL) - s.copyResponse(w, resp) - return } http.NotFound(w, r) } +// Attempts to serve a NAR from upstreamBase. Returns true if the upstream +// responded with a non-404 status. +func (s *Server) tryNARUpstream(w http.ResponseWriter, r *http.Request, upstreamBase string) bool { + targetURL := upstreamBase + r.URL.Path + req, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body) + if err != nil { + return false + } + for _, hdr := range []string{"Accept", "Accept-Encoding", "Range"} { + if v := r.Header.Get(hdr); v != "" { + req.Header.Set(hdr, v) + } + } + resp, err := s.client.Do(req) + if err != nil { + slog.Warn("NAR upstream failed", "upstream", upstreamBase, "error", err) + return false + } + if resp.StatusCode == http.StatusNotFound { + resp.Body.Close() + return false + } + defer resp.Body.Close() + slog.Debug("proxying NAR", "path", r.URL.Path, "upstream", upstreamBase) + s.copyResponse(w, resp) + return true +} + // Forwards r to targetURL and streams the response zero-copy. func (s *Server) proxyRequest(w http.ResponseWriter, r *http.Request, targetURL string) { req, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body) diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 8e12311..4a72a0f 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "os" "strings" + "sync/atomic" "testing" "time" @@ -40,7 +41,7 @@ func makeTestServer(t *testing.T, upstreams ...string) *httptest.Server { } r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) - return httptest.NewServer(server.New(r, p, upsCfg)) + return httptest.NewServer(server.New(r, p, db, upsCfg, 30)) } func TestNixCacheInfo(t *testing.T) { @@ -287,6 +288,55 @@ func TestNARRangeHeaderForwarded(t *testing.T) { } } +func TestNARRoutingUsesCache(t *testing.T) { + // Upstream A: has the NAR. + upstreamA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, ".narinfo") { + w.Header().Set("Content-Type", "text/x-nix-narinfo") + fmt.Fprintln(w, "StorePath: /nix/store/abc123-test") + fmt.Fprintln(w, "URL: nar/abc123.nar.xz") + } else { + fmt.Fprintln(w, "NAR data from A") + } + })) + defer upstreamA.Close() + + // Upstream B: does NOT have the NAR. + var bHit int32 + upstreamB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&bHit, 1) + http.NotFound(w, r) + })) + defer upstreamB.Close() + + db, _ := cache.Open(":memory:", 100) + defer db.Close() + + // Pre-seed the route cache: abc123 → upstreamA, NarURL = "nar/abc123.nar.xz" + _ = db.SetRoute(&cache.RouteEntry{ + StorePath: "abc123", + UpstreamURL: upstreamA.URL, + NarURL: "nar/abc123.nar.xz", + TTL: time.Now().Add(time.Hour), + }) + + p := prober.New(0.3) + p.InitUpstreams([]config.UpstreamConfig{{URL: upstreamA.URL}, {URL: upstreamB.URL}}) + r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) + srv := server.New(r, p, db, []config.UpstreamConfig{{URL: upstreamA.URL}, {URL: upstreamB.URL}}, 30) + + req := httptest.NewRequest(http.MethodGet, "/nar/abc123.nar.xz", nil) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + + if w.Code != 200 { + t.Fatalf("status = %d, want 200", w.Code) + } + if atomic.LoadInt32(&bHit) > 0 { + t.Error("upstream B should not have been contacted when route cache has the answer") + } +} + func TestNARFallbackWhenFirstUpstreamMissing(t *testing.T) { missing := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(404) @@ -312,7 +362,7 @@ func TestNARFallbackWhenFirstUpstreamMissing(t *testing.T) { upsCfg := []config.UpstreamConfig{{URL: missing.URL}, {URL: hasIt.URL}} r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) - ts := httptest.NewServer(server.New(r, p, upsCfg)) + ts := httptest.NewServer(server.New(r, p, db, upsCfg, 30)) defer ts.Close() resp, err := http.Get(ts.URL + "/nar/abc123.nar")