Cache Usage Guide¶
Practical patterns for using the cache infrastructure in Dashtam.
Architecture Reference: docs/architecture/cache.md (design decisions, Redis patterns)
Quick Start¶
Getting the Cache¶
# In application layer (direct use)
from src.core.container import get_cache
cache = get_cache()
result = await cache.set("key", "value", ttl=3600)
# In presentation layer (FastAPI dependency)
from fastapi import Depends
from src.domain.protocols.cache_protocol import CacheProtocol
from src.core.container import get_cache
@router.get("/data")
async def get_data(cache: CacheProtocol = Depends(get_cache)):
    result = await cache.get("key")
    ...
Basic Operations¶
from src.core.result import Success, Failure
# Set a value with TTL
result = await cache.set("user:123", "John Doe", ttl=3600)
if isinstance(result, Success):
    print("Cached successfully")
# Get a value
result = await cache.get("user:123")
match result:
    case Success(value=val) if val is not None:
        print(f"Found: {val}")
    case Success(value=None):
        print("Cache miss")
    case Failure(error=err):
        print(f"Error: {err.message}")
# Delete a value
result = await cache.delete("user:123")
if isinstance(result, Success) and result.value:
    print("Deleted")
Common Patterns¶
Cache-Aside Pattern¶
The most common pattern - check cache first, fall back to database:
async def get_user(user_id: str) -> User | None:
    cache_key = f"user:{user_id}"
    # Try cache first
    result = await cache.get_json(cache_key)
    match result:
        case Success(value=data) if data is not None:
            # Cache hit - return cached data
            return User.from_dict(data)
        case _:
            # Cache miss or error - fetch from database
            user = await user_repo.find_by_id(user_id)
            if user:
                # Store in cache for next time
                await cache.set_json(
                    cache_key,
                    user.to_dict(),
                    ttl=3600,  # 1 hour
                )
            return user
Batch Operations¶
Use get_many and set_many for efficient bulk operations:
import json
# Fetch multiple users at once
keys = [f"user:{uid}" for uid in user_ids]
result = await cache.get_many(keys)
if isinstance(result, Success):
    cached_users = {}
    missing_ids = []
    for key, value in result.value.items():
        user_id = key.split(":")[1]
        if value is not None:
            cached_users[user_id] = json.loads(value)
        else:
            missing_ids.append(user_id)
    # Fetch missing from database
    if missing_ids:
        db_users = await user_repo.find_by_ids(missing_ids)
        # Cache the fetched users
        to_cache = {
            f"user:{u.id}": json.dumps(u.to_dict())
            for u in db_users
        }
        await cache.set_many(to_cache, ttl=3600)
# Store multiple users at once
mapping = {
    f"user:{user.id}": json.dumps(user.to_dict())
    for user in users
}
await cache.set_many(mapping, ttl=3600)
Rate Limiting Counter¶
Use atomic increment for rate limiting:
async def check_rate_limit(user_id: str, endpoint: str) -> bool:
    key = f"rate_limit:{user_id}:{endpoint}"
    result = await cache.increment(key)
    match result:
        case Success(value=count):
            if count == 1:
                # First request - set expiration window
                await cache.expire(key, 60)  # 1 minute window
            if count > 100:  # 100 requests per minute
                return False  # Rate limited
            return True
        case Failure(_):
            # Fail open - allow request if cache fails
            return True
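To make the counter's behavior concrete, here is a minimal, self-contained sketch that runs the same increment-then-expire logic against an in-memory stand-in. `FakeCounterCache`, `allow`, and the tiny `Success` class are hypothetical names introduced only for this illustration; they are not part of Dashtam's cache infrastructure.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Success:
    """Minimal stand-in for the project's Success result type (assumption)."""
    value: int


class FakeCounterCache:
    """In-memory stand-in exposing only increment() and expire()."""

    def __init__(self) -> None:
        self.counts: dict[str, int] = {}
        self.ttls: dict[str, int] = {}

    async def increment(self, key: str) -> Success:
        self.counts[key] = self.counts.get(key, 0) + 1
        return Success(self.counts[key])

    async def expire(self, key: str, seconds: int) -> Success:
        # Only records the TTL; nothing actually expires in this sketch.
        self.ttls[key] = seconds
        return Success(1)


async def allow(cache: FakeCounterCache, key: str, limit: int, window: int = 60) -> bool:
    result = await cache.increment(key)
    if result.value == 1:
        # The first request in a window starts the expiration clock.
        await cache.expire(key, window)
    return result.value <= limit


async def demo() -> list[bool]:
    cache = FakeCounterCache()
    return [await allow(cache, "rate_limit:u1:/data", limit=3) for _ in range(5)]


decisions = asyncio.run(demo())
print(decisions)
```

With a limit of 3, the first three calls are allowed and the rest are rejected until the key expires. The window starts at the first request and resets when the key expires, so a burst that straddles two windows can briefly exceed the nominal rate.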
Session Caching¶
Use the specialized RedisSessionCache for session management:
from src.infrastructure.cache.session_cache import RedisSessionCache
from src.core.container import get_cache
session_cache = RedisSessionCache(cache=get_cache())
# Store session
await session_cache.set(session_data, ttl_seconds=1800)
# Get session
session = await session_cache.get(session_id)
# Delete session
await session_cache.delete(session_id)
# Get all sessions for a user
session_ids = await session_cache.get_user_session_ids(user_id)
# Delete all user sessions
deleted_count = await session_cache.delete_all_for_user(user_id)
Pattern-Based Deletion¶
Delete multiple related keys at once:
# Delete all sessions for a user
result = await cache.delete_pattern("session:user123:*")
if isinstance(result, Success):
    print(f"Deleted {result.value} sessions")
# Invalidate all cached data for an entity
await cache.delete_pattern(f"user:{user_id}:*")
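To check which keys a pattern such as `user:123:*` would select, the sketch below uses Python's `fnmatch` as a rough stand-in for Redis glob matching. This is an approximation for illustration only (the two matchers differ in details such as escaping), and the key names are made up.

```python
from fnmatch import fnmatchcase

# Hypothetical keys, following the naming conventions in this guide.
keys = [
    "user:123",
    "user:123:profile",
    "user:123:settings",
    "user:1234:profile",
    "session:abc",
]

pattern = "user:123:*"
matched = [key for key in keys if fnmatchcase(key, pattern)]
print(matched)
```

Two things are worth noticing: the bare `user:123` key is not matched, so it has to be deleted separately, and the colon before `*` keeps the pattern from also matching user `1234`.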
Key Naming Conventions¶
Use hierarchical, namespace-prefixed keys:
# Pattern: {namespace}:{entity_type}:{id}[:sub_resource]
# User data
USER_KEY = "user:{user_id}"
USER_PROFILE_KEY = "user:{user_id}:profile"
USER_SETTINGS_KEY = "user:{user_id}:settings"
# Sessions
SESSION_KEY = "session:{session_id}"
USER_SESSIONS_KEY = "user:{user_id}:sessions"
# Rate limiting
RATE_LIMIT_KEY = "rate_limit:{user_id}:{endpoint}"
# Provider tokens
PROVIDER_TOKEN_KEY = "provider:token:{provider}:{user_id}"
# Authorization
AUTHZ_PERMISSIONS_KEY = "authz:{user_id}:permissions"
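The templates above can be filled with `str.format`. The following is a minimal sketch, assuming a hypothetical `build_key` helper (not part of the codebase) that also rejects values containing the `:` separator, since such values would blur the key hierarchy.

```python
from uuid import UUID

USER_PROFILE_KEY = "user:{user_id}:profile"
RATE_LIMIT_KEY = "rate_limit:{user_id}:{endpoint}"


def build_key(template: str, **parts: object) -> str:
    """Fill a key template, rejecting parts that contain the ':' separator."""
    for name, value in parts.items():
        if ":" in str(value):
            raise ValueError(f"{name} must not contain ':'")
    return template.format(**parts)


user_id = UUID("12345678-1234-5678-1234-567812345678")
profile_key = build_key(USER_PROFILE_KEY, user_id=user_id)
limit_key = build_key(RATE_LIMIT_KEY, user_id=user_id, endpoint="/data")
print(profile_key)
print(limit_key)
```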
TTL Strategy¶
Choose TTLs based on data characteristics:
# Recommended TTL values (in seconds)
TTL_SESSION = 1800 # 30 minutes - security sensitive
TTL_USER = 3600 # 1 hour - changes occasionally
TTL_PROVIDER_TOKEN = 3300 # 55 minutes - refresh before expiry
TTL_RATE_LIMIT = 60 # 1 minute - fixed window (counter resets when the key expires)
TTL_STATIC_DATA = 86400 # 24 hours - rarely changes
TTL_PERMISSIONS = 300 # 5 minutes - balance freshness/performance
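The provider token value follows a simple rule: cache for slightly less than the token's lifetime so a refresh happens before expiry. A minimal sketch of that arithmetic, assuming a one-hour token lifetime and a hypothetical `ttl_before_expiry` helper:

```python
def ttl_before_expiry(expires_in: int, margin: int = 300, minimum: int = 1) -> int:
    """Return a cache TTL that ends `margin` seconds before the token expires.

    `minimum` keeps the TTL positive when the token is already close to expiry.
    """
    return max(expires_in - margin, minimum)


one_hour_token_ttl = ttl_before_expiry(3600)
short_lived_token_ttl = ttl_before_expiry(120)
print(one_hour_token_ttl, short_lived_token_ttl)
```

A 3600-second token with a 300-second margin gives the 3300 seconds listed above.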
Error Handling (Fail-Open)¶
Cache operations should never break core functionality:
async def get_user_with_fallback(user_id: str) -> User | None:
    cache_key = f"user:{user_id}"
    # Try cache - but don't fail if cache is down
    cache_result = await cache.get_json(cache_key)
    if isinstance(cache_result, Success) and cache_result.value is not None:
        return User.from_dict(cache_result.value)
    # Cache miss OR cache error - fall back to database
    # This is the "fail-open" pattern
    user = await user_repo.find_by_id(user_id)
    if user:
        # Try to cache, but don't fail if it doesn't work
        await cache.set_json(cache_key, user.to_dict(), ttl=3600)
    return user
Testing¶
Integration Tests¶
Cache tests require a real Redis instance (no mocking):
import pytest
from src.core.result import Success
@pytest.mark.integration
class TestCacheOperations:
    @pytest.mark.asyncio
    async def test_set_and_get(self, cache_adapter):
        # Set value
        result = await cache_adapter.set("test:key", "value", ttl=60)
        assert isinstance(result, Success)
        # Get value
        result = await cache_adapter.get("test:key")
        assert isinstance(result, Success)
        assert result.value == "value"
    @pytest.mark.asyncio
    async def test_batch_operations(self, cache_adapter):
        # Set multiple
        mapping = {"key1": "val1", "key2": "val2"}
        result = await cache_adapter.set_many(mapping, ttl=60)
        assert isinstance(result, Success)
        # Get multiple
        result = await cache_adapter.get_many(["key1", "key2", "missing"])
        assert isinstance(result, Success)
        assert result.value["key1"] == "val1"
        assert result.value["missing"] is None
Using Test Fixtures¶
@pytest.mark.asyncio
async def test_session_cache(session_cache):
    """Test uses session_cache fixture from conftest.py."""
    from uuid import uuid4
    from datetime import datetime, UTC
    session_data = SessionData(
        id=uuid4(),
        user_id=uuid4(),
        created_at=datetime.now(UTC),
        # ... other fields
    )
    await session_cache.set(session_data)
    result = await session_cache.get(session_data.id)
    assert result is not None
    assert result.id == session_data.id
Health Checks¶
Verify cache connectivity:
async def check_cache_health() -> bool:
    result = await cache.ping()
    return isinstance(result, Success) and result.value is True
# In health check endpoint
@router.get("/health")
async def health_check(cache: CacheProtocol = Depends(get_cache)):
    result = await cache.ping()
    # Same check as check_cache_health(): a Success that carries False is not healthy
    cache_healthy = isinstance(result, Success) and result.value is True
    return {
        "status": "healthy" if cache_healthy else "degraded",
        "cache": "up" if cache_healthy else "down",
    }
Common Mistakes to Avoid¶
❌ Don't Cache Sensitive Data Without Encryption¶
# WRONG: Caching password or tokens in plain text
await cache.set(f"user:{user_id}:password", hashed_password)
# RIGHT: Don't cache passwords at all, or encrypt sensitive data
# Passwords should never be cached
❌ Don't Use PII in Cache Keys¶
# WRONG: Email in key
await cache.set("user:john@example.com", data)
# RIGHT: Use opaque IDs
await cache.set(f"user:{user_id}", data)
❌ Don't Forget TTL for Session Data¶
# WRONG: No TTL on security-sensitive data
await cache.set(f"session:{session_id}", data)
# RIGHT: Always use TTL
await cache.set(f"session:{session_id}", data, ttl=1800)
❌ Don't Let Cache Failures Break Your App¶
# WRONG: Raising exception on cache failure
result = await cache.get("key")
if isinstance(result, Failure):
    raise CacheError("Cache failed!")  # App crashes
# RIGHT: Fail open - continue without cache
result = await cache.get("key")
if isinstance(result, Failure):
    logger.warning("Cache unavailable, falling back to database")
    # Continue with database lookup
Cache Application in Dashtam¶
This section documents where caching is currently applied and where it should be added.
Currently Cached ✅¶
1. Session Management¶
Location: src/infrastructure/cache/session_cache.py
Used by: CreateSessionHandler, GetSessionHandler, RevokeSessionHandler
# Write-through caching with database as source of truth
await session_cache.set(session_data, ttl_seconds=1800)
session = await session_cache.get(session_id)
# User→sessions index for bulk operations
session_ids = await session_cache.get_user_session_ids(user_id)
- TTL: 30 days (matches session expiration)
- Pattern: Write-through (cache + DB)
- Invalidation: On session revocation
2. Authorization (Casbin RBAC)¶
Location: src/infrastructure/authorization/casbin_adapter.py
Used by: require_permission(), require_role() dependencies
# Permission check results cached
cache_key = f"authz:{user_id}:{resource}:{action}"
await cache.set(cache_key, "1" if allowed else "0", ttl=300)
# Pattern-based invalidation on role changes
await cache.delete_pattern(f"authz:{user_id}:*")
- TTL: 5 minutes
- Pattern: Cache-aside with invalidation
- Invalidation: On role assignment/revocation
3. OAuth State (CSRF Protection)¶
Location: src/presentation/routers/api/v1/providers.py
Used by: initiate_connection(), oauth_callback()
# Store state during OAuth initiation
cache_key = f"oauth:state:{state}"
await cache.set(cache_key, f"{user_id}:{provider_slug}:{alias}", ttl=600)
# Retrieve and delete (one-time use) during callback
cached_value = await cache.get(cache_key)
await cache.delete(cache_key)
- TTL: 10 minutes
- Pattern: One-time use token
- Invalidation: Deleted after retrieval
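The stored value packs three fields into one colon-separated string, so the callback has to split it again. The sketch below assumes that `user_id` and `provider_slug` never contain a colon while `alias` might; `parse_oauth_state` is a hypothetical helper, not the router's actual code.

```python
def parse_oauth_state(cached_value: str) -> tuple[str, str, str]:
    """Split 'user_id:provider_slug:alias' into its three fields."""
    # maxsplit=2 keeps any colons inside the alias intact.
    parts = cached_value.split(":", 2)
    if len(parts) != 3:
        raise ValueError("malformed OAuth state value")
    user_id, provider_slug, alias = parts
    return user_id, provider_slug, alias


parsed = parse_oauth_state("42:schwab:Main:IRA")
print(parsed)
```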
4. Rate Limiting (Token Bucket)¶
Location: src/infrastructure/rate_limit/redis_storage.py
Used by: Rate limit middleware
# Uses Redis directly with Lua scripts for atomic operations
# Keys: "{key_base}:tokens", "{key_base}:time"
await redis.evalsha(token_bucket_sha, ...)
- TTL: Based on rate limit window
- Pattern: Atomic Lua scripts (not CacheProtocol)
- Policy: Fail-open (allow on Redis failure)
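The Lua script itself is not shown in this guide. As a rough illustration of the general token bucket technique, here is a plain-Python sketch; the `Bucket` fields loosely mirror the `:tokens` and `:time` keys, and every name and parameter here is an assumption rather than the project's implementation.

```python
from dataclasses import dataclass


@dataclass
class Bucket:
    tokens: float      # tokens currently available
    updated_at: float  # timestamp of the last refill, in seconds


def take(bucket: Bucket, now: float, capacity: float,
         refill_rate: float, cost: float = 1.0) -> bool:
    """Refill according to elapsed time, then try to spend `cost` tokens."""
    elapsed = max(0.0, now - bucket.updated_at)
    bucket.tokens = min(capacity, bucket.tokens + elapsed * refill_rate)
    bucket.updated_at = now
    if bucket.tokens >= cost:
        bucket.tokens -= cost
        return True
    return False


bucket = Bucket(tokens=2.0, updated_at=0.0)
results = [
    take(bucket, now=0.0, capacity=2.0, refill_rate=1.0),
    take(bucket, now=0.0, capacity=2.0, refill_rate=1.0),
    take(bucket, now=0.0, capacity=2.0, refill_rate=1.0),  # bucket is empty
    take(bucket, now=1.0, capacity=2.0, refill_rate=1.0),  # one token refilled
]
print(results)
```

In Redis the refill and the spend have to happen in one atomic step, which is why a Lua script is used instead of separate cache calls.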
Cache Optimization ✅¶
Implemented: 2025-12-25 | Phases 1-9 Complete
1. Provider Connection Cache ✅¶
Location: src/infrastructure/cache/provider_connection_cache.py
Used by: GetProviderConnectionHandler
# Cache-first strategy
cache_key = CacheKeys.provider_connection(connection_id)
connection = await connection_cache.get(connection_id)
if connection is None:
    connection = await connection_repo.find_by_id(connection_id)
    if connection:
        await connection_cache.set(connection)
- TTL: 5 minutes (300 seconds)
- Pattern: Cache-aside with population on miss
- Invalidation: Manual or on provider operations
- Performance: ~10x faster (<5ms vs ~50ms)
2. Schwab API Response Cache ✅¶
Location: src/infrastructure/providers/schwab/schwab_provider.py
Used by: fetch_accounts() method
# External API caching
cache_key = CacheKeys.schwab_accounts(user_id)
if accounts_cache:
    cached = await accounts_cache.get(cache_key)
    if cached:
        return Success(value=cached)
# Fetch from API and cache result
accounts = await self._accounts_api.get_accounts(access_token)
if accounts_cache and isinstance(accounts, Success):
    await accounts_cache.set(cache_key, accounts.value, ttl=ttl_schwab_accounts)
- TTL: 5 minutes (300 seconds)
- Pattern: Cache-first with API fallback
- Invalidation: Time-based expiration
- Impact: 70-90% reduction in Schwab API calls
3. Account List Cache ✅¶
Location: src/application/queries/handlers/list_accounts_handler.py
Used by: ListAccountsByUserHandler
# Cache unfiltered user account lists
if accounts_cache and filters is None:
    cache_key = CacheKeys.accounts_user(user_id)
    cached = await accounts_cache.get(cache_key)
    if cached:
        return Success(value=cached)
# Fetch from DB and cache
accounts = await self._account_repo.find_by_user_id(user_id)
# Only unfiltered lists are cached, so cache_key is always defined here
if accounts_cache and filters is None:
    await accounts_cache.set(cache_key, accounts, ttl=ttl_accounts_list)
- TTL: 5 minutes (300 seconds)
- Pattern: Cache-aside, only caches unfiltered lists
- Invalidation: Time-based expiration
- Impact: 50-70% reduction in DB queries
4. Security Config Cache ✅¶
Location: src/application/commands/handlers/refresh_access_token_handler.py
Used by: RefreshAccessTokenHandler
# Cache security config for token validation
config = await self._get_cached_security_config(user_id, accounts_cache)
# Helper method uses cache-first strategy
async def _get_cached_security_config(self, user_id, cache):
    global_key = CacheKeys.security_global_version()
    user_key = CacheKeys.security_user_version(user_id)
    # Try cache first, fall back to DB
    ...
- TTL: 1 minute (60 seconds) - security-sensitive
- Pattern: Cache-first with short TTL
- Invalidation: SecurityConfigRepository clears on version updates
- Impact: Reduces DB load on token refresh operations
5. Cache Metrics & Observability ✅¶
Location: src/infrastructure/cache/cache_metrics.py
metrics = CacheMetrics()
metrics.record_hit("provider")
metrics.record_miss("provider")
metrics.record_error("provider")
stats = metrics.get_stats("provider")
# Returns: hits, misses, errors, total_requests, hit_rate
- Thread-safe operation tracking
- Per-namespace statistics
- Hit rate calculation
- Used for performance monitoring
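The following is a minimal sketch of how a thread-safe, per-namespace counter with these method names could work. It is not the code in `cache_metrics.py`; in particular, it assumes that `total_requests` includes errors and that `hit_rate` is hits divided by hits plus misses.

```python
import threading
from collections import defaultdict


class SketchCacheMetrics:
    """Illustrative stand-in for CacheMetrics (assumed semantics)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counts: dict[str, dict[str, int]] = defaultdict(
            lambda: {"hits": 0, "misses": 0, "errors": 0}
        )

    def _bump(self, namespace: str, field: str) -> None:
        # The lock makes each read-modify-write of a counter atomic.
        with self._lock:
            self._counts[namespace][field] += 1

    def record_hit(self, namespace: str) -> None:
        self._bump(namespace, "hits")

    def record_miss(self, namespace: str) -> None:
        self._bump(namespace, "misses")

    def record_error(self, namespace: str) -> None:
        self._bump(namespace, "errors")

    def get_stats(self, namespace: str) -> dict[str, float]:
        with self._lock:
            counts = dict(self._counts[namespace])
        lookups = counts["hits"] + counts["misses"]
        total = lookups + counts["errors"]
        hit_rate = counts["hits"] / lookups if lookups else 0.0
        return {**counts, "total_requests": total, "hit_rate": hit_rate}


metrics = SketchCacheMetrics()
for _ in range(3):
    metrics.record_hit("provider")
metrics.record_miss("provider")
metrics.record_error("provider")
stats = metrics.get_stats("provider")
print(stats)
```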
6. Centralized Cache Keys ✅¶
Location: src/infrastructure/cache/cache_keys.py
# All cache keys constructed through CacheKeys class
CacheKeys.provider_connection(connection_id)
CacheKeys.schwab_accounts(user_id)
CacheKeys.accounts_user(user_id)
CacheKeys.security_global_version()
CacheKeys.security_user_version(user_id)
- Single source of truth for cache key patterns
- Documented in docs/architecture/cache-keys.md
- Prevents key collisions
7. Future Cache Candidates (Lower Priority)¶
Transaction Lists¶
- Large data sets
- Short freshness window
- Consider pagination-aware caching
Provider Credentials (Decrypted)¶
- Security concern: encrypted at rest
- If cached, needs very short TTL
- Consider memory-only caching
Implementation Status¶
| Component | Status | TTL | Invalidation |
|---|---|---|---|
| Sessions | ✅ Implemented | 30 days | On revoke |
| Authorization | ✅ Implemented | 5 min | On role change |
| OAuth State | ✅ Implemented | 10 min | One-time use |
| Rate Limits | ✅ Implemented | Window-based | Automatic |
| Provider Connections | ✅ F6.11 (Phase 4) | 5 min | Manual |
| Schwab API | ✅ F6.11 (Phase 5) | 5 min | Time-based |
| Account Lists | ✅ F6.11 (Phase 6) | 5 min | Time-based |
| Security Config | ✅ F6.11 (Phase 7) | 1 min | On rotation |
| Cache Metrics | ✅ F6.11 (Phase 2) | N/A | N/A |
| Cache Keys | ✅ F6.11 (Phase 2) | N/A | N/A |
Adding New Cache Points¶
When adding caching to a new component:
- Define cache key pattern: Add to CacheKeys class in cache_keys.py
- Add TTL setting: Add to Settings class in core/config.py
- Update env templates: Add to all .env.*.example files
- Document pattern: Add to docs/architecture/cache-keys.md
- Implement cache-aside: Check cache → fallback to source → populate cache
- Add metrics tracking: Use CacheMetrics for observability
- Add fail-open handling: Never let cache failures break functionality
- Write integration tests: Verify cache hit/miss/invalidation
- Update this document: Add to "Cache Optimization (F6.11)" section
Example: Adding Transaction Cache¶
# 1. Add to CacheKeys (cache_keys.py)
class CacheKeys:
    @staticmethod
    def transactions_account(account_id: UUID) -> str:
        """Cache key for transactions by account."""
        return f"{settings.CACHE_KEY_PREFIX}:transactions:account:{account_id}"
# 2. Add TTL to Settings (core/config.py)
class Settings(BaseSettings):
    CACHE_TTL_TRANSACTIONS: int = Field(default=300, ge=60, le=3600)
# 3. Update handler with cache-first strategy
class ListTransactionsHandler:
    async def handle(self, query):
        if transactions_cache:
            cache_key = CacheKeys.transactions_account(query.account_id)
            cached = await transactions_cache.get(cache_key)
            if cached:
                metrics.record_hit("transactions")
                return Success(value=cached)
            metrics.record_miss("transactions")
        # Fetch from DB
        transactions = await self._transaction_repo.find_by_account_id(...)
        # Populate cache
        if transactions_cache:
            await transactions_cache.set(
                cache_key,
                transactions,
                ttl=settings.CACHE_TTL_TRANSACTIONS,
            )
        return Success(value=transactions)
Not Yet Cached ❌¶
Lower Priority Candidates¶
User Data Lookups¶
Status: Deferred to v1.1.0+ (Phase 3 skipped in F6.11)
Reason: Low ROI - user data accessed infrequently
# Proposed (if needed)
cache_key = CacheKeys.user_data(user_id)
# TTL: 60-300 seconds
# Invalidation: On user update, password change
Transaction Lists (Paginated)¶
- Large data sets require pagination-aware caching
- Short freshness window
- Consider implementing when usage patterns are established
Reference¶
- Architecture: docs/architecture/cache.md
- Key Patterns: docs/architecture/cache-keys.md
- Protocol: src/domain/protocols/cache_protocol.py
- Implementation: src/infrastructure/cache/redis_adapter.py
- Session Cache: src/infrastructure/cache/session_cache.py
- Provider Connection Cache: src/infrastructure/cache/provider_connection_cache.py
- Cache Keys: src/infrastructure/cache/cache_keys.py
- Cache Metrics: src/infrastructure/cache/cache_metrics.py
- Container: src/core/container/infrastructure.py
- Tests: tests/integration/test_cache_*.py
Created: 2025-12-05 | Last Updated: 2026-01-10