server: route NAR requests via route cache; extract tryNARUpstream helper
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Icd57a268015d58faa7ffc2b8c711283b6a6a6964
This commit is contained in:
parent
ded9b6d464
commit
118eda08a1
4 changed files with 114 additions and 44 deletions
|
|
@ -140,7 +140,7 @@ func main() {
|
||||||
|
|
||||||
srv := &http.Server{
|
srv := &http.Server{
|
||||||
Addr: cfg.Server.Listen,
|
Addr: cfg.Server.Listen,
|
||||||
Handler: server.New(r, p, cfg.Upstreams),
|
Handler: server.New(r, p, db, cfg.Upstreams, 30),
|
||||||
ReadTimeout: cfg.Server.ReadTimeout.Duration,
|
ReadTimeout: cfg.Server.ReadTimeout.Duration,
|
||||||
WriteTimeout: cfg.Server.WriteTimeout.Duration,
|
WriteTimeout: cfg.Server.WriteTimeout.Duration,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ func TestRouteReuseOnSecondRequest(t *testing.T) {
|
||||||
p := prober.New(0.3)
|
p := prober.New(0.3)
|
||||||
p.RecordLatency(upstream.URL, 10)
|
p.RecordLatency(upstream.URL, 10)
|
||||||
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
|
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
|
||||||
ts := httptest.NewServer(server.New(r, p, []config.UpstreamConfig{{URL: upstream.URL}}))
|
ts := httptest.NewServer(server.New(r, p, db, []config.UpstreamConfig{{URL: upstream.URL}}, 30))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
resp1, _ := http.Get(ts.URL + "/deadbeef00000000.narinfo")
|
resp1, _ := http.Get(ts.URL + "/deadbeef00000000.narinfo")
|
||||||
|
|
@ -79,10 +79,10 @@ func TestUpstreamFailoverFallback(t *testing.T) {
|
||||||
p.RecordLatency(good.URL, 50)
|
p.RecordLatency(good.URL, 50)
|
||||||
|
|
||||||
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
|
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
|
||||||
ts := httptest.NewServer(server.New(r, p, []config.UpstreamConfig{
|
ts := httptest.NewServer(server.New(r, p, db, []config.UpstreamConfig{
|
||||||
{URL: bad.URL},
|
{URL: bad.URL},
|
||||||
{URL: good.URL},
|
{URL: good.URL},
|
||||||
}))
|
}, 30))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
resp, err := http.Get(ts.URL + "/cafebabe00000000.narinfo")
|
resp, err := http.Get(ts.URL + "/cafebabe00000000.narinfo")
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
"notashelf.dev/ncro/internal/cache"
|
||||||
"notashelf.dev/ncro/internal/config"
|
"notashelf.dev/ncro/internal/config"
|
||||||
"notashelf.dev/ncro/internal/metrics"
|
"notashelf.dev/ncro/internal/metrics"
|
||||||
"notashelf.dev/ncro/internal/prober"
|
"notashelf.dev/ncro/internal/prober"
|
||||||
|
|
@ -18,19 +19,23 @@ import (
|
||||||
|
|
||||||
// HTTP handler implementing the Nix binary cache protocol.
|
// HTTP handler implementing the Nix binary cache protocol.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
router *router.Router
|
router *router.Router
|
||||||
prober *prober.Prober
|
prober *prober.Prober
|
||||||
upstreams []config.UpstreamConfig
|
db *cache.DB
|
||||||
client *http.Client
|
upstreams []config.UpstreamConfig
|
||||||
|
client *http.Client
|
||||||
|
cachePriority int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a Server.
|
// Creates a Server.
|
||||||
func New(r *router.Router, p *prober.Prober, upstreams []config.UpstreamConfig) *Server {
|
func New(r *router.Router, p *prober.Prober, db *cache.DB, upstreams []config.UpstreamConfig, cachePriority int) *Server {
|
||||||
return &Server{
|
return &Server{
|
||||||
router: r,
|
router: r,
|
||||||
prober: p,
|
prober: p,
|
||||||
upstreams: upstreams,
|
db: db,
|
||||||
client: &http.Client{Timeout: 60 * time.Second},
|
upstreams: upstreams,
|
||||||
|
client: &http.Client{Timeout: 60 * time.Second},
|
||||||
|
cachePriority: cachePriority,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -56,7 +61,7 @@ func (s *Server) handleCacheInfo(w http.ResponseWriter, _ *http.Request) {
|
||||||
w.Header().Set("Content-Type", "text/plain")
|
w.Header().Set("Content-Type", "text/plain")
|
||||||
fmt.Fprintln(w, "StoreDir: /nix/store")
|
fmt.Fprintln(w, "StoreDir: /nix/store")
|
||||||
fmt.Fprintln(w, "WantMassQuery: 1")
|
fmt.Fprintln(w, "WantMassQuery: 1")
|
||||||
fmt.Fprintln(w, "Priority: 30")
|
fmt.Fprintf(w, "Priority: %d\n", s.cachePriority)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
|
func (s *Server) handleHealth(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
|
@ -94,42 +99,57 @@ func (s *Server) handleNarinfo(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
func (s *Server) handleNAR(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleNAR(w http.ResponseWriter, r *http.Request) {
|
||||||
metrics.NARRequests.Inc()
|
metrics.NARRequests.Inc()
|
||||||
sorted := s.prober.SortedByLatency()
|
|
||||||
if len(sorted) == 0 {
|
// Consult route cache: the narURL is the path without the leading slash.
|
||||||
http.Error(w, "no upstreams available", http.StatusServiceUnavailable)
|
narURL := strings.TrimPrefix(r.URL.Path, "/")
|
||||||
return
|
var tried string
|
||||||
|
if entry, err := s.db.GetRouteByNarURL(narURL); err == nil && entry != nil && entry.IsValid() {
|
||||||
|
tried = entry.UpstreamURL
|
||||||
|
if s.tryNARUpstream(w, r, entry.UpstreamURL) {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for _, h := range sorted {
|
|
||||||
if h.Status == prober.StatusDown {
|
// Fall back through all upstreams sorted by latency.
|
||||||
|
for _, h := range s.prober.SortedByLatency() {
|
||||||
|
if h.Status == prober.StatusDown || h.URL == tried {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
targetURL := h.URL + r.URL.Path
|
if s.tryNARUpstream(w, r, h.URL) {
|
||||||
req, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body)
|
return
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
for _, hdr := range []string{"Accept", "Accept-Encoding", "Range"} {
|
|
||||||
if v := r.Header.Get(hdr); v != "" {
|
|
||||||
req.Header.Set(hdr, v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
resp, err := s.client.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
slog.Warn("NAR upstream failed", "upstream", h.URL, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if resp.StatusCode == http.StatusNotFound {
|
|
||||||
resp.Body.Close()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
slog.Debug("proxying NAR", "path", r.URL.Path, "upstream", h.URL)
|
|
||||||
s.copyResponse(w, resp)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
http.NotFound(w, r)
|
http.NotFound(w, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Attempts to serve a NAR from upstreamBase. Returns true if the upstream
|
||||||
|
// responded with a non-404 status.
|
||||||
|
func (s *Server) tryNARUpstream(w http.ResponseWriter, r *http.Request, upstreamBase string) bool {
|
||||||
|
targetURL := upstreamBase + r.URL.Path
|
||||||
|
req, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, hdr := range []string{"Accept", "Accept-Encoding", "Range"} {
|
||||||
|
if v := r.Header.Get(hdr); v != "" {
|
||||||
|
req.Header.Set(hdr, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resp, err := s.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("NAR upstream failed", "upstream", upstreamBase, "error", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
|
resp.Body.Close()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
slog.Debug("proxying NAR", "path", r.URL.Path, "upstream", upstreamBase)
|
||||||
|
s.copyResponse(w, resp)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// Forwards r to targetURL and streams the response zero-copy.
|
// Forwards r to targetURL and streams the response zero-copy.
|
||||||
func (s *Server) proxyRequest(w http.ResponseWriter, r *http.Request, targetURL string) {
|
func (s *Server) proxyRequest(w http.ResponseWriter, r *http.Request, targetURL string) {
|
||||||
req, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body)
|
req, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body)
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -40,7 +41,7 @@ func makeTestServer(t *testing.T, upstreams ...string) *httptest.Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
|
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
|
||||||
return httptest.NewServer(server.New(r, p, upsCfg))
|
return httptest.NewServer(server.New(r, p, db, upsCfg, 30))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNixCacheInfo(t *testing.T) {
|
func TestNixCacheInfo(t *testing.T) {
|
||||||
|
|
@ -287,6 +288,55 @@ func TestNARRangeHeaderForwarded(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNARRoutingUsesCache(t *testing.T) {
|
||||||
|
// Upstream A: has the NAR.
|
||||||
|
upstreamA := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if strings.HasSuffix(r.URL.Path, ".narinfo") {
|
||||||
|
w.Header().Set("Content-Type", "text/x-nix-narinfo")
|
||||||
|
fmt.Fprintln(w, "StorePath: /nix/store/abc123-test")
|
||||||
|
fmt.Fprintln(w, "URL: nar/abc123.nar.xz")
|
||||||
|
} else {
|
||||||
|
fmt.Fprintln(w, "NAR data from A")
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer upstreamA.Close()
|
||||||
|
|
||||||
|
// Upstream B: does NOT have the NAR.
|
||||||
|
var bHit int32
|
||||||
|
upstreamB := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
atomic.AddInt32(&bHit, 1)
|
||||||
|
http.NotFound(w, r)
|
||||||
|
}))
|
||||||
|
defer upstreamB.Close()
|
||||||
|
|
||||||
|
db, _ := cache.Open(":memory:", 100)
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
// Pre-seed the route cache: abc123 → upstreamA, NarURL = "nar/abc123.nar.xz"
|
||||||
|
_ = db.SetRoute(&cache.RouteEntry{
|
||||||
|
StorePath: "abc123",
|
||||||
|
UpstreamURL: upstreamA.URL,
|
||||||
|
NarURL: "nar/abc123.nar.xz",
|
||||||
|
TTL: time.Now().Add(time.Hour),
|
||||||
|
})
|
||||||
|
|
||||||
|
p := prober.New(0.3)
|
||||||
|
p.InitUpstreams([]config.UpstreamConfig{{URL: upstreamA.URL}, {URL: upstreamB.URL}})
|
||||||
|
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
|
||||||
|
srv := server.New(r, p, db, []config.UpstreamConfig{{URL: upstreamA.URL}, {URL: upstreamB.URL}}, 30)
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/nar/abc123.nar.xz", nil)
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
srv.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != 200 {
|
||||||
|
t.Fatalf("status = %d, want 200", w.Code)
|
||||||
|
}
|
||||||
|
if atomic.LoadInt32(&bHit) > 0 {
|
||||||
|
t.Error("upstream B should not have been contacted when route cache has the answer")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestNARFallbackWhenFirstUpstreamMissing(t *testing.T) {
|
func TestNARFallbackWhenFirstUpstreamMissing(t *testing.T) {
|
||||||
missing := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
missing := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(404)
|
w.WriteHeader(404)
|
||||||
|
|
@ -312,7 +362,7 @@ func TestNARFallbackWhenFirstUpstreamMissing(t *testing.T) {
|
||||||
|
|
||||||
upsCfg := []config.UpstreamConfig{{URL: missing.URL}, {URL: hasIt.URL}}
|
upsCfg := []config.UpstreamConfig{{URL: missing.URL}, {URL: hasIt.URL}}
|
||||||
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
|
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
|
||||||
ts := httptest.NewServer(server.New(r, p, upsCfg))
|
ts := httptest.NewServer(server.New(r, p, db, upsCfg, 30))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
resp, err := http.Get(ts.URL + "/nar/abc123.nar")
|
resp, err := http.Get(ts.URL + "/nar/abc123.nar")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue