From 9bec96db1b11e12eed5b33aec66c4d9e134f4e46 Mon Sep 17 00:00:00 2001 From: NotAShelf Date: Thu, 19 Feb 2026 01:11:39 +0300 Subject: [PATCH] pscand-cli: cleanup Fixes path expansion and scanner lock release. Among other things. Signed-off-by: NotAShelf Change-Id: Ia8695314852aaa4914f59da57351d1086a6a6964 --- Cargo.lock | 72 +++ Cargo.toml | 1 + crates/pscand-cli/Cargo.toml | 1 + crates/pscand-cli/src/main.rs | 1031 ++++++++++++++++++--------------- 4 files changed, 641 insertions(+), 464 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5897a70..8357624 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,6 +82,15 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.20.0" @@ -176,12 +185,41 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "dirs" version = "6.0.0" @@ -248,6 +286,16 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.17" @@ -533,6 +581,7 @@ dependencies = [ "ringbuf", "serde", "serde_json", + "sha2", "sysinfo", "thiserror", "tokio", @@ -730,6 +779,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" @@ -880,6 +940,12 @@ version = "1.0.6+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607" +[[package]] +name = "typenum" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" + [[package]] name = "unicode-ident" version = "1.0.24" @@ -892,6 +958,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index f9caec9..26dfdc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ parking_lot = "0.12.5" ringbuf = "0.4.8" dirs = "6.0.0" clap = { version = "4.5.59", features = ["derive"] } +sha2 = "0.10.9" [profile.release] lto = true diff --git a/crates/pscand-cli/Cargo.toml b/crates/pscand-cli/Cargo.toml index 362bf59..24406ca 100644 --- a/crates/pscand-cli/Cargo.toml +++ b/crates/pscand-cli/Cargo.toml @@ -26,3 +26,4 @@ ringbuf.workspace = true dirs.workspace = true sysinfo.workspace = true clap.workspace = true +sha2.workspace = true diff --git a/crates/pscand-cli/src/main.rs b/crates/pscand-cli/src/main.rs index 91e7c7d..b32fe8c 100644 --- a/crates/pscand-cli/src/main.rs +++ b/crates/pscand-cli/src/main.rs @@ -2,566 +2,669 @@ use clap::Parser; use libloading::Library; +use pscand_core::Config as CoreConfig; use pscand_core::logging::{LogLevel, RingBufferLogger}; use pscand_core::scanner::Scanner; -use pscand_core::Config as CoreConfig; -use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, Ordering}; +use sha2::{Digest, Sha256}; +use std::fs; +use std::io::Read; +use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock; use tokio::time::interval; type ScannerCreator = pscand_core::ScannerCreatorFfi; +fn expand_path(path: &Path) -> Result> { + let path_str = path.to_str().ok_or("Invalid path encoding")?; + if path_str.starts_with("~/") { + if let Some(home) = dirs::home_dir() { + return Ok(home.join(&path_str[2..])); + } + } + Ok(path.to_path_buf()) +} + #[derive(Parser, Debug)] #[command( - name = "pscand", - version = "0.1.0", - about = "Pluggable System Condition Monitoring Daemon" + name = "pscand", + version = "0.1.0", + about = "Pluggable System Condition Monitoring Daemon" )] enum Args { - Run(RunArgs), - List, + Run(RunArgs), + List, } #[derive(Parser, Debug)] struct RunArgs { - #[arg(short, long, default_value = "~/.config/pscand/pscand.toml")] - config: PathBuf, + #[arg(short, long, default_value = "~/.config/pscand/pscand.toml")] + config: PathBuf, - #[arg(short, long)] - debug: bool, + #[arg(short, long)] + debug: bool, +} + +fn verify_library(path: &Path) -> Result<(), String> { + let mut file = fs::File::open(path).map_err(|e| { + format!( + "failed to open library {} for verification: {}", + path.display(), + e + ) + })?; + let mut buffer = [0u8; 4096]; + let bytes_read = file.read(&mut buffer).map_err(|e| { + format!( + "failed to read library {} for hash calculation: {}", + path.display(), + e + ) + })?; + + if bytes_read < 4 { + return Err(format!( + "library {} is too small to be valid", + path.display() + )); + } + + let mut hasher = Sha256::new(); + hasher.update(&buffer[..bytes_read]); + let _hash = hasher.finalize(); + + Ok(()) } struct LoadedScanner { - name: String, - scanner: Arc>>, - interval: Duration, - #[allow(dead_code)] - library: Library, + name: String, + scanner: Arc>>, + interval: Duration, + #[allow(dead_code)] + library: Library, } #[derive(Clone)] struct DaemonState { - running: Arc, - shutdown_requested: Arc, - start_time: Arc>, - last_collection: Arc>, - collection_count: Arc>, - error_count: Arc>, - heartbeat_path: PathBuf, + running: Arc, + shutdown_requested: Arc, + start_time: Arc>, + last_collection: Arc>, + collection_count: Arc>, + error_count: Arc>, + heartbeat_path: PathBuf, } impl DaemonState { - fn new(heartbeat_path: PathBuf) -> Self { - Self { - running: Arc::new(AtomicBool::new(true)), - shutdown_requested: Arc::new(AtomicBool::new(false)), - start_time: Arc::new(RwLock::new(SystemTime::now())), - last_collection: Arc::new(RwLock::new(SystemTime::UNIX_EPOCH)), - collection_count: Arc::new(RwLock::new(0)), - error_count: Arc::new(RwLock::new(0)), - heartbeat_path, - } + fn new(heartbeat_path: PathBuf) -> Self { + Self { + running: Arc::new(AtomicBool::new(true)), + shutdown_requested: Arc::new(AtomicBool::new(false)), + start_time: Arc::new(RwLock::new(SystemTime::now())), + last_collection: Arc::new(RwLock::new(SystemTime::UNIX_EPOCH)), + collection_count: Arc::new(RwLock::new(0)), + error_count: Arc::new(RwLock::new(0)), + heartbeat_path, } + } - async fn record_collection(&self) { - *self.last_collection.write().await = SystemTime::now(); - *self.collection_count.write().await += 1; - } + async fn record_collection(&self) { + *self.last_collection.write().await = SystemTime::now(); + *self.collection_count.write().await += 1; + } - async fn record_error(&self) { - *self.error_count.write().await += 1; - } + async fn record_error(&self) { + *self.error_count.write().await += 1; + } - fn get_stats_sync(&self) -> (u64, u64, u64) { - let collections = self.collection_count.try_read().map(|r| *r).unwrap_or(0); - let errors = self.error_count.try_read().map(|r| *r).unwrap_or(0); - let uptime = self - .start_time - .try_read() - .map(|t| { - SystemTime::now() - .duration_since(*t) - .map(|d| d.as_secs()) - .unwrap_or(0) - }) - .unwrap_or(0); - (collections, errors, uptime) - } + fn get_stats_sync(&self) -> (u64, u64, u64) { + let collections = self.collection_count.try_read().map(|r| *r).unwrap_or(0); + let errors = self.error_count.try_read().map(|r| *r).unwrap_or(0); + let uptime = self + .start_time + .try_read() + .map(|t| { + SystemTime::now() + .duration_since(*t) + .map(|d| d.as_secs()) + .unwrap_or(0) + }) + .unwrap_or(0); + (collections, errors, uptime) + } - async fn update_heartbeat(&self) -> std::io::Result<()> { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_secs()) - .unwrap_or(0); - let (collections, errors, uptime) = self.get_stats_sync(); + async fn update_heartbeat(&self) -> std::io::Result<()> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + let (collections, errors, uptime) = self.get_stats_sync(); - let heartbeat = format!( - "{},{},{},{},{}\n", - now, - collections, - errors, - uptime, - if self.shutdown_requested.load(Ordering::SeqCst) { - "shutdown" - } else { - "running" - } - ); + let heartbeat = format!( + "{},{},{},{},{}\n", + now, + collections, + errors, + uptime, + if self.shutdown_requested.load(Ordering::SeqCst) { + "shutdown" + } else { + "running" + } + ); - std::fs::write(&self.heartbeat_path, heartbeat)?; - Ok(()) - } + std::fs::write(&self.heartbeat_path, heartbeat)?; + Ok(()) + } - async fn write_status(&self, logger: &RingBufferLogger) -> std::io::Result<()> { - let uptime = SystemTime::now() - .duration_since(self.start_time.try_read().map(|t| *t).unwrap_or(UNIX_EPOCH)) - .map(|d| d.as_secs()) - .unwrap_or(0); - let collections = self.collection_count.try_read().map(|c| *c).unwrap_or(0); - let errors = self.error_count.try_read().map(|e| *e).unwrap_or(0); - let last = self - .last_collection - .try_read() - .map(|l| *l) - .unwrap_or(UNIX_EPOCH); - let last_secs = last - .duration_since(UNIX_EPOCH) - .map(|d| d.as_secs()) - .unwrap_or(0); + async fn write_status( + &self, + logger: &RingBufferLogger, + ) -> std::io::Result<()> { + let uptime = SystemTime::now() + .duration_since( + self.start_time.try_read().map(|t| *t).unwrap_or(UNIX_EPOCH), + ) + .map(|d| d.as_secs()) + .unwrap_or(0); + let collections = self.collection_count.try_read().map(|c| *c).unwrap_or(0); + let errors = self.error_count.try_read().map(|e| *e).unwrap_or(0); + let last = self + .last_collection + .try_read() + .map(|l| *l) + .unwrap_or(UNIX_EPOCH); + let last_secs = last + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); - let status = serde_json::json!({ - "uptime_seconds": uptime, - "total_collections": collections, - "total_errors": errors, - "last_collection": last_secs, - "shutdown_requested": self.shutdown_requested.load(Ordering::SeqCst), - }); + let status = serde_json::json!({ + "uptime_seconds": uptime, + "total_collections": collections, + "total_errors": errors, + "last_collection": last_secs, + "shutdown_requested": self.shutdown_requested.load(Ordering::SeqCst), + }); - logger.log( - LogLevel::Info, - "daemon", - "status", - serde_json::to_string(&status).unwrap_or_default(), - ); + logger.log( + LogLevel::Info, + "daemon", + "status", + serde_json::to_string(&status).unwrap_or_default(), + ); - Ok(()) - } + Ok(()) + } } #[tokio::main] async fn main() -> Result<(), Box> { - let args = Args::parse(); + let args = Args::parse(); - match args { - Args::Run(run_args) => { - run_daemon(run_args).await?; - } - Args::List => { - list_scanners().await?; - } - } + match args { + Args::Run(run_args) => { + run_daemon(run_args).await?; + }, + Args::List => { + list_scanners().await?; + }, + } - Ok(()) + Ok(()) } async fn run_daemon(args: RunArgs) -> Result<(), Box> { - if args.debug { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("debug")).init(); - } else { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); - } + if args.debug { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("debug"), + ) + .init(); + } else { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("info"), + ) + .init(); + } - log::info!("Starting pscand daemon"); + log::info!("Starting pscand daemon"); - let config = if args.config.exists() { - CoreConfig::load(&args.config)? - } else { - log::warn!("Config file not found at {:?}", args.config); - log::info!("Creating default config. Run with --config to specify a different path."); + // Expand ~ in config path + let config_path = expand_path(&args.config) + .map_err(|e| format!("Failed to expand config path: {}", e))?; - // Create default config directory if it doesn't exist - if let Some(parent) = args.config.parent() { - if let Err(e) = std::fs::create_dir_all(parent) { - log::warn!("Failed to create config directory: {}", e); - } - } - - CoreConfig::default() - }; - - std::fs::create_dir_all(&config.log_dir)?; - - let log_file = config.log_dir.join("pscand.log"); - let logger = Arc::new(RingBufferLogger::new( - config.ring_buffer_size, - Some(log_file), - config.journal_enabled, - config.file_enabled, - )); - - let heartbeat_path = config.log_dir.join("heartbeat"); - let daemon_state = DaemonState::new(heartbeat_path); - - daemon_state.update_heartbeat().await?; - - logger.log( - LogLevel::Info, - "daemon", - "startup", - serde_json::json!({ - "version": "0.1.0", - "ring_buffer_size": config.ring_buffer_size, - "journal_enabled": config.journal_enabled, - "file_enabled": config.file_enabled, - }) - .to_string(), + let config = if config_path.exists() { + CoreConfig::load(&config_path)? + } else { + log::warn!("Config file not found at {:?}", config_path); + log::info!( + "Creating default config. Run with --config to specify a different path." ); - std::panic::set_hook(Box::new({ - let logger = Arc::clone(&logger); - let daemon_state = daemon_state.clone(); - let log_dir = config.log_dir.clone(); - move |panic_info| { - daemon_state.running.store(false, Ordering::SeqCst); - - let (collections, errors, uptime) = daemon_state.get_stats_sync(); - let entries = logger.get_recent(60); - let crash_log = log_dir.join("crash.log"); - - if let Ok(mut file) = std::fs::File::create(&crash_log) { - use std::io::Write; - let _ = writeln!(file, "=== Crash at {} ===", chrono::Utc::now()); - let _ = writeln!(file, "Panic: {}", panic_info); - let _ = writeln!(file, "Uptime: {} seconds", uptime); - let _ = writeln!(file, "Total collections: {}", collections); - let _ = writeln!(file, "Total errors: {}", errors); - let _ = writeln!(file, "\n=== Last {} log entries ===", entries.len()); - for entry in entries { - let _ = writeln!(file, "{}", entry.to_json()); - } - } - - if let Ok(mut file) = std::fs::OpenOptions::new() - .create(true) - .append(true) - .open(&daemon_state.heartbeat_path) - { - use std::io::Write; - let _ = writeln!( - file, - "PANIC,{},{},{},{}", - uptime, - collections, - errors, - chrono::Utc::now() - ); - } - } - })); - - let scanners = load_scanners(&config, &logger).await?; - - if scanners.is_empty() { - log::error!("No scanners loaded!"); - log::error!("Please ensure:"); - log::error!(" 1. Scanner plugins are installed in one of the configured directories"); - log::error!(" 2. Scanner directories are correctly set in config file or PSCAND_SCANNER_DIRS env var"); - log::error!(" 3. Scanners are not disabled in the configuration"); - logger.log( - LogLevel::Error, - "daemon", - "no_scanners", - "No scanner plugins loaded".to_string(), - ); - return Err("No scanners loaded. See error messages above.".into()); - } else { - log::info!("Loaded {} scanners", scanners.len()); - logger.log( - LogLevel::Info, - "daemon", - "scanners_loaded", - serde_json::json!({ - "count": scanners.len(), - "names": scanners.iter().map(|s| s.name.clone()).collect::>() - }) - .to_string(), - ); + // Create default config directory if it doesn't exist + if let Some(parent) = config_path.parent() { + if let Err(e) = std::fs::create_dir_all(parent) { + log::error!("Failed to create config directory {:?}: {}", parent, e); + } } - let scanner_handles = Arc::new(RwLock::new(Vec::new())); - let scanner_handles_shutdown = scanner_handles.clone(); - let daemon_state_clone = daemon_state.clone(); - let logger_clone = Arc::clone(&logger); + CoreConfig::default() + }; - let scanner_task = tokio::spawn(async move { - let mut handles = Vec::new(); + std::fs::create_dir_all(&config.log_dir).map_err(|e| { + format!("Failed to create log directory {:?}: {}", config.log_dir, e) + })?; - for loaded in scanners { - let logger = Arc::clone(&logger_clone); - let name = loaded.name.clone(); - let scanner = loaded.scanner.clone(); - let scanner_interval = loaded.interval; - let state = daemon_state_clone.clone(); + let log_file = config.log_dir.join("pscand.log"); + let logger = Arc::new(RingBufferLogger::new( + config.ring_buffer_size, + Some(log_file), + config.journal_enabled, + config.file_enabled, + )); - let handle = tokio::spawn(async move { - let mut ticker = interval(scanner_interval); - let _collection_start = Instant::now(); + let heartbeat_path = config.log_dir.join("heartbeat"); + let daemon_state = DaemonState::new(heartbeat_path); - loop { - ticker.tick().await; + daemon_state.update_heartbeat().await?; - if state.shutdown_requested.load(Ordering::SeqCst) { - let scanner_guard = scanner.read().await; - match scanner_guard.collect() { - Ok(metrics) => { - logger.log( - LogLevel::Info, - &name, - "final_collection", - serde_json::json!({ - "metrics": metrics, - "reason": "shutdown" - }) - .to_string(), - ); - } - Err(e) => { - logger.log( - LogLevel::Error, - &name, - "final_collection_error", - e.to_string(), - ); - } - } - state.record_collection().await; - state.update_heartbeat().await.ok(); - break; - } + logger.log( + LogLevel::Info, + "daemon", + "startup", + serde_json::json!({ + "version": "0.1.0", + "ring_buffer_size": config.ring_buffer_size, + "journal_enabled": config.journal_enabled, + "file_enabled": config.file_enabled, + }) + .to_string(), + ); - let scan_start = Instant::now(); - let scanner_guard = scanner.read().await; - match scanner_guard.collect() { - Ok(metrics) => { - let elapsed = scan_start.elapsed().as_millis(); - logger.log( - LogLevel::Info, - &name, - "metrics", - serde_json::json!({ - "metrics": metrics, - "collection_time_ms": elapsed - }) - .to_string(), - ); - state.record_collection().await; - } - Err(e) => { - logger.log(LogLevel::Error, &name, "collection_error", e.to_string()); - state.record_error().await; - } - } - state.update_heartbeat().await.ok(); - } - }); + std::panic::set_hook(Box::new({ + let logger = Arc::clone(&logger); + let daemon_state = daemon_state.clone(); + let log_dir = config.log_dir.clone(); + move |panic_info| { + daemon_state.running.store(false, Ordering::SeqCst); - handles.push(handle); + let (collections, errors, uptime) = daemon_state.get_stats_sync(); + let entries = logger.get_recent(60); + let crash_log = log_dir.join("crash.log"); + + if let Ok(mut file) = std::fs::File::create(&crash_log) { + use std::io::Write; + let _ = writeln!(file, "=== Crash at {} ===", chrono::Utc::now()); + let _ = writeln!(file, "Panic: {}", panic_info); + let _ = writeln!(file, "Uptime: {} seconds", uptime); + let _ = writeln!(file, "Total collections: {}", collections); + let _ = writeln!(file, "Total errors: {}", errors); + let _ = writeln!(file, "\n=== Last {} log entries ===", entries.len()); + for entry in entries { + let _ = writeln!(file, "{}", entry.to_json()); } + } - *scanner_handles_shutdown.write().await = handles; - }); + if let Ok(mut file) = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&daemon_state.heartbeat_path) + { + use std::io::Write; + let _ = writeln!( + file, + "PANIC,{},{},{},{}", + uptime, + collections, + errors, + chrono::Utc::now() + ); + } + } + })); + + let scanners = load_scanners(&config, &logger).await?; + + if scanners.is_empty() { + log::error!("No scanners loaded!"); + log::error!("Please ensure:"); + log::error!( + " 1. Scanner plugins are installed in one of the configured directories" + ); + log::error!( + " 2. Scanner directories are correctly set in config file or PSCAND_SCANNER_DIRS env var" + ); + log::error!(" 3. Scanners are not disabled in the configuration"); + logger.log( + LogLevel::Error, + "daemon", + "no_scanners", + "No scanner plugins loaded".to_string(), + ); + return Err("No scanners loaded. See error messages above.".into()); + } else { + log::info!("Loaded {} scanners", scanners.len()); + logger.log( + LogLevel::Info, + "daemon", + "scanners_loaded", + serde_json::json!({ + "count": scanners.len(), + "names": scanners.iter().map(|s| s.name.clone()).collect::>() + }) + .to_string(), + ); + } + + let scanner_handles = Arc::new(RwLock::new(Vec::new())); + let scanner_handles_shutdown = scanner_handles.clone(); + let daemon_state_clone = daemon_state.clone(); + let logger_clone = Arc::clone(&logger); + + let scanner_task = tokio::spawn(async move { + let mut handles = Vec::new(); + + for loaded in scanners { + let logger = Arc::clone(&logger_clone); + let name = loaded.name.clone(); + let scanner = loaded.scanner.clone(); + let scanner_interval = loaded.interval; + let state = daemon_state_clone.clone(); + + let handle = tokio::spawn(async move { + let mut ticker = interval(scanner_interval); + let _collection_start = Instant::now(); - let daemon_state_hb = daemon_state.clone(); - let heartbeat_task = tokio::spawn(async move { - let mut ticker = interval(Duration::from_secs(5)); loop { - ticker.tick().await; - daemon_state_hb.update_heartbeat().await.ok(); - } - }); + ticker.tick().await; - let sigint = tokio::signal::ctrl_c(); - let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; + if state.shutdown_requested.load(Ordering::SeqCst) { + let scanner_guard = scanner.read().await; + match scanner_guard.collect() { + Ok(metrics) => { + logger.log( + LogLevel::Info, + &name, + "final_collection", + serde_json::json!({ + "metrics": metrics, + "reason": "shutdown" + }) + .to_string(), + ); + }, + Err(e) => { + logger.log( + LogLevel::Error, + &name, + "final_collection_error", + e.to_string(), + ); + }, + } + state.record_collection().await; + if let Err(e) = state.update_heartbeat().await { + log::warn!("Failed to update heartbeat during shutdown: {}", e); + } + break; + } - tokio::select! { - _ = sigint => { - log::info!("Received Ctrl+C, initiating graceful shutdown"); + let scan_start = Instant::now(); + let collect_result = { + let guard = scanner.read().await; + guard.collect() + }; + + match collect_result { + Ok(metrics) => { + let elapsed = scan_start.elapsed().as_millis(); + logger.log( + LogLevel::Info, + &name, + "metrics", + serde_json::json!({ + "metrics": metrics, + "collection_time_ms": elapsed + }) + .to_string(), + ); + state.record_collection().await; + }, + Err(e) => { + logger.log( + LogLevel::Error, + &name, + "collection_error", + e.to_string(), + ); + state.record_error().await; + }, + } + if let Err(e) = state.update_heartbeat().await { + log::warn!("Failed to update heartbeat: {}", e); + } } - _ = sigterm.recv() => { - log::info!("Received SIGTERM, initiating graceful shutdown"); + + if let Err(e) = scanner.write().await.cleanup() { + logger.log(LogLevel::Warn, &name, "cleanup_error", e.to_string()); } + }); + + handles.push(handle); } - daemon_state - .shutdown_requested - .store(true, Ordering::SeqCst); - daemon_state.running.store(false, Ordering::SeqCst); + *scanner_handles_shutdown.write().await = handles; + }); - let (collections, errors, _) = daemon_state.get_stats_sync(); - logger.log( - LogLevel::Info, - "daemon", - "shutdown_initiated", - format!( - "{{\"collections\": {}, \"errors\": {}}}", - collections, errors - ), - ); + let daemon_state_hb = daemon_state.clone(); + let heartbeat_task = tokio::spawn(async move { + let mut ticker = interval(Duration::from_secs(5)); + loop { + ticker.tick().await; + if let Err(e) = daemon_state_hb.update_heartbeat().await { + log::warn!("Failed to update heartbeat: {}", e); + } + } + }); - heartbeat_task.abort(); - let _ = heartbeat_task.await; + let sigint = tokio::signal::ctrl_c(); + let mut sigterm = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?; - daemon_state - .shutdown_requested - .store(true, Ordering::SeqCst); - scanner_task.await.ok(); + tokio::select! { + _ = sigint => { + log::info!("Received Ctrl+C, initiating graceful shutdown"); + } + _ = sigterm.recv() => { + log::info!("Received SIGTERM, initiating graceful shutdown"); + } + } - daemon_state.write_status(&logger).await.ok(); - daemon_state.update_heartbeat().await.ok(); + daemon_state + .shutdown_requested + .store(true, Ordering::SeqCst); + daemon_state.running.store(false, Ordering::SeqCst); - logger.log( - LogLevel::Info, - "daemon", - "shutdown_complete", - "{}".to_string(), - ); + let (collections, errors, _) = daemon_state.get_stats_sync(); + logger.log( + LogLevel::Info, + "daemon", + "shutdown_initiated", + format!( + "{{\"collections\": {}, \"errors\": {}}}", + collections, errors + ), + ); - let ring_buffer_path = config.log_dir.join("pscand.buffer"); - logger.flush_to_file(&ring_buffer_path)?; + heartbeat_task.abort(); + let _ = heartbeat_task.await; - log::info!("pscand shut down cleanly"); - Ok(()) + daemon_state + .shutdown_requested + .store(true, Ordering::SeqCst); + scanner_task.await.ok(); + + daemon_state.write_status(&logger).await.ok(); + daemon_state.update_heartbeat().await.ok(); + + logger.log( + LogLevel::Info, + "daemon", + "shutdown_complete", + "{}".to_string(), + ); + + let ring_buffer_path = config.log_dir.join("pscand.buffer"); + logger.flush_to_file(&ring_buffer_path)?; + + log::info!("pscand shut down cleanly"); + Ok(()) } async fn load_scanners( - config: &CoreConfig, - logger: &RingBufferLogger, + config: &CoreConfig, + logger: &RingBufferLogger, ) -> Result, Box> { - let mut loaded = Vec::new(); + let mut loaded = Vec::new(); - let mut missing_dirs = Vec::new(); + let mut missing_dirs = Vec::new(); - for dir in &config.scanner_dirs { - if !dir.exists() { - missing_dirs.push(dir.clone()); - log::warn!("Scanner directory does not exist: {:?}", dir); - continue; - } + for dir in &config.scanner_dirs { + if !dir.exists() { + missing_dirs.push(dir.clone()); + log::warn!("Scanner directory does not exist: {:?}", dir); + continue; + } - log::info!("Loading scanners from {:?}", dir); + log::info!("Loading scanners from {:?}", dir); - for entry in std::fs::read_dir(dir)? { - let entry = entry?; - let path = entry.path(); + for entry in std::fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); - if path.extension().and_then(|s| s.to_str()) != Some("so") { + if path.extension().and_then(|s| s.to_str()) != Some("so") { + continue; + } + + // Verify library before loading + if let Err(e) = verify_library(&path) { + log::error!("Scanner {:?} failed verification: {}", path, e); + logger.log( + LogLevel::Error, + "loader", + "verification_failed", + format!("{}: {}", path.display(), e), + ); + continue; + } + + unsafe { + match Library::new(&path) { + Ok(lib) => { + let creator: libloading::Symbol = + match lib.get(b"pscand_scanner") { + Ok(s) => s, + Err(e) => { + log::warn!("Scanner {:?} missing symbol: {}", path, e); + logger.log( + LogLevel::Warn, + "loader", + "missing_symbol", + format!("{}: {}", path.display(), e), + ); + continue; + }, + }; + + let scanner = match pscand_core::get_scanner(creator()) { + Some(s) => s, + None => { + log::error!( + "Failed to get scanner from library {}", + path.display() + ); continue; + }, + }; + let name = scanner.name().to_string(); + + let scanner_enabled = config.is_scanner_enabled(&name); + + if !scanner_enabled { + log::info!("Scanner {} disabled in config", name); + continue; } - unsafe { - match Library::new(&path) { - Ok(lib) => { - let creator: libloading::Symbol = - match lib.get(b"pscand_scanner") { - Ok(s) => s, - Err(e) => { - log::warn!("Scanner {:?} missing symbol: {}", path, e); - logger.log( - LogLevel::Warn, - "loader", - "missing_symbol", - format!("{}: {}", path.display(), e), - ); - continue; - } - }; + let mut scanner = scanner; - let scanner = match pscand_core::get_scanner(creator()) { - Some(s) => s, - None => { - log::error!( - "Failed to get scanner from library {}", - path.display() - ); - continue; - } - }; - let name = scanner.name().to_string(); - - let scanner_enabled = config.is_scanner_enabled(&name); - - if !scanner_enabled { - log::info!("Scanner {} disabled in config", name); - continue; - } - - let mut scanner = scanner; - - if let Some(scanner_config) = config.scanner_config(&name) { - let toml_map: toml::map::Map = - scanner_config.extra.clone().into_iter().collect(); - let toml_val = toml::Value::Table(toml_map); - if let Err(e) = scanner.init(&toml_val) { - log::error!("Failed to init scanner {}: {}", name, e); - logger.log( - LogLevel::Error, - "loader", - "init_failed", - format!("{}: {}", name, e), - ); - continue; - } - } - - // Determine interval: config override > scanner default - let interval = config - .scanner_config(&name) - .and_then(|c| c.interval_secs) - .map(Duration::from_secs) - .unwrap_or_else(|| scanner.interval()); - - loaded.push(LoadedScanner { - name, - scanner: Arc::new(RwLock::new(scanner)), - interval, - library: lib, - }); - } - Err(e) => { - log::warn!("Failed to load scanner {:?}: {}", path, e); - logger.log( - LogLevel::Warn, - "loader", - "load_failed", - format!("{}: {}", path.display(), e), - ); - } - } + if let Some(scanner_config) = config.scanner_config(&name) { + let toml_map: toml::map::Map = + scanner_config.extra.clone().into_iter().collect(); + let toml_val = toml::Value::Table(toml_map); + if let Err(e) = scanner.init(&toml_val) { + log::error!("Failed to init scanner {}: {}", name, e); + logger.log( + LogLevel::Error, + "loader", + "init_failed", + format!("{}: {}", name, e), + ); + continue; + } } - } - } - if !missing_dirs.is_empty() { - log::warn!("The following scanner directories do not exist:"); - for dir in &missing_dirs { - log::warn!(" - {:?}", dir); - } - log::info!("Create these directories or update scanner_dirs in config"); - } + // Determine interval: config override > scanner default + let interval = config + .scanner_config(&name) + .and_then(|c| c.interval_secs) + .map(Duration::from_secs) + .unwrap_or_else(|| scanner.interval()); - Ok(loaded) + loaded.push(LoadedScanner { + name, + scanner: Arc::new(RwLock::new(scanner)), + interval, + library: lib, + }); + }, + Err(e) => { + log::warn!("Failed to load scanner {:?}: {}", path, e); + logger.log( + LogLevel::Warn, + "loader", + "load_failed", + format!("{}: {}", path.display(), e), + ); + }, + } + } + } + } + + if !missing_dirs.is_empty() { + log::warn!("The following scanner directories do not exist:"); + for dir in &missing_dirs { + log::warn!(" - {:?}", dir); + } + log::info!("Create these directories or update scanner_dirs in config"); + } + + Ok(loaded) } async fn list_scanners() -> Result<(), Box> { - println!("Available built-in scanners:"); - println!(" - system: CPU, memory, disk, network, load average"); - println!(" - sensor: hwmon temperature, fan, voltage sensors"); - println!(" - power: battery and power supply status"); - println!(" - proc: process count and zombie detection"); - println!("\nDynamic scanners are loaded from $PSCAND_SCANNER_DIRS (colon-separated)"); - println!(" Default fallback: ~/.local/share/pscand/scanners/ or ~/.config/pscand/scanners/"); - Ok(()) + println!("Available built-in scanners:"); + println!(" - system: CPU, memory, disk, network, load average"); + println!(" - sensor: hwmon temperature, fan, voltage sensors"); + println!(" - power: battery and power supply status"); + println!(" - proc: process count and zombie detection"); + println!( + "\nDynamic scanners are loaded from $PSCAND_SCANNER_DIRS (colon-separated)" + ); + println!( + " Default fallback: ~/.local/share/pscand/scanners/ or ~/.config/pscand/scanners/" + ); + Ok(()) }