From 41b18dd1f8d2bb047a30f6b0a62e0a22977c52e2 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Fri, 6 Mar 2026 19:44:24 +0300 Subject: [PATCH] router: add singleflight deduplication for concurrent narinfo races Signed-off-by: NotAShelf Change-Id: Ib682889f34ad4ad4fb331ee2924dc9916a6a6964 --- cmd/ncro/main.go | 2 +- go.mod | 4 ++- go.sum | 30 +++++++++++++----- internal/router/router.go | 15 +++++++-- internal/router/router_test.go | 47 +++++++++++++++++++++++++++-- internal/server/integration_test.go | 4 +-- internal/server/server_test.go | 4 +-- nix/package.nix | 2 +- 8 files changed, 90 insertions(+), 18 deletions(-) diff --git a/cmd/ncro/main.go b/cmd/ncro/main.go index b1c356c..2dae8b0 100644 --- a/cmd/ncro/main.go +++ b/cmd/ncro/main.go @@ -90,7 +90,7 @@ func main() { probeDone := make(chan struct{}) go p.RunProbeLoop(30*time.Second, probeDone) - r := router.New(db, p, cfg.Cache.TTL.Duration, 5*time.Second) + r := router.New(db, p, cfg.Cache.TTL.Duration, 5*time.Second, 10*time.Minute) for _, u := range cfg.Upstreams { if u.PublicKey != "" { if err := r.SetUpstreamKey(u.URL, u.PublicKey); err != nil { diff --git a/go.mod b/go.mod index cf99232..cf4dbbb 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,9 @@ module notashelf.dev/ncro go 1.25.7 require ( + github.com/prometheus/client_golang v1.23.2 github.com/vmihailenco/msgpack/v5 v5.4.1 + golang.org/x/sync v0.19.0 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.46.1 ) @@ -13,10 +15,10 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/kr/text v0.2.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect - github.com/prometheus/client_golang v1.23.2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect diff --git a/go.sum b/go.sum index 40d1f24..48bd940 100644 --- a/go.sum +++ b/go.sum @@ -2,16 +2,27 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -30,20 +41,24 @@ github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzM github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= @@ -51,8 +66,9 @@ golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= diff --git a/internal/router/router.go b/internal/router/router.go index 2c6f2f9..6622c4a 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "golang.org/x/sync/singleflight" "notashelf.dev/ncro/internal/cache" "notashelf.dev/ncro/internal/metrics" "notashelf.dev/ncro/internal/narinfo" @@ -37,18 +38,21 @@ type Router struct { prober *prober.Prober routeTTL time.Duration raceTimeout time.Duration + negativeTTL time.Duration client *http.Client mu sync.RWMutex upstreamKeys map[string]string // upstream URL → Nix public key string + group singleflight.Group } // Creates a Router. -func New(db *cache.DB, p *prober.Prober, routeTTL, raceTimeout time.Duration) *Router { +func New(db *cache.DB, p *prober.Prober, routeTTL, raceTimeout, negativeTTL time.Duration) *Router { return &Router{ db: db, prober: p, routeTTL: routeTTL, raceTimeout: raceTimeout, + negativeTTL: negativeTTL, client: &http.Client{Timeout: raceTimeout}, upstreamKeys: make(map[string]string), } @@ -82,7 +86,14 @@ func (r *Router) Resolve(storeHash string, candidates []string) (*Result, error) } } metrics.NarinfoCacheMisses.Inc() - return r.race(storeHash, candidates) + + v, raceErr, _ := r.group.Do(storeHash, func() (interface{}, error) { + return r.race(storeHash, candidates) + }) + if raceErr != nil { + return nil, raceErr + } + return v.(*Result), nil } type raceResult struct { diff --git a/internal/router/router_test.go b/internal/router/router_test.go index d1d1922..75d1d25 100644 --- a/internal/router/router_test.go +++ b/internal/router/router_test.go @@ -6,10 +6,13 @@ import ( "net/http" "net/http/httptest" "os" + "sync" + "sync/atomic" "testing" "time" "notashelf.dev/ncro/internal/cache" + "notashelf.dev/ncro/internal/config" "notashelf.dev/ncro/internal/prober" "notashelf.dev/ncro/internal/router" ) @@ -26,7 +29,7 @@ func newTestRouter(t *testing.T, upstreams ...string) (*router.Router, func()) { for _, u := range upstreams { p.RecordLatency(u, 10) } - r := router.New(db, p, time.Hour, 5*time.Second) + r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) return r, func() { db.Close() os.Remove(f.Name()) @@ -162,7 +165,7 @@ func TestResolveWithDownUpstream(t *testing.T) { p.RecordFailure(srv.URL) } - r := router.New(db, p, time.Hour, 5*time.Second) + r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) // Router should still attempt the race (the race uses HEAD, not the prober status) // The upstream is actually healthy (httptest), so the race should succeed. result, err := r.Resolve("somehash", []string{srv.URL}) @@ -173,3 +176,43 @@ func TestResolveWithDownUpstream(t *testing.T) { t.Errorf("url = %q", result.URL) } } + +func TestSingleflightDedup(t *testing.T) { + var headCount, getCount int32 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodHead { + atomic.AddInt32(&headCount, 1) + time.Sleep(30 * time.Millisecond) // ensure goroutines overlap + w.WriteHeader(http.StatusOK) + } else { + atomic.AddInt32(&getCount, 1) + w.Header().Set("Content-Type", "text/x-nix-narinfo") + fmt.Fprintln(w, "StorePath: /nix/store/abc123-test") + } + })) + defer ts.Close() + + db, err := cache.Open(":memory:", 1000) + if err != nil { + t.Fatal(err) + } + defer db.Close() + p := prober.New(0.3) + p.InitUpstreams([]config.UpstreamConfig{{URL: ts.URL}}) + r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) + + const N = 10 + var wg sync.WaitGroup + for range N { + wg.Add(1) + go func() { + defer wg.Done() + r.Resolve("abc123dedup", []string{ts.URL}) + }() + } + wg.Wait() + + if hc := atomic.LoadInt32(&headCount); hc > 1 { + t.Errorf("upstream HEAD hit %d times for %d concurrent callers; want 1", hc, N) + } +} diff --git a/internal/server/integration_test.go b/internal/server/integration_test.go index 8eba814..13684a5 100644 --- a/internal/server/integration_test.go +++ b/internal/server/integration_test.go @@ -38,7 +38,7 @@ func TestRouteReuseOnSecondRequest(t *testing.T) { p := prober.New(0.3) p.RecordLatency(upstream.URL, 10) - r := router.New(db, p, time.Hour, 5*time.Second) + r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) ts := httptest.NewServer(server.New(r, p, []config.UpstreamConfig{{URL: upstream.URL}})) defer ts.Close() @@ -78,7 +78,7 @@ func TestUpstreamFailoverFallback(t *testing.T) { p.RecordLatency(bad.URL, 1) // bad appears fastest p.RecordLatency(good.URL, 50) - r := router.New(db, p, time.Hour, 5*time.Second) + r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) ts := httptest.NewServer(server.New(r, p, []config.UpstreamConfig{ {URL: bad.URL}, {URL: good.URL}, diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 1bfceaf..8e12311 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -39,7 +39,7 @@ func makeTestServer(t *testing.T, upstreams ...string) *httptest.Server { upsCfg[i] = config.UpstreamConfig{URL: u} } - r := router.New(db, p, time.Hour, 5*time.Second) + r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) return httptest.NewServer(server.New(r, p, upsCfg)) } @@ -311,7 +311,7 @@ func TestNARFallbackWhenFirstUpstreamMissing(t *testing.T) { p.RecordLatency(hasIt.URL, 50) upsCfg := []config.UpstreamConfig{{URL: missing.URL}, {URL: hasIt.URL}} - r := router.New(db, p, time.Hour, 5*time.Second) + r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) ts := httptest.NewServer(server.New(r, p, upsCfg)) defer ts.Close() diff --git a/nix/package.nix b/nix/package.nix index 1070850..860a9ed 100644 --- a/nix/package.nix +++ b/nix/package.nix @@ -20,7 +20,7 @@ buildGoModule { ]; }; - vendorHash = "sha256-rjgb/iSz3+GPu8lNIhlTCwC/9uuuSh/PJv9GxvL7gJE="; + vendorHash = "sha256-y4NwCPZTVaWFUzBW4Roo47pi+E0KnU/5kqnMB1rmyy8="; ldflags = ["-s" "-w"]; }