router: fix silent error in race; add sentinel errors; populate NarHash/NarSize
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I9fa56ed75c609f557ab601883ca899a16a6a6964
This commit is contained in:
parent
efca8dff6c
commit
35b8fb2672
1 changed files with 77 additions and 7 deletions
|
|
@ -1,21 +1,34 @@
|
|||
package router
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"notashelf.dev/ncro/internal/cache"
|
||||
"notashelf.dev/ncro/internal/metrics"
|
||||
"notashelf.dev/ncro/internal/narinfo"
|
||||
"notashelf.dev/ncro/internal/prober"
|
||||
)
|
||||
|
||||
// Sentinel errors reported when a race ends without a winning upstream.
// Callers should compare against them with errors.Is.
var (
	// ErrNotFound is returned when no upstream had the path and at least one
	// of them answered the request with a non-200 status.
	ErrNotFound = errors.New("not found in any upstream")

	// ErrUpstreamUnavailable is returned when every upstream failed before
	// producing an HTTP response (network error or unusable upstream URL),
	// so none of them could say whether the path exists.
	ErrUpstreamUnavailable = errors.New("all upstreams unavailable")
)
|
||||
|
||||
// Result describes the outcome of a Resolve call.
type Result struct {
	// URL is the upstream chosen to serve the store path.
	URL string

	// LatencyMs is the latency attributed to that upstream, in milliseconds:
	// the cached latency EMA for a cached route, or the measured request
	// time of the race winner.
	LatencyMs float64

	// CacheHit presumably marks a result answered from the route cache
	// (TODO confirm against Resolve); results produced by racing leave it
	// false.
	CacheHit bool

	// NarInfoBytes is the raw narinfo response fetched from the winning
	// upstream on a cache miss. It is nil on a cache hit, and also nil when
	// that fetch fails.
	NarInfoBytes []byte
}
|
||||
|
||||
// Resolves store paths to the best upstream via cache lookup or parallel racing.
|
||||
|
|
@ -45,6 +58,7 @@ func (r *Router) Resolve(storeHash string, candidates []string) (*Result, error)
|
|||
if err == nil && entry != nil && entry.IsValid() {
|
||||
h := r.prober.GetHealth(entry.UpstreamURL)
|
||||
if h == nil || h.Status == prober.StatusActive {
|
||||
metrics.NarinfoCacheHits.Inc()
|
||||
return &Result{
|
||||
URL: entry.UpstreamURL,
|
||||
LatencyMs: entry.LatencyEMA,
|
||||
|
|
@ -52,6 +66,7 @@ func (r *Router) Resolve(storeHash string, candidates []string) (*Result, error)
|
|||
}, nil
|
||||
}
|
||||
}
|
||||
metrics.NarinfoCacheMisses.Inc()
|
||||
return r.race(storeHash, candidates)
|
||||
}
|
||||
|
||||
|
|
@ -69,21 +84,39 @@ func (r *Router) race(storeHash string, candidates []string) (*Result, error) {
|
|||
defer cancel()
|
||||
|
||||
ch := make(chan raceResult, len(candidates))
|
||||
var wg sync.WaitGroup
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
mu sync.Mutex
|
||||
netErrs int
|
||||
notFounds int
|
||||
)
|
||||
|
||||
for _, u := range candidates {
|
||||
wg.Add(1)
|
||||
go func(upstream string) {
|
||||
defer wg.Done()
|
||||
start := time.Now()
|
||||
req, _ := http.NewRequestWithContext(ctx, http.MethodHead,
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodHead,
|
||||
upstream+"/"+storeHash+".narinfo", nil)
|
||||
if err != nil {
|
||||
slog.Warn("bad upstream URL in race", "upstream", upstream, "error", err)
|
||||
mu.Lock()
|
||||
netErrs++
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
resp, err := r.client.Do(req)
|
||||
if err != nil {
|
||||
mu.Lock()
|
||||
netErrs++
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
mu.Lock()
|
||||
notFounds++
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
ms := float64(time.Since(start).Nanoseconds()) / 1e6
|
||||
|
|
@ -101,7 +134,13 @@ func (r *Router) race(storeHash string, candidates []string) (*Result, error) {
|
|||
|
||||
winner, ok := <-ch
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("all upstreams failed for %q", storeHash)
|
||||
mu.Lock()
|
||||
ne, nf := netErrs, notFounds
|
||||
mu.Unlock()
|
||||
if ne > 0 && nf == 0 {
|
||||
return nil, ErrUpstreamUnavailable
|
||||
}
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
cancel()
|
||||
|
||||
|
|
@ -111,6 +150,12 @@ func (r *Router) race(storeHash string, candidates []string) (*Result, error) {
|
|||
}
|
||||
}
|
||||
|
||||
metrics.UpstreamRaceWins.WithLabelValues(winner.url).Inc()
|
||||
metrics.UpstreamLatency.WithLabelValues(winner.url).Observe(winner.latencyMs / 1000)
|
||||
|
||||
// Fetch narinfo body to parse metadata and forward to caller.
|
||||
narInfoBytes, narHash, narSize := r.fetchNarInfo(winner.url, storeHash)
|
||||
|
||||
health := r.prober.GetHealth(winner.url)
|
||||
ema := winner.latencyMs
|
||||
if health != nil {
|
||||
|
|
@ -127,7 +172,32 @@ func (r *Router) race(storeHash string, candidates []string) (*Result, error) {
|
|||
LastVerified: now,
|
||||
QueryCount: 1,
|
||||
TTL: now.Add(r.routeTTL),
|
||||
NarHash: narHash,
|
||||
NarSize: narSize,
|
||||
})
|
||||
|
||||
return &Result{URL: winner.url, LatencyMs: winner.latencyMs}, nil
|
||||
return &Result{URL: winner.url, LatencyMs: winner.latencyMs, NarInfoBytes: narInfoBytes}, nil
|
||||
}
|
||||
|
||||
// Fetches narinfo content from upstream and parses metadata.
|
||||
// Returns (body, narHash, narSize); body may be non-nil even on parse error.
|
||||
func (r *Router) fetchNarInfo(upstream, storeHash string) ([]byte, string, uint64) {
|
||||
url := upstream + "/" + storeHash + ".narinfo"
|
||||
resp, err := r.client.Get(url)
|
||||
if err != nil {
|
||||
return nil, "", 0
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
return nil, "", 0
|
||||
}
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, "", 0
|
||||
}
|
||||
ni, err := narinfo.Parse(bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return body, "", 0
|
||||
}
|
||||
return body, ni.NarHash, ni.NarSize
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue