Audit Trail Architecture¶
Overview¶
This document defines Dashtam's audit trail system for compliance with PCI-DSS, SOC 2, and GDPR requirements. The audit trail provides an immutable, tamper-proof record of all security-relevant actions for forensics and compliance auditing.
Key Distinction: This document covers compliance auditing for legal/regulatory requirements. For technical debugging, see logging.md. For workflow coordination, see domain-events.md.
1. Key Principles¶
1.1 Core Principles¶
- ✅ Hexagonal Architecture: Domain defines the protocol (port); infrastructure provides database-specific adapters behind it
- ✅ Database Freedom: NOT coupled to PostgreSQL - works with any SQL database
- ✅ Immutability: Records CANNOT be modified or deleted (enforced by database)
- ✅ Compliance First: PCI-DSS, SOC 2, GDPR requirements baked in
- ✅ 7+ Year Retention: Legal requirement for financial data
- ✅ Tamper-Proof: Append-only, immutable records with timestamps
- ✅ Extensibility: JSONB metadata for new fields without schema changes
- ✅ Performance: Async operations, indexed queries, partitioned tables
1.2 Audit vs Logging vs Events¶
Three separate concerns with different purposes:
| Concern | Purpose | Storage | Retention | Query Pattern |
|---|---|---|---|---|
| Audit | Compliance forensics | SQL Database | 7+ years | SQL |
| Logging | Technical debugging | CloudWatch | 30-90 days | grep |
| Events | Workflow coordination | In-memory | N/A | pub/sub |
Why separate?
- Different retention (7+ years vs 30 days vs none)
- Different query patterns (SQL vs grep vs subscription)
- Different compliance (required vs optional vs internal)
- Different access (auditors vs engineers vs system)
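The separation above can be sketched in a few lines: one failed login touches all three systems, each through its own interface. The `audit_rows` list, `logger`, and `subscribers` below are illustrative stand-ins, not Dashtam's actual objects:

```python
# Stand-ins for the three concerns (illustrative only):
import logging

logging.basicConfig()
logger = logging.getLogger("auth")   # Logging: technical debugging (30-90 days)

audit_rows: list[dict] = []          # Audit: stands in for the SQL table (7+ years)

subscribers: list = []               # Events: stands in for the in-memory bus


def on_login_failed(email: str, ip: str) -> None:
    """One security event, routed to all three systems."""
    # Audit: immutable compliance record (queried by auditors via SQL)
    audit_rows.append({"action": "user_login_failed", "ip_address": ip})
    # Logging: ephemeral technical detail (grepped by engineers)
    logger.warning("login failed for %s from %s", email, ip)
    # Events: workflow coordination (consumed by subscribed handlers)
    for handler in subscribers:
        handler({"type": "login_failed", "email": email})
```

Each sink has its own retention, query pattern, and audience, which is why collapsing them into one system fails at least two of the three.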
1.3 Compliance Requirements¶
PCI-DSS Requirements:
- Audit all access to cardholder data
- Track all authentication attempts
- Record all administrative actions
- 7+ year retention for audit logs
- Tamper-proof, immutable records
SOC 2 Requirements:
- Audit all security-relevant events
- Track who accessed what, when, where
- Immutable audit trail
- Quarterly audit log reviews
GDPR Requirements:
- Audit all personal data access
- Track data subject requests (export, deletion)
- Record consent changes
- Data breach notification tracking
2. Hexagonal Architecture¶
2.1 Layer Responsibilities¶
```text
┌─────────────────────────────────────────────────────────┐
│ Domain Layer (Port/Protocol)                            │
│ - AuditProtocol defines interface                       │
│ - AuditAction enum (extensible)                         │
│ - Pure Python (no database imports)                     │
└─────────────────────────────────────────────────────────┘
                            ▲
                            │ implements
                            │
┌─────────────────────────────────────────────────────────┐
│ Infrastructure Layer (Database-Specific Adapters)       │
│ - PostgresAuditAdapter (uses rules for immutability)    │
│ - MySQLAuditAdapter (uses triggers for immutability)    │
│ - SQLiteAuditAdapter (testing - uses constraints)       │
│ - Each adapter implements immutability its own way      │
└─────────────────────────────────────────────────────────┘
                            ▲
                            │ uses
                            │
┌─────────────────────────────────────────────────────────┐
│ Core Layer (Container)                                  │
│ - get_audit() creates correct adapter                   │
│ - Reads DATABASE_TYPE env var (postgres/mysql/sqlite)   │
│ - Follows Composition Root pattern                      │
└─────────────────────────────────────────────────────────┘
                            ▲
                            │ uses
                            │
┌─────────────────────────────────────────────────────────┐
│ Application Layer (Event Handlers, Services)            │
│ - Inject AuditProtocol via Depends()                    │
│ - Record security-relevant actions                      │
│ - Add context (IP, user agent, metadata)                │
└─────────────────────────────────────────────────────────┘
```
2.2 Dependency Flow¶
- Domain → Defines `AuditProtocol` + `AuditAction` enum (no dependencies)
- Infrastructure → Implements protocol with database-specific adapters
- Core/Container → Creates correct adapter based on `DATABASE_TYPE`
- Application → Consumes audit via protocol (database-agnostic)
Benefits:
- Domain layer remains pure (no SQLAlchemy/SQLModel imports)
- Easy to swap databases (Postgres → MySQL → SQLite)
- Testable (mock protocol, in-memory adapter for tests)
- Configuration-driven (change database via env var)
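A minimal sketch of the Composition Root factory described above. The adapter classes here are placeholders; Dashtam's actual container wiring (sessions, dependency injection) is omitted:

```python
import os

# Placeholder adapter classes; the real ones live in src/infrastructure/audit/
# and take a database session in their constructors.
class PostgresAuditAdapter: ...
class MySQLAuditAdapter: ...
class SQLiteAuditAdapter: ...

_ADAPTERS = {
    "postgres": PostgresAuditAdapter,
    "mysql": MySQLAuditAdapter,
    "sqlite": SQLiteAuditAdapter,
}


def get_audit():
    """Create the audit adapter selected by the DATABASE_TYPE env var."""
    database_type = os.environ.get("DATABASE_TYPE", "postgres")
    try:
        return _ADAPTERS[database_type]()
    except KeyError:
        raise ValueError(f"Unsupported DATABASE_TYPE: {database_type}")
```

Because callers only see `AuditProtocol`, switching databases is a one-line configuration change rather than a code change.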
2.3 Database Immutability Strategies¶
Each adapter implements immutability differently:
| Database | Immutability Strategy |
|---|---|
| PostgreSQL | RULES (block UPDATE/DELETE) |
| MySQL | TRIGGERS (block UPDATE/DELETE) |
| SQLite | Constraints + app-level enforcement (testing) |
| In-Memory | Simple list append (unit tests) |
Protocol doesn't care HOW - it just says "record this audit entry".
2.4 Why Adapter Pattern (NOT Repository Pattern)¶
CRITICAL: Audit uses Adapter Pattern, NOT Repository Pattern.
Repository Pattern vs Adapter Pattern¶
Repository Pattern (for domain entities with business logic):
```text
Domain Entity  →  Repository Protocol    →  Repository Implementation
     ↓                    ↓                          ↓
   User        →    UserRepository       →    UserRepository
   Provider    →    ProviderRepository   →    ProviderRepository
   Account     →    AccountRepository    →    AccountRepository
```
Purpose: Manage domain entities (User, Provider, Account, Transaction)
When to use:
- Domain entity EXISTS in `src/domain/entities/`
- Has business logic and domain rules
- Lives in memory as domain objects
- Repository maps between domain entities ↔ database models
Adapter Pattern (for external system integration):
```text
Domain Protocol    →  Infrastructure Adapter
      ↓                        ↓
AuditProtocol      →  PostgresAuditAdapter
CacheProtocol      →  RedisAdapter
SecretsProtocol    →  AWSAdapter
```
Purpose: Adapt external systems (database, cache, AWS) to domain protocols
When to use:
- NO domain entity (pure infrastructure concern)
- No business logic in domain layer
- Direct database access (SQL in adapter)
- Protocol IS the abstraction (no repository needed)
Why Audit Uses Adapter Pattern¶
AuditLog is NOT a domain entity:
- ❌ No business logic (just INSERT and SELECT)
- ❌ No domain rules (compliance is infrastructure concern)
- ❌ Lives ONLY in database (never loaded into memory as domain object)
- ❌ Model exists ONLY in infrastructure: `src/infrastructure/persistence/models/audit.py`
AuditProtocol IS the abstraction:
- ✅ Domain defines "what" (protocol interface)
- ✅ Infrastructure defines "how" (SQL queries)
- ✅ SQL belongs in adapter (correct separation of concerns)
- ✅ Protocol-first design (hexagonal architecture)
Repository Pattern Would Be Over-Engineering¶
❌ Wrong approach (adds unnecessary layers):
```text
# Domain entity (not needed!)
src/domain/entities/audit_log.py

# Repository protocol (redundant with AuditProtocol!)
src/domain/protocols/audit_log_repository.py

# Repository implementation (redundant layer!)
src/infrastructure/persistence/repositories/audit_log_repository.py

# Adapter (finally the real implementation!)
src/infrastructure/audit/postgres_adapter.py
```
This is 4 layers when 2 layers are sufficient!
✅ Correct approach (adapter pattern):
```text
# Protocol defines interface
src/domain/protocols/audit_protocol.py

# Adapter implements protocol with SQL
src/infrastructure/audit/postgres_adapter.py
```
Just 2 layers - protocol and adapter!
SQL in Adapter is CORRECT¶
PostgresAuditAdapter contains SQL queries directly:
```python
class PostgresAuditAdapter:
    async def query(...):
        # ✅ CORRECT: SQL belongs here!
        query = select(AuditLogModel)
        query = query.where(AuditLogModel.user_id == user_id)
        query = query.order_by(AuditLogModel.timestamp.desc())
        result = await self.session.execute(query)
        ...
```
Why this is correct:
- Adapter's job is to adapt database to protocol
- SQL is database-specific (belongs in infrastructure)
- No domain entity to map to/from
- Protocol abstracts away database details
- Easy to test (mock protocol)
- Easy to swap databases (create MySQLAuditAdapter)
No migration needed - this is the correct architecture! ✅
Comparison: Repository vs Adapter¶
| Aspect | Repository Pattern | Adapter Pattern |
|---|---|---|
| Domain Entity | ✅ Yes (User, Provider) | ❌ No |
| Business Logic | ✅ Yes (validation, rules) | ❌ No |
| Lives in Memory | ✅ Yes (domain objects) | ❌ No (database only) |
| SQL Location | Repository implementation | Adapter directly |
| Abstraction | Repository protocol | Protocol (port) |
| Example Features | F3.1-F3.3 (repositories) | F0.9 (audit adapter) |
When to Use Which Pattern¶
Use Repository Pattern when:
- Domain entity exists (User, Provider, Account, Transaction)
- Has business logic and domain rules
- Entity lives in memory as domain object
- Need to map between domain entity ↔ database model
- See F3.1 (Provider Repository) in roadmap
Use Adapter Pattern when:
- NO domain entity (pure infrastructure)
- External system integration (database, cache, AWS, APIs)
- Protocol IS the abstraction
- No business logic in domain
- See F0.9 (Audit), F0.5 (Cache), F0.7 (Secrets) in roadmap
Examples in Dashtam:
- Repository: UserRepository, ProviderRepository, AccountRepository, TransactionRepository (F3.1-F3.3)
- Adapter: PostgresAuditAdapter (F0.9), RedisAdapter (F0.5), AWSAdapter (F0.7)
3. Audit Event Semantics: The Truth Contract¶
3.1 Core Principle: Audit What Actually Happened¶
CRITICAL: Audit logs must record observable facts, not intentions or predictions.
The Problem: Recording outcomes before they occur
```python
# ❌ WRONG: Lying in audit logs
await audit.record(
    action=AuditAction.USER_REGISTERED,  # ❌ LIE - user NOT registered yet!
    user_id=user_id,
)
# Audit committed (permanent)

# Business logic (might fail)
session.add(user)
await session.commit()  # ❌ If this fails, audit lies!

# Result: Audit says "USER_REGISTERED" but user doesn't exist
# ❌ Compliance violation - audit trail is inconsistent with database
```
The Solution: Audit ATTEMPTS and OUTCOMES separately
```python
# ✅ CORRECT: Record what user ATTEMPTED
await audit.record(
    action=AuditAction.USER_REGISTRATION_ATTEMPTED,  # ✅ Truth: they TRIED
    user_id=None,  # Don't have ID yet
    resource_type="user",
    metadata={"email": data.email},
)

# Business logic
try:
    user = User(email=data.email, ...)
    session.add(user)
    await session.commit()  # ✅ User NOW exists

    # ✅ CORRECT: Record SUCCESS after business commit
    await audit.record(
        action=AuditAction.USER_REGISTERED,  # ✅ Truth: NOW they're registered
        user_id=user.id,
        resource_type="user",
    )
except HTTPException:
    # ✅ CORRECT: Record FAILURE with reason
    await audit.record(
        action=AuditAction.USER_REGISTRATION_FAILED,  # ✅ Truth: it FAILED
        user_id=None,
        resource_type="user",
        metadata={"reason": "duplicate_email"},
    )
    raise
```
3.2 The ATTEMPT → OUTCOME Pattern¶
Every state-changing operation follows this pattern:
Timeline:

```text
Time: T0 - User initiates action
          ↓
   [Audit: ATTEMPTED]  ← Record immediately (independent session)
          ↓
Time: T1 - Business logic executes
          ↓
   [Business: Success or Failure?]
          ↓
Time: T2 - Record outcome
          ↓
   [Audit: SUCCESS]  ← After business commit
        OR
   [Audit: FAILED]   ← After business rollback
```
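The timeline above can be wrapped in a small helper. This is an illustrative sketch only: `audited` and `InMemoryRecorder` are not part of the documented API, and in production the ATTEMPTED record would go through its own committed session so it survives a business rollback:

```python
# Illustrative ATTEMPT -> OUTCOME wrapper (not Dashtam's actual API).
from contextlib import asynccontextmanager


class InMemoryRecorder:
    """Stand-in for the audit adapter; appends entries to a list."""

    def __init__(self):
        self.entries = []

    async def record(self, *, action, **context):
        self.entries.append({"action": action, **context})


@asynccontextmanager
async def audited(recorder, *, attempted, success, failed, **context):
    """Record ATTEMPTED before the block, then SUCCESS or FAILED after it."""
    await recorder.record(action=attempted, **context)          # T0: attempt
    try:
        yield
    except Exception as exc:
        await recorder.record(action=failed, reason=str(exc), **context)  # T2: failure
        raise
    else:
        await recorder.record(action=success, **context)        # T2: success
```

Used around the business logic, the wrapper guarantees that every attempt leaves a record even when the operation fails.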
3.3 Event Taxonomy¶
Category 1: State-Changing Operations (ATTEMPT → OUTCOME)¶
Pattern: Always record ATTEMPTED first, then FAILED or SUCCESS
Registration:

```text
USER_REGISTRATION_ATTEMPTED  → User hit endpoint
        ↓
USER_REGISTRATION_FAILED     → Validation failed, duplicate email, etc.
        OR
USER_REGISTERED              → User exists in database
```

Login:

```text
USER_LOGIN_ATTEMPTED  → User submitted credentials
        ↓
USER_LOGIN_FAILED     → Invalid credentials, account locked, etc.
        OR
USER_LOGIN_SUCCESS    → Session created
```

Provider Connection:

```text
PROVIDER_CONNECTION_ATTEMPTED  → OAuth flow started
        ↓
PROVIDER_CONNECTION_FAILED     → OAuth failed, API error, etc.
        OR
PROVIDER_CONNECTED             → Tokens saved, provider active
```

Data Modification:

```text
DATA_MODIFICATION_ATTEMPTED  → Update operation started
        ↓
DATA_MODIFICATION_FAILED     → Validation failed, constraint violation
        OR
DATA_MODIFIED                → Changes committed to database
```
Category 2: Access Control Events (ATTEMPT → OUTCOME)¶
Pattern: Record access attempt, then DENIED or GRANTED
Data Access:

```text
DATA_ACCESS_ATTEMPTED  → User requested data
        ↓
ACCESS_DENIED          → Permission check failed
        OR
DATA_VIEWED            → Permission check passed, data returned
```

Admin Action:

```text
ADMIN_ACTION_ATTEMPTED  → Admin initiated action
        ↓
ADMIN_ACTION_DENIED     → Insufficient permissions
        OR
ADMIN_ACTION_COMPLETED  → Action executed successfully
```
Category 3: Completed Events (No ATTEMPT needed)¶
Pattern: Record after action completes (these are reactions, not initiations)
Session Events:

```text
USER_LOGOUT      → User logged out (always succeeds)
SESSION_EXPIRED  → Session timed out (always succeeds)
TOKEN_ROTATED    → Token refresh completed (always succeeds)
```

System Events:

```text
BACKUP_COMPLETED     → Backup job finished
DATA_SYNC_COMPLETED  → Sync job finished
CACHE_CLEARED        → Cache operation completed
```
Why no ATTEMPT?: These events are triggered by completed actions or system processes.
3.4 Compliance Rationale¶
Why ATTEMPT/OUTCOME pattern is mandatory:
PCI-DSS Requirement 10.2.4¶
"Invalid logical access attempts must be logged."
Wrong approach (no ATTEMPT record):

```text
Audit log: [empty]
Database:  no user record
Result:    ❌ No evidence of failed attempt (compliance violation)
```

Correct approach:

```text
Audit log:
  - USER_REGISTRATION_ATTEMPTED (IP: 123.45.67.89)
  - USER_REGISTRATION_FAILED (reason: duplicate_email)
Database:  no user record
Result:    ✅ Clear evidence of attempt and failure (compliance met)
```
SOC 2 CC6.1¶
"The entity implements logical access security measures to protect against threats from sources outside its system boundaries."
Audit trail must show:
- Who attempted access (USER_LOGIN_ATTEMPTED)
- Whether access was granted (USER_LOGIN_SUCCESS) or denied (USER_LOGIN_FAILED)
- Why access was denied (context: {"reason": "invalid_password"})
GDPR Article 30¶
"Records of processing activities must be maintained."
Processing activity = ATTEMPT (user initiated action)
Processing outcome = FAILED or SUCCESS
Both must be logged for complete audit trail.
3.5 Real-World Scenarios¶
Scenario 1: Registration with Duplicate Email¶
```python
@router.post("/users", status_code=201)
async def register_user(
    request: Request,
    data: UserCreate,
    session: AsyncSession = Depends(get_db_session),
    audit: AuditProtocol = Depends(get_audit),
):
    # Step 1: ALWAYS record attempt first
    await audit.record(
        action=AuditAction.USER_REGISTRATION_ATTEMPTED,
        user_id=None,  # Don't have ID yet
        resource_type="user",
        ip_address=request.client.host,
        metadata={"email": data.email},
    )
    # ✅ Committed: Attempt is permanent record

    # Step 2: Validation
    if await email_exists(data.email, session):
        # Step 3: Record FAILURE
        await audit.record(
            action=AuditAction.USER_REGISTRATION_FAILED,
            user_id=None,
            resource_type="user",
            ip_address=request.client.host,
            metadata={
                "email": data.email,
                "reason": "duplicate_email",
            },
        )
        # ✅ Committed: Failure is permanent record
        raise HTTPException(400, "Email already registered")

    # Step 4: Business logic
    user = User(email=data.email, ...)
    session.add(user)
    await session.commit()  # ✅ User NOW exists in database

    # Step 5: Record SUCCESS (after business commit)
    await audit.record(
        action=AuditAction.USER_REGISTERED,
        user_id=user.id,  # NOW we have ID
        resource_type="user",
        ip_address=request.client.host,
        metadata={"email": user.email},
    )
    # ✅ Committed: Success is permanent record

    return UserResponse(id=user.id, email=user.email)
```
Audit Timeline:

```text
10:00:00.001 - USER_REGISTRATION_ATTEMPTED (email: john@example.com)
10:00:00.050 - USER_REGISTRATION_FAILED (reason: duplicate_email)

Result: ✅ Audit shows ATTEMPT + FAILURE
        ✅ Database has no user record (consistent)
        ✅ Compliance requirement met
```
Scenario 2: Failed Login Attempt (Security)¶
```python
@router.post("/auth/login")
async def login(
    request: Request,
    data: LoginRequest,
    audit: AuditProtocol = Depends(get_audit),
):
    # Step 1: Record attempt
    await audit.record(
        action=AuditAction.USER_LOGIN_ATTEMPTED,
        user_id=None,
        resource_type="session",
        ip_address=request.client.host,
        metadata={"email": data.email},
    )

    # Step 2: Authenticate
    user = await find_user(data.email)
    if not user or not verify_password(data.password, user.password_hash):
        # Step 3: Record FAILURE (security event)
        await audit.record(
            action=AuditAction.USER_LOGIN_FAILED,
            user_id=user.id if user else None,
            resource_type="session",
            ip_address=request.client.host,
            metadata={
                "email": data.email,
                "reason": "invalid_credentials",
            },
        )
        raise HTTPException(401, "Invalid credentials")

    # Step 4: Create session (business logic)
    session_id = create_session(user.id)

    # Step 5: Record SUCCESS
    await audit.record(
        action=AuditAction.USER_LOGIN_SUCCESS,
        user_id=user.id,
        resource_type="session",
        resource_id=session_id,
        ip_address=request.client.host,
    )

    return {"access_token": create_token(user)}
```
Security Investigation Use Case:
```sql
-- Find all failed login attempts from suspicious IP
SELECT * FROM audit_logs
WHERE action = 'user_login_failed'
  AND ip_address = '123.45.67.89'
  AND timestamp > NOW() - INTERVAL '24 hours'
ORDER BY timestamp;

-- Result: Clear evidence of brute force attack
-- 100 failed attempts, 0 successes
-- ✅ Block IP with confidence
```
Scenario 3: Provider Connection with API Failure¶
```python
@router.post("/providers/{provider_name}/connect")
async def connect_provider(
    request: Request,
    provider_name: str,
    data: OAuth2Request,
    session: AsyncSession = Depends(get_db_session),
    audit: AuditProtocol = Depends(get_audit),
    current_user: User = Depends(get_current_user),  # auth dependency (assumed)
):
    # Step 1: Record attempt
    await audit.record(
        action=AuditAction.PROVIDER_CONNECTION_ATTEMPTED,
        user_id=current_user.id,
        resource_type="provider",
        ip_address=request.client.host,
        metadata={"provider": provider_name},
    )

    # Step 2: OAuth flow
    try:
        tokens = await oauth_client.exchange_code(data.code)
    except OAuthError as e:
        # Step 3: Record FAILURE (API error)
        await audit.record(
            action=AuditAction.PROVIDER_CONNECTION_FAILED,
            user_id=current_user.id,
            resource_type="provider",
            ip_address=request.client.host,
            metadata={
                "provider": provider_name,
                "reason": "oauth_failed",
                "error": str(e),
            },
        )
        raise HTTPException(400, "Provider connection failed")

    # Step 4: Save provider (business logic)
    provider = Provider(
        user_id=current_user.id,
        name=provider_name,
        access_token=tokens.access_token,
        refresh_token=tokens.refresh_token,
    )
    session.add(provider)
    await session.commit()  # ✅ Provider NOW exists

    # Step 5: Record SUCCESS
    await audit.record(
        action=AuditAction.PROVIDER_CONNECTED,
        user_id=current_user.id,
        resource_type="provider",
        resource_id=provider.id,
        ip_address=request.client.host,
        metadata={"provider": provider_name},
    )

    return ProviderResponse(id=provider.id, name=provider_name)
```
3.6 Anti-Patterns (Common Mistakes)¶
Anti-Pattern 1: Recording SUCCESS Before Business Commit¶
```python
# ❌ WRONG: Audit says user registered, but commit might fail
await audit.record(action=AuditAction.USER_REGISTERED, ...)
session.add(user)
await session.commit()  # ❌ If this fails, audit is lying
```
Fix: Record SUCCESS AFTER business commit succeeds.
Anti-Pattern 2: No ATTEMPT Record¶
```python
# ❌ WRONG: No record of failed attempt
if email_exists:
    raise HTTPException(400)  # ❌ No audit of failed attempt
```
Fix: Always record ATTEMPTED first, then FAILED.
Anti-Pattern 3: Single Event for Both Attempt and Outcome¶
```python
# ❌ WRONG: Can't distinguish attempt from success
await audit.record(action=AuditAction.USER_LOGIN, ...)
# ❌ Was this a successful login or just an attempt?
```
Fix: Use ATTEMPTED → (FAILED or SUCCESS) pattern.
Anti-Pattern 4: Recording Predictions¶
```python
# ❌ WRONG: Audit predicts the future
await audit.record(action=AuditAction.DATA_WILL_BE_EXPORTED, ...)
# ❌ "will be" is prediction, not fact
```
Fix: Only audit completed actions (past tense: DATA_EXPORTED).
3.7 Decision Tree: When to Audit¶
```text
Is this a user-initiated action?
├─ YES → Record ATTEMPTED immediately
│         ↓
│        Does business logic execute?
│        ├─ YES → Record FAILED or SUCCESS after business commit
│        └─ NO  → Just record ATTEMPTED (e.g., validation fails immediately)
│
└─ NO → Is this a completed system event?
         ├─ YES → Record event after completion (e.g., BACKUP_COMPLETED)
         └─ NO  → Is this internal workflow? → Don't audit (use events instead)
```
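For reference, the same decision tree reads as a simple function; the return values here are illustrative labels, not `AuditAction` members:

```python
# Illustrative encoding of the audit decision tree above.
def events_to_record(user_initiated: bool, business_logic_runs: bool,
                     completed_system_event: bool) -> list[str]:
    """Return which audit events an operation should emit."""
    if user_initiated:
        if business_logic_runs:
            # Full ATTEMPT -> OUTCOME pair
            return ["ATTEMPTED", "FAILED_or_SUCCESS"]
        # e.g., validation fails immediately
        return ["ATTEMPTED"]
    if completed_system_event:
        # e.g., BACKUP_COMPLETED
        return ["COMPLETED_EVENT"]
    # Internal workflow: use domain events, not audit
    return []
```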
3.8 Testing Semantic Accuracy¶
```python
# tests/integration/test_audit_semantics.py
async def test_registration_audit_timeline(
    test_database,
    audit: AuditProtocol,
):
    """Verify registration follows ATTEMPTED → FAILED/SUCCESS pattern."""
    # Attempt registration with duplicate email
    # Should see: ATTEMPTED → FAILED

    # Query audit logs (unwrap the Result)
    result = await audit.query(user_id=None, limit=100)
    logs = result.value

    # Verify timeline (results are ordered DESC, so the failure is newest)
    assert logs[0]["action"] == "user_registration_failed"
    assert logs[1]["action"] == "user_registration_attempted"
    assert logs[0]["metadata"]["reason"] == "duplicate_email"

    # Verify database consistency (session/email come from fixtures, elided)
    users = await session.execute(select(User).where(User.email == email))
    assert len(users.scalars().all()) == 0  # ✅ No user created
```
4. Domain Layer - Protocol Definition¶
4.1 AuditAction Enum¶
File: src/domain/enums/audit_action.py
Architectural Decision: Following centralized enum pattern (see
docs/architecture/directory-structure.md - Enum Organization section).
All domain enums live in src/domain/enums/ for discoverability and
maintainability.
```python
# src/domain/enums/audit_action.py
"""Audit action types for compliance tracking.

This enum defines all auditable actions in the system (PCI-DSS, SOC 2, GDPR).
Extensible via enum values - no database schema changes needed.
"""

from enum import Enum


class AuditAction(str, Enum):
    """Audit action types (extensible via enum).

    Organized by category for clarity. Add new actions as needed
    without database schema changes (metadata stores action-specific data).

    Categories:
        - Authentication (USER_*)
        - Authorization (ACCESS_*)
        - Data Operations (DATA_*)
        - Administrative (ADMIN_*)
        - Provider (PROVIDER_*)
    """

    # Authentication events (PCI-DSS required)
    # ATTEMPTED/FAILED members implement the ATTEMPT → OUTCOME taxonomy (Section 3.3)
    USER_REGISTRATION_ATTEMPTED = "user_registration_attempted"
    USER_REGISTRATION_FAILED = "user_registration_failed"
    USER_REGISTERED = "user_registered"
    USER_LOGIN_ATTEMPTED = "user_login_attempted"
    USER_LOGIN_FAILED = "user_login_failed"
    USER_LOGIN_SUCCESS = "user_login_success"
    USER_LOGIN = "user_login"
    USER_LOGOUT = "user_logout"
    USER_PASSWORD_CHANGED = "user_password_changed"
    USER_PASSWORD_RESET_REQUESTED = "user_password_reset_requested"
    USER_PASSWORD_RESET_COMPLETED = "user_password_reset_completed"
    USER_EMAIL_VERIFIED = "user_email_verified"
    USER_MFA_ENABLED = "user_mfa_enabled"
    USER_MFA_DISABLED = "user_mfa_disabled"

    # Authorization events (SOC 2 required)
    ACCESS_GRANTED = "access_granted"
    ACCESS_DENIED = "access_denied"
    PERMISSION_CHANGED = "permission_changed"
    ROLE_ASSIGNED = "role_assigned"
    ROLE_REVOKED = "role_revoked"

    # Data access events (GDPR required)
    DATA_VIEWED = "data_viewed"
    DATA_EXPORTED = "data_exported"
    DATA_DELETED = "data_deleted"
    DATA_MODIFIED = "data_modified"

    # Administrative events (SOC 2 required)
    ADMIN_USER_CREATED = "admin_user_created"
    ADMIN_USER_DELETED = "admin_user_deleted"
    ADMIN_USER_SUSPENDED = "admin_user_suspended"
    ADMIN_CONFIG_CHANGED = "admin_config_changed"
    ADMIN_BACKUP_CREATED = "admin_backup_created"

    # Provider events (PCI-DSS required - cardholder data access)
    PROVIDER_CONNECTION_ATTEMPTED = "provider_connection_attempted"
    PROVIDER_CONNECTION_FAILED = "provider_connection_failed"
    PROVIDER_CONNECTED = "provider_connected"
    PROVIDER_DISCONNECTED = "provider_disconnected"
    PROVIDER_TOKEN_REFRESHED = "provider_token_refreshed"
    PROVIDER_TOKEN_REFRESH_FAILED = "provider_token_refresh_failed"
    PROVIDER_DATA_SYNCED = "provider_data_synced"
    PROVIDER_ACCOUNT_VIEWED = "provider_account_viewed"  # PCI-DSS
    PROVIDER_TRANSACTION_VIEWED = "provider_transaction_viewed"  # PCI-DSS
```
4.2 AuditProtocol Interface¶
File: src/domain/protocols/audit_protocol.py
```python
# src/domain/protocols/audit_protocol.py
from typing import Protocol, Any
from uuid import UUID
from datetime import datetime

from src.domain.enums import AuditAction
from src.domain.errors import AuditError
from src.core.result import Result


class AuditProtocol(Protocol):
    """Protocol for audit trail systems.

    Records immutable audit entries for compliance (PCI-DSS, SOC 2, GDPR).
    All implementations MUST ensure immutability (no updates/deletes).

    Implementations:
        - PostgresAuditAdapter: PostgreSQL with RULES
        - MySQLAuditAdapter: MySQL with TRIGGERS
        - SQLiteAuditAdapter: SQLite for testing
        - InMemoryAuditAdapter: Testing only

    Compliance:
        - PCI-DSS: 7+ year retention, immutable, tamper-proof
        - SOC 2: Security event tracking, who/what/when/where
        - GDPR: Personal data access tracking
    """

    async def record(
        self,
        *,
        action: AuditAction,
        user_id: UUID | None = None,
        resource_type: str,
        resource_id: UUID | None = None,
        ip_address: str | None = None,
        user_agent: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> Result[None, AuditError]:
        """Record an audit entry (immutable, cannot be changed).

        Args:
            action: What happened (enum for consistency)
            user_id: Who performed the action (None for system actions)
            resource_type: What was affected (user, account, provider)
            resource_id: Specific resource identifier
            ip_address: Where from (required for auth events)
            user_agent: Client information
            metadata: Additional context (JSONB - extensible)

        Returns:
            Result[None, AuditError]: Success or audit system error

        Example:
            await audit.record(
                action=AuditAction.USER_LOGIN,
                user_id=user_id,
                resource_type="session",
                resource_id=session_id,
                ip_address=request.client.host,
                user_agent=request.headers.get("User-Agent"),
                metadata={"method": "password", "mfa": True},
            )

        Note:
            - Records are IMMUTABLE (cannot update or delete)
            - Timestamps are UTC and set by database
            - Metadata is JSONB for extensibility
        """
        ...

    async def query(
        self,
        *,
        user_id: UUID | None = None,
        action: AuditAction | None = None,
        resource_type: str | None = None,
        start_date: datetime | None = None,
        end_date: datetime | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> Result[list[dict[str, Any]], AuditError]:
        """Query audit trail (read-only, for compliance reports).

        Args:
            user_id: Filter by user
            action: Filter by action type
            resource_type: Filter by resource
            start_date: From date (inclusive)
            end_date: To date (inclusive)
            limit: Max results (default 100, max 1000)
            offset: Pagination offset

        Returns:
            Result[list[dict[str, Any]], AuditError]: Matching audit entries

        Note:
            - Used for compliance reports and forensics
            - NOT for application logic (use domain events instead)
            - Results ordered by timestamp DESC (newest first)
        """
        ...
```
4.3 AuditError Class¶
File: src/domain/errors/audit_error.py
```python
from dataclasses import dataclass

from src.core.errors import DomainError


@dataclass(frozen=True, slots=True, kw_only=True)
class AuditError(DomainError):
    """Audit system failure.

    Used when audit trail recording fails (database error, connection loss).

    Attributes:
        code: ErrorCode enum (AUDIT_RECORD_FAILED, AUDIT_QUERY_FAILED).
        message: Human-readable message.
        details: Additional context.
    """

    pass
```
Error Codes: Added to src/core/enums/error_code.py:
- `AUDIT_RECORD_FAILED` - Failed to record audit entry
- `AUDIT_QUERY_FAILED` - Failed to query audit trail
4.4 Required Context Fields¶
MANDATORY fields in every audit entry:
- `id` (UUID): Unique identifier (primary key)
- `action` (AuditAction): What happened (enum)
- `timestamp` (datetime): When it happened (UTC, immutable)
- `resource_type` (str): What was affected (user, account, provider)
REQUIRED for specific actions:
- `user_id` (UUID): Who did it (required except system actions)
- `ip_address` (str): Where from (required for auth events)
- `user_agent` (str): Client info (required for auth events)
OPTIONAL but recommended:
- `resource_id` (UUID): Specific resource identifier
- `metadata` (JSONB): Action-specific context (extensible)
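Putting the field requirements together, a fully-populated entry for an authentication event looks like this (all values are illustrative):

```python
from datetime import datetime, timezone
from uuid import uuid4

# Example audit entry assembled from the field requirements above.
entry = {
    # Mandatory in every entry
    "id": uuid4(),
    "action": "user_login",
    "timestamp": datetime.now(timezone.utc),
    "resource_type": "session",
    # Required for auth events
    "user_id": uuid4(),
    "ip_address": "203.0.113.7",
    "user_agent": "Mozilla/5.0",
    # Optional but recommended
    "resource_id": None,
    "metadata": {"method": "password", "mfa": True},
}

MANDATORY = {"id", "action", "timestamp", "resource_type"}
assert MANDATORY <= entry.keys()  # every entry must carry these
```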
5. Infrastructure Layer - Database Adapters¶
5.1 PostgreSQL Adapter (Production)¶
File: src/infrastructure/audit/postgres_audit_adapter.py
Immutability Strategy: PostgreSQL RULES block UPDATE/DELETE
```sql
-- Database migration creates immutable table
CREATE RULE audit_logs_no_update AS
ON UPDATE TO audit_logs
DO INSTEAD NOTHING;

CREATE RULE audit_logs_no_delete AS
ON DELETE TO audit_logs
DO INSTEAD NOTHING;
```
Adapter Implementation:
```python
from datetime import datetime, UTC
from typing import Any
from uuid import UUID

from uuid_extensions import uuid7
from sqlalchemy import select, and_
from sqlalchemy.ext.asyncio import AsyncSession

from src.core.enums.error_code import ErrorCode
from src.core.result import Result, Success, Failure
from src.domain.enums import AuditAction
from src.domain.errors import AuditError
from src.infrastructure.persistence.models.audit import AuditLogModel


class PostgresAuditAdapter:
    """PostgreSQL audit trail adapter with immutable records.

    Immutability enforced by database RULES (cannot UPDATE or DELETE).
    Uses JSONB for extensible metadata storage.

    Args:
        session: SQLAlchemy async session (injected)
    """

    def __init__(self, session: AsyncSession):
        self.session = session

    async def record(
        self,
        *,
        action: AuditAction,
        user_id: UUID | None = None,
        resource_type: str,
        resource_id: UUID | None = None,
        ip_address: str | None = None,
        user_agent: str | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> Result[None, AuditError]:
        """Record immutable audit entry in PostgreSQL."""
        try:
            audit_log = AuditLogModel(
                id=uuid7(),
                action=action.value,
                user_id=user_id,
                resource_type=resource_type,
                resource_id=resource_id,
                ip_address=ip_address,
                user_agent=user_agent,
                # Model attribute for the "metadata" column ("metadata" itself
                # is a reserved name on SQLModel/SQLAlchemy classes)
                audit_metadata=metadata or {},
                timestamp=datetime.now(UTC),
            )
            self.session.add(audit_log)
            await self.session.flush()
            return Success(None)
        except Exception as e:
            return Failure(AuditError(
                code=ErrorCode.AUDIT_RECORD_FAILED,
                message=f"Failed to record audit entry: {e}",
            ))

    async def query(
        self,
        *,
        user_id: UUID | None = None,
        action: AuditAction | None = None,
        resource_type: str | None = None,
        start_date: datetime | None = None,
        end_date: datetime | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> Result[list[dict[str, Any]], AuditError]:
        """Query audit trail with filters."""
        try:
            # Build query with filters
            query = select(AuditLogModel)
            conditions = []
            if user_id is not None:
                conditions.append(AuditLogModel.user_id == user_id)
            if action is not None:
                conditions.append(AuditLogModel.action == action.value)
            if resource_type is not None:
                conditions.append(AuditLogModel.resource_type == resource_type)
            if start_date is not None:
                conditions.append(AuditLogModel.timestamp >= start_date)
            if end_date is not None:
                conditions.append(AuditLogModel.timestamp <= end_date)
            if conditions:
                query = query.where(and_(*conditions))

            # Order by timestamp DESC, apply pagination
            query = query.order_by(AuditLogModel.timestamp.desc())
            query = query.limit(min(limit, 1000)).offset(offset)

            result = await self.session.execute(query)
            records = result.scalars().all()

            # Convert to dicts for protocol compliance
            return Success([
                {
                    "id": str(r.id),
                    "action": r.action,
                    "user_id": str(r.user_id) if r.user_id else None,
                    "resource_type": r.resource_type,
                    "resource_id": str(r.resource_id) if r.resource_id else None,
                    "ip_address": r.ip_address,
                    "user_agent": r.user_agent,
                    "metadata": r.audit_metadata,
                    "timestamp": r.timestamp.isoformat(),
                }
                for r in records
            ])
        except Exception as e:
            return Failure(AuditError(
                code=ErrorCode.AUDIT_QUERY_FAILED,
                message=f"Failed to query audit trail: {e}",
            ))
```
4.2 Database Model (SQLModel)¶
# src/infrastructure/persistence/models/audit.py
from datetime import datetime
from uuid import UUID
from sqlmodel import SQLModel, Field, Column, JSON
from sqlalchemy import Index
class AuditLogModel(SQLModel, table=True):
"""Audit trail table - IMMUTABLE (enforced by database rules).
CRITICAL: This table is append-only. Records CANNOT be modified or
deleted due to PostgreSQL RULES in migration.
Compliance:
- PCI-DSS: 7+ year retention
- SOC 2: Security event tracking
- GDPR: Personal data access tracking
"""
__tablename__ = "audit_logs"
# Primary key
id: UUID = Field(primary_key=True)
# What happened
action: str = Field(index=True, max_length=100)
# Who did it
user_id: UUID | None = Field(default=None, index=True, foreign_key="users.id")
# What was affected
resource_type: str = Field(index=True, max_length=100)
resource_id: UUID | None = Field(default=None, index=True)
# Where and how
ip_address: str | None = Field(default=None, max_length=45) # IPv6 max
user_agent: str | None = Field(default=None, max_length=500)
    # Extensible metadata (JSONB in PostgreSQL).
    # NOTE: "metadata" is a reserved attribute on SQLModel/SQLAlchemy
    # declarative models, so the Python attribute is renamed while the
    # underlying column keeps the name "metadata".
    audit_metadata: dict = Field(default_factory=dict, sa_column=Column("metadata", JSON))
# When (immutable, set by database)
timestamp: datetime = Field(index=True)
__table_args__ = (
# Composite indexes for common queries
Index("idx_audit_user_action", "user_id", "action"),
Index("idx_audit_resource", "resource_type", "resource_id"),
Index("idx_audit_timestamp", "timestamp"),
)
4.3 Alembic Migration (Immutability Enforcement)¶
# alembic/versions/XXXX_create_audit_logs_table.py
"""Create immutable audit_logs table
Revision ID: XXXX
Revises: YYYY
Create Date: 2025-11-14
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision = 'XXXX'
down_revision = 'YYYY'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create audit_logs table
op.create_table(
'audit_logs',
sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column('action', sa.String(100), nullable=False, index=True),
sa.Column('user_id', postgresql.UUID(as_uuid=True), nullable=True, index=True),
sa.Column('resource_type', sa.String(100), nullable=False, index=True),
sa.Column('resource_id', postgresql.UUID(as_uuid=True), nullable=True, index=True),
sa.Column('ip_address', sa.String(45), nullable=True),
sa.Column('user_agent', sa.String(500), nullable=True),
sa.Column('metadata', postgresql.JSONB, nullable=False, server_default='{}'),
sa.Column('timestamp', sa.DateTime(timezone=True), nullable=False, index=True),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ondelete='SET NULL'),
)
# Create composite indexes
op.create_index('idx_audit_user_action', 'audit_logs', ['user_id', 'action'])
op.create_index('idx_audit_resource', 'audit_logs', ['resource_type', 'resource_id'])
# CRITICAL: Enforce immutability with PostgreSQL RULES
op.execute("""
CREATE RULE audit_logs_no_update AS
ON UPDATE TO audit_logs
DO INSTEAD NOTHING;
""")
op.execute("""
CREATE RULE audit_logs_no_delete AS
ON DELETE TO audit_logs
DO INSTEAD NOTHING;
""")
def downgrade() -> None:
# Drop rules first
op.execute("DROP RULE IF EXISTS audit_logs_no_delete ON audit_logs;")
op.execute("DROP RULE IF EXISTS audit_logs_no_update ON audit_logs;")
# Drop table
op.drop_table('audit_logs')
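After running the migration, the presence of both rules can be confirmed against the `pg_rules` catalog view (a quick sanity check, assuming `psql` access):

```sql
SELECT rulename
FROM pg_rules
WHERE tablename = 'audit_logs';
-- Expected: audit_logs_no_update, audit_logs_no_delete
```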
4.4 Alternative: MySQL Adapter (Optional)¶
Immutability Strategy: MySQL TRIGGERS block UPDATE/DELETE
-- MySQL migration
DELIMITER $$
CREATE TRIGGER audit_logs_no_update
BEFORE UPDATE ON audit_logs
FOR EACH ROW
BEGIN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Audit logs are immutable - updates not allowed';
END$$
CREATE TRIGGER audit_logs_no_delete
BEFORE DELETE ON audit_logs
FOR EACH ROW
BEGIN
SIGNAL SQLSTATE '45000'
SET MESSAGE_TEXT = 'Audit logs are immutable - deletes not allowed';
END$$
DELIMITER ;
The adapter (MySQLAuditAdapter) would mirror the PostgreSQL adapter, differing only in the MySQL-specific dialect.
5. Core Layer - Container Integration¶
5.1 Container Pattern¶
File: src/core/container.py (add to existing file)
# ============================================================================
# Audit Dependencies (Application-Scoped)
# ============================================================================
@lru_cache()
def get_audit() -> "AuditProtocol":
"""Get audit trail singleton (app-scoped).
Container owns adapter selection based on DATABASE_TYPE.
This follows the Composition Root pattern (industry best practice).
Returns correct adapter based on database in use:
- 'postgresql': PostgresAuditAdapter (immutability via RULES)
- 'mysql': MySQLAuditAdapter (immutability via TRIGGERS)
- 'sqlite': SQLiteAuditAdapter (testing only)
Returns:
Audit trail implementing AuditProtocol.
Usage:
# Application Layer
audit = get_audit()
await audit.record(action=AuditAction.USER_LOGIN, ...)
# Presentation Layer (FastAPI)
audit: AuditProtocol = Depends(get_audit)
"""
from src.domain.protocols.audit_protocol import AuditProtocol
# Determine database type from settings
db_type = settings.database_url.split("://")[0]
if db_type.startswith("postgresql"):
from src.infrastructure.audit.postgres_audit_adapter import PostgresAuditAdapter
# Note: In real usage, get session from request scope
# This is simplified for illustration
return PostgresAuditAdapter(session=get_database().session)
elif db_type.startswith("mysql"):
from src.infrastructure.audit.mysql_audit_adapter import MySQLAuditAdapter
return MySQLAuditAdapter(session=get_database().session)
else:
raise ValueError(f"Unsupported database type for audit: {db_type}")
def clear_container_cache() -> None:
"""Clear all container caches (testing utility)."""
get_cache.cache_clear()
get_secrets.cache_clear()
get_database.cache_clear()
get_logger.cache_clear()
get_audit.cache_clear() # Add audit to cache clearing
5.2 Session Architecture: Audit Durability¶
5.2.1 Principle: Independent Audit Sessions¶
CRITICAL REQUIREMENT: Audit logs MUST persist regardless of request outcome to meet PCI-DSS and SOC 2 requirements.
Why this matters:
- Failed authentication attempts MUST be logged (security monitoring)
- Access denied events MUST be logged (compliance requirement)
- Invalid operations MUST be logged (forensics)
- Transaction rollbacks MUST NOT lose audit logs
5.2.2 Architecture: Separate Sessions¶
Problem with shared sessions:
┌─────────────────────────────────────────┐
│ FastAPI Request │
├─────────────────────────────────────────┤
│ Session (begin) │
│ ├─> Business logic │
│ ├─> Audit log (flush) ← Audit here │
│ ├─> More business logic │
│ └─> Commit or Rollback ← Lost here │
└─────────────────────────────────────────┘
Issue: If request fails, audit logs are lost with transaction rollback.
Solution: Independent audit session:
┌─────────────────────────────────────────┐
│ FastAPI Request │
├─────────────────────────────────────────┤
│ Business Session (begin) │
│ ├─> Business logic │
│ ├─> More business logic │
│ └─> Commit or Rollback │
│ │
│ Audit Session (separate) │
│ ├─> Audit log │
│ └─> Commit (immediate) ← Durable │
└─────────────────────────────────────────┘
Benefits:
- ✅ Audit logs persist even when business transaction fails
- ✅ Failed operations are audited (compliance requirement)
- ✅ Clear separation of concerns
- ✅ No risk of losing security-critical audit data
5.2.3 Implementation Pattern¶
Container with two session factories:
# src/core/container.py
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Get database session (request-scoped, no auto-commit).

    Used for business logic. Managed by FastAPI request lifecycle.
    Commits on success, rolls back on exception.

    NOTE: Do NOT wrap request-scoped generator dependencies in
    @lru_cache -- caching would hand every request the same,
    already-exhausted generator.
    """
    async with Database().get_session() as session:
        yield session
async def get_audit_session() -> AsyncGenerator[AsyncSession, None]:
    """Get audit session (auto-commits, independent lifecycle).

    Used ONLY for audit logging. Commits immediately after each
    audit record to ensure durability regardless of request outcome.

    Separate from business session to prevent audit logs from being
    lost when business transactions roll back.

    Yields:
        AsyncSession: Session that commits after each audit operation.
    """
    async with Database().get_session() as session:
        # Session commits are managed by the adapter
        yield session
def get_audit(
audit_session: AsyncSession = Depends(get_audit_session)
) -> AuditProtocol:
"""Get audit adapter (uses separate audit session).
Args:
audit_session: Independent session from get_audit_session().
Returns:
Audit adapter that commits immediately (durable).
"""
return PostgresAuditAdapter(session=audit_session)
Adapter commits immediately:
# src/infrastructure/audit/postgres_audit_adapter.py
class PostgresAuditAdapter:
async def record(...) -> Result[None, AuditError]:
try:
audit_log = AuditLogModel(...)
self.session.add(audit_log)
await self.session.commit() # ✅ Durable (separate session)
return Success(value=None)
except SQLAlchemyError as e:
await self.session.rollback() # Rollback audit session only
return Failure(AuditError(...))
5.2.4 Example Scenarios¶
Scenario 1: Failed Authentication:
@router.post("/auth/login")
async def login(
data: LoginRequest,
audit: AuditProtocol = Depends(get_audit), # Separate session
business_session: AsyncSession = Depends(get_session),
):
# Record audit FIRST (separate session, commits immediately)
await audit.record(
action=AuditAction.USER_LOGIN_FAILED,
resource_type="session",
ip_address=request.client.host,
)
# Business logic fails
user = await business_session.execute(...)
raise HTTPException(401, "Invalid credentials")
# Result: Audit log persists ✅, business transaction rolled back ✅
Scenario 2: Database Constraint Violation:
async def create_provider(
data: ProviderData,
audit: AuditProtocol = Depends(get_audit),
session: AsyncSession = Depends(get_session),
):
# Record audit
await audit.record(
action=AuditAction.PROVIDER_CONNECTED,
user_id=user_id,
resource_type="provider",
)
# Business logic fails (duplicate provider)
session.add(Provider(...))
await session.commit() # Raises IntegrityError
# Result: Audit log persists ✅, provider not created ✅
5.2.5 Testing Durability¶
# tests/integration/test_audit_durability.py
async def test_audit_persists_when_request_fails(
business_session,
audit_session,
):
"""Verify audit logs persist even when business transaction fails."""
# Record audit in separate session
audit = PostgresAuditAdapter(session=audit_session)
result = await audit.record(
action=AuditAction.USER_LOGIN,
resource_type="session",
user_id=user_id,
)
assert isinstance(result, Success)
# Simulate business transaction failure
business_session.add(User(email="duplicate")) # Unique constraint
with pytest.raises(IntegrityError):
await business_session.commit()
# Verify audit log still exists
logs = await audit_session.execute(
select(AuditLogModel).where(AuditLogModel.user_id == user_id)
)
assert len(logs.scalars().all()) == 1 # ✅ Audit persisted
5.2.6 Compliance Impact¶
PCI-DSS Requirement 10: Track and monitor all access to cardholder data.
- 10.2.4: Invalid logical access attempts MUST be logged
- 10.2.5: Authentication mechanisms MUST be logged
SOC 2 CC6.1: Access controls restrict access to authorized users.
- Access denied events MUST be logged for security monitoring
GDPR Article 30: Records of processing activities.
- Personal data access MUST be logged even if operation fails
With separate sessions: ✅ All compliance requirements met.
6. Application Layer Integration¶
6.1 Using Audit in Services¶
# src/application/services/auth_service.py
from src.domain.protocols.audit_protocol import AuditProtocol
from src.domain.enums import AuditAction
class AuthService:
def __init__(self, audit: AuditProtocol):
self.audit = audit
async def login_user(
self,
email: str,
password: str,
ip_address: str,
user_agent: str,
) -> Result[User, AuthError]:
"""Authenticate user and record audit trail."""
# Attempt authentication
result = await self.user_repo.find_by_email(email)
match result:
case Success(user):
if verify_password(password, user.password_hash):
# Success - record audit
await self.audit.record(
action=AuditAction.USER_LOGIN,
user_id=user.id,
resource_type="session",
ip_address=ip_address,
user_agent=user_agent,
metadata={"method": "password"},
)
return Success(user)
else:
# Failed login - record audit (security event)
await self.audit.record(
action=AuditAction.USER_LOGIN_FAILED,
user_id=user.id,
resource_type="session",
ip_address=ip_address,
user_agent=user_agent,
metadata={"reason": "invalid_password"},
)
return Failure(AuthError("Invalid credentials"))
case Failure(_):
# User not found - record audit (security event)
await self.audit.record(
action=AuditAction.USER_LOGIN_FAILED,
user_id=None, # Unknown user
resource_type="session",
ip_address=ip_address,
user_agent=user_agent,
metadata={"reason": "user_not_found", "email": email},
)
return Failure(AuthError("Invalid credentials"))
6.2 Integration with Domain Events¶
# src/application/events/handlers/audit_event_handler.py
from src.domain.events.user_password_changed import UserPasswordChanged
from src.domain.protocols.audit_protocol import AuditProtocol
from src.domain.enums import AuditAction
class AuditEventHandler:
"""Domain event handler that creates audit records.
Listens to domain events and records them in audit trail.
Decouples audit from business logic.
"""
def __init__(self, audit: AuditProtocol):
self.audit = audit
async def on_password_changed(self, event: UserPasswordChanged) -> None:
"""Record password change in audit trail."""
await self.audit.record(
action=AuditAction.USER_PASSWORD_CHANGED,
user_id=event.user_id,
resource_type="user",
resource_id=event.user_id,
metadata={
"initiated_by": event.initiated_by, # user or admin
"event_id": str(event.event_id),
},
)
6.3 FastAPI Endpoint Example¶
# src/presentation/api/v1/auth.py
from fastapi import APIRouter, Depends, Request
from src.domain.protocols.audit_protocol import AuditProtocol
from src.core.container import get_audit
router = APIRouter()
@router.post("/login")
async def login(
data: LoginRequest,
request: Request,
audit: AuditProtocol = Depends(get_audit),
):
"""User login with automatic audit trail."""
# Extract context
ip_address = request.client.host
user_agent = request.headers.get("User-Agent", "unknown")
# Authenticate (service records audit internally)
result = await auth_service.login_user(
email=data.email,
password=data.password,
ip_address=ip_address,
user_agent=user_agent,
)
match result:
case Success(user):
return {"access_token": create_token(user)}
case Failure(error):
raise HTTPException(401, detail=error.message)
7. Testing Strategy¶
7.1 Unit Tests (Domain/Application)¶
# tests/unit/test_domain_audit_protocol.py
from unittest.mock import AsyncMock
from src.domain.protocols.audit_protocol import AuditProtocol
from src.domain.enums import AuditAction
async def test_audit_protocol_mocking():
    """Test that AuditProtocol can be easily mocked."""
    # Create mock audit
    mock_audit = AsyncMock(spec=AuditProtocol)
    # Use in service
    service = AuthService(audit=mock_audit)
    # Exercise the code path that records an audit entry
    # (sample credentials; repository wiring elided)
    await service.login_user(
        email="user@example.com",
        password="correct-password",
        ip_address="192.168.1.1",
        user_agent="Mozilla/5.0",
    )
    # Verify audit.record was called with the expected arguments
    mock_audit.record.assert_called_once_with(
        action=AuditAction.USER_LOGIN,
        user_id=user_id,
        resource_type="session",
        ip_address="192.168.1.1",
        user_agent="Mozilla/5.0",
        metadata={"method": "password"},
    )
7.2 Integration Tests (Infrastructure)¶
# tests/integration/test_audit_postgres_adapter.py
import pytest
from datetime import datetime, UTC
from sqlalchemy import text
from src.infrastructure.audit.postgres_audit_adapter import PostgresAuditAdapter
from src.domain.enums import AuditAction
@pytest.mark.integration
async def test_audit_record_creates_immutable_entry(db_session):
"""Test audit adapter creates immutable records."""
adapter = PostgresAuditAdapter(session=db_session)
# Record audit entry
result = await adapter.record(
action=AuditAction.USER_LOGIN,
user_id=user_id,
resource_type="session",
ip_address="192.168.1.1",
user_agent="Test Agent",
metadata={"test": True},
)
    assert isinstance(result, Success)
# Verify record exists
query_result = await adapter.query(user_id=user_id, limit=1)
assert len(query_result.value) == 1
entry = query_result.value[0]
assert entry["action"] == "user_login"
assert entry["user_id"] == str(user_id)
@pytest.mark.integration
async def test_audit_immutability_enforced(db_session):
"""Test that audit records CANNOT be updated or deleted."""
adapter = PostgresAuditAdapter(session=db_session)
# Create entry
result = await adapter.record(
action=AuditAction.USER_LOGIN,
user_id=user_id,
resource_type="session",
)
    # Attempt UPDATE: the audit_logs_no_update RULE rewrites the
    # statement to a no-op (DO INSTEAD NOTHING), so no exception is
    # raised -- immutability is verified by inspecting the row afterwards.
    await db_session.execute(
        text("UPDATE audit_logs SET action = 'hacked' WHERE user_id = :user_id"),
        {"user_id": user_id},
    )
    # Attempt DELETE: likewise silently discarded by audit_logs_no_delete
    await db_session.execute(
        text("DELETE FROM audit_logs WHERE user_id = :user_id"),
        {"user_id": user_id},
    )
    # Record survives with its original action
    rows = await db_session.execute(
        text("SELECT action FROM audit_logs WHERE user_id = :user_id"),
        {"user_id": user_id},
    )
    assert [row.action for row in rows] == ["user_login"]
7.3 Test Coverage Target¶
- Unit tests: 90%+ coverage (protocol mocking, enum validation)
- Integration tests: 85%+ coverage (database operations, immutability)
- Total coverage: 85%+ for audit infrastructure
8. Compliance & Security¶
8.1 PCI-DSS Compliance Checklist¶
- ✅ All cardholder data access logged (PROVIDER_ACCOUNT_VIEWED)
- ✅ All authentication attempts logged (USER_LOGIN, USER_LOGIN_FAILED)
- ✅ All administrative actions logged (ADMIN_*)
- ✅ 7+ year retention configured
- ✅ Immutable records (cannot be tampered with)
- ✅ Quarterly audit log review process (manual)
8.2 SOC 2 Compliance Checklist¶
- ✅ Security-relevant events logged
- ✅ Who/what/when/where tracking (user_id, action, timestamp, ip_address)
- ✅ Tamper-proof audit trail (immutable)
- ✅ Access controls (only auditors query audit trail)
- ✅ Regular audit log reviews
8.3 GDPR Compliance Checklist¶
- ✅ Personal data access logged (DATA_VIEWED, DATA_EXPORTED)
- ✅ Data deletion logged (DATA_DELETED)
- ✅ Consent changes logged (metadata)
- ✅ Data breach notification tracking (ADMIN actions)
- ✅ User can request audit trail of their data
8.4 Retention Policy¶
# Retention configuration
AUDIT_RETENTION_YEARS = 7 # Minimum for PCI-DSS
-- Database partitioning (PostgreSQL): partition by year for
-- efficient retention management
CREATE TABLE audit_logs_2025 PARTITION OF audit_logs
    FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');
-- Retention job (runs annually): archive old partitions to cold
-- storage; keep for 7+ years
9. Performance Optimization¶
9.1 Indexing Strategy¶
Indexes on audit_logs table:
- `timestamp` (DESC) - Most queries filter by date range
- `user_id`, `action` - Composite index for user activity reports
- `resource_type`, `resource_id` - Composite index for resource audits
- `action` - Compliance reports by event type
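As an illustration, a user-activity report (hypothetical query) is served by the `idx_audit_user_action` composite index together with the timestamp index:

```sql
SELECT action, timestamp, ip_address
FROM audit_logs
WHERE user_id = :user_id
  AND action = 'user_login_failed'
ORDER BY timestamp DESC
LIMIT 100;
```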
9.2 Table Partitioning¶
Partition by year (PostgreSQL):
-- Create partitioned table
CREATE TABLE audit_logs (
...
) PARTITION BY RANGE (timestamp);
-- Create partition per year
CREATE TABLE audit_logs_2025 PARTITION OF audit_logs
FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');
CREATE TABLE audit_logs_2026 PARTITION OF audit_logs
FOR VALUES FROM ('2026-01-01') TO ('2027-01-01');
Benefits:
- Query performance (scan only relevant partition)
- Easy archival (detach old partition, archive to S3)
- Retention management (keep 7+ years, archive older)
9.3 Async Operations¶
All audit operations are async (non-blocking):
# Awaiting the audit write suspends only this coroutine; the event
# loop keeps serving other requests while the INSERT is in flight.
await audit.record(...)
# Business logic continues once the record is durable
return {"status": "success"}
9.4 Query Limits¶
- Default limit: 100 records
- Maximum limit: 1000 records (prevent DoS)
- Pagination: Use `limit`/`offset` for large result sets
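Draining a capped query can be sketched as follows — `fetch_all_audit_pages` and `fake_query` are hypothetical helpers that assume only the `limit`/`offset` contract of `AuditProtocol.query`:

```python
import asyncio

async def fetch_all_audit_pages(query_fn, *, page_size: int = 100, **filters):
    """Drain a limit/offset-paginated query into a single list.

    query_fn is assumed to accept limit/offset keyword arguments and
    return a list of records, mirroring AuditProtocol.query.
    """
    records, offset = [], 0
    while True:
        page = await query_fn(limit=page_size, offset=offset, **filters)
        records.extend(page)
        if len(page) < page_size:  # short page => no more data
            return records
        offset += page_size

# Fake backend standing in for the audit adapter (250 records)
DATA = [{"id": i} for i in range(250)]

async def fake_query(*, limit: int, offset: int):
    return DATA[offset:offset + limit]

all_records = asyncio.run(fetch_all_audit_pages(fake_query, page_size=100))
print(len(all_records))  # 250
```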
10. Operational Procedures¶
10.1 Audit Log Review (Quarterly)¶
Process:
- Export audit logs for the quarter:
# Generate compliance report
result = await audit.query(
start_date=quarter_start,
end_date=quarter_end,
limit=10000,
)
- Review for anomalies:
  - Unusual login patterns
  - Failed auth attempts
  - Admin actions
  - Data exports
- Document findings in compliance report
10.2 Incident Response¶
When security incident occurs:
- Query audit trail for affected time range
- Identify all affected users/resources
- Preserve audit records (immutable, no risk of tampering)
- Generate incident report from audit data
# Incident investigation
result = await audit.query(
start_date=incident_start,
end_date=incident_end,
resource_type="account",
limit=1000,
)
10.3 Data Retention¶
Archival process (after 7+ years):
- Detach old partition
- Export to cold storage (S3 Glacier)
- Keep for legal requirement (varies by jurisdiction)
- Eventually delete after legal retention expires
11. Migration Path¶
11.1 Initial Migration¶
11.2 Adding New AuditActions¶
NO database migration needed - just add to enum:
# src/domain/enums/audit_action.py
class AuditAction(str, Enum):
# ... existing actions ...
# NEW: Just add to enum
USER_PROFILE_UPDATED = "user_profile_updated"
Action-specific metadata goes in JSONB:
await audit.record(
action=AuditAction.USER_PROFILE_UPDATED,
user_id=user_id,
resource_type="user",
resource_id=user_id,
metadata={
"fields_changed": ["email", "phone"],
"old_email": "old@example.com",
"new_email": "new@example.com",
},
)
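Because metadata is stored as JSONB, action-specific fields stay queryable without a migration — for example, using PostgreSQL's containment operator (hypothetical filter):

```sql
SELECT id, timestamp
FROM audit_logs
WHERE action = 'user_profile_updated'
  AND metadata @> '{"fields_changed": ["email"]}';
```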
12. Future Enhancements¶
12.1 Cryptographic Integrity¶
Future: Add cryptographic chain for tamper detection
# Each audit entry includes hash of previous entry
# Detect if database is tampered with at OS/filesystem level
previous_hash = sha256(previous_entry)
current_entry.previous_hash = previous_hash
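A minimal, self-contained sketch of such a chain (the `hash`/`previous_hash` field names are assumptions for illustration, not part of the current schema):

```python
import hashlib
import json

def entry_hash(entry: dict, previous_hash: str) -> str:
    """Hash an audit entry together with its predecessor's hash."""
    payload = json.dumps(entry, sort_keys=True) + previous_hash
    return hashlib.sha256(payload.encode()).hexdigest()

def verify_chain(entries: list[dict]) -> bool:
    """Re-derive every link; any tampered entry breaks verification."""
    prev = "0" * 64  # genesis hash
    for e in entries:
        record = {k: v for k, v in e.items() if k not in ("hash", "previous_hash")}
        if e["previous_hash"] != prev or e["hash"] != entry_hash(record, prev):
            return False
        prev = e["hash"]
    return True

# Build a two-entry chain, then tamper with the first entry
chain: list[dict] = []
prev = "0" * 64
for action in ("user_login", "data_exported"):
    record = {"action": action}
    h = entry_hash(record, prev)
    chain.append({**record, "previous_hash": prev, "hash": h})
    prev = h

ok_before = verify_chain(chain)  # True: chain intact
chain[0]["action"] = "hacked"
ok_after = verify_chain(chain)   # False: tampering detected
```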
12.2 Compliance Dashboards¶
Future: Real-time compliance dashboards
- Failed login attempts (last 24 hours)
- Admin actions (last week)
- Data exports (by user)
- PCI-DSS compliance score
12.3 Automated Anomaly Detection¶
Future: ML-based anomaly detection
- Unusual login patterns
- Geographic anomalies
- Bulk data exports
- Time-based anomalies (logins at 3 AM)
Created: 2025-11-14 | Last Updated: 2026-01-10