diff --git a/cmd/ncro/main.go b/cmd/ncro/main.go index 2dae8b0..f96c0a0 100644 --- a/cmd/ncro/main.go +++ b/cmd/ncro/main.go @@ -74,6 +74,9 @@ func main() { if err := db.ExpireOldRoutes(); err != nil { slog.Warn("expire routes error", "error", err) } + if err := db.ExpireNegatives(); err != nil { + slog.Warn("expire negatives error", "error", err) + } if count, err := db.RouteCount(); err == nil { metrics.RouteEntries.Set(float64(count)) } @@ -90,7 +93,7 @@ func main() { probeDone := make(chan struct{}) go p.RunProbeLoop(30*time.Second, probeDone) - r := router.New(db, p, cfg.Cache.TTL.Duration, 5*time.Second, 10*time.Minute) + r := router.New(db, p, cfg.Cache.TTL.Duration, 5*time.Second, cfg.Cache.NegativeTTL.Duration) for _, u := range cfg.Upstreams { if u.PublicKey != "" { if err := r.SetUpstreamKey(u.URL, u.PublicKey); err != nil { diff --git a/internal/cache/db.go b/internal/cache/db.go index 6e6b64a..7c793a7 100644 --- a/internal/cache/db.go +++ b/internal/cache/db.go @@ -80,6 +80,11 @@ func migrate(db *sql.DB) error { total_queries INTEGER DEFAULT 0, success_rate REAL DEFAULT 1.0 ); + CREATE TABLE IF NOT EXISTS negative_cache ( + store_path TEXT PRIMARY KEY, + expires_at INTEGER NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_negative_expires ON negative_cache(expires_at); `) return err } @@ -178,6 +183,32 @@ func (d *DB) RouteCount() (int, error) { return count, err } +// Records a negative cache entry for storePath with the given TTL. +func (d *DB) SetNegative(storePath string, ttl time.Duration) error { + _, err := d.db.Exec( + `INSERT INTO negative_cache (store_path, expires_at) VALUES (?, ?) + ON CONFLICT(store_path) DO UPDATE SET expires_at = excluded.expires_at`, + storePath, time.Now().Add(ttl).Unix(), + ) + return err +} + +// Returns true if a non-expired negative entry exists for storePath. +func (d *DB) IsNegative(storePath string) (bool, error) { + var count int + err := d.db.QueryRow( + `SELECT COUNT(*) FROM negative_cache WHERE store_path = ? AND expires_at > ?`, + storePath, time.Now().Unix(), + ).Scan(&count) + return count > 0, err +} + +// Deletes expired negative cache entries. +func (d *DB) ExpireNegatives() error { + _, err := d.db.Exec(`DELETE FROM negative_cache WHERE expires_at < ?`, time.Now().Unix()) + return err +} + // Deletes the oldest routes (by last_verified) when over capacity. func (d *DB) evictIfNeeded() error { count, err := d.RouteCount() diff --git a/internal/cache/db_test.go b/internal/cache/db_test.go index 004b1db..4a9278e 100644 --- a/internal/cache/db_test.go +++ b/internal/cache/db_test.go @@ -180,6 +180,55 @@ func TestRouteCountAfterExpiry(t *testing.T) { } } +func TestNegativeCacheSetAndCheck(t *testing.T) { + db, err := cache.Open(":memory:", 100) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + neg, err := db.IsNegative("missing-path") + if err != nil { + t.Fatalf("IsNegative: %v", err) + } + if neg { + t.Error("expected false for unknown path") + } + + if err := db.SetNegative("missing-path", 10*time.Minute); err != nil { + t.Fatalf("SetNegative: %v", err) + } + + neg, err = db.IsNegative("missing-path") + if err != nil { + t.Fatalf("IsNegative after set: %v", err) + } + if !neg { + t.Error("expected true after SetNegative") + } +} + +func TestNegativeCacheExpiry(t *testing.T) { + db, err := cache.Open(":memory:", 100) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // Set with negative duration so it's already expired. + if err := db.SetNegative("expires-now", -time.Second); err != nil { + t.Fatalf("SetNegative: %v", err) + } + if err := db.ExpireNegatives(); err != nil { + t.Fatalf("ExpireNegatives: %v", err) + } + + neg, _ := db.IsNegative("expires-now") + if neg { + t.Error("expired negative should not be returned") + } +} + func TestLRUEviction(t *testing.T) { // Use maxEntries=3 to trigger eviction easily f, _ := os.CreateTemp("", "ncro-lru-*.db") diff --git a/internal/config/config.go b/internal/config/config.go index 36265a6..bc68fb8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -52,6 +52,7 @@ type CacheConfig struct { DBPath string `yaml:"db_path"` MaxEntries int `yaml:"max_entries"` TTL Duration `yaml:"ttl"` + NegativeTTL Duration `yaml:"negative_ttl"` LatencyAlpha float64 `yaml:"latency_alpha"` } @@ -96,6 +97,7 @@ func defaults() Config { DBPath: "/var/lib/ncro/routes.db", MaxEntries: 100000, TTL: Duration{time.Hour}, + NegativeTTL: Duration{10 * time.Minute}, LatencyAlpha: 0.3, }, Mesh: MeshConfig{ @@ -134,6 +136,9 @@ func (c *Config) Validate() error { if c.Cache.TTL.Duration <= 0 { return fmt.Errorf("cache.ttl must be positive") } + if c.Cache.NegativeTTL.Duration <= 0 { + return fmt.Errorf("cache.negative_ttl must be positive") + } if c.Cache.MaxEntries <= 0 { return fmt.Errorf("cache.max_entries must be positive") } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 162fe4c..10b74fe 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -132,7 +132,7 @@ func TestValidateBadAlpha(t *testing.T) { } } -func TestValidateNegativeTTL(t *testing.T) { +func TestValidateZeroTTL(t *testing.T) { cfg, _ := config.Load("") cfg.Cache.TTL = config.Duration{} if err := cfg.Validate(); err == nil { @@ -140,6 +140,14 @@ func TestValidateNegativeTTL(t *testing.T) { } } +func TestValidateNegativeTTL(t *testing.T) { + cfg, _ := config.Load("") + cfg.Cache.NegativeTTL = config.Duration{} + if err := cfg.Validate(); err == nil { + t.Error("expected error for zero negative_ttl") + } +} + func TestValidateMeshEnabledNoPeers(t *testing.T) { cfg, _ := config.Load("") cfg.Mesh.Enabled = true diff --git a/internal/router/router.go b/internal/router/router.go index 6622c4a..741b2de 100644 --- a/internal/router/router.go +++ b/internal/router/router.go @@ -41,7 +41,7 @@ type Router struct { negativeTTL time.Duration client *http.Client mu sync.RWMutex - upstreamKeys map[string]string // upstream URL → Nix public key string + upstreamKeys map[string]string // upstream URL -> Nix public key string group singleflight.Group } @@ -73,6 +73,12 @@ func (r *Router) SetUpstreamKey(url, pubKeyStr string) error { // Returns the best upstream for the given store hash. // Checks the route cache first; on miss races the provided candidates. func (r *Router) Resolve(storeHash string, candidates []string) (*Result, error) { + // Fast path: negative cache. + if neg, err := r.db.IsNegative(storeHash); err == nil && neg { + return nil, ErrNotFound + } + + // Fast path: route cache hit. entry, err := r.db.GetRoute(storeHash) if err == nil && entry != nil && entry.IsValid() { h := r.prober.GetHealth(entry.UpstreamURL) @@ -88,7 +94,14 @@ func (r *Router) Resolve(storeHash string, candidates []string) (*Result, error) metrics.NarinfoCacheMisses.Inc() v, raceErr, _ := r.group.Do(storeHash, func() (interface{}, error) { - return r.race(storeHash, candidates) + result, err := r.race(storeHash, candidates) + if errors.Is(err, ErrNotFound) { + _ = r.db.SetNegative(storeHash, r.negativeTTL) + } + if err != nil { + return nil, err + } + return result, nil }) if raceErr != nil { return nil, raceErr diff --git a/internal/router/router_test.go b/internal/router/router_test.go index 6d9ad41..4e56140 100644 --- a/internal/router/router_test.go +++ b/internal/router/router_test.go @@ -177,6 +177,40 @@ func TestResolveWithDownUpstream(t *testing.T) { } } +func TestNegativeCaching(t *testing.T) { + var raceCount int32 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&raceCount, 1) + w.WriteHeader(http.StatusNotFound) + })) + defer ts.Close() + + db, err := cache.Open(":memory:", 1000) + if err != nil { + t.Fatal(err) + } + defer db.Close() + p := prober.New(0.3) + p.InitUpstreams([]config.UpstreamConfig{{URL: ts.URL}}) + r := router.New(db, p, time.Hour, 5*time.Second, 10*time.Minute) + + _, err = r.Resolve("not-on-any-upstream", []string{ts.URL}) + if !errors.Is(err, router.ErrNotFound) { + t.Fatalf("first resolve: expected ErrNotFound, got %v", err) + } + count1 := atomic.LoadInt32(&raceCount) + + _, err = r.Resolve("not-on-any-upstream", []string{ts.URL}) + if !errors.Is(err, router.ErrNotFound) { + t.Fatalf("second resolve: expected ErrNotFound, got %v", err) + } + count2 := atomic.LoadInt32(&raceCount) + + if count2 != count1 { + t.Errorf("second resolve hit upstream %d extra times, want 0 (should be negatively cached)", count2-count1) + } +} + func TestSingleflightDedup(t *testing.T) { var headCount int32 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {