diff --git a/internal/aggregate/metrics.go b/internal/aggregate/metrics.go index 3742186..11b8e83 100644 --- a/internal/aggregate/metrics.go +++ b/internal/aggregate/metrics.go @@ -1,23 +1,33 @@ package aggregate import ( + "context" + "regexp" + "time" + "github.com/prometheus/client_golang/prometheus" "notashelf.dev/watchdog/internal/config" ) +var prometheusLabelPattern = regexp.MustCompile(`^[a-zA-Z0-9_/:-]*$`) + // Records analytics events as Prometheus metrics type MetricsAggregator struct { - registry *PathRegistry - cfg config.Config - pageviews *prometheus.CounterVec - customEvents *prometheus.CounterVec - pathOverflow prometheus.Counter - dailyUniques prometheus.Gauge - estimator *UniquesEstimator + pathRegistry *PathRegistry + eventRegistry *CustomEventRegistry + cfg config.Config + pageviews *prometheus.CounterVec + customEvents *prometheus.CounterVec + pathOverflow prometheus.Counter + refOverflow prometheus.Counter + eventOverflow prometheus.Counter + dailyUniques prometheus.Gauge + estimator *UniquesEstimator + stopChan chan struct{} } // Creates a new metrics aggregator with dynamic labels based on config -func NewMetricsAggregator(registry *PathRegistry, cfg config.Config) *MetricsAggregator { +func NewMetricsAggregator(pathRegistry *PathRegistry, eventRegistry *CustomEventRegistry, cfg config.Config) *MetricsAggregator { // Build label names based on what's enabled in config labels := []string{"path"} // path is always included @@ -56,6 +66,20 @@ func NewMetricsAggregator(registry *PathRegistry, cfg config.Config) *MetricsAgg }, ) + refOverflow := prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "web_referrer_overflow_total", + Help: "Referrers rejected due to cardinality limit", + }, + ) + + eventOverflow := prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "web_event_overflow_total", + Help: "Custom events rejected due to cardinality limit", + }, + ) + dailyUniques := prometheus.NewGauge( prometheus.GaugeOpts{ Name: "web_daily_unique_visitors", @@ -63,40 +87,92 @@ func NewMetricsAggregator(registry *PathRegistry, cfg config.Config) *MetricsAgg }, ) - return &MetricsAggregator{ - registry: registry, - cfg: cfg, - pageviews: pageviews, - customEvents: customEvents, - pathOverflow: pathOverflow, - dailyUniques: dailyUniques, - estimator: NewUniquesEstimator(), + m := &MetricsAggregator{ + pathRegistry: pathRegistry, + eventRegistry: eventRegistry, + cfg: cfg, + pageviews: pageviews, + customEvents: customEvents, + pathOverflow: pathOverflow, + refOverflow: refOverflow, + eventOverflow: eventOverflow, + dailyUniques: dailyUniques, + estimator: NewUniquesEstimator(), + stopChan: make(chan struct{}), } + + // Start background goroutine to update HLL gauge periodically + if cfg.Site.SaltRotation != "" { + go m.updateUniquesGauge() + } + + return m +} + +// Background goroutine to update the unique visitors gauge every 10 seconds +// instead of on every request. This should help with performance. +func (m *MetricsAggregator) updateUniquesGauge() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + estimate := m.estimator.Estimate() + m.dailyUniques.Set(float64(estimate)) + case <-m.stopChan: + return + } + } +} + +// Stop gracefully shuts down the background goroutine +func (m *MetricsAggregator) Stop() { + close(m.stopChan) +} + +// sanitizeLabel ensures label values are valid for Prometheus +func sanitizeLabel(label string) string { + if !prometheusLabelPattern.MatchString(label) { + return "other" + } + if len(label) > 100 { + return "other" + } + return label } // Records a pageview with the configured dimensions func (m *MetricsAggregator) RecordPageview(path, country, device, referrer string) { // Build label values in the same order as label names - labels := prometheus.Labels{"path": path} + labels := prometheus.Labels{"path": sanitizeLabel(path)} if m.cfg.Site.Collect.Country { - labels["country"] = country + labels["country"] = sanitizeLabel(country) } if m.cfg.Site.Collect.Device { - labels["device"] = device + labels["device"] = sanitizeLabel(device) } if m.cfg.Site.Collect.Referrer != "off" { - labels["referrer"] = referrer + labels["referrer"] = sanitizeLabel(referrer) } m.pageviews.With(labels).Inc() } -// Records a custom event +// Records a custom event with cardinality protection func (m *MetricsAggregator) RecordCustomEvent(eventName string) { - m.customEvents.With(prometheus.Labels{"event": eventName}).Inc() + // Use registry to enforce cardinality limit + sanitized := sanitizeLabel(eventName) + accepted := m.eventRegistry.Add(sanitized) + + if accepted == "other" { + m.eventOverflow.Inc() + } + + m.customEvents.With(prometheus.Labels{"event": accepted}).Inc() } // Records a path that was rejected due to cardinality limits @@ -104,14 +180,20 @@ func (m *MetricsAggregator) RecordPathOverflow() { m.pathOverflow.Inc() } -// Adds a visitor to the unique visitor estimator. Only tracks if salt_rotation is configured +// Records a referrer that was rejected due to cardinality limits +func (m *MetricsAggregator) RecordReferrerOverflow() { + m.refOverflow.Inc() +} + +// Adds a visitor to the unique visitor estimator. +// Only tracks if salt_rotation is configured func (m *MetricsAggregator) AddUnique(ip, userAgent string) { if m.cfg.Site.SaltRotation == "" { return // only track if salt rotation is enabled } m.estimator.Add(ip, userAgent) - m.dailyUniques.Set(float64(m.estimator.Estimate())) + // Note: Gauge is updated in background goroutine, not here } // Registers all metrics with the provided Prometheus registry @@ -119,5 +201,17 @@ func (m *MetricsAggregator) MustRegister(reg prometheus.Registerer) { reg.MustRegister(m.pageviews) reg.MustRegister(m.customEvents) reg.MustRegister(m.pathOverflow) + reg.MustRegister(m.refOverflow) + reg.MustRegister(m.eventOverflow) reg.MustRegister(m.dailyUniques) } + +// Shutdown performs graceful shutdown operations +func (m *MetricsAggregator) Shutdown(ctx context.Context) error { + m.Stop() + // Persist HLL state if configured + if m.cfg.Site.SaltRotation != "" { + return m.estimator.Save("/tmp/watchdog-hll.state") + } + return nil +} diff --git a/internal/aggregate/uniques.go b/internal/aggregate/uniques.go index b1049e0..74ec2da 100644 --- a/internal/aggregate/uniques.go +++ b/internal/aggregate/uniques.go @@ -1,8 +1,11 @@ package aggregate import ( + "bytes" "crypto/sha256" "encoding/hex" + "fmt" + "os" "sync" "time" @@ -77,3 +80,48 @@ func (u *UniquesEstimator) CurrentSalt() string { func DailySalt(t time.Time) string { return dailySalt(t) } + +// Save persists the HLL state to disk +func (u *UniquesEstimator) Save(path string) error { + u.mu.Lock() + defer u.mu.Unlock() + + data, err := u.hll.MarshalBinary() + if err != nil { + return err + } + + // Save both HLL data and current day salt + return os.WriteFile(path, append([]byte(u.currentDay+"\n"), data...), 0600) +} + +// Load restores the HLL state from disk +func (u *UniquesEstimator) Load(path string) error { + data, err := os.ReadFile(path) + if err != nil { + return err // File not existing is OK (first run) + } + + u.mu.Lock() + defer u.mu.Unlock() + + // Parse saved salt and HLL data + parts := bytes.SplitN(data, []byte("\n"), 2) + if len(parts) != 2 { + return fmt.Errorf("invalid state file format") + } + + savedSalt := string(parts[0]) + today := dailySalt(time.Now()) + + // Only restore if it's the same day + if savedSalt == today { + u.currentDay = savedSalt + return u.hll.UnmarshalBinary(parts[1]) + } + + // Different day - start fresh + u.hll = hyperloglog.New() + u.currentDay = today + return nil +}