pscand/crates/pscand-core/src/logging.rs
NotAShelf ffae695240
treewide: set up rustfmt and taplo with custom rules
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I794f9152bb02e3dd91c9738369b94fc66a6a6964
2026-02-19 01:42:46 +03:00

661 lines
17 KiB
Rust

use std::{
collections::HashMap,
fs::{
self,
OpenOptions,
},
io::Write,
path::{
Path,
PathBuf,
},
sync::{
atomic::{
AtomicU64,
Ordering,
},
Arc,
},
time::Instant,
};
use chrono::{
DateTime,
Utc,
};
use parking_lot::Mutex;
use ringbuf::{
storage::Heap,
traits::*,
wrap::caching::{
CachingCons,
CachingProd,
},
SharedRb,
};
use serde::{
Deserialize,
Serialize,
};
use crate::scanner::MetricValue;
/// Severity attached to scanner and daemon log entries.
///
/// Ordered least to most severe; serialized by serde using the variant
/// names. Mapped onto syslog priorities when written to the journal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
    Critical,
}
impl LogLevel {
pub fn parse(s: &str) -> Self {
match s.to_lowercase().as_str() {
"debug" => LogLevel::Debug,
"info" => LogLevel::Info,
"warn" | "warning" => LogLevel::Warn,
"error" | "err" => LogLevel::Error,
"critical" | "crit" | "fatal" => LogLevel::Critical,
_ => LogLevel::Info,
}
}
pub fn as_str(&self) -> &'static str {
match self {
LogLevel::Debug => "DEBUG",
LogLevel::Info => "INFO",
LogLevel::Warn => "WARN",
LogLevel::Error => "ERROR",
LogLevel::Critical => "CRITICAL",
}
}
}
/// One scanner collection result, as stored in the ring buffer and
/// forwarded to the journal and/or log file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    // Wall-clock creation time (UTC).
    pub timestamp: DateTime<Utc>,
    // Name of the scanner that produced the metrics.
    pub scanner: String,
    pub level: LogLevel,
    // Optional human-readable message; journal output substitutes
    // "collection completed" when absent.
    pub message: Option<String>,
    // Metric name -> value map produced by the scanner.
    pub metrics: HashMap<String, MetricValue>,
    // How long the collection took, if measured.
    pub collection_time_ms: Option<u64>,
    // Errors observed during the collection, if tracked.
    pub error_count: Option<u64>,
}
/// A daemon-internal log event (source + event + message), kept in a
/// separate ring buffer from the scanner entries.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaemonLogEntry {
    // Wall-clock creation time (UTC).
    pub timestamp: DateTime<Utc>,
    pub level: LogLevel,
    // Component that emitted the event.
    pub source: String,
    // Short machine-readable event name.
    pub event: String,
    // Free-form human-readable description.
    pub message: String,
}
impl DaemonLogEntry {
pub fn new(
source: impl Into<String>,
event: impl Into<String>,
message: String,
) -> Self {
Self {
timestamp: Utc::now(),
level: LogLevel::Info,
source: source.into(),
event: event.into(),
message,
}
}
pub fn with_level(mut self, level: LogLevel) -> Self {
self.level = level;
self
}
pub fn to_json(&self) -> String {
serde_json::to_string(self).unwrap_or_default()
}
}
impl LogEntry {
pub fn new(
scanner: impl Into<String>,
metrics: HashMap<String, MetricValue>,
) -> Self {
Self {
timestamp: Utc::now(),
scanner: scanner.into(),
level: LogLevel::Info,
message: None,
metrics,
collection_time_ms: None,
error_count: None,
}
}
pub fn with_level(mut self, level: LogLevel) -> Self {
self.level = level;
self
}
pub fn with_message(mut self, message: impl Into<String>) -> Self {
self.message = Some(message.into());
self
}
pub fn with_timing(mut self, duration: Instant) -> Self {
self.collection_time_ms = Some(duration.elapsed().as_millis() as u64);
self
}
pub fn with_error_count(mut self, count: u64) -> Self {
self.error_count = Some(count);
self
}
pub fn to_json(&self) -> String {
serde_json::to_string(self)
.unwrap_or_else(|e| format!("{{\"error\":\"{}\"}}", e))
}
pub fn to_journal(&self) -> String {
let metrics_json = serde_json::to_string(&self.metrics).unwrap_or_default();
let level_str = self.level.as_str();
if let Some(ref msg) = self.message {
format!(
"PSCAND_SCANNER={} PSCAND_LEVEL={} PSCAND_MSG={} PSCAND_METRICS={}",
self.scanner, level_str, msg, metrics_json
)
} else {
format!(
"PSCAND_SCANNER={} PSCAND_LEVEL={} PSCAND_METRICS={}",
self.scanner, level_str, metrics_json
)
}
}
}
// Heap-backed ring-buffer storage and shared-buffer aliases for the two
// entry kinds.
type RbStorage = Heap<LogEntry>;
type RbStorageDaemon = Heap<DaemonLogEntry>;
type SharedRbLog = SharedRb<RbStorage>;
type SharedRbLogDaemon = SharedRb<RbStorageDaemon>;

// Producer/consumer pair over the scanner-entry ring buffer; bundled so
// both ends can be guarded by a single mutex.
struct RingBufferHandles {
    prod: CachingProd<Arc<SharedRbLog>>,
    cons: CachingCons<Arc<SharedRbLog>>,
}

// Producer/consumer pair over the daemon-entry ring buffer.
struct DaemonBufferHandles {
    prod: CachingProd<Arc<SharedRbLogDaemon>>,
    cons: CachingCons<Arc<SharedRbLogDaemon>>,
}
/// In-memory ring-buffer logger with optional journal and file sinks.
///
/// Holds separate bounded buffers for scanner entries and daemon entries;
/// when a buffer is full the oldest entry is evicted.
pub struct RingBufferLogger {
    buffer: Arc<Mutex<RingBufferHandles>>,
    daemon_buffer: Arc<Mutex<DaemonBufferHandles>>,
    // Target file for JSON-lines output; only used when `file_enabled`.
    file_path: Option<PathBuf>,
    // Forward entries to the systemd journal socket.
    journal_enabled: bool,
    // Append entries as JSON lines to `file_path`.
    file_enabled: bool,
}
impl RingBufferLogger {
    /// Creates a logger whose scanner and daemon ring buffers each hold up
    /// to `capacity` entries.
    ///
    /// `journal_enabled` / `file_enabled` select the additional sinks;
    /// `file_path` is only consulted when `file_enabled` is set.
    pub fn new(
        capacity: usize,
        file_path: Option<PathBuf>,
        journal_enabled: bool,
        file_enabled: bool,
    ) -> Self {
        let rb = SharedRb::<RbStorage>::new(capacity);
        let (prod, cons) = rb.split();
        let handles = RingBufferHandles { prod, cons };
        let daemon_rb = SharedRb::<RbStorageDaemon>::new(capacity);
        let (daemon_prod, daemon_cons) = daemon_rb.split();
        let daemon_handles = DaemonBufferHandles {
            prod: daemon_prod,
            cons: daemon_cons,
        };
        Self {
            buffer: Arc::new(Mutex::new(handles)),
            daemon_buffer: Arc::new(Mutex::new(daemon_handles)),
            file_path,
            journal_enabled,
            file_enabled,
        }
    }

    /// Appends a scanner entry to the ring buffer (evicting the oldest
    /// entry when full) and forwards it to the enabled sinks.
    pub fn push(&self, entry: LogEntry) {
        {
            let mut handles = self.buffer.lock();
            // Drop the oldest entry rather than rejecting the new one.
            if handles.prod.is_full() {
                let _ = handles.cons.try_pop();
            }
            let _ = handles.prod.try_push(entry.clone());
        }
        if self.journal_enabled {
            self.write_to_journal(&entry);
        }
        if self.file_enabled {
            self.write_to_file(&entry);
        }
    }

    /// Records a daemon-internal event in the daemon ring buffer and
    /// forwards it to the enabled sinks.
    pub fn log(
        &self,
        level: LogLevel,
        source: &str,
        event: &str,
        message: String,
    ) {
        let entry = DaemonLogEntry {
            timestamp: Utc::now(),
            level,
            source: source.to_string(),
            event: event.to_string(),
            message,
        };
        {
            let mut daemon_handles = self.daemon_buffer.lock();
            // Same eviction policy as `push`.
            if daemon_handles.prod.is_full() {
                let _ = daemon_handles.cons.try_pop();
            }
            let _ = daemon_handles.prod.try_push(entry.clone());
        }
        if self.journal_enabled {
            self.write_daemon_to_journal(&entry);
        }
        if self.file_enabled {
            self.write_daemon_to_file(&entry);
        }
    }

    /// Sends a scanner entry to the systemd journal datagram socket.
    ///
    /// The send happens on a short-lived thread so a slow or absent journal
    /// socket never blocks the caller; failures are deliberately ignored
    /// (best-effort logging).
    fn write_to_journal(&self, entry: &LogEntry) {
        // Map LogLevel onto syslog priorities (7 = debug .. 2 = crit).
        let priority = match entry.level {
            LogLevel::Debug => 7,
            LogLevel::Info => 6,
            LogLevel::Warn => 4,
            LogLevel::Error => 3,
            LogLevel::Critical => 2,
        };
        let msg = entry
            .message
            .clone()
            .unwrap_or_else(|| "collection completed".to_string());
        let scanner = entry.scanner.clone();
        // Write directly to systemd's journal socket.
        std::thread::spawn(move || {
            use std::os::unix::net::UnixDatagram;
            // The native journal protocol is newline-separated KEY=value
            // pairs; the previous literal was broken across lines mid-escape
            // and corrupted the MESSAGE terminator. NOTE(review): a MESSAGE
            // containing embedded newlines would need the protocol's binary
            // framing variant, which is not handled here.
            let journal_msg = format!(
                "PRIORITY={}\nSYSLOG_IDENTIFIER=pscand\nPSCAND_SCANNER={}\nMESSAGE={}\n",
                priority, scanner, msg
            );
            if let Ok(sock) = UnixDatagram::unbound() {
                let _ =
                    sock.send_to(journal_msg.as_bytes(), "/run/systemd/journal/socket");
            }
        });
    }

    /// Appends a scanner entry as one JSON line to the configured file,
    /// creating the file if needed. Errors are silently ignored.
    fn write_to_file(&self, entry: &LogEntry) {
        if let Some(ref path) = self.file_path {
            if let Ok(mut file) =
                OpenOptions::new().create(true).append(true).open(path)
            {
                let _ = writeln!(file, "{}", entry.to_json());
            }
        }
    }

    /// Sends a daemon entry to the systemd journal datagram socket; same
    /// best-effort, off-thread semantics as `write_to_journal`.
    fn write_daemon_to_journal(&self, entry: &DaemonLogEntry) {
        let priority = match entry.level {
            LogLevel::Debug => 7,
            LogLevel::Info => 6,
            LogLevel::Warn => 4,
            LogLevel::Error => 3,
            LogLevel::Critical => 2,
        };
        let source = entry.source.clone();
        let event = entry.event.clone();
        let msg = entry.message.clone();
        // Write directly to systemd's journal socket.
        std::thread::spawn(move || {
            use std::os::unix::net::UnixDatagram;
            // Restored the `\n` separator before PSCAND_EVENT that the
            // line-wrapped literal had corrupted.
            let journal_msg = format!(
                "PRIORITY={}\nSYSLOG_IDENTIFIER=pscand\nPSCAND_SOURCE={}\nPSCAND_EVENT={}\nMESSAGE={}\n",
                priority, source, event, msg
            );
            if let Ok(sock) = UnixDatagram::unbound() {
                let _ =
                    sock.send_to(journal_msg.as_bytes(), "/run/systemd/journal/socket");
            }
        });
    }

    /// Appends a daemon entry as one JSON line to the configured file.
    /// Errors are silently ignored.
    fn write_daemon_to_file(&self, entry: &DaemonLogEntry) {
        if let Some(ref path) = self.file_path {
            if let Ok(mut file) =
                OpenOptions::new().create(true).append(true).open(path)
            {
                let _ = writeln!(file, "{}", entry.to_json());
            }
        }
    }

    /// Returns up to `count` of the most recent daemon entries, newest
    /// first.
    pub fn get_daemon_recent(&self, count: usize) -> Vec<DaemonLogEntry> {
        let handles = self.daemon_buffer.lock();
        handles.cons.iter().rev().take(count).cloned().collect()
    }

    /// Returns up to `count` of the most recent scanner entries, newest
    /// first.
    pub fn get_recent(&self, count: usize) -> Vec<LogEntry> {
        let handles = self.buffer.lock();
        handles.cons.iter().rev().take(count).cloned().collect()
    }

    /// Writes every buffered scanner entry (newest first) to `path` as
    /// JSON lines, truncating any existing file.
    pub fn flush_to_file(&self, path: &Path) -> std::io::Result<()> {
        let entries = self.get_recent(usize::MAX);
        let mut file = fs::File::create(path)?;
        for entry in entries {
            writeln!(file, "{}", entry.to_json())?;
        }
        Ok(())
    }
}
impl Default for RingBufferLogger {
fn default() -> Self {
Self::new(60, None, true, false)
}
}
/// Snapshot of daemon-wide collection statistics, as produced by
/// [`RuntimeMonitor::get_stats`] and persisted to the state file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeStats {
    // Seconds since the monitor was created.
    pub uptime_secs: u64,
    pub total_collections: u64,
    pub total_errors: u64,
    pub last_collection_time_ms: u64,
    // Mean collection duration over all collections; 0.0 when none yet.
    pub avg_collection_time_ms: f64,
    // Per-scanner breakdown keyed by scanner name.
    pub scanner_stats: HashMap<String, ScannerStats>,
}
/// Per-scanner slice of the runtime statistics snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScannerStats {
    pub collections: u64,
    pub errors: u64,
    // Most recent error message, if any error has occurred.
    pub last_error: Option<String>,
    pub last_collection_time_ms: u64,
    // Mean collection duration for this scanner; 0.0 when none yet.
    pub avg_collection_time_ms: f64,
}
/// Accumulates daemon-wide and per-scanner collection counters since
/// startup; snapshots are taken via `get_stats`.
pub struct RuntimeMonitor {
    // Monitor creation time; uptime is measured from here.
    start_time: Instant,
    total_collections: AtomicU64,
    total_errors: AtomicU64,
    // Duration of the most recent collection, in milliseconds.
    last_collection_time: AtomicU64,
    // Running sum of all collection durations (used for the average).
    collection_time_sum: AtomicU64,
    // Per-scanner maps keyed by scanner name.
    // NOTE(review): the AtomicU64 values are only ever accessed while the
    // surrounding mutex is held, so plain u64 values would likely suffice.
    scanner_collections: parking_lot::Mutex<HashMap<String, AtomicU64>>,
    scanner_errors: parking_lot::Mutex<HashMap<String, AtomicU64>>,
    scanner_last_error: parking_lot::Mutex<HashMap<String, String>>,
    scanner_last_time: parking_lot::Mutex<HashMap<String, AtomicU64>>,
    scanner_time_sum: parking_lot::Mutex<HashMap<String, AtomicU64>>,
}
impl RuntimeMonitor {
    /// Creates a monitor with all counters at zero and uptime starting now.
    pub fn new() -> Self {
        Self {
            start_time: Instant::now(),
            total_collections: AtomicU64::new(0),
            total_errors: AtomicU64::new(0),
            last_collection_time: AtomicU64::new(0),
            collection_time_sum: AtomicU64::new(0),
            scanner_collections: parking_lot::Mutex::new(HashMap::new()),
            scanner_errors: parking_lot::Mutex::new(HashMap::new()),
            scanner_last_error: parking_lot::Mutex::new(HashMap::new()),
            scanner_last_time: parking_lot::Mutex::new(HashMap::new()),
            scanner_time_sum: parking_lot::Mutex::new(HashMap::new()),
        }
    }

    /// Records one collection for `scanner`: bumps the global and
    /// per-scanner counters, stores the duration, and — when `error` is
    /// present — the error counters and last error message.
    ///
    /// Each per-scanner map is locked only for the duration of its own
    /// update; previously the `entry()` borrows kept every guard alive
    /// until the end of the function.
    pub fn record_collection(
        &self,
        scanner: &str,
        time_ms: u64,
        error: Option<&str>,
    ) {
        self.total_collections.fetch_add(1, Ordering::Relaxed);
        self.last_collection_time.store(time_ms, Ordering::Relaxed);
        self
            .collection_time_sum
            .fetch_add(time_ms, Ordering::Relaxed);
        // AtomicU64::default() is 0, so or_default() matches the old
        // or_insert_with(|| AtomicU64::new(0)).
        self.scanner_collections
            .lock()
            .entry(scanner.to_string())
            .or_default()
            .fetch_add(1, Ordering::Relaxed);
        self.scanner_last_time
            .lock()
            .entry(scanner.to_string())
            .or_default()
            .store(time_ms, Ordering::Relaxed);
        self.scanner_time_sum
            .lock()
            .entry(scanner.to_string())
            .or_default()
            .fetch_add(time_ms, Ordering::Relaxed);
        if let Some(err) = error {
            self.total_errors.fetch_add(1, Ordering::Relaxed);
            self.scanner_errors
                .lock()
                .entry(scanner.to_string())
                .or_default()
                .fetch_add(1, Ordering::Relaxed);
            self.scanner_last_error
                .lock()
                .insert(scanner.to_string(), err.to_string());
        }
    }

    /// Builds a consistent snapshot of all counters.
    ///
    /// All five per-scanner maps are locked for the duration of the walk
    /// so the per-scanner numbers agree with each other.
    pub fn get_stats(&self) -> RuntimeStats {
        let uptime = self.start_time.elapsed().as_secs();
        let total = self.total_collections.load(Ordering::Relaxed);
        let errors = self.total_errors.load(Ordering::Relaxed);
        let last_time = self.last_collection_time.load(Ordering::Relaxed);
        let sum_time = self.collection_time_sum.load(Ordering::Relaxed);
        // Guard the division so an idle daemon reports 0.0, not NaN.
        let avg = if total > 0 {
            sum_time as f64 / total as f64
        } else {
            0.0
        };
        let mut scanner_stats = HashMap::new();
        let collections = self.scanner_collections.lock();
        let errors_map = self.scanner_errors.lock();
        let last_error_map = self.scanner_last_error.lock();
        let last_time_map = self.scanner_last_time.lock();
        let time_sum_map = self.scanner_time_sum.lock();
        for (name, coll) in collections.iter() {
            let coll_count = coll.load(Ordering::Relaxed);
            let err_count = errors_map
                .get(name)
                .map(|e| e.load(Ordering::Relaxed))
                .unwrap_or(0);
            let last_err = last_error_map.get(name).cloned();
            let last_t = last_time_map
                .get(name)
                .map(|t| t.load(Ordering::Relaxed))
                .unwrap_or(0);
            let sum_t = time_sum_map
                .get(name)
                .map(|s| s.load(Ordering::Relaxed))
                .unwrap_or(0);
            let avg_t = if coll_count > 0 {
                sum_t as f64 / coll_count as f64
            } else {
                0.0
            };
            scanner_stats.insert(name.clone(), ScannerStats {
                collections: coll_count,
                errors: err_count,
                last_error: last_err,
                last_collection_time_ms: last_t,
                avg_collection_time_ms: avg_t,
            });
        }
        RuntimeStats {
            uptime_secs: uptime,
            total_collections: total,
            total_errors: errors,
            last_collection_time_ms: last_time,
            avg_collection_time_ms: avg,
            scanner_stats,
        }
    }
}
impl Default for RuntimeMonitor {
fn default() -> Self {
Self::new()
}
}
/// Periodically-touched liveness file plus the in-process record of the
/// last touch, used for staleness detection.
pub struct Heartbeat {
    // File that receives the UNIX timestamp on each touch.
    path: PathBuf,
    // Expected touch cadence in seconds; staleness is judged against it.
    interval_secs: u64,
    // Instant of the most recent successful touch.
    last_update: parking_lot::Mutex<Instant>,
}
impl Heartbeat {
pub fn new(path: PathBuf, interval_secs: u64) -> Self {
Self {
path,
interval_secs,
last_update: parking_lot::Mutex::new(Instant::now()),
}
}
pub fn touch(&self) -> std::io::Result<()> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
if let Some(parent) = self.path.parent() {
fs::create_dir_all(parent)?
}
let mut file = fs::File::create(&self.path)?;
writeln!(file, "{}", now)?;
*self.last_update.lock() = Instant::now();
Ok(())
}
pub fn is_stale(&self) -> bool {
self.last_update.lock().elapsed().as_secs() > self.interval_secs * 3
}
}
/// Pairs a liveness heartbeat with a persisted state file so a restart can
/// distinguish a clean shutdown from a crash.
pub struct CrashDetector {
    heartbeat: Heartbeat,
    // JSON state snapshot; records {"status": "stopped"} on clean exit.
    state_file: PathBuf,
}
impl CrashDetector {
    /// Creates a detector rooted at `state_dir` with the default 5-second
    /// heartbeat interval. Delegates to [`Self::new_with_interval`] instead
    /// of duplicating its body.
    pub fn new(state_dir: PathBuf) -> Self {
        Self::new_with_interval(state_dir, 5)
    }

    /// Creates a detector rooted at `state_dir` with an explicit heartbeat
    /// interval. The heartbeat file lives at `<state_dir>/heartbeat`, the
    /// state snapshot at `<state_dir>/state`.
    pub fn new_with_interval(
        state_dir: PathBuf,
        heartbeat_interval_secs: u64,
    ) -> Self {
        let heartbeat =
            Heartbeat::new(state_dir.join("heartbeat"), heartbeat_interval_secs);
        let state_file = state_dir.join("state");
        Self {
            heartbeat,
            state_file,
        }
    }

    /// Persists the current runtime stats to the state file as
    /// pretty-printed JSON, creating parent directories if needed.
    pub fn write_state(&self, stats: &RuntimeStats) -> std::io::Result<()> {
        if let Some(parent) = self.state_file.parent() {
            fs::create_dir_all(parent)?;
        }
        let json = serde_json::to_string_pretty(stats).unwrap_or_default();
        fs::write(&self.state_file, json)
    }

    /// Refreshes the heartbeat file to signal liveness.
    pub fn mark_running(&self) -> std::io::Result<()> {
        self.heartbeat.touch()
    }

    /// Records a clean shutdown by writing `{"status": "stopped", ...}`
    /// to the state file.
    pub fn mark_stopped(&self) -> std::io::Result<()> {
        if let Some(parent) = self.state_file.parent() {
            fs::create_dir_all(parent)?;
        }
        let json = serde_json::json!({
            "status": "stopped",
            "timestamp": std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0)
        });
        fs::write(
            &self.state_file,
            serde_json::to_string_pretty(&json).unwrap_or_default(),
        )
    }

    /// Reports whether the daemon looks alive: the heartbeat must be fresh
    /// and the state file must not record a clean "stopped" shutdown.
    ///
    /// A missing or unparseable state file is treated as healthy, matching
    /// the original behavior (which converted the parse error to an
    /// `io::Error` only to immediately discard it with `.ok()`).
    pub fn is_healthy(&self) -> bool {
        if self.heartbeat.is_stale() {
            return false;
        }
        let status = fs::read_to_string(&self.state_file)
            .ok()
            .and_then(|c| serde_json::from_str::<serde_json::Value>(&c).ok())
            .and_then(|state| {
                state
                    .get("status")
                    .and_then(|v| v.as_str())
                    .map(str::to_string)
            });
        status.as_deref() != Some("stopped")
    }
}