# Caching Strategy

## Overview

The Email Assistant implements intelligent caching to minimize API costs and improve response times.

## LRU Cache Implementation

### Cache Configuration
```python
from functools import lru_cache
from hashlib import md5

# Cache size: 1000 emails
CACHE_SIZE = 1000

@lru_cache(maxsize=CACHE_SIZE)
def get_cached_category(email_hash: str) -> dict:
    """Retrieve cached categorization result."""
    pass
```
### Email Hashing
Emails are hashed based on content that affects categorization:
```python
def hash_email(email: dict) -> str:
    """Generate cache key from email content.

    Uses subject + sender + first 500 chars of body.
    """
    content = f"{email['subject']}|{email['from']}|{email['body'][:500]}"
    return md5(content.encode()).hexdigest()
```
## Cache Layers

### Layer 1: In-Memory LRU Cache
- Scope: Single script execution
- Size: 1000 entries
- TTL: Session duration
- Use Case: Repeated analysis of same emails
### Layer 2: File-Based Cache

- Scope: Persistent across runs
- Location: `data/cache/`
- TTL: 24 hours
- Use Case: Daily digest consistency
```python
import json
from pathlib import Path
from datetime import datetime, timedelta

CACHE_DIR = Path("data/cache")
CACHE_TTL = timedelta(hours=24)

def load_file_cache(email_hash: str) -> dict | None:
    """Load cached result from file."""
    cache_file = CACHE_DIR / f"{email_hash}.json"
    if not cache_file.exists():
        return None

    data = json.loads(cache_file.read_text())
    cached_at = datetime.fromisoformat(data["cached_at"])

    if datetime.now() - cached_at > CACHE_TTL:
        cache_file.unlink()  # Expired
        return None

    return data["result"]

def save_file_cache(email_hash: str, result: dict) -> None:
    """Save result to file cache."""
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_file = CACHE_DIR / f"{email_hash}.json"
    cache_file.write_text(json.dumps({
        "cached_at": datetime.now().isoformat(),
        "result": result
    }))
```
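
A minimal sketch of how the two layers might fit together around the categorization call is shown below. `call_categorization_api` is a hypothetical placeholder for the real LLM request, and a plain dict stands in for the Layer 1 LRU cache here, since `functools.lru_cache` needs hashable arguments rather than email dicts:

```python
def call_categorization_api(email: dict) -> dict:
    """Hypothetical placeholder for the real LLM categorization request."""
    raise NotImplementedError

# Plain dict standing in for the Layer 1 in-memory cache in this sketch.
_memory_cache: dict[str, dict] = {}

def categorize_email(email: dict) -> dict:
    """Resolve a categorization through both cache layers, then the API."""
    email_hash = hash_email(email)

    # Layer 1: in-memory (session-scoped)
    if email_hash in _memory_cache:
        return _memory_cache[email_hash]

    # Layer 2: file-based (persistent, 24-hour TTL)
    result = load_file_cache(email_hash)
    if result is None:
        # Miss in both layers: call the API and persist the result
        result = call_categorization_api(email)
        save_file_cache(email_hash, result)

    _memory_cache[email_hash] = result
    return result
```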
## Cache Statistics

### Tracking Hit Rate
```python
class CacheStats:
    """Track cache performance metrics."""

    def __init__(self):
        self.hits = 0
        self.misses = 0

    def record_hit(self):
        self.hits += 1

    def record_miss(self):
        self.misses += 1

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return (self.hits / total * 100) if total > 0 else 0

    def report(self) -> dict:
        return {
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": f"{self.hit_rate:.1f}%"
        }
```
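
A module-level instance can then be updated wherever cache lookups happen. As a sketch, assuming the file-cache helpers above (the monitoring example later in this page refers to this `cache_stats` object):

```python
cache_stats = CacheStats()

def lookup_with_stats(email: dict) -> dict | None:
    """Check the file cache and record the outcome."""
    result = load_file_cache(hash_email(email))
    if result is not None:
        cache_stats.record_hit()
    else:
        cache_stats.record_miss()
    return result
```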
### Metrics Dashboard Integration

Cache metrics are exposed via the `/api/metrics` endpoint:
```json
{
  "cache": {
    "hits": 127,
    "misses": 18,
    "hit_rate": "87.6%",
    "memory_entries": 145,
    "file_entries": 892
  }
}
```
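
One way the `cache` block of that payload might be assembled is sketched below. This is an illustration rather than the actual endpoint handler; it reuses the `cache_stats` instance, the `lru_cache`-wrapped `get_cached_category`, and `CACHE_DIR` from above:

```python
def cache_metrics() -> dict:
    """Build the cache section of the metrics payload (sketch)."""
    payload = cache_stats.report()
    # currsize of the LRU wrapper approximates in-memory entries
    payload["memory_entries"] = get_cached_category.cache_info().currsize
    payload["file_entries"] = len(list(CACHE_DIR.glob("*.json")))
    return {"cache": payload}
```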
## Cache Invalidation

### Automatic Invalidation
- TTL Expiry: File cache entries expire after 24 hours
- LRU Eviction: Memory cache evicts least-recently-used when full
- Content Change: Different email content generates different hash
### Manual Invalidation
```python
def clear_cache():
    """Clear all cache layers."""
    # Clear memory cache
    get_cached_category.cache_clear()

    # Clear file cache
    for cache_file in CACHE_DIR.glob("*.json"):
        cache_file.unlink()
```
## Best Practices

### 1. Hash Stability
Ensure hash inputs are normalized:
```python
def normalize_email(email: dict) -> dict:
    """Normalize email for consistent hashing."""
    return {
        "subject": email["subject"].strip().lower(),
        "from": email["from"].strip().lower(),
        "body": " ".join(email["body"].split())[:500]
    }
```
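
Hashing the normalized copy rather than the raw message keeps cache keys stable when only whitespace or letter case changes:

```python
email_hash = hash_email(normalize_email(email))
```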
### 2. Cache Warming
Pre-populate cache for known senders:
```python
PRIORITY_SENDERS = ["boss@company.com", "client@important.com"]

def warm_cache(emails: list[dict]):
    """Pre-cache emails from priority senders."""
    priority_emails = [
        e for e in emails
        if e["from"] in PRIORITY_SENDERS
    ]
    for email in priority_emails:
        categorize_email(email)  # Populates cache
```
### 3. Cache Monitoring
```python
import logging

logger = logging.getLogger(__name__)

def log_cache_performance():
    """Log cache statistics periodically."""
    stats = cache_stats.report()
    logger.info(f"Cache performance: {stats}")

    # Compare the numeric rate, not the formatted string from report()
    if cache_stats.hit_rate < 50:
        logger.warning("Low cache hit rate - consider increasing cache size")
```
## Cost Impact
| Scenario | API Calls | Estimated Cost |
|---|---|---|
| No caching | 50/run | ~$0.10 |
| With caching (80% hit) | 10/run | ~$0.02 |
| Monthly savings | - | ~$2.40 |
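
These estimates imply roughly $0.002 per categorization call: 50 calls come to about $0.10 per run, an 80% hit rate leaves 10 calls at about $0.02, and the ~$0.08 saved per run adds up to roughly $2.40 over a month of daily runs.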