infrastructure.events.in_memory_event_bus
src.infrastructure.events.in_memory_event_bus
In-memory event bus implementation.
This module implements EventBusProtocol using an in-memory, dictionary-based registry. It is suitable for MVP and single-server deployments. For distributed systems or high-volume production, swap in a RabbitMQ or Kafka adapter.
Architecture
- Implements EventBusProtocol (hexagonal adapter pattern)
- Dictionary-based handler registry (event_type → list of handlers)
- Fail-open behavior (one handler failure doesn't break others)
- Concurrent handler execution (asyncio.gather)
- Comprehensive error logging for handler failures
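The port/adapter relationship in the list above can be sketched roughly as follows. This is an illustration under assumptions, not the module's source: SketchEventBusProtocol, SketchBus, and Ping are hypothetical names, and the real EventBusProtocol may declare additional methods or parameters.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Awaitable, Callable, Protocol


@dataclass(frozen=True)
class Ping:
    """Hypothetical event type used only for this sketch."""

    value: int


Handler = Callable[[Ping], Awaitable[None]]


class SketchEventBusProtocol(Protocol):
    """Port: the shape the application layer depends on (assumed)."""

    def subscribe(self, event_type: type, handler: Handler) -> None: ...

    async def publish(self, event: Ping) -> None: ...


class SketchBus:
    """Adapter: satisfies the port structurally with a dict registry."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event: Ping) -> None:
        handlers = self._handlers.get(type(event), [])
        # return_exceptions=True returns handler exceptions as results
        # instead of raising them to the publisher.
        await asyncio.gather(*(h(event) for h in handlers), return_exceptions=True)


def run_sketch() -> list[int]:
    seen: list[int] = []

    async def record(event: Ping) -> None:
        seen.append(event.value)

    # The caller only knows the protocol type, not the concrete adapter.
    bus: SketchEventBusProtocol = SketchBus()
    bus.subscribe(Ping, record)
    asyncio.run(bus.publish(Ping(value=7)))
    return seen
```

Because typing.Protocol uses structural typing, SketchBus does not need to inherit from the protocol class. That is what would let a message-broker adapter replace it without touching callers.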
Usage
    from functools import lru_cache

    # Container creates singleton instance
    @lru_cache()
    def get_event_bus() -> EventBusProtocol:
        return InMemoryEventBus(logger=get_logger())

    # Application layer uses protocol
    event_bus = get_event_bus()
    event_bus.subscribe(UserRegistered, log_user_registered)
    await event_bus.publish(UserRegistered(user_id=uuid7(), email="test"))
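The singleton behaviour in the usage snippet comes from functools.lru_cache memoising a zero-argument factory. A minimal sketch, with SketchBus and get_sketch_bus as hypothetical stand-ins for the real class and container function:

```python
from functools import lru_cache


class SketchBus:
    """Hypothetical stand-in for InMemoryEventBus."""


@lru_cache()
def get_sketch_bus() -> SketchBus:
    # The body runs once; later calls return the cached instance.
    return SketchBus()


first = get_sketch_bus()
second = get_sketch_bus()
print(first is second)  # True
```

Calling get_sketch_bus.cache_clear() would discard the cached instance, which can be useful for isolating tests.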
Reference
- docs/architecture/domain-events-architecture.md (Lines 920-1067)
- docs/architecture/dependency-injection-architecture.md (Container pattern)
Classes
InMemoryEventBus
In-memory event bus with fail-open behavior.
Implements EventBusProtocol using dictionary-based handler registry. Executes handlers concurrently with asyncio.gather and fail-open error handling (one handler failure doesn't prevent other handlers from executing).
Thread Safety
- NOT thread-safe (designed for a single server running a single-threaded asyncio event loop)
- For multi-threaded use, add locking or use a separate adapter
- For distributed systems, use a RabbitMQ or Kafka adapter
Performance
- O(1) handler lookup by event type
- Concurrent handler execution (asyncio.gather)
- Average overhead: <10ms for 4 handlers
Attributes:
| Name | Type | Description |
|---|---|---|
| _handlers | dict[type[DomainEvent], list[EventHandler]] | Dictionary mapping event types to lists of async handlers. Key: event class (e.g., UserRegistered). Value: list of async handler functions. |
| _logger | | Logger for handler failures and event publishing. |
Example
    # Subscribe handlers
    bus = InMemoryEventBus(logger=logger)
    bus.subscribe(UserRegistered, log_user_registered)
    bus.subscribe(UserRegistered, audit_user_registered)
    bus.subscribe(UserRegistered, send_welcome_email)

    # Publish event (all 3 handlers execute concurrently)
    event = UserRegistered(user_id=uuid7(), email="test@example.com")
    await bus.publish(event)

    # If audit handler fails, logging and email handlers still execute
Design Decisions
- Fail-open: Handler failures logged but not propagated
- Concurrent: asyncio.gather for parallel handler execution
- No ordering: Handlers execute concurrently (order undefined)
- In-memory: Simple dictionary (no persistence, no distribution)
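The "no ordering" decision above can be seen with plain asyncio.gather, independent of this module. In the sketch below (handler names are hypothetical), the handler listed first finishes last because it waits longer:

```python
import asyncio


async def run_handlers() -> list[str]:
    finished: list[str] = []

    async def slow_handler() -> None:
        await asyncio.sleep(0.05)  # simulates slow I/O
        finished.append("slow")

    async def fast_handler() -> None:
        await asyncio.sleep(0)  # yields to the event loop once
        finished.append("fast")

    # slow_handler is listed first, but fast_handler completes first.
    await asyncio.gather(slow_handler(), fast_handler(), return_exceptions=True)
    return finished


if __name__ == "__main__":
    print(asyncio.run(run_handlers()))  # ['fast', 'slow']
```

Handlers that depend on each other's side effects therefore need explicit coordination rather than relying on subscription order.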
Source code in src/infrastructure/events/in_memory_event_bus.py
Functions
__init__
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| logger | LoggerProtocol | Logger for handler failures and event publishing. Used to log handler exceptions (warning level) and event publishing (debug level for non-ATTEMPT events). | required |
Example
    from src.core.container import get_logger

    logger = get_logger()
    bus = InMemoryEventBus(logger=logger)
Source code in src/infrastructure/events/in_memory_event_bus.py
subscribe
Register event handler for specific event type.
Handlers registered for the same event type will ALL be called when that event is published. Handlers execute concurrently (no ordering).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_type | type[DomainEvent] | Class of event to handle (e.g., UserRegistered). Matches the exact type only (no inheritance matching). | required |
| handler | EventHandler | Async function to call when the event is published. Must accept a single event parameter and return None. | required |
Example
    async def log_event(event: UserRegistered) -> None:
        logger.info("User registered", user_id=str(event.user_id))

    bus.subscribe(UserRegistered, log_event)

    # Multiple handlers for same event
    bus.subscribe(UserRegistered, audit_event)
    bus.subscribe(UserRegistered, send_email)
Notes
- Handlers execute concurrently (asyncio.gather)
- No duplicate detection (same handler can be registered twice)
- Handlers should be idempotent (a handler may be invoked more than once for the same event, for example if it was registered twice)
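Two points from this section, exact-type matching and the absence of duplicate detection, follow directly from a registry keyed on type(event) that appends blindly. A minimal sketch, assuming that registry shape (SketchRegistry, BaseEvent, and ChildEvent are hypothetical names):

```python
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


class BaseEvent:
    """Hypothetical parent event."""


class ChildEvent(BaseEvent):
    """Hypothetical subclass event."""


class SketchRegistry:
    def __init__(self) -> None:
        self._handlers: defaultdict[type, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Handler) -> None:
        # Plain append: registering the same handler twice stores it twice.
        self._handlers[event_type].append(handler)

    def handlers_for(self, event: Any) -> list[Handler]:
        # Keyed on type(event), so a subclass does not match its parent's handlers.
        return list(self._handlers.get(type(event), []))


async def on_base(event: BaseEvent) -> None:
    """Handler body is irrelevant here; only registration is shown."""


registry = SketchRegistry()
registry.subscribe(BaseEvent, on_base)
registry.subscribe(BaseEvent, on_base)  # duplicate registration is accepted

print(len(registry.handlers_for(BaseEvent())))   # 2
print(len(registry.handlers_for(ChildEvent())))  # 0
```

Under this shape, a handler meant to cover a whole event hierarchy would have to be subscribed to each concrete class separately.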
Source code in src/infrastructure/events/in_memory_event_bus.py
publish (async)
publish(
event: DomainEvent,
session: AsyncSession | None = None,
metadata: dict[str, str] | None = None,
) -> None
Publish event to all registered handlers.
Executes all handlers concurrently with fail-open behavior. Handler exceptions are logged but NOT propagated to publisher. If no handlers are registered, this is a no-op (not an error).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | DomainEvent | Domain event to publish (subclass of DomainEvent). All handlers registered for type(event) will be called. | required |
| session | AsyncSession \| None | Optional database session for handlers that need database access (e.g., AuditEventHandler). Stored as an instance variable for the duration of publish() so handlers can access it via get_session(). Defaults to None for backward compatibility. | None |
| metadata | dict[str, str] \| None | Optional dict with request context (ip_address, user_agent, trace_id) for audit trail enrichment. Stored as an instance variable for the duration of publish() so handlers can access it via get_metadata(). Defaults to None for backward compatibility. | None |
Example
    # After successful business logic
    user = User(email="test@example.com", ...)
    await user_repo.save(user)
    await session.commit()  # ← COMMIT FIRST

    # Publish event (all handlers execute)
    event = UserRegistered(user_id=user.id, email=user.email)
    await bus.publish(event)

    # If audit handler fails:
    # - Audit failure logged (warning level)
    # - Logging/email handlers still execute
    # - No exception raised to publisher
Flow
- Look up handlers for type(event)
- If no handlers, return immediately (no-op)
- Execute all handlers with asyncio.gather(return_exceptions=True)
- Log any handler exceptions (warning level)
- Return (never raise exceptions)
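The five steps above can be sketched as a standalone function. This is not the module's source: publish_fail_open is a hypothetical name, the registry is passed in explicitly, and a plain list stands in for the logger so the example has no dependencies.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


async def publish_fail_open(
    registry: dict[type, list[Handler]],
    event: Any,
    warnings: list[str],
) -> None:
    """Hypothetical stand-in for publish(); `warnings` stands in for the logger."""
    # Step 1: exact-type lookup.
    handlers = registry.get(type(event), [])
    # Step 2: no handlers is a no-op.
    if not handlers:
        return
    # Step 3: run all handlers concurrently, collecting exceptions as results.
    results = await asyncio.gather(
        *(handler(event) for handler in handlers),
        return_exceptions=True,
    )
    # Step 4: record each failure instead of re-raising it.
    for handler, result in zip(handlers, results):
        if isinstance(result, Exception):
            warnings.append(f"{handler.__name__} failed: {result!r}")
    # Step 5: return normally.


def demo() -> tuple[list[str], list[str]]:
    calls: list[str] = []
    warnings: list[str] = []

    async def log_handler(event: str) -> None:
        calls.append("log")

    async def audit_handler(event: str) -> None:
        raise RuntimeError("audit store unavailable")

    async def email_handler(event: str) -> None:
        calls.append("email")

    registry = {str: [log_handler, audit_handler, email_handler]}
    asyncio.run(publish_fail_open(registry, "user-registered", warnings))
    return calls, warnings
```

asyncio.gather returns results in the same order as its inputs, which is why zip(handlers, results) pairs each exception with the handler that raised it.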
Performance
- Handler lookup: O(1)
- Handler execution: Concurrent (not sequential)
- Average latency: <10ms for 4 handlers (in-memory)
Notes
- No handlers = no-op (not an error)
- Handler failures logged with event_id for debugging
- NEVER raises exceptions (fail-open guarantee)
Source code in src/infrastructure/events/in_memory_event_bus.py
get_session
Get current database session for event handlers.
Returns the session passed to publish() for handlers that need database access (e.g., AuditEventHandler). This avoids handlers creating their own sessions, which can cause "Event loop is closed" errors in tests.
Returns:
| Type | Description |
|---|---|
| AsyncSession \| None | Current session, or None if not provided. |
Example
    # In AuditEventHandler
    class AuditEventHandler:
        def __init__(self, event_bus: InMemoryEventBus, audit_adapter: PostgresAuditAdapter):
            self._event_bus = event_bus
            self._audit = audit_adapter

        async def handle_event(self, event: DomainEvent):
            session = self._event_bus.get_session()
            if session:
                # Use provided session
                audit = PostgresAuditAdapter(session=session)
                await audit.record(...)
Notes
- Only available during publish() execution
- Returns None if no session provided to publish()
- Cleared after publish() completes (finally block)
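The lifetime described in these notes (set on entry to publish(), cleared in a finally block) can be sketched as below. SessionScopedBus is a hypothetical class, and a plain object stands in for AsyncSession so the example runs without a database.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


class SessionScopedBus:
    """Hypothetical sketch; `session` is any object standing in for AsyncSession."""

    def __init__(self) -> None:
        self._handlers: list[Callable[[Any], Awaitable[None]]] = []
        self._session: Any | None = None

    def subscribe(self, handler: Callable[[Any], Awaitable[None]]) -> None:
        self._handlers.append(handler)

    def get_session(self) -> Any | None:
        return self._session

    async def publish(self, event: Any, session: Any | None = None) -> None:
        self._session = session  # visible to handlers while publish() runs
        try:
            await asyncio.gather(
                *(handler(event) for handler in self._handlers),
                return_exceptions=True,
            )
        finally:
            self._session = None  # cleared even if gather is interrupted


async def demo() -> tuple[bool, Any]:
    bus = SessionScopedBus()
    seen: list[Any] = []

    async def audit_handler(event: Any) -> None:
        seen.append(bus.get_session())

    bus.subscribe(audit_handler)
    fake_session = object()
    await bus.publish("event", session=fake_session)
    # (handler saw the session, session after publish)
    return seen[0] is fake_session, bus.get_session()
```

In this sketch the session lives on the instance, so two overlapping publish() calls on the same bus would overwrite each other's value. Whether the real class guards against that is not stated here.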
Source code in src/infrastructure/events/in_memory_event_bus.py
get_metadata
Get current request metadata for event handlers.
Returns the metadata dict passed to publish() for handlers that need request context (e.g., AuditEventHandler for IP/user agent tracking). Enables PCI-DSS 10.2.7 compliance (track client IP and user agent).
Returns:
| Type | Description |
|---|---|
| dict[str, str] | Current metadata dict (empty if not provided). Expected keys: ip_address (client IP address, str), user_agent (client user agent string, str), trace_id (request trace ID, str, optional). |
Example
    # In AuditEventHandler
    class AuditEventHandler:
        def __init__(self, event_bus: InMemoryEventBus):
            self._event_bus = event_bus

        async def _create_audit_record(self, **kwargs):
            metadata = self._event_bus.get_metadata()
            # Merge metadata into audit record
            if metadata:
                kwargs.setdefault('ip_address', metadata.get('ip_address'))
                kwargs.setdefault('user_agent', metadata.get('user_agent'))
            await audit.record(**kwargs)
Notes
- Only available during publish() execution
- Returns empty dict if no metadata provided to publish()
- Cleared after publish() completes (finally block)
- Handlers should use setdefault() to avoid overwriting explicit values
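The setdefault() advice can be checked in isolation. The helper below is a hypothetical illustration, not part of the module. Unlike the handler example above, it skips keys that are absent from the metadata rather than inserting None for them.

```python
def merge_request_metadata(
    record: dict[str, str],
    metadata: dict[str, str],
) -> dict[str, str]:
    """Fill ip_address/user_agent from metadata without overwriting explicit values."""
    merged = dict(record)  # copy so the caller's dict is not mutated
    for key in ("ip_address", "user_agent"):
        if key in metadata:
            # setdefault only writes when the key is not already present.
            merged.setdefault(key, metadata[key])
    return merged


explicit = {"ip_address": "10.0.0.1"}
request_context = {"ip_address": "203.0.113.9", "user_agent": "curl/8.0"}
print(merge_request_metadata(explicit, request_context))
# {'ip_address': '10.0.0.1', 'user_agent': 'curl/8.0'}
```

The explicitly supplied ip_address survives, while the missing user_agent is filled from the request context.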