Dependency Injection Architecture¶
Overview¶
This document defines the centralized dependency injection strategy for Dashtam, combining application-scoped singletons (custom container) with request-scoped dependencies (FastAPI Depends). This ensures consistent, maintainable, and testable dependency management across all architectural layers.
Related: See Import Guidelines for cross-layer import rules.
0. Protocol-First Pattern¶
Core Principle: All cross-layer dependencies use protocols (interfaces), not concrete implementations.
Why Protocol-First?¶
- Clean Architecture: Application layer depends only on domain protocols, not infrastructure
- Testability: Easy to mock protocols in unit tests without real database/cache
- Flexibility: Swap implementations (PostgreSQL → MongoDB) without changing handlers
- No Circular Imports: Protocols in domain, implementations in infrastructure
Protocol Locations¶
All protocols live in src/domain/protocols/ (29 protocols):
Repository Protocols (12):
user_repository.py- User persistenceemail_verification_token_repository.py- Email token persistencerefresh_token_repository.py- Refresh token persistencepassword_reset_token_repository.py- Password reset token persistencesession_repository.py- Session persistencesecurity_config_repository.py- Security configurationprovider_connection_repository.py- Provider connectionsprovider_repository.py- Provider metadataaccount_repository.py- Account persistencetransaction_repository.py- Transaction persistencebalance_snapshot_repository.py- Balance snapshot persistenceholding_repository.py- Holding persistence
Service Protocols (17):
cache_protocol.py- Cache operations (Redis)session_cache_protocol.py- Session cachingprovider_connection_cache_protocol.py- Provider connection cachingpassword_hashing_protocol.py- Password hashing (bcrypt)token_generation_protocol.py- JWT generationrefresh_token_service_protocol.py- Refresh token operationspassword_reset_token_service_protocol.py- Password reset tokensevent_bus_protocol.py- Domain event publishingaudit_protocol.py- Audit trail recordinglogger_protocol.py- Structured loggingsecrets_protocol.py- Secrets managementemail_protocol.py- Email sendingemail_service_protocol.py- Email service operationsrate_limit_protocol.py- Rate limitingauthorization_protocol.py- RBAC authorizationprovider_protocol.py- Financial provider operationssession_enricher_protocol.py- Session metadata enrichment
Container Pattern¶
Container returns protocol types, creates concrete implementations:
# ✅ CORRECT: Return protocol type, create implementation
def get_user_repository(session: AsyncSession) -> UserRepository:
from src.infrastructure.persistence.repositories.user_repository import (
UserRepository as UserRepositoryImpl
)
return UserRepositoryImpl(session=session)
Handler Pattern¶
Handlers depend on protocols via constructor injection:
class RegisterUserHandler:
def __init__(
self,
user_repo: UserRepository, # Protocol from domain
password_hasher: PasswordHashingProtocol, # Protocol from domain
event_bus: EventBusProtocol, # Protocol from domain
):
self._user_repo = user_repo
self._password_hasher = password_hasher
self._event_bus = event_bus
1. Two-Tier Dependency Injection¶
1.1 Application-Scoped (Container)¶
Purpose: Singletons that live for the entire application lifetime.
Location: src/core/container/ (modular structure)
Pattern: Functions decorated with @lru_cache() return same instance
Use Cases:
- Cache clients (Redis connection pool)
- Secrets managers (env/AWS adapters)
- Configuration (Settings singleton)
- Event bus (in-memory or external)
- Logger instances
Benefits:
- Efficient resource usage (connection pooling)
- Consistent state across requests
- Easy to mock in tests
- Clear dependency graph
1.2 Request-Scoped (FastAPI Depends)¶
Purpose: Dependencies created fresh per HTTP request.
Location: Dependency functions in src/core/container/ modules
Pattern: Generator functions with yield (cleanup after request)
Use Cases:
- Database sessions (new transaction per request)
- Current user (authentication per request)
- Request context (headers, query params)
- Command/Query handlers (stateless per request)
Note on Auth Dependencies: Authentication dependencies (get_current_user, require_role)
use FastAPI-idiomatic HTTPException for auth failures instead of Result types. This is acceptable
because auth is an HTTP concern, not business logic. See docs/architecture/error-handling.md
"FastAPI-Idiomatic Exceptions" section.
Benefits:
- Automatic cleanup (FastAPI calls finally block)
- Request isolation (no state leakage)
- Transaction boundaries clear
- Testable with TestClient
2. Container Implementation¶
2.1 Modular Container Structure¶
The container is organized into modules by domain responsibility:
src/core/container/
├── __init__.py # Re-exports all (single entry point preserved)
├── infrastructure.py # Core services: cache, db, secrets, logging, etc.
├── events.py # Event bus with all 27 subscriptions
├── repositories.py # All repository factories
├── handler_factory.py # Auto-wired CQRS handler injection (38 handlers)
├── providers.py # Financial provider adapter factory
└── authorization.py # Casbin RBAC enforcer
Key Benefits:
- Maintainability: Smaller, focused files (~200-500 lines each vs 2300+ monolith)
- Team Development: Multiple developers can work on different modules
- Faster Navigation: Find dependencies by domain area
- Backward Compatible: All imports still work via
__init__.pyre-exports
2.2 Module Responsibilities¶
infrastructure.py (~460 lines) - Application-scoped singletons:
get_cache()- Redis connection poolget_secrets()- Env/AWS secrets adapterget_encryption_service()- AES-256-GCM encryptionget_database()- PostgreSQL connection poolget_db_session()- Request-scoped sessionget_audit_session()- Separate audit sessionget_audit()- Audit trail adapterget_password_service()- Bcrypt hashingget_token_service()- JWT generationget_email_service()- Email adapterget_rate_limit()- Token bucket rate limiterget_logger()- Console/CloudWatch loggerget_jobs_monitor()- Background jobs queue monitor (dashtam-jobs)
events.py (~230 lines) - Domain event infrastructure:
get_event_bus()- In-memory event bus with 27 subscriptions- Registers LoggingEventHandler, AuditEventHandler, EmailEventHandler, SessionEventHandler
repositories.py (~230 lines) - Repository factories:
get_user_repository()get_provider_connection_repository()get_provider_repository()get_account_repository()get_transaction_repository()
auth_handlers.py (~530 lines) - Authentication handlers:
- Registration, login, logout handlers
- Token refresh, email verification
- Password reset (request and confirm)
- Session management (list, get, revoke)
- Token rotation (global and per-user)
provider_handlers.py (~190 lines) - Provider handlers:
- Connect/disconnect provider
- Refresh provider tokens
- Get/list provider connections
data_handlers.py (~390 lines) - Account/transaction handlers:
- Account queries (get, list by connection, list by user)
- Transaction queries (get, list, date range, security)
- Sync handlers (accounts, transactions)
handler_factory.py (~200 lines) - Auto-wired handler injection:
handler_factory()- Generic handler factory using type hintsSESSION_SERVICE_TYPES- Session-scoped services (e.g.,OwnershipVerifier)- Supports all 38+ CQRS handlers without manual factory functions
providers.py (~80 lines) - Provider factory:
get_provider(slug)- Returns Schwab (future: Chase, Fidelity)
authorization.py (~160 lines) - Casbin RBAC:
init_enforcer()- Initialize at startupget_enforcer()- Get singletonget_authorization()- Request-scoped adapter
2.3 Usage (Unchanged)¶
All existing imports continue to work:
# Single import point preserved
from src.core.container import get_cache, get_db_session, get_register_user_handler
# Or import from specific module for clarity
from src.core.container.infrastructure import get_cache
from src.core.container.auth_handlers import get_register_user_handler
3. Usage Patterns by Layer¶
3.1 Domain Layer¶
Rule: Domain should NOT depend on container (pure domain logic).
# src/domain/entities/user.py
# Domain entities are pure - NO container imports
@dataclass(slots=True, kw_only=True)
class User:
id: UUID
email: str
# Pure business logic only
3.2 Application Layer¶
Rule: Handlers receive dependencies via constructor injection. The container factories assemble handlers with their dependencies - handlers never import container directly.
# src/application/commands/handlers/register_user_handler.py
# NO container imports - only domain protocols!
from src.domain.protocols import UserRepository, PasswordHashingProtocol
from src.domain.protocols.event_bus_protocol import EventBusProtocol
class RegisterUserHandler:
"""Handler receives all dependencies via constructor."""
def __init__(
self,
user_repo: UserRepository, # ← Injected protocol
password_service: PasswordHashingProtocol, # ← Injected protocol
event_bus: EventBusProtocol, # ← Injected protocol
) -> None:
self._user_repo = user_repo
self._password_service = password_service
self._event_bus = event_bus
async def handle(self, cmd: RegisterUser) -> Result[UUID, str]:
# Use injected dependencies - no container knowledge
password_hash = self._password_service.hash_password(cmd.password)
user = User(email=cmd.email, password_hash=password_hash, ...)
await self._user_repo.save(user)
await self._event_bus.publish(UserRegistrationSucceeded(...))
return Success(value=user.id)
Container factory assembles the handler (src/core/container/auth_handlers.py):
async def get_register_user_handler(
session: AsyncSession = Depends(get_db_session),
) -> RegisterUserHandler:
"""Container factory wires dependencies."""
from src.infrastructure.persistence.repositories import UserRepository
return RegisterUserHandler(
user_repo=UserRepository(session=session),
password_service=get_password_service(), # App-scoped singleton
event_bus=get_event_bus(), # App-scoped singleton
)
3.3 Infrastructure Layer¶
Rule: Adapters can use container for dependencies, but avoid circular imports.
# src/infrastructure/providers/schwab_provider.py
from src.core.container import get_secrets
class SchwabProvider:
def __init__(self):
# Infrastructure can use container
secrets = get_secrets()
self.api_key = secrets.get_secret("schwab/api_key")
3.4 Presentation Layer¶
Rule: Use FastAPI Depends() for ALL dependencies.
# src/presentation/api/v1/users.py
from fastapi import Depends
@router.post("/users", status_code=201)
async def create_user(
data: UserCreate,
session: AsyncSession = Depends(get_db_session), # ← Request-scoped
cache: CacheProtocol = Depends(get_cache), # ← App-scoped
handler: RegisterUserHandler = Depends(get_register_user_handler), # ← Request-scoped
) -> UserResponse:
"""Create new user.
FastAPI automatically injects dependencies:
- get_db_session(): New session per request
- get_cache(): Shared cache singleton
- get_register_user_handler(): New handler per request
"""
result = await handler.handle(
RegisterUser(email=data.email, password=data.password),
session
)
match result:
case Success(user_id):
return UserResponse(id=user_id, email=data.email)
case Failure(error):
raise HTTPException(400, detail=error.message)
4. Testing Strategy¶
4.1 Unit Tests - Direct Dependency Injection¶
Since handlers use constructor injection, unit testing is straightforward - create mock dependencies and pass them directly:
# tests/unit/test_application_register_user_handler.py
from unittest.mock import AsyncMock, Mock
import pytest
@pytest.mark.asyncio
async def test_register_user_handler_success():
"""Test handler with mock dependencies - NO patching needed."""
# Create mock dependencies directly
mock_user_repo = Mock()
mock_user_repo.find_by_email = AsyncMock(return_value=None) # No existing user
mock_user_repo.save = AsyncMock()
mock_password_service = Mock()
mock_password_service.hash_password.return_value = "hashed_password"
mock_event_bus = Mock()
mock_event_bus.publish = AsyncMock()
# Instantiate handler with mocks
handler = RegisterUserHandler(
user_repo=mock_user_repo,
password_service=mock_password_service,
event_bus=mock_event_bus,
)
# Test the handler
result = await handler.handle(RegisterUser(email="test@example.com", password="secure"))
assert isinstance(result, Success)
mock_user_repo.save.assert_called_once()
mock_event_bus.publish.assert_called() # Events emitted
Benefits of constructor injection testing:
- No
patch()context managers needed - Test isolation - each test has fresh mocks
- Clear dependency graph visible in test setup
- IDE autocomplete works on mock setup
4.2 Integration Tests - Real Dependencies¶
# tests/integration/test_cache_redis.py
from src.core.container import get_cache
async def test_cache_integration():
"""Test with real Redis (integration test)."""
cache = get_cache() # Real Redis adapter
result = await cache.set("test:key", "value")
assert result is Success
result = await cache.get("test:key")
assert result.value == "value"
4.3 API Tests - FastAPI TestClient¶
# tests/api/test_users_endpoints.py
from fastapi.testclient import TestClient
def test_create_user_endpoint(client: TestClient):
"""Test endpoint with real dependencies."""
# TestClient uses real container (test environment)
response = client.post("/api/v1/users", json={
"email": "test@example.com",
"password": "SecurePass123!"
})
assert response.status_code == 201
5. Dependency Lifecycle¶
5.1 Application Startup¶
# src/main.py
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan - initialize singletons."""
# Singletons created on first access (lazy initialization)
get_cache() # ← Creates Redis connection pool
get_secrets() # ← Creates secrets adapter
get_database() # ← Creates database connection pool
yield
# Cleanup
await close_cache()
await close_database()
app = FastAPI(lifespan=lifespan)
5.2 Request Lifecycle¶
sequenceDiagram
participant Client
participant FastAPI
participant Container
participant Handler
participant Database
Client->>FastAPI: POST /users
FastAPI->>Container: get_db_session()
Container->>Database: Create new session
Database-->>Container: session
Container-->>FastAPI: session
FastAPI->>Container: get_cache()
Container-->>FastAPI: cache singleton
FastAPI->>Container: get_register_user_handler()
Container-->>FastAPI: new handler
FastAPI->>Handler: handle(command, session)
Handler->>Database: Use session
Database-->>Handler: Result
Handler-->>FastAPI: Result
FastAPI->>Database: Commit & close session
FastAPI-->>Client: 201 Created
6. Benefits¶
6.1 Maintainability¶
- ✅ Single source of truth for all dependencies
- ✅ Easy to find where dependencies are created
- ✅ Clear lifecycle management (app vs request scope)
- ✅ No scattered factory functions across codebase
6.2 Testability¶
- ✅ Easy to mock entire container
- ✅ Easy to provide test implementations
- ✅ Clear dependency boundaries
- ✅ Isolated tests (no global state leakage)
6.3 Performance¶
- ✅ Connection pooling for expensive resources (Redis, Database)
- ✅ Singleton pattern reduces object creation
- ✅
@lru_cache()provides zero-cost singleton - ✅ Request-scoped sessions prevent connection leaks
6.4 Type Safety¶
- ✅ All dependencies return Protocol types
- ✅ IDE autocomplete works correctly
- ✅ Type checkers (mypy) can verify dependencies
- ✅ Refactoring is safe
7. Migration Guide¶
7.1 From Scattered Factories¶
Before (inconsistent):
# src/infrastructure/cache/__init__.py
def get_cache(): # ← Singleton in __init__.py
...
# src/infrastructure/secrets/factory.py
def create_secrets_manager(): # ← Factory function
...
# Direct instantiation
db = Database(settings.database_url) # ← No abstraction
After (centralized):
# src/core/container.py
@lru_cache()
def get_cache(): ...
@lru_cache()
def get_secrets(): ...
@lru_cache()
def get_database(): ...
7.2 Update Import Statements¶
Before (scattered across infrastructure):
# Old: Import from infrastructure modules
from src.infrastructure.cache import get_cache # Singleton in __init__.py
from src.infrastructure.secrets.env_adapter import EnvAdapter # Direct import
from src.infrastructure.persistence.database import Database # Direct instantiation
After (centralized in container):
# New: All dependencies from container
from src.core.container import get_cache, get_secrets, get_database
8. Anti-Patterns to Avoid¶
8.1 ❌ Direct Instantiation in Application Layer¶
# WRONG - Don't create dependencies directly
class RegisterUserHandler:
def __init__(self):
self.cache = RedisAdapter(...) # ❌ Tight coupling!
8.2 ❌ Importing Infrastructure in Domain¶
# WRONG - Domain should be pure
# src/domain/entities/user.py
from src.core.container import get_cache # ❌ Domain depends on infrastructure!
8.3 ❌ Using App-Scoped for Request-Specific Data¶
# WRONG - Don't use singleton for request data
@lru_cache()
def get_current_user(): # ❌ Current user should be request-scoped!
...
8.4 ❌ Using Request-Scoped for Expensive Resources¶
# WRONG - Don't create new connections per request
async def get_cache(): # ❌ No @lru_cache - creates new Redis connection per request!
return RedisAdapter(...)
9. Integration with Architecture¶
9.1 Hexagonal Architecture Compliance¶
- ✅ Domain Layer: No container imports (pure, only protocols)
- ✅ Application Layer: Handlers receive dependencies via constructor injection
- ✅ Infrastructure Layer: Implements protocols, can use container for assembly
- ✅ Presentation Layer: Uses FastAPI Depends (delegates to container factories)
9.2 CQRS Pattern¶
Handlers are protocol-dependent and container-agnostic:
# Command handler - depends only on protocols
class RegisterUserHandler:
def __init__(self, user_repo: UserRepository, event_bus: EventBusProtocol):
self._user_repo = user_repo
self._event_bus = event_bus
# Query handler with ownership verification - uses OwnershipVerifier service
class GetAccountHandler:
def __init__(self, ownership_verifier: OwnershipVerifier):
self._verifier = ownership_verifier
async def handle(self, query: GetAccount) -> Result[AccountResult, str]:
result = await self._verifier.verify_account_ownership(
query.account_id, query.user_id
)
if isinstance(result, Failure):
return Failure(error=error_map[result.error.code])
account = result.value
# ... map to DTO
# Presentation layer uses handler_factory for auto-wiring
from src.core.container.handler_factory import handler_factory
@router.get("/accounts/{account_id}")
async def get_account(
handler: GetAccountHandler = Depends(handler_factory(GetAccountHandler))
):
result = await handler.handle(query)
...
9.3 Session-Scoped Services¶
Some services need database session but aren't repositories. These are registered in handler_factory.py:
# src/core/container/handler_factory.py
SESSION_SERVICE_TYPES: set[type] = {
OwnershipVerifier, # Needs repos for ownership chain verification
}
def _get_session_service_instance(service_type: type, session: AsyncSession) -> Any:
"""Create session-scoped service instance."""
if service_type is OwnershipVerifier:
return OwnershipVerifier(
transaction_repo=TransactionRepository(session=session),
holding_repo=HoldingRepository(session=session),
account_repo=AccountRepository(session=session),
connection_repo=ProviderConnectionRepository(session=session),
)
raise ValueError(f"Unknown session service type: {service_type}")
Use OwnershipVerifier when: Query handlers need to verify ownership chains
(e.g., Account → Connection → User). This avoids DRY violations across handlers.
9.3 Repository Pattern¶
# Repositories receive session from presentation layer
class UserRepository:
def __init__(self, session: AsyncSession):
self.session = session # ← Injected, not created
# Presentation layer provides session
@router.get("/users/{id}")
async def get_user(
id: UUID,
session: AsyncSession = Depends(get_db_session)
):
repo = UserRepository(session)
user = await repo.find_by_id(id)
...
10. Background Jobs Integration¶
10.1 Overview¶
The API integrates with the dashtam-jobs background worker service via shared Redis.
This is a monitoring-only integration - no code dependency between services.
10.2 Configuration¶
| Setting | Default | Description |
|---|---|---|
JOBS_REDIS_URL |
Falls back to REDIS_URL |
Redis URL for jobs queue |
JOBS_QUEUE_NAME |
dashtam:jobs |
Queue name (must match dashtam-jobs) |
10.3 Architecture¶
┌───────────────┐ ┌───────────────┐
│ API │ │ dashtam-jobs │
│ JobsMonitor │───── same Redis ────│ TaskIQ │
│ │ queue name │ │
└───────────────┘ └───────────────┘
│ │
│ LLEN dashtam:jobs │ LPUSH/BRPOP dashtam:jobs
│ PING │ Store results
└─────────────┬───────────────────────┘
▼
┌───────────────┐
│ Redis │
│ (shared) │
└───────────────┘
10.4 Deployment Options¶
Option 1: Shared Redis (Development)¶
Both API cache and jobs queue use the same Redis instance:
# .env.dev
REDIS_URL=redis://redis:6379/0
# JOBS_REDIS_URL not set - falls back to REDIS_URL
JOBS_QUEUE_NAME=dashtam:jobs
Option 2: Separate Redis (Production)¶
Dedicated Redis instance for job queue isolation:
# docker-compose.yml - add second Redis service
redis-jobs:
image: redis:8.4-alpine
container_name: dashtam-redis-jobs
volumes:
- redis_jobs_data:/data
command: redis-server --appendonly yes
networks:
- dashtam-network
# .env.prod
REDIS_URL=redis://redis:6379/0 # Cache
JOBS_REDIS_URL=redis://redis-jobs:6379/0 # Jobs queue
JOBS_QUEUE_NAME=dashtam:jobs
10.5 JobsMonitor Usage¶
# Application layer - direct use
from src.core.container import get_jobs_monitor
monitor = get_jobs_monitor()
health = await monitor.check_health()
if health.value.healthy:
print(f"Queue length: {health.value.queue_length}")
# Presentation layer - FastAPI Depends
from fastapi import Depends
from src.core.container import get_jobs_monitor
@router.get("/admin/jobs")
async def get_jobs_status(
monitor: JobsMonitor = Depends(get_jobs_monitor)
):
result = await monitor.check_health()
...
10.6 Endpoints¶
| Endpoint | Auth | Purpose |
|----------|------|---------||
| GET /health/jobs | None | Load balancer health check (returns healthy/unhealthy) |
| GET /api/v1/admin/jobs | Admin | Detailed status (queue length, Redis connectivity, errors) |
Created: 2025-11-13 | Last Updated: 2026-01-19