"discovery: mDNS/DNS-SD peer discovery; dynamic upstream management"
Allows ncro instances to discover each other dynamically without static configuration and enables a peer-to-peer style (hello funny cube) mesh where nodes share cached builds locally instead of all hitting upstream caches. Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I7d723876c6816cb6aaaf3fe14cb24a426a6a6964
This commit is contained in:
parent
7aee098f1c
commit
5fc3b2883b
6 changed files with 336 additions and 3 deletions
216
internal/discovery/discovery.go
Normal file
216
internal/discovery/discovery.go
Normal file
|
|
@ -0,0 +1,216 @@
|
|||
package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grandcat/zeroconf"
|
||||
"notashelf.dev/ncro/internal/config"
|
||||
"notashelf.dev/ncro/internal/prober"
|
||||
)
|
||||
|
||||
// Tracks discovered nix-serve instances and maintains the upstream list.
|
||||
type Discovery struct {
|
||||
cfg config.DiscoveryConfig
|
||||
prober *prober.Prober
|
||||
resolver *zeroconf.Resolver
|
||||
discovered map[string]*discoveredPeer
|
||||
mu sync.RWMutex
|
||||
stopCh chan struct{}
|
||||
waitGroup sync.WaitGroup
|
||||
onAddUpstream func(url string, priority int)
|
||||
onRemoveUpstream func(url string)
|
||||
}
|
||||
|
||||
type discoveredPeer struct {
|
||||
url string
|
||||
lastSeen time.Time
|
||||
}
|
||||
|
||||
// Creates a new Discovery manager.
|
||||
func New(cfg config.DiscoveryConfig, p *prober.Prober) (*Discovery, error) {
|
||||
resolver, err := zeroconf.NewResolver(nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create zeroconf resolver: %w", err)
|
||||
}
|
||||
|
||||
return &Discovery{
|
||||
cfg: cfg,
|
||||
prober: p,
|
||||
resolver: resolver,
|
||||
discovered: make(map[string]*discoveredPeer),
|
||||
stopCh: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Sets callbacks for upstream addition/removal. These are invoked when peers
|
||||
// are discovered or leave the network.
|
||||
func (d *Discovery) SetCallbacks(
|
||||
add func(url string, priority int),
|
||||
remove func(url string),
|
||||
) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
d.onAddUpstream = add
|
||||
d.onRemoveUpstream = remove
|
||||
}
|
||||
|
||||
// Starts browsing for services on the local network. Blocks until the context
|
||||
// is cancelled or Stop is called.
|
||||
func (d *Discovery) Start(ctx context.Context) error {
|
||||
entries := make(chan *zeroconf.ServiceEntry)
|
||||
|
||||
d.waitGroup.Add(1)
|
||||
go d.handleEntries(ctx, entries)
|
||||
|
||||
d.waitGroup.Add(1)
|
||||
go d.maintainPeers(ctx)
|
||||
|
||||
if err := d.resolver.Browse(ctx, d.cfg.ServiceName, d.cfg.Domain, entries); err != nil {
|
||||
close(entries)
|
||||
d.waitGroup.Wait()
|
||||
return fmt.Errorf("browse services: %w", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-d.stopCh:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Stops the discovery process.
|
||||
func (d *Discovery) Stop() {
|
||||
close(d.stopCh)
|
||||
d.waitGroup.Wait()
|
||||
}
|
||||
|
||||
// Processes discovered service entries.
|
||||
func (d *Discovery) handleEntries(ctx context.Context, entries chan *zeroconf.ServiceEntry) {
|
||||
defer d.waitGroup.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-d.stopCh:
|
||||
return
|
||||
case entry, ok := <-entries:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
d.handleEntry(ctx, entry)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handles a single service entry.
|
||||
func (d *Discovery) handleEntry(_ context.Context, entry *zeroconf.ServiceEntry) {
|
||||
if len(entry.AddrIPv4) == 0 && len(entry.AddrIPv6) == 0 {
|
||||
slog.Debug("discovered service has no addresses", "instance", entry.Instance)
|
||||
return
|
||||
}
|
||||
|
||||
var addr string
|
||||
if len(entry.AddrIPv4) > 0 {
|
||||
addr = entry.AddrIPv4[0].String()
|
||||
} else {
|
||||
addr = entry.AddrIPv6[0].String()
|
||||
}
|
||||
|
||||
port := entry.Port
|
||||
url := fmt.Sprintf("http://%s:%d", addr, port)
|
||||
key := fmt.Sprintf("%s@%s", entry.Instance, entry.HostName)
|
||||
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
// Check if we already know this peer
|
||||
if _, exists := d.discovered[key]; exists {
|
||||
d.discovered[key].lastSeen = time.Now()
|
||||
return
|
||||
}
|
||||
|
||||
// New peer discovered
|
||||
slog.Info("discovered nix-serve instance", "instance", entry.Instance, "url", url)
|
||||
|
||||
d.discovered[key] = &discoveredPeer{
|
||||
url: url,
|
||||
lastSeen: time.Now(),
|
||||
}
|
||||
|
||||
// Notify callback if set
|
||||
if d.onAddUpstream != nil {
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
slog.Error("panic in add upstream callback", "recover", r)
|
||||
}
|
||||
}()
|
||||
d.onAddUpstream(url, d.cfg.Priority)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Removes peers that haven't been seen within the TTL period.
|
||||
func (d *Discovery) maintainPeers(ctx context.Context) {
|
||||
defer d.waitGroup.Done()
|
||||
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-d.stopCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
d.cleanupPeers()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Cleans up stale peer entries.
|
||||
func (d *Discovery) cleanupPeers() {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
// TTL is the discovery response time; peers should re-announce periodically.
|
||||
// Use 3x TTL as the expiration window.
|
||||
expiration := d.cfg.DiscoveryTime.Duration * 3
|
||||
|
||||
for key, peer := range d.discovered {
|
||||
if now.Sub(peer.lastSeen) > expiration {
|
||||
slog.Info("removing stale peer", "url", peer.url)
|
||||
delete(d.discovered, key)
|
||||
if d.onRemoveUpstream != nil {
|
||||
go func(url string) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
slog.Error("panic in remove upstream callback", "recover", r)
|
||||
}
|
||||
}()
|
||||
d.onRemoveUpstream(url)
|
||||
}(peer.url)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a list of currently discovered peer URLs.
|
||||
func (d *Discovery) DiscoveredPeers() []string {
|
||||
d.mu.RLock()
|
||||
defer d.mu.RUnlock()
|
||||
|
||||
peers := make([]string, 0, len(d.discovered))
|
||||
for _, peer := range d.discovered {
|
||||
peers = append(peers, peer.url)
|
||||
}
|
||||
return peers
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue