internal: various cleanup
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I48f7eac2e99d1d2af02fcf237401be096a6a6964
This commit is contained in:
parent
0ca9d5b615
commit
56e1935ead
6 changed files with 33 additions and 48 deletions
23
internal/cache/db.go
vendored
23
internal/cache/db.go
vendored
|
|
@ -248,12 +248,12 @@ func (d *DB) SetNegative(storePath string, ttl time.Duration) error {
|
|||
|
||||
// Returns true if a non-expired negative entry exists for storePath.
|
||||
func (d *DB) IsNegative(storePath string) (bool, error) {
|
||||
var count int
|
||||
var exists bool
|
||||
err := d.db.QueryRow(
|
||||
`SELECT COUNT(*) FROM negative_cache WHERE store_path = ? AND expires_at > ?`,
|
||||
`SELECT EXISTS(SELECT 1 FROM negative_cache WHERE store_path = ? AND expires_at > ?)`,
|
||||
storePath, time.Now().Unix(),
|
||||
).Scan(&count)
|
||||
return count > 0, err
|
||||
).Scan(&exists)
|
||||
return exists, err
|
||||
}
|
||||
|
||||
// Deletes expired negative cache entries.
|
||||
|
|
@ -304,17 +304,10 @@ func (d *DB) LoadAllHealth() ([]HealthRow, error) {
|
|||
|
||||
// Deletes the oldest routes (by last_verified) when over capacity.
|
||||
func (d *DB) evictIfNeeded() error {
|
||||
count, err := d.RouteCount()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if count <= d.maxEntries {
|
||||
return nil
|
||||
}
|
||||
excess := count - d.maxEntries
|
||||
_, err = d.db.Exec(`
|
||||
_, err := d.db.Exec(`
|
||||
DELETE FROM routes WHERE store_path IN (
|
||||
SELECT store_path FROM routes ORDER BY last_verified ASC LIMIT ?
|
||||
)`, excess)
|
||||
SELECT store_path FROM routes ORDER BY last_verified ASC
|
||||
LIMIT MAX(0, (SELECT COUNT(*) FROM routes) - ?)
|
||||
)`, d.maxEntries)
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -140,9 +140,11 @@ func RunGossipLoop(node *Node, src RouteSource, peers []string, interval time.Du
|
|||
continue
|
||||
}
|
||||
for _, peer := range peers {
|
||||
if err := Announce(peer, node, routes); err != nil {
|
||||
slog.Warn("mesh: announce failed", "peer", peer, "error", err)
|
||||
}
|
||||
go func(p string) {
|
||||
if err := Announce(p, node, routes); err != nil {
|
||||
slog.Warn("mesh: announce failed", "peer", p, "error", err)
|
||||
}
|
||||
}(peer)
|
||||
}
|
||||
slog.Debug("mesh: announced routes to peers", "routes", len(routes), "peers", len(peers))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -150,10 +150,3 @@ func (rs *RouteStore) Top(n int) []cache.RouteEntry {
|
|||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func min(a, b int) int {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
|
|
|||
|
|
@ -141,12 +141,7 @@ func (p *Prober) RecordFailure(url string) {
|
|||
return
|
||||
}
|
||||
h.ConsecutiveFails++
|
||||
switch {
|
||||
case h.ConsecutiveFails >= 10:
|
||||
h.Status = StatusDown
|
||||
case h.ConsecutiveFails >= 3:
|
||||
h.Status = StatusDegraded
|
||||
}
|
||||
h.Status = computeStatus(h.ConsecutiveFails)
|
||||
if p.persistHealth != nil {
|
||||
u, ema, cf, tq := h.URL, h.EMALatency, h.ConsecutiveFails, h.TotalQueries
|
||||
fn := p.persistHealth
|
||||
|
|
@ -199,9 +194,9 @@ func (p *Prober) ProbeUpstream(url string) {
|
|||
// Skip if URL is not in table. This prevents in-flight probes from
|
||||
// resurrecting removed upstreams (race: RemoveUpstream called while
|
||||
// ProbeUpstream is in flight).
|
||||
p.mu.Lock()
|
||||
p.mu.RLock()
|
||||
_, exists := p.table[url]
|
||||
p.mu.Unlock()
|
||||
p.mu.RUnlock()
|
||||
if !exists {
|
||||
// URL was removed or never added; do not resurrect.
|
||||
return
|
||||
|
|
|
|||
|
|
@ -152,7 +152,7 @@ func (r *Router) race(storeHash string, candidates []string) (*Result, error) {
|
|||
return
|
||||
}
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
mu.Lock()
|
||||
notFounds++
|
||||
mu.Unlock()
|
||||
|
|
@ -229,7 +229,7 @@ func (r *Router) fetchNarInfo(upstream, storeHash string) ([]byte, string, strin
|
|||
return nil, "", "", 0
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != 200 {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, "", "", 0
|
||||
}
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
|
|
|
|||
|
|
@ -20,23 +20,25 @@ import (
|
|||
|
||||
// HTTP handler implementing the Nix binary cache protocol.
|
||||
type Server struct {
|
||||
router *router.Router
|
||||
prober *prober.Prober
|
||||
db *cache.DB
|
||||
upstreams []config.UpstreamConfig
|
||||
client *http.Client
|
||||
cachePriority int
|
||||
router *router.Router
|
||||
prober *prober.Prober
|
||||
db *cache.DB
|
||||
upstreams []config.UpstreamConfig
|
||||
client *http.Client
|
||||
cachePriority int
|
||||
metricsHandler http.Handler
|
||||
}
|
||||
|
||||
// Creates a Server.
|
||||
func New(r *router.Router, p *prober.Prober, db *cache.DB, upstreams []config.UpstreamConfig, cachePriority int) *Server {
|
||||
return &Server{
|
||||
router: r,
|
||||
prober: p,
|
||||
db: db,
|
||||
upstreams: upstreams,
|
||||
client: &http.Client{Timeout: 60 * time.Second},
|
||||
cachePriority: cachePriority,
|
||||
router: r,
|
||||
prober: p,
|
||||
db: db,
|
||||
upstreams: upstreams,
|
||||
client: &http.Client{Timeout: 60 * time.Second},
|
||||
cachePriority: cachePriority,
|
||||
metricsHandler: promhttp.Handler(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -48,7 +50,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
case path == "/health":
|
||||
s.handleHealth(w, r)
|
||||
case path == "/metrics":
|
||||
promhttp.Handler().ServeHTTP(w, r)
|
||||
s.metricsHandler.ServeHTTP(w, r)
|
||||
case strings.HasSuffix(path, ".narinfo"):
|
||||
s.handleNarinfo(w, r)
|
||||
case strings.HasPrefix(path, "/nar/"):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue