diff --git a/internal/server/server.go b/internal/server/server.go index abb4e43..28346e6 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1 +1,127 @@ package server + +import ( + "fmt" + "io" + "log/slog" + "net/http" + "strings" + "time" + + "notashelf.dev/ncro/internal/config" + "notashelf.dev/ncro/internal/prober" + "notashelf.dev/ncro/internal/router" +) + +// HTTP handler implementing the Nix binary cache protocol. +type Server struct { + router *router.Router + prober *prober.Prober + upstreams []config.UpstreamConfig + client *http.Client +} + +// Creates a Server. +func New(r *router.Router, p *prober.Prober, upstreams []config.UpstreamConfig) *Server { + return &Server{ + router: r, + prober: p, + upstreams: upstreams, + client: &http.Client{Timeout: 60 * time.Second}, + } +} + +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + path := r.URL.Path + switch { + case path == "/nix-cache-info": + s.handleCacheInfo(w, r) + case path == "/health": + s.handleHealth(w, r) + case strings.HasSuffix(path, ".narinfo"): + s.handleNarinfo(w, r) + case strings.HasPrefix(path, "/nar/"): + s.handleNAR(w, r) + default: + http.NotFound(w, r) + } +} + +func (s *Server) handleCacheInfo(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/plain") + fmt.Fprintln(w, "StoreDir: /nix/store") + fmt.Fprintln(w, "WantMassQuery: 1") + fmt.Fprintln(w, "Priority: 30") +} + +func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + fmt.Fprintln(w, `{"status":"ok"}`) +} + +func (s *Server) handleNarinfo(w http.ResponseWriter, r *http.Request) { + hash := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), ".narinfo") + + result, err := s.router.Resolve(hash, s.upstreamURLs()) + if err != nil { + slog.Warn("narinfo not found", "hash", hash, "error", err) + http.NotFound(w, r) + return + } + + slog.Info("narinfo routed", "hash", hash, "upstream", result.URL, "cache_hit", result.CacheHit) + s.proxyRequest(w, r, result.URL+r.URL.Path) +} + +func (s *Server) handleNAR(w http.ResponseWriter, r *http.Request) { + sorted := s.prober.SortedByLatency() + if len(sorted) == 0 { + http.Error(w, "no upstreams available", http.StatusServiceUnavailable) + return + } + slog.Debug("proxying NAR", "path", r.URL.Path, "upstream", sorted[0].URL) + s.proxyRequest(w, r, sorted[0].URL+r.URL.Path) +} + +// Forwards r to targetURL and streams the response zero-copy. +func (s *Server) proxyRequest(w http.ResponseWriter, r *http.Request, targetURL string) { + req, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body) + if err != nil { + http.Error(w, "internal error", http.StatusInternalServerError) + return + } + for _, h := range []string{"Accept", "Accept-Encoding", "Range"} { + if v := r.Header.Get(h); v != "" { + req.Header.Set(h, v) + } + } + + resp, err := s.client.Do(req) + if err != nil { + slog.Error("upstream request failed", "url", targetURL, "error", err) + http.Error(w, "upstream error", http.StatusBadGateway) + return + } + defer resp.Body.Close() + + for _, h := range []string{ + "Content-Type", "Content-Length", "Content-Encoding", + "X-Nix-Signature", "Cache-Control", "Last-Modified", + } { + if v := resp.Header.Get(h); v != "" { + w.Header().Set(h, v) + } + } + w.WriteHeader(resp.StatusCode) + if _, err := io.Copy(w, resp.Body); err != nil { + slog.Warn("stream interrupted", "url", targetURL, "error", err) + } +} + +func (s *Server) upstreamURLs() []string { + urls := make([]string, len(s.upstreams)) + for i, u := range s.upstreams { + urls[i] = u.URL + } + return urls +} diff --git a/internal/server/server_test.go b/internal/server/server_test.go new file mode 100644 index 0000000..c941b11 --- /dev/null +++ b/internal/server/server_test.go @@ -0,0 +1,153 @@ +package server_test + +import ( + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "notashelf.dev/ncro/internal/cache" + "notashelf.dev/ncro/internal/config" + "notashelf.dev/ncro/internal/prober" + "notashelf.dev/ncro/internal/router" + "notashelf.dev/ncro/internal/server" +) + +func makeTestServer(t *testing.T, upstreams ...string) *httptest.Server { + t.Helper() + f, _ := os.CreateTemp("", "ncro-srv-*.db") + f.Close() + t.Cleanup(func() { os.Remove(f.Name()) }) + + db, err := cache.Open(f.Name(), 1000) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { db.Close() }) + + p := prober.New(0.3) + for _, u := range upstreams { + p.RecordLatency(u, 10) + } + + upsCfg := make([]config.UpstreamConfig, len(upstreams)) + for i, u := range upstreams { + upsCfg[i] = config.UpstreamConfig{URL: u} + } + + r := router.New(db, p, time.Hour, 5*time.Second) + return httptest.NewServer(server.New(r, p, upsCfg)) +} + +func TestNixCacheInfo(t *testing.T) { + ts := makeTestServer(t) + defer ts.Close() + + resp, err := http.Get(ts.URL + "/nix-cache-info") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + t.Errorf("status = %d, want 200", resp.StatusCode) + } + body, _ := io.ReadAll(resp.Body) + if !strings.Contains(string(body), "StoreDir:") { + t.Errorf("body missing StoreDir: %q", body) + } +} + +func TestHealthEndpoint(t *testing.T) { + ts := makeTestServer(t) + defer ts.Close() + + resp, err := http.Get(ts.URL + "/health") + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != 200 { + t.Errorf("status = %d, want 200", resp.StatusCode) + } +} + +func TestNarinfoProxy(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, ".narinfo") { + w.Header().Set("Content-Type", "text/x-nix-narinfo") + fmt.Fprint(w, "StorePath: /nix/store/abc123-hello-2.12\nURL: nar/abc123.nar\nCompression: none\n") + return + } + w.WriteHeader(404) + })) + defer upstream.Close() + + ts := makeTestServer(t, upstream.URL) + defer ts.Close() + + resp, err := http.Get(ts.URL + "/abc123def456.narinfo") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + t.Errorf("narinfo status = %d, want 200", resp.StatusCode) + } + body, _ := io.ReadAll(resp.Body) + if !strings.Contains(string(body), "StorePath:") { + t.Errorf("expected narinfo body, got: %q", body) + } +} + +func TestNarinfoNotFound(t *testing.T) { + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(404) + })) + defer upstream.Close() + + ts := makeTestServer(t, upstream.URL) + defer ts.Close() + + resp, _ := http.Get(ts.URL + "/notfound000000.narinfo") + if resp.StatusCode != 404 { + t.Errorf("status = %d, want 404", resp.StatusCode) + } +} + +func TestNARStreamingPassthrough(t *testing.T) { + narContent := []byte("fake-nar-content-bytes") + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasPrefix(r.URL.Path, "/nar/") { + w.Header().Set("Content-Type", "application/x-nix-archive") + w.Write(narContent) + return + } + if strings.HasSuffix(r.URL.Path, ".narinfo") { + w.WriteHeader(200) + return + } + w.WriteHeader(404) + })) + defer upstream.Close() + + ts := makeTestServer(t, upstream.URL) + defer ts.Close() + + resp, err := http.Get(ts.URL + "/nar/abc123.nar") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + t.Errorf("NAR status = %d, want 200", resp.StatusCode) + } + body, _ := io.ReadAll(resp.Body) + if string(body) != string(narContent) { + t.Errorf("NAR body mismatch: got %q, want %q", body, narContent) + } +}