server: implement Nix binary cache HTTP handler with zero-copy NAR streaming
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Ibcafdb7084453ee4f8a8b76b1ee222466a6a6964
This commit is contained in:
parent
65ddeb48f6
commit
3a80551898
2 changed files with 279 additions and 0 deletions
|
|
@ -1 +1,127 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"notashelf.dev/ncro/internal/config"
|
||||
"notashelf.dev/ncro/internal/prober"
|
||||
"notashelf.dev/ncro/internal/router"
|
||||
)
|
||||
|
||||
// Server is an HTTP handler implementing the Nix binary cache protocol.
//
// It serves /nix-cache-info and /health directly, routes .narinfo lookups
// through the router, and streams /nar/ downloads from the lowest-latency
// upstream reported by the prober.
type Server struct {
	router    *router.Router          // resolves a narinfo hash to an upstream (see handleNarinfo)
	prober    *prober.Prober          // supplies latency-sorted upstreams for NAR fetches
	upstreams []config.UpstreamConfig // configured upstream caches; flattened by upstreamURLs
	client    *http.Client            // shared proxy client; 60s timeout set in New
}
|
||||
|
||||
// Creates a Server.
|
||||
func New(r *router.Router, p *prober.Prober, upstreams []config.UpstreamConfig) *Server {
|
||||
return &Server{
|
||||
router: r,
|
||||
prober: p,
|
||||
upstreams: upstreams,
|
||||
client: &http.Client{Timeout: 60 * time.Second},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
path := r.URL.Path
|
||||
switch {
|
||||
case path == "/nix-cache-info":
|
||||
s.handleCacheInfo(w, r)
|
||||
case path == "/health":
|
||||
s.handleHealth(w, r)
|
||||
case strings.HasSuffix(path, ".narinfo"):
|
||||
s.handleNarinfo(w, r)
|
||||
case strings.HasPrefix(path, "/nar/"):
|
||||
s.handleNAR(w, r)
|
||||
default:
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleCacheInfo(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
fmt.Fprintln(w, "StoreDir: /nix/store")
|
||||
fmt.Fprintln(w, "WantMassQuery: 1")
|
||||
fmt.Fprintln(w, "Priority: 30")
|
||||
}
|
||||
|
||||
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprintln(w, `{"status":"ok"}`)
|
||||
}
|
||||
|
||||
func (s *Server) handleNarinfo(w http.ResponseWriter, r *http.Request) {
|
||||
hash := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), ".narinfo")
|
||||
|
||||
result, err := s.router.Resolve(hash, s.upstreamURLs())
|
||||
if err != nil {
|
||||
slog.Warn("narinfo not found", "hash", hash, "error", err)
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("narinfo routed", "hash", hash, "upstream", result.URL, "cache_hit", result.CacheHit)
|
||||
s.proxyRequest(w, r, result.URL+r.URL.Path)
|
||||
}
|
||||
|
||||
func (s *Server) handleNAR(w http.ResponseWriter, r *http.Request) {
|
||||
sorted := s.prober.SortedByLatency()
|
||||
if len(sorted) == 0 {
|
||||
http.Error(w, "no upstreams available", http.StatusServiceUnavailable)
|
||||
return
|
||||
}
|
||||
slog.Debug("proxying NAR", "path", r.URL.Path, "upstream", sorted[0].URL)
|
||||
s.proxyRequest(w, r, sorted[0].URL+r.URL.Path)
|
||||
}
|
||||
|
||||
// Forwards r to targetURL and streams the response zero-copy.
|
||||
func (s *Server) proxyRequest(w http.ResponseWriter, r *http.Request, targetURL string) {
|
||||
req, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, "internal error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
for _, h := range []string{"Accept", "Accept-Encoding", "Range"} {
|
||||
if v := r.Header.Get(h); v != "" {
|
||||
req.Header.Set(h, v)
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := s.client.Do(req)
|
||||
if err != nil {
|
||||
slog.Error("upstream request failed", "url", targetURL, "error", err)
|
||||
http.Error(w, "upstream error", http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
for _, h := range []string{
|
||||
"Content-Type", "Content-Length", "Content-Encoding",
|
||||
"X-Nix-Signature", "Cache-Control", "Last-Modified",
|
||||
} {
|
||||
if v := resp.Header.Get(h); v != "" {
|
||||
w.Header().Set(h, v)
|
||||
}
|
||||
}
|
||||
w.WriteHeader(resp.StatusCode)
|
||||
if _, err := io.Copy(w, resp.Body); err != nil {
|
||||
slog.Warn("stream interrupted", "url", targetURL, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) upstreamURLs() []string {
|
||||
urls := make([]string, len(s.upstreams))
|
||||
for i, u := range s.upstreams {
|
||||
urls[i] = u.URL
|
||||
}
|
||||
return urls
|
||||
}
|
||||
|
|
|
|||
153
internal/server/server_test.go
Normal file
153
internal/server/server_test.go
Normal file
|
|
@ -0,0 +1,153 @@
|
|||
package server_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"notashelf.dev/ncro/internal/cache"
|
||||
"notashelf.dev/ncro/internal/config"
|
||||
"notashelf.dev/ncro/internal/prober"
|
||||
"notashelf.dev/ncro/internal/router"
|
||||
"notashelf.dev/ncro/internal/server"
|
||||
)
|
||||
|
||||
func makeTestServer(t *testing.T, upstreams ...string) *httptest.Server {
|
||||
t.Helper()
|
||||
f, _ := os.CreateTemp("", "ncro-srv-*.db")
|
||||
f.Close()
|
||||
t.Cleanup(func() { os.Remove(f.Name()) })
|
||||
|
||||
db, err := cache.Open(f.Name(), 1000)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() { db.Close() })
|
||||
|
||||
p := prober.New(0.3)
|
||||
for _, u := range upstreams {
|
||||
p.RecordLatency(u, 10)
|
||||
}
|
||||
|
||||
upsCfg := make([]config.UpstreamConfig, len(upstreams))
|
||||
for i, u := range upstreams {
|
||||
upsCfg[i] = config.UpstreamConfig{URL: u}
|
||||
}
|
||||
|
||||
r := router.New(db, p, time.Hour, 5*time.Second)
|
||||
return httptest.NewServer(server.New(r, p, upsCfg))
|
||||
}
|
||||
|
||||
func TestNixCacheInfo(t *testing.T) {
|
||||
ts := makeTestServer(t)
|
||||
defer ts.Close()
|
||||
|
||||
resp, err := http.Get(ts.URL + "/nix-cache-info")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
t.Errorf("status = %d, want 200", resp.StatusCode)
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
if !strings.Contains(string(body), "StoreDir:") {
|
||||
t.Errorf("body missing StoreDir: %q", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthEndpoint(t *testing.T) {
|
||||
ts := makeTestServer(t)
|
||||
defer ts.Close()
|
||||
|
||||
resp, err := http.Get(ts.URL + "/health")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.StatusCode != 200 {
|
||||
t.Errorf("status = %d, want 200", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNarinfoProxy(t *testing.T) {
|
||||
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.HasSuffix(r.URL.Path, ".narinfo") {
|
||||
w.Header().Set("Content-Type", "text/x-nix-narinfo")
|
||||
fmt.Fprint(w, "StorePath: /nix/store/abc123-hello-2.12\nURL: nar/abc123.nar\nCompression: none\n")
|
||||
return
|
||||
}
|
||||
w.WriteHeader(404)
|
||||
}))
|
||||
defer upstream.Close()
|
||||
|
||||
ts := makeTestServer(t, upstream.URL)
|
||||
defer ts.Close()
|
||||
|
||||
resp, err := http.Get(ts.URL + "/abc123def456.narinfo")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
t.Errorf("narinfo status = %d, want 200", resp.StatusCode)
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
if !strings.Contains(string(body), "StorePath:") {
|
||||
t.Errorf("expected narinfo body, got: %q", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNarinfoNotFound(t *testing.T) {
|
||||
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(404)
|
||||
}))
|
||||
defer upstream.Close()
|
||||
|
||||
ts := makeTestServer(t, upstream.URL)
|
||||
defer ts.Close()
|
||||
|
||||
resp, _ := http.Get(ts.URL + "/notfound000000.narinfo")
|
||||
if resp.StatusCode != 404 {
|
||||
t.Errorf("status = %d, want 404", resp.StatusCode)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNARStreamingPassthrough(t *testing.T) {
|
||||
narContent := []byte("fake-nar-content-bytes")
|
||||
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.HasPrefix(r.URL.Path, "/nar/") {
|
||||
w.Header().Set("Content-Type", "application/x-nix-archive")
|
||||
w.Write(narContent)
|
||||
return
|
||||
}
|
||||
if strings.HasSuffix(r.URL.Path, ".narinfo") {
|
||||
w.WriteHeader(200)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(404)
|
||||
}))
|
||||
defer upstream.Close()
|
||||
|
||||
ts := makeTestServer(t, upstream.URL)
|
||||
defer ts.Close()
|
||||
|
||||
resp, err := http.Get(ts.URL + "/nar/abc123.nar")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
t.Errorf("NAR status = %d, want 200", resp.StatusCode)
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
if string(body) != string(narContent) {
|
||||
t.Errorf("NAR body mismatch: got %q, want %q", body, narContent)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue