Database Usage Guide¶
Practical how-to patterns for working with the database layer in Dashtam.
Architecture Reference: docs/architecture/database.md
Quick Start¶
Get a Database Session¶
from src.core.container import get_db_session
# In FastAPI endpoint (recommended)
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
@router.post("/users")
async def create_user(
data: UserCreate,
session: AsyncSession = Depends(get_db_session)
):
# Session automatically commits on success, rolls back on error
user_repo = UserRepository(session)
user = await user_repo.save(user_entity)
return UserResponse.from_entity(user)
Direct Session Access (Application Layer)¶
from src.core.container import get_database
db = get_database()
async with db.get_session() as session:
# Use session
result = await session.execute(text("SELECT 1"))
Session Management¶
Automatic Transaction Management¶
The get_session() context manager handles transactions automatically:
async with db.get_session() as session:
# Operations here
user_repo = UserRepository(session)
await user_repo.save(user)
# Commits automatically on successful exit
# If exception occurs:
# - Rolls back automatically
# - Re-raises the exception
Explicit Transaction Control¶
For operations spanning multiple repositories:
async with db.transaction() as session:
user_repo = UserRepository(session)
account_repo = AccountRepository(session)
await user_repo.save(user)
await account_repo.save(account)
# Both commit together or both roll back
Creating Models¶
Mutable Model (can be updated)¶
# src/infrastructure/persistence/models/user.py
from sqlalchemy.orm import Mapped, mapped_column
from src.infrastructure.persistence.base import BaseMutableModel
class UserModel(BaseMutableModel):
"""User database model.
Inherits from BaseMutableModel:
- id: UUID (auto-generated)
- created_at: datetime (auto-set)
- updated_at: datetime (auto-updated)
"""
__tablename__ = "users"
email: Mapped[str] = mapped_column(unique=True, index=True)
password_hash: Mapped[str]
is_active: Mapped[bool] = mapped_column(default=True)
Immutable Model (audit logs, events)¶
# src/infrastructure/persistence/models/audit_log.py
from sqlalchemy.orm import Mapped, mapped_column
from src.infrastructure.persistence.base import BaseModel
class AuditLogModel(BaseModel):
"""Audit log - immutable (no updated_at).
Inherits from BaseModel:
- id: UUID (auto-generated)
- created_at: datetime (auto-set)
"""
__tablename__ = "audit_logs"
action: Mapped[str]
user_id: Mapped[UUID]
resource_type: Mapped[str]
# No updated_at - audit logs are immutable
Repository Pattern¶
Creating a Repository¶
# src/infrastructure/persistence/repositories/user_repository.py
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from src.domain.entities.user import User
from src.infrastructure.persistence.models.user import UserModel
class UserRepository:
"""Repository for User persistence.
Maps between domain entities and database models.
NO business logic - just persistence operations.
"""
def __init__(self, session: AsyncSession):
self._session = session
async def save(self, user: User) -> User:
"""Save user entity to database."""
model = UserModel.from_entity(user)
self._session.add(model)
await self._session.flush() # Get generated ID
return model.to_entity()
async def find_by_id(self, user_id: UUID) -> User | None:
"""Find user by ID."""
model = await self._session.get(UserModel, user_id)
return model.to_entity() if model else None
async def find_by_email(self, email: str) -> User | None:
"""Find user by email."""
stmt = select(UserModel).where(UserModel.email == email)
result = await self._session.execute(stmt)
model = result.scalar_one_or_none()
return model.to_entity() if model else None
async def delete(self, user: User) -> None:
"""Delete user from database."""
model = await self._session.get(UserModel, user.id)
if model:
await self._session.delete(model)
Entity-Model Mapping¶
# In UserModel class
def to_entity(self) -> User:
"""Convert database model to domain entity."""
return User(
id=self.id,
email=self.email,
password_hash=self.password_hash,
is_active=self.is_active,
created_at=self.created_at,
updated_at=self.updated_at,
)
@classmethod
def from_entity(cls, user: User) -> "UserModel":
"""Create database model from domain entity."""
return cls(
id=user.id,
email=user.email,
password_hash=user.password_hash,
is_active=user.is_active,
)
Alembic Migrations¶
Create a New Migration¶
# Create migration with auto-generated changes
make migrate-create NAME="add_users_table"
# Or in Docker:
docker compose -f compose/docker-compose.dev.yml exec app \
alembic revision --autogenerate -m "add_users_table"
Apply Migrations¶
make migrate-up
# Or in Docker:
docker compose -f compose/docker-compose.dev.yml exec app \
alembic upgrade head
Rollback Migration¶
make migrate-down
# Or in Docker:
docker compose -f compose/docker-compose.dev.yml exec app \
alembic downgrade -1
View Migration History¶
Testing¶
Integration Tests (Recommended)¶
# tests/integration/test_user_repository.py
import pytest
import pytest_asyncio
from src.core.config import settings
from src.infrastructure.persistence.database import Database
@pytest.mark.integration
class TestUserRepository:
@pytest_asyncio.fixture
async def db(self):
"""Create test database connection."""
database = Database(
database_url=settings.database_url,
echo=settings.db_echo
)
yield database
await database.close()
@pytest.mark.asyncio
async def test_save_and_find_user(self, db):
"""Test saving and retrieving a user."""
async with db.get_session() as session:
repo = UserRepository(session)
user = User(
id=uuid7(),
email="test@example.com",
password_hash="hashed",
)
saved = await repo.save(user)
assert saved.id == user.id
found = await repo.find_by_email("test@example.com")
assert found is not None
assert found.email == "test@example.com"
Mocking for Unit Tests¶
# tests/unit/test_user_service.py
from unittest.mock import AsyncMock, Mock
def test_register_user():
"""Unit test with mocked repository."""
# Mock repository
mock_repo = Mock()
mock_repo.save = AsyncMock(return_value=user)
mock_repo.find_by_email = AsyncMock(return_value=None)
service = UserService(user_repo=mock_repo)
result = await service.register(email="test@example.com")
assert result.is_success()
mock_repo.save.assert_called_once()
Common Patterns¶
Pattern 1: Handler with Repository¶
# src/application/commands/register_user.py
from src.domain.protocols.user_repository import UserRepository
class RegisterUserHandler:
def __init__(self, user_repo: UserRepository):
self._user_repo = user_repo
async def handle(self, cmd: RegisterUser) -> Result[UUID, Error]:
# Check if email exists
existing = await self._user_repo.find_by_email(cmd.email)
if existing:
return Failure(ValidationError("Email already registered"))
# Create and save user
user = User.create(email=cmd.email, password_hash=cmd.hashed_password)
await self._user_repo.save(user)
return Success(user.id)
Pattern 2: Query with Pagination¶
async def find_all_paginated(
self,
page: int = 1,
page_size: int = 20,
) -> tuple[list[User], int]:
"""Find all users with pagination."""
# Count total
count_stmt = select(func.count()).select_from(UserModel)
total = (await self._session.execute(count_stmt)).scalar() or 0
# Get page
offset = (page - 1) * page_size
stmt = select(UserModel).offset(offset).limit(page_size)
result = await self._session.execute(stmt)
users = [model.to_entity() for model in result.scalars()]
return users, total
Pattern 3: Soft Delete¶
# In model
is_deleted: Mapped[bool] = mapped_column(default=False)
deleted_at: Mapped[datetime | None] = mapped_column(nullable=True)
# In repository
async def soft_delete(self, user_id: UUID) -> None:
"""Mark user as deleted (soft delete)."""
stmt = (
update(UserModel)
.where(UserModel.id == user_id)
.values(is_deleted=True, deleted_at=func.now())
)
await self._session.execute(stmt)
Troubleshooting¶
Connection Issues¶
Symptoms: ConnectionRefusedError or timeout errors
Solutions:
- Check database container is running:
docker ps - Verify
DATABASE_URLin.envfile - Check database logs:
docker compose logs postgres
Migration Conflicts¶
Symptoms: Can't locate revision or head mismatch
Solutions:
# Check current state
alembic current
# Stamp to specific revision
alembic stamp head
# Generate new migration from current state
alembic revision --autogenerate -m "sync_schema"
Session Already Closed¶
Symptoms: Session is closed error
Solutions:
- Ensure using
async with db.get_session()pattern - Don't store session references beyond context manager scope
- Use
expire_on_commit=False(already configured)
See Also:
docs/architecture/database.md- Full architecture detailsdocs/architecture/dependency-injection.md- Container patterns
Created: 2025-12-05 | Last Updated: 2026-01-10