From 5fc3b2883b6be07639b078a2ff58db1d6affed54 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Fri, 27 Mar 2026 20:03:37 +0300 Subject: [PATCH] "discovery: mDNS/DNS-SD peer discovery; dynamic upstream management" Allows ncro instances to discover each other dynamically without static configuration and enables a peer-to-peer style (hello funny cube) mesh where nodes share cached builds locally instead of all hitting upstream caches. Signed-off-by: NotAShelf Change-Id: I7d723876c6816cb6aaaf3fe14cb24a426a6a6964 --- cmd/ncro/root.go | 38 ++++++ go.mod | 7 +- go.sum | 30 ++++- internal/config/config.go | 27 ++++ internal/discovery/discovery.go | 216 ++++++++++++++++++++++++++++++++ internal/prober/prober.go | 21 ++++ 6 files changed, 336 insertions(+), 3 deletions(-) create mode 100644 internal/discovery/discovery.go diff --git a/cmd/ncro/root.go b/cmd/ncro/root.go index 563bc25..a1ff37b 100644 --- a/cmd/ncro/root.go +++ b/cmd/ncro/root.go @@ -18,6 +18,7 @@ import ( "github.com/spf13/viper" "notashelf.dev/ncro/internal/cache" "notashelf.dev/ncro/internal/config" + "notashelf.dev/ncro/internal/discovery" "notashelf.dev/ncro/internal/mesh" "notashelf.dev/ncro/internal/metrics" "notashelf.dev/ncro/internal/prober" @@ -132,6 +133,26 @@ func runServer(_ *cobra.Command, _ []string) error { probeDone := make(chan struct{}) go p.RunProbeLoop(30*time.Second, probeDone) + // Setup mDNS discovery if enabled + var discoveryMgr *discovery.Discovery + if cfg.Discovery.Enabled { + discoveryMgr, err = discovery.New(cfg.Discovery, p) + if err != nil { + return fmt.Errorf("create discovery manager: %w", err) + } + discoveryMgr.SetCallbacks( + func(url string, priority int) { + slog.Info("adding discovered upstream", "url", url) + p.AddUpstream(url, priority) + }, + func(url string) { + slog.Info("removing discovered upstream", "url", url) + p.RemoveUpstream(url) + }, + ) + slog.Info("mDNS discovery enabled", "service", cfg.Discovery.ServiceName) + } + r := router.New(db, p, cfg.Cache.TTL.Duration, 5*time.Second, cfg.Cache.NegativeTTL.Duration) for _, u := range cfg.Upstreams { if u.PublicKey != "" { @@ -174,6 +195,22 @@ func runServer(_ *cobra.Command, _ []string) error { slog.Info("mesh enabled", "addr", cfg.Mesh.BindAddr, "peers", len(cfg.Mesh.Peers)) } + // Start mDNS discovery in background + discoveryDone := make(chan struct{}) + if discoveryMgr != nil { + go func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := discoveryMgr.Start(ctx); err != nil { + slog.Error("discovery error", "error", err) + } + }() + go func() { + <-discoveryDone + discoveryMgr.Stop() + }() + } + srv := &http.Server{ Addr: cfg.Server.Listen, Handler: server.New(r, p, db, cfg.Upstreams, cfg.Server.CachePriority), @@ -206,6 +243,7 @@ func runServer(_ *cobra.Command, _ []string) error { if gossipDone != nil { close(gossipDone) } + close(discoveryDone) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/go.mod b/go.mod index 56e4356..ae6a392 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module notashelf.dev/ncro go 1.25.7 require ( + github.com/grandcat/zeroconf v1.0.0 github.com/prometheus/client_golang v1.23.2 github.com/spf13/cobra v1.10.2 github.com/spf13/viper v1.21.0 @@ -14,6 +15,7 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect @@ -21,6 +23,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/miekg/dns v1.1.27 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect @@ -37,8 +40,10 @@ require ( github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/crypto v0.48.0 // indirect + golang.org/x/net v0.50.0 // indirect golang.org/x/sys v0.41.0 // indirect - golang.org/x/text v0.32.0 // indirect + golang.org/x/text v0.34.0 // indirect golang.org/x/tools v0.42.0 // indirect google.golang.org/protobuf v1.36.11 // indirect modernc.org/libc v1.69.0 // indirect diff --git a/go.sum b/go.sum index 63744ff..dd594ec 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= +github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= @@ -19,6 +21,8 @@ github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17k github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grandcat/zeroconf v1.0.0 h1:uHhahLBKqwWBV6WZUDAT71044vwOTL+McW0mBJvo6kE= +github.com/grandcat/zeroconf v1.0.0/go.mod h1:lTKmG1zh86XyCoUeIHSA4FJMBwCJiQmGfcP2PdzytEs= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -33,12 +37,16 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/miekg/dns v1.1.27 h1:aEH/kqUzUxGJ/UHcEKdJY+ugH6WEzsEBBSPa8zuy1aM= +github.com/miekg/dns v1.1.27/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= @@ -83,17 +91,35 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= +golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= -golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= +golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/config/config.go b/internal/config/config.go index 2fd4c78..dd20ee1 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -71,6 +71,15 @@ type MeshConfig struct { GossipInterval Duration `yaml:"gossip_interval"` } +// Controls mDNS/DNS-SD based dynamic upstream discovery. +type DiscoveryConfig struct { + Enabled bool `yaml:"enabled"` + ServiceName string `yaml:"service_name"` + Domain string `yaml:"domain"` + DiscoveryTime Duration `yaml:"discovery_time"` + Priority int `yaml:"priority"` +} + type LoggingConfig struct { Level string `yaml:"level"` Format string `yaml:"format"` @@ -81,6 +90,7 @@ type Config struct { Upstreams []UpstreamConfig `yaml:"upstreams"` Cache CacheConfig `yaml:"cache"` Mesh MeshConfig `yaml:"mesh"` + Discovery DiscoveryConfig `yaml:"discovery"` Logging LoggingConfig `yaml:"logging"` } @@ -106,6 +116,12 @@ func defaults() Config { BindAddr: "0.0.0.0:7946", GossipInterval: Duration{30 * time.Second}, }, + Discovery: DiscoveryConfig{ + ServiceName: "_nix-serve._tcp", + Domain: "local", + DiscoveryTime: Duration{5 * time.Second}, + Priority: 20, + }, Logging: LoggingConfig{ Level: "info", Format: "json", @@ -161,6 +177,17 @@ func (c *Config) Validate() error { } } } + if c.Discovery.Enabled { + if c.Discovery.ServiceName == "" { + return fmt.Errorf("discovery.service_name is required when discovery is enabled") + } + if c.Discovery.Domain == "" { + return fmt.Errorf("discovery.domain is required when discovery is enabled") + } + if c.Discovery.DiscoveryTime.Duration <= 0 { + return fmt.Errorf("discovery.discovery_time must be positive") + } + } return nil } diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go new file mode 100644 index 0000000..0f04b0c --- /dev/null +++ b/internal/discovery/discovery.go @@ -0,0 +1,216 @@ +package discovery + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/grandcat/zeroconf" + "notashelf.dev/ncro/internal/config" + "notashelf.dev/ncro/internal/prober" +) + +// Tracks discovered nix-serve instances and maintains the upstream list. +type Discovery struct { + cfg config.DiscoveryConfig + prober *prober.Prober + resolver *zeroconf.Resolver + discovered map[string]*discoveredPeer + mu sync.RWMutex + stopCh chan struct{} + waitGroup sync.WaitGroup + onAddUpstream func(url string, priority int) + onRemoveUpstream func(url string) +} + +type discoveredPeer struct { + url string + lastSeen time.Time +} + +// Creates a new Discovery manager. +func New(cfg config.DiscoveryConfig, p *prober.Prober) (*Discovery, error) { + resolver, err := zeroconf.NewResolver(nil) + if err != nil { + return nil, fmt.Errorf("create zeroconf resolver: %w", err) + } + + return &Discovery{ + cfg: cfg, + prober: p, + resolver: resolver, + discovered: make(map[string]*discoveredPeer), + stopCh: make(chan struct{}), + }, nil +} + +// Sets callbacks for upstream addition/removal. These are invoked when peers +// are discovered or leave the network. +func (d *Discovery) SetCallbacks( + add func(url string, priority int), + remove func(url string), +) { + d.mu.Lock() + defer d.mu.Unlock() + d.onAddUpstream = add + d.onRemoveUpstream = remove +} + +// Starts browsing for services on the local network. Blocks until the context +// is cancelled or Stop is called. +func (d *Discovery) Start(ctx context.Context) error { + entries := make(chan *zeroconf.ServiceEntry) + + d.waitGroup.Add(1) + go d.handleEntries(ctx, entries) + + d.waitGroup.Add(1) + go d.maintainPeers(ctx) + + if err := d.resolver.Browse(ctx, d.cfg.ServiceName, d.cfg.Domain, entries); err != nil { + close(entries) + d.waitGroup.Wait() + return fmt.Errorf("browse services: %w", err) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-d.stopCh: + return nil + } +} + +// Stops the discovery process. +func (d *Discovery) Stop() { + close(d.stopCh) + d.waitGroup.Wait() +} + +// Processes discovered service entries. +func (d *Discovery) handleEntries(ctx context.Context, entries chan *zeroconf.ServiceEntry) { + defer d.waitGroup.Done() + + for { + select { + case <-ctx.Done(): + return + case <-d.stopCh: + return + case entry, ok := <-entries: + if !ok { + return + } + d.handleEntry(ctx, entry) + } + } +} + +// Handles a single service entry. +func (d *Discovery) handleEntry(_ context.Context, entry *zeroconf.ServiceEntry) { + if len(entry.AddrIPv4) == 0 && len(entry.AddrIPv6) == 0 { + slog.Debug("discovered service has no addresses", "instance", entry.Instance) + return + } + + var addr string + if len(entry.AddrIPv4) > 0 { + addr = entry.AddrIPv4[0].String() + } else { + addr = entry.AddrIPv6[0].String() + } + + port := entry.Port + url := fmt.Sprintf("http://%s:%d", addr, port) + key := fmt.Sprintf("%s@%s", entry.Instance, entry.HostName) + + d.mu.Lock() + defer d.mu.Unlock() + + // Check if we already know this peer + if _, exists := d.discovered[key]; exists { + d.discovered[key].lastSeen = time.Now() + return + } + + // New peer discovered + slog.Info("discovered nix-serve instance", "instance", entry.Instance, "url", url) + + d.discovered[key] = &discoveredPeer{ + url: url, + lastSeen: time.Now(), + } + + // Notify callback if set + if d.onAddUpstream != nil { + go func() { + defer func() { + if r := recover(); r != nil { + slog.Error("panic in add upstream callback", "recover", r) + } + }() + d.onAddUpstream(url, d.cfg.Priority) + }() + } +} + +// Removes peers that haven't been seen within the TTL period. +func (d *Discovery) maintainPeers(ctx context.Context) { + defer d.waitGroup.Done() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-d.stopCh: + return + case <-ticker.C: + d.cleanupPeers() + } + } +} + +// Cleans up stale peer entries. +func (d *Discovery) cleanupPeers() { + d.mu.Lock() + defer d.mu.Unlock() + + now := time.Now() + // TTL is the discovery response time; peers should re-announce periodically. + // Use 3x TTL as the expiration window. + expiration := d.cfg.DiscoveryTime.Duration * 3 + + for key, peer := range d.discovered { + if now.Sub(peer.lastSeen) > expiration { + slog.Info("removing stale peer", "url", peer.url) + delete(d.discovered, key) + if d.onRemoveUpstream != nil { + go func(url string) { + defer func() { + if r := recover(); r != nil { + slog.Error("panic in remove upstream callback", "recover", r) + } + }() + d.onRemoveUpstream(url) + }(peer.url) + } + } + } +} + +// Returns a list of currently discovered peer URLs. +func (d *Discovery) DiscoveredPeers() []string { + d.mu.RLock() + defer d.mu.RUnlock() + + peers := make([]string, 0, len(d.discovered)) + for _, peer := range d.discovered { + peers = append(peers, peer.url) + } + return peers +} diff --git a/internal/prober/prober.go b/internal/prober/prober.go index 3dddb48..d88c7a7 100644 --- a/internal/prober/prober.go +++ b/internal/prober/prober.go @@ -232,3 +232,24 @@ func (p *Prober) getOrCreate(url string) *UpstreamHealth { } return h } + +// Adds a new upstream dynamically (e.g., discovered via mDNS). +// Thread-safe. Logs the addition and begins probing. +func (p *Prober) AddUpstream(url string, priority int) { + p.mu.Lock() + defer p.mu.Unlock() + if _, exists := p.table[url]; exists { + return + } + p.table[url] = &UpstreamHealth{URL: url, Priority: priority, Status: StatusActive} + // Trigger an immediate probe in background + go p.ProbeUpstream(url) +} + +// Removes an upstream from tracking (e.g., when a peer leaves the network). +// Thread-safe. No-op if upstream was not known. +func (p *Prober) RemoveUpstream(url string) { + p.mu.Lock() + defer p.mu.Unlock() + delete(p.table, url) +}