eris: modulerize; fix some lints
This commit is contained in:
parent
f40c4a6ea0
commit
37e57fa015
6 changed files with 1449 additions and 1408 deletions
443
src/network.rs
Normal file
443
src/network.rs
Normal file
|
@ -0,0 +1,443 @@
|
|||
use std::collections::HashMap;
|
||||
use std::net::IpAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::process::Command;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::lua::{EventContext, EventType, ScriptManager};
|
||||
use crate::markov::MarkovGenerator;
|
||||
use crate::metrics::{ACTIVE_CONNECTIONS, BLOCKED_IPS, HITS_COUNTER, PATH_HITS, UA_HITS};
|
||||
use crate::state::BotState;
|
||||
use crate::utils::{
|
||||
choose_response_type, extract_all_headers, extract_header_value, extract_path_from_request,
|
||||
find_header_end, generate_session_id, get_timestamp,
|
||||
};
|
||||
|
||||
// Main connection handler.
|
||||
// Decides whether to tarpit or proxy
|
||||
pub async fn handle_connection(
|
||||
mut stream: TcpStream,
|
||||
config: Arc<Config>,
|
||||
state: Arc<RwLock<BotState>>,
|
||||
markov_generator: Arc<MarkovGenerator>,
|
||||
script_manager: Arc<ScriptManager>,
|
||||
) {
|
||||
// Get peer information
|
||||
let peer_addr = match stream.peer_addr() {
|
||||
Ok(addr) => addr.ip(),
|
||||
Err(e) => {
|
||||
log::debug!("Failed to get peer address: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Check for blocked IPs to avoid any processing
|
||||
if state.read().await.blocked.contains(&peer_addr) {
|
||||
log::debug!("Rejected connection from blocked IP: {peer_addr}");
|
||||
let _ = stream.shutdown().await;
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if Lua scripts allow this connection
|
||||
if !script_manager.on_connection(&peer_addr.to_string()) {
|
||||
log::debug!("Connection rejected by Lua script: {peer_addr}");
|
||||
let _ = stream.shutdown().await;
|
||||
return;
|
||||
}
|
||||
|
||||
// Pre-check for whitelisted IPs to bypass heavy processing
|
||||
let mut whitelisted = false;
|
||||
for network_str in &config.whitelist_networks {
|
||||
if let Ok(network) = network_str.parse::<ipnetwork::IpNetwork>() {
|
||||
if network.contains(peer_addr) {
|
||||
whitelisted = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Read buffer
|
||||
let mut buffer = vec![0; 8192];
|
||||
let mut request_data = Vec::with_capacity(8192);
|
||||
let mut header_end_pos = 0;
|
||||
|
||||
// Read with timeout to prevent hanging resource load ops.
|
||||
let read_fut = async {
|
||||
loop {
|
||||
match stream.read(&mut buffer).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let new_data = &buffer[..n];
|
||||
request_data.extend_from_slice(new_data);
|
||||
|
||||
// Look for end of headers
|
||||
if header_end_pos == 0 {
|
||||
if let Some(pos) = find_header_end(&request_data) {
|
||||
header_end_pos = pos;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Avoid excessive buffering
|
||||
if request_data.len() > 32768 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::debug!("Error reading from stream: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let timeout_fut = sleep(Duration::from_secs(3));
|
||||
|
||||
tokio::select! {
|
||||
() = read_fut => {},
|
||||
() = timeout_fut => {
|
||||
log::debug!("Connection timeout from: {peer_addr}");
|
||||
let _ = stream.shutdown().await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Fast path for whitelisted IPs. Skip full parsing and speed up "approved"
|
||||
// connections automatically.
|
||||
if whitelisted {
|
||||
log::debug!("Whitelisted IP {peer_addr} - using fast proxy path");
|
||||
proxy_fast_path(stream, request_data, &config.backend_addr).await;
|
||||
return;
|
||||
}
|
||||
|
||||
// Parse minimally to extract the path
|
||||
let path = if let Some(p) = extract_path_from_request(&request_data) {
|
||||
p
|
||||
} else {
|
||||
log::debug!("Invalid request from {peer_addr}");
|
||||
let _ = stream.shutdown().await;
|
||||
return;
|
||||
};
|
||||
|
||||
// Extract request headers for Lua scripts
|
||||
let headers = extract_all_headers(&request_data);
|
||||
|
||||
// Extract user agent for logging and decision making
|
||||
let user_agent =
|
||||
extract_header_value(&request_data, "user-agent").unwrap_or_else(|| "unknown".to_string());
|
||||
|
||||
// Trigger request event for Lua scripts
|
||||
let request_ctx = EventContext {
|
||||
event_type: EventType::Request,
|
||||
ip: Some(peer_addr.to_string()),
|
||||
path: Some(path.to_string()),
|
||||
user_agent: Some(user_agent.clone()),
|
||||
request_headers: Some(headers.clone()),
|
||||
content: None,
|
||||
timestamp: get_timestamp(),
|
||||
session_id: Some(generate_session_id(&peer_addr.to_string(), &user_agent)),
|
||||
};
|
||||
script_manager.trigger_event(&request_ctx);
|
||||
|
||||
// Check if this request matches our tarpit patterns
|
||||
let should_tarpit = crate::config::should_tarpit(path, &peer_addr, &config);
|
||||
|
||||
if should_tarpit {
|
||||
log::info!("Tarpit triggered: {path} from {peer_addr} (UA: {user_agent})");
|
||||
|
||||
// Update metrics
|
||||
HITS_COUNTER.inc();
|
||||
PATH_HITS.with_label_values(&[path]).inc();
|
||||
UA_HITS.with_label_values(&[&user_agent]).inc();
|
||||
|
||||
// Update state and check for blocking threshold
|
||||
{
|
||||
let mut state = state.write().await;
|
||||
state.active_connections.insert(peer_addr);
|
||||
ACTIVE_CONNECTIONS.set(state.active_connections.len() as f64);
|
||||
|
||||
*state.hits.entry(peer_addr).or_insert(0) += 1;
|
||||
let hit_count = state.hits[&peer_addr];
|
||||
|
||||
// Use Lua to decide whether to block this IP
|
||||
let should_block = script_manager.should_block_ip(&peer_addr.to_string(), hit_count);
|
||||
|
||||
// Block IPs that hit tarpits too many times
|
||||
if should_block && !state.blocked.contains(&peer_addr) {
|
||||
log::info!("Blocking IP {peer_addr} after {hit_count} hits");
|
||||
state.blocked.insert(peer_addr);
|
||||
BLOCKED_IPS.set(state.blocked.len() as f64);
|
||||
state.save_to_disk();
|
||||
|
||||
// Do firewall blocking in background
|
||||
let peer_addr_str = peer_addr.to_string();
|
||||
tokio::spawn(async move {
|
||||
log::debug!("Adding IP {peer_addr_str} to firewall blacklist");
|
||||
match Command::new("nft")
|
||||
.args([
|
||||
"add",
|
||||
"element",
|
||||
"inet",
|
||||
"filter",
|
||||
"eris_blacklist",
|
||||
"{",
|
||||
&peer_addr_str,
|
||||
"}",
|
||||
])
|
||||
.output()
|
||||
.await
|
||||
{
|
||||
Ok(output) => {
|
||||
if !output.status.success() {
|
||||
log::warn!(
|
||||
"Failed to add IP {} to firewall: {}",
|
||||
peer_addr_str,
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Failed to execute nft command: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Generate a deceptive response using Markov chains and Lua
|
||||
let response = generate_deceptive_response(
|
||||
path,
|
||||
&user_agent,
|
||||
&peer_addr,
|
||||
&headers,
|
||||
&markov_generator,
|
||||
&script_manager,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Generate a session ID for tracking this tarpit session
|
||||
let session_id = generate_session_id(&peer_addr.to_string(), &user_agent);
|
||||
|
||||
// Send the response with the tarpit delay strategy
|
||||
{
|
||||
let mut stream = stream;
|
||||
let peer_addr = peer_addr;
|
||||
let state = state.clone();
|
||||
let min_delay = config.min_delay;
|
||||
let max_delay = config.max_delay;
|
||||
let max_tarpit_time = config.max_tarpit_time;
|
||||
let script_manager = script_manager.clone();
|
||||
async move {
|
||||
let start_time = Instant::now();
|
||||
let mut chars = response.chars().collect::<Vec<_>>();
|
||||
for i in (0..chars.len()).rev() {
|
||||
if i > 0 && rand::random::<f32>() < 0.1 {
|
||||
chars.swap(i, i - 1);
|
||||
}
|
||||
}
|
||||
log::debug!(
|
||||
"Starting tarpit for {} with {} chars, min_delay={}ms, max_delay={}ms",
|
||||
peer_addr,
|
||||
chars.len(),
|
||||
min_delay,
|
||||
max_delay
|
||||
);
|
||||
let mut position = 0;
|
||||
let mut chunks_sent = 0;
|
||||
let mut total_delay = 0;
|
||||
while position < chars.len() {
|
||||
// Check if we've exceeded maximum tarpit time
|
||||
let elapsed_secs = start_time.elapsed().as_secs();
|
||||
if elapsed_secs > max_tarpit_time {
|
||||
log::info!(
|
||||
"Tarpit maximum time ({max_tarpit_time} sec) reached for {peer_addr}"
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
// Decide how many chars to send in this chunk (usually 1, sometimes more)
|
||||
let chunk_size = if rand::random::<f32>() < 0.9 {
|
||||
1
|
||||
} else {
|
||||
(rand::random::<f32>() * 3.0).floor() as usize + 1
|
||||
};
|
||||
|
||||
let end = (position + chunk_size).min(chars.len());
|
||||
let chunk: String = chars[position..end].iter().collect();
|
||||
|
||||
// Process chunk through Lua before sending
|
||||
let processed_chunk =
|
||||
script_manager.process_chunk(&chunk, &peer_addr.to_string(), &session_id);
|
||||
|
||||
// Try to write processed chunk
|
||||
if stream.write_all(processed_chunk.as_bytes()).await.is_err() {
|
||||
log::debug!("Connection closed by client during tarpit: {peer_addr}");
|
||||
break;
|
||||
}
|
||||
|
||||
if stream.flush().await.is_err() {
|
||||
log::debug!("Failed to flush stream during tarpit: {peer_addr}");
|
||||
break;
|
||||
}
|
||||
|
||||
position = end;
|
||||
chunks_sent += 1;
|
||||
|
||||
// Apply random delay between min and max configured values
|
||||
let delay_ms =
|
||||
(rand::random::<f32>() * (max_delay - min_delay) as f32) as u64 + min_delay;
|
||||
total_delay += delay_ms;
|
||||
sleep(Duration::from_millis(delay_ms)).await;
|
||||
}
|
||||
log::debug!(
|
||||
"Tarpit stats for {}: sent {} chunks, {}% of data, total delay {}ms over {}s",
|
||||
peer_addr,
|
||||
chunks_sent,
|
||||
position * 100 / chars.len(),
|
||||
total_delay,
|
||||
start_time.elapsed().as_secs()
|
||||
);
|
||||
let disconnection_ctx = EventContext {
|
||||
event_type: EventType::Disconnection,
|
||||
ip: Some(peer_addr.to_string()),
|
||||
path: None,
|
||||
user_agent: None,
|
||||
request_headers: None,
|
||||
content: None,
|
||||
timestamp: get_timestamp(),
|
||||
session_id: Some(session_id),
|
||||
};
|
||||
script_manager.trigger_event(&disconnection_ctx);
|
||||
if let Ok(mut state) = state.try_write() {
|
||||
state.active_connections.remove(&peer_addr);
|
||||
ACTIVE_CONNECTIONS.set(state.active_connections.len() as f64);
|
||||
}
|
||||
let _ = stream.shutdown().await;
|
||||
}
|
||||
}
|
||||
.await;
|
||||
} else {
|
||||
log::debug!("Proxying request: {path} from {peer_addr}");
|
||||
proxy_fast_path(stream, request_data, &config.backend_addr).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Forward a legitimate request to the real backend server
|
||||
pub async fn proxy_fast_path(
|
||||
mut client_stream: TcpStream,
|
||||
request_data: Vec<u8>,
|
||||
backend_addr: &str,
|
||||
) {
|
||||
// Connect to backend server
|
||||
let server_stream = match TcpStream::connect(backend_addr).await {
|
||||
Ok(stream) => stream,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to connect to backend {backend_addr}: {e}");
|
||||
let _ = client_stream.shutdown().await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Set TCP_NODELAY for both streams before splitting them
|
||||
if let Err(e) = client_stream.set_nodelay(true) {
|
||||
log::debug!("Failed to set TCP_NODELAY on client stream: {e}");
|
||||
}
|
||||
|
||||
let mut server_stream = server_stream;
|
||||
if let Err(e) = server_stream.set_nodelay(true) {
|
||||
log::debug!("Failed to set TCP_NODELAY on server stream: {e}");
|
||||
}
|
||||
|
||||
// Forward the original request bytes directly without parsing
|
||||
if server_stream.write_all(&request_data).await.is_err() {
|
||||
log::debug!("Failed to write request to backend server");
|
||||
let _ = client_stream.shutdown().await;
|
||||
return;
|
||||
}
|
||||
|
||||
// Now split the streams for concurrent reading/writing
|
||||
let (mut client_read, mut client_write) = client_stream.split();
|
||||
let (mut server_read, mut server_write) = server_stream.split();
|
||||
|
||||
// 32KB buffer
|
||||
let buf_size = 32768;
|
||||
|
||||
// Client -> Server
|
||||
let client_to_server = async {
|
||||
let mut buf = vec![0; buf_size];
|
||||
let mut bytes_forwarded = 0;
|
||||
|
||||
loop {
|
||||
match client_read.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
bytes_forwarded += n;
|
||||
if server_write.write_all(&buf[..n]).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure everything is sent
|
||||
let _ = server_write.flush().await;
|
||||
log::debug!("Client -> Server: forwarded {bytes_forwarded} bytes");
|
||||
};
|
||||
|
||||
// Server -> Client
|
||||
let server_to_client = async {
|
||||
let mut buf = vec![0; buf_size];
|
||||
let mut bytes_forwarded = 0;
|
||||
|
||||
loop {
|
||||
match server_read.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
bytes_forwarded += n;
|
||||
if client_write.write_all(&buf[..n]).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure everything is sent
|
||||
let _ = client_write.flush().await;
|
||||
log::debug!("Server -> Client: forwarded {bytes_forwarded} bytes");
|
||||
};
|
||||
|
||||
// Run both directions concurrently
|
||||
tokio::join!(client_to_server, server_to_client);
|
||||
log::debug!("Fast proxy connection completed");
|
||||
}
|
||||
|
||||
// Generate a deceptive HTTP response that appears legitimate
|
||||
pub async fn generate_deceptive_response(
|
||||
path: &str,
|
||||
user_agent: &str,
|
||||
peer_addr: &IpAddr,
|
||||
headers: &HashMap<String, String>,
|
||||
markov: &MarkovGenerator,
|
||||
script_manager: &ScriptManager,
|
||||
) -> String {
|
||||
// Generate base response using Markov chain text generator
|
||||
let response_type = choose_response_type(path);
|
||||
let markov_text = markov.generate(response_type, 30);
|
||||
|
||||
// Use Lua scripts to enhance with honeytokens and other deceptive content
|
||||
script_manager.generate_response(
|
||||
path,
|
||||
user_agent,
|
||||
&peer_addr.to_string(),
|
||||
headers,
|
||||
&markov_text,
|
||||
)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue