diff --git a/internal/prober/prober.go b/internal/prober/prober.go index d88c7a7..62ecb4a 100644 --- a/internal/prober/prober.go +++ b/internal/prober/prober.go @@ -112,7 +112,10 @@ func (p *Prober) SetHealthPersistence(fn func(url string, ema float64, consecuti func (p *Prober) RecordLatency(url string, ms float64) { p.mu.Lock() defer p.mu.Unlock() - h := p.getOrCreate(url) + h, ok := p.table[url] + if !ok { + return + } if h.TotalQueries == 0 { h.EMALatency = ms } else { @@ -133,7 +136,10 @@ func (p *Prober) RecordLatency(url string, ms float64) { func (p *Prober) RecordFailure(url string) { p.mu.Lock() defer p.mu.Unlock() - h := p.getOrCreate(url) + h, ok := p.table[url] + if !ok { + return + } h.ConsecutiveFails++ switch { case h.ConsecutiveFails >= 10: @@ -190,6 +196,17 @@ func (p *Prober) SortedByLatency() []*UpstreamHealth { // Performs a HEAD /nix-cache-info against url and updates health. func (p *Prober) ProbeUpstream(url string) { + // Skip if URL is not in table. This prevents in-flight probes from + // resurrecting removed upstreams (race: RemoveUpstream called while + // ProbeUpstream is in flight). + p.mu.Lock() + _, exists := p.table[url] + p.mu.Unlock() + if !exists { + // URL was removed or never added; do not resurrect. + return + } + start := time.Now() resp, err := p.client.Head(url + "/nix-cache-info") elapsed := float64(time.Since(start).Nanoseconds()) / 1e6 @@ -234,7 +251,7 @@ func (p *Prober) getOrCreate(url string) *UpstreamHealth { } // Adds a new upstream dynamically (e.g., discovered via mDNS). -// Thread-safe. Logs the addition and begins probing. +// Thread-safe. Triggers an immediate probe in the background. func (p *Prober) AddUpstream(url string, priority int) { p.mu.Lock() defer p.mu.Unlock()