internal/aggregate: optimize HyperLogLog to prevent O(16384) operations

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: Ibc7e6d7a86e8679e299c46debee9683f6a6a6964
This commit is contained in:
raf 2026-03-01 13:10:52 +03:00
commit 8392992b41
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
2 changed files with 166 additions and 24 deletions

View file

@ -1,23 +1,33 @@
package aggregate package aggregate
import ( import (
"context"
"regexp"
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"notashelf.dev/watchdog/internal/config" "notashelf.dev/watchdog/internal/config"
) )
var prometheusLabelPattern = regexp.MustCompile(`^[a-zA-Z0-9_/:-]*$`)
// Records analytics events as Prometheus metrics // Records analytics events as Prometheus metrics
type MetricsAggregator struct { type MetricsAggregator struct {
registry *PathRegistry pathRegistry *PathRegistry
cfg config.Config eventRegistry *CustomEventRegistry
pageviews *prometheus.CounterVec cfg config.Config
customEvents *prometheus.CounterVec pageviews *prometheus.CounterVec
pathOverflow prometheus.Counter customEvents *prometheus.CounterVec
dailyUniques prometheus.Gauge pathOverflow prometheus.Counter
estimator *UniquesEstimator refOverflow prometheus.Counter
eventOverflow prometheus.Counter
dailyUniques prometheus.Gauge
estimator *UniquesEstimator
stopChan chan struct{}
} }
// Creates a new metrics aggregator with dynamic labels based on config // Creates a new metrics aggregator with dynamic labels based on config
func NewMetricsAggregator(registry *PathRegistry, cfg config.Config) *MetricsAggregator { func NewMetricsAggregator(pathRegistry *PathRegistry, eventRegistry *CustomEventRegistry, cfg config.Config) *MetricsAggregator {
// Build label names based on what's enabled in config // Build label names based on what's enabled in config
labels := []string{"path"} // path is always included labels := []string{"path"} // path is always included
@ -56,6 +66,20 @@ func NewMetricsAggregator(registry *PathRegistry, cfg config.Config) *MetricsAgg
}, },
) )
refOverflow := prometheus.NewCounter(
prometheus.CounterOpts{
Name: "web_referrer_overflow_total",
Help: "Referrers rejected due to cardinality limit",
},
)
eventOverflow := prometheus.NewCounter(
prometheus.CounterOpts{
Name: "web_event_overflow_total",
Help: "Custom events rejected due to cardinality limit",
},
)
dailyUniques := prometheus.NewGauge( dailyUniques := prometheus.NewGauge(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Name: "web_daily_unique_visitors", Name: "web_daily_unique_visitors",
@ -63,40 +87,92 @@ func NewMetricsAggregator(registry *PathRegistry, cfg config.Config) *MetricsAgg
}, },
) )
return &MetricsAggregator{ m := &MetricsAggregator{
registry: registry, pathRegistry: pathRegistry,
cfg: cfg, eventRegistry: eventRegistry,
pageviews: pageviews, cfg: cfg,
customEvents: customEvents, pageviews: pageviews,
pathOverflow: pathOverflow, customEvents: customEvents,
dailyUniques: dailyUniques, pathOverflow: pathOverflow,
estimator: NewUniquesEstimator(), refOverflow: refOverflow,
eventOverflow: eventOverflow,
dailyUniques: dailyUniques,
estimator: NewUniquesEstimator(),
stopChan: make(chan struct{}),
} }
// Start background goroutine to update HLL gauge periodically
if cfg.Site.SaltRotation != "" {
go m.updateUniquesGauge()
}
return m
}
// updateUniquesGauge runs until stopChan is closed, copying the estimator's
// current unique-visitor estimate into the dailyUniques gauge once every
// 10 seconds. Refreshing on a timer, rather than on every request, keeps the
// estimate computation off the request path.
func (m *MetricsAggregator) updateUniquesGauge() {
	refresh := time.NewTicker(10 * time.Second)
	defer refresh.Stop()

	for {
		select {
		case <-m.stopChan:
			return
		case <-refresh.C:
			m.dailyUniques.Set(float64(m.estimator.Estimate()))
		}
	}
}
// Stop signals the background gauge-updater goroutine to exit by closing
// stopChan.
//
// Calling Stop again after it has returned is a no-op, so a caller may use
// both Stop and Shutdown (which itself calls Stop) without hitting a
// "close of closed channel" panic. The check is not synchronized: Stop must
// not be invoked from several goroutines at the same time.
func (m *MetricsAggregator) Stop() {
	select {
	case <-m.stopChan:
		// Already closed by an earlier call; closing again would panic.
	default:
		close(m.stopChan)
	}
}
// sanitizeLabel ensures label values are valid for Prometheus
func sanitizeLabel(label string) string {
if !prometheusLabelPattern.MatchString(label) {
return "other"
}
if len(label) > 100 {
return "other"
}
return label
} }
// Records a pageview with the configured dimensions // Records a pageview with the configured dimensions
func (m *MetricsAggregator) RecordPageview(path, country, device, referrer string) { func (m *MetricsAggregator) RecordPageview(path, country, device, referrer string) {
// Build label values in the same order as label names // Build label values in the same order as label names
labels := prometheus.Labels{"path": path} labels := prometheus.Labels{"path": sanitizeLabel(path)}
if m.cfg.Site.Collect.Country { if m.cfg.Site.Collect.Country {
labels["country"] = country labels["country"] = sanitizeLabel(country)
} }
if m.cfg.Site.Collect.Device { if m.cfg.Site.Collect.Device {
labels["device"] = device labels["device"] = sanitizeLabel(device)
} }
if m.cfg.Site.Collect.Referrer != "off" { if m.cfg.Site.Collect.Referrer != "off" {
labels["referrer"] = referrer labels["referrer"] = sanitizeLabel(referrer)
} }
m.pageviews.With(labels).Inc() m.pageviews.With(labels).Inc()
} }
// Records a custom event // Records a custom event with cardinality protection
func (m *MetricsAggregator) RecordCustomEvent(eventName string) { func (m *MetricsAggregator) RecordCustomEvent(eventName string) {
m.customEvents.With(prometheus.Labels{"event": eventName}).Inc() // Use registry to enforce cardinality limit
sanitized := sanitizeLabel(eventName)
accepted := m.eventRegistry.Add(sanitized)
if accepted == "other" {
m.eventOverflow.Inc()
}
m.customEvents.With(prometheus.Labels{"event": accepted}).Inc()
} }
// Records a path that was rejected due to cardinality limits // Records a path that was rejected due to cardinality limits
@ -104,14 +180,20 @@ func (m *MetricsAggregator) RecordPathOverflow() {
m.pathOverflow.Inc() m.pathOverflow.Inc()
} }
// Adds a visitor to the unique visitor estimator. Only tracks if salt_rotation is configured // Records a referrer that was rejected due to cardinality limits
func (m *MetricsAggregator) RecordReferrerOverflow() {
	// refOverflow is the web_referrer_overflow_total counter created in
	// NewMetricsAggregator; each call adds one to it.
	m.refOverflow.Inc()
}
// Adds a visitor to the unique visitor estimator.
// Only tracks if salt_rotation is configured
func (m *MetricsAggregator) AddUnique(ip, userAgent string) { func (m *MetricsAggregator) AddUnique(ip, userAgent string) {
if m.cfg.Site.SaltRotation == "" { if m.cfg.Site.SaltRotation == "" {
return // only track if salt rotation is enabled return // only track if salt rotation is enabled
} }
m.estimator.Add(ip, userAgent) m.estimator.Add(ip, userAgent)
m.dailyUniques.Set(float64(m.estimator.Estimate())) // Note: Gauge is updated in background goroutine, not here
} }
// Registers all metrics with the provided Prometheus registry // Registers all metrics with the provided Prometheus registry
@ -119,5 +201,17 @@ func (m *MetricsAggregator) MustRegister(reg prometheus.Registerer) {
reg.MustRegister(m.pageviews) reg.MustRegister(m.pageviews)
reg.MustRegister(m.customEvents) reg.MustRegister(m.customEvents)
reg.MustRegister(m.pathOverflow) reg.MustRegister(m.pathOverflow)
reg.MustRegister(m.refOverflow)
reg.MustRegister(m.eventOverflow)
reg.MustRegister(m.dailyUniques) reg.MustRegister(m.dailyUniques)
} }
// Shutdown stops the background gauge updater and then, when salt rotation is
// configured, writes the estimator state to /tmp/watchdog-hll.state.
//
// It returns the error from the estimator's Save call, or nil when nothing
// needed to be persisted. The ctx argument is accepted but not consulted.
func (m *MetricsAggregator) Shutdown(ctx context.Context) error {
	m.Stop()

	// Unique visitors are only tracked when salt rotation is enabled, so
	// without it there is no state to save.
	if m.cfg.Site.SaltRotation == "" {
		return nil
	}
	return m.estimator.Save("/tmp/watchdog-hll.state")
}

