eris/lua: move to an event driven architechture

This commit is contained in:
raf 2025-05-02 07:27:05 +03:00
commit a4eedcbc26
Signed by: NotAShelf
GPG key ID: 29D95B64378DB4BF
2 changed files with 1110 additions and 362 deletions

File diff suppressed because it is too large Load diff

View file

@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::env;
use std::fs;
use std::hash::Hasher;
use std::io::Write;
use std::net::IpAddr;
use std::path::{Path, PathBuf};
@ -20,7 +21,7 @@ mod lua;
mod markov;
mod metrics;
use lua::ScriptManager;
use lua::{EventContext, EventType, ScriptManager};
use markov::MarkovGenerator;
use metrics::{
ACTIVE_CONNECTIONS, BLOCKED_IPS, HITS_COUNTER, PATH_HITS, UA_HITS, metrics_handler,
@ -413,6 +414,67 @@ fn extract_header_value(data: &[u8], header_name: &str) -> Option<String> {
None
}
// Extract all headers from request data
fn extract_all_headers(data: &[u8]) -> HashMap<String, String> {
let mut headers = HashMap::new();
if let Ok(data_str) = std::str::from_utf8(data) {
let mut lines = data_str.lines();
// Skip the request line
let _ = lines.next();
// Parse headers until empty line
for line in lines {
if line.is_empty() {
break;
}
if let Some(colon_pos) = line.find(':') {
let key = line[..colon_pos].trim().to_lowercase();
let value = line[colon_pos + 1..].trim().to_string();
headers.insert(key, value);
}
}
}
headers
}
// Determine response type based on request path
fn choose_response_type(path: &str) -> &'static str {
if path.contains("phpunit") || path.contains("eval") {
"php_exploit"
} else if path.contains("wp-") {
"wordpress"
} else if path.contains("api") {
"api"
} else {
"generic"
}
}
// Helper function to get current timestamp in seconds
fn get_timestamp() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}
// Create a unique session ID for tracking a connection
fn generate_session_id(ip: &str, user_agent: &str) -> String {
let timestamp = get_timestamp();
let random = rand::random::<u32>();
// Use std::hash instead of xxhash_rust
let mut hasher = std::collections::hash_map::DefaultHasher::new();
std::hash::Hash::hash(&format!("{ip}_{user_agent}_{timestamp}"), &mut hasher);
let hash = hasher.finish();
format!("SID_{hash:x}_{random:x}")
}
// Main connection handler.
// Decides whether to tarpit or proxy
async fn handle_connection(
@ -438,6 +500,13 @@ async fn handle_connection(
return;
}
// Check if Lua scripts allow this connection
if !script_manager.on_connection(&peer_addr.to_string()) {
log::debug!("Connection rejected by Lua script: {peer_addr}");
let _ = stream.shutdown().await;
return;
}
// Pre-check for whitelisted IPs to bypass heavy processing
let mut whitelisted = false;
for network_str in &config.whitelist_networks {
@ -512,14 +581,30 @@ async fn handle_connection(
return;
};
// Extract request headers for Lua scripts
let headers = extract_all_headers(&request_data);
// Extract user agent for logging and decision making
let user_agent =
extract_header_value(&request_data, "user-agent").unwrap_or_else(|| "unknown".to_string());
// Trigger request event for Lua scripts
let request_ctx = EventContext {
event_type: EventType::Request,
ip: Some(peer_addr.to_string()),
path: Some(path.to_string()),
user_agent: Some(user_agent.clone()),
request_headers: Some(headers.clone()),
content: None,
timestamp: get_timestamp(),
session_id: Some(generate_session_id(&peer_addr.to_string(), &user_agent)),
};
script_manager.trigger_event(&request_ctx);
// Check if this request matches our tarpit patterns
let should_tarpit = should_tarpit(path, &peer_addr, &config).await;
if should_tarpit {
// Extract minimal info needed for tarpit
let user_agent = extract_header_value(&request_data, "user-agent")
.unwrap_or_else(|| "unknown".to_string());
log::info!("Tarpit triggered: {path} from {peer_addr} (UA: {user_agent})");
// Update metrics
@ -536,8 +621,11 @@ async fn handle_connection(
*state.hits.entry(peer_addr).or_insert(0) += 1;
let hit_count = state.hits[&peer_addr];
// Use Lua to decide whether to block this IP
let should_block = script_manager.should_block_ip(&peer_addr.to_string(), hit_count);
// Block IPs that hit tarpits too many times
if hit_count >= config.block_threshold && !state.blocked.contains(&peer_addr) {
if should_block && !state.blocked.contains(&peer_addr) {
log::info!("Blocking IP {peer_addr} after {hit_count} hits");
state.blocked.insert(peer_addr);
BLOCKED_IPS.set(state.blocked.len() as f64);
@ -579,20 +667,116 @@ async fn handle_connection(
}
// Generate a deceptive response using Markov chains and Lua
let response =
generate_deceptive_response(path, &user_agent, &markov_generator, &script_manager)
.await;
let response = generate_deceptive_response(
path,
&user_agent,
&peer_addr,
&headers,
&markov_generator,
&script_manager,
)
.await;
// Generate a session ID for tracking this tarpit session
let session_id = generate_session_id(&peer_addr.to_string(), &user_agent);
// Send the response with the tarpit delay strategy
tarpit_connection(
stream,
response,
peer_addr,
state.clone(),
config.min_delay,
config.max_delay,
config.max_tarpit_time,
)
{
let mut stream = stream;
let peer_addr = peer_addr;
let state = state.clone();
let min_delay = config.min_delay;
let max_delay = config.max_delay;
let max_tarpit_time = config.max_tarpit_time;
let script_manager = script_manager.clone();
async move {
let start_time = Instant::now();
let mut chars = response.chars().collect::<Vec<_>>();
for i in (0..chars.len()).rev() {
if i > 0 && rand::random::<f32>() < 0.1 {
chars.swap(i, i - 1);
}
}
log::debug!(
"Starting tarpit for {} with {} chars, min_delay={}ms, max_delay={}ms",
peer_addr,
chars.len(),
min_delay,
max_delay
);
let mut position = 0;
let mut chunks_sent = 0;
let mut total_delay = 0;
while position < chars.len() {
// Check if we've exceeded maximum tarpit time
let elapsed_secs = start_time.elapsed().as_secs();
if elapsed_secs > max_tarpit_time {
log::info!(
"Tarpit maximum time ({max_tarpit_time} sec) reached for {peer_addr}"
);
break;
}
// Decide how many chars to send in this chunk (usually 1, sometimes more)
let chunk_size = if rand::random::<f32>() < 0.9 {
1
} else {
(rand::random::<f32>() * 3.0).floor() as usize + 1
};
let end = (position + chunk_size).min(chars.len());
let chunk: String = chars[position..end].iter().collect();
// Process chunk through Lua before sending
let processed_chunk =
script_manager.process_chunk(&chunk, &peer_addr.to_string(), &session_id);
// Try to write processed chunk
if stream.write_all(processed_chunk.as_bytes()).await.is_err() {
log::debug!("Connection closed by client during tarpit: {peer_addr}");
break;
}
if stream.flush().await.is_err() {
log::debug!("Failed to flush stream during tarpit: {peer_addr}");
break;
}
position = end;
chunks_sent += 1;
// Apply random delay between min and max configured values
let delay_ms =
(rand::random::<f32>() * (max_delay - min_delay) as f32) as u64 + min_delay;
total_delay += delay_ms;
sleep(Duration::from_millis(delay_ms)).await;
}
log::debug!(
"Tarpit stats for {}: sent {} chunks, {}% of data, total delay {}ms over {}s",
peer_addr,
chunks_sent,
position * 100 / chars.len(),
total_delay,
start_time.elapsed().as_secs()
);
let disconnection_ctx = EventContext {
event_type: EventType::Disconnection,
ip: Some(peer_addr.to_string()),
path: None,
user_agent: None,
request_headers: None,
content: None,
timestamp: get_timestamp(),
session_id: Some(session_id),
};
script_manager.trigger_event(&disconnection_ctx);
if let Ok(mut state) = state.try_write() {
state.active_connections.remove(&peer_addr);
ACTIVE_CONNECTIONS.set(state.active_connections.len() as f64);
}
let _ = stream.shutdown().await;
}
}
.await;
} else {
log::debug!("Proxying request: {path} from {peer_addr}");
@ -713,134 +897,25 @@ async fn should_tarpit(path: &str, ip: &IpAddr, config: &Config) -> bool {
async fn generate_deceptive_response(
path: &str,
user_agent: &str,
peer_addr: &IpAddr,
headers: &HashMap<String, String>,
markov: &MarkovGenerator,
script_manager: &ScriptManager,
) -> String {
// Choose response type based on path to seem more realistic
let response_type = if path.contains("phpunit") || path.contains("eval") {
"php_exploit"
} else if path.contains("wp-") {
"wordpress"
} else if path.contains("api") {
"api"
} else {
"generic"
};
log::debug!("Generating {response_type} response for path: {path}");
// Generate tracking token for this interaction
let tracking_token = format!(
"BOT_{}_{}",
user_agent
.chars()
.filter(|c| c.is_alphanumeric())
.collect::<String>(),
chrono::Utc::now().timestamp()
);
// Generate base response using Markov chain text generator
let response_type = choose_response_type(path);
let markov_text = markov.generate(response_type, 30);
// Use Lua to enhance with honeytokens and other deceptive content
let response_expanded =
script_manager.expand_response(&markov_text, response_type, path, &tracking_token);
// Return full HTTP response with appropriate headers
format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nX-Powered-By: PHP/7.4.3\r\nConnection: keep-alive\r\n\r\n{response_expanded}"
// Use Lua scripts to enhance with honeytokens and other deceptive content
script_manager.generate_response(
path,
user_agent,
&peer_addr.to_string(),
headers,
&markov_text,
)
}
// Slowly feed a response to the client with random delays to waste attacker time
async fn tarpit_connection(
mut stream: TcpStream,
response: String,
peer_addr: IpAddr,
state: Arc<RwLock<BotState>>,
min_delay: u64,
max_delay: u64,
max_tarpit_time: u64,
) {
let start_time = Instant::now();
let mut chars = response.chars().collect::<Vec<_>>();
// Randomize the char order slightly to confuse automated tools
for i in (0..chars.len()).rev() {
if i > 0 && rand::random::<f32>() < 0.1 {
chars.swap(i, i - 1);
}
}
log::debug!(
"Starting tarpit for {} with {} chars, min_delay={}ms, max_delay={}ms",
peer_addr,
chars.len(),
min_delay,
max_delay
);
let mut position = 0;
let mut chunks_sent = 0;
let mut total_delay = 0;
// Send the response character by character with random delays
while position < chars.len() {
// Check if we've exceeded maximum tarpit time
let elapsed_secs = start_time.elapsed().as_secs();
if elapsed_secs > max_tarpit_time {
log::info!("Tarpit maximum time ({max_tarpit_time} sec) reached for {peer_addr}");
break;
}
// Decide how many chars to send in this chunk (usually 1, sometimes more)
let chunk_size = if rand::random::<f32>() < 0.9 {
1
} else {
(rand::random::<f32>() * 3.0).floor() as usize + 1
};
let end = (position + chunk_size).min(chars.len());
let chunk: String = chars[position..end].iter().collect();
// Try to write chunk
if stream.write_all(chunk.as_bytes()).await.is_err() {
log::debug!("Connection closed by client during tarpit: {peer_addr}");
break;
}
if stream.flush().await.is_err() {
log::debug!("Failed to flush stream during tarpit: {peer_addr}");
break;
}
position = end;
chunks_sent += 1;
// Apply random delay between min and max configured values
let delay_ms = (rand::random::<f32>() * (max_delay - min_delay) as f32) as u64 + min_delay;
total_delay += delay_ms;
sleep(Duration::from_millis(delay_ms)).await;
}
log::debug!(
"Tarpit stats for {}: sent {} chunks, {}% of data, total delay {}ms over {}s",
peer_addr,
chunks_sent,
position * 100 / chars.len(),
total_delay,
start_time.elapsed().as_secs()
);
// Remove from active connections
if let Ok(mut state) = state.try_write() {
state.active_connections.remove(&peer_addr);
ACTIVE_CONNECTIONS.set(state.active_connections.len() as f64);
}
let _ = stream.shutdown().await;
}
// Set up nftables firewall rules for IP blocking
async fn setup_firewall() -> Result<(), String> {
log::info!("Setting up firewall rules");
@ -1059,6 +1134,8 @@ async fn main() -> std::io::Result<()> {
// Initialize Lua script manager
log::info!("Loading Lua scripts from {}", config.lua_scripts_dir);
let script_manager = Arc::new(ScriptManager::new(&config.lua_scripts_dir));
let script_manager_for_tarpit = script_manager.clone();
let script_manager_for_periodic = script_manager.clone();
// Clone config for metrics server
let metrics_config = config.clone();
@ -1083,7 +1160,7 @@ async fn main() -> std::io::Result<()> {
let state_clone = tarpit_state.clone();
let markov_clone = markov_generator.clone();
let script_manager_clone = script_manager.clone();
let script_manager_clone = script_manager_for_tarpit.clone();
let config_clone = config.clone();
tokio::spawn(async move {
@ -1138,6 +1215,27 @@ async fn main() -> std::io::Result<()> {
}
};
// Setup periodic task runner for Lua scripts
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
// Trigger periodic event
let ctx = EventContext {
event_type: EventType::Periodic,
ip: None,
path: None,
user_agent: None,
request_headers: None,
content: None,
timestamp: get_timestamp(),
session_id: None,
};
script_manager_for_periodic.trigger_event(&ctx);
}
});
// Run both servers concurrently if metrics server is enabled
if let Some(metrics_server) = metrics_server {
tokio::select! {
@ -1317,37 +1415,6 @@ mod tests {
}
}
#[tokio::test]
async fn test_generate_deceptive_response() {
// Create a simple markov generator for testing
let markov = MarkovGenerator::new("/nonexistent/path");
let script_manager = ScriptManager::new("/nonexistent/path");
// Test different path types
let resp1 = generate_deceptive_response(
"/vendor/phpunit/exec",
"TestBot/1.0",
&markov,
&script_manager,
)
.await;
assert!(resp1.contains("HTTP/1.1 200 OK"));
assert!(resp1.contains("X-Powered-By: PHP"));
let resp2 =
generate_deceptive_response("/wp-admin/", "TestBot/1.0", &markov, &script_manager)
.await;
assert!(resp2.contains("HTTP/1.1 200 OK"));
let resp3 =
generate_deceptive_response("/api/users", "TestBot/1.0", &markov, &script_manager)
.await;
assert!(resp3.contains("HTTP/1.1 200 OK"));
// Verify tracking token is included
assert!(resp1.contains("BOT_TestBot"));
}
#[test]
fn test_find_header_end() {
let data = b"GET / HTTP/1.1\r\nHost: example.com\r\nUser-Agent: test\r\n\r\nBody content";
@ -1379,4 +1446,31 @@ mod tests {
);
assert_eq!(extract_header_value(data, "nonexistent"), None);
}
#[test]
fn test_extract_all_headers() {
let data = b"GET / HTTP/1.1\r\nHost: example.com\r\nUser-Agent: TestBot/1.0\r\nAccept: */*\r\n\r\n";
let headers = extract_all_headers(data);
assert_eq!(headers.len(), 3);
assert_eq!(headers.get("host").unwrap(), "example.com");
assert_eq!(headers.get("user-agent").unwrap(), "TestBot/1.0");
assert_eq!(headers.get("accept").unwrap(), "*/*");
}
#[test]
fn test_choose_response_type() {
assert_eq!(
choose_response_type("/vendor/phpunit/whatever"),
"php_exploit"
);
assert_eq!(
choose_response_type("/path/to/eval-stdin.php"),
"php_exploit"
);
assert_eq!(choose_response_type("/wp-admin/login.php"), "wordpress");
assert_eq!(choose_response_type("/wp-login.php"), "wordpress");
assert_eq!(choose_response_type("/api/v1/users"), "api");
assert_eq!(choose_response_type("/index.html"), "generic");
}
}