Domain Events Usage Guide¶
Complete guide for using domain events in Dashtam, including when to use them, how to define and publish events, and common anti-patterns to avoid.
Architecture Reference: docs/architecture/domain-events.md (event registry, 3-state pattern)
When to Use Domain Events¶
Pragmatic DDD Approach: Use domain events only for critical workflows that have side effects requiring coordination across multiple systems.
✅ Use Domain Events For¶
Critical workflows with multiple side effects:
- User Registration → Send welcome email + Create audit trail + Initialize user settings
- Password Change → Revoke all sessions + Send notification email + Audit security event
- Provider Connection → Fetch initial data + Create audit trail + Send confirmation
- Token Refresh Failure → Alert user + Audit failure + Mark provider for re-auth
Characteristics of event-worthy workflows:
- Multiple handlers need to react to the same event
- Side effects are independent (email doesn't depend on audit)
- Fail-open acceptable (email failure shouldn't break registration)
- Eventual consistency is acceptable (audit may lag by milliseconds)
❌ Don't Use Domain Events For¶
Simple operations without side effects:
- Read operations (view account, view transactions) → No side effects
- Single-step CRUD (update user profile) → Direct database call sufficient
- Synchronous validation (check email exists) → Use direct method call
- Immediate response required (calculate balance) → Not async-friendly
Rule of thumb: If you can't think of 2+ independent handlers, don't use events.
Event Naming Conventions¶
CRITICAL: All events must use past tense (what happened, not what will happen).
✅ Correct Naming¶
# Good: Past tense, describes what happened
UserRegistrationSucceeded
UserPasswordChangeSucceeded
ProviderConnectionFailed
TokenRefreshAttempted
❌ Incorrect Naming¶
# Bad: Imperative/present tense
RegisterUser # Command, not event
UserRegistering # Present progressive
ChangePassword # Imperative
ConnectProvider # Command
Naming Pattern¶
Format: {Entity}{Action}{Outcome}
- Entity: What the event is about (User, Provider, Token)
- Action: What happened (Registration, PasswordChange, Connection)
- Outcome: Result state (Attempted, Succeeded, Failed)
Examples:
- UserRegistrationAttempted - User attempted to register
- UserRegistrationSucceeded - User successfully registered
- UserRegistrationFailed - User registration failed
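The naming pattern can be checked mechanically, for example in a lint step or a unit test. The following is a minimal sketch, not part of Dashtam: `is_valid_event_name` and the regular expression are hypothetical, and the check assumes every event name ends in one of the three outcomes listed above. Names that omit the action part, such as a bare AccountCreated, would be rejected by this strict check and would need an explicit exemption.

```python
import re

# Outcomes taken from the naming pattern above.
OUTCOMES = ("Attempted", "Succeeded", "Failed")

# At least two CamelCase words (entity + action), then exactly one outcome.
_EVENT_NAME = re.compile(rf"(?:[A-Z][a-z0-9]+){{2,}}(?:{'|'.join(OUTCOMES)})")


def is_valid_event_name(name: str) -> bool:
    """Return True if name looks like {Entity}{Action}{Outcome} (hypothetical helper)."""
    return _EVENT_NAME.fullmatch(name) is not None


good = [n for n in ("UserRegistrationSucceeded", "ProviderConnectionFailed")
        if is_valid_event_name(n)]
bad = [n for n in ("RegisterUser", "UserRegistering", "ChangePassword")
       if is_valid_event_name(n)]
```

Here `good` keeps both past-tense names, while `bad` ends up empty because none of the imperative or present-tense names end in an outcome.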
Defining New Events¶
Step 1: Create Event Dataclass¶
Events are frozen dataclasses inheriting from DomainEvent.
from dataclasses import dataclass
from uuid import UUID

from src.domain.events.base_event import DomainEvent


@dataclass(frozen=True, kw_only=True)
class AccountCreated(DomainEvent):
    """Account was successfully created for a provider.

    Published after account data is persisted to database.

    Args:
        user_id: UUID of user who owns the account.
        provider_id: UUID of provider this account belongs to.
        account_id: UUID of created account.
        account_type: Type of account (checking, savings, investment).
        balance: Current account balance (for display).

    Handlers:
        - LoggingEventHandler: Logs account creation
        - AuditEventHandler: Creates audit record
        - NotificationEventHandler: Sends new account notification

    Example:
        >>> event = AccountCreated(
        ...     user_id=user_id,
        ...     provider_id=provider_id,
        ...     account_id=account_id,
        ...     account_type="checking",
        ...     balance=1000.00
        ... )
        >>> await event_bus.publish(event)
    """

    user_id: UUID
    provider_id: UUID
    account_id: UUID
    account_type: str
    balance: float
Key points:
- frozen=True - Events are immutable (cannot be modified after creation)
- kw_only=True - Forces keyword arguments (prevents positional arg errors)
- Inherit from DomainEvent - Auto-generates event_id and occurred_at
- Comprehensive docstring - Document handlers and usage
Step 2: Add to Events Module¶
Add the new event to an existing module such as src/domain/events/auth_events.py, or create a new module when the event belongs to a new area (here, account_events.py):
# src/domain/events/account_events.py
"""Account-related domain events."""
from dataclasses import dataclass
from uuid import UUID

from src.domain.events.base_event import DomainEvent


@dataclass(frozen=True, kw_only=True)
class AccountCreated(DomainEvent):
    # ... (event definition)


@dataclass(frozen=True, kw_only=True)
class AccountUpdated(DomainEvent):
    # ... (event definition)


@dataclass(frozen=True, kw_only=True)
class AccountClosed(DomainEvent):
    # ... (event definition)
Step 3: Export from __init__.py¶
# src/domain/events/__init__.py
from src.domain.events.account_events import (
    AccountCreated,
    AccountUpdated,
    AccountClosed,
)
from src.domain.events.auth_events import (
    UserRegistrationSucceeded,
    # ... other events
)

__all__ = [
    "AccountCreated",
    "AccountUpdated",
    "AccountClosed",
    "UserRegistrationSucceeded",
    # ... other events
]
Publishing Events¶
Session Requirement (CRITICAL)¶
As of F0.9.3: Events triggering AuditEventHandler require an explicit database session.
# ✅ CORRECT - Pass session to event_bus.publish()
await event_bus.publish(
    UserRegistrationSucceeded(user_id=user.id, email=user.email),
    session=session,  # Required for audit events
)

# ❌ WRONG - Missing session parameter
await event_bus.publish(UserRegistrationSucceeded(...))
# RuntimeError: AuditEventHandler requires a database session
Quick fix: Add session=session parameter to all event_bus.publish() calls for audit events.
Which events? All authentication, provider, and state-change events (check if AuditEventHandler is subscribed).
From Command Handlers (Application Layer)¶
Recommended pattern: Publish events from command handlers after business logic succeeds.
from src.core.container import get_event_bus
from src.domain.events.auth_events import UserRegistrationSucceeded


class RegisterUserHandler:
    """Command handler for user registration."""

    def __init__(
        self,
        user_repo: UserRepository,
        event_bus: EventBusProtocol,
        database: DatabaseProtocol,
    ):
        self._users = user_repo
        self._event_bus = event_bus
        self._database = database

    async def handle(self, cmd: RegisterUser) -> Result[UUID, Error]:
        """Register new user and publish event."""
        async with self._database.get_session() as session:
            # 1. Business logic
            user = User.create(email=cmd.email, password=cmd.password)

            # 2. Persist to database
            await self._users.save(user)

            # 3. Publish event AFTER successful save WITH session
            await self._event_bus.publish(
                UserRegistrationSucceeded(
                    user_id=user.id,
                    email=user.email
                ),
                session=session,  # Required for audit
            )

            return Success(user.id)
Key points:
- Publish AFTER database commit (don't publish if save fails)
- Use dependency injection for event bus
- Events are fire-and-forget (await publish(), but don't depend on handler results)
From Domain Services¶
class PasswordResetService:
    """Domain service for password reset workflow."""

    def __init__(
        self,
        user_repo: UserRepository,
        event_bus: EventBusProtocol,
        database: DatabaseProtocol,
    ):
        self._users = user_repo
        self._event_bus = event_bus
        self._database = database

    async def reset_password(
        self,
        user_id: UUID,
        new_password: str,
    ) -> Result[None, Error]:
        """Reset user password and publish event."""
        async with self._database.get_session() as session:
            # 1. Fetch user
            user = await self._users.find_by_id(user_id)
            if not user:
                return Failure(UserNotFound())

            # 2. Change password (domain logic)
            user.change_password(new_password)

            # 3. Persist
            await self._users.save(user)

            # 4. Publish event WITH session
            await self._event_bus.publish(
                UserPasswordChangeSucceeded(
                    user_id=user.id,
                    initiated_by="admin"
                ),
                session=session,  # Required for audit
            )

            return Success(None)
Creating Event Handlers¶
Event handlers react to events and perform side effects (logging, audit, email, etc.).
Step 1: Create Handler Class¶
# src/infrastructure/events/handlers/notification_event_handler.py
"""Notification event handler for user notifications."""
from src.domain.events.account_events import AccountCreated
from src.domain.protocols.logger_protocol import LoggerProtocol


class NotificationEventHandler:
    """Sends user notifications for account events.

    STUB IMPLEMENTATION: Currently logs notifications.
    Future: Integrate with notification service (push, SMS, email).

    Attributes:
        _logger: Logger for structured logging.
    """

    def __init__(self, logger: LoggerProtocol) -> None:
        """Initialize handler with logger.

        Args:
            logger: Logger protocol for structured logging.
        """
        self._logger = logger

    async def handle_account_created(self, event: AccountCreated) -> None:
        """Send notification after account creation.

        Args:
            event: AccountCreated event with account details.
        """
        self._logger.info(
            "notification_would_be_sent",
            notification_type="account_created",
            user_id=str(event.user_id),
            account_id=str(event.account_id),
            account_type=event.account_type,
            message=f"Your {event.account_type} account has been created!",
            # Future: Send actual push notification
        )
Step 2: Wire Handler in Container¶
# src/core/container.py
from functools import lru_cache

from src.domain.events.account_events import AccountCreated
from src.infrastructure.events.handlers.notification_event_handler import NotificationEventHandler


@lru_cache()
def get_event_bus() -> InMemoryEventBus:
    """Get event bus with all handlers subscribed."""
    event_bus = InMemoryEventBus(logger=get_logger())

    # ... existing handlers ...

    # Notification handler (stub)
    notification_handler = NotificationEventHandler(logger=get_logger())
    event_bus.subscribe(AccountCreated, notification_handler.handle_account_created)

    return event_bus
Handler Best Practices¶
✅ DO:
- Keep handlers focused (single responsibility)
- Handlers should be idempotent (safe to call multiple times)
- Use fail-open design (don't crash event bus)
- Log handler errors for debugging
- Make handlers async (non-blocking)
❌ DON'T:
- Don't call other handlers directly (use events)
- Don't depend on handler execution order (handlers run concurrently)
- Don't let exceptions escape a handler (the event bus catches and logs them, but that handler's side effect is skipped)
- Don't perform long-running synchronous operations
Integration with Audit and Logging¶
Automatic Audit Trail¶
All events with AuditEventHandler subscribed are automatically audited:
# Event published (session is required because AuditEventHandler is subscribed)
await event_bus.publish(
    UserPasswordChangeSucceeded(
        user_id=user_id,
        initiated_by="user"
    ),
    session=session,  # Required for audit events
)

# Audit handler automatically creates record:
# - action: USER_PASSWORD_CHANGED
# - user_id: <user_id>
# - resource_type: "user"
# - context: {"initiated_by": "user", "method": "self_service"}
# - timestamp: <now>
Automatic Structured Logging¶
All events with LoggingEventHandler subscribed are automatically logged:
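# Event published (provider events are audited too, so pass the session)
await event_bus.publish(
    ProviderConnectionSucceeded(
        user_id=user_id,
        provider_id=provider_id,
        provider_name="schwab"
    ),
    session=session,  # Required for audit events
)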
# Logging handler automatically logs (JSON):
# {
# "event": "provider_connection_succeeded",
# "level": "info",
# "user_id": "<user_id>",
# "provider_id": "<provider_id>",
# "provider_name": "schwab",
# "timestamp": "2025-11-18T15:00:00Z"
# }
Testing Event-Driven Code¶
Unit Testing Event Publication¶
# tests/unit/test_register_user_handler.py
from unittest.mock import AsyncMock, MagicMock

import pytest

# RegisterUser is assumed to be defined alongside its handler
from src.application.commands.register_user import RegisterUser, RegisterUserHandler
from src.domain.events.auth_events import UserRegistrationSucceeded


@pytest.mark.unit
@pytest.mark.asyncio
async def test_register_user_publishes_event():
    """Test RegisterUserHandler publishes UserRegistrationSucceeded."""
    # Arrange
    mock_user_repo = AsyncMock()
    mock_event_bus = AsyncMock()
    # MagicMock supports "async with", which get_session() relies on
    mock_database = MagicMock()
    handler = RegisterUserHandler(
        user_repo=mock_user_repo,
        event_bus=mock_event_bus,
        database=mock_database,
    )

    # Act
    result = await handler.handle(RegisterUser(
        email="test@example.com",
        password="password123"
    ))

    # Assert
    assert result.is_success()
    mock_event_bus.publish.assert_called_once()

    # Verify event type
    event = mock_event_bus.publish.call_args[0][0]
    assert isinstance(event, UserRegistrationSucceeded)
    assert event.email == "test@example.com"
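Because the session argument is required for audited events, a unit test can also assert that publish() received it. The sketch below shows the mocking technique in isolation: `publish_with_session` is a hypothetical stand-in for a command handler, and `sentinel` objects replace real events and sessions.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock, sentinel


async def publish_with_session(event_bus, database, event) -> None:
    """Hypothetical code under test: publishes inside an open session."""
    async with database.get_session() as session:
        await event_bus.publish(event, session=session)


def run_check() -> dict:
    event_bus = AsyncMock()
    database = MagicMock()
    # MagicMock supports async context managers; pin what "as session" yields.
    database.get_session.return_value.__aenter__.return_value = sentinel.session

    asyncio.run(publish_with_session(event_bus, database, sentinel.event))

    event_bus.publish.assert_awaited_once()
    # Keyword arguments of the most recent publish() call.
    return event_bus.publish.call_args.kwargs


kwargs = run_check()
```

In a pytest-asyncio test the same assertions apply directly to the handler's mocks, without the asyncio.run wrapper.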
Integration Testing Event Flow¶
# tests/integration/test_user_registration_flow.py
import pytest
from sqlalchemy import select

from src.core.container import get_event_bus
from src.domain.events.auth_events import UserRegistrationSucceeded
# uuid7, AuditLogModel and AuditAction come from project modules not shown in this guide


@pytest.mark.integration
@pytest.mark.asyncio
async def test_user_registration_creates_audit_record(test_database):
    """Test UserRegistrationSucceeded creates audit record."""
    # Arrange
    event_bus = get_event_bus()

    # Act - Pass session for audit handler
    async with test_database.get_session() as session:
        await event_bus.publish(
            UserRegistrationSucceeded(
                user_id=uuid7(),
                email="test@example.com",
                verification_token="test-token",
            ),
            session=session,  # Required for audit events
        )

    # Assert - Check audit record created
    async with test_database.get_session() as session:
        stmt = select(AuditLogModel).where(
            AuditLogModel.action == AuditAction.USER_REGISTERED
        )
        result = await session.execute(stmt)
        logs = result.scalars().all()

    assert len(logs) >= 1  # At least one audit record
Troubleshooting¶
Handler Not Executing¶
Problem: Event published but handler doesn't execute.
Solutions:
- Check subscription: Verify handler subscribed in get_event_bus()
- Check event type: Verify exact event type matches (case-sensitive)
- Check handler signature: Must be async, accept event parameter
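The signature check in the last item can be automated with the standard inspect module. This is a minimal sketch; `check_handler` and the `Handlers` class are hypothetical and not part of the project's event bus.

```python
import inspect
from typing import Any, Callable


def check_handler(handler: Callable[..., Any]) -> list[str]:
    """Return a list of problems that would stop a handler from running."""
    problems: list[str] = []
    if not inspect.iscoroutinefunction(handler):
        problems.append("handler is not declared with 'async def'")
    # For bound methods, the signature excludes 'self'.
    if len(inspect.signature(handler).parameters) < 1:
        problems.append("handler accepts no event parameter")
    return problems


class Handlers:
    """Hypothetical handlers covering the good case and two common mistakes."""

    async def good(self, event: object) -> None: ...

    def not_async(self, event: object) -> None: ...

    async def no_event(self) -> None: ...


handlers = Handlers()
report = {
    "good": check_handler(handlers.good),
    "not_async": check_handler(handlers.not_async),
    "no_event": check_handler(handlers.no_event),
}
```

Running a check like this at subscription time turns a silent non-execution into an immediate, readable error.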
Handler Failure Breaking Event Bus¶
Problem: One handler crashes and event bus stops processing.
Solution: This should not happen. The event bus uses a fail-open design: handler failures are logged but don't stop other handlers from running.
Check logs for handler failures:
{
"event": "event_handler_failed",
"level": "warning",
"event_type": "UserRegistrationSucceeded",
"handler_name": "handle_user_registration_succeeded",
"error_type": "ValueError",
"error_message": "Invalid email format"
}
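The fail-open behavior can be sketched with asyncio.gather and return_exceptions=True. This is an illustration of the technique, not the project's InMemoryEventBus: `SketchEventBus` is hypothetical, it omits the session parameter, and it logs through the standard logging module rather than a structured logger.

```python
import asyncio
import logging
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]
logger = logging.getLogger("event_bus_sketch")


class SketchEventBus:
    """Hypothetical fail-open bus; not the project's InMemoryEventBus."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event: Any) -> None:
        handlers = self._handlers.get(type(event), [])
        # return_exceptions=True keeps one failure from aborting the others.
        results = await asyncio.gather(
            *(handler(event) for handler in handlers), return_exceptions=True
        )
        for handler, result in zip(handlers, results):
            if isinstance(result, Exception):
                logger.warning(
                    "event_handler_failed handler_name=%s error_type=%s error_message=%s",
                    handler.__name__, type(result).__name__, result,
                )


class UserRegistrationSucceeded:
    """Stand-in event for the demonstration."""


calls: list[str] = []


async def send_email(event: Any) -> None:
    raise ValueError("Invalid email format")  # simulated handler failure


async def audit(event: Any) -> None:
    calls.append("audit")


async def main() -> None:
    bus = SketchEventBus()
    bus.subscribe(UserRegistrationSucceeded, send_email)
    bus.subscribe(UserRegistrationSucceeded, audit)
    await bus.publish(UserRegistrationSucceeded())  # does not raise


asyncio.run(main())
```

The failing email handler is logged as a warning while the audit handler still runs, and publish() returns normally to the caller.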
Audit Records Not Created (Integration Tests)¶
Known Issue: Audit handler may fail in integration tests with "Event loop is closed".
Cause: Audit handler creates new database sessions inside event handlers, causing timing issues during test cleanup.
Solution: Pass session to event_bus.publish(event, session=session). This is now the standard pattern - see Session Requirement section above.
Verification: Check logs for PendingRollbackError:
{
"event": "event_handler_failed",
"error_type": "PendingRollbackError",
"error_message": "Event loop is closed"
}
Anti-Patterns¶
❌ Anti-Pattern 1: Events for Everything¶
Bad:
# Don't create events for simple reads or single-step CRUD operations
await event_bus.publish(UserProfileViewed(user_id=user_id))
await event_bus.publish(AccountBalanceFetched(account_id=account_id))
await event_bus.publish(TransactionListed(user_id=user_id))
Why: Events add complexity. Use only when you need multiple handlers or decoupling.
Good:
# Simple operations don't need events
balance = await account_repo.get_balance(account_id)
transactions = await transaction_repo.list(user_id)
❌ Anti-Pattern 2: Synchronous Handler Dependencies¶
Bad:
async def handle_user_registered(self, event: UserRegistrationSucceeded):
    # ❌ Waiting for email to send before audit
    await self.send_welcome_email(event.email)
    await self.create_audit_record(event.user_id)
Why: Handlers should be independent. Email failure blocks audit.
Good:
# ✅ Each handler operates independently
async def handle_user_registered(self, event: UserRegistrationSucceeded):
    await self.send_welcome_email(event.email)  # Separate handler

async def handle_user_registered_audit(self, event: UserRegistrationSucceeded):
    await self.create_audit_record(event.user_id)  # Separate handler
❌ Anti-Pattern 3: Imperative Event Names¶
Bad:
@dataclass(frozen=True, kw_only=True)
class SendWelcomeEmail(DomainEvent):  # ❌ Command, not event
    email: str

@dataclass(frozen=True, kw_only=True)
class CreateAuditLog(DomainEvent):  # ❌ Command, not event
    action: str
Why: Events describe what happened, not what should happen.
Good:
@dataclass(frozen=True, kw_only=True)
class UserRegistrationSucceeded(DomainEvent):  # ✅ Past tense
    user_id: UUID
    email: str
❌ Anti-Pattern 4: Mutable Events¶
Bad:
@dataclass(kw_only=True)  # ❌ Missing frozen=True
class UserRegistered(DomainEvent):
    user_id: UUID
    email: str

# Event can be modified (bad!)
event.email = "changed@example.com"
Why: Events are historical facts and should never change.
Good:
@dataclass(frozen=True, kw_only=True)  # ✅ Immutable
class UserRegistrationSucceeded(DomainEvent):
    user_id: UUID
    email: str

# Attempting to modify raises error
event.email = "changed@example.com"  # FrozenInstanceError
❌ Anti-Pattern 5: Publishing Before Commit¶
Bad:
async def register_user(self, email: str) -> Result[UUID, Error]:
    user = User.create(email=email)

    # ❌ Publishing before database commit
    await self._event_bus.publish(UserRegistrationSucceeded(
        user_id=user.id,
        email=user.email
    ))

    await self._users.save(user)  # What if this fails?
    return Success(user.id)
Why: If save fails, handlers already executed (email sent, audit created) for non-existent user.
Good:
async def register_user(self, email: str) -> Result[UUID, Error]:
    user = User.create(email=email)

    # ✅ Save first
    await self._users.save(user)

    # ✅ Publish after successful save
    await self._event_bus.publish(UserRegistrationSucceeded(
        user_id=user.id,
        email=user.email
    ))

    return Success(user.id)
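The effect of this ordering can be demonstrated with fakes: when the save fails, nothing is published and no handler runs. This is a minimal sketch in which `FailingUserRepo`, `RecordingEventBus`, and the dict-based user and event are all hypothetical simplifications of the project's types.

```python
import asyncio


class FailingUserRepo:
    """Hypothetical repository whose save always fails."""

    async def save(self, user: dict) -> None:
        raise ConnectionError("database unavailable")


class RecordingEventBus:
    """Hypothetical bus that only records what was published."""

    def __init__(self) -> None:
        self.published: list[dict] = []

    async def publish(self, event: dict) -> None:
        self.published.append(event)


async def register_user(repo, bus, email: str) -> bool:
    user = {"email": email}
    try:
        await repo.save(user)  # save first
    except ConnectionError:
        return False  # nothing was published, so no handler ran
    # Publish only after the save succeeded.
    await bus.publish({"type": "UserRegistrationSucceeded", "email": email})
    return True


bus = RecordingEventBus()
ok = asyncio.run(register_user(FailingUserRepo(), bus, "test@example.com"))
```

With the order reversed, the same failing save would leave one published event behind for a user that was never stored.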
Summary¶
Key Takeaways:
- ✅ Use events only for critical workflows with multiple side effects
- ✅ Name events in past tense (what happened)
- ✅ Make events immutable (frozen dataclasses)
- ✅ Publish events after database commit
- ✅ Keep handlers independent (no dependencies between handlers)
- ✅ Test events with unit tests (mocks) and integration tests (real DB)
- ✅ Use fail-open design (handler failures don't break event bus)
Next Steps:
- Review Domain Events Architecture for technical details
- Check existing events in src/domain/events/ for examples
- See ~/starter/development-checklist.md Section 23b for verification steps (project file)
Created: 2025-11-18 | Last Updated: 2026-01-10