View file

@ -1,8 +1,11 @@
package aggregate package aggregate
import ( import (
"bytes"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"fmt"
"os"
"sync" "sync"
"time" "time"
@ -77,3 +80,48 @@ func (u *UniquesEstimator) CurrentSalt() string {
func DailySalt(t time.Time) string { func DailySalt(t time.Time) string {
return dailySalt(t) return dailySalt(t)
} }
// Save persists the estimator state to the file at path.
//
// The file holds the current day salt, a newline, and then the sketch as
// produced by its MarshalBinary method. It is written with mode 0600.
//
// The snapshot is assembled while holding the estimator lock, but the lock is
// released before touching the disk so that a slow write does not stall other
// users of the estimator. Returned errors wrap the underlying cause.
func (u *UniquesEstimator) Save(path string) error {
	u.mu.Lock()
	sketch, err := u.hll.MarshalBinary()
	if err != nil {
		u.mu.Unlock()
		return fmt.Errorf("marshaling HLL state: %w", err)
	}
	// Copy everything into a private buffer before unlocking, so the bytes
	// written below cannot alias state that changes once the lock is dropped.
	state := make([]byte, 0, len(u.currentDay)+1+len(sketch))
	state = append(state, u.currentDay...)
	state = append(state, '\n')
	state = append(state, sketch...)
	u.mu.Unlock()

	if err := os.WriteFile(path, state, 0600); err != nil {
		return fmt.Errorf("writing HLL state: %w", err)
	}
	return nil
}
// Load restores the estimator state from the file at path.
//
// The expected layout is the saved day salt, a newline, and then the binary
// sketch. A missing file is treated as a first run: the estimator is left
// untouched and nil is returned. Any other read error, a file without the
// newline separator, or a sketch that fails to decode is reported as an
// error, and in those cases the estimator is likewise left untouched.
//
// State is only restored when the saved salt equals today's daily salt;
// otherwise the estimator is reset to an empty sketch for today.
func (u *UniquesEstimator) Load(path string) error {
	data, err := os.ReadFile(path)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // nothing saved yet (first run)
		}
		return fmt.Errorf("reading HLL state: %w", err)
	}

	// Split on the first newline only: the binary sketch that follows may
	// itself contain newline bytes.
	sep := bytes.IndexByte(data, '\n')
	if sep < 0 {
		return fmt.Errorf("invalid state file format")
	}
	savedSalt, sketch := string(data[:sep]), data[sep+1:]

	u.mu.Lock()
	defer u.mu.Unlock()

	today := dailySalt(time.Now())
	if savedSalt != today {
		// Different day - start fresh.
		u.hll = hyperloglog.New()
		u.currentDay = today
		return nil
	}

	// Decode into a scratch sketch and swap it in only on success, so a
	// corrupt file cannot leave the live sketch half-overwritten.
	restored := hyperloglog.New()
	if err := restored.UnmarshalBinary(sketch); err != nil {
		return fmt.Errorf("decoding HLL state: %w", err)
	}
	u.hll = restored
	u.currentDay = savedSalt
	return nil
}