Eris/src/network.rs

443 lines
16 KiB
Rust

use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::process::Command;
use tokio::sync::RwLock;
use tokio::time::sleep;
use crate::config::Config;
use crate::lua::{EventContext, EventType, ScriptManager};
use crate::markov::MarkovGenerator;
use crate::metrics::{ACTIVE_CONNECTIONS, BLOCKED_IPS, HITS_COUNTER, PATH_HITS, UA_HITS};
use crate::state::BotState;
use crate::utils::{
choose_response_type, extract_all_headers, extract_header_value, extract_path_from_request,
find_header_end, generate_session_id, get_timestamp,
};
// Main connection handler.
// Decides whether to tarpit or proxy
pub async fn handle_connection(
mut stream: TcpStream,
config: Arc<Config>,
state: Arc<RwLock<BotState>>,
markov_generator: Arc<MarkovGenerator>,
script_manager: Arc<ScriptManager>,
) {
// Get peer information
let peer_addr = match stream.peer_addr() {
Ok(addr) => addr.ip(),
Err(e) => {
log::debug!("Failed to get peer address: {e}");
return;
}
};
// Check for blocked IPs to avoid any processing
if state.read().await.blocked.contains(&peer_addr) {
log::debug!("Rejected connection from blocked IP: {peer_addr}");
let _ = stream.shutdown().await;
return;
}
// Check if Lua scripts allow this connection
if !script_manager.on_connection(&peer_addr.to_string()) {
log::debug!("Connection rejected by Lua script: {peer_addr}");
let _ = stream.shutdown().await;
return;
}
// Pre-check for whitelisted IPs to bypass heavy processing
let mut whitelisted = false;
for network_str in &config.whitelist_networks {
if let Ok(network) = network_str.parse::<ipnetwork::IpNetwork>() {
if network.contains(peer_addr) {
whitelisted = true;
break;
}
}
}
// Read buffer
let mut buffer = vec![0; 8192];
let mut request_data = Vec::with_capacity(8192);
let mut header_end_pos = 0;
// Read with timeout to prevent hanging resource load ops.
let read_fut = async {
loop {
match stream.read(&mut buffer).await {
Ok(0) => break,
Ok(n) => {
let new_data = &buffer[..n];
request_data.extend_from_slice(new_data);
// Look for end of headers
if header_end_pos == 0 {
if let Some(pos) = find_header_end(&request_data) {
header_end_pos = pos;
break;
}
}
// Avoid excessive buffering
if request_data.len() > 32768 {
break;
}
}
Err(e) => {
log::debug!("Error reading from stream: {e}");
break;
}
}
}
};
let timeout_fut = sleep(Duration::from_secs(3));
tokio::select! {
() = read_fut => {},
() = timeout_fut => {
log::debug!("Connection timeout from: {peer_addr}");
let _ = stream.shutdown().await;
return;
}
}
// Fast path for whitelisted IPs. Skip full parsing and speed up "approved"
// connections automatically.
if whitelisted {
log::debug!("Whitelisted IP {peer_addr} - using fast proxy path");
proxy_fast_path(stream, request_data, &config.backend_addr).await;
return;
}
// Parse minimally to extract the path
let path = if let Some(p) = extract_path_from_request(&request_data) {
p
} else {
log::debug!("Invalid request from {peer_addr}");
let _ = stream.shutdown().await;
return;
};
// Extract request headers for Lua scripts
let headers = extract_all_headers(&request_data);
// Extract user agent for logging and decision making
let user_agent =
extract_header_value(&request_data, "user-agent").unwrap_or_else(|| "unknown".to_string());
// Trigger request event for Lua scripts
let request_ctx = EventContext {
event_type: EventType::Request,
ip: Some(peer_addr.to_string()),
path: Some(path.to_string()),
user_agent: Some(user_agent.clone()),
request_headers: Some(headers.clone()),
content: None,
timestamp: get_timestamp(),
session_id: Some(generate_session_id(&peer_addr.to_string(), &user_agent)),
};
script_manager.trigger_event(&request_ctx);
// Check if this request matches our tarpit patterns
let should_tarpit = crate::config::should_tarpit(path, &peer_addr, &config);
if should_tarpit {
log::info!("Tarpit triggered: {path} from {peer_addr} (UA: {user_agent})");
// Update metrics
HITS_COUNTER.inc();
PATH_HITS.with_label_values(&[path]).inc();
UA_HITS.with_label_values(&[&user_agent]).inc();
// Update state and check for blocking threshold
{
let mut state = state.write().await;
state.active_connections.insert(peer_addr);
ACTIVE_CONNECTIONS.set(state.active_connections.len() as f64);
*state.hits.entry(peer_addr).or_insert(0) += 1;
let hit_count = state.hits[&peer_addr];
// Use Lua to decide whether to block this IP
let should_block = script_manager.should_block_ip(&peer_addr.to_string(), hit_count);
// Block IPs that hit tarpits too many times
if should_block && !state.blocked.contains(&peer_addr) {
log::info!("Blocking IP {peer_addr} after {hit_count} hits");
state.blocked.insert(peer_addr);
BLOCKED_IPS.set(state.blocked.len() as f64);
state.save_to_disk();
// Do firewall blocking in background
let peer_addr_str = peer_addr.to_string();
tokio::spawn(async move {
log::debug!("Adding IP {peer_addr_str} to firewall blacklist");
match Command::new("nft")
.args([
"add",
"element",
"inet",
"filter",
"eris_blacklist",
"{",
&peer_addr_str,
"}",
])
.output()
.await
{
Ok(output) => {
if !output.status.success() {
log::warn!(
"Failed to add IP {} to firewall: {}",
peer_addr_str,
String::from_utf8_lossy(&output.stderr)
);
}
}
Err(e) => {
log::warn!("Failed to execute nft command: {e}");
}
}
});
}
}
// Generate a deceptive response using Markov chains and Lua
let response = generate_deceptive_response(
path,
&user_agent,
&peer_addr,
&headers,
&markov_generator,
&script_manager,
)
.await;
// Generate a session ID for tracking this tarpit session
let session_id = generate_session_id(&peer_addr.to_string(), &user_agent);
// Send the response with the tarpit delay strategy
{
let mut stream = stream;
let peer_addr = peer_addr;
let state = state.clone();
let min_delay = config.min_delay;
let max_delay = config.max_delay;
let max_tarpit_time = config.max_tarpit_time;
let script_manager = script_manager.clone();
async move {
let start_time = Instant::now();
let mut chars = response.chars().collect::<Vec<_>>();
for i in (0..chars.len()).rev() {
if i > 0 && rand::random::<f32>() < 0.1 {
chars.swap(i, i - 1);
}
}
log::debug!(
"Starting tarpit for {} with {} chars, min_delay={}ms, max_delay={}ms",
peer_addr,
chars.len(),
min_delay,
max_delay
);
let mut position = 0;
let mut chunks_sent = 0;
let mut total_delay = 0;
while position < chars.len() {
// Check if we've exceeded maximum tarpit time
let elapsed_secs = start_time.elapsed().as_secs();
if elapsed_secs > max_tarpit_time {
log::info!(
"Tarpit maximum time ({max_tarpit_time} sec) reached for {peer_addr}"
);
break;
}
// Decide how many chars to send in this chunk (usually 1, sometimes more)
let chunk_size = if rand::random::<f32>() < 0.9 {
1
} else {
(rand::random::<f32>() * 3.0).floor() as usize + 1
};
let end = (position + chunk_size).min(chars.len());
let chunk: String = chars[position..end].iter().collect();
// Process chunk through Lua before sending
let processed_chunk =
script_manager.process_chunk(&chunk, &peer_addr.to_string(), &session_id);
// Try to write processed chunk
if stream.write_all(processed_chunk.as_bytes()).await.is_err() {
log::debug!("Connection closed by client during tarpit: {peer_addr}");
break;
}
if stream.flush().await.is_err() {
log::debug!("Failed to flush stream during tarpit: {peer_addr}");
break;
}
position = end;
chunks_sent += 1;
// Apply random delay between min and max configured values
let delay_ms =
(rand::random::<f32>() * (max_delay - min_delay) as f32) as u64 + min_delay;
total_delay += delay_ms;
sleep(Duration::from_millis(delay_ms)).await;
}
log::debug!(
"Tarpit stats for {}: sent {} chunks, {}% of data, total delay {}ms over {}s",
peer_addr,
chunks_sent,
position * 100 / chars.len(),
total_delay,
start_time.elapsed().as_secs()
);
let disconnection_ctx = EventContext {
event_type: EventType::Disconnection,
ip: Some(peer_addr.to_string()),
path: None,
user_agent: None,
request_headers: None,
content: None,
timestamp: get_timestamp(),
session_id: Some(session_id),
};
script_manager.trigger_event(&disconnection_ctx);
if let Ok(mut state) = state.try_write() {
state.active_connections.remove(&peer_addr);
ACTIVE_CONNECTIONS.set(state.active_connections.len() as f64);
}
let _ = stream.shutdown().await;
}
}
.await;
} else {
log::debug!("Proxying request: {path} from {peer_addr}");
proxy_fast_path(stream, request_data, &config.backend_addr).await;
}
}
// Forward a legitimate request to the real backend server
pub async fn proxy_fast_path(
mut client_stream: TcpStream,
request_data: Vec<u8>,
backend_addr: &str,
) {
// Connect to backend server
let server_stream = match TcpStream::connect(backend_addr).await {
Ok(stream) => stream,
Err(e) => {
log::warn!("Failed to connect to backend {backend_addr}: {e}");
let _ = client_stream.shutdown().await;
return;
}
};
// Set TCP_NODELAY for both streams before splitting them
if let Err(e) = client_stream.set_nodelay(true) {
log::debug!("Failed to set TCP_NODELAY on client stream: {e}");
}
let mut server_stream = server_stream;
if let Err(e) = server_stream.set_nodelay(true) {
log::debug!("Failed to set TCP_NODELAY on server stream: {e}");
}
// Forward the original request bytes directly without parsing
if server_stream.write_all(&request_data).await.is_err() {
log::debug!("Failed to write request to backend server");
let _ = client_stream.shutdown().await;
return;
}
// Now split the streams for concurrent reading/writing
let (mut client_read, mut client_write) = client_stream.split();
let (mut server_read, mut server_write) = server_stream.split();
// 32KB buffer
let buf_size = 32768;
// Client -> Server
let client_to_server = async {
let mut buf = vec![0; buf_size];
let mut bytes_forwarded = 0;
loop {
match client_read.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
bytes_forwarded += n;
if server_write.write_all(&buf[..n]).await.is_err() {
break;
}
}
Err(_) => break,
}
}
// Ensure everything is sent
let _ = server_write.flush().await;
log::debug!("Client -> Server: forwarded {bytes_forwarded} bytes");
};
// Server -> Client
let server_to_client = async {
let mut buf = vec![0; buf_size];
let mut bytes_forwarded = 0;
loop {
match server_read.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
bytes_forwarded += n;
if client_write.write_all(&buf[..n]).await.is_err() {
break;
}
}
Err(_) => break,
}
}
// Ensure everything is sent
let _ = client_write.flush().await;
log::debug!("Server -> Client: forwarded {bytes_forwarded} bytes");
};
// Run both directions concurrently
tokio::join!(client_to_server, server_to_client);
log::debug!("Fast proxy connection completed");
}
// Generate a deceptive HTTP response that appears legitimate
pub async fn generate_deceptive_response(
path: &str,
user_agent: &str,
peer_addr: &IpAddr,
headers: &HashMap<String, String>,
markov: &MarkovGenerator,
script_manager: &ScriptManager,
) -> String {
// Generate base response using Markov chain text generator
let response_type = choose_response_type(path);
let markov_text = markov.generate(response_type, 30);
// Use Lua scripts to enhance with honeytokens and other deceptive content
script_manager.generate_response(
path,
user_agent,
&peer_addr.to_string(),
headers,
&markov_text,
)
}