treewide: move application-specific crates to crates/

Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I47d36cb7d1dec5ab0c892190015cfc576a6a6964
This commit is contained in:
raf 2026-02-18 23:39:44 +03:00
commit 6e772ffefb
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
31 changed files with 2106 additions and 2108 deletions

View file

@ -0,0 +1,28 @@
# Manifest for the pscand CLI crate: builds the `pscand` daemon binary.
[package]
name = "pscand-cli"
# Shared metadata is inherited from the workspace root manifest.
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true

# The installed binary is named `pscand`, not `pscand-cli`.
[[bin]]
name = "pscand"
path = "src/main.rs"

# All dependency versions are pinned centrally in [workspace.dependencies].
[dependencies]
pscand-core.workspace = true
pscand-macros.workspace = true
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
toml.workspace = true
libloading.workspace = true
chrono.workspace = true
log.workspace = true
env_logger.workspace = true
thiserror.workspace = true
parking_lot.workspace = true
ringbuf.workspace = true
dirs.workspace = true
sysinfo.workspace = true
clap.workspace = true

View file

@ -0,0 +1,526 @@
#![allow(improper_ctypes_definitions)]
use clap::Parser;
use libloading::Library;
use pscand_core::logging::{LogLevel, RingBufferLogger};
use pscand_core::scanner::Scanner;
use pscand_core::Config as CoreConfig;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tokio::time::interval;
// FFI constructor signature that scanner plugins export as `pscand_scanner`;
// aliased from pscand_core so the loader code reads cleanly.
type ScannerCreator = pscand_core::ScannerCreatorFfi;
/// Top-level CLI: `pscand run ...` starts the daemon, `pscand list` prints
/// the built-in scanner catalogue.
#[derive(Parser, Debug)]
#[command(
    name = "pscand",
    version = "0.1.0",
    about = "Pluggable System Condition Monitoring Daemon"
)]
enum Args {
    Run(RunArgs),
    List,
}

/// Options for the `run` subcommand.
#[derive(Parser, Debug)]
struct RunArgs {
    /// Path to the TOML config; a missing file falls back to defaults.
    #[arg(short, long, default_value = "/etc/pscand/pscand.conf")]
    config: PathBuf,
    /// Lower the default env_logger filter from "info" to "debug".
    #[arg(short, long)]
    debug: bool,
}
/// A scanner plugin loaded from a shared object.
///
/// The `library` field is never read, but it MUST be kept alive: the boxed
/// `Scanner` vtable and code live inside the mapped shared object. Struct
/// fields drop in declaration order, so `scanner` is dropped before
/// `library` is unmapped — do not reorder these fields.
struct LoadedScanner {
    name: String,
    scanner: Arc<RwLock<Box<dyn Scanner>>>,
    #[allow(dead_code)]
    library: Library,
}

/// Shared daemon bookkeeping, cloned into every task and the panic hook.
/// All members are Arc-wrapped so clones observe the same counters.
#[derive(Clone)]
struct DaemonState {
    running: Arc<AtomicBool>,
    shutdown_requested: Arc<AtomicBool>,
    start_time: Arc<RwLock<SystemTime>>,
    last_collection: Arc<RwLock<SystemTime>>,
    collection_count: Arc<RwLock<u64>>,
    error_count: Arc<RwLock<u64>>,
    // File the supervisor watches; rewritten by update_heartbeat().
    heartbeat_path: PathBuf,
}
impl DaemonState {
    /// Fresh state rooted at `heartbeat_path`; counters start at zero and
    /// `start_time` is captured now.
    fn new(heartbeat_path: PathBuf) -> Self {
        Self {
            running: Arc::new(AtomicBool::new(true)),
            shutdown_requested: Arc::new(AtomicBool::new(false)),
            start_time: Arc::new(RwLock::new(SystemTime::now())),
            last_collection: Arc::new(RwLock::new(SystemTime::UNIX_EPOCH)),
            collection_count: Arc::new(RwLock::new(0)),
            error_count: Arc::new(RwLock::new(0)),
            heartbeat_path,
        }
    }
    /// Record one successful collection: stamp `last_collection`, bump the
    /// counter. Two separate lock acquisitions, so a reader may briefly see
    /// the new timestamp with the old count.
    async fn record_collection(&self) {
        *self.last_collection.write().await = SystemTime::now();
        *self.collection_count.write().await += 1;
    }
    async fn record_error(&self) {
        *self.error_count.write().await += 1;
    }
    /// Best-effort `(collections, errors, uptime_secs)` snapshot using
    /// `try_read` so it is callable from sync contexts (the panic hook).
    /// If a lock is momentarily held by a writer the value silently falls
    /// back to 0 — these numbers are diagnostics, not accounting.
    fn get_stats_sync(&self) -> (u64, u64, u64) {
        let collections = self.collection_count.try_read().map(|r| *r).unwrap_or(0);
        let errors = self.error_count.try_read().map(|r| *r).unwrap_or(0);
        let uptime = self
            .start_time
            .try_read()
            .map(|t| {
                SystemTime::now()
                    .duration_since(*t)
                    .map(|d| d.as_secs())
                    .unwrap_or(0)
            })
            .unwrap_or(0);
        (collections, errors, uptime)
    }
    /// Overwrite the heartbeat file with a single CSV line:
    /// `unix_ts,collections,errors,uptime,running|shutdown`.
    /// NOTE(review): this uses blocking `std::fs::write` inside an async fn;
    /// acceptable for a tiny file, but worth confirming on slow storage.
    async fn update_heartbeat(&self) -> std::io::Result<()> {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        let (collections, errors, uptime) = self.get_stats_sync();
        let heartbeat = format!(
            "{},{},{},{},{}\n",
            now,
            collections,
            errors,
            uptime,
            if self.shutdown_requested.load(Ordering::SeqCst) {
                "shutdown"
            } else {
                "running"
            }
        );
        std::fs::write(&self.heartbeat_path, heartbeat)?;
        Ok(())
    }
    /// Emit a JSON status snapshot through the logger. Uses `try_read`
    /// (zero / UNIX_EPOCH fallbacks) like get_stats_sync; always returns Ok.
    async fn write_status(&self, logger: &RingBufferLogger) -> std::io::Result<()> {
        let uptime = SystemTime::now()
            .duration_since(self.start_time.try_read().map(|t| *t).unwrap_or(UNIX_EPOCH))
            .map(|d| d.as_secs())
            .unwrap_or(0);
        let collections = self.collection_count.try_read().map(|c| *c).unwrap_or(0);
        let errors = self.error_count.try_read().map(|e| *e).unwrap_or(0);
        let last = self
            .last_collection
            .try_read()
            .map(|l| *l)
            .unwrap_or(UNIX_EPOCH);
        let last_secs = last
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        let status = serde_json::json!({
            "uptime_seconds": uptime,
            "total_collections": collections,
            "total_errors": errors,
            "last_collection": last_secs,
            "shutdown_requested": self.shutdown_requested.load(Ordering::SeqCst),
        });
        logger.log(
            LogLevel::Info,
            "daemon",
            "status",
            serde_json::to_string(&status).unwrap_or_default(),
        );
        Ok(())
    }
}
/// Entry point: parse the CLI and dispatch to the selected subcommand.
/// Both arms produce the same boxed-error result, so the match value is
/// returned directly as the process outcome.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    match Args::parse() {
        Args::Run(run_args) => run_daemon(run_args).await,
        Args::List => list_scanners().await,
    }
}
/// Run the daemon: load config, install logging and a crash-dump panic
/// hook, spawn one collection task per scanner plus a heartbeat task, then
/// wait for SIGINT/SIGTERM and shut down.
async fn run_daemon(args: RunArgs) -> Result<(), Box<dyn std::error::Error>> {
    // --debug only lowers the default env filter; RUST_LOG still overrides.
    if args.debug {
        env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("debug")).init();
    } else {
        env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
    }
    log::info!("Starting pscand daemon");
    // Missing config file is not fatal: fall back to built-in defaults.
    let config = if args.config.exists() {
        CoreConfig::load(&args.config)?
    } else {
        log::warn!("Config file not found, using defaults");
        CoreConfig::default()
    };
    std::fs::create_dir_all(&config.log_dir)?;
    let log_file = config.log_dir.join("pscand.log");
    let logger = Arc::new(RingBufferLogger::new(
        config.ring_buffer_size,
        Some(log_file),
        config.journal_enabled,
        config.file_enabled,
    ));
    let heartbeat_path = config.log_dir.join("heartbeat");
    let daemon_state = DaemonState::new(heartbeat_path);
    // Write an initial heartbeat so supervisors see liveness immediately.
    daemon_state.update_heartbeat().await?;
    logger.log(
        LogLevel::Info,
        "daemon",
        "startup",
        serde_json::json!({
            "version": "0.1.0",
            "ring_buffer_size": config.ring_buffer_size,
            "journal_enabled": config.journal_enabled,
            "file_enabled": config.file_enabled,
        })
        .to_string(),
    );
    // Panic hook: dump counters plus the last 60 ring-buffer entries to
    // crash.log, and append a PANIC line to the heartbeat file so a
    // supervisor can distinguish a crash from a clean stop.
    std::panic::set_hook(Box::new({
        let logger = Arc::clone(&logger);
        let daemon_state = daemon_state.clone();
        let log_dir = config.log_dir.clone();
        move |panic_info| {
            daemon_state.running.store(false, Ordering::SeqCst);
            let (collections, errors, uptime) = daemon_state.get_stats_sync();
            let entries = logger.get_recent(60);
            let crash_log = log_dir.join("crash.log");
            if let Ok(mut file) = std::fs::File::create(&crash_log) {
                use std::io::Write;
                let _ = writeln!(file, "=== Crash at {} ===", chrono::Utc::now());
                let _ = writeln!(file, "Panic: {}", panic_info);
                let _ = writeln!(file, "Uptime: {} seconds", uptime);
                let _ = writeln!(file, "Total collections: {}", collections);
                let _ = writeln!(file, "Total errors: {}", errors);
                let _ = writeln!(file, "\n=== Last {} log entries ===", entries.len());
                for entry in entries {
                    let _ = writeln!(file, "{}", entry.to_json());
                }
            }
            if let Ok(mut file) = std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(&daemon_state.heartbeat_path)
            {
                use std::io::Write;
                let _ = writeln!(
                    file,
                    "PANIC,{},{},{},{}",
                    uptime,
                    collections,
                    errors,
                    chrono::Utc::now()
                );
            }
        }
    }));
    let scanners = load_scanners(&config, &logger).await?;
    if scanners.is_empty() {
        log::warn!("No scanners loaded!");
        logger.log(LogLevel::Warn, "daemon", "no_scanners", "{}".to_string());
    } else {
        log::info!("Loaded {} scanners", scanners.len());
        logger.log(
            LogLevel::Info,
            "daemon",
            "scanners_loaded",
            serde_json::json!({
                "count": scanners.len(),
                "names": scanners.iter().map(|s| s.name.clone()).collect::<Vec<_>>()
            })
            .to_string(),
        );
    }
    // One detached collection task per scanner.
    // NOTE(review): the tick interval is hardcoded to 1s here; the
    // per-scanner `interval_secs` from ScannerConfig is never consulted —
    // confirm whether that is intentional.
    let scanner_handles = Arc::new(RwLock::new(Vec::new()));
    let scanner_handles_shutdown = scanner_handles.clone();
    let daemon_state_clone = daemon_state.clone();
    let logger_clone = Arc::clone(&logger);
    let scanner_task = tokio::spawn(async move {
        let mut handles = Vec::new();
        for loaded in scanners {
            let logger = Arc::clone(&logger_clone);
            let name = loaded.name.clone();
            let scanner = loaded.scanner.clone();
            let state = daemon_state_clone.clone();
            let handle = tokio::spawn(async move {
                let mut ticker = interval(Duration::from_secs(1));
                let _collection_start = Instant::now();
                loop {
                    ticker.tick().await;
                    // On shutdown request: one final labelled collection,
                    // then exit the task.
                    if state.shutdown_requested.load(Ordering::SeqCst) {
                        let scanner_guard = scanner.read().await;
                        match scanner_guard.collect() {
                            Ok(metrics) => {
                                logger.log(
                                    LogLevel::Info,
                                    &name,
                                    "final_collection",
                                    serde_json::json!({
                                        "metrics": metrics,
                                        "reason": "shutdown"
                                    })
                                    .to_string(),
                                );
                            }
                            Err(e) => {
                                logger.log(
                                    LogLevel::Error,
                                    &name,
                                    "final_collection_error",
                                    e.to_string(),
                                );
                            }
                        }
                        state.record_collection().await;
                        state.update_heartbeat().await.ok();
                        break;
                    }
                    // Normal path: collect, log metrics with timing, count
                    // successes and errors separately.
                    let scan_start = Instant::now();
                    let scanner_guard = scanner.read().await;
                    match scanner_guard.collect() {
                        Ok(metrics) => {
                            let elapsed = scan_start.elapsed().as_millis();
                            logger.log(
                                LogLevel::Info,
                                &name,
                                "metrics",
                                serde_json::json!({
                                    "metrics": metrics,
                                    "collection_time_ms": elapsed
                                })
                                .to_string(),
                            );
                            state.record_collection().await;
                        }
                        Err(e) => {
                            logger.log(LogLevel::Error, &name, "collection_error", e.to_string());
                            state.record_error().await;
                        }
                    }
                    state.update_heartbeat().await.ok();
                }
            });
            handles.push(handle);
        }
        // NOTE(review): the per-scanner JoinHandles are stored but never
        // awaited anywhere below; shutdown only awaits the spawner task, so
        // final collections may still be in flight when main returns.
        *scanner_handles_shutdown.write().await = handles;
    });
    // Independent 5s heartbeat so the file stays fresh even if a scanner
    // blocks its own loop.
    let daemon_state_hb = daemon_state.clone();
    let heartbeat_task = tokio::spawn(async move {
        let mut ticker = interval(Duration::from_secs(5));
        loop {
            ticker.tick().await;
            daemon_state_hb.update_heartbeat().await.ok();
        }
    });
    // Block here until either Ctrl+C or SIGTERM arrives.
    let sigint = tokio::signal::ctrl_c();
    let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
    tokio::select! {
        _ = sigint => {
            log::info!("Received Ctrl+C, initiating graceful shutdown");
        }
        _ = sigterm.recv() => {
            log::info!("Received SIGTERM, initiating graceful shutdown");
        }
    }
    daemon_state
        .shutdown_requested
        .store(true, Ordering::SeqCst);
    daemon_state.running.store(false, Ordering::SeqCst);
    let (collections, errors, _) = daemon_state.get_stats_sync();
    logger.log(
        LogLevel::Info,
        "daemon",
        "shutdown_initiated",
        format!(
            "{{\"collections\": {}, \"errors\": {}}}",
            collections, errors
        ),
    );
    heartbeat_task.abort();
    let _ = heartbeat_task.await;
    // NOTE(review): shutdown_requested was already stored above; this second
    // store is redundant (but harmless).
    daemon_state
        .shutdown_requested
        .store(true, Ordering::SeqCst);
    scanner_task.await.ok();
    daemon_state.write_status(&logger).await.ok();
    daemon_state.update_heartbeat().await.ok();
    logger.log(
        LogLevel::Info,
        "daemon",
        "shutdown_complete",
        "{}".to_string(),
    );
    // Persist the ring buffer so the last window of entries survives exit.
    let ring_buffer_path = config.log_dir.join("pscand.buffer");
    logger.flush_to_file(&ring_buffer_path)?;
    log::info!("pscand shut down cleanly");
    Ok(())
}
/// Scan every configured directory for `.so` plugins, dlopen each one,
/// resolve its `pscand_scanner` constructor, honour the per-scanner enable
/// flag, and run `init` with the scanner's extra TOML config.
///
/// Individual plugin failures (missing symbol, failed init, unloadable
/// library) are logged and skipped; only directory-level I/O errors abort.
async fn load_scanners(
    config: &CoreConfig,
    logger: &RingBufferLogger,
) -> Result<Vec<LoadedScanner>, Box<dyn std::error::Error>> {
    let mut loaded = Vec::new();
    for dir in &config.scanner_dirs {
        if !dir.exists() {
            continue;
        }
        log::info!("Loading scanners from {:?}", dir);
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            // Only shared objects are candidate plugins.
            if path.extension().and_then(|s| s.to_str()) != Some("so") {
                continue;
            }
            // SAFETY: loading an arbitrary shared object executes its
            // initializers and trusts that the `pscand_scanner` symbol has
            // the ScannerCreator signature. Only operator-configured
            // directories are scanned. The Symbol borrow of `lib` ends
            // before `lib` is moved into LoadedScanner, which keeps the
            // library mapped for as long as the scanner lives.
            unsafe {
                match Library::new(&path) {
                    Ok(lib) => {
                        let creator: libloading::Symbol<ScannerCreator> =
                            match lib.get(b"pscand_scanner") {
                                Ok(s) => s,
                                Err(e) => {
                                    log::warn!("Scanner {:?} missing symbol: {}", path, e);
                                    logger.log(
                                        LogLevel::Warn,
                                        "loader",
                                        "missing_symbol",
                                        format!("{}: {}", path.display(), e),
                                    );
                                    continue;
                                }
                            };
                        let scanner = match pscand_core::get_scanner(creator()) {
                            Some(s) => s,
                            None => {
                                log::error!(
                                    "Failed to get scanner from library {}",
                                    path.display()
                                );
                                continue;
                            }
                        };
                        let name = scanner.name().to_string();
                        // Scanners default to enabled unless config says no.
                        let scanner_enabled = config.is_scanner_enabled(&name);
                        if !scanner_enabled {
                            log::info!("Scanner {} disabled in config", name);
                            continue;
                        }
                        let mut scanner = scanner;
                        // Pass the scanner's `extra` table to init() as TOML.
                        if let Some(scanner_config) = config.scanner_config(&name) {
                            let toml_map: toml::map::Map<String, toml::Value> =
                                scanner_config.extra.clone().into_iter().collect();
                            let toml_val = toml::Value::Table(toml_map);
                            if let Err(e) = scanner.init(&toml_val) {
                                log::error!("Failed to init scanner {}: {}", name, e);
                                logger.log(
                                    LogLevel::Error,
                                    "loader",
                                    "init_failed",
                                    format!("{}: {}", name, e),
                                );
                                continue;
                            }
                        }
                        loaded.push(LoadedScanner {
                            name,
                            scanner: Arc::new(RwLock::new(scanner)),
                            library: lib,
                        });
                    }
                    Err(e) => {
                        log::warn!("Failed to load scanner {:?}: {}", path, e);
                        logger.log(
                            LogLevel::Warn,
                            "loader",
                            "load_failed",
                            format!("{}: {}", path.display(), e),
                        );
                    }
                }
            }
        }
    }
    Ok(loaded)
}
/// Print the static catalogue of built-in scanners plus a note about where
/// dynamic plugins are discovered. Purely informational; always succeeds.
async fn list_scanners() -> Result<(), Box<dyn std::error::Error>> {
    // (name, description) pairs for the scanners shipped with pscand.
    let builtins = [
        ("system", "CPU, memory, disk, network, load average"),
        ("sensor", "hwmon temperature, fan, voltage sensors"),
        ("power", "battery and power supply status"),
        ("proc", "process count and zombie detection"),
    ];
    println!("Available built-in scanners:");
    for (name, description) in builtins {
        println!(" - {}: {}", name, description);
    }
    println!("\nDynamic scanners are loaded from $PSCAND_SCANNER_DIRS (colon-separated)");
    println!(" Default fallback: ~/.local/share/pscand/scanners/ or ~/.config/pscand/scanners/");
    Ok(())
}

View file

@ -0,0 +1,23 @@
# Manifest for the pscand core library crate (config, logging, scanner API).
[package]
name = "pscand-core"
# Shared metadata is inherited from the workspace root manifest.
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true

# All dependency versions are pinned centrally in [workspace.dependencies].
[dependencies]
tokio.workspace = true
serde.workspace = true
serde_json.workspace = true
toml.workspace = true
chrono.workspace = true
log.workspace = true
thiserror.workspace = true
parking_lot.workspace = true
dirs.workspace = true
sysinfo.workspace = true
ringbuf.workspace = true

# Library target uses an underscore name for `use pscand_core::...` imports.
[lib]
name = "pscand_core"
path = "src/lib.rs"
View file

@ -0,0 +1,126 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use thiserror::Error;
/// Errors produced while loading or querying daemon configuration.
#[derive(Error, Debug)]
pub enum ConfigError {
    // I/O failure reading the config file.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    // TOML syntax / type error.
    #[error("Parse error: {0}")]
    Parse(#[from] toml::de::Error),
    // NOTE(review): these two variants are not constructed anywhere in this
    // chunk — confirm they are used elsewhere or planned.
    #[error("Scanner {0} not configured")]
    ScannerNotConfigured(String),
    #[error("Invalid scanner name: {0}")]
    InvalidScannerName(String),
}

/// Shorthand result type for configuration operations.
pub type ConfigResult<T> = std::result::Result<T, ConfigError>;
/// Per-scanner configuration section from the daemon's TOML config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScannerConfig {
    // Whether this scanner should be loaded at all.
    pub enabled: bool,
    // Optional collection interval override in seconds.
    pub interval_secs: Option<u64>,
    // Arbitrary extra keys, forwarded verbatim to the scanner's init().
    pub extra: HashMap<String, toml::Value>,
}

/// Top-level daemon configuration; every field has a serde default so a
/// partial (or empty) TOML file still deserializes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
    // Directories searched for scanner shared objects.
    #[serde(default = "default_scanner_dirs")]
    pub scanner_dirs: Vec<PathBuf>,
    // Where logs, heartbeat and crash dumps are written.
    #[serde(default = "default_log_dir")]
    pub log_dir: PathBuf,
    // Log retention window in days.
    #[serde(default = "default_retention_days")]
    pub retention_days: u32,
    // Capacity (entry count) of the in-memory log ring buffer.
    #[serde(default = "default_ring_buffer_size")]
    pub ring_buffer_size: usize,
    // Per-scanner sections, keyed by scanner name.
    #[serde(default)]
    pub scanners: HashMap<String, ScannerConfig>,
    // NOTE(review): serde defaults make these false when absent from TOML,
    // while Config::default() sets them true — confirm this asymmetry.
    #[serde(default)]
    pub journal_enabled: bool,
    #[serde(default)]
    pub file_enabled: bool,
}
/// Compute the default scanner search path.
///
/// Priority: a non-empty $PSCAND_SCANNER_DIRS (colon-separated) wins
/// outright; otherwise LIB_PSCAND / LIBDIR_PSCAND plus the XDG data and
/// config locations are accumulated, with a relative fallback if nothing
/// else applies.
fn default_scanner_dirs() -> Vec<PathBuf> {
    // Explicit override: use it exclusively when it yields any path.
    if let Ok(env_var) = std::env::var("PSCAND_SCANNER_DIRS") {
        let overrides: Vec<PathBuf> = env_var
            .split(':')
            .filter(|segment| !segment.is_empty())
            .map(PathBuf::from)
            .collect();
        if !overrides.is_empty() {
            return overrides;
        }
    }
    let mut search = Vec::new();
    if let Some(lib) = std::env::var_os("LIB_PSCAND") {
        search.push(PathBuf::from(lib));
    }
    if let Ok(lib_dir) = std::env::var("LIBDIR_PSCAND") {
        search.push(PathBuf::from(lib_dir));
    }
    // XDG locations, when resolvable on this platform.
    search.extend(dirs::data_local_dir().map(|base| base.join("pscand/scanners")));
    search.extend(dirs::config_dir().map(|base| base.join("pscand/scanners")));
    // Last resort: a directory relative to the working directory.
    if search.is_empty() {
        search.push(PathBuf::from(".pscand/scanners"));
    }
    search
}
/// Default log directory: the XDG local-data dir, or a relative fallback
/// for environments where that cannot be resolved.
fn default_log_dir() -> PathBuf {
    match dirs::data_local_dir() {
        Some(base) => base.join("pscand/logs"),
        None => PathBuf::from(".pscand/logs"),
    }
}
// Serde default: keep one week of logs.
fn default_retention_days() -> u32 {
    7
}
// Serde default: ring buffer holds the last 60 entries (one minute at the
// daemon's 1s collection tick).
fn default_ring_buffer_size() -> usize {
    60
}
impl Default for Config {
    /// Defaults mirror the serde field defaults, except that journaling and
    /// file logging are enabled here while absent-from-TOML fields
    /// deserialize to false (see the struct definition).
    fn default() -> Self {
        Self {
            scanner_dirs: default_scanner_dirs(),
            log_dir: default_log_dir(),
            retention_days: default_retention_days(),
            ring_buffer_size: default_ring_buffer_size(),
            scanners: HashMap::new(),
            journal_enabled: true,
            file_enabled: true,
        }
    }
}
impl Config {
    /// Load and parse the TOML config at `path`.
    ///
    /// Scanner directories that do not currently exist on disk are dropped
    /// so later directory scans never trip over stale entries.
    pub fn load(path: &Path) -> ConfigResult<Self> {
        let raw = std::fs::read_to_string(path)?;
        let mut parsed: Self = toml::from_str(&raw)?;
        parsed.scanner_dirs.retain(|dir| dir.exists());
        Ok(parsed)
    }
    /// Per-scanner configuration section, if one was provided.
    pub fn scanner_config(&self, name: &str) -> Option<&ScannerConfig> {
        self.scanners.get(name)
    }
    /// Scanners without an explicit config entry default to enabled.
    pub fn is_scanner_enabled(&self, name: &str) -> bool {
        self.scanners.get(name).map_or(true, |cfg| cfg.enabled)
    }
}

View file

@ -0,0 +1,11 @@
// Helper modules wrapping /proc and /sys access for the built-in scanners.
pub mod power;
pub mod process;
pub mod resource;
pub mod sensor;
pub mod system;
// Re-export each helper type at the crate::helpers root for convenience.
pub use power::PowerHelper;
pub use process::ProcessHelper;
pub use resource::ResourceHelper;
pub use sensor::SensorHelper;
pub use system::SystemHelper;

View file

@ -0,0 +1,157 @@
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
/// Namespace for battery / power-supply sysfs queries.
pub struct PowerHelper;

/// Snapshot of a single battery read from /sys/class/power_supply.
/// Values come straight from the kernel attribute files; assumes the usual
/// power-supply ABI units (voltage in µV, charge in µAh / energy in µWh) —
/// TODO confirm per driver.
#[derive(Debug, Clone)]
pub struct BatteryInfo {
    pub name: String,
    pub status: String,
    // Kernel-reported capacity percentage (0-100).
    pub capacity: u32,
    // Percentage recomputed from charge_now/charge_full by battery_info().
    pub charge_percent: i32,
    pub voltage: f64,
    pub current_now: i64,
    pub power_now: i64,
    pub present: bool,
}
impl PowerHelper {
    /// Find the first *present* battery under `/sys/class/power_supply`
    /// and read its charge, voltage and current attributes.
    ///
    /// Returns `Ok(None)` when no present battery exists — including when
    /// the power-supply class directory itself is missing (VMs, containers),
    /// matching the guard in [`PowerHelper::power_supplies`]. Previously a
    /// missing directory surfaced as an I/O error here.
    pub fn battery_info() -> std::io::Result<Option<BatteryInfo>> {
        let battery_path = PathBuf::from("/sys/class/power_supply");
        // Consistency fix: treat an absent sysfs class as "no battery",
        // exactly like power_supplies() does, instead of erroring.
        if !battery_path.exists() {
            return Ok(None);
        }
        for entry in fs::read_dir(&battery_path)? {
            let entry = entry?;
            let path = entry.path();
            let type_path = path.join("type");
            if !type_path.exists() {
                continue;
            }
            let battery_type = fs::read_to_string(&type_path)?.trim().to_string();
            if battery_type != "Battery" {
                continue;
            }
            let present_path = path.join("present");
            let present = fs::read_to_string(&present_path)
                .map(|s| s.trim() == "1")
                .unwrap_or(false);
            if !present {
                continue;
            }
            // sysfs power-supply devices expose no "name" attribute file;
            // the supply's name is its directory entry, so fall back to
            // that before the opaque "Unknown".
            let name = fs::read_to_string(path.join("name"))
                .map(|s| s.trim().to_string())
                .unwrap_or_else(|_| {
                    path.file_name()
                        .map(|n| n.to_string_lossy().into_owned())
                        .unwrap_or_else(|| "Unknown".to_string())
                });
            let status = fs::read_to_string(path.join("status"))
                .unwrap_or_else(|_| "Unknown".to_string())
                .trim()
                .to_string();
            let capacity = fs::read_to_string(path.join("capacity"))
                .ok()
                .and_then(|s| s.trim().parse::<u32>().ok())
                .unwrap_or(0);
            // Drivers expose either charge_* (µAh) or energy_* (µWh).
            let charge_now = fs::read_to_string(path.join("charge_now"))
                .or_else(|_| fs::read_to_string(path.join("energy_now")))
                .ok()
                .and_then(|s| s.trim().parse::<i64>().ok())
                .unwrap_or(0);
            let charge_full = fs::read_to_string(path.join("charge_full"))
                .or_else(|_| fs::read_to_string(path.join("energy_full")))
                .ok()
                .and_then(|s| s.trim().parse::<i64>().ok())
                .unwrap_or(1);
            let charge_percent = if charge_full > 0 {
                ((charge_now as f64 / charge_full as f64) * 100.0) as i32
            } else {
                0
            };
            // voltage_now is microvolts per the power-supply ABI; convert
            // to volts.
            let voltage = fs::read_to_string(path.join("voltage_now"))
                .ok()
                .and_then(|s| s.trim().parse::<f64>().ok())
                .unwrap_or(0.0)
                / 1_000_000.0;
            let current_now = fs::read_to_string(path.join("current_now"))
                .ok()
                .and_then(|s| s.trim().parse::<i64>().ok())
                .unwrap_or(0);
            let power_now = fs::read_to_string(path.join("power_now"))
                .ok()
                .and_then(|s| s.trim().parse::<i64>().ok())
                .unwrap_or(0);
            return Ok(Some(BatteryInfo {
                name,
                status,
                capacity,
                charge_percent,
                voltage,
                current_now,
                power_now,
                present,
            }));
        }
        Ok(None)
    }
    /// Dump common attributes of every power supply (AC adapters included)
    /// as raw strings keyed by supply name. Missing directory yields an
    /// empty map; unreadable attributes are silently omitted.
    pub fn power_supplies() -> std::io::Result<HashMap<String, HashMap<String, String>>> {
        let mut supplies = HashMap::new();
        let power_supply_path = PathBuf::from("/sys/class/power_supply");
        if !power_supply_path.exists() {
            return Ok(supplies);
        }
        for entry in fs::read_dir(&power_supply_path)? {
            let entry = entry?;
            let path = entry.path();
            let name = path
                .file_name()
                .map(|n| n.to_string_lossy().to_string())
                .unwrap_or_default();
            let mut info = HashMap::new();
            for attr in [
                "type",
                "status",
                "capacity",
                "voltage_now",
                "power_now",
                "online",
            ] {
                let attr_path = path.join(attr);
                if attr_path.exists() {
                    if let Ok(content) = fs::read_to_string(&attr_path) {
                        info.insert(attr.to_string(), content.trim().to_string());
                    }
                }
            }
            if !info.is_empty() {
                supplies.insert(name, info);
            }
        }
        Ok(supplies)
    }
    /// Supported suspend states, e.g. "freeze mem disk" (/sys/power/state).
    pub fn suspend_state() -> std::io::Result<String> {
        let state_path = PathBuf::from("/sys/power/state");
        fs::read_to_string(state_path).map(|s| s.trim().to_string())
    }
    /// Supported mem-sleep modes (/sys/power/mem_sleep).
    pub fn mem_sleep_state() -> std::io::Result<String> {
        let state_path = PathBuf::from("/sys/power/mem_sleep");
        fs::read_to_string(state_path).map(|s| s.trim().to_string())
    }
}

View file

@ -0,0 +1,136 @@
use std::collections::HashMap;
use std::fs;
/// Namespace for /proc process enumeration helpers.
pub struct ProcessHelper;

/// One process, parsed from /proc/[pid]/status.
#[derive(Debug)]
pub struct ProcessInfo {
    pub pid: u32,
    pub name: String,
    // Single-letter state from the "State:" line (R, S, D, Z, ...).
    pub state: String,
    pub ppid: u32,
    // Resident set size (VmRSS) in kilobytes; 0 for kernel threads.
    pub memory_kb: u64,
    // Always 0.0 as produced by process_info() — CPU sampling is not
    // implemented in this chunk.
    pub cpu_percent: f32,
}
impl ProcessHelper {
    /// Enumerate all processes visible under /proc.
    ///
    /// Bug fix: non-numeric /proc directories (`sys`, `net`, `self`,
    /// `driver`, ...) are now skipped. Previously the first such entry was
    /// turned into an `InvalidData` error and propagated with `?`, which
    /// aborted every listing, since /proc always contains such directories.
    pub fn list_processes() -> std::io::Result<Vec<ProcessInfo>> {
        let mut processes = Vec::new();
        let proc_path = fs::read_dir("/proc")?;
        for entry in proc_path.flatten() {
            let path = entry.path();
            if !path.is_dir() {
                continue;
            }
            // Only purely numeric directory names are PIDs.
            let pid: u32 = match path
                .file_name()
                .and_then(|name| name.to_str())
                .and_then(|s| s.parse().ok())
            {
                Some(pid) => pid,
                None => continue,
            };
            // A process may exit between read_dir and the status read;
            // ignore those races instead of failing the whole listing.
            if let Ok(info) = Self::process_info(pid) {
                processes.push(info);
            }
        }
        Ok(processes)
    }
    /// Parse /proc/[pid]/status for one process. Missing fields default to
    /// empty/zero; errors only when the status file itself is unreadable.
    pub fn process_info(pid: u32) -> std::io::Result<ProcessInfo> {
        let status_path = format!("/proc/{}/status", pid);
        let content = fs::read_to_string(status_path)?;
        let mut name = String::new();
        let mut state = String::new();
        let mut ppid: u32 = 0;
        let mut memory_kb: u64 = 0;
        for line in content.lines() {
            if line.starts_with("Name:") {
                // Re-join so process names containing spaces survive.
                name = line
                    .split_whitespace()
                    .skip(1)
                    .collect::<Vec<_>>()
                    .join(" ");
            } else if line.starts_with("State:") {
                // First token only, e.g. "S" from "State: S (sleeping)".
                state = line.split_whitespace().nth(1).unwrap_or("").to_string();
            } else if line.starts_with("PPid:") {
                ppid = line
                    .split_whitespace()
                    .nth(1)
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(0);
            } else if line.starts_with("VmRSS:") {
                // VmRSS is reported in kB per proc(5).
                memory_kb = line
                    .split_whitespace()
                    .nth(1)
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(0);
            }
        }
        Ok(ProcessInfo {
            pid,
            name,
            state,
            ppid,
            memory_kb,
            // CPU sampling requires two /proc/stat reads; not implemented.
            cpu_percent: 0.0,
        })
    }
    /// All processes currently in the zombie (Z) state.
    pub fn zombie_processes() -> std::io::Result<Vec<ProcessInfo>> {
        Ok(Self::list_processes()?
            .into_iter()
            .filter(|p| p.state.starts_with('Z'))
            .collect())
    }
    /// Count processes by coarse state buckets. States outside R/S/D/Z
    /// (e.g. T, I) contribute only to "total".
    pub fn process_count() -> std::io::Result<HashMap<String, usize>> {
        let mut counts = HashMap::new();
        counts.insert("total".to_string(), 0);
        counts.insert("running".to_string(), 0);
        counts.insert("sleeping".to_string(), 0);
        counts.insert("zombie".to_string(), 0);
        for proc in Self::list_processes()? {
            // Keys are pre-inserted above, so these unwraps cannot fail.
            *counts.get_mut("total").unwrap() += 1;
            let first_char = proc.state.chars().next().unwrap_or(' ');
            match first_char {
                'R' => *counts.get_mut("running").unwrap() += 1,
                'S' | 'D' => *counts.get_mut("sleeping").unwrap() += 1,
                'Z' => *counts.get_mut("zombie").unwrap() += 1,
                _ => {}
            }
        }
        Ok(counts)
    }
    /// The `count` processes with the largest resident set size.
    pub fn top_memory_processes(count: usize) -> std::io::Result<Vec<ProcessInfo>> {
        let mut processes = Self::list_processes()?;
        processes.sort_by(|a, b| b.memory_kb.cmp(&a.memory_kb));
        processes.truncate(count);
        Ok(processes)
    }
    /// The `count` processes with the highest cpu_percent.
    /// NOTE(review): cpu_percent is always 0.0 (see process_info), so the
    /// ordering — and therefore the truncation — is currently arbitrary.
    pub fn top_cpu_processes(count: usize) -> std::io::Result<Vec<ProcessInfo>> {
        let mut processes = Self::list_processes()?;
        processes.sort_by(|a, b| {
            b.cpu_percent
                .partial_cmp(&a.cpu_percent)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        processes.truncate(count);
        Ok(processes)
    }
}

View file

@ -0,0 +1,123 @@
use std::collections::HashMap;
use std::fs;
/// Namespace for /proc resource-statistics parsers.
pub struct ResourceHelper;

impl ResourceHelper {
    /// Parse /proc/stat into per-CPU and aggregate counters.
    ///
    /// NOTE(review): the "*_usage_percent" values are computed from the
    /// cumulative-since-boot jiffy counters of a single read — they are NOT
    /// instantaneous utilization, which would require a delta between two
    /// samples. Confirm that consumers expect this.
    pub fn cpu_usage() -> std::io::Result<HashMap<String, f64>> {
        let content = fs::read_to_string("/proc/stat")?;
        let mut result = HashMap::new();
        for line in content.lines() {
            if line.starts_with("cpu") {
                let parts: Vec<&str> = line.split_whitespace().collect();
                if parts.len() >= 5 {
                    let cpu = parts[0];
                    // Fields after the label: user nice system idle iowait
                    // irq softirq (per proc(5)); at most 7 are consumed.
                    let values: Vec<u64> = parts[1..]
                        .iter()
                        .take(7)
                        .filter_map(|s| s.parse().ok())
                        .collect();
                    if values.len() >= 4 {
                        let user = values[0] as f64;
                        let nice = values[1] as f64;
                        let system = values[2] as f64;
                        let idle = values[3] as f64;
                        let iowait = values.get(4).copied().unwrap_or(0) as f64;
                        let irq = values.get(5).copied().unwrap_or(0) as f64;
                        let softirq = values.get(6).copied().unwrap_or(0) as f64;
                        let total = user + nice + system + idle + iowait + irq + softirq;
                        // "active" excludes idle and iowait.
                        let active = user + nice + system + irq + softirq;
                        if cpu == "cpu" {
                            // Aggregate line (no index) → total_* keys.
                            result.insert("total_user".to_string(), user);
                            result.insert("total_nice".to_string(), nice);
                            result.insert("total_system".to_string(), system);
                            result.insert("total_idle".to_string(), idle);
                            result.insert("total_iowait".to_string(), iowait);
                            result.insert(
                                "total_usage_percent".to_string(),
                                (active / total) * 100.0,
                            );
                        } else {
                            // "cpu0" → "core_0" key prefix.
                            let core = cpu.replace("cpu", "core_");
                            result.insert(format!("{}_user", core), user);
                            result.insert(
                                format!("{}_usage_percent", core),
                                (active / total) * 100.0,
                            );
                        }
                    }
                }
            }
        }
        Ok(result)
    }
    /// Parse /proc/meminfo; values are converted from the kernel's kB
    /// figures to bytes (hence the * 1024).
    pub fn memory_info() -> std::io::Result<HashMap<String, u64>> {
        let content = fs::read_to_string("/proc/meminfo")?;
        let mut result = HashMap::new();
        for line in content.lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 2 {
                let key = parts[0].trim_end_matches(':');
                if let Ok(value) = parts[1].parse::<u64>() {
                    result.insert(key.to_string(), value * 1024);
                }
            }
        }
        Ok(result)
    }
    /// Parse /proc/diskstats per device. Field offsets follow the kernel
    /// iostats layout: parts[2] is the device name, parts[3..] the
    /// cumulative read/write counters — presumably fields 1-8 of the
    /// documented format; confirm against Documentation/admin-guide/iostats.
    pub fn disk_stats() -> std::io::Result<HashMap<String, HashMap<String, u64>>> {
        let content = fs::read_to_string("/proc/diskstats")?;
        let mut result = HashMap::new();
        for line in content.lines() {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 14 {
                let device = parts[2].to_string();
                let mut stats = HashMap::new();
                stats.insert("reads_completed".to_string(), parts[3].parse().unwrap_or(0));
                stats.insert("reads_merged".to_string(), parts[4].parse().unwrap_or(0));
                stats.insert("sectors_read".to_string(), parts[5].parse().unwrap_or(0));
                stats.insert("reads_ms".to_string(), parts[6].parse().unwrap_or(0));
                stats.insert(
                    "writes_completed".to_string(),
                    parts[7].parse().unwrap_or(0),
                );
                stats.insert("writes_merged".to_string(), parts[8].parse().unwrap_or(0));
                stats.insert("sectors_written".to_string(), parts[9].parse().unwrap_or(0));
                stats.insert("writes_ms".to_string(), parts[10].parse().unwrap_or(0));
                result.insert(device, stats);
            }
        }
        Ok(result)
    }
    /// Parse /proc/net/dev per interface. The first two lines are headers;
    /// parts[0] is "iface:", parts[1..9] the RX columns and parts[9..] the
    /// TX columns of the standard 16-column layout.
    pub fn net_dev() -> std::io::Result<HashMap<String, HashMap<String, u64>>> {
        let content = fs::read_to_string("/proc/net/dev")?;
        let mut result = HashMap::new();
        for line in content.lines().skip(2) {
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 17 {
                let iface = parts[0].trim_end_matches(':');
                let mut stats = HashMap::new();
                stats.insert("rx_bytes".to_string(), parts[1].parse().unwrap_or(0));
                stats.insert("rx_packets".to_string(), parts[2].parse().unwrap_or(0));
                stats.insert("rx_errors".to_string(), parts[3].parse().unwrap_or(0));
                stats.insert("rx_dropped".to_string(), parts[4].parse().unwrap_or(0));
                stats.insert("tx_bytes".to_string(), parts[9].parse().unwrap_or(0));
                stats.insert("tx_packets".to_string(), parts[10].parse().unwrap_or(0));
                stats.insert("tx_errors".to_string(), parts[11].parse().unwrap_or(0));
                stats.insert("tx_dropped".to_string(), parts[12].parse().unwrap_or(0));
                result.insert(iface.to_string(), stats);
            }
        }
        Ok(result)
    }
}

View file

@ -0,0 +1,102 @@
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
/// Namespace for hwmon sysfs sensor queries.
pub struct SensorHelper;

impl SensorHelper {
    /// List hwmon device directories under /sys/class/hwmon; returns an
    /// empty vec when the class directory does not exist.
    pub fn discover_hwmon() -> std::io::Result<Vec<PathBuf>> {
        let hwmon_path = PathBuf::from("/sys/class/hwmon");
        let mut hwmons = Vec::new();
        if hwmon_path.exists() {
            for entry in fs::read_dir(&hwmon_path)? {
                let entry = entry?;
                let path = entry.path();
                if path.is_dir() {
                    hwmons.push(path);
                }
            }
        }
        Ok(hwmons)
    }
    /// Read one sysfs attribute as f64. `Ok(None)` when the file is absent
    /// or its content does not parse; `Err` only on an actual read failure.
    pub fn read_hwmon_sensor(hwmon_path: &Path, sensor: &str) -> std::io::Result<Option<f64>> {
        let sensor_path = hwmon_path.join(sensor);
        if sensor_path.exists() {
            let content = fs::read_to_string(sensor_path)?;
            Ok(content.trim().parse::<f64>().ok())
        } else {
            Ok(None)
        }
    }
    /// Collect the chip name plus every temp/fan/voltage input of one hwmon
    /// device, formatted as strings keyed by "<kind>_<id>_<unit>".
    pub fn hwmon_info(hwmon_path: &Path) -> std::io::Result<HashMap<String, String>> {
        let mut info = HashMap::new();
        let name_path = hwmon_path.join("name");
        if name_path.exists() {
            info.insert(
                "name".to_string(),
                fs::read_to_string(name_path)?.trim().to_string(),
            );
        }
        if let Ok(files) = fs::read_dir(hwmon_path) {
            for file in files.flatten() {
                let filename = file.file_name().to_string_lossy().to_string();
                if filename.starts_with("temp") && filename.ends_with("_input") {
                    let id = filename
                        .trim_start_matches("temp")
                        .trim_end_matches("_input");
                    if let Some(t) = Self::read_hwmon_sensor(hwmon_path, &filename)
                        .ok()
                        .flatten()
                    {
                        // temp*_input is millidegrees C → divide for Celsius.
                        info.insert(format!("temp_{}_celsius", id), format!("{}", t / 1000.0));
                    }
                }
                if filename.starts_with("fan") && filename.ends_with("_input") {
                    let id = filename
                        .trim_start_matches("fan")
                        .trim_end_matches("_input");
                    if let Some(f) = Self::read_hwmon_sensor(hwmon_path, &filename)
                        .ok()
                        .flatten()
                    {
                        // fan*_input is already RPM.
                        info.insert(format!("fan_{}_rpm", id), format!("{}", f));
                    }
                }
                if filename.starts_with("in") && filename.ends_with("_input") {
                    let id = filename.trim_start_matches("in").trim_end_matches("_input");
                    if let Some(v) = Self::read_hwmon_sensor(hwmon_path, &filename)
                        .ok()
                        .flatten()
                    {
                        // Unit fix: in*_input is already millivolts per the
                        // hwmon sysfs ABI. The previous `v / 1000.0` stored
                        // volts under a key labelled "_mv"; keep the key and
                        // store the value in the unit it advertises.
                        info.insert(format!("voltage_{}_mv", id), format!("{}", v));
                    }
                }
            }
        }
        Ok(info)
    }
    /// Aggregate every hwmon device: map of chip name → attribute map.
    /// NOTE(review): two chips reporting the same "name" overwrite each
    /// other in the outer map.
    pub fn all_sensors() -> std::io::Result<HashMap<String, HashMap<String, String>>> {
        let mut all = HashMap::new();
        for hwmon in Self::discover_hwmon()? {
            if let Ok(info) = Self::hwmon_info(&hwmon) {
                let name = info.get("name").cloned().unwrap_or_else(|| {
                    hwmon
                        .file_name()
                        .map(|s| s.to_string_lossy().into_owned())
                        .unwrap_or_else(|| "unknown".to_string())
                });
                all.insert(name, info);
            }
        }
        Ok(all)
    }
}

View file

@ -0,0 +1,45 @@
use std::fs;
use std::time::Duration;
/// Namespace for miscellaneous /proc system queries.
pub struct SystemHelper;

impl SystemHelper {
    /// Read a whole /proc file and trim trailing whitespace/newline.
    fn read_trimmed(path: &str) -> std::io::Result<String> {
        fs::read_to_string(path).map(|content| content.trim().to_string())
    }
    /// Time since boot, taken from the first field of /proc/uptime.
    /// Unparsable content degrades to a zero duration.
    pub fn uptime() -> std::io::Result<Duration> {
        let content = fs::read_to_string("/proc/uptime")?;
        let seconds = content
            .split_whitespace()
            .next()
            .and_then(|field| field.parse::<f64>().ok())
            .unwrap_or(0.0);
        Ok(Duration::from_secs_f64(seconds))
    }
    /// Unique identifier for the current boot.
    pub fn boot_id() -> std::io::Result<String> {
        Self::read_trimmed("/proc/sys/kernel/random/boot_id")
    }
    /// 1-, 5- and 15-minute load averages from /proc/loadavg; missing or
    /// unparsable fields degrade to 0.0.
    pub fn load_average() -> std::io::Result<(f64, f64, f64)> {
        let content = fs::read_to_string("/proc/loadavg")?;
        let mut fields = content
            .split_whitespace()
            .map(|field| field.parse::<f64>().unwrap_or(0.0));
        let load1 = fields.next().unwrap_or(0.0);
        let load5 = fields.next().unwrap_or(0.0);
        let load15 = fields.next().unwrap_or(0.0);
        Ok((load1, load5, load15))
    }
    /// Current hostname.
    pub fn hostname() -> std::io::Result<String> {
        Self::read_trimmed("/proc/sys/kernel/hostname")
    }
    /// Running kernel release string.
    pub fn kernel_version() -> std::io::Result<String> {
        Self::read_trimmed("/proc/sys/kernel/osrelease")
    }
}

View file

@ -0,0 +1,11 @@
// Crate root of pscand-core: configuration, logging and the scanner API.
pub mod config;
pub mod helpers;
pub mod logging;
pub mod scanner;
// Flatten the most-used types to the crate root so the CLI can write
// `pscand_core::Config` etc.
pub use config::Config;
pub use logging::{DaemonLogEntry, LogLevel, RingBufferLogger};
pub use scanner::{
    get_scanner, register_scanner, MetricValue, Scanner, ScannerCreatorFfi, ScannerError,
};
// Crate-wide result alias keyed to scanner errors.
pub type Result<T> = std::result::Result<T, ScannerError>;

View file

@ -0,0 +1,583 @@
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use ringbuf::{
storage::Heap,
traits::*,
wrap::caching::{CachingCons, CachingProd},
SharedRb,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use crate::scanner::MetricValue;
/// Log severity, ordered from least to most severe.
/// Serialized by serde using the variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
    Critical,
}
impl LogLevel {
pub fn parse(s: &str) -> Self {
match s.to_lowercase().as_str() {
"debug" => LogLevel::Debug,
"info" => LogLevel::Info,
"warn" | "warning" => LogLevel::Warn,
"error" | "err" => LogLevel::Error,
"critical" | "crit" | "fatal" => LogLevel::Critical,
_ => LogLevel::Info,
}
}
pub fn as_str(&self) -> &'static str {
match self {
LogLevel::Debug => "DEBUG",
LogLevel::Info => "INFO",
LogLevel::Warn => "WARN",
LogLevel::Error => "ERROR",
LogLevel::Critical => "CRITICAL",
}
}
}
/// One scanner log record: metrics payload plus optional message/timing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    pub timestamp: DateTime<Utc>,
    // Name of the scanner that produced this entry.
    pub scanner: String,
    pub level: LogLevel,
    pub message: Option<String>,
    pub metrics: HashMap<String, MetricValue>,
    // Wall time the collection took, if measured (see with_timing).
    pub collection_time_ms: Option<u64>,
    pub error_count: Option<u64>,
}

/// One daemon-internal log record (startup, shutdown, loader events, ...).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaemonLogEntry {
    pub timestamp: DateTime<Utc>,
    pub level: LogLevel,
    // Emitting component, e.g. "daemon" or "loader".
    pub source: String,
    // Short machine-readable event tag, e.g. "startup".
    pub event: String,
    // Free-form payload, typically a JSON string.
    pub message: String,
}
impl DaemonLogEntry {
pub fn new(source: impl Into<String>, event: impl Into<String>, message: String) -> Self {
Self {
timestamp: Utc::now(),
level: LogLevel::Info,
source: source.into(),
event: event.into(),
message,
}
}
pub fn with_level(mut self, level: LogLevel) -> Self {
self.level = level;
self
}
pub fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_default()
}
}
impl LogEntry {
    /// Builds an `Info`-level entry for `scanner` carrying `metrics`,
    /// stamped with the current UTC time.
    pub fn new(scanner: impl Into<String>, metrics: HashMap<String, MetricValue>) -> Self {
        Self {
            scanner: scanner.into(),
            metrics,
            timestamp: Utc::now(),
            level: LogLevel::Info,
            message: None,
            collection_time_ms: None,
            error_count: None,
        }
    }
    /// Overrides the severity level (builder style).
    pub fn with_level(mut self, level: LogLevel) -> Self {
        self.level = level;
        self
    }
    /// Attaches a human-readable message.
    pub fn with_message(mut self, message: impl Into<String>) -> Self {
        self.message = Some(message.into());
        self
    }
    /// Records how long collection took, measured from `start` until now.
    pub fn with_timing(mut self, start: Instant) -> Self {
        self.collection_time_ms = Some(start.elapsed().as_millis() as u64);
        self
    }
    /// Records the number of errors seen during collection.
    pub fn with_error_count(mut self, count: u64) -> Self {
        self.error_count = Some(count);
        self
    }
    /// Serializes to JSON; on failure, emits a small JSON object describing
    /// the serialization error instead of panicking.
    pub fn to_json(&self) -> String {
        match serde_json::to_string(self) {
            Ok(json) => json,
            Err(e) => format!("{{\"error\":\"{}\"}}", e),
        }
    }
    /// Renders the entry as `KEY=value` pairs for the system journal.
    /// NOTE(review): message and metric text are not escaped, so embedded
    /// spaces break naive KEY=value parsing downstream — confirm consumers.
    pub fn to_journal(&self) -> String {
        let metrics_json = serde_json::to_string(&self.metrics).unwrap_or_default();
        let level_str = self.level.as_str();
        match self.message {
            Some(ref msg) => format!(
                "PSCAND_SCANNER={} PSCAND_LEVEL={} PSCAND_MSG={} PSCAND_METRICS={}",
                self.scanner, level_str, msg, metrics_json
            ),
            None => format!(
                "PSCAND_SCANNER={} PSCAND_LEVEL={} PSCAND_METRICS={}",
                self.scanner, level_str, metrics_json
            ),
        }
    }
}
// Concrete ring-buffer storage/type aliases to keep signatures readable.
type RbStorage = Heap<LogEntry>;
type RbStorageDaemon = Heap<DaemonLogEntry>;
type SharedRbLog = SharedRb<RbStorage>;
type SharedRbLogDaemon = SharedRb<RbStorageDaemon>;
// Producer/consumer pair for the scanner-entry ring buffer; kept together so
// both ends can be guarded by one mutex.
struct RingBufferHandles {
    prod: CachingProd<Arc<SharedRbLog>>,
    cons: CachingCons<Arc<SharedRbLog>>,
}
// Producer/consumer pair for the daemon-event ring buffer.
struct DaemonBufferHandles {
    prod: CachingProd<Arc<SharedRbLogDaemon>>,
    cons: CachingCons<Arc<SharedRbLogDaemon>>,
}
/// Bounded in-memory log store with optional journal and file sinks.
///
/// Keeps the most recent entries in fixed-size ring buffers (scanner entries
/// and daemon events separately) and can mirror each entry to the system
/// journal and/or a JSON-lines file.
pub struct RingBufferLogger {
    buffer: Arc<Mutex<RingBufferHandles>>,
    daemon_buffer: Arc<Mutex<DaemonBufferHandles>>,
    // Target of the file sink; only consulted when `file_enabled` is true.
    file_path: Option<PathBuf>,
    journal_enabled: bool,
    file_enabled: bool,
}
/// Maps a `LogLevel` to its syslog severity number (RFC 5424: 7 = debug,
/// 6 = info, 4 = warning, 3 = err, 2 = crit).
fn syslog_priority(level: LogLevel) -> u8 {
    match level {
        LogLevel::Debug => 7,
        LogLevel::Info => 6,
        LogLevel::Warn => 4,
        LogLevel::Error => 3,
        LogLevel::Critical => 2,
    }
}

