internal/api: ingestion handler; wire normalization pipeline
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I1890a039b874fcc76ac4a545c2901d4e6a6a6964
This commit is contained in:
parent
c5109ace92
commit
e0ec475a81
2 changed files with 352 additions and 0 deletions
118
internal/api/handler.go
Normal file
118
internal/api/handler.go
Normal file
|
|
@ -0,0 +1,118 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"notashelf.dev/watchdog/internal/aggregate"
|
||||
"notashelf.dev/watchdog/internal/config"
|
||||
"notashelf.dev/watchdog/internal/normalize"
|
||||
)
|
||||
|
||||
// Handles incoming analytics events
|
||||
type IngestionHandler struct {
|
||||
cfg config.Config
|
||||
pathNorm *normalize.PathNormalizer
|
||||
pathRegistry *aggregate.PathRegistry
|
||||
refRegistry *normalize.ReferrerRegistry
|
||||
metricsAgg *aggregate.MetricsAggregator
|
||||
}
|
||||
|
||||
// Creates a new ingestion handler
|
||||
func NewIngestionHandler(
|
||||
cfg config.Config,
|
||||
pathNorm *normalize.PathNormalizer,
|
||||
pathRegistry *aggregate.PathRegistry,
|
||||
refRegistry *normalize.ReferrerRegistry,
|
||||
metricsAgg *aggregate.MetricsAggregator,
|
||||
) *IngestionHandler {
|
||||
return &IngestionHandler{
|
||||
cfg: cfg,
|
||||
pathNorm: pathNorm,
|
||||
pathRegistry: pathRegistry,
|
||||
refRegistry: refRegistry,
|
||||
metricsAgg: metricsAgg,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *IngestionHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// Only accept POST requests
|
||||
if r.Method != http.MethodPost {
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse event from request body
|
||||
event, err := ParseEvent(r.Body)
|
||||
if err != nil {
|
||||
log.Printf("Failed to parse event: %v", err)
|
||||
http.Error(w, "Bad request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Validate event
|
||||
if err := event.Validate(h.cfg.Site.Domain); err != nil {
|
||||
log.Printf("Event validation failed: %v", err)
|
||||
http.Error(w, "Bad request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// Normalize path and check if path can be added to the registry.
|
||||
normalizedPath := h.pathNorm.Normalize(event.Path)
|
||||
if !h.pathRegistry.Add(normalizedPath) {
|
||||
// Path was rejected due to cardinality limit
|
||||
h.metricsAgg.RecordPathOverflow()
|
||||
log.Printf("Path overflow: rejected %s", normalizedPath)
|
||||
|
||||
// Still return success to client
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Process based on event type
|
||||
if event.Event != "" {
|
||||
// Custom event
|
||||
h.metricsAgg.RecordCustomEvent(event.Event)
|
||||
} else {
|
||||
// Pageview; process with full normalization pipeline
|
||||
var country, device, referrer string
|
||||
|
||||
// Device classification
|
||||
if h.cfg.Site.Collect.Device {
|
||||
device = classifyDevice(event.Width)
|
||||
}
|
||||
|
||||
// Referrer classification
|
||||
if h.cfg.Site.Collect.Referrer == "domain" {
|
||||
refDomain := normalize.ExtractReferrerDomain(event.Referrer, h.cfg.Site.Domain)
|
||||
if refDomain != "" {
|
||||
referrer = h.refRegistry.Add(refDomain)
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: Country would be extracted from IP here. For now, we skip country extraction
|
||||
// because I have neither the time nor the patience to look into it. Return later.
|
||||
|
||||
// Record pageview
|
||||
h.metricsAgg.RecordPageview(normalizedPath, country, device, referrer)
|
||||
}
|
||||
|
||||
// Return success
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// Classifies screen width into device categories
|
||||
func classifyDevice(width int) string {
|
||||
// FIXME: probably not the best logic for this...
|
||||
if width == 0 {
|
||||
return "unknown"
|
||||
}
|
||||
if width < 768 {
|
||||
return "mobile"
|
||||
}
|
||||
if width < 1024 {
|
||||
return "tablet"
|
||||
}
|
||||
return "desktop"
|
||||
}
|
||||
234
internal/api/handler_test.go
Normal file
234
internal/api/handler_test.go
Normal file
|
|
@ -0,0 +1,234 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"notashelf.dev/watchdog/internal/aggregate"
|
||||
"notashelf.dev/watchdog/internal/config"
|
||||
"notashelf.dev/watchdog/internal/normalize"
|
||||
)
|
||||
|
||||
func TestIngestionHandler_Pageview(t *testing.T) {
|
||||
cfg := config.Config{
|
||||
Site: config.SiteConfig{
|
||||
Domain: "example.com",
|
||||
Collect: config.CollectConfig{
|
||||
Pageviews: true,
|
||||
Country: true,
|
||||
Device: true,
|
||||
Referrer: "domain",
|
||||
},
|
||||
Path: config.PathConfig{
|
||||
StripQuery: true,
|
||||
StripFragment: true,
|
||||
CollapseNumericSegments: true,
|
||||
NormalizeTrailingSlash: true,
|
||||
},
|
||||
},
|
||||
Limits: config.LimitsConfig{
|
||||
MaxPaths: 100,
|
||||
MaxSources: 50,
|
||||
},
|
||||
}
|
||||
|
||||
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
|
||||
|
||||
handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||
|
||||
body := `{"d":"example.com","p":"/home?query=1","r":"https://google.com","w":1920}`
|
||||
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
handler.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Errorf("expected status %d, got %d", http.StatusNoContent, w.Code)
|
||||
}
|
||||
|
||||
// Verify path was normalized and registered
|
||||
if !pathRegistry.Contains("/home") {
|
||||
t.Error("expected path to be registered")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestionHandler_CustomEvent(t *testing.T) {
|
||||
cfg := config.Config{
|
||||
Site: config.SiteConfig{
|
||||
Domain: "example.com",
|
||||
Collect: config.CollectConfig{
|
||||
Pageviews: true,
|
||||
},
|
||||
CustomEvents: []string{"signup", "purchase"},
|
||||
Path: config.PathConfig{
|
||||
StripQuery: true,
|
||||
},
|
||||
},
|
||||
Limits: config.LimitsConfig{
|
||||
MaxPaths: 100,
|
||||
MaxSources: 50,
|
||||
},
|
||||
}
|
||||
|
||||
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
|
||||
|
||||
handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||
|
||||
body := `{"d":"example.com","p":"/signup","e":"signup"}`
|
||||
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
handler.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Errorf("expected status %d, got %d", http.StatusNoContent, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestionHandler_WrongDomain(t *testing.T) {
|
||||
cfg := config.Config{
|
||||
Site: config.SiteConfig{
|
||||
Domain: "example.com",
|
||||
Collect: config.CollectConfig{
|
||||
Pageviews: true,
|
||||
},
|
||||
Path: config.PathConfig{},
|
||||
},
|
||||
Limits: config.LimitsConfig{
|
||||
MaxPaths: 100,
|
||||
MaxSources: 50,
|
||||
},
|
||||
}
|
||||
|
||||
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
|
||||
|
||||
handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||
|
||||
body := `{"d":"wrong.com","p":"/home"}`
|
||||
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
handler.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected status %d, got %d", http.StatusBadRequest, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestionHandler_MethodNotAllowed(t *testing.T) {
|
||||
cfg := config.Config{
|
||||
Site: config.SiteConfig{
|
||||
Domain: "example.com",
|
||||
},
|
||||
Limits: config.LimitsConfig{
|
||||
MaxPaths: 100,
|
||||
MaxSources: 50,
|
||||
},
|
||||
}
|
||||
|
||||
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
|
||||
|
||||
handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||
|
||||
req := httptest.NewRequest("GET", "/api/event", nil)
|
||||
w := httptest.NewRecorder()
|
||||
handler.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusMethodNotAllowed {
|
||||
t.Errorf("expected status %d, got %d", http.StatusMethodNotAllowed, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestionHandler_InvalidJSON(t *testing.T) {
|
||||
cfg := config.Config{
|
||||
Site: config.SiteConfig{
|
||||
Domain: "example.com",
|
||||
},
|
||||
Limits: config.LimitsConfig{
|
||||
MaxPaths: 100,
|
||||
MaxSources: 50,
|
||||
},
|
||||
}
|
||||
|
||||
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
|
||||
|
||||
handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||
|
||||
body := `{invalid json}`
|
||||
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
handler.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected status %d, got %d", http.StatusBadRequest, w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIngestionHandler_DeviceClassification(t *testing.T) {
|
||||
cfg := config.Config{
|
||||
Site: config.SiteConfig{
|
||||
Domain: "example.com",
|
||||
Collect: config.CollectConfig{
|
||||
Pageviews: true,
|
||||
Device: true,
|
||||
},
|
||||
Path: config.PathConfig{},
|
||||
},
|
||||
Limits: config.LimitsConfig{
|
||||
MaxPaths: 100,
|
||||
MaxSources: 50,
|
||||
},
|
||||
}
|
||||
|
||||
pathNorm := normalize.NewPathNormalizer(cfg.Site.Path)
|
||||
pathRegistry := aggregate.NewPathRegistry(cfg.Limits.MaxPaths)
|
||||
refRegistry := normalize.NewReferrerRegistry(cfg.Limits.MaxSources)
|
||||
metricsAgg := aggregate.NewMetricsAggregator(pathRegistry, cfg)
|
||||
|
||||
handler := NewIngestionHandler(cfg, pathNorm, pathRegistry, refRegistry, metricsAgg)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
width int
|
||||
}{
|
||||
{"mobile", 375},
|
||||
{"tablet", 768},
|
||||
{"desktop", 1920},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
body := fmt.Sprintf(`{"d":"example.com","p":"/test","w":%d}`, tt.width)
|
||||
req := httptest.NewRequest("POST", "/api/event", bytes.NewBufferString(body))
|
||||
w := httptest.NewRecorder()
|
||||
handler.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Errorf("expected status %d, got %d", http.StatusNoContent, w.Code)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue