various: cleanup

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I26df4d852b4b22d0df6b6871fe9cbde96a6a6964
This commit is contained in:
raf 2026-02-18 21:08:00 +03:00
commit 4f9c7057ff
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
7 changed files with 778 additions and 62 deletions

View file

@ -1,11 +1,12 @@
use clap::Parser;
use libloading::Library;
use pscand_core::logging::{LogEntry, RingBufferLogger};
use pscand_core::logging::{LogLevel, RingBufferLogger};
use pscand_core::scanner::Scanner;
use pscand_core::Config as CoreConfig;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tokio::time::interval;
@ -38,6 +39,115 @@ struct LoadedScanner {
library: Library,
}
#[derive(Clone)]
struct DaemonState {
running: Arc<AtomicBool>,
shutdown_requested: Arc<AtomicBool>,
start_time: Arc<RwLock<SystemTime>>,
last_collection: Arc<RwLock<SystemTime>>,
collection_count: Arc<RwLock<u64>>,
error_count: Arc<RwLock<u64>>,
heartbeat_path: PathBuf,
}
impl DaemonState {
fn new(heartbeat_path: PathBuf) -> Self {
Self {
running: Arc::new(AtomicBool::new(true)),
shutdown_requested: Arc::new(AtomicBool::new(false)),
start_time: Arc::new(RwLock::new(SystemTime::now())),
last_collection: Arc::new(RwLock::new(SystemTime::UNIX_EPOCH)),
collection_count: Arc::new(RwLock::new(0)),
error_count: Arc::new(RwLock::new(0)),
heartbeat_path,
}
}
async fn record_collection(&self) {
*self.last_collection.write().await = SystemTime::now();
*self.collection_count.write().await += 1;
}
async fn record_error(&self) {
*self.error_count.write().await += 1;
}
fn get_stats_sync(&self) -> (u64, u64, u64) {
let collections = self.collection_count.try_read().map(|r| *r).unwrap_or(0);
let errors = self.error_count.try_read().map(|r| *r).unwrap_or(0);
let uptime = self
.start_time
.try_read()
.map(|t| {
SystemTime::now()
.duration_since(*t)
.map(|d| d.as_secs())
.unwrap_or(0)
})
.unwrap_or(0);
(collections, errors, uptime)
}
async fn update_heartbeat(&self) -> std::io::Result<()> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let (collections, errors, uptime) = self.get_stats_sync();
let heartbeat = format!(
"{},{},{},{},{}\n",
now,
collections,
errors,
uptime,
if self.shutdown_requested.load(Ordering::SeqCst) {
"shutdown"
} else {
"running"
}
);
std::fs::write(&self.heartbeat_path, heartbeat)?;
Ok(())
}
async fn write_status(&self, logger: &RingBufferLogger) -> std::io::Result<()> {
let uptime = SystemTime::now()
.duration_since(self.start_time.try_read().map(|t| *t).unwrap_or(UNIX_EPOCH))
.map(|d| d.as_secs())
.unwrap_or(0);
let collections = self.collection_count.try_read().map(|c| *c).unwrap_or(0);
let errors = self.error_count.try_read().map(|e| *e).unwrap_or(0);
let last = self
.last_collection
.try_read()
.map(|l| *l)
.unwrap_or(UNIX_EPOCH);
let last_secs = last
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let status = serde_json::json!({
"uptime_seconds": uptime,
"total_collections": collections,
"total_errors": errors,
"last_collection": last_secs,
"shutdown_requested": self.shutdown_requested.load(Ordering::SeqCst),
});
logger.log(
LogLevel::Info,
"daemon",
"status",
serde_json::to_string(&status).unwrap_or_default(),
);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
@ -80,73 +190,232 @@ async fn run_daemon(args: RunArgs) -> Result<(), Box<dyn std::error::Error>> {
config.file_enabled,
));
let heartbeat_path = config.log_dir.join("heartbeat");
let daemon_state = DaemonState::new(heartbeat_path);
daemon_state.update_heartbeat().await?;
logger.log(
LogLevel::Info,
"daemon",
"startup",
serde_json::json!({
"version": "0.1.0",
"ring_buffer_size": config.ring_buffer_size,
"journal_enabled": config.journal_enabled,
"file_enabled": config.file_enabled,
})
.to_string(),
);
std::panic::set_hook(Box::new({
let logger = Arc::clone(&logger);
let daemon_state = daemon_state.clone();
let log_dir = config.log_dir.clone();
move |panic_info| {
daemon_state.running.store(false, Ordering::SeqCst);
let (collections, errors, uptime) = daemon_state.get_stats_sync();
let entries = logger.get_recent(60);
let crash_log = log_dir.join("crash.log");
if let Ok(mut file) = std::fs::File::create(&crash_log) {
use std::io::Write;
let _ = writeln!(file, "=== Crash at {} ===", chrono::Utc::now());
let _ = writeln!(file, "Panic: {}", panic_info);
let _ = writeln!(file, "Uptime: {} seconds", uptime);
let _ = writeln!(file, "Total collections: {}", collections);
let _ = writeln!(file, "Total errors: {}", errors);
let _ = writeln!(file, "\n=== Last {} log entries ===", entries.len());
for entry in entries {
let _ = writeln!(file, "{}", entry.to_json());
}
}
if let Ok(mut file) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&daemon_state.heartbeat_path)
{
use std::io::Write;
let _ = writeln!(
file,
"PANIC,{},{},{},{}",
uptime,
collections,
errors,
chrono::Utc::now()
);
}
}
}));
let scanners = load_scanners(&config).await?;
let scanners = load_scanners(&config, &logger).await?;
if scanners.is_empty() {
log::warn!("No scanners loaded!");
logger.log(LogLevel::Warn, "daemon", "no_scanners", "{}".to_string());
} else {
log::info!("Loaded {} scanners", scanners.len());
logger.log(
LogLevel::Info,
"daemon",
"scanners_loaded",
serde_json::json!({
"count": scanners.len(),
"names": scanners.iter().map(|s| s.name.clone()).collect::<Vec<_>>()
})
.to_string(),
);
}
let mut handles = Vec::new();
let scanner_handles = Arc::new(RwLock::new(Vec::new()));
let scanner_handles_shutdown = scanner_handles.clone();
let daemon_state_clone = daemon_state.clone();
let logger_clone = Arc::clone(&logger);
for loaded in scanners {
let logger = Arc::clone(&logger);
let name = loaded.name.clone();
let scanner = loaded.scanner.clone();
let scanner_task = tokio::spawn(async move {
let mut handles = Vec::new();
let handle = tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(1));
for loaded in scanners {
let logger = Arc::clone(&logger_clone);
let name = loaded.name.clone();
let scanner = loaded.scanner.clone();
let state = daemon_state_clone.clone();
loop {
ticker.tick().await;
let handle = tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(1));
let _collection_start = Instant::now();
let scanner_guard = scanner.read().await;
match scanner_guard.collect() {
Ok(metrics) => {
let entry = LogEntry::new(name.as_str(), metrics);
logger.push(entry);
loop {
ticker.tick().await;
if state.shutdown_requested.load(Ordering::SeqCst) {
let scanner_guard = scanner.read().await;
match scanner_guard.collect() {
Ok(metrics) => {
logger.log(
LogLevel::Info,
&name,
"final_collection",
serde_json::json!({
"metrics": metrics,
"reason": "shutdown"
})
.to_string(),
);
}
Err(e) => {
logger.log(
LogLevel::Error,
&name,
"final_collection_error",
e.to_string(),
);
}
}
state.record_collection().await;
state.update_heartbeat().await.ok();
break;
}
Err(e) => {
log::error!("Scanner {} error: {}", name, e);
let scan_start = Instant::now();
let scanner_guard = scanner.read().await;
match scanner_guard.collect() {
Ok(metrics) => {
let elapsed = scan_start.elapsed().as_millis();
logger.log(
LogLevel::Info,
&name,
"metrics",
serde_json::json!({
"metrics": metrics,
"collection_time_ms": elapsed
})
.to_string(),
);
state.record_collection().await;
}
Err(e) => {
logger.log(LogLevel::Error, &name, "collection_error", e.to_string());
state.record_error().await;
}
}
state.update_heartbeat().await.ok();
}
}
});
});
handles.push(handle);
handles.push(handle);
}
*scanner_handles_shutdown.write().await = handles;
});
let daemon_state_hb = daemon_state.clone();
let heartbeat_task = tokio::spawn(async move {
let mut ticker = interval(Duration::from_secs(5));
loop {
ticker.tick().await;
daemon_state_hb.update_heartbeat().await.ok();
}
});
let sigint = tokio::signal::ctrl_c();
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
tokio::select! {
_ = sigint => {
log::info!("Received Ctrl+C, initiating graceful shutdown");
}
_ = sigterm.recv() => {
log::info!("Received SIGTERM, initiating graceful shutdown");
}
}
tokio::signal::ctrl_c().await?;
log::info!("Shutting down pscand");
daemon_state
.shutdown_requested
.store(true, Ordering::SeqCst);
daemon_state.running.store(false, Ordering::SeqCst);
for handle in handles {
handle.abort();
}
let (collections, errors, _) = daemon_state.get_stats_sync();
logger.log(
LogLevel::Info,
"daemon",
"shutdown_initiated",
format!(
"{{\"collections\": {}, \"errors\": {}}}",
collections, errors
),
);
heartbeat_task.abort();
let _ = heartbeat_task.await;
daemon_state
.shutdown_requested
.store(true, Ordering::SeqCst);
scanner_task.await.ok();
daemon_state.write_status(&logger).await.ok();
daemon_state.update_heartbeat().await.ok();
logger.log(
LogLevel::Info,
"daemon",
"shutdown_complete",
"{}".to_string(),
);
let ring_buffer_path = config.log_dir.join("pscand.buffer");
logger.flush_to_file(&ring_buffer_path)?;
log::info!("pscand shut down cleanly");
Ok(())
}
async fn load_scanners(
config: &CoreConfig,
logger: &RingBufferLogger,
) -> Result<Vec<LoadedScanner>, Box<dyn std::error::Error>> {
let mut loaded = Vec::new();
@ -173,6 +442,12 @@ async fn load_scanners(
Ok(s) => s,
Err(e) => {
log::warn!("Scanner {:?} missing symbol: {}", path, e);
logger.log(
LogLevel::Warn,
"loader",
"missing_symbol",
format!("{}: {}", path.display(), e),
);
continue;
}
};
@ -195,6 +470,12 @@ async fn load_scanners(
let toml_val = toml::Value::Table(toml_map);
if let Err(e) = scanner.init(&toml_val) {
log::error!("Failed to init scanner {}: {}", name, e);
logger.log(
LogLevel::Error,
"loader",
"init_failed",
format!("{}: {}", name, e),
);
continue;
}
}
@ -207,6 +488,12 @@ async fn load_scanners(
}
Err(e) => {
log::warn!("Failed to load scanner {:?}: {}", path, e);
logger.log(
LogLevel::Warn,
"loader",
"load_failed",
format!("{}: {}", path.display(), e),
);
}
}
}