router: add singleflight deduplication for concurrent narinfo races

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Ib682889f34ad4ad4fb331ee2924dc9916a6a6964
This commit is contained in:
raf 2026-03-06 19:44:24 +03:00
commit 41b18dd1f8
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
8 changed files with 90 additions and 18 deletions

View file

@ -11,6 +11,7 @@ import (
"sync"
"time"
"golang.org/x/sync/singleflight"
"notashelf.dev/ncro/internal/cache"
"notashelf.dev/ncro/internal/metrics"
"notashelf.dev/ncro/internal/narinfo"
@ -37,18 +38,21 @@ type Router struct {
prober *prober.Prober
routeTTL time.Duration
raceTimeout time.Duration
negativeTTL time.Duration
client *http.Client
mu sync.RWMutex
upstreamKeys map[string]string // upstream URL → Nix public key string
group singleflight.Group
}
// Creates a Router.
func New(db *cache.DB, p *prober.Prober, routeTTL, raceTimeout time.Duration) *Router {
func New(db *cache.DB, p *prober.Prober, routeTTL, raceTimeout, negativeTTL time.Duration) *Router {
return &Router{
db: db,
prober: p,
routeTTL: routeTTL,
raceTimeout: raceTimeout,
negativeTTL: negativeTTL,
client: &http.Client{Timeout: raceTimeout},
upstreamKeys: make(map[string]string),
}
@ -82,7 +86,14 @@ func (r *Router) Resolve(storeHash string, candidates []string) (*Result, error)
}
}
metrics.NarinfoCacheMisses.Inc()
return r.race(storeHash, candidates)
v, raceErr, _ := r.group.Do(storeHash, func() (interface{}, error) {
return r.race(storeHash, candidates)
})
if raceErr != nil {
return nil, raceErr
}
return v.(*Result), nil
}
type raceResult struct {

View file

@ -6,10 +6,13 @@ import (
"net/http"
"net/http/httptest"
"os"
"sync"
"sync/atomic"
"testing"
"time"
"notashelf.dev/ncro/internal/cache"
"notashelf.dev/ncro/internal/config"
"notashelf.dev/ncro/internal/prober"
"notashelf.dev/ncro/internal/router"
)
@ -26,7 +29,7 @@ func newTestRouter(t *testing.T, upstreams ...string) (*router.Router, func()) {
for _, u := range upstreams {
p.RecordLatency(u, 10)
}
r := router.New(db, p, time.Hour, 5*time.Second)
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
return r, func() {
db.Close()
os.Remove(f.Name())
@ -162,7 +165,7 @@ func TestResolveWithDownUpstream(t *testing.T) {
p.RecordFailure(srv.URL)
}
r := router.New(db, p, time.Hour, 5*time.Second)
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
// Router should still attempt the race (the race uses HEAD, not the prober status)
// The upstream is actually healthy (httptest), so the race should succeed.
result, err := r.Resolve("somehash", []string{srv.URL})
@ -173,3 +176,43 @@ func TestResolveWithDownUpstream(t *testing.T) {
t.Errorf("url = %q", result.URL)
}
}
func TestSingleflightDedup(t *testing.T) {
var headCount, getCount int32
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodHead {
atomic.AddInt32(&headCount, 1)
time.Sleep(30 * time.Millisecond) // ensure goroutines overlap
w.WriteHeader(http.StatusOK)
} else {
atomic.AddInt32(&getCount, 1)
w.Header().Set("Content-Type", "text/x-nix-narinfo")
fmt.Fprintln(w, "StorePath: /nix/store/abc123-test")
}
}))
defer ts.Close()
db, err := cache.Open(":memory:", 1000)
if err != nil {
t.Fatal(err)
}
defer db.Close()
p := prober.New(0.3)
p.InitUpstreams([]config.UpstreamConfig{{URL: ts.URL}})
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
const N = 10
var wg sync.WaitGroup
for range N {
wg.Add(1)
go func() {
defer wg.Done()
r.Resolve("abc123dedup", []string{ts.URL})
}()
}
wg.Wait()
if hc := atomic.LoadInt32(&headCount); hc > 1 {
t.Errorf("upstream HEAD hit %d times for %d concurrent callers; want 1", hc, N)
}
}

View file

@ -38,7 +38,7 @@ func TestRouteReuseOnSecondRequest(t *testing.T) {
p := prober.New(0.3)
p.RecordLatency(upstream.URL, 10)
r := router.New(db, p, time.Hour, 5*time.Second)
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
ts := httptest.NewServer(server.New(r, p, []config.UpstreamConfig{{URL: upstream.URL}}))
defer ts.Close()
@ -78,7 +78,7 @@ func TestUpstreamFailoverFallback(t *testing.T) {
p.RecordLatency(bad.URL, 1) // bad appears fastest
p.RecordLatency(good.URL, 50)
r := router.New(db, p, time.Hour, 5*time.Second)
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
ts := httptest.NewServer(server.New(r, p, []config.UpstreamConfig{
{URL: bad.URL},
{URL: good.URL},

View file

@ -39,7 +39,7 @@ func makeTestServer(t *testing.T, upstreams ...string) *httptest.Server {
upsCfg[i] = config.UpstreamConfig{URL: u}
}
r := router.New(db, p, time.Hour, 5*time.Second)
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
return httptest.NewServer(server.New(r, p, upsCfg))
}
@ -311,7 +311,7 @@ func TestNARFallbackWhenFirstUpstreamMissing(t *testing.T) {
p.RecordLatency(hasIt.URL, 50)
upsCfg := []config.UpstreamConfig{{URL: missing.URL}, {URL: hasIt.URL}}
r := router.New(db, p, time.Hour, 5*time.Second)
r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute)
ts := httptest.NewServer(server.New(r, p, upsCfg))
defer ts.Close()