internal/api: better multi-sites support; validate events against allowed domains
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Iff1ced4966b4d42cfd6dfefb0cfd97696a6a6964
This commit is contained in:
parent
16ace569a0
commit
18fe1a8234
10 changed files with 542 additions and 35 deletions
|
|
@ -28,7 +28,7 @@ func Run(configPath string) error {
|
||||||
return fmt.Errorf("failed to load config: %w", err)
|
return fmt.Errorf("failed to load config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Loaded config for domain: %s", cfg.Site.Domain)
|
log.Printf("Loaded config for domains: %v", cfg.Site.Domains)
|
||||||
|
|
||||||
// Initialize components
|
// Initialize components
|
||||||
pathNormalizer := normalize.NewPathNormalizer(cfg.Site.Path)
|
pathNormalizer := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,10 @@ func NewMetricsAggregator(pathRegistry *PathRegistry, eventRegistry *CustomEvent
|
||||||
labels = append(labels, "referrer")
|
labels = append(labels, "referrer")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cfg.Site.Collect.Domain {
|
||||||
|
labels = append(labels, "domain")
|
||||||
|
}
|
||||||
|
|
||||||
pageviews := prometheus.NewCounterVec(
|
pageviews := prometheus.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Name: "web_pageviews_total",
|
Name: "web_pageviews_total",
|
||||||
|
|
@ -143,7 +147,7 @@ func sanitizeLabel(label string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Records a pageview with the configured dimensions
|
// Records a pageview with the configured dimensions
|
||||||
func (m *MetricsAggregator) RecordPageview(path, country, device, referrer string) {
|
func (m *MetricsAggregator) RecordPageview(path, country, device, referrer, domain string) {
|
||||||
// Build label values in the same order as label names
|
// Build label values in the same order as label names
|
||||||
labels := prometheus.Labels{"path": sanitizeLabel(path)}
|
labels := prometheus.Labels{"path": sanitizeLabel(path)}
|
||||||
|
|
||||||
|
|
@ -159,6 +163,10 @@ func (m *MetricsAggregator) RecordPageview(path, country, device, referrer strin
|
||||||
labels["referrer"] = sanitizeLabel(referrer)
|
labels["referrer"] = sanitizeLabel(referrer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if m.cfg.Site.Collect.Domain {
|
||||||
|
labels["domain"] = sanitizeLabel(domain)
|
||||||
|
}
|
||||||
|
|
||||||
m.pageviews.With(labels).Inc()
|
m.pageviews.With(labels).Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -193,7 +201,7 @@ func (m *MetricsAggregator) AddUnique(ip, userAgent string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
m.estimator.Add(ip, userAgent)
|
m.estimator.Add(ip, userAgent)
|
||||||
// Note: Gauge is updated in background goroutine, not here
|
// NOTE: Gauge is updated in background goroutine, not here
|
||||||
}
|
}
|
||||||
|
|
||||||
// Registers all metrics with the provided Prometheus registry
|
// Registers all metrics with the provided Prometheus registry
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ func TestMetricsAggregator_RecordPageview(t *testing.T) {
|
||||||
agg := NewMetricsAggregator(registry, NewCustomEventRegistry(100), cfg)
|
agg := NewMetricsAggregator(registry, NewCustomEventRegistry(100), cfg)
|
||||||
|
|
||||||
// Record pageview with all dimensions
|
// Record pageview with all dimensions
|
||||||
agg.RecordPageview("/home", "US", "desktop", "google.com")
|
agg.RecordPageview("/home", "US", "desktop", "google.com", "")
|
||||||
|
|
||||||
// Verify metric was recorded
|
// Verify metric was recorded
|
||||||
expected := `
|
expected := `
|
||||||
|
|
@ -54,7 +54,7 @@ func TestMetricsAggregator_RecordPageview_MinimalDimensions(t *testing.T) {
|
||||||
agg := NewMetricsAggregator(registry, NewCustomEventRegistry(100), cfg)
|
agg := NewMetricsAggregator(registry, NewCustomEventRegistry(100), cfg)
|
||||||
|
|
||||||
// Record pageview with only path
|
// Record pageview with only path
|
||||||
agg.RecordPageview("/home", "", "", "")
|
agg.RecordPageview("/home", "", "", "", "")
|
||||||
|
|
||||||
// Verify metric was recorded
|
// Verify metric was recorded
|
||||||
expected := `
|
expected := `
|
||||||
|
|
@ -176,7 +176,7 @@ func TestMetricsAggregator_MustRegister(t *testing.T) {
|
||||||
agg.MustRegister(promRegistry)
|
agg.MustRegister(promRegistry)
|
||||||
|
|
||||||
// Record some metrics to ensure they show up
|
// Record some metrics to ensure they show up
|
||||||
agg.RecordPageview("/test", "", "", "")
|
agg.RecordPageview("/test", "", "", "", "")
|
||||||
agg.RecordPathOverflow()
|
agg.RecordPathOverflow()
|
||||||
|
|
||||||
// Verify metrics can be gathered
|
// Verify metrics can be gathered
|
||||||
|
|
|
||||||
|
|
@ -39,14 +39,22 @@ func ParseEvent(body io.Reader) (*Event, error) {
|
||||||
return &event, nil
|
return &event, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate checks if the event is valid for the given domain
|
// Validate checks if the event is valid for the given domains
|
||||||
func (e *Event) Validate(expectedDomain string) error {
|
func (e *Event) Validate(allowedDomains []string) error {
|
||||||
if e.Domain == "" {
|
if e.Domain == "" {
|
||||||
return fmt.Errorf("domain required")
|
return fmt.Errorf("domain required")
|
||||||
}
|
}
|
||||||
|
|
||||||
if e.Domain != expectedDomain {
|
// Check if domain is in allowed list
|
||||||
return fmt.Errorf("domain mismatch")
|
allowed := false
|
||||||
|
for _, domain := range allowedDomains {
|
||||||
|
if e.Domain == domain {
|
||||||
|
allowed = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !allowed {
|
||||||
|
return fmt.Errorf("domain not allowed")
|
||||||
}
|
}
|
||||||
|
|
||||||
if e.Path == "" {
|
if e.Path == "" {
|
||||||
|
|
|
||||||
|
|
@ -115,7 +115,7 @@ func TestValidateEvent(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
event Event
|
event Event
|
||||||
domain string
|
domains []string
|
||||||
wantErr bool
|
wantErr bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
|
@ -124,7 +124,7 @@ func TestValidateEvent(t *testing.T) {
|
||||||
Domain: "example.com",
|
Domain: "example.com",
|
||||||
Path: "/home",
|
Path: "/home",
|
||||||
},
|
},
|
||||||
domain: "example.com",
|
domains: []string{"example.com"},
|
||||||
wantErr: false,
|
wantErr: false,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -134,7 +134,7 @@ func TestValidateEvent(t *testing.T) {
|
||||||
Path: "/signup",
|
Path: "/signup",
|
||||||
Event: "signup",
|
Event: "signup",
|
||||||
},
|
},
|
||||||
domain: "example.com",
|
domains: []string{"example.com"},
|
||||||
wantErr: false,
|
wantErr: false,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -143,7 +143,7 @@ func TestValidateEvent(t *testing.T) {
|
||||||
Domain: "wrong.com",
|
Domain: "wrong.com",
|
||||||
Path: "/home",
|
Path: "/home",
|
||||||
},
|
},
|
||||||
domain: "example.com",
|
domains: []string{"example.com"},
|
||||||
wantErr: true,
|
wantErr: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -152,7 +152,7 @@ func TestValidateEvent(t *testing.T) {
|
||||||
Domain: "",
|
Domain: "",
|
||||||
Path: "/home",
|
Path: "/home",
|
||||||
},
|
},
|
||||||
domain: "example.com",
|
domains: []string{"example.com"},
|
||||||
wantErr: true,
|
wantErr: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -161,7 +161,7 @@ func TestValidateEvent(t *testing.T) {
|
||||||
Domain: "example.com",
|
Domain: "example.com",
|
||||||
Path: "",
|
Path: "",
|
||||||
},
|
},
|
||||||
domain: "example.com",
|
domains: []string{"example.com"},
|
||||||
wantErr: true,
|
wantErr: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -170,7 +170,7 @@ func TestValidateEvent(t *testing.T) {
|
||||||
Domain: "example.com",
|
Domain: "example.com",
|
||||||
Path: "/" + strings.Repeat("a", 3000),
|
Path: "/" + strings.Repeat("a", 3000),
|
||||||
},
|
},
|
||||||
domain: "example.com",
|
domains: []string{"example.com"},
|
||||||
wantErr: true,
|
wantErr: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -180,7 +180,7 @@ func TestValidateEvent(t *testing.T) {
|
||||||
Path: "/home",
|
Path: "/home",
|
||||||
Referrer: strings.Repeat("a", 3000),
|
Referrer: strings.Repeat("a", 3000),
|
||||||
},
|
},
|
||||||
domain: "example.com",
|
domains: []string{"example.com"},
|
||||||
wantErr: true,
|
wantErr: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
@ -189,14 +189,41 @@ func TestValidateEvent(t *testing.T) {
|
||||||
Domain: "example.com",
|
Domain: "example.com",
|
||||||
Path: "/" + strings.Repeat("a", 2000),
|
Path: "/" + strings.Repeat("a", 2000),
|
||||||
},
|
},
|
||||||
domain: "example.com",
|
domains: []string{"example.com"},
|
||||||
wantErr: false,
|
wantErr: false,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "multi-site valid domain 1",
|
||||||
|
event: Event{
|
||||||
|
Domain: "site1.com",
|
||||||
|
Path: "/home",
|
||||||
|
},
|
||||||
|
domains: []string{"site1.com", "site2.com"},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multi-site valid domain 2",
|
||||||
|
event: Event{
|
||||||
|
Domain: "site2.com",
|
||||||
|
Path: "/about",
|
||||||
|
},
|
||||||
|
domains: []string{"site1.com", "site2.com"},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multi-site invalid domain",
|
||||||
|
event: Event{
|
||||||
|
Domain: "site3.com",
|
||||||
|
Path: "/home",
|
||||||
|
},
|
||||||
|
domains: []string{"site1.com", "site2.com"},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
err := tt.event.Validate(tt.domain)
|
err := tt.event.Validate(tt.domains)
|
||||||
if (err != nil) != tt.wantErr {
|
if (err != nil) != tt.wantErr {
|
||||||
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
|
t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -96,7 +96,7 @@ func (h *IngestionHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate event
|
// Validate event
|
||||||
if err := event.Validate(h.cfg.Site.Domain); err != nil {
|
if err := event.Validate(h.cfg.Site.Domains); err != nil {
|
||||||
http.Error(w, "Bad request", http.StatusBadRequest)
|
http.Error(w, "Bad request", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -134,7 +134,7 @@ func (h *IngestionHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
// Referrer classification
|
// Referrer classification
|
||||||
if h.cfg.Site.Collect.Referrer == "domain" {
|
if h.cfg.Site.Collect.Referrer == "domain" {
|
||||||
refDomain := normalize.ExtractReferrerDomain(event.Referrer, h.cfg.Site.Domain)
|
refDomain := normalize.ExtractReferrerDomain(event.Referrer, event.Domain)
|
||||||
if refDomain != "" {
|
if refDomain != "" {
|
||||||
accepted := h.refRegistry.Add(refDomain)
|
accepted := h.refRegistry.Add(refDomain)
|
||||||
if accepted == "other" {
|
if accepted == "other" {
|
||||||
|
|
@ -144,11 +144,17 @@ func (h *IngestionHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Domain tracking (if enabled for multi-site analytics)
|
||||||
|
var domain string
|
||||||
|
if h.cfg.Site.Collect.Domain {
|
||||||
|
domain = event.Domain
|
||||||
|
}
|
||||||
|
|
||||||
// FIXME: Country would be extracted from IP here. For now, we skip country extraction
|
// FIXME: Country would be extracted from IP here. For now, we skip country extraction
|
||||||
// because I have neither the time nor the patience to look into it. Return later.
|
// because I have neither the time nor the patience to look into it. Return later.
|
||||||
|
|
||||||
// Record pageview
|
// Record pageview
|
||||||
h.metricsAgg.RecordPageview(normalizedPath, country, device, referrer)
|
h.metricsAgg.RecordPageview(normalizedPath, country, device, referrer, domain)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return success
|
// Return success
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ import (
|
||||||
func TestIngestionHandler_Pageview(t *testing.T) {
|
func TestIngestionHandler_Pageview(t *testing.T) {
|
||||||
cfg := config.Config{
|
cfg := config.Config{
|
||||||
Site: config.SiteConfig{
|
Site: config.SiteConfig{
|
||||||
Domain: "example.com",
|
Domains: []string{"example.com"},
|
||||||
Collect: config.CollectConfig{
|
Collect: config.CollectConfig{
|
||||||
Pageviews: true,
|
Pageviews: true,
|
||||||
Country: true,
|
Country: true,
|
||||||
|
|
@ -62,7 +62,7 @@ func TestIngestionHandler_Pageview(t *testing.T) {
|
||||||
func TestIngestionHandler_CustomEvent(t *testing.T) {
|
func TestIngestionHandler_CustomEvent(t *testing.T) {
|
||||||
cfg := config.Config{
|
cfg := config.Config{
|
||||||
Site: config.SiteConfig{
|
Site: config.SiteConfig{
|
||||||
Domain: "example.com",
|
Domains: []string{"example.com"},
|
||||||
Collect: config.CollectConfig{
|
Collect: config.CollectConfig{
|
||||||
Pageviews: true,
|
Pageviews: true,
|
||||||
},
|
},
|
||||||
|
|
@ -99,7 +99,7 @@ func TestIngestionHandler_CustomEvent(t *testing.T) {
|
||||||
func TestIngestionHandler_WrongDomain(t *testing.T) {
|
func TestIngestionHandler_WrongDomain(t *testing.T) {
|
||||||
cfg := config.Config{
|
cfg := config.Config{
|
||||||
Site: config.SiteConfig{
|
Site: config.SiteConfig{
|
||||||
Domain: "example.com",
|
Domains: []string{"example.com"},
|
||||||
Collect: config.CollectConfig{
|
Collect: config.CollectConfig{
|
||||||
Pageviews: true,
|
Pageviews: true,
|
||||||
},
|
},
|
||||||
|
|
@ -133,7 +133,7 @@ func TestIngestionHandler_WrongDomain(t *testing.T) {
|
||||||
func TestIngestionHandler_MethodNotAllowed(t *testing.T) {
|
func TestIngestionHandler_MethodNotAllowed(t *testing.T) {
|
||||||
cfg := config.Config{
|
cfg := config.Config{
|
||||||
Site: config.SiteConfig{
|
Site: config.SiteConfig{
|
||||||
Domain: "example.com",
|
Domains: []string{"example.com"},
|
||||||
},
|
},
|
||||||
Limits: config.LimitsConfig{
|
Limits: config.LimitsConfig{
|
||||||
MaxPaths: 100,
|
MaxPaths: 100,
|
||||||
|
|
@ -160,7 +160,7 @@ func TestIngestionHandler_MethodNotAllowed(t *testing.T) {
|
||||||
func TestIngestionHandler_InvalidJSON(t *testing.T) {
|
func TestIngestionHandler_InvalidJSON(t *testing.T) {
|
||||||
cfg := config.Config{
|
cfg := config.Config{
|
||||||
Site: config.SiteConfig{
|
Site: config.SiteConfig{
|
||||||
Domain: "example.com",
|
Domains: []string{"example.com"},
|
||||||
},
|
},
|
||||||
Limits: config.LimitsConfig{
|
Limits: config.LimitsConfig{
|
||||||
MaxPaths: 100,
|
MaxPaths: 100,
|
||||||
|
|
@ -190,7 +190,7 @@ func TestIngestionHandler_InvalidJSON(t *testing.T) {
|
||||||
func TestIngestionHandler_DeviceClassification(t *testing.T) {
|
func TestIngestionHandler_DeviceClassification(t *testing.T) {
|
||||||
cfg := config.Config{
|
cfg := config.Config{
|
||||||
Site: config.SiteConfig{
|
Site: config.SiteConfig{
|
||||||
Domain: "example.com",
|
Domains: []string{"example.com"},
|
||||||
Collect: config.CollectConfig{
|
Collect: config.CollectConfig{
|
||||||
Pageviews: true,
|
Pageviews: true,
|
||||||
Device: true,
|
Device: true,
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ type Config struct {
|
||||||
|
|
||||||
// Site-specific settings
|
// Site-specific settings
|
||||||
type SiteConfig struct {
|
type SiteConfig struct {
|
||||||
Domain string `yaml:"domain"`
|
Domains []string `yaml:"domains"` // list of allowed domains
|
||||||
SaltRotation string `yaml:"salt_rotation"`
|
SaltRotation string `yaml:"salt_rotation"`
|
||||||
Collect CollectConfig `yaml:"collect"`
|
Collect CollectConfig `yaml:"collect"`
|
||||||
CustomEvents []string `yaml:"custom_events"`
|
CustomEvents []string `yaml:"custom_events"`
|
||||||
|
|
@ -31,6 +31,7 @@ type CollectConfig struct {
|
||||||
Country bool `yaml:"country"`
|
Country bool `yaml:"country"`
|
||||||
Device bool `yaml:"device"`
|
Device bool `yaml:"device"`
|
||||||
Referrer string `yaml:"referrer"`
|
Referrer string `yaml:"referrer"`
|
||||||
|
Domain bool `yaml:"domain"` // track domain as metric dimension (for multi-site)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Path normalization options
|
// Path normalization options
|
||||||
|
|
@ -106,8 +107,8 @@ func Load(path string) (*Config, error) {
|
||||||
// Check required fields and sets defaults
|
// Check required fields and sets defaults
|
||||||
func (c *Config) Validate() error {
|
func (c *Config) Validate() error {
|
||||||
// Site validation
|
// Site validation
|
||||||
if c.Site.Domain == "" {
|
if len(c.Site.Domains) == 0 {
|
||||||
return fmt.Errorf("site.domain is required")
|
return fmt.Errorf("site.domains is required")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate salt_rotation
|
// Validate salt_rotation
|
||||||
|
|
|
||||||
|
|
@ -10,8 +10,8 @@ func TestLoadConfig_ValidFile(t *testing.T) {
|
||||||
t.Fatalf("expected no error, got %v", err)
|
t.Fatalf("expected no error, got %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Site.Domain != "example.com" {
|
if len(cfg.Site.Domains) == 0 || cfg.Site.Domains[0] != "example.com" {
|
||||||
t.Errorf("expected domain 'example.com', got '%s'", cfg.Site.Domain)
|
t.Errorf("expected domains to contain 'example.com', got %v", cfg.Site.Domains)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Site.SaltRotation != "daily" {
|
if cfg.Site.SaltRotation != "daily" {
|
||||||
|
|
@ -29,7 +29,7 @@ func TestLoadConfig_MissingFile(t *testing.T) {
|
||||||
func TestValidate_MaxPathsRequired(t *testing.T) {
|
func TestValidate_MaxPathsRequired(t *testing.T) {
|
||||||
cfg := &Config{
|
cfg := &Config{
|
||||||
Site: SiteConfig{
|
Site: SiteConfig{
|
||||||
Domain: "example.com",
|
Domains: []string{"example.com"},
|
||||||
SaltRotation: "daily",
|
SaltRotation: "daily",
|
||||||
},
|
},
|
||||||
Limits: LimitsConfig{
|
Limits: LimitsConfig{
|
||||||
|
|
|
||||||
457
test/integration_test.go
Normal file
457
test/integration_test.go
Normal file
|
|
@ -0,0 +1,457 @@
|
||||||
|
//go:build integration
|
||||||
|
|
||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
"notashelf.dev/watchdog/internal/aggregate"
|
||||||
|
"notashelf.dev/watchdog/internal/api"
|
||||||
|
"notashelf.dev/watchdog/internal/config"
|
||||||
|
"notashelf.dev/watchdog/internal/normalize"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEndToEnd_BasicFlow(t *testing.T) {
|
||||||
|
// Load config
|
||||||
|
cfg, err := config.Load("../config.example.yaml")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to load config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Override domain for test
|
||||||
|
cfg.Site.Domains = []string{"test.example.com"}
|
||||||
|
cfg.Site.SaltRotation = "daily"
|
||||||
|
cfg.Limits.MaxPaths = 100
|
||||||
|
cfg.Limits.MaxSources = 50
|
||||||
|
cfg.Limits.MaxCustomEvents = 10
|
||||||
|
|
||||||
|
// Initialize components
|
||||||
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||||
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||||
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||||
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
||||||
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, *cfg)
|
||||||
|
|
||||||
|
// Register metrics
|
||||||
|
promRegistry := prometheus.NewRegistry()
|
||||||
|
metricsAgg.MustRegister(promRegistry)
|
||||||
|
|
||||||
|
// Create handlers
|
||||||
|
ingestionHandler := api.NewIngestionHandler(*cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||||
|
metricsHandler := promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{})
|
||||||
|
|
||||||
|
// Test pageview ingestion
|
||||||
|
t.Run("PageviewIngestion", func(t *testing.T) {
|
||||||
|
events := []string{
|
||||||
|
`{"d":"test.example.com","p":"/","r":"","w":1920}`,
|
||||||
|
`{"d":"test.example.com","p":"/about","r":"https://google.com","w":768}`,
|
||||||
|
`{"d":"test.example.com","p":"/blog/post-1","r":"https://twitter.com","w":1024}`,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
ingestionHandler.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != http.StatusNoContent {
|
||||||
|
t.Errorf("expected status 204, got %d: %s", w.Code, w.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify paths were registered
|
||||||
|
if !pathRegistry.Contains("/") {
|
||||||
|
t.Error("expected / to be registered")
|
||||||
|
}
|
||||||
|
if !pathRegistry.Contains("/about") {
|
||||||
|
t.Error("expected /about to be registered")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test custom events
|
||||||
|
t.Run("CustomEventIngestion", func(t *testing.T) {
|
||||||
|
events := []string{
|
||||||
|
`{"d":"test.example.com","p":"/signup","e":"signup","w":1920}`,
|
||||||
|
`{"d":"test.example.com","p":"/checkout","e":"purchase","w":1920}`,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
ingestionHandler.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != http.StatusNoContent {
|
||||||
|
t.Errorf("expected status 204, got %d", w.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test metrics export
|
||||||
|
t.Run("MetricsExport", func(t *testing.T) {
|
||||||
|
req := httptest.NewRequest("GET", "/metrics", nil)
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
metricsHandler.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != http.StatusOK {
|
||||||
|
t.Errorf("expected status 200, got %d", w.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
body := w.Body.String()
|
||||||
|
|
||||||
|
// Check for expected metrics
|
||||||
|
if !strings.Contains(body, "web_pageviews_total") {
|
||||||
|
t.Error("expected web_pageviews_total metric")
|
||||||
|
}
|
||||||
|
if !strings.Contains(body, "web_custom_events_total") {
|
||||||
|
t.Error("expected web_custom_events_total metric")
|
||||||
|
}
|
||||||
|
if !strings.Contains(body, "web_path_overflow_total") {
|
||||||
|
t.Error("expected web_path_overflow_total metric")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
metricsAgg.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEndToEnd_CardinalityLimits(t *testing.T) {
|
||||||
|
cfg := config.Config{
|
||||||
|
Site: config.SiteConfig{
|
||||||
|
Domains: []string{"test.example.com"},
|
||||||
|
Collect: config.CollectConfig{
|
||||||
|
Pageviews: true,
|
||||||
|
Referrer: "domain",
|
||||||
|
},
|
||||||
|
Path: config.PathConfig{
|
||||||
|
StripQuery: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Limits: config.LimitsConfig{
|
||||||
|
MaxPaths: 5,
|
||||||
|
MaxSources: 3,
|
||||||
|
MaxCustomEvents: 2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||||
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||||
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||||
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
||||||
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, cfg)
|
||||||
|
|
||||||
|
promRegistry := prometheus.NewRegistry()
|
||||||
|
metricsAgg.MustRegister(promRegistry)
|
||||||
|
|
||||||
|
handler := api.NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||||
|
|
||||||
|
// Send more paths than limit
|
||||||
|
t.Run("PathOverflow", func(t *testing.T) {
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
event := `{"d":"test.example.com","p":"/path-` + string(rune('0'+i)) + `","r":"","w":1920}`
|
||||||
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(w, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify overflow counter increased
|
||||||
|
req := httptest.NewRequest("GET", "/metrics", nil)
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{}).ServeHTTP(w, req)
|
||||||
|
|
||||||
|
body := w.Body.String()
|
||||||
|
if !strings.Contains(body, "web_path_overflow_total") {
|
||||||
|
t.Error("expected overflow metric")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
metricsAgg.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEndToEnd_GracefulShutdown(t *testing.T) {
|
||||||
|
cfg := config.Config{
|
||||||
|
Site: config.SiteConfig{
|
||||||
|
Domains: []string{"test.example.com"},
|
||||||
|
SaltRotation: "daily",
|
||||||
|
},
|
||||||
|
Limits: config.LimitsConfig{
|
||||||
|
MaxPaths: 100,
|
||||||
|
MaxSources: 50,
|
||||||
|
MaxCustomEvents: 10,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||||
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||||
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
||||||
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, cfg)
|
||||||
|
|
||||||
|
// Send some events to populate HLL
|
||||||
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||||
|
handler := api.NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
event := `{"d":"test.example.com","p":"/","r":"","w":1920}`
|
||||||
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
||||||
|
req.RemoteAddr = "192.168.1." + string(rune('0'+i)) + ":1234"
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(w, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown and save state
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := metricsAgg.Shutdown(ctx); err != nil {
|
||||||
|
t.Errorf("shutdown failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify state file was created
|
||||||
|
if _, err := os.Stat("/tmp/watchdog-hll.state"); os.IsNotExist(err) {
|
||||||
|
t.Error("HLL state file was not created")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
os.Remove("/tmp/watchdog-hll.state")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEndToEnd_InvalidRequests(t *testing.T) {
|
||||||
|
cfg := config.Config{
|
||||||
|
Site: config.SiteConfig{
|
||||||
|
Domains: []string{"test.example.com"},
|
||||||
|
},
|
||||||
|
Limits: config.LimitsConfig{
|
||||||
|
MaxPaths: 100,
|
||||||
|
MaxSources: 50,
|
||||||
|
MaxCustomEvents: 10,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||||
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||||
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||||
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
||||||
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, cfg)
|
||||||
|
|
||||||
|
handler := api.NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
method string
|
||||||
|
body string
|
||||||
|
wantStatus int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "GET request",
|
||||||
|
method: "GET",
|
||||||
|
body: "",
|
||||||
|
wantStatus: http.StatusMethodNotAllowed,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Invalid JSON",
|
||||||
|
method: "POST",
|
||||||
|
body: `{invalid}`,
|
||||||
|
wantStatus: http.StatusBadRequest,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Wrong domain",
|
||||||
|
method: "POST",
|
||||||
|
body: `{"d":"wrong.com","p":"/","r":"","w":1920}`,
|
||||||
|
wantStatus: http.StatusBadRequest,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Missing path",
|
||||||
|
method: "POST",
|
||||||
|
body: `{"d":"test.example.com","r":"","w":1920}`,
|
||||||
|
wantStatus: http.StatusBadRequest,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Invalid width",
|
||||||
|
method: "POST",
|
||||||
|
body: `{"d":"test.example.com","p":"/","r":"","w":99999}`,
|
||||||
|
wantStatus: http.StatusBadRequest,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
req := httptest.NewRequest(tt.method, "/api/event", bytes.NewBufferString(tt.body))
|
||||||
|
if tt.method == "POST" {
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
}
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
handler.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != tt.wantStatus {
|
||||||
|
t.Errorf("expected status %d, got %d", tt.wantStatus, w.Code)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
metricsAgg.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEndToEnd_RateLimiting(t *testing.T) {
|
||||||
|
cfg := config.Config{
|
||||||
|
Site: config.SiteConfig{
|
||||||
|
Domains: []string{"test.example.com"},
|
||||||
|
},
|
||||||
|
Limits: config.LimitsConfig{
|
||||||
|
MaxPaths: 100,
|
||||||
|
MaxSources: 50,
|
||||||
|
MaxCustomEvents: 10,
|
||||||
|
MaxEventsPerMinute: 5, // very low limit for testing
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||||
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||||
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||||
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
||||||
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, cfg)
|
||||||
|
|
||||||
|
handler := api.NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||||
|
|
||||||
|
// Send requests until rate limited
|
||||||
|
rateLimited := false
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
event := `{"d":"test.example.com","p":"/","r":"","w":1920}`
|
||||||
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code == http.StatusTooManyRequests {
|
||||||
|
rateLimited = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !rateLimited {
|
||||||
|
t.Error("expected rate limiting to trigger")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
metricsAgg.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEndToEnd_CORS(t *testing.T) {
|
||||||
|
cfg := config.Config{
|
||||||
|
Site: config.SiteConfig{
|
||||||
|
Domains: []string{"test.example.com"},
|
||||||
|
},
|
||||||
|
Limits: config.LimitsConfig{
|
||||||
|
MaxPaths: 100,
|
||||||
|
MaxSources: 50,
|
||||||
|
MaxCustomEvents: 10,
|
||||||
|
},
|
||||||
|
Security: config.SecurityConfig{
|
||||||
|
CORS: config.CORSConfig{
|
||||||
|
Enabled: true,
|
||||||
|
AllowedOrigins: []string{"https://example.com"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||||
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||||
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||||
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
||||||
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, cfg)
|
||||||
|
|
||||||
|
handler := api.NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||||
|
|
||||||
|
// Test OPTIONS preflight
|
||||||
|
t.Run("Preflight", func(t *testing.T) {
|
||||||
|
req := httptest.NewRequest("OPTIONS", "/api/event", nil)
|
||||||
|
req.Header.Set("Origin", "https://example.com")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
handler.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != http.StatusNoContent {
|
||||||
|
t.Errorf("expected status 204, got %d", w.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
if w.Header().Get("Access-Control-Allow-Origin") == "" {
|
||||||
|
t.Error("expected CORS headers")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test actual request with CORS
|
||||||
|
t.Run("CORSRequest", func(t *testing.T) {
|
||||||
|
event := `{"d":"test.example.com","p":"/","r":"","w":1920}`
|
||||||
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
||||||
|
req.Header.Set("Origin", "https://example.com")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
handler.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Header().Get("Access-Control-Allow-Origin") == "" {
|
||||||
|
t.Error("expected CORS headers on POST")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
metricsAgg.Shutdown(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkIngestionThroughput(b *testing.B) {
|
||||||
|
cfg := config.Config{
|
||||||
|
Site: config.SiteConfig{
|
||||||
|
Domains: []string{"test.example.com"},
|
||||||
|
Path: config.PathConfig{
|
||||||
|
StripQuery: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Limits: config.LimitsConfig{
|
||||||
|
MaxPaths: 10000,
|
||||||
|
MaxSources: 1000,
|
||||||
|
MaxCustomEvents: 100,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||||
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||||
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||||
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
||||||
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, cfg)
|
||||||
|
|
||||||
|
handler := api.NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||||
|
|
||||||
|
event := `{"d":"test.example.com","p":"/","r":"https://google.com","w":1920}`
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(w, req)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
metricsAgg.Shutdown(ctx)
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue