From e0ec475a819fe36af79e64943dfaf71d5afe7e92 Mon Sep 17 00:00:00 2001
From: NotAShelf
Date: Sun, 1 Mar 2026 05:06:37 +0300
Subject: [PATCH] internal/api: ingestion handler; wire normalization pipeline

Signed-off-by: NotAShelf
Change-Id: I1890a039b874fcc76ac4a545c2901d4e6a6a6964
---
 internal/api/handler.go      | 134 ++++++++++++++++++
 internal/api/handler_test.go | 260 +++++++++++++++++++++++++++++++++++
 2 files changed, 394 insertions(+)
 create mode 100644 internal/api/handler.go
 create mode 100644 internal/api/handler_test.go

diff --git a/internal/api/handler.go b/internal/api/handler.go
new file mode 100644
index 0000000..21827c0
--- /dev/null
+++ b/internal/api/handler.go
@@ -0,0 +1,134 @@
+package api
+
+import (
+	"log"
+	"net/http"
+
+	"notashelf.dev/watchdog/internal/aggregate"
+	"notashelf.dev/watchdog/internal/config"
+	"notashelf.dev/watchdog/internal/normalize"
+)
+
+// IngestionHandler is an http.Handler that receives analytics events, runs
+// them through the normalization pipeline and records them in the metrics
+// aggregator.
+type IngestionHandler struct {
+	cfg          config.Config
+	pathNorm     *normalize.PathNormalizer
+	pathRegistry *aggregate.PathRegistry
+	refRegistry  *normalize.ReferrerRegistry
+	metricsAgg   *aggregate.MetricsAggregator
+}
+
+// NewIngestionHandler returns an IngestionHandler wired to the given
+// configuration, path normalizer, registries and metrics aggregator.
+func NewIngestionHandler(
+	cfg config.Config,
+	pathNorm *normalize.PathNormalizer,
+	pathRegistry *aggregate.PathRegistry,
+	refRegistry *normalize.ReferrerRegistry,
+	metricsAgg *aggregate.MetricsAggregator,
+) *IngestionHandler {
+	return &IngestionHandler{
+		cfg:          cfg,
+		pathNorm:     pathNorm,
+		pathRegistry: pathRegistry,
+		refRegistry:  refRegistry,
+		metricsAgg:   metricsAgg,
+	}
+}
+
+// ServeHTTP ingests a single event from a POST request body.
+//
+// It replies 405 to any other method, 400 when the event cannot be parsed or
+// fails validation against the configured site domain, and 204 otherwise,
+// including when the event is dropped because the path registry rejected its
+// path.
+func (h *IngestionHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	// Only accept POST requests
+	if r.Method != http.MethodPost {
+		// A 405 response must advertise the supported methods (RFC 9110, 15.5.6).
+		w.Header().Set("Allow", http.MethodPost)
+		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	// Parse event from request body
+	event, err := ParseEvent(r.Body)
+	if err != nil {
+		log.Printf("Failed to parse event: %v", err)
+		http.Error(w, "Bad request", http.StatusBadRequest)
+		return
+	}
+
+	// Validate event
+	if err := event.Validate(h.cfg.Site.Domain); err != nil {
+		log.Printf("Event validation failed: %v", err)
+		http.Error(w, "Bad request", http.StatusBadRequest)
+		return
+	}
+
+	// Normalize path and check if path can be added to the registry.
+	normalizedPath := h.pathNorm.Normalize(event.Path)
+	if !h.pathRegistry.Add(normalizedPath) {
+		// Path was rejected due to cardinality limit
+		h.metricsAgg.RecordPathOverflow()
+		// %q keeps client-supplied control characters from forging log lines.
+		log.Printf("Path overflow: rejected %q", normalizedPath)
+
+		// Still return success to client
+		w.WriteHeader(http.StatusNoContent)
+
+		return
+	}
+
+	// Process based on event type
+	if event.Event != "" {
+		// Custom event
+		h.metricsAgg.RecordCustomEvent(event.Event)
+	} else {
+		// Pageview; process with full normalization pipeline
+		var country, device, referrer string
+
+		// Device classification
+		if h.cfg.Site.Collect.Device {
+			device = classifyDevice(event.Width)
+		}
+
+		// Referrer classification
+		if h.cfg.Site.Collect.Referrer == "domain" {
+			refDomain := normalize.ExtractReferrerDomain(event.Referrer, h.cfg.Site.Domain)
+			if refDomain != "" {
+				referrer = h.refRegistry.Add(refDomain)
+			}
+		}
+
+		// FIXME: Country would be extracted from IP here. For now, we skip country extraction
+		// because I have neither the time nor the patience to look into it. Return later.
+
+		// Record pageview
+		h.metricsAgg.RecordPageview(normalizedPath, country, device, referrer)
+	}
+
+	// Return success
+	w.WriteHeader(http.StatusNoContent)
+}
+
+// classifyDevice maps a reported screen width to a device category:
+// "unknown" for missing or non-positive widths, "mobile" below 768, "tablet"
+// below 1024 and "desktop" otherwise.
+func classifyDevice(width int) string {
+	// FIXME: probably not the best logic for this...
+	// Width is client-supplied; a negative value is as meaningless as a
+	// missing one and must not be counted as a mobile device.
+	if width <= 0 {
+		return "unknown"
+	}
+	if width < 768 {
+		return "mobile"
+	}
+	if width < 1024 {
+		return "tablet"
+	}
+	return "desktop"
+}
diff --git a/internal/api/handler_test.go b/internal/api/handler_test.go
new file mode 100644
index 0000000..227c23d
--- /dev/null
+++ b/internal/api/handler_test.go
@@ -0,0 +1,260 @@
+package api
+
+import (
+	"bytes"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"notashelf.dev/watchdog/internal/aggregate"
+	"notashelf.dev/watchdog/internal/config"
+	"notashelf.dev/watchdog/internal/normalize"
+)
+
+func TestIngestionHandler_Pageview(t *testing.T) {
+	cfg := config.Config{
+		Site: config.SiteConfig{
+			Domain: "example.com",
+			Collect: config.CollectConfig{
+				Pageviews: true,
+				Country:   true,
+				Device:    true,
+				Referrer:  "domain",
+			},
+			Path: config.PathConfig{
+				StripQuery:              true,
+				StripFragment:           true,
+				CollapseNumericSegments: true,
+				NormalizeTrailingSlash:  true,
+			},
+		},
+		Limits: config.LimitsConfig{
+			MaxPaths:   100,
+			MaxSources: 50,
+		},
+	}
+
+	pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
+	pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
+	refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
+	metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
+
+	handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
+
+	body := `{"d":"example.com","p":"/home?query=1","r":"https://google.com","w":1920}`
+	req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(body))
+	req.Header.Set("Content-Type", "application/json")
+
+	w := httptest.NewRecorder()
+	handler.ServeHTTP(w, req)
+
+	if w.Code != http.StatusNoContent {
+		t.Errorf("expected status %d, got %d", http.StatusNoContent, w.Code)
+	}
+
+	// Verify path was normalized and registered
+	if !pathRegistry.Contains("/home") {
+		t.Error("expected path to be registered")
+	}
+}
+
+func TestIngestionHandler_CustomEvent(t *testing.T) {
+	cfg := config.Config{
+		Site: config.SiteConfig{
+			Domain: "example.com",
+			Collect: config.CollectConfig{
+				Pageviews: true,
+			},
+			CustomEvents: []string{"signup", "purchase"},
+			Path: config.PathConfig{
+				StripQuery: true,
+			},
+		},
+		Limits: config.LimitsConfig{
+			MaxPaths:   100,
+			MaxSources: 50,
+		},
+	}
+
+	pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
+	pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
+	refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
+	metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
+
+	handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
+
+	body := `{"d":"example.com","p":"/signup","e":"signup"}`
+	req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(body))
+	req.Header.Set("Content-Type", "application/json")
+
+	w := httptest.NewRecorder()
+	handler.ServeHTTP(w, req)
+
+	if w.Code != http.StatusNoContent {
+		t.Errorf("expected status %d, got %d", http.StatusNoContent, w.Code)
+	}
+}
+
+func TestIngestionHandler_WrongDomain(t *testing.T) {
+	cfg := config.Config{
+		Site: config.SiteConfig{
+			Domain: "example.com",
+			Collect: config.CollectConfig{
+				Pageviews: true,
+			},
+			Path: config.PathConfig{},
+		},
+		Limits: config.LimitsConfig{
+			MaxPaths:   100,
+			MaxSources: 50,
+		},
+	}
+
+	pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
+	pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
+	refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
+	metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
+
+	handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
+
+	body := `{"d":"wrong.com","p":"/home"}`
+	req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(body))
+	req.Header.Set("Content-Type", "application/json")
+
+	w := httptest.NewRecorder()
+	handler.ServeHTTP(w, req)
+
+	if w.Code != http.StatusBadRequest {
+		t.Errorf("expected status %d, got %d", http.StatusBadRequest, w.Code)
+	}
+}
+
+func TestIngestionHandler_MethodNotAllowed(t *testing.T) {
+	cfg := config.Config{
+		Site: config.SiteConfig{
+			Domain: "example.com",
+		},
+		Limits: config.LimitsConfig{
+			MaxPaths:   100,
+			MaxSources: 50,
+		},
+	}
+
+	pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
+	pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
+	refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
+	metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
+
+	handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
+
+	req := httptest.NewRequest("GET", "/api/event", nil)
+	w := httptest.NewRecorder()
+	handler.ServeHTTP(w, req)
+
+	if w.Code != http.StatusMethodNotAllowed {
+		t.Errorf("expected status %d, got %d", http.StatusMethodNotAllowed, w.Code)
+	}
+
+	if got := w.Header().Get("Allow"); got != http.MethodPost {
+		t.Errorf("expected Allow header %q, got %q", http.MethodPost, got)
+	}
+}
+
+func TestIngestionHandler_InvalidJSON(t *testing.T) {
+	cfg := config.Config{
+		Site: config.SiteConfig{
+			Domain: "example.com",
+		},
+		Limits: config.LimitsConfig{
+			MaxPaths:   100,
+			MaxSources: 50,
+		},
+	}
+
+	pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
+	pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
+	refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
+	metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
+
+	handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
+
+	body := `{invalid json}`
+	req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(body))
+	req.Header.Set("Content-Type", "application/json")
+
+	w := httptest.NewRecorder()
+	handler.ServeHTTP(w, req)
+
+	if w.Code != http.StatusBadRequest {
+		t.Errorf("expected status %d, got %d", http.StatusBadRequest, w.Code)
+	}
+}
+
+func TestIngestionHandler_DeviceClassification(t *testing.T) {
+	cfg := config.Config{
+		Site: config.SiteConfig{
+			Domain: "example.com",
+			Collect: config.CollectConfig{
+				Pageviews: true,
+				Device:    true,
+			},
+			Path: config.PathConfig{},
+		},
+		Limits: config.LimitsConfig{
+			MaxPaths:   100,
+			MaxSources: 50,
+		},
+	}
+
+	pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
+	pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
+	refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
+	metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
+
+	handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
+
+	tests := []struct {
+		name  string
+		width int
+	}{
+		{"mobile", 375},
+		{"tablet", 768},
+		{"desktop", 1920},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			body := fmt.Sprintf(`{"d":"example.com","p":"/test","w":%d}`, tt.width)
+			req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(body))
+			w := httptest.NewRecorder()
+			handler.ServeHTTP(w, req)
+
+			if w.Code != http.StatusNoContent {
+				t.Errorf("expected status %d, got %d", http.StatusNoContent, w.Code)
+			}
+		})
+	}
+}
+
+func TestClassifyDevice(t *testing.T) {
+	tests := []struct {
+		width int
+		want  string
+	}{
+		{-1, "unknown"},
+		{0, "unknown"},
+		{1, "mobile"},
+		{767, "mobile"},
+		{768, "tablet"},
+		{1023, "tablet"},
+		{1024, "desktop"},
+		{1920, "desktop"},
+	}
+
+	for _, tt := range tests {
+		if got := classifyDevice(tt.width); got != tt.want {
+			t.Errorf("classifyDevice(%d) = %q, want %q", tt.width, got, tt.want)
+		}
+	}
+}