impl RingBufferLogger {
    /// Creates a logger with two fixed-capacity ring buffers (`capacity`
    /// entries each): one for scanner entries, one for daemon events.
    ///
    /// `file_path` is only used when `file_enabled` is true; `journal_enabled`
    /// forwards entries to the system journal via the external `logger` binary.
    pub fn new(
        capacity: usize,
        file_path: Option<PathBuf>,
        journal_enabled: bool,
        file_enabled: bool,
    ) -> Self {
        let rb = SharedRb::<RbStorage>::new(capacity);
        let (prod, cons) = rb.split();
        let handles = RingBufferHandles { prod, cons };
        let daemon_rb = SharedRb::<RbStorageDaemon>::new(capacity);
        let (daemon_prod, daemon_cons) = daemon_rb.split();
        let daemon_handles = DaemonBufferHandles {
            prod: daemon_prod,
            cons: daemon_cons,
        };
        Self {
            buffer: Arc::new(Mutex::new(handles)),
            daemon_buffer: Arc::new(Mutex::new(daemon_handles)),
            file_path,
            journal_enabled,
            file_enabled,
        }
    }
    /// Records a scanner entry: forwards it to the enabled sinks, then stores
    /// it in the ring buffer, evicting the oldest entry when full.
    pub fn push(&self, entry: LogEntry) {
        // Write to external sinks first so the entry can be moved (rather than
        // cloned) into the ring buffer afterwards.
        if self.journal_enabled {
            self.write_to_journal(&entry);
        }
        if self.file_enabled {
            self.write_to_file(&entry);
        }
        let mut handles = self.buffer.lock();
        if handles.prod.is_full() {
            // Bounded-buffer semantics: drop the oldest entry to make room.
            let _ = handles.cons.try_pop();
        }
        let _ = handles.prod.try_push(entry);
    }
    /// Records a daemon lifecycle event at the given level.
    pub fn log(&self, level: LogLevel, source: &str, event: &str, message: String) {
        let entry = DaemonLogEntry {
            timestamp: Utc::now(),
            level,
            source: source.to_string(),
            event: event.to_string(),
            message,
        };
        if self.journal_enabled {
            self.write_daemon_to_journal(&entry);
        }
        if self.file_enabled {
            self.write_daemon_to_file(&entry);
        }
        let mut daemon_handles = self.daemon_buffer.lock();
        if daemon_handles.prod.is_full() {
            let _ = daemon_handles.cons.try_pop();
        }
        let _ = daemon_handles.prod.try_push(entry);
    }
    /// Forwards a scanner entry to the journal via `logger(1)`.
    ///
    /// Fix: the priority is now derived from `entry.level`; previously it was
    /// hard-coded to "info", so scanner severity was lost in the journal,
    /// inconsistent with the daemon-event path.
    /// NOTE(review): `spawn` without a matching `wait` can accumulate zombie
    /// children until the process reaps them — confirm this is acceptable.
    fn write_to_journal(&self, entry: &LogEntry) {
        let msg = entry.to_journal();
        let _ = std::process::Command::new("logger")
            .arg("-t")
            .arg("pscand")
            .arg("-p")
            .arg(format!("user.{}", syslog_priority(entry.level)))
            .arg(msg)
            .spawn();
    }
    /// Appends a scanner entry as one JSON line to the configured log file.
    /// Errors are deliberately ignored: logging must never take the daemon down.
    fn write_to_file(&self, entry: &LogEntry) {
        if let Some(ref path) = self.file_path {
            if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) {
                let _ = writeln!(file, "{}", entry.to_json());
            }
        }
    }
    /// Forwards a daemon event to the journal via `logger(1)` with a priority
    /// derived from the event level.
    fn write_daemon_to_journal(&self, entry: &DaemonLogEntry) {
        let msg = format!(
            "PSCAND_SOURCE={} PSCAND_EVENT={} PSCAND_MSG={}",
            entry.source, entry.event, entry.message
        );
        let _ = std::process::Command::new("logger")
            .arg("-t")
            .arg("pscand")
            .arg("-p")
            .arg(format!("user.{}", syslog_priority(entry.level)))
            .arg(msg)
            .spawn();
    }
    /// Appends a daemon event as one JSON line to the configured log file.
    fn write_daemon_to_file(&self, entry: &DaemonLogEntry) {
        if let Some(ref path) = self.file_path {
            if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path) {
                let _ = writeln!(file, "{}", entry.to_json());
            }
        }
    }
    /// Returns up to `count` buffered daemon events, newest first.
    pub fn get_daemon_recent(&self, count: usize) -> Vec<DaemonLogEntry> {
        let handles = self.daemon_buffer.lock();
        handles.cons.iter().rev().take(count).cloned().collect()
    }
    /// Returns up to `count` buffered scanner entries, newest first.
    pub fn get_recent(&self, count: usize) -> Vec<LogEntry> {
        let handles = self.buffer.lock();
        handles.cons.iter().rev().take(count).cloned().collect()
    }
    /// Dumps every buffered scanner entry to `path`, one JSON object per line,
    /// truncating any existing file.
    pub fn flush_to_file(&self, path: &Path) -> std::io::Result<()> {
        let entries = self.get_recent(usize::MAX);
        let mut file = fs::File::create(path)?;
        for entry in entries {
            writeln!(file, "{}", entry.to_json())?;
        }
        Ok(())
    }
}
impl Default for RingBufferLogger {
    fn default() -> Self {
        // 60 entries, no log file path, journal sink on, file sink off.
        Self::new(60, None, true, false)
    }
}
/// Snapshot of daemon-wide runtime counters (serialized into the state file).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeStats {
    pub uptime_secs: u64,
    pub total_collections: u64,
    pub total_errors: u64,
    pub last_collection_time_ms: u64,
    // Mean collection duration across all scanners; 0.0 before any collection.
    pub avg_collection_time_ms: f64,
    // Per-scanner breakdown, keyed by scanner name.
    pub scanner_stats: HashMap<String, ScannerStats>,
}
/// Per-scanner counters contained in a `RuntimeStats` snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScannerStats {
    pub collections: u64,
    pub errors: u64,
    // Most recent error message, if any error has occurred.
    pub last_error: Option<String>,
    pub last_collection_time_ms: u64,
    pub avg_collection_time_ms: f64,
}
/// Aggregates collection counters for the daemon as a whole and per scanner.
///
/// NOTE(review): the per-scanner maps store `AtomicU64` values inside
/// `Mutex`-protected maps; once the lock is held the atomics are redundant —
/// plain `u64` values would suffice. Kept as-is here.
pub struct RuntimeMonitor {
    start_time: Instant,
    total_collections: AtomicU64,
    total_errors: AtomicU64,
    last_collection_time: AtomicU64,
    // Running sum of all collection durations, for computing the average.
    collection_time_sum: AtomicU64,
    scanner_collections: parking_lot::Mutex<HashMap<String, AtomicU64>>,
    scanner_errors: parking_lot::Mutex<HashMap<String, AtomicU64>>,
    scanner_last_error: parking_lot::Mutex<HashMap<String, String>>,
    scanner_last_time: parking_lot::Mutex<HashMap<String, AtomicU64>>,
    scanner_time_sum: parking_lot::Mutex<HashMap<String, AtomicU64>>,
}
impl RuntimeMonitor {
    /// Creates a monitor with all counters at zero and uptime starting now.
    pub fn new() -> Self {
        Self {
            start_time: Instant::now(),
            total_collections: AtomicU64::new(0),
            total_errors: AtomicU64::new(0),
            last_collection_time: AtomicU64::new(0),
            collection_time_sum: AtomicU64::new(0),
            scanner_collections: parking_lot::Mutex::new(HashMap::new()),
            scanner_errors: parking_lot::Mutex::new(HashMap::new()),
            scanner_last_error: parking_lot::Mutex::new(HashMap::new()),
            scanner_last_time: parking_lot::Mutex::new(HashMap::new()),
            scanner_time_sum: parking_lot::Mutex::new(HashMap::new()),
        }
    }
    /// Records one collection attempt for `scanner`: bumps the global and
    /// per-scanner counters and, when `error` is given, the error bookkeeping.
    pub fn record_collection(&self, scanner: &str, time_ms: u64, error: Option<&str>) {
        self.total_collections.fetch_add(1, Ordering::Relaxed);
        self.last_collection_time.store(time_ms, Ordering::Relaxed);
        self.collection_time_sum
            .fetch_add(time_ms, Ordering::Relaxed);
        // Shadowing pattern used below: the first binding holds the map guard,
        // the second borrows the per-scanner counter created on demand.
        let mut collections = self.scanner_collections.lock();
        let collections = collections
            .entry(scanner.to_string())
            .or_insert_with(|| AtomicU64::new(0));
        collections.fetch_add(1, Ordering::Relaxed);
        let mut last_time = self.scanner_last_time.lock();
        let last_time = last_time
            .entry(scanner.to_string())
            .or_insert_with(|| AtomicU64::new(0));
        last_time.store(time_ms, Ordering::Relaxed);
        let mut time_sum = self.scanner_time_sum.lock();
        let time_sum = time_sum
            .entry(scanner.to_string())
            .or_insert_with(|| AtomicU64::new(0));
        time_sum.fetch_add(time_ms, Ordering::Relaxed);
        if let Some(err) = error {
            self.total_errors.fetch_add(1, Ordering::Relaxed);
            let mut errors = self.scanner_errors.lock();
            let errors = errors
                .entry(scanner.to_string())
                .or_insert_with(|| AtomicU64::new(0));
            errors.fetch_add(1, Ordering::Relaxed);
            // Only the most recent error message is retained per scanner.
            let mut last_error = self.scanner_last_error.lock();
            last_error.insert(scanner.to_string(), err.to_string());
        }
    }
    /// Builds a consistent `RuntimeStats` snapshot. All five per-scanner maps
    /// are locked for the duration so the per-scanner numbers line up.
    pub fn get_stats(&self) -> RuntimeStats {
        let uptime = self.start_time.elapsed().as_secs();
        let total = self.total_collections.load(Ordering::Relaxed);
        let errors = self.total_errors.load(Ordering::Relaxed);
        let last_time = self.last_collection_time.load(Ordering::Relaxed);
        let sum_time = self.collection_time_sum.load(Ordering::Relaxed);
        // Guard against division by zero before the first collection.
        let avg = if total > 0 {
            sum_time as f64 / total as f64
        } else {
            0.0
        };
        let mut scanner_stats = HashMap::new();
        let collections = self.scanner_collections.lock();
        let errors_map = self.scanner_errors.lock();
        let last_error_map = self.scanner_last_error.lock();
        let last_time_map = self.scanner_last_time.lock();
        let time_sum_map = self.scanner_time_sum.lock();
        for (name, coll) in collections.iter() {
            let coll_count = coll.load(Ordering::Relaxed);
            let err_count = errors_map
                .get(name)
                .map(|e| e.load(Ordering::Relaxed))
                .unwrap_or(0);
            let last_err = last_error_map.get(name).cloned();
            let last_t = last_time_map
                .get(name)
                .map(|t| t.load(Ordering::Relaxed))
                .unwrap_or(0);
            let sum_t = time_sum_map
                .get(name)
                .map(|s| s.load(Ordering::Relaxed))
                .unwrap_or(0);
            let avg_t = if coll_count > 0 {
                sum_t as f64 / coll_count as f64
            } else {
                0.0
            };
            scanner_stats.insert(
                name.clone(),
                ScannerStats {
                    collections: coll_count,
                    errors: err_count,
                    last_error: last_err,
                    last_collection_time_ms: last_t,
                    avg_collection_time_ms: avg_t,
                },
            );
        }
        RuntimeStats {
            uptime_secs: uptime,
            total_collections: total,
            total_errors: errors,
            last_collection_time_ms: last_time,
            avg_collection_time_ms: avg,
            scanner_stats,
        }
    }
}
impl Default for RuntimeMonitor {
    // Equivalent to `new()`: zeroed counters, uptime starting now.
    fn default() -> Self {
        Self::new()
    }
}
/// Periodically-touched liveness file plus an in-memory freshness record.
pub struct Heartbeat {
    path: PathBuf,
    // Expected refresh cadence; staleness is judged against this.
    interval_secs: u64,
    // When `touch` last succeeded in this process.
    last_update: parking_lot::Mutex<Instant>,
}
impl Heartbeat {
pub fn new(path: PathBuf, interval_secs: u64) -> Self {
Self {
path,
interval_secs,
last_update: parking_lot::Mutex::new(Instant::now()),
}
}
pub fn touch(&self) -> std::io::Result<()> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
if let Some(parent) = self.path.parent() {
fs::create_dir_all(parent)?
}
let mut file = fs::File::create(&self.path)?;
writeln!(file, "{}", now)?;
*self.last_update.lock() = Instant::now();
Ok(())
}
pub fn is_stale(&self) -> bool {
self.last_update.lock().elapsed().as_secs() > self.interval_secs * 3
}
}
/// Pairs a heartbeat file with a JSON state file so a restart can tell a
/// clean shutdown apart from a crash.
pub struct CrashDetector {
    heartbeat: Heartbeat,
    // Path of the JSON state file (`RuntimeStats` or a "stopped" marker).
    state_file: PathBuf,
}
impl CrashDetector {
    /// Creates a detector rooted at `state_dir` with the default 5 s
    /// heartbeat interval.
    pub fn new(state_dir: PathBuf) -> Self {
        // Delegate so the on-disk layout ("heartbeat"/"state") is defined once.
        Self::new_with_interval(state_dir, 5)
    }
    /// Creates a detector rooted at `state_dir` with a custom heartbeat
    /// interval (seconds).
    pub fn new_with_interval(state_dir: PathBuf, heartbeat_interval_secs: u64) -> Self {
        Self {
            heartbeat: Heartbeat::new(state_dir.join("heartbeat"), heartbeat_interval_secs),
            state_file: state_dir.join("state"),
        }
    }
    /// Persists the current runtime statistics as pretty-printed JSON,
    /// creating parent directories as needed.
    pub fn write_state(&self, stats: &RuntimeStats) -> std::io::Result<()> {
        if let Some(parent) = self.state_file.parent() {
            fs::create_dir_all(parent)?;
        }
        let json = serde_json::to_string_pretty(stats).unwrap_or_default();
        fs::write(&self.state_file, json)
    }
    /// Refreshes the heartbeat file to signal liveness.
    pub fn mark_running(&self) -> std::io::Result<()> {
        self.heartbeat.touch()
    }
    /// Records a clean-shutdown marker so a later start can distinguish a
    /// graceful stop from a crash.
    pub fn mark_stopped(&self) -> std::io::Result<()> {
        if let Some(parent) = self.state_file.parent() {
            fs::create_dir_all(parent)?;
        }
        let json = serde_json::json!({
            "status": "stopped",
            "timestamp": std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0)
        });
        fs::write(
            &self.state_file,
            serde_json::to_string_pretty(&json).unwrap_or_default(),
        )
    }
    /// Health check: the heartbeat must be fresh and the state file must not
    /// record a clean "stopped" status. A missing or unreadable state file is
    /// treated as healthy (best effort), matching the original behavior.
    pub fn is_healthy(&self) -> bool {
        if self.heartbeat.is_stale() {
            return false;
        }
        // Read the "status" field if the state file exists and parses;
        // any I/O or parse failure simply yields `None`.
        let status = fs::read_to_string(&self.state_file)
            .ok()
            .and_then(|c| serde_json::from_str::<serde_json::Value>(&c).ok())
            .and_then(|state| {
                state
                    .get("status")
                    .and_then(|v| v.as_str())
                    .map(str::to_string)
            });
        status.as_deref() != Some("stopped")
    }
}

View file

@ -0,0 +1,154 @@
use std::collections::HashMap;
use std::os::raw::c_void;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::LazyLock;
use std::sync::Mutex;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use thiserror::Error;
// Convenience aliases used throughout the plugin API.
pub type ScannerBox = Box<dyn Scanner>;
pub type ScannerResult = Result<ScannerBox>;
// Metric name -> sampled value, as produced by one `collect` call.
pub type ScannerMetrics = HashMap<String, MetricValue>;
pub type ScannerCollectionResult = Result<ScannerMetrics>;
// Closure types backing `DynamicScanner`; the FnMut ones are mutex-wrapped
// so the scanner as a whole stays `Send + Sync`.
pub type ScannerCollectFn = Box<dyn Fn() -> ScannerCollectionResult + Send + Sync>;
pub type ScannerInitFnMut = Mutex<Box<dyn FnMut(&toml::Value) -> Result<()> + Send>>;
pub type ScannerCleanupFnMut = Mutex<Box<dyn FnMut() -> Result<()> + Send>>;
// C-ABI entry points exchanged with plugin shared objects.
pub type ScannerCreatorFfi = unsafe extern "C" fn() -> *mut c_void;
pub type ScannerInitFn = unsafe extern "C" fn() -> *mut c_void;
pub type ScannerDropFn = unsafe extern "C" fn(*mut c_void);
/// Errors surfaced by scanners and the plugin registry.
#[derive(Error, Debug)]
pub enum ScannerError {
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    #[error("Parse error: {0}")]
    Parse(String),
    #[error("Configuration error: {0}")]
    Config(String),
    /// `collect` was called before a successful `init`.
    #[error("Scanner not initialized")]
    NotInitialized,
    /// No scanner registered under the given name.
    #[error("Scanner {0} not found")]
    NotFound(String),
}
/// Module-wide result alias; all fallible scanner APIs use `ScannerError`.
pub type Result<T> = std::result::Result<T, ScannerError>;
/// A single metric sample. `#[serde(untagged)]` makes values serialize as the
/// bare JSON scalar (`42`, `3.14`, `true`, `"text"`) rather than a tagged enum.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum MetricValue {
    Integer(i64),
    Float(f64),
    Boolean(bool),
    String(String),
}
impl MetricValue {
    /// Wraps a signed integer. Prefer `MetricValue::from(v)` in new code.
    pub fn from_i64(v: i64) -> Self {
        MetricValue::Integer(v)
    }
    /// Wraps a float. Prefer `MetricValue::from(v)` in new code.
    pub fn from_f64(v: f64) -> Self {
        MetricValue::Float(v)
    }
    /// Wraps a boolean. Prefer `MetricValue::from(v)` in new code.
    pub fn from_bool(v: bool) -> Self {
        MetricValue::Boolean(v)
    }
    /// Wraps a string-like value.
    pub fn from_string(v: impl Into<String>) -> Self {
        MetricValue::String(v.into())
    }
}
// Idiomatic `From` conversions; the `from_*` constructors above are kept for
// backward compatibility with existing plugins.
impl From<i64> for MetricValue {
    fn from(v: i64) -> Self {
        MetricValue::Integer(v)
    }
}
impl From<f64> for MetricValue {
    fn from(v: f64) -> Self {
        MetricValue::Float(v)
    }
}
impl From<bool> for MetricValue {
    fn from(v: bool) -> Self {
        MetricValue::Boolean(v)
    }
}
impl From<String> for MetricValue {
    fn from(v: String) -> Self {
        MetricValue::String(v)
    }
}
impl From<&str> for MetricValue {
    fn from(v: &str) -> Self {
        MetricValue::String(v.to_string())
    }
}
/// Contract every monitoring plugin implements.
pub trait Scanner: Send + Sync {
    /// Stable identifier used in logs and configuration.
    fn name(&self) -> &'static str;
    /// Desired interval between `collect` calls.
    fn interval(&self) -> Duration;
    /// One-time setup from the scanner's TOML configuration value.
    fn init(&mut self, config: &toml::Value) -> Result<()>;
    /// Samples the current metric values.
    fn collect(&self) -> ScannerCollectionResult;
    /// Releases resources before the scanner is dropped.
    fn cleanup(&mut self) -> Result<()>;
}
// Process-global registry mapping opaque FFI handles to scanner instances.
static SCANNER_HANDLES: LazyLock<Mutex<HashMap<usize, ScannerBox>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));
// Handles start at 1 so 0 can serve as the null/invalid handle.
static NEXT_HANDLE: AtomicUsize = AtomicUsize::new(1);
#[inline]
/// Register a scanner and return a non-zero handle identifying it.
/// # Safety
/// The handle must only be used with `get_scanner` from the same process.
pub unsafe fn register_scanner(scanner: ScannerBox) -> usize {
    let handle = NEXT_HANDLE.fetch_add(1, Ordering::SeqCst);
    let mut table = SCANNER_HANDLES.lock().unwrap();
    table.insert(handle, scanner);
    handle
}
#[inline]
/// Retrieve a scanner by handle, consuming the registration.
/// # Safety
/// The handle must have been obtained from `register_scanner` in this process,
/// and the returned Box must be properly dropped.
pub unsafe fn get_scanner(handle: *mut c_void) -> Option<ScannerBox> {
    match handle as usize {
        // The null handle was never registered.
        0 => None,
        id => SCANNER_HANDLES.lock().unwrap().remove(&id),
    }
}
/// Abstraction over a collection of named scanners.
pub trait ScannerRegistry: Send + Sync {
    /// Names of every scanner this registry can produce.
    fn list_scanners(&self) -> Vec<&'static str>;
    /// Instantiates the scanner registered under `name`, if any.
    fn get_scanner(&self, name: &str) -> Option<ScannerBox>;
}
/// A `Scanner` assembled from closures rather than a dedicated type.
pub struct DynamicScanner {
    name: &'static str,
    interval: Duration,
    collect_fn: ScannerCollectFn,
    // FnMut closures are mutex-wrapped so the scanner stays `Send + Sync`.
    init_fn: ScannerInitFnMut,
    cleanup_fn: ScannerCleanupFnMut,
}
impl DynamicScanner {
    /// Assembles a scanner from free-standing closures: `collect_fn` samples
    /// metrics, `init_fn` handles configuration, `cleanup_fn` tears down.
    /// The mutable closures are wrapped in mutexes so the result is
    /// `Send + Sync` as the `Scanner` trait requires.
    pub fn new(
        name: &'static str,
        interval: Duration,
        collect_fn: impl Fn() -> ScannerCollectionResult + Send + Sync + 'static,
        init_fn: impl FnMut(&toml::Value) -> Result<()> + Send + 'static,
        cleanup_fn: impl FnMut() -> Result<()> + Send + 'static,
    ) -> Self {
        Self {
            name,
            interval,
            collect_fn: Box::new(collect_fn),
            init_fn: Mutex::new(Box::new(init_fn)),
            cleanup_fn: Mutex::new(Box::new(cleanup_fn)),
        }
    }
}
impl Scanner for DynamicScanner {
    fn name(&self) -> &'static str {
        self.name
    }
    fn interval(&self) -> Duration {
        self.interval
    }
    /// Forwards to the wrapped init closure; panics if its mutex is poisoned.
    fn init(&mut self, config: &toml::Value) -> Result<()> {
        let mut guard = self.init_fn.lock().unwrap();
        (*guard)(config)
    }
    fn collect(&self) -> ScannerCollectionResult {
        (self.collect_fn)()
    }
    /// Forwards to the wrapped cleanup closure; panics if its mutex is poisoned.
    fn cleanup(&mut self) -> Result<()> {
        let mut guard = self.cleanup_fn.lock().unwrap();
        (*guard)()
    }
}

