Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I9f7b1a34065de77f4a8982b0f5feebc86a6a6964
221 lines
5 KiB
Go
221 lines
5 KiB
Go
package discovery
|
|
|
|
import (
	"context"
	"fmt"
	"log/slog"
	"net"
	"strconv"
	"sync"
	"time"

	"github.com/grandcat/zeroconf"

	"notashelf.dev/ncro/internal/config"
	"notashelf.dev/ncro/internal/prober"
)
|
|
|
|
// Tracks discovered nix-serve instances and maintains the upstream list.
//
// Lifecycle: create with New, register hooks via SetCallbacks, run Start
// (blocking), and shut down with Stop.
type Discovery struct {
	cfg config.DiscoveryConfig // service name, domain, discovery TTL, upstream priority

	// prober is stored at construction. NOTE(review): not referenced by any
	// method in this file — confirm it is used elsewhere or remove.
	prober *prober.Prober

	resolver *zeroconf.Resolver // mDNS resolver created in New; browsing started in Start

	// discovered maps "instance@hostname" keys to live peer state.
	// Guarded by mu.
	discovered map[string]*discoveredPeer
	mu         sync.RWMutex

	stopCh    chan struct{}  // closed exactly once (via stopOnce) to signal shutdown
	stopOnce  sync.Once      // makes Stop (and Start's failure path) idempotent
	waitGroup sync.WaitGroup // tracks the handleEntries and maintainPeers goroutines

	// Callbacks fired when peers join/leave; written by SetCallbacks under mu
	// and invoked from short-lived goroutines.
	onAddUpstream    func(url string, priority int)
	onRemoveUpstream func(url string)
}
|
|
|
|
// discoveredPeer is the per-peer state retained while an instance is live.
type discoveredPeer struct {
	url      string    // cache base URL, e.g. "http://host:port"
	lastSeen time.Time // refreshed on each mDNS announcement; drives TTL expiry
}
|
|
|
|
// Creates a new Discovery manager.
|
|
func New(cfg config.DiscoveryConfig, p *prober.Prober) (*Discovery, error) {
|
|
resolver, err := zeroconf.NewResolver(nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create zeroconf resolver: %w", err)
|
|
}
|
|
|
|
return &Discovery{
|
|
cfg: cfg,
|
|
prober: p,
|
|
resolver: resolver,
|
|
discovered: make(map[string]*discoveredPeer),
|
|
stopCh: make(chan struct{}),
|
|
}, nil
|
|
}
|
|
|
|
// Sets callbacks for upstream addition/removal. These are invoked when peers
|
|
// are discovered or leave the network.
|
|
func (d *Discovery) SetCallbacks(
|
|
add func(url string, priority int),
|
|
remove func(url string),
|
|
) {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
d.onAddUpstream = add
|
|
d.onRemoveUpstream = remove
|
|
}
|
|
|
|
// Starts browsing for services on the local network. Blocks until the context
// is cancelled or Stop is called.
//
// Two background goroutines are launched before browsing begins: one consumes
// discovered entries, the other periodically expires stale peers. Both are
// tracked by d.waitGroup and exit on ctx cancellation or stopCh closure.
func (d *Discovery) Start(ctx context.Context) error {
	entries := make(chan *zeroconf.ServiceEntry)

	d.waitGroup.Add(1)
	go d.handleEntries(ctx, entries)

	d.waitGroup.Add(1)
	go d.maintainPeers(ctx)

	// Browse registers interest and returns; entries then arrive
	// asynchronously on the channel above.
	if err := d.resolver.Browse(ctx, d.cfg.ServiceName, d.cfg.Domain, entries); err != nil {
		// Browse failed: unwind both goroutines before returning the error.
		// NOTE(review): the zeroconf library may close the entries channel
		// itself once browsing is underway; closing it here assumes Browse
		// failed before taking ownership — confirm against the library to
		// rule out a double close.
		close(entries)
		d.stopOnce.Do(func() { close(d.stopCh) })
		d.waitGroup.Wait()
		return fmt.Errorf("browse services: %w", err)
	}

	// Block until external cancellation or an explicit Stop().
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-d.stopCh:
		return nil
	}
}
|
|
|
|
// Stops the discovery process.
|
|
func (d *Discovery) Stop() {
|
|
d.stopOnce.Do(func() { close(d.stopCh) })
|
|
d.waitGroup.Wait()
|
|
}
|
|
|
|
// Processes discovered service entries.
|
|
func (d *Discovery) handleEntries(ctx context.Context, entries chan *zeroconf.ServiceEntry) {
|
|
defer d.waitGroup.Done()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-d.stopCh:
|
|
return
|
|
case entry, ok := <-entries:
|
|
if !ok {
|
|
return
|
|
}
|
|
d.handleEntry(ctx, entry)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Handles a single service entry.
|
|
func (d *Discovery) handleEntry(_ context.Context, entry *zeroconf.ServiceEntry) {
|
|
if len(entry.AddrIPv4) == 0 && len(entry.AddrIPv6) == 0 {
|
|
slog.Debug("discovered service has no addresses", "instance", entry.Instance)
|
|
return
|
|
}
|
|
|
|
var addr string
|
|
if len(entry.AddrIPv4) > 0 {
|
|
addr = entry.AddrIPv4[0].String()
|
|
} else {
|
|
addr = entry.AddrIPv6[0].String()
|
|
}
|
|
|
|
url := "http://" + net.JoinHostPort(addr, fmt.Sprintf("%d", entry.Port))
|
|
key := fmt.Sprintf("%s@%s", entry.Instance, entry.HostName)
|
|
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
// Check if we already know this peer
|
|
if _, exists := d.discovered[key]; exists {
|
|
d.discovered[key].lastSeen = time.Now()
|
|
return
|
|
}
|
|
|
|
// New peer discovered
|
|
slog.Info("discovered nix-serve instance", "instance", entry.Instance, "url", url)
|
|
|
|
d.discovered[key] = &discoveredPeer{
|
|
url: url,
|
|
lastSeen: time.Now(),
|
|
}
|
|
|
|
// Notify callback if set
|
|
if d.onAddUpstream != nil {
|
|
go func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
slog.Error("panic in add upstream callback", "recover", r)
|
|
}
|
|
}()
|
|
d.onAddUpstream(url, d.cfg.Priority)
|
|
}()
|
|
}
|
|
}
|
|
|
|
// Removes peers that haven't been seen within the TTL period.
|
|
func (d *Discovery) maintainPeers(ctx context.Context) {
|
|
defer d.waitGroup.Done()
|
|
|
|
ticker := time.NewTicker(10 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-d.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
d.cleanupPeers()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Cleans up stale peer entries.
|
|
func (d *Discovery) cleanupPeers() {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
// TTL is the discovery response time; peers should re-announce periodically.
|
|
// Use 3x TTL as the expiration window.
|
|
expiration := d.cfg.DiscoveryTime.Duration * 3
|
|
if expiration == 0 {
|
|
expiration = time.Second
|
|
}
|
|
|
|
for key, peer := range d.discovered {
|
|
if now.Sub(peer.lastSeen) > expiration {
|
|
slog.Info("removing stale peer", "url", peer.url)
|
|
delete(d.discovered, key)
|
|
if d.onRemoveUpstream != nil {
|
|
go func(url string) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
slog.Error("panic in remove upstream callback", "recover", r)
|
|
}
|
|
}()
|
|
d.onRemoveUpstream(url)
|
|
}(peer.url)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Returns a list of currently discovered peer URLs.
|
|
func (d *Discovery) DiscoveredPeers() []string {
|
|
d.mu.RLock()
|
|
defer d.mu.RUnlock()
|
|
|
|
peers := make([]string, 0, len(d.discovered))
|
|
for _, peer := range d.discovered {
|
|
peers = append(peers, peer.url)
|
|
}
|
|
return peers
|
|
}
|