diff --git a/cmd/ncro/main.go b/cmd/ncro/main.go index ef8a170..4d8fbfd 100644 --- a/cmd/ncro/main.go +++ b/cmd/ncro/main.go @@ -85,14 +85,21 @@ func main() { probeDone := make(chan struct{}) go p.RunProbeLoop(30*time.Second, probeDone) + var gossipDone chan struct{} if cfg.Mesh.Enabled { - node, err := mesh.NewNode(cfg.Mesh.PrivateKeyPath, nil) + store := mesh.NewRouteStore() + node, err := mesh.NewNode(cfg.Mesh.PrivateKeyPath, store) if err != nil { slog.Error("failed to create mesh node", "error", err) os.Exit(1) } - slog.Info("mesh enabled", "node_id", node.ID(), "peers", len(cfg.Mesh.Peers)) - slog.Warn("mesh gossip not yet implemented") + if err := mesh.ListenAndServe(cfg.Mesh.BindAddr, store); err != nil { + slog.Error("failed to start mesh listener", "addr", cfg.Mesh.BindAddr, "error", err) + os.Exit(1) + } + gossipDone = make(chan struct{}) + go mesh.RunGossipLoop(node, db, cfg.Mesh.Peers, cfg.Mesh.GossipInterval.Duration, gossipDone) + slog.Info("mesh enabled", "node_id", node.ID(), "addr", cfg.Mesh.BindAddr, "peers", len(cfg.Mesh.Peers)) } r := router.New(db, p, cfg.Cache.TTL.Duration, 5*time.Second) @@ -119,6 +126,9 @@ func main() { close(expireDone) close(probeDone) + if gossipDone != nil { + close(gossipDone) + } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() diff --git a/internal/cache/db.go b/internal/cache/db.go index 1eaaac9..6e6b64a 100644 --- a/internal/cache/db.go +++ b/internal/cache/db.go @@ -144,6 +144,33 @@ func (d *DB) ExpireOldRoutes() error { return err } +// Returns up to n non-expired routes ordered by most-recently-verified. +func (d *DB) ListRecentRoutes(n int) ([]RouteEntry, error) { + rows, err := d.db.Query(` + SELECT store_path, upstream_url, latency_ema, last_verified, ttl, nar_hash, nar_size + FROM routes WHERE ttl > ? ORDER BY last_verified DESC LIMIT ?`, + time.Now().Unix(), n) + if err != nil { + return nil, err + } + defer rows.Close() + var result []RouteEntry + for rows.Next() { + var e RouteEntry + var lastVerifiedUnix, ttlUnix int64 + if err := rows.Scan( + &e.StorePath, &e.UpstreamURL, &e.LatencyEMA, + &lastVerifiedUnix, &ttlUnix, &e.NarHash, &e.NarSize, + ); err != nil { + return nil, err + } + e.LastVerified = time.Unix(lastVerifiedUnix, 0).UTC() + e.TTL = time.Unix(ttlUnix, 0).UTC() + result = append(result, e) + } + return result, rows.Err() +} + // Returns the total number of stored routes. func (d *DB) RouteCount() (int, error) { var count int diff --git a/internal/mesh/gossip.go b/internal/mesh/gossip.go new file mode 100644 index 0000000..a88443e --- /dev/null +++ b/internal/mesh/gossip.go @@ -0,0 +1,130 @@ +package mesh + +import ( + "encoding/binary" + "log/slog" + "net" + "time" + + "github.com/vmihailenco/msgpack/v5" + "notashelf.dev/ncro/internal/cache" +) + +const maxPacketSize = 65536 // UDP max payload + +// Wire format: [2-byte sig length][sig bytes][msgpack body] +func encodePacket(node *Node, msg Message) ([]byte, error) { + body, sig, err := node.Sign(msg) + if err != nil { + return nil, err + } + pkt := make([]byte, 2+len(sig)+len(body)) + binary.BigEndian.PutUint16(pkt[:2], uint16(len(sig))) + copy(pkt[2:], sig) + copy(pkt[2+len(sig):], body) + return pkt, nil +} + +func decodePacket(pkt []byte) (Message, []byte, []byte, bool) { + if len(pkt) < 2 { + return Message{}, nil, nil, false + } + sigLen := int(binary.BigEndian.Uint16(pkt[:2])) + if len(pkt) < 2+sigLen { + return Message{}, nil, nil, false + } + sig := pkt[2 : 2+sigLen] + body := pkt[2+sigLen:] + var msg Message + if err := msgpack.Unmarshal(body, &msg); err != nil { + return Message{}, nil, nil, false + } + return msg, body, sig, true +} + +// Starts a UDP listener at addr. Received route announcements are merged into store. +// Blocks until the conn is closed; call in a goroutine. +func ListenAndServe(addr string, store *RouteStore) error { + conn, err := net.ListenPacket("udp", addr) + if err != nil { + return err + } + go func() { + defer conn.Close() + buf := make([]byte, maxPacketSize) + for { + n, src, err := conn.ReadFrom(buf) + if err != nil { + return + } + msg, _, _, ok := decodePacket(buf[:n]) + if !ok { + slog.Warn("mesh: malformed packet", "src", src) + continue + } + if msg.Type == MsgAnnounce && len(msg.Routes) > 0 { + store.Merge(msg.Routes) + slog.Debug("mesh: merged peer routes", "node", msg.NodeID, "src", src, "count", len(msg.Routes)) + } + } + }() + return nil +} + +// Sends an MsgAnnounce carrying routes to a single peer address. +func Announce(peerAddr string, node *Node, routes []cache.RouteEntry) error { + msg := Message{ + Type: MsgAnnounce, + NodeID: node.ID(), + Timestamp: time.Now().UnixNano(), + Routes: routes, + } + pkt, err := encodePacket(node, msg) + if err != nil { + return err + } + addr, err := net.ResolveUDPAddr("udp", peerAddr) + if err != nil { + return err + } + conn, err := net.DialUDP("udp", nil, addr) + if err != nil { + return err + } + defer conn.Close() + conn.SetWriteDeadline(time.Now().Add(2 * time.Second)) + _, err = conn.Write(pkt) + return err +} + +// RouteSource retrieves routes to gossip. +type RouteSource interface { + ListRecentRoutes(n int) ([]cache.RouteEntry, error) +} + +// Announces our top routes to each peer on interval. Blocks until stop is closed. +func RunGossipLoop(node *Node, src RouteSource, peers []string, interval time.Duration, stop <-chan struct{}) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-stop: + return + case <-ticker.C: + routes, err := src.ListRecentRoutes(100) + if err != nil { + slog.Warn("mesh: failed to list routes for gossip", "error", err) + continue + } + if len(routes) == 0 { + continue + } + for _, peer := range peers { + if err := Announce(peer, node, routes); err != nil { + slog.Warn("mesh: announce failed", "peer", peer, "error", err) + } + } + slog.Debug("mesh: announced routes to peers", "routes", len(routes), "peers", len(peers)) + } + } +} diff --git a/internal/mesh/gossip_test.go b/internal/mesh/gossip_test.go new file mode 100644 index 0000000..74f755e --- /dev/null +++ b/internal/mesh/gossip_test.go @@ -0,0 +1,54 @@ +package mesh_test + +import ( + "net" + "testing" + "time" + + "notashelf.dev/ncro/internal/cache" + "notashelf.dev/ncro/internal/mesh" +) + +func TestAnnounceAndReceive(t *testing.T) { + store := mesh.NewRouteStore() + node, err := mesh.NewNode("", store) + if err != nil { + t.Fatal(err) + } + + // Bind to an ephemeral port. + conn, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + addr := conn.LocalAddr().String() + conn.Close() + + if err := mesh.ListenAndServe(addr, store); err != nil { + t.Fatalf("ListenAndServe: %v", err) + } + + routes := []cache.RouteEntry{ + { + StorePath: "test-pkg-abc", + UpstreamURL: "https://cache.nixos.org", + LatencyEMA: 25, + TTL: time.Now().Add(time.Hour), + }, + } + + if err := mesh.Announce(addr, node, routes); err != nil { + t.Fatalf("Announce: %v", err) + } + + // Give the listener goroutine time to process the packet. + time.Sleep(50 * time.Millisecond) + + entry := store.Get("test-pkg-abc") + if entry == nil { + t.Fatal("route not merged into store after announce") + } + if entry.UpstreamURL != "https://cache.nixos.org" { + t.Errorf("UpstreamURL = %q", entry.UpstreamURL) + } +} diff --git a/internal/server/integration_test.go b/internal/server/integration_test.go index b78e77a..8eba814 100644 --- a/internal/server/integration_test.go +++ b/internal/server/integration_test.go @@ -75,7 +75,7 @@ func TestUpstreamFailoverFallback(t *testing.T) { defer db.Close() p := prober.New(0.3) - p.RecordLatency(bad.URL, 1) // bad appears fastest + p.RecordLatency(bad.URL, 1) // bad appears fastest p.RecordLatency(good.URL, 50) r := router.New(db, p, time.Hour, 5*time.Second) diff --git a/nix/package.nix b/nix/package.nix index 63c6feb..1070850 100644 --- a/nix/package.nix +++ b/nix/package.nix @@ -20,7 +20,7 @@ buildGoModule { ]; }; - vendorHash = "sha256-suI8EAgRFG7BDJP2aqLWsej6FTP+OrEsmxRyV5hkKK0="; + vendorHash = "sha256-rjgb/iSz3+GPu8lNIhlTCwC/9uuuSh/PJv9GxvL7gJE="; ldflags = ["-s" "-w"]; }