src.infrastructure.events.in_memory_event_bus

In-memory event bus implementation.

This module implements the EventBusProtocol using an in-memory, dictionary-based registry. It is suitable for the MVP and single-server deployments; for distributed systems or high-volume production, swap in a RabbitMQ/Kafka adapter.

Architecture
  • Implements EventBusProtocol (hexagonal adapter pattern)
  • Dictionary-based handler registry (event_type → list of handlers)
  • Fail-open behavior (one handler failure doesn't break others)
  • Concurrent handler execution (asyncio.gather)
  • Comprehensive error logging for handler failures
Usage

Container creates singleton instance

@lru_cache()
def get_event_bus() -> EventBusProtocol:
    return InMemoryEventBus(logger=get_logger())

Application layer uses protocol

event_bus = get_event_bus()
event_bus.subscribe(UserRegistered, log_user_registered)
await event_bus.publish(UserRegistered(user_id=uuid7(), email="test"))

Reference
  • docs/architecture/domain-events-architecture.md (Lines 920-1067)
  • docs/architecture/dependency-injection-architecture.md (Container pattern)

Classes

InMemoryEventBus

In-memory event bus with fail-open behavior.

Implements EventBusProtocol using dictionary-based handler registry. Executes handlers concurrently with asyncio.gather and fail-open error handling (one handler failure doesn't prevent other handlers from executing).

Thread Safety
  • NOT thread-safe (single-server, single-threaded async design)
  • For multi-threaded, use locks or separate adapter
  • For distributed systems, use RabbitMQ/Kafka adapter
Performance
  • O(1) handler lookup by event type
  • Concurrent handler execution (asyncio.gather)
  • Average overhead: <10ms for 4 handlers

Attributes:

_handlers (dict[type[DomainEvent], list[EventHandler]]): Dictionary mapping event types to lists of async handlers. Key: event class (e.g., UserRegistered). Value: list of async handler functions.

_logger: Logger for handler failures and event publishing.

Example
Subscribe handlers

bus = InMemoryEventBus(logger=logger)
bus.subscribe(UserRegistered, log_user_registered)
bus.subscribe(UserRegistered, audit_user_registered)
bus.subscribe(UserRegistered, send_welcome_email)

Publish event (all 3 handlers execute concurrently)

event = UserRegistered(user_id=uuid7(), email="test@example.com")
await bus.publish(event)

If audit handler fails, logging and email handlers still execute
Design Decisions
  • Fail-open: Handler failures logged but not propagated
  • Concurrent: asyncio.gather for parallel handler execution
  • No ordering: Handlers execute concurrently (order undefined)
  • In-memory: Simple dictionary (no persistence, no distribution)
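The fail-open and concurrency decisions reduce to one asyncio.gather call. A minimal, self-contained sketch of that pattern (stand-in event and handlers, not the real classes from this module):

```python
import asyncio

calls: list[str] = []

class UserRegistered:  # stand-in for the real DomainEvent subclass
    pass

async def log_handler(event: UserRegistered) -> None:
    calls.append("log")

async def audit_handler(event: UserRegistered) -> None:
    raise RuntimeError("audit store down")  # simulated handler failure

async def publish(handlers, event) -> None:
    # return_exceptions=True is the fail-open core: a raised exception
    # becomes a return value instead of cancelling the sibling handlers.
    results = await asyncio.gather(
        *(h(event) for h in handlers), return_exceptions=True
    )
    for handler, result in zip(handlers, results):
        if isinstance(result, Exception):
            calls.append(f"warned:{handler.__name__}")  # stands in for _logger.warning

asyncio.run(publish([audit_handler, log_handler], UserRegistered()))
print(calls)  # the log handler ran; the audit failure was only recorded
```

The publisher never sees the RuntimeError, which is exactly the fail-open guarantee described above.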
Source code in src/infrastructure/events/in_memory_event_bus.py
class InMemoryEventBus:
    """In-memory event bus with fail-open behavior.

    Implements EventBusProtocol using dictionary-based handler registry.
    Executes handlers concurrently with asyncio.gather and fail-open error
    handling (one handler failure doesn't prevent other handlers from executing).

    Thread Safety:
        - NOT thread-safe (single-server, single-threaded async design)
        - For multi-threaded, use locks or separate adapter
        - For distributed systems, use RabbitMQ/Kafka adapter

    Performance:
        - O(1) handler lookup by event type
        - Concurrent handler execution (asyncio.gather)
        - Average overhead: <10ms for 4 handlers

    Attributes:
        _handlers: Dictionary mapping event types to list of async handlers.
            Key: Event class (e.g., UserRegistered)
            Value: List of async handler functions
        _logger: Logger for handler failures and event publishing

    Example:
        >>> # Subscribe handlers
        >>> bus = InMemoryEventBus(logger=logger)
        >>> bus.subscribe(UserRegistered, log_user_registered)
        >>> bus.subscribe(UserRegistered, audit_user_registered)
        >>> bus.subscribe(UserRegistered, send_welcome_email)
        >>>
        >>> # Publish event (all 3 handlers execute concurrently)
        >>> event = UserRegistered(user_id=uuid7(), email="test@example.com")
        >>> await bus.publish(event)
        >>>
        >>> # If audit handler fails, logging and email handlers still execute

    Design Decisions:
        - **Fail-open**: Handler failures logged but not propagated
        - **Concurrent**: asyncio.gather for parallel handler execution
        - **No ordering**: Handlers execute concurrently (order undefined)
        - **In-memory**: Simple dictionary (no persistence, no distribution)
    """

    def __init__(self, logger: LoggerProtocol) -> None:
        """Initialize event bus with logger.

        Args:
            logger: Logger for handler failures and event publishing. Used to
                log handler exceptions (warning level) and event publishing
                (debug level for non-ATTEMPT events).

        Example:
            >>> from src.core.container import get_logger
            >>> logger = get_logger()
            >>> bus = InMemoryEventBus(logger=logger)
        """
        self._handlers: dict[type[DomainEvent], list[EventHandler]] = defaultdict(list)
        self._logger = logger
        self._session: "AsyncSession | None" = (
            None  # Session for handlers that need DB access
        )
        self._metadata: dict[
            str, str
        ] = {}  # Metadata for handlers (IP, user agent, etc.)

    def subscribe(
        self,
        event_type: type[DomainEvent],
        handler: EventHandler,
    ) -> None:
        """Register event handler for specific event type.

        Handlers registered for the same event type will ALL be called when
        that event is published. Handlers execute concurrently (no ordering).

        Args:
            event_type: Class of event to handle (e.g., UserRegistered).
                Only exact type matches (no inheritance matching).
            handler: Async function to call when event is published. Must
                accept single event parameter and return None.

        Example:
            >>> async def log_event(event: UserRegistered) -> None:
            ...     logger.info("User registered", user_id=str(event.user_id))
            >>>
            >>> bus.subscribe(UserRegistered, log_event)
            >>>
            >>> # Multiple handlers for same event
            >>> bus.subscribe(UserRegistered, audit_event)
            >>> bus.subscribe(UserRegistered, send_email)

        Notes:
            - Handlers execute concurrently (asyncio.gather)
            - No duplicate detection (same handler can be registered twice)
            - Handlers should be idempotent (may be called multiple times)
        """
        self._handlers[event_type].append(handler)

    async def publish(
        self,
        event: DomainEvent,
        session: "AsyncSession | None" = None,
        metadata: dict[str, str] | None = None,
    ) -> None:
        """Publish event to all registered handlers.

        Executes all handlers concurrently with fail-open behavior. Handler
        exceptions are logged but NOT propagated to publisher. If no handlers
        are registered, this is a no-op (not an error).

        Args:
            event: Domain event to publish (subclass of DomainEvent). All
                handlers registered for type(event) will be called.
            session: Optional database session for handlers that need database
                access (e.g., AuditEventHandler). Stored as instance variable
                for duration of publish() so handlers can access via get_session().
                Defaults to None for backward compatibility.
            metadata: Optional dict with request context (ip_address, user_agent,
                trace_id) for audit trail enrichment. Stored as instance variable
                for duration of publish() so handlers can access via get_metadata().
                Defaults to None for backward compatibility.

        Example:
            >>> # After successful business logic
            >>> user = User(email="test@example.com", ...)
            >>> await user_repo.save(user)
            >>> await session.commit()  # ← COMMIT FIRST
            >>>
            >>> # Publish event (all handlers execute)
            >>> event = UserRegistered(user_id=user.id, email=user.email)
            >>> await bus.publish(event)
            >>>
            >>> # If audit handler fails:
            >>> # - Audit failure logged (warning level)
            >>> # - Logging/email handlers still execute
            >>> # - No exception raised to publisher

        Flow:
            1. Look up handlers for type(event)
            2. If no handlers, return immediately (no-op)
            3. Execute all handlers with asyncio.gather(return_exceptions=True)
            4. Log any handler exceptions (warning level)
            5. Return (never raise exceptions)

        Performance:
            - Handler lookup: O(1)
            - Handler execution: Concurrent (not sequential)
            - Average latency: <10ms for 4 handlers (in-memory)

        Notes:
            - No handlers = no-op (not an error)
            - Handler failures logged with event_id for debugging
            - NEVER raises exceptions (fail-open guarantee)
        """
        # Store session and metadata for handlers that need them
        self._session = session
        self._metadata = metadata or {}

        try:
            event_type = type(event)
            handlers = self._handlers.get(event_type, [])

            if not handlers:
                # No handlers registered (not an error for optional workflows)
                return

            # Log event publishing (debug level) - helpful for debugging
            self._logger.debug(
                "event_publishing",
                event_type=event_type.__name__,
                event_id=str(event.event_id),
                handler_count=len(handlers),
            )

            # Execute all handlers concurrently with fail-open behavior
            # return_exceptions=True prevents one handler failure from breaking others
            results = await asyncio.gather(
                *(handler(event) for handler in handlers),
                return_exceptions=True,  # ← Fail-open: catch exceptions
            )

            # Log any handler failures (warning level)
            for idx, result in enumerate(results):
                if isinstance(result, Exception):
                    handler_name = handlers[idx].__name__
                    self._logger.warning(
                        "event_handler_failed",
                        event_type=event_type.__name__,
                        event_id=str(event.event_id),
                        handler_name=handler_name,
                        error_type=type(result).__name__,
                        error_message=str(result),
                        # Include stack trace for debugging
                        exc_info=result,
                    )
        finally:
            # Clear session and metadata after publish completes
            self._session = None
            self._metadata = {}

    def get_session(self) -> "AsyncSession | None":
        """Get current database session for event handlers.

        Returns the session passed to publish() for handlers that need database
        access (e.g., AuditEventHandler). This avoids handlers creating their
        own sessions, which can cause "Event loop is closed" errors in tests.

        Returns:
            AsyncSession | None: Current session or None if not provided.

        Example:
            >>> # In AuditEventHandler
            >>> class AuditEventHandler:
            ...     def __init__(self, event_bus: InMemoryEventBus, audit_adapter: PostgresAuditAdapter):
            ...         self._event_bus = event_bus
            ...         self._audit = audit_adapter
            ...
            ...     async def handle_event(self, event: DomainEvent):
            ...         session = self._event_bus.get_session()
            ...         if session:
            ...             # Use provided session
            ...             audit = PostgresAuditAdapter(session=session)
            ...             await audit.record(...)

        Notes:
            - Only available during publish() execution
            - Returns None if no session provided to publish()
            - Cleared after publish() completes (finally block)
        """
        return self._session

    def get_metadata(self) -> dict[str, str]:
        """Get current request metadata for event handlers.

        Returns the metadata dict passed to publish() for handlers that need
        request context (e.g., AuditEventHandler for IP/user agent tracking).
        Enables PCI-DSS 10.2.7 compliance (track client IP and user agent).

        Returns:
            dict[str, str]: Current metadata dict (empty if not provided).
                Expected keys:
                - ip_address: Client IP address (str)
                - user_agent: Client user agent string (str)
                - trace_id: Request trace ID (str, optional)

        Example:
            >>> # In AuditEventHandler
            >>> class AuditEventHandler:
            ...     def __init__(self, event_bus: InMemoryEventBus):
            ...         self._event_bus = event_bus
            ...
            ...     async def _create_audit_record(self, **kwargs):
            ...         metadata = self._event_bus.get_metadata()
            ...         # Merge metadata into audit record
            ...         if metadata:
            ...             kwargs.setdefault('ip_address', metadata.get('ip_address'))
            ...             kwargs.setdefault('user_agent', metadata.get('user_agent'))
            ...         await audit.record(**kwargs)

        Notes:
            - Only available during publish() execution
            - Returns empty dict if no metadata provided to publish()
            - Cleared after publish() completes (finally block)
            - Handlers should use setdefault() to avoid overwriting explicit values
        """
        return self._metadata
Functions
__init__
__init__(logger: LoggerProtocol) -> None

Parameters:

logger (LoggerProtocol, required): Logger for handler failures and event publishing. Used to log handler exceptions (warning level) and event publishing (debug level for non-ATTEMPT events).
Example

from src.core.container import get_logger
logger = get_logger()
bus = InMemoryEventBus(logger=logger)

Source code in src/infrastructure/events/in_memory_event_bus.py
def __init__(self, logger: LoggerProtocol) -> None:
    """Initialize event bus with logger.

    Args:
        logger: Logger for handler failures and event publishing. Used to
            log handler exceptions (warning level) and event publishing
            (debug level for non-ATTEMPT events).

    Example:
        >>> from src.core.container import get_logger
        >>> logger = get_logger()
        >>> bus = InMemoryEventBus(logger=logger)
    """
    self._handlers: dict[type[DomainEvent], list[EventHandler]] = defaultdict(list)
    self._logger = logger
    self._session: "AsyncSession | None" = (
        None  # Session for handlers that need DB access
    )
    self._metadata: dict[
        str, str
    ] = {}  # Metadata for handlers (IP, user agent, etc.)
subscribe
subscribe(
    event_type: type[DomainEvent], handler: EventHandler
) -> None

Register event handler for specific event type.

Handlers registered for the same event type will ALL be called when that event is published. Handlers execute concurrently (no ordering).

Parameters:

event_type (type[DomainEvent], required): Class of event to handle (e.g., UserRegistered). Only exact type matches (no inheritance matching).

handler (EventHandler, required): Async function to call when event is published. Must accept a single event parameter and return None.
Example

async def log_event(event: UserRegistered) -> None:
    logger.info("User registered", user_id=str(event.user_id))

bus.subscribe(UserRegistered, log_event)

Multiple handlers for same event

bus.subscribe(UserRegistered, audit_event)
bus.subscribe(UserRegistered, send_email)

Notes
  • Handlers execute concurrently (asyncio.gather)
  • No duplicate detection (same handler can be registered twice)
  • Handlers should be idempotent (may be called multiple times)
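The "no duplicate detection" note is easy to verify with a stand-in registry (hypothetical names, mirroring only the append-based subscribe):

```python
import asyncio
from collections import defaultdict

class UserRegistered:  # stand-in event type
    pass

handlers: dict[type, list] = defaultdict(list)
call_count = 0

def subscribe(event_type: type, handler) -> None:
    handlers[event_type].append(handler)  # no duplicate check, just append

async def log_event(event: UserRegistered) -> None:
    global call_count
    call_count += 1

subscribe(UserRegistered, log_event)
subscribe(UserRegistered, log_event)  # same handler registered twice

async def main() -> None:
    await asyncio.gather(*(h(UserRegistered()) for h in handlers[UserRegistered]))

asyncio.run(main())
print(call_count)  # 2 -- one reason handlers should be idempotent
```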
Source code in src/infrastructure/events/in_memory_event_bus.py
def subscribe(
    self,
    event_type: type[DomainEvent],
    handler: EventHandler,
) -> None:
    """Register event handler for specific event type.

    Handlers registered for the same event type will ALL be called when
    that event is published. Handlers execute concurrently (no ordering).

    Args:
        event_type: Class of event to handle (e.g., UserRegistered).
            Only exact type matches (no inheritance matching).
        handler: Async function to call when event is published. Must
            accept single event parameter and return None.

    Example:
        >>> async def log_event(event: UserRegistered) -> None:
        ...     logger.info("User registered", user_id=str(event.user_id))
        >>>
        >>> bus.subscribe(UserRegistered, log_event)
        >>>
        >>> # Multiple handlers for same event
        >>> bus.subscribe(UserRegistered, audit_event)
        >>> bus.subscribe(UserRegistered, send_email)

    Notes:
        - Handlers execute concurrently (asyncio.gather)
        - No duplicate detection (same handler can be registered twice)
        - Handlers should be idempotent (may be called multiple times)
    """
    self._handlers[event_type].append(handler)
publish async
publish(
    event: DomainEvent,
    session: AsyncSession | None = None,
    metadata: dict[str, str] | None = None,
) -> None

Publish event to all registered handlers.

Executes all handlers concurrently with fail-open behavior. Handler exceptions are logged but NOT propagated to publisher. If no handlers are registered, this is a no-op (not an error).

Parameters:

event (DomainEvent, required): Domain event to publish (subclass of DomainEvent). All handlers registered for type(event) will be called.

session (AsyncSession | None, default None): Optional database session for handlers that need database access (e.g., AuditEventHandler). Stored as an instance variable for the duration of publish() so handlers can access it via get_session(). Defaults to None for backward compatibility.

metadata (dict[str, str] | None, default None): Optional dict with request context (ip_address, user_agent, trace_id) for audit trail enrichment. Stored as an instance variable for the duration of publish() so handlers can access it via get_metadata(). Defaults to None for backward compatibility.
Example
After successful business logic

user = User(email="test@example.com", ...)
await user_repo.save(user)
await session.commit()  # ← COMMIT FIRST

Publish event (all handlers execute)

event = UserRegistered(user_id=user.id, email=user.email)
await bus.publish(event)

If audit handler fails:
- Audit failure logged (warning level)
- Logging/email handlers still execute
- No exception raised to publisher
Flow
  1. Look up handlers for type(event)
  2. If no handlers, return immediately (no-op)
  3. Execute all handlers with asyncio.gather(return_exceptions=True)
  4. Log any handler exceptions (warning level)
  5. Return (never raise exceptions)
Performance
  • Handler lookup: O(1)
  • Handler execution: Concurrent (not sequential)
  • Average latency: <10ms for 4 handlers (in-memory)
Notes
  • No handlers = no-op (not an error)
  • Handler failures logged with event_id for debugging
  • NEVER raises exceptions (fail-open guarantee)
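The "concurrent, not sequential" claim can be checked directly: three handlers that each sleep 0.1s complete in roughly 0.1s total, not 0.3s. The handler and timing bounds here are illustrative stand-ins:

```python
import asyncio
import time

async def slow_handler(event: object) -> None:
    await asyncio.sleep(0.1)  # simulate I/O-bound handler work

async def publish_concurrently() -> float:
    start = time.perf_counter()
    # Same shape as publish(): one gather over all registered handlers.
    await asyncio.gather(*(slow_handler(None) for _ in range(3)))
    return time.perf_counter() - start

elapsed = asyncio.run(publish_concurrently())
print(f"{elapsed:.2f}s")  # close to 0.10s; sequential awaits would take ~0.30s
```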
Source code in src/infrastructure/events/in_memory_event_bus.py
async def publish(
    self,
    event: DomainEvent,
    session: "AsyncSession | None" = None,
    metadata: dict[str, str] | None = None,
) -> None:
    """Publish event to all registered handlers.

    Executes all handlers concurrently with fail-open behavior. Handler
    exceptions are logged but NOT propagated to publisher. If no handlers
    are registered, this is a no-op (not an error).

    Args:
        event: Domain event to publish (subclass of DomainEvent). All
            handlers registered for type(event) will be called.
        session: Optional database session for handlers that need database
            access (e.g., AuditEventHandler). Stored as instance variable
            for duration of publish() so handlers can access via get_session().
            Defaults to None for backward compatibility.
        metadata: Optional dict with request context (ip_address, user_agent,
            trace_id) for audit trail enrichment. Stored as instance variable
            for duration of publish() so handlers can access via get_metadata().
            Defaults to None for backward compatibility.

    Example:
        >>> # After successful business logic
        >>> user = User(email="test@example.com", ...)
        >>> await user_repo.save(user)
        >>> await session.commit()  # ← COMMIT FIRST
        >>>
        >>> # Publish event (all handlers execute)
        >>> event = UserRegistered(user_id=user.id, email=user.email)
        >>> await bus.publish(event)
        >>>
        >>> # If audit handler fails:
        >>> # - Audit failure logged (warning level)
        >>> # - Logging/email handlers still execute
        >>> # - No exception raised to publisher

    Flow:
        1. Look up handlers for type(event)
        2. If no handlers, return immediately (no-op)
        3. Execute all handlers with asyncio.gather(return_exceptions=True)
        4. Log any handler exceptions (warning level)
        5. Return (never raise exceptions)

    Performance:
        - Handler lookup: O(1)
        - Handler execution: Concurrent (not sequential)
        - Average latency: <10ms for 4 handlers (in-memory)

    Notes:
        - No handlers = no-op (not an error)
        - Handler failures logged with event_id for debugging
        - NEVER raises exceptions (fail-open guarantee)
    """
    # Store session and metadata for handlers that need them
    self._session = session
    self._metadata = metadata or {}

    try:
        event_type = type(event)
        handlers = self._handlers.get(event_type, [])

        if not handlers:
            # No handlers registered (not an error for optional workflows)
            return

        # Log event publishing (debug level) - helpful for debugging
        self._logger.debug(
            "event_publishing",
            event_type=event_type.__name__,
            event_id=str(event.event_id),
            handler_count=len(handlers),
        )

        # Execute all handlers concurrently with fail-open behavior
        # return_exceptions=True prevents one handler failure from breaking others
        results = await asyncio.gather(
            *(handler(event) for handler in handlers),
            return_exceptions=True,  # ← Fail-open: catch exceptions
        )

        # Log any handler failures (warning level)
        for idx, result in enumerate(results):
            if isinstance(result, Exception):
                handler_name = handlers[idx].__name__
                self._logger.warning(
                    "event_handler_failed",
                    event_type=event_type.__name__,
                    event_id=str(event.event_id),
                    handler_name=handler_name,
                    error_type=type(result).__name__,
                    error_message=str(result),
                    # Include stack trace for debugging
                    exc_info=result,
                )
    finally:
        # Clear session and metadata after publish completes
        self._session = None
        self._metadata = {}
get_session
get_session() -> AsyncSession | None

Get current database session for event handlers.

Returns the session passed to publish() for handlers that need database access (e.g., AuditEventHandler). This avoids handlers creating their own sessions, which can cause "Event loop is closed" errors in tests.

Returns:

AsyncSession | None: Current session, or None if not provided.

Example
In AuditEventHandler

class AuditEventHandler:
    def __init__(self, event_bus: InMemoryEventBus, audit_adapter: PostgresAuditAdapter):
        self._event_bus = event_bus
        self._audit = audit_adapter

    async def handle_event(self, event: DomainEvent):
        session = self._event_bus.get_session()
        if session:
            # Use provided session
            audit = PostgresAuditAdapter(session=session)
            await audit.record(...)

Notes
  • Only available during publish() execution
  • Returns None if no session provided to publish()
  • Cleared after publish() completes (finally block)
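The store-then-clear lifecycle behind get_session() can be sketched with a stand-in bus and a dummy session object (not the real AsyncSession):

```python
import asyncio

seen = []  # records what a handler observes during publish()

class MiniBus:
    """Stand-in reproducing only the session-lifecycle behavior."""

    def __init__(self) -> None:
        self._session = None

    def get_session(self):
        return self._session

    async def publish(self, event, session=None) -> None:
        self._session = session
        try:
            seen.append(self.get_session())  # visible while publish() runs
        finally:
            self._session = None  # always cleared, even on handler failure

bus = MiniBus()
fake_session = object()  # dummy standing in for AsyncSession
asyncio.run(bus.publish("event", session=fake_session))

print(seen[0] is fake_session, bus.get_session())  # True None
```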
Source code in src/infrastructure/events/in_memory_event_bus.py
def get_session(self) -> "AsyncSession | None":
    """Get current database session for event handlers.

    Returns the session passed to publish() for handlers that need database
    access (e.g., AuditEventHandler). This avoids handlers creating their
    own sessions, which can cause "Event loop is closed" errors in tests.

    Returns:
        AsyncSession | None: Current session or None if not provided.

    Example:
        >>> # In AuditEventHandler
        >>> class AuditEventHandler:
        ...     def __init__(self, event_bus: InMemoryEventBus, audit_adapter: PostgresAuditAdapter):
        ...         self._event_bus = event_bus
        ...         self._audit = audit_adapter
        ...
        ...     async def handle_event(self, event: DomainEvent):
        ...         session = self._event_bus.get_session()
        ...         if session:
        ...             # Use provided session
        ...             audit = PostgresAuditAdapter(session=session)
        ...             await audit.record(...)

    Notes:
        - Only available during publish() execution
        - Returns None if no session provided to publish()
        - Cleared after publish() completes (finally block)
    """
    return self._session
get_metadata
get_metadata() -> dict[str, str]

Get current request metadata for event handlers.

Returns the metadata dict passed to publish() for handlers that need request context (e.g., AuditEventHandler for IP/user agent tracking). Enables PCI-DSS 10.2.7 compliance (track client IP and user agent).

Returns:

dict[str, str]: Current metadata dict (empty if not provided). Expected keys:
  • ip_address: Client IP address (str)
  • user_agent: Client user agent string (str)
  • trace_id: Request trace ID (str, optional)

Example
In AuditEventHandler

class AuditEventHandler:
    def __init__(self, event_bus: InMemoryEventBus):
        self._event_bus = event_bus

    async def _create_audit_record(self, **kwargs):
        metadata = self._event_bus.get_metadata()
        # Merge metadata into audit record
        if metadata:
            kwargs.setdefault('ip_address', metadata.get('ip_address'))
            kwargs.setdefault('user_agent', metadata.get('user_agent'))
        await audit.record(**kwargs)

Notes
  • Only available during publish() execution
  • Returns empty dict if no metadata provided to publish()
  • Cleared after publish() completes (finally block)
  • Handlers should use setdefault() to avoid overwriting explicit values
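The setdefault() note in action, with illustrative sample values: an explicitly passed ip_address survives the merge, while the missing user_agent is filled in from the bus metadata:

```python
# Metadata as a handler would receive it from get_metadata() (sample values).
metadata = {"ip_address": "203.0.113.7", "user_agent": "curl/8.5"}

# kwargs built by the caller; ip_address was set explicitly upstream.
kwargs = {"ip_address": "198.51.100.9"}

if metadata:
    kwargs.setdefault("ip_address", metadata.get("ip_address"))  # no-op: key exists
    kwargs.setdefault("user_agent", metadata.get("user_agent"))  # fills the gap

print(kwargs)
# {'ip_address': '198.51.100.9', 'user_agent': 'curl/8.5'}
```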
Source code in src/infrastructure/events/in_memory_event_bus.py
def get_metadata(self) -> dict[str, str]:
    """Get current request metadata for event handlers.

    Returns the metadata dict passed to publish() for handlers that need
    request context (e.g., AuditEventHandler for IP/user agent tracking).
    Enables PCI-DSS 10.2.7 compliance (track client IP and user agent).

    Returns:
        dict[str, str]: Current metadata dict (empty if not provided).
            Expected keys:
            - ip_address: Client IP address (str)
            - user_agent: Client user agent string (str)
            - trace_id: Request trace ID (str, optional)

    Example:
        >>> # In AuditEventHandler
        >>> class AuditEventHandler:
        ...     def __init__(self, event_bus: InMemoryEventBus):
        ...         self._event_bus = event_bus
        ...
        ...     async def _create_audit_record(self, **kwargs):
        ...         metadata = self._event_bus.get_metadata()
        ...         # Merge metadata into audit record
        ...         if metadata:
        ...             kwargs.setdefault('ip_address', metadata.get('ip_address'))
        ...             kwargs.setdefault('user_agent', metadata.get('user_agent'))
        ...         await audit.record(**kwargs)

    Notes:
        - Only available during publish() execution
        - Returns empty dict if no metadata provided to publish()
        - Cleared after publish() completes (finally block)
        - Handlers should use setdefault() to avoid overwriting explicit values
    """
    return self._metadata