From 35b8fb26728aad6f6ac3739aef64c4391351ea52 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Fri, 6 Mar 2026 17:38:25 +0300 Subject: [PATCH] router: fix silent error in race; add sentinel errors; populate NarHash/NarSize Signed-off-by: NotAShelf Change-Id: I9fa56ed75c609f557ab601883ca899a16a6a6964 --- internal/router/router.go | 84 +++++++++++++++++++++++++++++++++++---- 1 file changed, 77 insertions(+), 7 deletions(-) diff --git a/internal/router/router.go b/internal/router/router.go index 4825637..ca3e2ee 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -1,21 +1,34 @@ package router import ( + "bytes" "context" + "errors" "fmt" + "io" + "log/slog" "net/http" "sync" "time" "notashelf.dev/ncro/internal/cache" + "notashelf.dev/ncro/internal/metrics" + "notashelf.dev/ncro/internal/narinfo" "notashelf.dev/ncro/internal/prober" ) +// Returned when all upstreams were reached but none had the path. +var ErrNotFound = errors.New("not found in any upstream") + +// Returned when all upstreams failed with network errors. +var ErrUpstreamUnavailable = errors.New("all upstreams unavailable") + // Result of a Resolve call. type Result struct { - URL string - LatencyMs float64 - CacheHit bool + URL string + LatencyMs float64 + CacheHit bool + NarInfoBytes []byte // raw narinfo response on cache miss; nil on cache hit } // Resolves store paths to the best upstream via cache lookup or parallel racing. @@ -45,6 +58,7 @@ func (r *Router) Resolve(storeHash string, candidates []string) (*Result, error) if err == nil && entry != nil && entry.IsValid() { h := r.prober.GetHealth(entry.UpstreamURL) if h == nil || h.Status == prober.StatusActive { + metrics.NarinfoCacheHits.Inc() return &Result{ URL: entry.UpstreamURL, LatencyMs: entry.LatencyEMA, @@ -52,6 +66,7 @@ func (r *Router) Resolve(storeHash string, candidates []string) (*Result, error) }, nil } } + metrics.NarinfoCacheMisses.Inc() return r.race(storeHash, candidates) } @@ -69,21 +84,39 @@ func (r *Router) race(storeHash string, candidates []string) (*Result, error) { defer cancel() ch := make(chan raceResult, len(candidates)) - var wg sync.WaitGroup + var ( + wg sync.WaitGroup + mu sync.Mutex + netErrs int + notFounds int + ) for _, u := range candidates { wg.Add(1) go func(upstream string) { defer wg.Done() start := time.Now() - req, _ := http.NewRequestWithContext(ctx, http.MethodHead, + req, err := http.NewRequestWithContext(ctx, http.MethodHead, upstream+"/"+storeHash+".narinfo", nil) + if err != nil { + slog.Warn("bad upstream URL in race", "upstream", upstream, "error", err) + mu.Lock() + netErrs++ + mu.Unlock() + return + } resp, err := r.client.Do(req) if err != nil { + mu.Lock() + netErrs++ + mu.Unlock() return } resp.Body.Close() if resp.StatusCode != 200 { + mu.Lock() + notFounds++ + mu.Unlock() return } ms := float64(time.Since(start).Nanoseconds()) / 1e6 @@ -101,7 +134,13 @@ func (r *Router) race(storeHash string, candidates []string) (*Result, error) { winner, ok := <-ch if !ok { - return nil, fmt.Errorf("all upstreams failed for %q", storeHash) + mu.Lock() + ne, nf := netErrs, notFounds + mu.Unlock() + if ne > 0 && nf == 0 { + return nil, ErrUpstreamUnavailable + } + return nil, ErrNotFound } cancel() @@ -111,6 +150,12 @@ func (r *Router) race(storeHash string, candidates []string) (*Result, error) { } } + metrics.UpstreamRaceWins.WithLabelValues(winner.url).Inc() + metrics.UpstreamLatency.WithLabelValues(winner.url).Observe(winner.latencyMs / 1000) + + // Fetch narinfo body to parse metadata and forward to caller. + narInfoBytes, narHash, narSize := r.fetchNarInfo(winner.url, storeHash) + health := r.prober.GetHealth(winner.url) ema := winner.latencyMs if health != nil { @@ -127,7 +172,32 @@ func (r *Router) race(storeHash string, candidates []string) (*Result, error) { LastVerified: now, QueryCount: 1, TTL: now.Add(r.routeTTL), + NarHash: narHash, + NarSize: narSize, }) - return &Result{URL: winner.url, LatencyMs: winner.latencyMs}, nil + return &Result{URL: winner.url, LatencyMs: winner.latencyMs, NarInfoBytes: narInfoBytes}, nil +} + +// Fetches narinfo content from upstream and parses metadata. +// Returns (body, narHash, narSize); body may be non-nil even on parse error. +func (r *Router) fetchNarInfo(upstream, storeHash string) ([]byte, string, uint64) { + url := upstream + "/" + storeHash + ".narinfo" + resp, err := r.client.Get(url) + if err != nil { + return nil, "", 0 + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + return nil, "", 0 + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, "", 0 + } + ni, err := narinfo.Parse(bytes.NewReader(body)) + if err != nil { + return body, "", 0 + } + return body, ni.NarHash, ni.NarSize }