"discovery: mDNS/DNS-SD peer discovery; dynamic upstream management"
Allows ncro instances to discover each other dynamically without static configuration and enables a peer-to-peer style (hello funny cube) mesh where nodes share cached builds locally instead of all hitting upstream caches. Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I7d723876c6816cb6aaaf3fe14cb24a426a6a6964
This commit is contained in:
parent
fe37962a3c
commit
f0055ea515
6 changed files with 336 additions and 3 deletions
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/spf13/viper"
|
||||
"notashelf.dev/ncro/internal/cache"
|
||||
"notashelf.dev/ncro/internal/config"
|
||||
"notashelf.dev/ncro/internal/discovery"
|
||||
"notashelf.dev/ncro/internal/mesh"
|
||||
"notashelf.dev/ncro/internal/metrics"
|
||||
"notashelf.dev/ncro/internal/prober"
|
||||
|
|
@ -132,6 +133,26 @@ func runServer(_ *cobra.Command, _ []string) error {
|
|||
probeDone := make(chan struct{})
|
||||
go p.RunProbeLoop(30*time.Second, probeDone)
|
||||
|
||||
// Setup mDNS discovery if enabled
|
||||
var discoveryMgr *discovery.Discovery
|
||||
if cfg.Discovery.Enabled {
|
||||
discoveryMgr, err = discovery.New(cfg.Discovery, p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create discovery manager: %w", err)
|
||||
}
|
||||
discoveryMgr.SetCallbacks(
|
||||
func(url string, priority int) {
|
||||
slog.Info("adding discovered upstream", "url", url)
|
||||
p.AddUpstream(url, priority)
|
||||
},
|
||||
func(url string) {
|
||||
slog.Info("removing discovered upstream", "url", url)
|
||||
p.RemoveUpstream(url)
|
||||
},
|
||||
)
|
||||
slog.Info("mDNS discovery enabled", "service", cfg.Discovery.ServiceName)
|
||||
}
|
||||
|
||||
r := router.New(db, p, cfg.Cache.TTL.Duration, 5*time.Second, cfg.Cache.NegativeTTL.Duration)
|
||||
for _, u := range cfg.Upstreams {
|
||||
if u.PublicKey != "" {
|
||||
|
|
@ -174,6 +195,22 @@ func runServer(_ *cobra.Command, _ []string) error {
|
|||
slog.Info("mesh enabled", "addr", cfg.Mesh.BindAddr, "peers", len(cfg.Mesh.Peers))
|
||||
}
|
||||
|
||||
// Start mDNS discovery in background
|
||||
discoveryDone := make(chan struct{})
|
||||
if discoveryMgr != nil {
|
||||
go func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
if err := discoveryMgr.Start(ctx); err != nil {
|
||||
slog.Error("discovery error", "error", err)
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
<-discoveryDone
|
||||
discoveryMgr.Stop()
|
||||
}()
|
||||
}
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: cfg.Server.Listen,
|
||||
Handler: server.New(r, p, db, cfg.Upstreams, cfg.Server.CachePriority),
|
||||
|
|
@ -206,6 +243,7 @@ func runServer(_ *cobra.Command, _ []string) error {
|
|||
if gossipDone != nil {
|
||||
close(gossipDone)
|
||||
}
|
||||
close(discoveryDone)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue