server: fix NAR fallback; distinguish 404 vs 502; add /metrics endpoint
Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: Ib86ef7efb3cefc34cf5642fce9c7d0ef6a6a6964
This commit is contained in:
parent
35b8fb2672
commit
f504f3114f
1 changed files with 56 additions and 6 deletions
|
|
@ -1,6 +1,7 @@
|
||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
|
@ -8,7 +9,9 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
"notashelf.dev/ncro/internal/config"
|
"notashelf.dev/ncro/internal/config"
|
||||||
|
"notashelf.dev/ncro/internal/metrics"
|
||||||
"notashelf.dev/ncro/internal/prober"
|
"notashelf.dev/ncro/internal/prober"
|
||||||
"notashelf.dev/ncro/internal/router"
|
"notashelf.dev/ncro/internal/router"
|
||||||
)
|
)
|
||||||
|
|
@ -38,6 +41,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
s.handleCacheInfo(w, r)
|
s.handleCacheInfo(w, r)
|
||||||
case path == "/health":
|
case path == "/health":
|
||||||
s.handleHealth(w, r)
|
s.handleHealth(w, r)
|
||||||
|
case path == "/metrics":
|
||||||
|
promhttp.Handler().ServeHTTP(w, r)
|
||||||
case strings.HasSuffix(path, ".narinfo"):
|
case strings.HasSuffix(path, ".narinfo"):
|
||||||
s.handleNarinfo(w, r)
|
s.handleNarinfo(w, r)
|
||||||
case strings.HasPrefix(path, "/nar/"):
|
case strings.HasPrefix(path, "/nar/"):
|
||||||
|
|
@ -64,23 +69,65 @@ func (s *Server) handleNarinfo(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
result, err := s.router.Resolve(hash, s.upstreamURLs())
|
result, err := s.router.Resolve(hash, s.upstreamURLs())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Warn("narinfo not found", "hash", hash, "error", err)
|
slog.Warn("narinfo resolve failed", "hash", hash, "error", err)
|
||||||
|
metrics.NarinfoRequests.WithLabelValues("error").Inc()
|
||||||
|
switch {
|
||||||
|
case errors.Is(err, router.ErrNotFound):
|
||||||
http.NotFound(w, r)
|
http.NotFound(w, r)
|
||||||
|
default:
|
||||||
|
http.Error(w, "upstream unavailable", http.StatusBadGateway)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("narinfo routed", "hash", hash, "upstream", result.URL, "cache_hit", result.CacheHit)
|
slog.Info("narinfo routed", "hash", hash, "upstream", result.URL, "cache_hit", result.CacheHit)
|
||||||
|
metrics.NarinfoRequests.WithLabelValues("200").Inc()
|
||||||
|
|
||||||
|
if len(result.NarInfoBytes) > 0 {
|
||||||
|
w.Header().Set("Content-Type", "text/x-nix-narinfo")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write(result.NarInfoBytes)
|
||||||
|
return
|
||||||
|
}
|
||||||
s.proxyRequest(w, r, result.URL+r.URL.Path)
|
s.proxyRequest(w, r, result.URL+r.URL.Path)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleNAR(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleNAR(w http.ResponseWriter, r *http.Request) {
|
||||||
|
metrics.NARRequests.Inc()
|
||||||
sorted := s.prober.SortedByLatency()
|
sorted := s.prober.SortedByLatency()
|
||||||
if len(sorted) == 0 {
|
if len(sorted) == 0 {
|
||||||
http.Error(w, "no upstreams available", http.StatusServiceUnavailable)
|
http.Error(w, "no upstreams available", http.StatusServiceUnavailable)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
slog.Debug("proxying NAR", "path", r.URL.Path, "upstream", sorted[0].URL)
|
for _, h := range sorted {
|
||||||
s.proxyRequest(w, r, sorted[0].URL+r.URL.Path)
|
if h.Status == prober.StatusDown {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
targetURL := h.URL + r.URL.Path
|
||||||
|
req, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, hdr := range []string{"Accept", "Accept-Encoding", "Range"} {
|
||||||
|
if v := r.Header.Get(hdr); v != "" {
|
||||||
|
req.Header.Set(hdr, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resp, err := s.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("NAR upstream failed", "upstream", h.URL, "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
|
resp.Body.Close()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
slog.Debug("proxying NAR", "path", r.URL.Path, "upstream", h.URL)
|
||||||
|
s.copyResponse(w, resp)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
http.NotFound(w, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Forwards r to targetURL and streams the response zero-copy.
|
// Forwards r to targetURL and streams the response zero-copy.
|
||||||
|
|
@ -95,7 +142,6 @@ func (s *Server) proxyRequest(w http.ResponseWriter, r *http.Request, targetURL
|
||||||
req.Header.Set(h, v)
|
req.Header.Set(h, v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := s.client.Do(req)
|
resp, err := s.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("upstream request failed", "url", targetURL, "error", err)
|
slog.Error("upstream request failed", "url", targetURL, "error", err)
|
||||||
|
|
@ -103,7 +149,11 @@ func (s *Server) proxyRequest(w http.ResponseWriter, r *http.Request, targetURL
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
s.copyResponse(w, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copies response headers and body from resp to w.
|
||||||
|
func (s *Server) copyResponse(w http.ResponseWriter, resp *http.Response) {
|
||||||
for _, h := range []string{
|
for _, h := range []string{
|
||||||
"Content-Type", "Content-Length", "Content-Encoding",
|
"Content-Type", "Content-Length", "Content-Encoding",
|
||||||
"X-Nix-Signature", "Cache-Control", "Last-Modified",
|
"X-Nix-Signature", "Cache-Control", "Last-Modified",
|
||||||
|
|
@ -114,7 +164,7 @@ func (s *Server) proxyRequest(w http.ResponseWriter, r *http.Request, targetURL
|
||||||
}
|
}
|
||||||
w.WriteHeader(resp.StatusCode)
|
w.WriteHeader(resp.StatusCode)
|
||||||
if _, err := io.Copy(w, resp.Body); err != nil {
|
if _, err := io.Copy(w, resp.Body); err != nil {
|
||||||
slog.Warn("stream interrupted", "url", targetURL, "error", err)
|
slog.Warn("stream interrupted", "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue