From 4f9c7057ff5ba3bb7517ceaf54667c3194e7ab2a Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Wed, 18 Feb 2026 21:08:00 +0300 Subject: [PATCH] various: cleanup Signed-off-by: NotAShelf Change-Id: I26df4d852b4b22d0df6b6871fe9cbde96a6a6964 --- pscand-cli/src/main.rs | 341 ++++++++++++++++++++-- pscand-core/src/config.rs | 4 +- pscand-core/src/helpers/process.rs | 13 +- pscand-core/src/helpers/sensor.rs | 6 +- pscand-core/src/lib.rs | 2 +- pscand-core/src/logging.rs | 452 ++++++++++++++++++++++++++++- scanners/scanner-system/src/lib.rs | 14 +- 7 files changed, 774 insertions(+), 58 deletions(-) diff --git a/pscand-cli/src/main.rs b/pscand-cli/src/main.rs index 0fd514e..0fd6699 100644 --- a/pscand-cli/src/main.rs +++ b/pscand-cli/src/main.rs @@ -1,11 +1,12 @@ use clap::Parser; use libloading::Library; -use pscand_core::logging::{LogEntry, RingBufferLogger}; +use pscand_core::logging::{LogLevel, RingBufferLogger}; use pscand_core::scanner::Scanner; use pscand_core::Config as CoreConfig; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock; use tokio::time::interval; @@ -38,6 +39,115 @@ struct LoadedScanner { library: Library, } +#[derive(Clone)] +struct DaemonState { + running: Arc, + shutdown_requested: Arc, + start_time: Arc>, + last_collection: Arc>, + collection_count: Arc>, + error_count: Arc>, + heartbeat_path: PathBuf, +} + +impl DaemonState { + fn new(heartbeat_path: PathBuf) -> Self { + Self { + running: Arc::new(AtomicBool::new(true)), + shutdown_requested: Arc::new(AtomicBool::new(false)), + start_time: Arc::new(RwLock::new(SystemTime::now())), + last_collection: Arc::new(RwLock::new(SystemTime::UNIX_EPOCH)), + collection_count: Arc::new(RwLock::new(0)), + error_count: Arc::new(RwLock::new(0)), + heartbeat_path, + } + } + + async fn record_collection(&self) { + *self.last_collection.write().await = SystemTime::now(); + *self.collection_count.write().await += 1; + } + + async fn record_error(&self) { + *self.error_count.write().await += 1; + } + + fn get_stats_sync(&self) -> (u64, u64, u64) { + let collections = self.collection_count.try_read().map(|r| *r).unwrap_or(0); + let errors = self.error_count.try_read().map(|r| *r).unwrap_or(0); + let uptime = self + .start_time + .try_read() + .map(|t| { + SystemTime::now() + .duration_since(*t) + .map(|d| d.as_secs()) + .unwrap_or(0) + }) + .unwrap_or(0); + (collections, errors, uptime) + } + + async fn update_heartbeat(&self) -> std::io::Result<()> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + let (collections, errors, uptime) = self.get_stats_sync(); + + let heartbeat = format!( + "{},{},{},{},{}\n", + now, + collections, + errors, + uptime, + if self.shutdown_requested.load(Ordering::SeqCst) { + "shutdown" + } else { + "running" + } + ); + + std::fs::write(&self.heartbeat_path, heartbeat)?; + Ok(()) + } + + async fn write_status(&self, logger: &RingBufferLogger) -> std::io::Result<()> { + let uptime = SystemTime::now() + .duration_since(self.start_time.try_read().map(|t| *t).unwrap_or(UNIX_EPOCH)) + .map(|d| d.as_secs()) + .unwrap_or(0); + let collections = self.collection_count.try_read().map(|c| *c).unwrap_or(0); + let errors = self.error_count.try_read().map(|e| *e).unwrap_or(0); + let last = self + .last_collection + .try_read() + .map(|l| *l) + .unwrap_or(UNIX_EPOCH); + let last_secs = last + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + + let status = serde_json::json!({ + "uptime_seconds": uptime, + "total_collections": collections, + "total_errors": errors, + "last_collection": last_secs, + "shutdown_requested": self.shutdown_requested.load(Ordering::SeqCst), + }); + + logger.log( + LogLevel::Info, + "daemon", + "status", + serde_json::to_string(&status).unwrap_or_default(), + ); + + Ok(()) + } +} + #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); @@ -80,73 +190,232 @@ async fn run_daemon(args: RunArgs) -> Result<(), Box> { config.file_enabled, )); + let heartbeat_path = config.log_dir.join("heartbeat"); + let daemon_state = DaemonState::new(heartbeat_path); + + daemon_state.update_heartbeat().await?; + + logger.log( + LogLevel::Info, + "daemon", + "startup", + serde_json::json!({ + "version": "0.1.0", + "ring_buffer_size": config.ring_buffer_size, + "journal_enabled": config.journal_enabled, + "file_enabled": config.file_enabled, + }) + .to_string(), + ); + std::panic::set_hook(Box::new({ let logger = Arc::clone(&logger); + let daemon_state = daemon_state.clone(); let log_dir = config.log_dir.clone(); move |panic_info| { + daemon_state.running.store(false, Ordering::SeqCst); + + let (collections, errors, uptime) = daemon_state.get_stats_sync(); let entries = logger.get_recent(60); let crash_log = log_dir.join("crash.log"); + if let Ok(mut file) = std::fs::File::create(&crash_log) { use std::io::Write; let _ = writeln!(file, "=== Crash at {} ===", chrono::Utc::now()); let _ = writeln!(file, "Panic: {}", panic_info); + let _ = writeln!(file, "Uptime: {} seconds", uptime); + let _ = writeln!(file, "Total collections: {}", collections); + let _ = writeln!(file, "Total errors: {}", errors); let _ = writeln!(file, "\n=== Last {} log entries ===", entries.len()); for entry in entries { let _ = writeln!(file, "{}", entry.to_json()); } } + + if let Ok(mut file) = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&daemon_state.heartbeat_path) + { + use std::io::Write; + let _ = writeln!( + file, + "PANIC,{},{},{},{}", + uptime, + collections, + errors, + chrono::Utc::now() + ); + } } })); - let scanners = load_scanners(&config).await?; + let scanners = load_scanners(&config, &logger).await?; if scanners.is_empty() { log::warn!("No scanners loaded!"); + logger.log(LogLevel::Warn, "daemon", "no_scanners", "{}".to_string()); } else { log::info!("Loaded {} scanners", scanners.len()); + logger.log( + LogLevel::Info, + "daemon", + "scanners_loaded", + serde_json::json!({ + "count": scanners.len(), + "names": scanners.iter().map(|s| s.name.clone()).collect::>() + }) + .to_string(), + ); } - let mut handles = Vec::new(); + let scanner_handles = Arc::new(RwLock::new(Vec::new())); + let scanner_handles_shutdown = scanner_handles.clone(); + let daemon_state_clone = daemon_state.clone(); + let logger_clone = Arc::clone(&logger); - for loaded in scanners { - let logger = Arc::clone(&logger); - let name = loaded.name.clone(); - let scanner = loaded.scanner.clone(); + let scanner_task = tokio::spawn(async move { + let mut handles = Vec::new(); - let handle = tokio::spawn(async move { - let mut ticker = interval(Duration::from_secs(1)); + for loaded in scanners { + let logger = Arc::clone(&logger_clone); + let name = loaded.name.clone(); + let scanner = loaded.scanner.clone(); + let state = daemon_state_clone.clone(); - loop { - ticker.tick().await; + let handle = tokio::spawn(async move { + let mut ticker = interval(Duration::from_secs(1)); + let _collection_start = Instant::now(); - let scanner_guard = scanner.read().await; - match scanner_guard.collect() { - Ok(metrics) => { - let entry = LogEntry::new(name.as_str(), metrics); - logger.push(entry); + loop { + ticker.tick().await; + + if state.shutdown_requested.load(Ordering::SeqCst) { + let scanner_guard = scanner.read().await; + match scanner_guard.collect() { + Ok(metrics) => { + logger.log( + LogLevel::Info, + &name, + "final_collection", + serde_json::json!({ + "metrics": metrics, + "reason": "shutdown" + }) + .to_string(), + ); + } + Err(e) => { + logger.log( + LogLevel::Error, + &name, + "final_collection_error", + e.to_string(), + ); + } + } + state.record_collection().await; + state.update_heartbeat().await.ok(); + break; } - Err(e) => { - log::error!("Scanner {} error: {}", name, e); + + let scan_start = Instant::now(); + let scanner_guard = scanner.read().await; + match scanner_guard.collect() { + Ok(metrics) => { + let elapsed = scan_start.elapsed().as_millis(); + logger.log( + LogLevel::Info, + &name, + "metrics", + serde_json::json!({ + "metrics": metrics, + "collection_time_ms": elapsed + }) + .to_string(), + ); + state.record_collection().await; + } + Err(e) => { + logger.log(LogLevel::Error, &name, "collection_error", e.to_string()); + state.record_error().await; + } } + state.update_heartbeat().await.ok(); } - } - }); + }); - handles.push(handle); + handles.push(handle); + } + + *scanner_handles_shutdown.write().await = handles; + }); + + let daemon_state_hb = daemon_state.clone(); + let heartbeat_task = tokio::spawn(async move { + let mut ticker = interval(Duration::from_secs(5)); + loop { + ticker.tick().await; + daemon_state_hb.update_heartbeat().await.ok(); + } + }); + + let sigint = tokio::signal::ctrl_c(); + let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; + + tokio::select! { + _ = sigint => { + log::info!("Received Ctrl+C, initiating graceful shutdown"); + } + _ = sigterm.recv() => { + log::info!("Received SIGTERM, initiating graceful shutdown"); + } } - tokio::signal::ctrl_c().await?; - log::info!("Shutting down pscand"); + daemon_state + .shutdown_requested + .store(true, Ordering::SeqCst); + daemon_state.running.store(false, Ordering::SeqCst); - for handle in handles { - handle.abort(); - } + let (collections, errors, _) = daemon_state.get_stats_sync(); + logger.log( + LogLevel::Info, + "daemon", + "shutdown_initiated", + format!( + "{{\"collections\": {}, \"errors\": {}}}", + collections, errors + ), + ); + heartbeat_task.abort(); + let _ = heartbeat_task.await; + + daemon_state + .shutdown_requested + .store(true, Ordering::SeqCst); + scanner_task.await.ok(); + + daemon_state.write_status(&logger).await.ok(); + daemon_state.update_heartbeat().await.ok(); + + logger.log( + LogLevel::Info, + "daemon", + "shutdown_complete", + "{}".to_string(), + ); + + let ring_buffer_path = config.log_dir.join("pscand.buffer"); + logger.flush_to_file(&ring_buffer_path)?; + + log::info!("pscand shut down cleanly"); Ok(()) } async fn load_scanners( config: &CoreConfig, + logger: &RingBufferLogger, ) -> Result, Box> { let mut loaded = Vec::new(); @@ -173,6 +442,12 @@ async fn load_scanners( Ok(s) => s, Err(e) => { log::warn!("Scanner {:?} missing symbol: {}", path, e); + logger.log( + LogLevel::Warn, + "loader", + "missing_symbol", + format!("{}: {}", path.display(), e), + ); continue; } }; @@ -195,6 +470,12 @@ async fn load_scanners( let toml_val = toml::Value::Table(toml_map); if let Err(e) = scanner.init(&toml_val) { log::error!("Failed to init scanner {}: {}", name, e); + logger.log( + LogLevel::Error, + "loader", + "init_failed", + format!("{}: {}", name, e), + ); continue; } } @@ -207,6 +488,12 @@ async fn load_scanners( } Err(e) => { log::warn!("Failed to load scanner {:?}: {}", path, e); + logger.log( + LogLevel::Warn, + "loader", + "load_failed", + format!("{}: {}", path.display(), e), + ); } } } diff --git a/pscand-core/src/config.rs b/pscand-core/src/config.rs index e1e3d46..f800719 100644 --- a/pscand-core/src/config.rs +++ b/pscand-core/src/config.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use thiserror::Error; #[derive(Error, Debug)] @@ -109,7 +109,7 @@ impl Default for Config { } impl Config { - pub fn load(path: &PathBuf) -> ConfigResult { + pub fn load(path: &Path) -> ConfigResult { let content = std::fs::read_to_string(path)?; let mut config: Config = toml::from_str(&content)?; config.scanner_dirs.retain(|p| p.exists()); diff --git a/pscand-core/src/helpers/process.rs b/pscand-core/src/helpers/process.rs index f31cf74..d471ca5 100644 --- a/pscand-core/src/helpers/process.rs +++ b/pscand-core/src/helpers/process.rs @@ -61,24 +61,17 @@ impl ProcessHelper { .collect::>() .join(" "); } else if line.starts_with("State:") { - state = line - .split_whitespace() - .skip(1) - .next() - .unwrap_or("") - .to_string(); + state = line.split_whitespace().nth(1).unwrap_or("").to_string(); } else if line.starts_with("PPid:") { ppid = line .split_whitespace() - .skip(1) - .next() + .nth(1) .and_then(|s| s.parse().ok()) .unwrap_or(0); } else if line.starts_with("VmRSS:") { memory_kb = line .split_whitespace() - .skip(1) - .next() + .nth(1) .and_then(|s| s.parse().ok()) .unwrap_or(0); } diff --git a/pscand-core/src/helpers/sensor.rs b/pscand-core/src/helpers/sensor.rs index 183b3d4..4413b7a 100644 --- a/pscand-core/src/helpers/sensor.rs +++ b/pscand-core/src/helpers/sensor.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::fs; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; pub struct SensorHelper; @@ -21,7 +21,7 @@ impl SensorHelper { Ok(hwmons) } - pub fn read_hwmon_sensor(hwmon_path: &PathBuf, sensor: &str) -> std::io::Result> { + pub fn read_hwmon_sensor(hwmon_path: &Path, sensor: &str) -> std::io::Result> { let sensor_path = hwmon_path.join(sensor); if sensor_path.exists() { let content = fs::read_to_string(sensor_path)?; @@ -31,7 +31,7 @@ impl SensorHelper { } } - pub fn hwmon_info(hwmon_path: &PathBuf) -> std::io::Result> { + pub fn hwmon_info(hwmon_path: &Path) -> std::io::Result> { let mut info = HashMap::new(); let name_path = hwmon_path.join("name"); diff --git a/pscand-core/src/lib.rs b/pscand-core/src/lib.rs index eb30000..1067fa9 100644 --- a/pscand-core/src/lib.rs +++ b/pscand-core/src/lib.rs @@ -4,6 +4,6 @@ pub mod logging; pub mod scanner; pub use config::Config; -pub use logging::{LogEntry, RingBufferLogger}; +pub use logging::{DaemonLogEntry, LogLevel, RingBufferLogger}; pub use scanner::{MetricValue, Scanner, ScannerError}; pub type Result = std::result::Result; diff --git a/pscand-core/src/logging.rs b/pscand-core/src/logging.rs index 709b0ee..8426961 100644 --- a/pscand-core/src/logging.rs +++ b/pscand-core/src/logging.rs @@ -10,16 +10,84 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs::{self, OpenOptions}; use std::io::Write; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::time::Instant; use crate::scanner::MetricValue; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum LogLevel { + Debug, + Info, + Warn, + Error, + Critical, +} + +impl LogLevel { + pub fn from_str(s: &str) -> Self { + match s.to_lowercase().as_str() { + "debug" => LogLevel::Debug, + "info" => LogLevel::Info, + "warn" | "warning" => LogLevel::Warn, + "error" | "err" => LogLevel::Error, + "critical" | "crit" | "fatal" => LogLevel::Critical, + _ => LogLevel::Info, + } + } + + pub fn as_str(&self) -> &'static str { + match self { + LogLevel::Debug => "DEBUG", + LogLevel::Info => "INFO", + LogLevel::Warn => "WARN", + LogLevel::Error => "ERROR", + LogLevel::Critical => "CRITICAL", + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LogEntry { pub timestamp: DateTime, pub scanner: String, + pub level: LogLevel, + pub message: Option, pub metrics: HashMap, + pub collection_time_ms: Option, + pub error_count: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DaemonLogEntry { + pub timestamp: DateTime, + pub level: LogLevel, + pub source: String, + pub event: String, + pub message: String, +} + +impl DaemonLogEntry { + pub fn new(source: impl Into, event: impl Into, message: String) -> Self { + Self { + timestamp: Utc::now(), + level: LogLevel::Info, + source: source.into(), + event: event.into(), + message, + } + } + + pub fn with_level(mut self, level: LogLevel) -> Self { + self.level = level; + self + } + + pub fn to_json(&self) -> String { + serde_json::to_string(self).unwrap_or_default() + } } impl LogEntry { @@ -27,33 +95,73 @@ impl LogEntry { Self { timestamp: Utc::now(), scanner: scanner.into(), + level: LogLevel::Info, + message: None, metrics, + collection_time_ms: None, + error_count: None, } } + pub fn with_level(mut self, level: LogLevel) -> Self { + self.level = level; + self + } + + pub fn with_message(mut self, message: impl Into) -> Self { + self.message = Some(message.into()); + self + } + + pub fn with_timing(mut self, duration: Instant) -> Self { + self.collection_time_ms = Some(duration.elapsed().as_millis() as u64); + self + } + + pub fn with_error_count(mut self, count: u64) -> Self { + self.error_count = Some(count); + self + } + pub fn to_json(&self) -> String { serde_json::to_string(self).unwrap_or_else(|e| format!("{{\"error\":\"{}\"}}", e)) } pub fn to_journal(&self) -> String { let metrics_json = serde_json::to_string(&self.metrics).unwrap_or_default(); - format!( - "PSCAND_SCANNER={} PSCAND_METRICS={}", - self.scanner, metrics_json - ) + let level_str = self.level.as_str(); + if let Some(ref msg) = self.message { + format!( + "PSCAND_SCANNER={} PSCAND_LEVEL={} PSCAND_MSG={} PSCAND_METRICS={}", + self.scanner, level_str, msg, metrics_json + ) + } else { + format!( + "PSCAND_SCANNER={} PSCAND_LEVEL={} PSCAND_METRICS={}", + self.scanner, level_str, metrics_json + ) + } } } type RbStorage = Heap; +type RbStorageDaemon = Heap; type SharedRbLog = SharedRb; +type SharedRbLogDaemon = SharedRb; struct RingBufferHandles { prod: CachingProd>, cons: CachingCons>, } +struct DaemonBufferHandles { + prod: CachingProd>, + cons: CachingCons>, +} + pub struct RingBufferLogger { buffer: Arc>, + daemon_buffer: Arc>, file_path: Option, journal_enabled: bool, file_enabled: bool, @@ -68,11 +176,18 @@ impl RingBufferLogger { ) -> Self { let rb = SharedRb::::new(capacity); let (prod, cons) = rb.split(); - let handles = RingBufferHandles { prod, cons }; + let daemon_rb = SharedRb::::new(capacity); + let (daemon_prod, daemon_cons) = daemon_rb.split(); + let daemon_handles = DaemonBufferHandles { + prod: daemon_prod, + cons: daemon_cons, + }; + Self { buffer: Arc::new(Mutex::new(handles)), + daemon_buffer: Arc::new(Mutex::new(daemon_handles)), file_path, journal_enabled, file_enabled, @@ -96,6 +211,31 @@ impl RingBufferLogger { } } + pub fn log(&self, level: LogLevel, source: &str, event: &str, message: String) { + let entry = DaemonLogEntry { + timestamp: Utc::now(), + level, + source: source.to_string(), + event: event.to_string(), + message, + }; + + { + let mut daemon_handles = self.daemon_buffer.lock(); + if daemon_handles.prod.is_full() { + let _ = daemon_handles.cons.try_pop(); + } + let _ = daemon_handles.prod.try_push(entry.clone()); + } + + if self.journal_enabled { + self.write_daemon_to_journal(&entry); + } + if self.file_enabled { + self.write_daemon_to_file(&entry); + } + } + fn write_to_journal(&self, entry: &LogEntry) { let msg = entry.to_journal(); let _ = std::process::Command::new("logger") @@ -115,12 +255,46 @@ impl RingBufferLogger { } } + fn write_daemon_to_journal(&self, entry: &DaemonLogEntry) { + let priority = match entry.level { + LogLevel::Debug => 7, + LogLevel::Info => 6, + LogLevel::Warn => 4, + LogLevel::Error => 3, + LogLevel::Critical => 2, + }; + let msg = format!( + "PSCAND_SOURCE={} PSCAND_EVENT={} PSCAND_MSG={}", + entry.source, entry.event, entry.message + ); + let _ = std::process::Command::new("logger") + .arg("-t") + .arg("pscand") + .arg("-p") + .arg(format!("user.{}", priority)) + .arg(msg) + .spawn(); + } + + fn write_daemon_to_file(&self, entry: &DaemonLogEntry) { + if let Some(ref path) = self.file_path { + if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) { + let _ = writeln!(file, "{}", entry.to_json()); + } + } + } + + pub fn get_daemon_recent(&self, count: usize) -> Vec { + let handles = self.daemon_buffer.lock(); + handles.cons.iter().rev().take(count).cloned().collect() + } + pub fn get_recent(&self, count: usize) -> Vec { let handles = self.buffer.lock(); handles.cons.iter().rev().take(count).cloned().collect() } - pub fn flush_to_file(&self, path: &PathBuf) -> std::io::Result<()> { + pub fn flush_to_file(&self, path: &Path) -> std::io::Result<()> { let entries = self.get_recent(usize::MAX); let mut file = fs::File::create(path)?; for entry in entries { @@ -135,3 +309,267 @@ impl Default for RingBufferLogger { Self::new(60, None, true, false) } } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RuntimeStats { + pub uptime_secs: u64, + pub total_collections: u64, + pub total_errors: u64, + pub last_collection_time_ms: u64, + pub avg_collection_time_ms: f64, + pub scanner_stats: HashMap, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScannerStats { + pub collections: u64, + pub errors: u64, + pub last_error: Option, + pub last_collection_time_ms: u64, + pub avg_collection_time_ms: f64, +} + +pub struct RuntimeMonitor { + start_time: Instant, + total_collections: AtomicU64, + total_errors: AtomicU64, + last_collection_time: AtomicU64, + collection_time_sum: AtomicU64, + scanner_collections: parking_lot::Mutex>, + scanner_errors: parking_lot::Mutex>, + scanner_last_error: parking_lot::Mutex>, + scanner_last_time: parking_lot::Mutex>, + scanner_time_sum: parking_lot::Mutex>, +} + +impl RuntimeMonitor { + pub fn new() -> Self { + Self { + start_time: Instant::now(), + total_collections: AtomicU64::new(0), + total_errors: AtomicU64::new(0), + last_collection_time: AtomicU64::new(0), + collection_time_sum: AtomicU64::new(0), + scanner_collections: parking_lot::Mutex::new(HashMap::new()), + scanner_errors: parking_lot::Mutex::new(HashMap::new()), + scanner_last_error: parking_lot::Mutex::new(HashMap::new()), + scanner_last_time: parking_lot::Mutex::new(HashMap::new()), + scanner_time_sum: parking_lot::Mutex::new(HashMap::new()), + } + } + + pub fn record_collection(&self, scanner: &str, time_ms: u64, error: Option<&str>) { + self.total_collections.fetch_add(1, Ordering::Relaxed); + self.last_collection_time.store(time_ms, Ordering::Relaxed); + self.collection_time_sum + .fetch_add(time_ms, Ordering::Relaxed); + + let mut collections = self.scanner_collections.lock(); + let collections = collections + .entry(scanner.to_string()) + .or_insert_with(|| AtomicU64::new(0)); + collections.fetch_add(1, Ordering::Relaxed); + + let mut last_time = self.scanner_last_time.lock(); + let last_time = last_time + .entry(scanner.to_string()) + .or_insert_with(|| AtomicU64::new(0)); + last_time.store(time_ms, Ordering::Relaxed); + + let mut time_sum = self.scanner_time_sum.lock(); + let time_sum = time_sum + .entry(scanner.to_string()) + .or_insert_with(|| AtomicU64::new(0)); + time_sum.fetch_add(time_ms, Ordering::Relaxed); + + if let Some(err) = error { + self.total_errors.fetch_add(1, Ordering::Relaxed); + + let mut errors = self.scanner_errors.lock(); + let errors = errors + .entry(scanner.to_string()) + .or_insert_with(|| AtomicU64::new(0)); + errors.fetch_add(1, Ordering::Relaxed); + + let mut last_error = self.scanner_last_error.lock(); + last_error.insert(scanner.to_string(), err.to_string()); + } + } + + pub fn get_stats(&self) -> RuntimeStats { + let uptime = self.start_time.elapsed().as_secs(); + let total = self.total_collections.load(Ordering::Relaxed); + let errors = self.total_errors.load(Ordering::Relaxed); + let last_time = self.last_collection_time.load(Ordering::Relaxed); + let sum_time = self.collection_time_sum.load(Ordering::Relaxed); + let avg = if total > 0 { + sum_time as f64 / total as f64 + } else { + 0.0 + }; + + let mut scanner_stats = HashMap::new(); + + let collections = self.scanner_collections.lock(); + let errors_map = self.scanner_errors.lock(); + let last_error_map = self.scanner_last_error.lock(); + let last_time_map = self.scanner_last_time.lock(); + let time_sum_map = self.scanner_time_sum.lock(); + + for (name, coll) in collections.iter() { + let coll_count = coll.load(Ordering::Relaxed); + let err_count = errors_map + .get(name) + .map(|e| e.load(Ordering::Relaxed)) + .unwrap_or(0); + let last_err = last_error_map.get(name).cloned(); + let last_t = last_time_map + .get(name) + .map(|t| t.load(Ordering::Relaxed)) + .unwrap_or(0); + let sum_t = time_sum_map + .get(name) + .map(|s| s.load(Ordering::Relaxed)) + .unwrap_or(0); + let avg_t = if coll_count > 0 { + sum_t as f64 / coll_count as f64 + } else { + 0.0 + }; + + scanner_stats.insert( + name.clone(), + ScannerStats { + collections: coll_count, + errors: err_count, + last_error: last_err, + last_collection_time_ms: last_t, + avg_collection_time_ms: avg_t, + }, + ); + } + + RuntimeStats { + uptime_secs: uptime, + total_collections: total, + total_errors: errors, + last_collection_time_ms: last_time, + avg_collection_time_ms: avg, + scanner_stats, + } + } +} + +impl Default for RuntimeMonitor { + fn default() -> Self { + Self::new() + } +} + +pub struct Heartbeat { + path: PathBuf, + interval_secs: u64, + last_update: parking_lot::Mutex, +} + +impl Heartbeat { + pub fn new(path: PathBuf, interval_secs: u64) -> Self { + Self { + path, + interval_secs, + last_update: parking_lot::Mutex::new(Instant::now()), + } + } + + pub fn touch(&self) -> std::io::Result<()> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + + if let Some(parent) = self.path.parent() { + fs::create_dir_all(parent)?; + } + + let mut file = fs::File::create(&self.path)?; + writeln!(file, "{}", now)?; + *self.last_update.lock() = Instant::now(); + Ok(()) + } + + pub fn is_stale(&self) -> bool { + self.last_update.lock().elapsed().as_secs() > self.interval_secs * 3 + } +} + +pub struct CrashDetector { + heartbeat: Heartbeat, + state_file: PathBuf, +} + +impl CrashDetector { + pub fn new(state_dir: PathBuf) -> Self { + let heartbeat = Heartbeat::new(state_dir.join("heartbeat"), 5); + let state_file = state_dir.join("state"); + Self { + heartbeat, + state_file, + } + } + + pub fn new_with_interval(state_dir: PathBuf, heartbeat_interval_secs: u64) -> Self { + let heartbeat = Heartbeat::new(state_dir.join("heartbeat"), heartbeat_interval_secs); + let state_file = state_dir.join("state"); + Self { + heartbeat, + state_file, + } + } + + pub fn write_state(&self, stats: &RuntimeStats) -> std::io::Result<()> { + if let Some(parent) = self.state_file.parent() { + fs::create_dir_all(parent)?; + } + let json = serde_json::to_string_pretty(stats).unwrap_or_default(); + fs::write(&self.state_file, json) + } + + pub fn mark_running(&self) -> std::io::Result<()> { + self.heartbeat.touch() + } + + pub fn mark_stopped(&self) -> std::io::Result<()> { + if let Some(parent) = self.state_file.parent() { + fs::create_dir_all(parent)?; + } + let json = serde_json::json!({ + "status": "stopped", + "timestamp": std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) + }); + fs::write( + &self.state_file, + serde_json::to_string_pretty(&json).unwrap_or_default(), + ) + } + + pub fn is_healthy(&self) -> bool { + if self.heartbeat.is_stale() { + return false; + } + + if let Ok(content) = fs::read_to_string(&self.state_file) { + if let Ok(state) = serde_json::from_str::(&content) { + if let Some(status) = state.get("status").and_then(|v| v.as_str()) { + if status == "stopped" { + return false; + } + } + } + } + + true + } +} diff --git a/scanners/scanner-system/src/lib.rs b/scanners/scanner-system/src/lib.rs index 1319e56..a97fc0a 100644 --- a/scanners/scanner-system/src/lib.rs +++ b/scanners/scanner-system/src/lib.rs @@ -68,14 +68,12 @@ impl Scanner for SystemScanner { MetricValue::Integer(*available as i64), ); } - if let Some(used) = mem.get("MemAvailable") { - if let Some(total) = mem.get("MemTotal") { - let used_mem = total.saturating_sub(*used); - metrics.insert( - "mem_used_bytes".to_string(), - MetricValue::Integer(used_mem as i64), - ); - } + if let (Some(used), Some(total)) = (mem.get("MemAvailable"), mem.get("MemTotal")) { + let used_mem = total.saturating_sub(*used); + metrics.insert( + "mem_used_bytes".to_string(), + MetricValue::Integer(used_mem as i64), + ); } }