pinakes/crates/pinakes-core/src/cache.rs
NotAShelf 3d9f8933d2
pinakes-core: update remaining modules and tests
Signed-off-by: NotAShelf <raf@notashelf.dev>
Change-Id: I9e0ff5ea33a5cf697473423e88f167ce6a6a6964
2026-03-08 00:43:30 +03:00

542 lines
13 KiB
Rust

//! High-performance caching layer using moka.
//!
//! This module provides a comprehensive caching solution with:
//! - LRU eviction with configurable size limits
//! - TTL-based expiration
//! - Targeted cache invalidation for media updates and deletions
//! - Metrics tracking (hit rate, size; eviction counts are not currently exposed)
//! - Specialized caches for different data types
use std::{
hash::Hash,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use moka::future::Cache as MokaCache;
use crate::model::MediaId;
/// Cache statistics for monitoring and debugging.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of lookups that found an entry.
    pub hits: u64,
    /// Number of lookups that found nothing.
    pub misses: u64,
    /// Number of evicted entries (always 0 here; the backing cache does not
    /// expose this counter).
    pub evictions: u64,
    /// Current number of entries in the cache.
    pub size: u64,
}

impl CacheStats {
    /// Fraction of lookups that were hits, in `[0.0, 1.0]`.
    ///
    /// Uses integer arithmetic to avoid lossy `u64 -> f64` casts: hits are
    /// scaled to basis points (0..=10_000) before converting, so the result
    /// has 1/10_000 resolution. Returns `0.0` when nothing was recorded.
    #[must_use]
    pub fn hit_rate(&self) -> f64 {
        let total = self.hits + self.misses;
        if total == 0 {
            return 0.0;
        }
        // hits * 10_000 / total is at most 10_000, which always fits in u32;
        // the unwrap_or only triggers if the multiplication saturated.
        let basis_points = self.hits.saturating_mul(10_000) / total;
        f64::from(u32::try_from(basis_points).unwrap_or(u32::MAX)) / 10_000.0
    }
}
/// Atomic counters for cache metrics.
struct CacheMetrics {
    hits: AtomicU64,
    misses: AtomicU64,
}

impl Default for CacheMetrics {
    fn default() -> Self {
        Self {
            hits: AtomicU64::default(),
            misses: AtomicU64::default(),
        }
    }
}

impl CacheMetrics {
    /// Count one cache hit.
    ///
    /// Relaxed ordering is enough: these are independent statistics
    /// counters with no cross-thread ordering requirements.
    fn record_hit(&self) {
        self.hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one cache miss.
    fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
    }

    /// Snapshot the counters as `(hits, misses)`.
    fn stats(&self) -> (u64, u64) {
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        (hits, misses)
    }
}
/// A high-performance cache with LRU eviction and TTL support.
///
/// Thin wrapper around [`moka::future::Cache`] that additionally records
/// hit/miss counts on every lookup so callers can observe cache
/// effectiveness via [`Cache::stats`].
pub struct Cache<K, V>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// The underlying moka cache holding the entries.
    inner: MokaCache<K, V>,
    /// Hit/miss counters, updated on every `get`.
    metrics: Arc<CacheMetrics>,
}
impl<K, V> Cache<K, V>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    /// Create a new cache with the specified TTL and maximum capacity.
    #[must_use]
    pub fn new(ttl: Duration, max_capacity: u64) -> Self {
        Self {
            inner: MokaCache::builder()
                .time_to_live(ttl)
                .max_capacity(max_capacity)
                .build(),
            metrics: Arc::new(CacheMetrics::default()),
        }
    }

    /// Create a new cache with TTL, max capacity, and time-to-idle.
    #[must_use]
    pub fn new_with_idle(
        ttl: Duration,
        tti: Duration,
        max_capacity: u64,
    ) -> Self {
        Self {
            inner: MokaCache::builder()
                .time_to_live(ttl)
                .time_to_idle(tti)
                .max_capacity(max_capacity)
                .build(),
            metrics: Arc::new(CacheMetrics::default()),
        }
    }

    /// Get a value from the cache, recording a hit or miss in the metrics.
    pub async fn get(&self, key: &K) -> Option<V> {
        match self.inner.get(key).await {
            Some(value) => {
                self.metrics.record_hit();
                Some(value)
            },
            None => {
                self.metrics.record_miss();
                None
            },
        }
    }

    /// Insert a value into the cache.
    pub async fn insert(&self, key: K, value: V) {
        self.inner.insert(key, value).await;
    }

    /// Remove a specific key from the cache.
    pub async fn invalidate(&self, key: &K) {
        self.inner.invalidate(key).await;
    }

    /// Clear all entries from the cache.
    pub async fn invalidate_all(&self) {
        self.inner.invalidate_all();
        // moka applies invalidation lazily; flush pending housekeeping so
        // the effect is visible immediately.
        self.inner.run_pending_tasks().await;
    }

    /// Get the current number of entries in the cache.
    #[must_use]
    pub fn entry_count(&self) -> u64 {
        self.inner.entry_count()
    }

    /// Get cache statistics.
    #[must_use]
    pub fn stats(&self) -> CacheStats {
        let (hits, misses) = self.metrics.stats();
        CacheStats {
            hits,
            misses,
            // moka doesn't expose an eviction counter on this API.
            evictions: 0,
            size: self.entry_count(),
        }
    }
}
/// Specialized cache for search query results.
pub struct QueryCache {
    /// Cache keyed by a hash of (query, offset, limit, sort); values are
    /// serialized result payloads.
    inner: Cache<String, String>,
}
impl QueryCache {
    /// Create a query cache with the given TTL and maximum entry count.
    #[must_use]
    pub fn new(ttl: Duration, max_capacity: u64) -> Self {
        Self {
            inner: Cache::new(ttl, max_capacity),
        }
    }

    /// Generate a cache key from query parameters.
    ///
    /// Variable-length fields are length-prefixed and the optional `sort`
    /// field carries a presence tag so that distinct parameter tuples
    /// cannot collapse onto the same hashed byte stream. (Previously
    /// `sort: None` and `sort: Some("")` hashed identically, and query or
    /// sort bytes could alias across field boundaries.)
    fn make_key(
        query: &str,
        offset: u64,
        limit: u64,
        sort: Option<&str>,
    ) -> String {
        use std::hash::{DefaultHasher, Hasher};
        let mut hasher = DefaultHasher::new();
        // Length prefix keeps the query/offset boundary unambiguous.
        hasher.write_usize(query.len());
        hasher.write(query.as_bytes());
        hasher.write(&offset.to_le_bytes());
        hasher.write(&limit.to_le_bytes());
        match sort {
            Some(s) => {
                // Tag + length prefix: Some("") can no longer hash like None.
                hasher.write_u8(1);
                hasher.write_usize(s.len());
                hasher.write(s.as_bytes());
            },
            None => hasher.write_u8(0),
        }
        format!("q:{:016x}", hasher.finish())
    }

    /// Look up the cached result for the given query parameters.
    pub async fn get(
        &self,
        query: &str,
        offset: u64,
        limit: u64,
        sort: Option<&str>,
    ) -> Option<String> {
        let key = Self::make_key(query, offset, limit, sort);
        self.inner.get(&key).await
    }

    /// Store a result under the given query parameters.
    pub async fn insert(
        &self,
        query: &str,
        offset: u64,
        limit: u64,
        sort: Option<&str>,
        result: String,
    ) {
        let key = Self::make_key(query, offset, limit, sort);
        self.inner.insert(key, result).await;
    }

    /// Drop all cached query results.
    pub async fn invalidate_all(&self) {
        self.inner.invalidate_all().await;
    }

    /// Snapshot of cache statistics.
    #[must_use]
    pub fn stats(&self) -> CacheStats {
        self.inner.stats()
    }
}
/// Specialized cache for metadata extraction results.
pub struct MetadataCache {
    /// Cache keyed by content hash; values are serialized metadata JSON.
    /// Content hashes are stable for given content, so entries never go
    /// stale — only TTL/capacity evict them.
    inner: Cache<String, String>,
}
impl MetadataCache {
    /// Create a metadata cache with the given TTL and maximum entry count.
    #[must_use]
    pub fn new(ttl: Duration, max_capacity: u64) -> Self {
        let inner = Cache::new(ttl, max_capacity);
        Self { inner }
    }

    /// Fetch cached metadata JSON for a content hash, if present.
    pub async fn get(&self, content_hash: &str) -> Option<String> {
        let key = content_hash.to_string();
        self.inner.get(&key).await
    }

    /// Store metadata JSON under the given content hash.
    pub async fn insert(&self, content_hash: &str, metadata_json: String) {
        let key = content_hash.to_string();
        self.inner.insert(key, metadata_json).await;
    }

    /// Drop the cached entry for the given content hash.
    pub async fn invalidate(&self, content_hash: &str) {
        let key = content_hash.to_string();
        self.inner.invalidate(&key).await;
    }

    /// Snapshot of cache statistics.
    #[must_use]
    pub fn stats(&self) -> CacheStats {
        self.inner.stats()
    }
}
/// Specialized cache for media item data.
pub struct MediaCache {
    /// Cache keyed by the stringified `MediaId`; values are serialized
    /// media item JSON.
    inner: Cache<String, String>,
}
impl MediaCache {
    /// Create a media cache with the given TTL and maximum entry count.
    #[must_use]
    pub fn new(ttl: Duration, max_capacity: u64) -> Self {
        let inner = Cache::new(ttl, max_capacity);
        Self { inner }
    }

    /// Fetch cached item JSON for a media id, if present.
    pub async fn get(&self, media_id: MediaId) -> Option<String> {
        let key = media_id.to_string();
        self.inner.get(&key).await
    }

    /// Store item JSON under the given media id.
    pub async fn insert(&self, media_id: MediaId, item_json: String) {
        let key = media_id.to_string();
        self.inner.insert(key, item_json).await;
    }

    /// Drop the cached entry for the given media id.
    pub async fn invalidate(&self, media_id: MediaId) {
        let key = media_id.to_string();
        self.inner.invalidate(&key).await;
    }

    /// Drop every cached media item.
    pub async fn invalidate_all(&self) {
        self.inner.invalidate_all().await;
    }

    /// Snapshot of cache statistics.
    #[must_use]
    pub fn stats(&self) -> CacheStats {
        self.inner.stats()
    }
}
/// Configuration for the cache layer.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// TTL for response cache in seconds
    pub response_ttl_secs: u64,
    /// Maximum number of cached responses
    pub response_max_entries: u64,
    /// TTL for query cache in seconds
    pub query_ttl_secs: u64,
    /// Maximum number of cached query results
    pub query_max_entries: u64,
    /// TTL for metadata cache in seconds
    pub metadata_ttl_secs: u64,
    /// Maximum number of cached metadata entries
    pub metadata_max_entries: u64,
    /// TTL for media cache in seconds
    pub media_ttl_secs: u64,
    /// Maximum number of cached media items
    pub media_max_entries: u64,
}

impl Default for CacheConfig {
    fn default() -> Self {
        /// One minute, expressed in seconds, to make the TTLs below legible.
        const MINUTE: u64 = 60;
        Self {
            // Responses go stale quickly, so keep their TTL short.
            response_ttl_secs: MINUTE,
            response_max_entries: 1000,
            query_ttl_secs: 5 * MINUTE,
            query_max_entries: 500,
            // Metadata is keyed by content hash and effectively immutable,
            // so it can live much longer and in larger numbers.
            metadata_ttl_secs: 60 * MINUTE,
            metadata_max_entries: 10_000,
            media_ttl_secs: 5 * MINUTE,
            media_max_entries: 5000,
        }
    }
}
/// Application-level cache layer wrapping multiple specialized caches.
///
/// The specialized caches are public so call sites can read/write the one
/// they need directly; the invalidation helpers on this type coordinate
/// across caches.
pub struct CacheLayer {
    /// Cache for serialized API responses
    pub responses: Cache<String, String>,
    /// Cache for search query results
    pub queries: QueryCache,
    /// Cache for metadata extraction results
    pub metadata: MetadataCache,
    /// Cache for individual media items
    pub media: MediaCache,
    /// Configuration the layer was built with (read-only via `config()`).
    config: CacheConfig,
}
impl CacheLayer {
    /// Create a new cache layer with the specified response TTL (using
    /// defaults for all other settings).
    #[must_use]
    pub fn new(ttl_secs: u64) -> Self {
        Self::with_config(CacheConfig {
            response_ttl_secs: ttl_secs,
            ..Default::default()
        })
    }

    /// Create a new cache layer with full configuration.
    #[must_use]
    pub fn with_config(config: CacheConfig) -> Self {
        Self {
            responses: Cache::new(
                Duration::from_secs(config.response_ttl_secs),
                config.response_max_entries,
            ),
            queries: QueryCache::new(
                Duration::from_secs(config.query_ttl_secs),
                config.query_max_entries,
            ),
            metadata: MetadataCache::new(
                Duration::from_secs(config.metadata_ttl_secs),
                config.metadata_max_entries,
            ),
            media: MediaCache::new(
                Duration::from_secs(config.media_ttl_secs),
                config.media_max_entries,
            ),
            config,
        }
    }