View file

@ -0,0 +1,14 @@
[package]
name = "pscand-macros"
# Version/edition/license/authors are inherited from the workspace root.
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true
[lib]
# Build as a procedural-macro crate (provides #[scanner] and register_scanner!).
proc-macro = true
[dependencies]
# Standard proc-macro toolkit: token streams, quasi-quoting, and AST parsing.
proc-macro2 = "1"
quote = "1"
syn = { version = "2", features = ["full"] }

View file

@ -0,0 +1,64 @@
use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, ItemFn};
/// Attribute macro: `#[scanner("name")]` on a function whose body evaluates
/// to `pscand_core::Result<HashMap<String, MetricValue>>`.
///
/// Expands to an exported `pscand_scanner` C symbol that registers a
/// `Scanner` impl whose `collect` runs the annotated function's body. The
/// original function item itself is discarded — only its body is kept — so
/// its signature and attributes have no effect.
///
/// Limitations (kept for compatibility): the poll interval is fixed at 1 s,
/// `init`/`cleanup` are no-ops, and only one scanner can be exported per
/// shared object (the symbol name `pscand_scanner` is fixed). Use
/// `register_scanner!` with a full `Scanner` impl when those must differ.
#[proc_macro_attribute]
pub fn scanner(name: TokenStream, input: TokenStream) -> TokenStream {
    let name_str = parse_macro_input!(name as syn::LitStr).value();
    let input = parse_macro_input!(input as ItemFn);
    // Fix: the previous version also cloned the function's identifier into an
    // unused `_fn_name` local; only the body is needed.
    let body = &input.block;
    let result = quote! {
        #[no_mangle]
        pub extern "C" fn pscand_scanner() -> *mut std::os::raw::c_void {
            struct ScannerImpl;
            impl pscand_core::Scanner for ScannerImpl {
                fn name(&self) -> &'static str {
                    #name_str
                }
                fn interval(&self) -> std::time::Duration {
                    std::time::Duration::from_secs(1)
                }
                fn init(&mut self, _config: &toml::Value) -> pscand_core::Result<()> {
                    Ok(())
                }
                fn collect(&self) -> pscand_core::Result<std::collections::HashMap<String, pscand_core::MetricValue>> {
                    #body
                }
                fn cleanup(&mut self) -> pscand_core::Result<()> {
                    Ok(())
                }
            }
            let handle = unsafe { pscand_core::register_scanner(Box::new(ScannerImpl)) };
            handle as *mut std::os::raw::c_void
        }
    };
    result.into()
}
/// Function-like macro: wraps a constructor function that returns a value
/// implementing `Scanner`, re-emitting it unchanged and additionally
/// exporting it through the fixed `pscand_scanner` C entry point.
#[proc_macro]
pub fn register_scanner(input: TokenStream) -> TokenStream {
    let ctor = parse_macro_input!(input as ItemFn);
    let ctor_name = ctor.sig.ident.clone();
    let expanded = quote! {
        #ctor
        #[no_mangle]
        pub extern "C" fn pscand_scanner() -> *mut std::os::raw::c_void {
            let handle = unsafe { pscand_core::register_scanner(Box::new(#ctor_name())) };
            handle as *mut std::os::raw::c_void
        }
    };
    expanded.into()
}