From 7ecc03ac1962613b0eb17cec89cbea67afab9064 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Tue, 10 Mar 2026 12:57:14 +0300 Subject: [PATCH] various: cleanup Fixes a status code conflict in `LimitedResponseWriter`, and a clock skew bug that I probably introduced last time I dealt with time. I hate computers. We now use `tie.Since()`, which employs a monotonic clock that is immune to system wall clock changes. Signed-off-by: NotAShelf Change-Id: Iec3147c21c5a295170f48cbf1a4620596a6a6964 --- cmd/watchdog/root.go | 44 ++++++++++++++++++++++++++++++----- internal/aggregate/uniques.go | 41 ++++++++++++++++++++++++++++---- internal/ratelimit/limiter.go | 7 +++--- 3 files changed, 79 insertions(+), 13 deletions(-) diff --git a/cmd/watchdog/root.go b/cmd/watchdog/root.go index 96fdbd4..f3301e1 100644 --- a/cmd/watchdog/root.go +++ b/cmd/watchdog/root.go @@ -198,9 +198,19 @@ func rateLimitMiddleware(next http.Handler, limiter *ratelimit.TokenBucket) http // Wraps http.ResponseWriter to enforce max response size type limitedResponseWriter struct { http.ResponseWriter - maxSize int - written int - limitExceeded bool + maxSize int + written int + limitExceeded bool + headersWritten bool + statusCode int +} + +func (w *limitedResponseWriter) WriteHeader(statusCode int) { + if !w.headersWritten { + w.statusCode = statusCode + w.headersWritten = true + w.ResponseWriter.WriteHeader(statusCode) + } } func (w *limitedResponseWriter) Write(p []byte) (int, error) { @@ -208,11 +218,33 @@ func (w *limitedResponseWriter) Write(p []byte) (int, error) { return 0, fmt.Errorf("response size limit exceeded") } + // Check if adding this data would exceed the limit if w.written+len(p) > w.maxSize { w.limitExceeded = true - w.Header().Set("X-Response-Truncated", "true") - http.Error(w.ResponseWriter, "Response size limit exceeded", http.StatusInternalServerError) - return 0, fmt.Errorf("response size limit exceeded: %d bytes", w.maxSize) + + // If headers haven't been written yet, we can send an error response + if !w.headersWritten { + w.Header().Set("X-Response-Truncated", "true") + http.Error(w.ResponseWriter, "Response size limit exceeded", http.StatusInternalServerError) + return 0, fmt.Errorf("response size limit exceeded: %d bytes", w.maxSize) + } + + // Headers already written - we can't change status code + // Write only up to the limit and then stop + remaining := w.maxSize - w.written + if remaining > 0 { + _, _ = w.ResponseWriter.Write(p[:remaining]) + w.written = w.maxSize + } + return len(p), fmt.Errorf("response size limit exceeded after headers sent") + } + + // Normal write + if !w.headersWritten { + w.headersWritten = true + if w.statusCode == 0 { + w.statusCode = http.StatusOK + } } n, err := w.ResponseWriter.Write(p) w.written += n diff --git a/internal/aggregate/uniques.go b/internal/aggregate/uniques.go index c3c0955..a02ed39 100644 --- a/internal/aggregate/uniques.go +++ b/internal/aggregate/uniques.go @@ -107,6 +107,7 @@ func DailySalt(t time.Time) string { } // Save persists the HLL state to disk +// Format: saltKey\nsalt\nHLLdata func (u *UniquesEstimator) Save(path string) error { u.mu.Lock() defer u.mu.Unlock() @@ -116,11 +117,19 @@ func (u *UniquesEstimator) Save(path string) error { return err } - // Save both HLL data and current salt - return os.WriteFile(path, append([]byte(u.salt+"\n"), data...), 0600) + // Save saltKey, salt, and HLL data + var buf bytes.Buffer + buf.WriteString(u.saltKey) + buf.WriteByte('\n') + buf.WriteString(u.salt) + buf.WriteByte('\n') + buf.Write(data) + + return os.WriteFile(path, buf.Bytes(), 0600) } // Load restores the HLL state from disk +// Supports both new format (saltKey\nsalt\nHLLdata) and old format (salt\nHLLdata) func (u *UniquesEstimator) Load(path string) error { data, err := os.ReadFile(path) if err != nil { @@ -133,8 +142,32 @@ func (u *UniquesEstimator) Load(path string) error { u.mu.Lock() defer u.mu.Unlock() - // Parse saved salt and HLL data - parts := bytes.SplitN(data, []byte("\n"), 2) + // Try new format first: saltKey\nsalt\nHLLdata + parts := bytes.SplitN(data, []byte("\n"), 3) + if len(parts) == 3 { + savedSaltKey := string(parts[0]) + savedSalt := string(parts[1]) + hllData := parts[2] + + now := time.Now() + currentKey := getSaltKey(now, u.rotation) + + // Only restore if it's the same period + if savedSaltKey == currentKey { + u.salt = savedSalt + u.saltKey = savedSaltKey + return u.hll.UnmarshalBinary(hllData) + } + + // Different period, start fresh + u.hll = hyperloglog.New() + u.salt = generateSaltFromKey(currentKey) + u.saltKey = currentKey + return nil + } + + // Try old format for backward compatibility: salt\nHLLdata + parts = bytes.SplitN(data, []byte("\n"), 2) if len(parts) != 2 { return fmt.Errorf("invalid state file format") } diff --git a/internal/ratelimit/limiter.go b/internal/ratelimit/limiter.go index d0700b3..eec5929 100644 --- a/internal/ratelimit/limiter.go +++ b/internal/ratelimit/limiter.go @@ -27,13 +27,14 @@ func NewTokenBucket(capacity, refillPerInterval int, interval time.Duration) *To } // Allow checks if a request should be allowed +// Uses monotonic time via time.Since() to prevent clock skew issues func (tb *TokenBucket) Allow() bool { tb.mu.Lock() defer tb.mu.Unlock() - // Refill tokens based on elapsed time - now := time.Now() - elapsed := now.Sub(tb.lastFill) + // Refill tokens based on elapsed time using monotonic clock + // time.Since() uses monotonic readings when available, unaffected by wall clock changes + elapsed := time.Since(tb.lastFill) if elapsed >= tb.interval { periods := int(elapsed / tb.interval) tb.tokens += periods * tb.refill