fc-server: add timeseries API endpoints for visualisation
Adds `build_stats_timeseries()` for build counts over time, `duration_percentiles_timeseries()` for P50/P95/P99 latencies, and `system_distribution()` for per-system counts — and, of course, REST endpoints for `/api/v1/metrics/timeseries/*`. This is a good start for data visualisation. My professors would be proud. Signed-off-by: NotAShelf <raf@notashelf.dev> Change-Id: I3c0b9d14592945a661af77b7edf338a86a6a6964
This commit is contained in:
parent
75ff45fc91
commit
537fa823a7
2 changed files with 342 additions and 2 deletions
|
|
@ -1,3 +1,4 @@
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
|
@ -6,6 +7,31 @@ use crate::{
|
||||||
models::BuildMetric,
|
models::BuildMetric,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Time-series data point for metrics visualization.
///
/// A single (timestamp, value) sample; consumed by the charting endpoints
/// (e.g. queue-depth timeseries), where `value` holds a count widened to f64.
#[derive(Debug, Clone)]
pub struct TimeseriesPoint {
    // Start of the time bucket this sample belongs to (UTC).
    pub timestamp: DateTime<Utc>,
    // Sampled metric value at `timestamp`.
    pub value: f64,
}
|
||||||
|
|
||||||
|
/// Build statistics for a time bucket.
///
/// Produced by `get_build_stats_timeseries`; one row per bucket, aggregated
/// over all builds completed within the bucket window.
#[derive(Debug, Clone)]
pub struct BuildStatsBucket {
    // Start of the bucket window (UTC).
    pub bucket_time: DateTime<Utc>,
    // Total builds completed in this bucket.
    pub total_builds: i64,
    // Subset of `total_builds` whose status was 'failed'.
    pub failed_builds: i64,
    // Mean build duration in seconds; None when no build in the bucket
    // had both started_at and completed_at set.
    pub avg_duration: Option<f64>,
}
|
||||||
|
|
||||||
|
/// Duration percentile data for a time bucket.
///
/// Produced by `get_duration_percentiles_timeseries`; percentiles are in
/// seconds, computed via PERCENTILE_CONT over build durations in the bucket.
#[derive(Debug, Clone)]
pub struct DurationPercentiles {
    // Start of the bucket window (UTC).
    pub bucket_time: DateTime<Utc>,
    // Median build duration in seconds (None when the bucket is empty).
    pub p50: Option<f64>,
    // 95th-percentile build duration in seconds.
    pub p95: Option<f64>,
    // 99th-percentile build duration in seconds.
    pub p99: Option<f64>,
}
|
||||||
|
|
||||||
pub async fn upsert(
|
pub async fn upsert(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
build_id: Uuid,
|
build_id: Uuid,
|
||||||
|
|
@ -58,3 +84,170 @@ pub async fn calculate_failure_rate(
|
||||||
.count();
|
.count();
|
||||||
Ok((failed_count as f64) / (rows.len() as f64) * 100.0)
|
Ok((failed_count as f64) / (rows.len() as f64) * 100.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get build success/failure counts over time.
|
||||||
|
/// Buckets builds by time interval for charting.
|
||||||
|
pub async fn get_build_stats_timeseries(
|
||||||
|
pool: &PgPool,
|
||||||
|
project_id: Option<Uuid>,
|
||||||
|
jobset_id: Option<Uuid>,
|
||||||
|
hours: i32,
|
||||||
|
bucket_minutes: i32,
|
||||||
|
) -> Result<Vec<BuildStatsBucket>> {
|
||||||
|
let rows: Vec<(DateTime<Utc>, i64, i64, Option<f64>)> = sqlx::query_as(
|
||||||
|
"SELECT
|
||||||
|
date_trunc('minute', b.completed_at) +
|
||||||
|
(EXTRACT(MINUTE FROM b.completed_at)::int / $4) * INTERVAL '1 minute' \
|
||||||
|
* $4 AS bucket_time,
|
||||||
|
COUNT(*) AS total_builds,
|
||||||
|
COUNT(*) FILTER (WHERE b.status = 'failed') AS failed_builds,
|
||||||
|
AVG(EXTRACT(EPOCH FROM (b.completed_at - b.started_at))) AS avg_duration
|
||||||
|
FROM builds b
|
||||||
|
JOIN evaluations e ON b.evaluation_id = e.id
|
||||||
|
JOIN jobsets j ON e.jobset_id = j.id
|
||||||
|
WHERE b.completed_at IS NOT NULL
|
||||||
|
AND b.completed_at > NOW() - (INTERVAL '1 hour' * $1)
|
||||||
|
AND ($2::uuid IS NULL OR j.project_id = $2)
|
||||||
|
AND ($3::uuid IS NULL OR j.id = $3)
|
||||||
|
GROUP BY bucket_time
|
||||||
|
ORDER BY bucket_time ASC",
|
||||||
|
)
|
||||||
|
.bind(hours)
|
||||||
|
.bind(project_id)
|
||||||
|
.bind(jobset_id)
|
||||||
|
.bind(bucket_minutes)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await
|
||||||
|
.map_err(CiError::Database)?;
|
||||||
|
|
||||||
|
Ok(
|
||||||
|
rows
|
||||||
|
.into_iter()
|
||||||
|
.map(|(bucket_time, total_builds, failed_builds, avg_duration)| {
|
||||||
|
BuildStatsBucket {
|
||||||
|
bucket_time,
|
||||||
|
total_builds,
|
||||||
|
failed_builds,
|
||||||
|
avg_duration,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get build duration percentiles over time.
|
||||||
|
pub async fn get_duration_percentiles_timeseries(
|
||||||
|
pool: &PgPool,
|
||||||
|
project_id: Option<Uuid>,
|
||||||
|
jobset_id: Option<Uuid>,
|
||||||
|
hours: i32,
|
||||||
|
bucket_minutes: i32,
|
||||||
|
) -> Result<Vec<DurationPercentiles>> {
|
||||||
|
let rows: Vec<(DateTime<Utc>, Option<f64>, Option<f64>, Option<f64>)> =
|
||||||
|
sqlx::query_as(
|
||||||
|
"SELECT
|
||||||
|
date_trunc('minute', b.completed_at) +
|
||||||
|
(EXTRACT(MINUTE FROM b.completed_at)::int / $4) * INTERVAL '1 minute' \
|
||||||
|
* $4 AS bucket_time,
|
||||||
|
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM \
|
||||||
|
(b.completed_at - b.started_at))) AS p50,
|
||||||
|
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM \
|
||||||
|
(b.completed_at - b.started_at))) AS p95,
|
||||||
|
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM \
|
||||||
|
(b.completed_at - b.started_at))) AS p99
|
||||||
|
FROM builds b
|
||||||
|
JOIN evaluations e ON b.evaluation_id = e.id
|
||||||
|
JOIN jobsets j ON e.jobset_id = j.id
|
||||||
|
WHERE b.completed_at IS NOT NULL
|
||||||
|
AND b.started_at IS NOT NULL
|
||||||
|
AND b.completed_at > NOW() - (INTERVAL '1 hour' * $1)
|
||||||
|
AND ($2::uuid IS NULL OR j.project_id = $2)
|
||||||
|
AND ($3::uuid IS NULL OR j.id = $3)
|
||||||
|
GROUP BY bucket_time
|
||||||
|
ORDER BY bucket_time ASC",
|
||||||
|
)
|
||||||
|
.bind(hours)
|
||||||
|
.bind(project_id)
|
||||||
|
.bind(jobset_id)
|
||||||
|
.bind(bucket_minutes)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await
|
||||||
|
.map_err(CiError::Database)?;
|
||||||
|
|
||||||
|
Ok(
|
||||||
|
rows
|
||||||
|
.into_iter()
|
||||||
|
.map(|(bucket_time, p50, p95, p99)| {
|
||||||
|
DurationPercentiles {
|
||||||
|
bucket_time,
|
||||||
|
p50,
|
||||||
|
p95,
|
||||||
|
p99,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get queue depth over time.
|
||||||
|
pub async fn get_queue_depth_timeseries(
|
||||||
|
pool: &PgPool,
|
||||||
|
hours: i32,
|
||||||
|
bucket_minutes: i32,
|
||||||
|
) -> Result<Vec<TimeseriesPoint>> {
|
||||||
|
// Since we don't have historical queue depth, we'll sample current pending
|
||||||
|
// builds and use build creation times to approximate queue depth over time
|
||||||
|
let rows: Vec<(DateTime<Utc>, i64)> = sqlx::query_as(
|
||||||
|
"SELECT
|
||||||
|
date_trunc('minute', created_at) +
|
||||||
|
(EXTRACT(MINUTE FROM created_at)::int / $2) * INTERVAL '1 minute' * $2 \
|
||||||
|
AS bucket_time,
|
||||||
|
COUNT(*) FILTER (WHERE status = 'pending') AS pending_count
|
||||||
|
FROM builds
|
||||||
|
WHERE created_at > NOW() - (INTERVAL '1 hour' * $1)
|
||||||
|
GROUP BY bucket_time
|
||||||
|
ORDER BY bucket_time ASC",
|
||||||
|
)
|
||||||
|
.bind(hours)
|
||||||
|
.bind(bucket_minutes)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await
|
||||||
|
.map_err(CiError::Database)?;
|
||||||
|
|
||||||
|
Ok(
|
||||||
|
rows
|
||||||
|
.into_iter()
|
||||||
|
.map(|(timestamp, value)| {
|
||||||
|
TimeseriesPoint {
|
||||||
|
timestamp,
|
||||||
|
value: value as f64,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get per-system build distribution.
|
||||||
|
pub async fn get_system_distribution(
|
||||||
|
pool: &PgPool,
|
||||||
|
project_id: Option<Uuid>,
|
||||||
|
hours: i32,
|
||||||
|
) -> Result<Vec<(String, i64)>> {
|
||||||
|
sqlx::query_as(
|
||||||
|
"SELECT
|
||||||
|
COALESCE(b.system, 'unknown') AS system,
|
||||||
|
COUNT(*) AS build_count
|
||||||
|
FROM builds b
|
||||||
|
JOIN evaluations e ON b.evaluation_id = e.id
|
||||||
|
JOIN jobsets j ON e.jobset_id = j.id
|
||||||
|
WHERE b.completed_at > NOW() - (INTERVAL '1 hour' * $1)
|
||||||
|
AND ($2::uuid IS NULL OR j.project_id = $2)
|
||||||
|
GROUP BY b.system
|
||||||
|
ORDER BY build_count DESC",
|
||||||
|
)
|
||||||
|
.bind(hours)
|
||||||
|
.bind(project_id)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await
|
||||||
|
.map_err(CiError::Database)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,59 @@
|
||||||
use axum::{
|
use axum::{
|
||||||
Router,
|
Router,
|
||||||
extract::State,
|
extract::{Query, State},
|
||||||
http::StatusCode,
|
http::StatusCode,
|
||||||
response::{IntoResponse, Response},
|
response::{IntoResponse, Response},
|
||||||
routing::get,
|
routing::get,
|
||||||
};
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::state::AppState;
|
use crate::state::AppState;
|
||||||
|
|
||||||
|
/// Query parameters for timeseries data
#[derive(Debug, Deserialize)]
struct TimeseriesQuery {
    // Optional filter: restrict results to a single project.
    project_id: Option<Uuid>,
    // Optional filter: restrict results to a single jobset.
    jobset_id: Option<Uuid>,
    // Lookback window in hours (defaults to 24 via `default_hours`).
    #[serde(default = "default_hours")]
    hours: i32,
    // Bucket width in minutes (defaults to 60 via `default_bucket`).
    // NOTE(review): no upper/lower bound is enforced here; the repo layer
    // must tolerate 0 or negative values.
    #[serde(default = "default_bucket")]
    bucket: i32,
}
|
||||||
|
|
||||||
|
/// Serde default for `TimeseriesQuery::hours`: a 24-hour lookback window.
fn default_hours() -> i32 {
    24
}
|
||||||
|
|
||||||
|
/// Serde default for `TimeseriesQuery::bucket`: 60-minute buckets.
fn default_bucket() -> i32 {
    60
}
|
||||||
|
|
||||||
|
/// Response type for build stats timeseries
///
/// Parallel arrays (same length, index-aligned) — a common shape for
/// charting libraries.
#[derive(serde::Serialize)]
struct BuildStatsResponse {
    // Bucket start times formatted as "%Y-%m-%dT%H:%M:%SZ".
    timestamps: Vec<String>,
    // Total builds per bucket.
    total: Vec<i64>,
    // Failed builds per bucket.
    failed: Vec<i64>,
    // Mean build duration in seconds per bucket; null when unavailable.
    avg_duration: Vec<Option<f64>>,
}
|
||||||
|
|
||||||
|
/// Response type for duration percentiles
///
/// Parallel arrays (same length, index-aligned) of per-bucket duration
/// percentiles in seconds.
#[derive(serde::Serialize)]
struct DurationPercentilesResponse {
    // Bucket start times formatted as "%Y-%m-%dT%H:%M:%SZ".
    timestamps: Vec<String>,
    // Median duration per bucket; null when the bucket is empty.
    p50: Vec<Option<f64>>,
    // 95th percentile per bucket.
    p95: Vec<Option<f64>>,
    // 99th percentile per bucket.
    p99: Vec<Option<f64>>,
}
|
||||||
|
|
||||||
|
/// Response type for system distribution
///
/// Parallel arrays: `counts[i]` is the build count for `systems[i]`.
#[derive(serde::Serialize)]
struct SystemDistributionResponse {
    // System identifiers; NULL systems arrive as "unknown" from the query.
    systems: Vec<String>,
    // Build counts, ordered descending by the repo query.
    counts: Vec<i64>,
}
|
||||||
|
|
||||||
async fn prometheus_metrics(State(state): State<AppState>) -> Response {
|
async fn prometheus_metrics(State(state): State<AppState>) -> Response {
|
||||||
let stats = match fc_common::repo::builds::get_stats(&state.pool).await {
|
let stats = match fc_common::repo::builds::get_stats(&state.pool).await {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
|
|
@ -193,6 +239,107 @@ async fn prometheus_metrics(State(state): State<AppState>) -> Response {
|
||||||
.into_response()
|
.into_response()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn router() -> Router<AppState> {
|
/// Get build statistics timeseries data for visualization
|
||||||
Router::new().route("/metrics", get(prometheus_metrics))
|
async fn build_stats_timeseries(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Query(params): Query<TimeseriesQuery>,
|
||||||
|
) -> Response {
|
||||||
|
match fc_common::repo::build_metrics::get_build_stats_timeseries(
|
||||||
|
&state.pool,
|
||||||
|
params.project_id,
|
||||||
|
params.jobset_id,
|
||||||
|
params.hours,
|
||||||
|
params.bucket,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(buckets) => {
|
||||||
|
let response = BuildStatsResponse {
|
||||||
|
timestamps: buckets
|
||||||
|
.iter()
|
||||||
|
.map(|b| b.bucket_time.format("%Y-%m-%dT%H:%M:%SZ").to_string())
|
||||||
|
.collect(),
|
||||||
|
total: buckets.iter().map(|b| b.total_builds).collect(),
|
||||||
|
failed: buckets.iter().map(|b| b.failed_builds).collect(),
|
||||||
|
avg_duration: buckets.iter().map(|b| b.avg_duration).collect(),
|
||||||
|
};
|
||||||
|
(StatusCode::OK, axum::Json(response)).into_response()
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to fetch build stats timeseries: {e}");
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get duration percentile timeseries data
|
||||||
|
async fn duration_percentiles_timeseries(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Query(params): Query<TimeseriesQuery>,
|
||||||
|
) -> Response {
|
||||||
|
match fc_common::repo::build_metrics::get_duration_percentiles_timeseries(
|
||||||
|
&state.pool,
|
||||||
|
params.project_id,
|
||||||
|
params.jobset_id,
|
||||||
|
params.hours,
|
||||||
|
params.bucket,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(buckets) => {
|
||||||
|
let response = DurationPercentilesResponse {
|
||||||
|
timestamps: buckets
|
||||||
|
.iter()
|
||||||
|
.map(|b| b.bucket_time.format("%Y-%m-%dT%H:%M:%SZ").to_string())
|
||||||
|
.collect(),
|
||||||
|
p50: buckets.iter().map(|b| b.p50).collect(),
|
||||||
|
p95: buckets.iter().map(|b| b.p95).collect(),
|
||||||
|
p99: buckets.iter().map(|b| b.p99).collect(),
|
||||||
|
};
|
||||||
|
(StatusCode::OK, axum::Json(response)).into_response()
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to fetch duration percentiles: {e}");
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get system distribution data
|
||||||
|
async fn system_distribution(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Query(params): Query<TimeseriesQuery>,
|
||||||
|
) -> Response {
|
||||||
|
match fc_common::repo::build_metrics::get_system_distribution(
|
||||||
|
&state.pool,
|
||||||
|
params.project_id,
|
||||||
|
params.hours,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(distribution) => {
|
||||||
|
let (systems, counts): (Vec<String>, Vec<i64>) =
|
||||||
|
distribution.into_iter().unzip();
|
||||||
|
let response = SystemDistributionResponse { systems, counts };
|
||||||
|
(StatusCode::OK, axum::Json(response)).into_response()
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to fetch system distribution: {e}");
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR.into_response()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn router() -> Router<AppState> {
|
||||||
|
Router::new()
|
||||||
|
.route("/prometheus", get(prometheus_metrics))
|
||||||
|
.route(
|
||||||
|
"/api/v1/metrics/timeseries/builds",
|
||||||
|
get(build_stats_timeseries),
|
||||||
|
)
|
||||||
|
.route(
|
||||||
|
"/api/v1/metrics/timeseries/duration",
|
||||||
|
get(duration_percentiles_timeseries),
|
||||||
|
)
|
||||||
|
.route("/api/v1/metrics/systems", get(system_distribution))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue