From 605923a94a6a87878bc3d09fea6f7968dbe194f8 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Fri, 6 Mar 2026 00:34:40 +0300 Subject: [PATCH] mesh: init ed25519-signed gossip layer; conflict-resolving route store Signed-off-by: NotAShelf Change-Id: I92ff256da8e84e86cd9e7c2b511c0ae56a6a6964 --- go.mod | 2 + go.sum | 10 +++ internal/mesh/mesh.go | 160 +++++++++++++++++++++++++++++++++++++ internal/mesh/mesh_test.go | 75 +++++++++++++++++ 4 files changed, 247 insertions(+) create mode 100644 internal/mesh/mesh_test.go diff --git a/go.mod b/go.mod index 6e4ad38..3516d4b 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module notashelf.dev/ncro go 1.25.7 require ( + github.com/vmihailenco/msgpack/v5 v5.4.1 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.46.1 ) @@ -13,6 +14,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/sys v0.37.0 // indirect modernc.org/libc v1.67.6 // indirect diff --git a/go.sum b/go.sum index b2791d1..73d8879 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= @@ -10,8 +12,16 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= diff --git a/internal/mesh/mesh.go b/internal/mesh/mesh.go index d2ca262..4ee4703 100644 --- a/internal/mesh/mesh.go +++ b/internal/mesh/mesh.go @@ -1 +1,161 @@ package mesh + +import ( + "crypto/ed25519" + "crypto/rand" + "encoding/hex" + "errors" + "fmt" + "os" + "sync" + "time" + + "github.com/vmihailenco/msgpack/v5" + "notashelf.dev/ncro/internal/cache" +) + +// Gossip message types. +type MsgType uint8 + +const ( + MsgAnnounce MsgType = 1 + MsgSyncRequest MsgType = 2 + MsgSyncReply MsgType = 3 +) + +// Wire format for gossip messages. +type Message struct { + Type MsgType + NodeID string + Timestamp int64 + Routes []cache.RouteEntry +} + +// Cryptographic identity of an ncro node. +type Node struct { + pubKey ed25519.PublicKey + privKey ed25519.PrivateKey + store *RouteStore +} + +// Loads or generates an ed25519 keypair from keyPath. +// Pass "" for an ephemeral in-memory key. +func NewNode(keyPath string, store *RouteStore) (*Node, error) { + if store == nil { + store = NewRouteStore() + } + if keyPath == "" { + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + return nil, fmt.Errorf("generate key: %w", err) + } + return &Node{pubKey: pub, privKey: priv, store: store}, nil + } + data, err := os.ReadFile(keyPath) + if err == nil && len(data) == ed25519.PrivateKeySize { + priv := ed25519.PrivateKey(data) + return &Node{pubKey: priv.Public().(ed25519.PublicKey), privKey: priv, store: store}, nil + } + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + return nil, fmt.Errorf("generate key: %w", err) + } + if err := os.WriteFile(keyPath, priv, 0600); err != nil { + return nil, fmt.Errorf("write key: %w", err) + } + return &Node{pubKey: pub, privKey: priv, store: store}, nil +} + +// Returns the hex-encoded public key fingerprint. +func (n *Node) ID() string { + return hex.EncodeToString(n.pubKey[:8]) +} + +// Returns the node's public key. +func (n *Node) PublicKey() ed25519.PublicKey { + return n.pubKey +} + +// Serializes msg with msgpack and signs it; returns (data, signature, error). +func (n *Node) Sign(msg Message) ([]byte, []byte, error) { + data, err := msgpack.Marshal(msg) + if err != nil { + return nil, nil, err + } + return data, ed25519.Sign(n.privKey, data), nil +} + +// Checks that sig is a valid ed25519 signature of data under pubKey. +func Verify(pubKey ed25519.PublicKey, data, sig []byte) error { + if !ed25519.Verify(pubKey, data, sig) { + return errors.New("invalid signature") + } + return nil +} + +// In-memory route table with merge-conflict resolution for gossip. +type RouteStore struct { + mu sync.RWMutex + routes map[string]*cache.RouteEntry +} + +// Creates an empty RouteStore. +func NewRouteStore() *RouteStore { + return &RouteStore{routes: make(map[string]*cache.RouteEntry)} +} + +// Applies incoming routes: lower latency wins; newer LastVerified wins on tie. +func (rs *RouteStore) Merge(incoming []cache.RouteEntry) { + rs.mu.Lock() + defer rs.mu.Unlock() + now := time.Now() + for _, r := range incoming { + r := r + if r.TTL.Before(now) { + continue + } + existing, ok := rs.routes[r.StorePath] + if !ok { + rs.routes[r.StorePath] = &r + continue + } + if r.LatencyEMA < existing.LatencyEMA { + rs.routes[r.StorePath] = &r + } else if r.LatencyEMA == existing.LatencyEMA && r.LastVerified.After(existing.LastVerified) { + rs.routes[r.StorePath] = &r + } + } +} + +// Returns a copy of the stored route, or nil. +func (rs *RouteStore) Get(storePath string) *cache.RouteEntry { + rs.mu.RLock() + defer rs.mu.RUnlock() + e, ok := rs.routes[storePath] + if !ok { + return nil + } + cp := *e + return &cp +} + +// Returns up to n routes for sync batching. +func (rs *RouteStore) Top(n int) []cache.RouteEntry { + rs.mu.RLock() + defer rs.mu.RUnlock() + result := make([]cache.RouteEntry, 0, min(n, len(rs.routes))) + for _, e := range rs.routes { + result = append(result, *e) + if len(result) >= n { + break + } + } + return result +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/internal/mesh/mesh_test.go b/internal/mesh/mesh_test.go new file mode 100644 index 0000000..f6fdf2b --- /dev/null +++ b/internal/mesh/mesh_test.go @@ -0,0 +1,75 @@ +package mesh_test + +import ( + "testing" + "time" + + "notashelf.dev/ncro/internal/cache" + "notashelf.dev/ncro/internal/mesh" +) + +func TestSignVerify(t *testing.T) { + node, err := mesh.NewNode("", nil) + if err != nil { + t.Fatal(err) + } + + msg := mesh.Message{ + Type: mesh.MsgAnnounce, + NodeID: node.ID(), + Timestamp: time.Now().UnixNano(), + Routes: []cache.RouteEntry{{StorePath: "abc123", UpstreamURL: "https://cache.nixos.org"}}, + } + + data, sig, err := node.Sign(msg) + if err != nil { + t.Fatalf("Sign: %v", err) + } + if err := mesh.Verify(node.PublicKey(), data, sig); err != nil { + t.Errorf("Verify: %v", err) + } +} + +func TestVerifyFailsOnTamper(t *testing.T) { + node, _ := mesh.NewNode("", nil) + msg := mesh.Message{Type: mesh.MsgAnnounce, NodeID: node.ID()} + data, sig, _ := node.Sign(msg) + data[0] ^= 0xFF + if err := mesh.Verify(node.PublicKey(), data, sig); err == nil { + t.Error("expected verification failure on tampered data") + } +} + +func TestMergeLowerLatencyWins(t *testing.T) { + store := mesh.NewRouteStore() + store.Merge([]cache.RouteEntry{ + {StorePath: "pkg-a", UpstreamURL: "https://slow.example.com", LatencyEMA: 200, TTL: time.Now().Add(time.Hour)}, + }) + store.Merge([]cache.RouteEntry{ + {StorePath: "pkg-a", UpstreamURL: "https://fast.example.com", LatencyEMA: 10, TTL: time.Now().Add(time.Hour)}, + }) + + entry := store.Get("pkg-a") + if entry == nil { + t.Fatal("entry is nil") + } + if entry.UpstreamURL != "https://fast.example.com" { + t.Errorf("expected fast upstream, got %q", entry.UpstreamURL) + } +} + +func TestMergeNewerTimestampWinsOnTie(t *testing.T) { + store := mesh.NewRouteStore() + now := time.Now() + store.Merge([]cache.RouteEntry{ + {StorePath: "pkg-b", UpstreamURL: "https://a.example.com", LatencyEMA: 50, LastVerified: now.Add(-time.Minute), TTL: time.Now().Add(time.Hour)}, + }) + store.Merge([]cache.RouteEntry{ + {StorePath: "pkg-b", UpstreamURL: "https://b.example.com", LatencyEMA: 50, LastVerified: now, TTL: time.Now().Add(time.Hour)}, + }) + + entry := store.Get("pkg-b") + if entry.UpstreamURL != "https://b.example.com" { + t.Errorf("expected newer upstream, got %q", entry.UpstreamURL) + } +}