From 56e1935eade4062e9a9aaa82ae614a36bb49f73c Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Sat, 9 May 2026 13:24:46 +0300 Subject: [PATCH] internal: various cleanup Signed-off-by: NotAShelf Change-Id: I48f7eac2e99d1d2af02fcf237401be096a6a6964 --- internal/cache/db.go | 23 ++++++++--------------- internal/mesh/gossip.go | 8 +++++--- internal/mesh/mesh.go | 7 ------- internal/prober/prober.go | 11 +++-------- internal/router/router.go | 4 ++-- internal/server/server.go | 28 +++++++++++++++------------- 6 files changed, 33 insertions(+), 48 deletions(-) diff --git a/internal/cache/db.go b/internal/cache/db.go index e4fce3c..3932bbc 100644 --- a/internal/cache/db.go +++ b/internal/cache/db.go @@ -248,12 +248,12 @@ func (d *DB) SetNegative(storePath string, ttl time.Duration) error { // Returns true if a non-expired negative entry exists for storePath. func (d *DB) IsNegative(storePath string) (bool, error) { - var count int + var exists bool err := d.db.QueryRow( - `SELECT COUNT(*) FROM negative_cache WHERE store_path = ? AND expires_at > ?`, + `SELECT EXISTS(SELECT 1 FROM negative_cache WHERE store_path = ? AND expires_at > ?)`, storePath, time.Now().Unix(), - ).Scan(&count) - return count > 0, err + ).Scan(&exists) + return exists, err } // Deletes expired negative cache entries. @@ -304,17 +304,10 @@ func (d *DB) LoadAllHealth() ([]HealthRow, error) { // Deletes the oldest routes (by last_verified) when over capacity. func (d *DB) evictIfNeeded() error { - count, err := d.RouteCount() - if err != nil { - return err - } - if count <= d.maxEntries { - return nil - } - excess := count - d.maxEntries - _, err = d.db.Exec(` + _, err := d.db.Exec(` DELETE FROM routes WHERE store_path IN ( - SELECT store_path FROM routes ORDER BY last_verified ASC LIMIT ? - )`, excess) + SELECT store_path FROM routes ORDER BY last_verified ASC + LIMIT MAX(0, (SELECT COUNT(*) FROM routes) - ?) + )`, d.maxEntries) return err } diff --git a/internal/mesh/gossip.go b/internal/mesh/gossip.go index 9c0aac3..a5f10f6 100644 --- a/internal/mesh/gossip.go +++ b/internal/mesh/gossip.go @@ -140,9 +140,11 @@ func RunGossipLoop(node *Node, src RouteSource, peers []string, interval time.Du continue } for _, peer := range peers { - if err := Announce(peer, node, routes); err != nil { - slog.Warn("mesh: announce failed", "peer", peer, "error", err) - } + go func(p string) { + if err := Announce(p, node, routes); err != nil { + slog.Warn("mesh: announce failed", "peer", p, "error", err) + } + }(peer) } slog.Debug("mesh: announced routes to peers", "routes", len(routes), "peers", len(peers)) } diff --git a/internal/mesh/mesh.go b/internal/mesh/mesh.go index 4de5f52..ee3e04f 100644 --- a/internal/mesh/mesh.go +++ b/internal/mesh/mesh.go @@ -150,10 +150,3 @@ func (rs *RouteStore) Top(n int) []cache.RouteEntry { } return result } - -func min(a, b int) int { - if a < b { - return a - } - return b -} diff --git a/internal/prober/prober.go b/internal/prober/prober.go index 62ecb4a..3bf8dd8 100644 --- a/internal/prober/prober.go +++ b/internal/prober/prober.go @@ -141,12 +141,7 @@ func (p *Prober) RecordFailure(url string) { return } h.ConsecutiveFails++ - switch { - case h.ConsecutiveFails >= 10: - h.Status = StatusDown - case h.ConsecutiveFails >= 3: - h.Status = StatusDegraded - } + h.Status = computeStatus(h.ConsecutiveFails) if p.persistHealth != nil { u, ema, cf, tq := h.URL, h.EMALatency, h.ConsecutiveFails, h.TotalQueries fn := p.persistHealth @@ -199,9 +194,9 @@ func (p *Prober) ProbeUpstream(url string) { // Skip if URL is not in table. This prevents in-flight probes from // resurrecting removed upstreams (race: RemoveUpstream called while // ProbeUpstream is in flight). - p.mu.Lock() + p.mu.RLock() _, exists := p.table[url] - p.mu.Unlock() + p.mu.RUnlock() if !exists { // URL was removed or never added; do not resurrect. return diff --git a/internal/router/router.go b/internal/router/router.go index 3678d6e..872e470 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -152,7 +152,7 @@ func (r *Router) race(storeHash string, candidates []string) (*Result, error) { return } resp.Body.Close() - if resp.StatusCode != 200 { + if resp.StatusCode != http.StatusOK { mu.Lock() notFounds++ mu.Unlock() @@ -229,7 +229,7 @@ func (r *Router) fetchNarInfo(upstream, storeHash string) ([]byte, string, strin return nil, "", "", 0 } defer resp.Body.Close() - if resp.StatusCode != 200 { + if resp.StatusCode != http.StatusOK { return nil, "", "", 0 } body, err := io.ReadAll(resp.Body) diff --git a/internal/server/server.go b/internal/server/server.go index db88b61..8679f17 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -20,23 +20,25 @@ import ( // HTTP handler implementing the Nix binary cache protocol. type Server struct { - router *router.Router - prober *prober.Prober - db *cache.DB - upstreams []config.UpstreamConfig - client *http.Client - cachePriority int + router *router.Router + prober *prober.Prober + db *cache.DB + upstreams []config.UpstreamConfig + client *http.Client + cachePriority int + metricsHandler http.Handler } // Creates a Server. func New(r *router.Router, p *prober.Prober, db *cache.DB, upstreams []config.UpstreamConfig, cachePriority int) *Server { return &Server{ - router: r, - prober: p, - db: db, - upstreams: upstreams, - client: &http.Client{Timeout: 60 * time.Second}, - cachePriority: cachePriority, + router: r, + prober: p, + db: db, + upstreams: upstreams, + client: &http.Client{Timeout: 60 * time.Second}, + cachePriority: cachePriority, + metricsHandler: promhttp.Handler(), } } @@ -48,7 +50,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { case path == "/health": s.handleHealth(w, r) case path == "/metrics": - promhttp.Handler().ServeHTTP(w, r) + s.metricsHandler.ServeHTTP(w, r) case strings.HasSuffix(path, ".narinfo"): s.handleNarinfo(w, r) case strings.HasPrefix(path, "/nar/"):