    /// Invalidate all caches related to a media item update.
    pub async fn invalidate_for_media_update(&self, media_id: MediaId) {
        self.media.invalidate(media_id).await;
        // Search results may include the changed item, so drop all cached
        // query results.
        self.queries.invalidate_all().await;
        // NOTE(review): the `responses` cache may also hold serialized API
        // responses containing this item; it is left to age out via its
        // (short) TTL — confirm that staleness window is acceptable.
    }

    /// Invalidate all caches related to a media item deletion.
    ///
    /// Currently identical to [`Self::invalidate_for_media_update`] and
    /// delegates to it; kept as a separate entry point so call sites state
    /// their intent and the two paths can diverge later.
    pub async fn invalidate_for_media_delete(&self, media_id: MediaId) {
        self.invalidate_for_media_update(media_id).await;
    }

    /// Invalidate all caches (useful after bulk imports or major changes).
    pub async fn invalidate_all(&self) {
        self.responses.invalidate_all().await;
        self.queries.invalidate_all().await;
        self.media.invalidate_all().await;
        // The metadata cache is keyed by content hash, which never changes
        // for given content, so it is deliberately preserved.
    }

    /// Get aggregated statistics for all caches.
    #[must_use]
    pub fn stats(&self) -> CacheLayerStats {
        CacheLayerStats {
            responses: self.responses.stats(),
            queries: self.queries.stats(),
            metadata: self.metadata.stats(),
            media: self.media.stats(),
        }
    }

    /// Get the current configuration.
    #[must_use]
    pub const fn config(&self) -> &CacheConfig {
        &self.config
    }
}
/// Aggregated statistics for the entire cache layer.
#[derive(Debug, Clone)]
pub struct CacheLayerStats {
pub responses: CacheStats,
pub queries: CacheStats,
pub metadata: CacheStats,
pub media: CacheStats,
}
impl CacheLayerStats {
/// Get the overall hit rate across all caches.
#[must_use]
pub fn overall_hit_rate(&self) -> f64 {
let total_hits = self.responses.hits
+ self.queries.hits
+ self.metadata.hits
+ self.media.hits;
let total_requests = total_hits
+ self.responses.misses
+ self.queries.misses
+ self.metadata.misses
+ self.media.misses;
let basis_points = total_hits
.saturating_mul(10_000)
.checked_div(total_requests)
.unwrap_or(0);
f64::from(u32::try_from(basis_points).unwrap_or(u32::MAX)) / 10_000.0
}
/// Get the total number of entries across all caches.
#[must_use]
pub const fn total_entries(&self) -> u64 {
self.responses.size
+ self.queries.size
+ self.metadata.size
+ self.media.size
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_cache_basic_operations() {
        let cache: Cache<String, String> =
            Cache::new(Duration::from_secs(60), 100);

        // Round-trip an entry.
        cache.insert("key1".to_string(), "value1".to_string()).await;
        assert_eq!(
            cache.get(&"key1".to_string()).await.as_deref(),
            Some("value1")
        );

        // An unknown key misses.
        assert!(cache.get(&"key2".to_string()).await.is_none());

        // Invalidation removes the entry.
        cache.invalidate(&"key1".to_string()).await;
        assert!(cache.get(&"key1".to_string()).await.is_none());
    }

    #[tokio::test]
    async fn test_cache_stats() {
        let cache: Cache<String, String> =
            Cache::new(Duration::from_secs(60), 100);
        cache.insert("key1".to_string(), "value1".to_string()).await;

        let _hit = cache.get(&"key1".to_string()).await;
        let _miss = cache.get(&"key2".to_string()).await;

        let stats = cache.stats();
        assert_eq!((stats.hits, stats.misses), (1, 1));
        assert!((stats.hit_rate() - 0.5).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_query_cache() {
        let cache = QueryCache::new(Duration::from_secs(60), 100);
        cache
            .insert("test query", 0, 10, Some("name"), "results".to_string())
            .await;

        assert_eq!(
            cache.get("test query", 0, 10, Some("name")).await.as_deref(),
            Some("results")
        );
        // A different offset must not reuse the cached entry.
        assert!(cache.get("test query", 10, 10, Some("name")).await.is_none());
    }

    #[tokio::test]
    async fn test_cache_layer() {
        let layer = CacheLayer::new(60);
        let media_id = MediaId::new();

        layer.media.insert(media_id, "{}".to_string()).await;
        assert!(layer.media.get(media_id).await.is_some());

        layer.invalidate_for_media_delete(media_id).await;
        assert!(layer.media.get(media_id).await.is_none());
    }
}