Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I65dbf466cb030dccc7025585d6282bd26a6a6964
457 lines
13 KiB
Go
457 lines
13 KiB
Go
//go:build integration
|
|
|
|
package test
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"os"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"notashelf.dev/watchdog/internal/aggregate"
|
|
"notashelf.dev/watchdog/internal/api"
|
|
"notashelf.dev/watchdog/internal/config"
|
|
"notashelf.dev/watchdog/internal/normalize"
|
|
)
|
|
|
|
func TestEndToEnd_BasicFlow(t *testing.T) {
|
|
// Load config
|
|
cfg, err := config.Load("../config.example.yaml")
|
|
if err != nil {
|
|
t.Fatalf("failed to load config: %v", err)
|
|
}
|
|
|
|
// Override domain for test
|
|
cfg.Site.Domains = []string{"test.example.com"}
|
|
cfg.Site.SaltRotation = "daily"
|
|
cfg.Limits.MaxPaths = 100
|
|
cfg.Limits.MaxSources = 50
|
|
cfg.Limits.MaxCustomEvents = 10
|
|
|
|
// Initialize components
|
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, cfg)
|
|
|
|
// Register metrics
|
|
promRegistry := prometheus.NewRegistry()
|
|
metricsAgg.MustRegister(promRegistry)
|
|
|
|
// Create handlers
|
|
ingestionHandler := api.NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
|
metricsHandler := promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{})
|
|
|
|
// Test pageview ingestion
|
|
t.Run("PageviewIngestion", func(t *testing.T) {
|
|
events := []string{
|
|
`{"d":"test.example.com","p":"/","r":"","w":1920}`,
|
|
`{"d":"test.example.com","p":"/about","r":"https://google.com","w":768}`,
|
|
`{"d":"test.example.com","p":"/blog/post-1","r":"https://twitter.com","w":1024}`,
|
|
}
|
|
|
|
for _, event := range events {
|
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
|
req.Header.Set("Content-Type", "application/json")
|
|
w := httptest.NewRecorder()
|
|
|
|
ingestionHandler.ServeHTTP(w, req)
|
|
|
|
if w.Code != http.StatusNoContent {
|
|
t.Errorf("expected status 204, got %d: %s", w.Code, w.Body.String())
|
|
}
|
|
}
|
|
|
|
// Verify paths were registered
|
|
if !pathRegistry.Contains("/") {
|
|
t.Error("expected / to be registered")
|
|
}
|
|
if !pathRegistry.Contains("/about") {
|
|
t.Error("expected /about to be registered")
|
|
}
|
|
})
|
|
|
|
// Test custom events
|
|
t.Run("CustomEventIngestion", func(t *testing.T) {
|
|
events := []string{
|
|
`{"d":"test.example.com","p":"/signup","e":"signup","w":1920}`,
|
|
`{"d":"test.example.com","p":"/checkout","e":"purchase","w":1920}`,
|
|
}
|
|
|
|
for _, event := range events {
|
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
|
req.Header.Set("Content-Type", "application/json")
|
|
w := httptest.NewRecorder()
|
|
|
|
ingestionHandler.ServeHTTP(w, req)
|
|
|
|
if w.Code != http.StatusNoContent {
|
|
t.Errorf("expected status 204, got %d", w.Code)
|
|
}
|
|
}
|
|
})
|
|
|
|
// Test metrics export
|
|
t.Run("MetricsExport", func(t *testing.T) {
|
|
req := httptest.NewRequest("GET", "/metrics", nil)
|
|
w := httptest.NewRecorder()
|
|
|
|
metricsHandler.ServeHTTP(w, req)
|
|
|
|
if w.Code != http.StatusOK {
|
|
t.Errorf("expected status 200, got %d", w.Code)
|
|
}
|
|
|
|
body := w.Body.String()
|
|
|
|
// Check for expected metrics
|
|
if !strings.Contains(body, "web_pageviews_total") {
|
|
t.Error("expected web_pageviews_total metric")
|
|
}
|
|
if !strings.Contains(body, "web_custom_events_total") {
|
|
t.Error("expected web_custom_events_total metric")
|
|
}
|
|
if !strings.Contains(body, "web_path_overflow_total") {
|
|
t.Error("expected web_path_overflow_total metric")
|
|
}
|
|
})
|
|
|
|
// Cleanup
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
metricsAgg.Shutdown(ctx)
|
|
}
|
|
|
|
func TestEndToEnd_CardinalityLimits(t *testing.T) {
|
|
cfg := config.Config{
|
|
Site: config.SiteConfig{
|
|
Domains: []string{"test.example.com"},
|
|
Collect: config.CollectConfig{
|
|
Pageviews: true,
|
|
Referrer: "domain",
|
|
},
|
|
Path: config.PathConfig{
|
|
StripQuery: true,
|
|
},
|
|
},
|
|
Limits: config.LimitsConfig{
|
|
MaxPaths: 5,
|
|
MaxSources: 3,
|
|
MaxCustomEvents: 2,
|
|
},
|
|
}
|
|
|
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, &cfg)
|
|
|
|
promRegistry := prometheus.NewRegistry()
|
|
metricsAgg.MustRegister(promRegistry)
|
|
|
|
handler := api.NewIngestionHandler(&cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
|
|
|
// Send more paths than limit
|
|
t.Run("PathOverflow", func(t *testing.T) {
|
|
for i := 0; i < 10; i++ {
|
|
event := `{"d":"test.example.com","p":"/path-` + string(rune('0'+i)) + `","r":"","w":1920}`
|
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
|
w := httptest.NewRecorder()
|
|
handler.ServeHTTP(w, req)
|
|
}
|
|
|
|
// Verify overflow counter increased
|
|
req := httptest.NewRequest("GET", "/metrics", nil)
|
|
w := httptest.NewRecorder()
|
|
promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{}).ServeHTTP(w, req)
|
|
|
|
body := w.Body.String()
|
|
if !strings.Contains(body, "web_path_overflow_total") {
|
|
t.Error("expected overflow metric")
|
|
}
|
|
})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
metricsAgg.Shutdown(ctx)
|
|
}
|
|
|
|
func TestEndToEnd_GracefulShutdown(t *testing.T) {
|
|
cfg := config.Config{
|
|
Site: config.SiteConfig{
|
|
Domains: []string{"test.example.com"},
|
|
SaltRotation: "daily",
|
|
},
|
|
Limits: config.LimitsConfig{
|
|
MaxPaths: 100,
|
|
MaxSources: 50,
|
|
MaxCustomEvents: 10,
|
|
},
|
|
}
|
|
|
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, &cfg)
|
|
|
|
// Send some events to populate HLL
|
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
|
handler := api.NewIngestionHandler(&cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
event := `{"d":"test.example.com","p":"/","r":"","w":1920}`
|
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
|
req.RemoteAddr = "192.168.1." + string(rune('0'+i)) + ":1234"
|
|
w := httptest.NewRecorder()
|
|
handler.ServeHTTP(w, req)
|
|
}
|
|
|
|
// Shutdown and save state
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
if err := metricsAgg.Shutdown(ctx); err != nil {
|
|
t.Errorf("shutdown failed: %v", err)
|
|
}
|
|
|
|
// Verify state file was created
|
|
if _, err := os.Stat("/tmp/watchdog-hll.state"); os.IsNotExist(err) {
|
|
t.Error("HLL state file was not created")
|
|
}
|
|
|
|
// Cleanup
|
|
os.Remove("/tmp/watchdog-hll.state")
|
|
}
|
|
|
|
func TestEndToEnd_InvalidRequests(t *testing.T) {
|
|
cfg := config.Config{
|
|
Site: config.SiteConfig{
|
|
Domains: []string{"test.example.com"},
|
|
},
|
|
Limits: config.LimitsConfig{
|
|
MaxPaths: 100,
|
|
MaxSources: 50,
|
|
MaxCustomEvents: 10,
|
|
},
|
|
}
|
|
|
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, &cfg)
|
|
|
|
handler := api.NewIngestionHandler(&cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
|
|
|
tests := []struct {
|
|
name string
|
|
method string
|
|
body string
|
|
wantStatus int
|
|
}{
|
|
{
|
|
name: "GET request",
|
|
method: "GET",
|
|
body: "",
|
|
wantStatus: http.StatusMethodNotAllowed,
|
|
},
|
|
{
|
|
name: "Invalid JSON",
|
|
method: "POST",
|
|
body: `{invalid}`,
|
|
wantStatus: http.StatusBadRequest,
|
|
},
|
|
{
|
|
name: "Wrong domain",
|
|
method: "POST",
|
|
body: `{"d":"wrong.com","p":"/","r":"","w":1920}`,
|
|
wantStatus: http.StatusBadRequest,
|
|
},
|
|
{
|
|
name: "Missing path",
|
|
method: "POST",
|
|
body: `{"d":"test.example.com","r":"","w":1920}`,
|
|
wantStatus: http.StatusBadRequest,
|
|
},
|
|
{
|
|
name: "Invalid width",
|
|
method: "POST",
|
|
body: `{"d":"test.example.com","p":"/","r":"","w":99999}`,
|
|
wantStatus: http.StatusBadRequest,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
req := httptest.NewRequest(tt.method, "/api/event", bytes.NewBufferString(tt.body))
|
|
if tt.method == "POST" {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
w := httptest.NewRecorder()
|
|
|
|
handler.ServeHTTP(w, req)
|
|
|
|
if w.Code != tt.wantStatus {
|
|
t.Errorf("expected status %d, got %d", tt.wantStatus, w.Code)
|
|
}
|
|
})
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
metricsAgg.Shutdown(ctx)
|
|
}
|
|
|
|
func TestEndToEnd_RateLimiting(t *testing.T) {
|
|
cfg := config.Config{
|
|
Site: config.SiteConfig{
|
|
Domains: []string{"test.example.com"},
|
|
},
|
|
Limits: config.LimitsConfig{
|
|
MaxPaths: 100,
|
|
MaxSources: 50,
|
|
MaxCustomEvents: 10,
|
|
MaxEventsPerMinute: 5, // very low limit for testing
|
|
},
|
|
}
|
|
|
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, &cfg)
|
|
|
|
handler := api.NewIngestionHandler(&cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
|
|
|
// Send requests until rate limited
|
|
rateLimited := false
|
|
for i := 0; i < 10; i++ {
|
|
event := `{"d":"test.example.com","p":"/","r":"","w":1920}`
|
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
|
w := httptest.NewRecorder()
|
|
handler.ServeHTTP(w, req)
|
|
|
|
if w.Code == http.StatusTooManyRequests {
|
|
rateLimited = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !rateLimited {
|
|
t.Error("expected rate limiting to trigger")
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
metricsAgg.Shutdown(ctx)
|
|
}
|
|
|
|
func TestEndToEnd_CORS(t *testing.T) {
|
|
cfg := config.Config{
|
|
Site: config.SiteConfig{
|
|
Domains: []string{"test.example.com"},
|
|
},
|
|
Limits: config.LimitsConfig{
|
|
MaxPaths: 100,
|
|
MaxSources: 50,
|
|
MaxCustomEvents: 10,
|
|
},
|
|
Security: config.SecurityConfig{
|
|
CORS: config.CORSConfig{
|
|
Enabled: true,
|
|
AllowedOrigins: []string{"https://example.com"},
|
|
},
|
|
},
|
|
}
|
|
|
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, &cfg)
|
|
|
|
handler := api.NewIngestionHandler(&cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
|
|
|
// Test OPTIONS preflight
|
|
t.Run("Preflight", func(t *testing.T) {
|
|
req := httptest.NewRequest("OPTIONS", "/api/event", nil)
|
|
req.Header.Set("Origin", "https://example.com")
|
|
w := httptest.NewRecorder()
|
|
|
|
handler.ServeHTTP(w, req)
|
|
|
|
if w.Code != http.StatusNoContent {
|
|
t.Errorf("expected status 204, got %d", w.Code)
|
|
}
|
|
|
|
if w.Header().Get("Access-Control-Allow-Origin") == "" {
|
|
t.Error("expected CORS headers")
|
|
}
|
|
})
|
|
|
|
// Test actual request with CORS
|
|
t.Run("CORSRequest", func(t *testing.T) {
|
|
event := `{"d":"test.example.com","p":"/","r":"","w":1920}`
|
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
|
req.Header.Set("Origin", "https://example.com")
|
|
w := httptest.NewRecorder()
|
|
|
|
handler.ServeHTTP(w, req)
|
|
|
|
if w.Header().Get("Access-Control-Allow-Origin") == "" {
|
|
t.Error("expected CORS headers on POST")
|
|
}
|
|
})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
metricsAgg.Shutdown(ctx)
|
|
}
|
|
|
|
func BenchmarkIngestionThroughput(b *testing.B) {
|
|
cfg := config.Config{
|
|
Site: config.SiteConfig{
|
|
Domains: []string{"test.example.com"},
|
|
Path: config.PathConfig{
|
|
StripQuery: true,
|
|
},
|
|
},
|
|
Limits: config.LimitsConfig{
|
|
MaxPaths: 10000,
|
|
MaxSources: 1000,
|
|
MaxCustomEvents: 100,
|
|
},
|
|
}
|
|
|
|
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
|
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
|
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
|
eventRegistry := aggregate.NewCustomEventRegistry(cfg.Limits.MaxCustomEvents)
|
|
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, eventRegistry, &cfg)
|
|
|
|
handler := api.NewIngestionHandler(&cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
|
|
|
event := `{"d":"test.example.com","p":"/","r":"https://google.com","w":1920}`
|
|
|
|
b.ResetTimer()
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
for pb.Next() {
|
|
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(event))
|
|
req.Header.Set("Content-Type", "application/json")
|
|
w := httptest.NewRecorder()
|
|
handler.ServeHTTP(w, req)
|
|
}
|
|
})
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
metricsAgg.Shutdown(ctx)
|
|
}
|