mirror of
https://github.com/NotAShelf/watchdog.git
synced 2026-05-30 18:21:30 +00:00
various: cleanup
Fixes a status code conflict in `LimitedResponseWriter`, and a clock skew bug that I probably introduced last time I dealt with time. I hate computers. We now use `tie.Since()`, which employs a monotonic clock that is immune to system wall clock changes. Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Iec3147c21c5a295170f48cbf1a4620596a6a6964
This commit is contained in:
parent
3662adc61a
commit
7ecc03ac19
3 changed files with 79 additions and 13 deletions
|
|
@ -201,6 +201,16 @@ type limitedResponseWriter struct {
|
||||||
maxSize int
|
maxSize int
|
||||||
written int
|
written int
|
||||||
limitExceeded bool
|
limitExceeded bool
|
||||||
|
headersWritten bool
|
||||||
|
statusCode int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *limitedResponseWriter) WriteHeader(statusCode int) {
|
||||||
|
if !w.headersWritten {
|
||||||
|
w.statusCode = statusCode
|
||||||
|
w.headersWritten = true
|
||||||
|
w.ResponseWriter.WriteHeader(statusCode)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *limitedResponseWriter) Write(p []byte) (int, error) {
|
func (w *limitedResponseWriter) Write(p []byte) (int, error) {
|
||||||
|
|
@ -208,12 +218,34 @@ func (w *limitedResponseWriter) Write(p []byte) (int, error) {
|
||||||
return 0, fmt.Errorf("response size limit exceeded")
|
return 0, fmt.Errorf("response size limit exceeded")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if adding this data would exceed the limit
|
||||||
if w.written+len(p) > w.maxSize {
|
if w.written+len(p) > w.maxSize {
|
||||||
w.limitExceeded = true
|
w.limitExceeded = true
|
||||||
|
|
||||||
|
// If headers haven't been written yet, we can send an error response
|
||||||
|
if !w.headersWritten {
|
||||||
w.Header().Set("X-Response-Truncated", "true")
|
w.Header().Set("X-Response-Truncated", "true")
|
||||||
http.Error(w.ResponseWriter, "Response size limit exceeded", http.StatusInternalServerError)
|
http.Error(w.ResponseWriter, "Response size limit exceeded", http.StatusInternalServerError)
|
||||||
return 0, fmt.Errorf("response size limit exceeded: %d bytes", w.maxSize)
|
return 0, fmt.Errorf("response size limit exceeded: %d bytes", w.maxSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Headers already written - we can't change status code
|
||||||
|
// Write only up to the limit and then stop
|
||||||
|
remaining := w.maxSize - w.written
|
||||||
|
if remaining > 0 {
|
||||||
|
_, _ = w.ResponseWriter.Write(p[:remaining])
|
||||||
|
w.written = w.maxSize
|
||||||
|
}
|
||||||
|
return len(p), fmt.Errorf("response size limit exceeded after headers sent")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normal write
|
||||||
|
if !w.headersWritten {
|
||||||
|
w.headersWritten = true
|
||||||
|
if w.statusCode == 0 {
|
||||||
|
w.statusCode = http.StatusOK
|
||||||
|
}
|
||||||
|
}
|
||||||
n, err := w.ResponseWriter.Write(p)
|
n, err := w.ResponseWriter.Write(p)
|
||||||
w.written += n
|
w.written += n
|
||||||
return n, err
|
return n, err
|
||||||
|
|
|
||||||
|
|
@ -107,6 +107,7 @@ func DailySalt(t time.Time) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save persists the HLL state to disk
|
// Save persists the HLL state to disk
|
||||||
|
// Format: saltKey\nsalt\nHLLdata
|
||||||
func (u *UniquesEstimator) Save(path string) error {
|
func (u *UniquesEstimator) Save(path string) error {
|
||||||
u.mu.Lock()
|
u.mu.Lock()
|
||||||
defer u.mu.Unlock()
|
defer u.mu.Unlock()
|
||||||
|
|
@ -116,11 +117,19 @@ func (u *UniquesEstimator) Save(path string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save both HLL data and current salt
|
// Save saltKey, salt, and HLL data
|
||||||
return os.WriteFile(path, append([]byte(u.salt+"\n"), data...), 0600)
|
var buf bytes.Buffer
|
||||||
|
buf.WriteString(u.saltKey)
|
||||||
|
buf.WriteByte('\n')
|
||||||
|
buf.WriteString(u.salt)
|
||||||
|
buf.WriteByte('\n')
|
||||||
|
buf.Write(data)
|
||||||
|
|
||||||
|
return os.WriteFile(path, buf.Bytes(), 0600)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load restores the HLL state from disk
|
// Load restores the HLL state from disk
|
||||||
|
// Supports both new format (saltKey\nsalt\nHLLdata) and old format (salt\nHLLdata)
|
||||||
func (u *UniquesEstimator) Load(path string) error {
|
func (u *UniquesEstimator) Load(path string) error {
|
||||||
data, err := os.ReadFile(path)
|
data, err := os.ReadFile(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -133,8 +142,32 @@ func (u *UniquesEstimator) Load(path string) error {
|
||||||
u.mu.Lock()
|
u.mu.Lock()
|
||||||
defer u.mu.Unlock()
|
defer u.mu.Unlock()
|
||||||
|
|
||||||
// Parse saved salt and HLL data
|
// Try new format first: saltKey\nsalt\nHLLdata
|
||||||
parts := bytes.SplitN(data, []byte("\n"), 2)
|
parts := bytes.SplitN(data, []byte("\n"), 3)
|
||||||
|
if len(parts) == 3 {
|
||||||
|
savedSaltKey := string(parts[0])
|
||||||
|
savedSalt := string(parts[1])
|
||||||
|
hllData := parts[2]
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
currentKey := getSaltKey(now, u.rotation)
|
||||||
|
|
||||||
|
// Only restore if it's the same period
|
||||||
|
if savedSaltKey == currentKey {
|
||||||
|
u.salt = savedSalt
|
||||||
|
u.saltKey = savedSaltKey
|
||||||
|
return u.hll.UnmarshalBinary(hllData)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Different period, start fresh
|
||||||
|
u.hll = hyperloglog.New()
|
||||||
|
u.salt = generateSaltFromKey(currentKey)
|
||||||
|
u.saltKey = currentKey
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try old format for backward compatibility: salt\nHLLdata
|
||||||
|
parts = bytes.SplitN(data, []byte("\n"), 2)
|
||||||
if len(parts) != 2 {
|
if len(parts) != 2 {
|
||||||
return fmt.Errorf("invalid state file format")
|
return fmt.Errorf("invalid state file format")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,13 +27,14 @@ func NewTokenBucket(capacity, refillPerInterval int, interval time.Duration) *To
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allow checks if a request should be allowed
|
// Allow checks if a request should be allowed
|
||||||
|
// Uses monotonic time via time.Since() to prevent clock skew issues
|
||||||
func (tb *TokenBucket) Allow() bool {
|
func (tb *TokenBucket) Allow() bool {
|
||||||
tb.mu.Lock()
|
tb.mu.Lock()
|
||||||
defer tb.mu.Unlock()
|
defer tb.mu.Unlock()
|
||||||
|
|
||||||
// Refill tokens based on elapsed time
|
// Refill tokens based on elapsed time using monotonic clock
|
||||||
now := time.Now()
|
// time.Since() uses monotonic readings when available, unaffected by wall clock changes
|
||||||
elapsed := now.Sub(tb.lastFill)
|
elapsed := time.Since(tb.lastFill)
|
||||||
if elapsed >= tb.interval {
|
if elapsed >= tb.interval {
|
||||||
periods := int(elapsed / tb.interval)
|
periods := int(elapsed / tb.interval)
|
||||||
tb.tokens += periods * tb.refill
|
tb.tokens += periods * tb.refill
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue