mesh: implement UDP gossip transport; wire mesh comms into main
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Iea0b2f250b01df78b1c7be73d69d28c06a6a6964
This commit is contained in:
parent
91ffc0eadd
commit
d290bcf4ad
6 changed files with 226 additions and 5 deletions
|
|
@ -85,14 +85,21 @@ func main() {
|
|||
probeDone := make(chan struct{})
|
||||
go p.RunProbeLoop(30*time.Second, probeDone)
|
||||
|
||||
var gossipDone chan struct{}
|
||||
if cfg.Mesh.Enabled {
|
||||
node, err := mesh.NewNode(cfg.Mesh.PrivateKeyPath, nil)
|
||||
store := mesh.NewRouteStore()
|
||||
node, err := mesh.NewNode(cfg.Mesh.PrivateKeyPath, store)
|
||||
if err != nil {
|
||||
slog.Error("failed to create mesh node", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
slog.Info("mesh enabled", "node_id", node.ID(), "peers", len(cfg.Mesh.Peers))
|
||||
slog.Warn("mesh gossip not yet implemented")
|
||||
if err := mesh.ListenAndServe(cfg.Mesh.BindAddr, store); err != nil {
|
||||
slog.Error("failed to start mesh listener", "addr", cfg.Mesh.BindAddr, "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
gossipDone = make(chan struct{})
|
||||
go mesh.RunGossipLoop(node, db, cfg.Mesh.Peers, cfg.Mesh.GossipInterval.Duration, gossipDone)
|
||||
slog.Info("mesh enabled", "node_id", node.ID(), "addr", cfg.Mesh.BindAddr, "peers", len(cfg.Mesh.Peers))
|
||||
}
|
||||
|
||||
r := router.New(db, p, cfg.Cache.TTL.Duration, 5*time.Second)
|
||||
|
|
@ -119,6 +126,9 @@ func main() {
|
|||
|
||||
close(expireDone)
|
||||
close(probeDone)
|
||||
if gossipDone != nil {
|
||||
close(gossipDone)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
|
|
|||
27
internal/cache/db.go
vendored
27
internal/cache/db.go
vendored
|
|
@ -144,6 +144,33 @@ func (d *DB) ExpireOldRoutes() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Returns up to n non-expired routes ordered by most-recently-verified.
|
||||
func (d *DB) ListRecentRoutes(n int) ([]RouteEntry, error) {
|
||||
rows, err := d.db.Query(`
|
||||
SELECT store_path, upstream_url, latency_ema, last_verified, ttl, nar_hash, nar_size
|
||||
FROM routes WHERE ttl > ? ORDER BY last_verified DESC LIMIT ?`,
|
||||
time.Now().Unix(), n)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var result []RouteEntry
|
||||
for rows.Next() {
|
||||
var e RouteEntry
|
||||
var lastVerifiedUnix, ttlUnix int64
|
||||
if err := rows.Scan(
|
||||
&e.StorePath, &e.UpstreamURL, &e.LatencyEMA,
|
||||
&lastVerifiedUnix, &ttlUnix, &e.NarHash, &e.NarSize,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.LastVerified = time.Unix(lastVerifiedUnix, 0).UTC()
|
||||
e.TTL = time.Unix(ttlUnix, 0).UTC()
|
||||
result = append(result, e)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
// Returns the total number of stored routes.
|
||||
func (d *DB) RouteCount() (int, error) {
|
||||
var count int
|
||||
|
|
|
|||
130
internal/mesh/gossip.go
Normal file
130
internal/mesh/gossip.go
Normal file
|
|
@ -0,0 +1,130 @@
|
|||
package mesh
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"log/slog"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/vmihailenco/msgpack/v5"
|
||||
"notashelf.dev/ncro/internal/cache"
|
||||
)
|
||||
|
||||
const maxPacketSize = 65536 // UDP max payload
|
||||
|
||||
// Wire format: [2-byte sig length][sig bytes][msgpack body]
|
||||
func encodePacket(node *Node, msg Message) ([]byte, error) {
|
||||
body, sig, err := node.Sign(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pkt := make([]byte, 2+len(sig)+len(body))
|
||||
binary.BigEndian.PutUint16(pkt[:2], uint16(len(sig)))
|
||||
copy(pkt[2:], sig)
|
||||
copy(pkt[2+len(sig):], body)
|
||||
return pkt, nil
|
||||
}
|
||||
|
||||
func decodePacket(pkt []byte) (Message, []byte, []byte, bool) {
|
||||
if len(pkt) < 2 {
|
||||
return Message{}, nil, nil, false
|
||||
}
|
||||
sigLen := int(binary.BigEndian.Uint16(pkt[:2]))
|
||||
if len(pkt) < 2+sigLen {
|
||||
return Message{}, nil, nil, false
|
||||
}
|
||||
sig := pkt[2 : 2+sigLen]
|
||||
body := pkt[2+sigLen:]
|
||||
var msg Message
|
||||
if err := msgpack.Unmarshal(body, &msg); err != nil {
|
||||
return Message{}, nil, nil, false
|
||||
}
|
||||
return msg, body, sig, true
|
||||
}
|
||||
|
||||
// Starts a UDP listener at addr. Received route announcements are merged into store.
|
||||
// Blocks until the conn is closed; call in a goroutine.
|
||||
func ListenAndServe(addr string, store *RouteStore) error {
|
||||
conn, err := net.ListenPacket("udp", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
defer conn.Close()
|
||||
buf := make([]byte, maxPacketSize)
|
||||
for {
|
||||
n, src, err := conn.ReadFrom(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
msg, _, _, ok := decodePacket(buf[:n])
|
||||
if !ok {
|
||||
slog.Warn("mesh: malformed packet", "src", src)
|
||||
continue
|
||||
}
|
||||
if msg.Type == MsgAnnounce && len(msg.Routes) > 0 {
|
||||
store.Merge(msg.Routes)
|
||||
slog.Debug("mesh: merged peer routes", "node", msg.NodeID, "src", src, "count", len(msg.Routes))
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sends an MsgAnnounce carrying routes to a single peer address.
|
||||
func Announce(peerAddr string, node *Node, routes []cache.RouteEntry) error {
|
||||
msg := Message{
|
||||
Type: MsgAnnounce,
|
||||
NodeID: node.ID(),
|
||||
Timestamp: time.Now().UnixNano(),
|
||||
Routes: routes,
|
||||
}
|
||||
pkt, err := encodePacket(node, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
addr, err := net.ResolveUDPAddr("udp", peerAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conn, err := net.DialUDP("udp", nil, addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
conn.SetWriteDeadline(time.Now().Add(2 * time.Second))
|
||||
_, err = conn.Write(pkt)
|
||||
return err
|
||||
}
|
||||
|
||||
// RouteSource retrieves routes to gossip.
|
||||
type RouteSource interface {
|
||||
ListRecentRoutes(n int) ([]cache.RouteEntry, error)
|
||||
}
|
||||
|
||||
// Announces our top routes to each peer on interval. Blocks until stop is closed.
|
||||
func RunGossipLoop(node *Node, src RouteSource, peers []string, interval time.Duration, stop <-chan struct{}) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-ticker.C:
|
||||
routes, err := src.ListRecentRoutes(100)
|
||||
if err != nil {
|
||||
slog.Warn("mesh: failed to list routes for gossip", "error", err)
|
||||
continue
|
||||
}
|
||||
if len(routes) == 0 {
|
||||
continue
|
||||
}
|
||||
for _, peer := range peers {
|
||||
if err := Announce(peer, node, routes); err != nil {
|
||||
slog.Warn("mesh: announce failed", "peer", peer, "error", err)
|
||||
}
|
||||
}
|
||||
slog.Debug("mesh: announced routes to peers", "routes", len(routes), "peers", len(peers))
|
||||
}
|
||||
}
|
||||
}
|
||||
54
internal/mesh/gossip_test.go
Normal file
54
internal/mesh/gossip_test.go
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
package mesh_test
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"notashelf.dev/ncro/internal/cache"
|
||||
"notashelf.dev/ncro/internal/mesh"
|
||||
)
|
||||
|
||||
func TestAnnounceAndReceive(t *testing.T) {
|
||||
store := mesh.NewRouteStore()
|
||||
node, err := mesh.NewNode("", store)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Bind to an ephemeral port.
|
||||
conn, err := net.ListenPacket("udp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
addr := conn.LocalAddr().String()
|
||||
conn.Close()
|
||||
|
||||
if err := mesh.ListenAndServe(addr, store); err != nil {
|
||||
t.Fatalf("ListenAndServe: %v", err)
|
||||
}
|
||||
|
||||
routes := []cache.RouteEntry{
|
||||
{
|
||||
StorePath: "test-pkg-abc",
|
||||
UpstreamURL: "https://cache.nixos.org",
|
||||
LatencyEMA: 25,
|
||||
TTL: time.Now().Add(time.Hour),
|
||||
},
|
||||
}
|
||||
|
||||
if err := mesh.Announce(addr, node, routes); err != nil {
|
||||
t.Fatalf("Announce: %v", err)
|
||||
}
|
||||
|
||||
// Give the listener goroutine time to process the packet.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
entry := store.Get("test-pkg-abc")
|
||||
if entry == nil {
|
||||
t.Fatal("route not merged into store after announce")
|
||||
}
|
||||
if entry.UpstreamURL != "https://cache.nixos.org" {
|
||||
t.Errorf("UpstreamURL = %q", entry.UpstreamURL)
|
||||
}
|
||||
}
|
||||
|
|
@ -75,7 +75,7 @@ func TestUpstreamFailoverFallback(t *testing.T) {
|
|||
defer db.Close()
|
||||
|
||||
p := prober.New(0.3)
|
||||
p.RecordLatency(bad.URL, 1) // bad appears fastest
|
||||
p.RecordLatency(bad.URL, 1) // bad appears fastest
|
||||
p.RecordLatency(good.URL, 50)
|
||||
|
||||
r := router.New(db, p, time.Hour, 5*time.Second)
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ buildGoModule {
|
|||
];
|
||||
};
|
||||
|
||||
vendorHash = "sha256-suI8EAgRFG7BDJP2aqLWsej6FTP+OrEsmxRyV5hkKK0=";
|
||||
vendorHash = "sha256-rjgb/iSz3+GPu8lNIhlTCwC/9uuuSh/PJv9GxvL7gJE=";
|
||||
|
||||
ldflags = ["-s" "-w"];
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue