Skip to content

core.container.events

src.core.container.events

Event bus dependency factory.

Application-scoped singleton for domain event publishing. Configures all event handlers and subscriptions at startup using registry-driven auto-wiring (F7.7: Domain Events Compliance Audit).

Reference

See docs/architecture/domain-events-architecture.md for complete event patterns and handler specifications.

Classes

Functions

get_event_bus cached

get_event_bus() -> EventBusProtocol

Get event bus singleton (app-scoped).

Container owns factory logic - decides which adapter based on EVENT_BUS_TYPE. This follows the Composition Root pattern (industry best practice).

Returns correct adapter based on EVENT_BUS_TYPE environment variable
  • 'in-memory': InMemoryEventBus (MVP, single-server)
  • 'rabbitmq': RabbitMQEventBus (future, distributed)
  • 'kafka': KafkaEventBus (future, high-volume)

Event handlers are AUTOMATICALLY registered at startup using EVENT_REGISTRY (F7.7: Registry-driven auto-wiring). The registry defines ALL event metadata (category, workflow, phase, handler requirements) and this factory: 1. Loops through EVENT_REGISTRY 2. Dynamically computes handler method names from workflow_name + phase 3. Subscribes handlers based on metadata.requires_* flags

Current Registry Status (69 events, 143 subscriptions across 7 categories): - Authentication (31): 8 workflows × 3-state + 1 operational - Authorization (6): 2 workflows × 3-state - Provider (9): 3 workflows × 3-state - Data Sync (12): 4 workflows × 3-state - Session (8): Session lifecycle + operational events - Rate Limit (3): Rate limit operational events - Admin (0): Future administrative events

Subscription Breakdown (143 total): - LoggingEventHandler: 69 (all events) - AuditEventHandler: 66 (all except 3 operational) - EmailEventHandler: 5 (registration, verification, password reset) - SessionEventHandler: 3 (password change, token rotation)

Benefits of registry-driven approach
  • Single source of truth (registry.py)
  • Self-validating (tests fail if handlers/audit actions missing)
  • Automatic wiring (adding event = add to registry, tests enforce rest)
  • No drift (manual code eliminated)

Returns:

Type Description
EventBusProtocol

Event bus implementing EventBusProtocol.

Usage
Application Layer (direct use)

event_bus = get_event_bus() await event_bus.publish(UserRegistrationSucceeded(...))

Presentation Layer (FastAPI Depends)

from fastapi import Depends event_bus: EventBusProtocol = Depends(get_event_bus)

Reference
  • docs/architecture/domain-events-architecture.md
  • docs/architecture/dependency-injection-architecture.md
Source code in src/core/container/events.py
@lru_cache()
def get_event_bus() -> "EventBusProtocol":
    """Get event bus singleton (app-scoped).

    Container owns factory logic - decides which adapter based on EVENT_BUS_TYPE.
    This follows the Composition Root pattern (industry best practice).

    Returns correct adapter based on EVENT_BUS_TYPE environment variable:
        - 'in-memory': InMemoryEventBus (MVP, single-server)
        - 'rabbitmq': RabbitMQEventBus (future, distributed)
        - 'kafka': KafkaEventBus (future, high-volume)

    Event handlers are AUTOMATICALLY registered at startup using EVENT_REGISTRY
    (F7.7: Registry-driven auto-wiring). The registry defines ALL event metadata
    (category, workflow, phase, handler requirements) and this factory:
        1. Loops through EVENT_REGISTRY
        2. Dynamically computes handler method names from workflow_name + phase
        3. Subscribes handlers based on metadata.requires_* flags

    Current Registry Status (69 events, 143 subscriptions across 7 categories):
        - Authentication (31): 8 workflows × 3-state + 1 operational
        - Authorization (6): 2 workflows × 3-state
        - Provider (9): 3 workflows × 3-state
        - Data Sync (12): 4 workflows × 3-state
        - Session (8): Session lifecycle + operational events
        - Rate Limit (3): Rate limit operational events
        - Admin (0): Future administrative events

    Subscription Breakdown (143 total):
        - LoggingEventHandler: 69 (all events)
        - AuditEventHandler: 66 (all except 3 operational)
        - EmailEventHandler: 5 (registration, verification, password reset)
        - SessionEventHandler: 3 (password change, token rotation)

    Benefits of registry-driven approach:
        - Single source of truth (registry.py)
        - Self-validating (tests fail if handlers/audit actions missing)
        - Automatic wiring (adding event = add to registry, tests enforce rest)
        - No drift (manual code eliminated)

    Returns:
        Event bus implementing EventBusProtocol.

    Usage:
        # Application Layer (direct use)
        event_bus = get_event_bus()
        await event_bus.publish(UserRegistrationSucceeded(...))

        # Presentation Layer (FastAPI Depends)
        from fastapi import Depends
        event_bus: EventBusProtocol = Depends(get_event_bus)

    Reference:
        - docs/architecture/domain-events-architecture.md
        - docs/architecture/dependency-injection-architecture.md
    """
    import os

    from src.core.config import get_settings
    from src.domain.events.registry import EVENT_REGISTRY

    # Load settings from centralized config (single source of truth)
    settings = get_settings()

    # Strict mode: Fail-fast if handler methods missing (production safety)
    # Graceful mode: Skip missing handlers, continue (development flexibility)
    # Default: false (graceful) until F7.7 Phase 2-8 complete, then switch to true
    EVENTS_STRICT_MODE = settings.events_strict_mode
    from src.infrastructure.events.handlers.audit_event_handler import AuditEventHandler
    from src.infrastructure.events.handlers.email_event_handler import EmailEventHandler
    from src.infrastructure.events.handlers.logging_event_handler import (
        LoggingEventHandler,
    )
    from src.infrastructure.events.handlers.session_event_handler import (
        SessionEventHandler,
    )
    from src.infrastructure.events.in_memory_event_bus import InMemoryEventBus

    # Import from infrastructure module (no circular dependency)
    from src.core.container.infrastructure import get_database, get_logger

    event_bus_type = os.getenv("EVENT_BUS_TYPE", "in-memory")

    if event_bus_type == "in-memory":
        # Create InMemoryEventBus with logger
        event_bus = InMemoryEventBus(logger=get_logger())
    # elif event_bus_type == "rabbitmq":
    #     # Future: RabbitMQ adapter
    #     from src.infrastructure.events.rabbitmq_event_bus import RabbitMQEventBus
    #
    #     event_bus = RabbitMQEventBus(url=os.getenv("RABBITMQ_URL"))
    # elif event_bus_type == "kafka":
    #     # Future: Kafka adapter
    #     from src.infrastructure.events.kafka_event_bus import KafkaEventBus
    #
    #     event_bus = KafkaEventBus(brokers=os.getenv("KAFKA_BROKERS"))
    else:
        raise ValueError(
            f"Unsupported EVENT_BUS_TYPE: {event_bus_type}. "
            f"Supported: 'in-memory' (rabbitmq and kafka: future)"
        )

    # Create event handlers
    logging_handler = LoggingEventHandler(logger=get_logger())

    # Audit handler uses database session from event bus (if provided).
    # Pass both database (fallback) and event_bus (preferred session source).
    # This prevents "Event loop is closed" errors in tests by avoiding
    # session creation inside event handlers.
    audit_handler = AuditEventHandler(database=get_database(), event_bus=event_bus)

    email_handler = EmailEventHandler(logger=get_logger(), settings=get_settings())
    session_handler = SessionEventHandler(logger=get_logger())

    # =========================================================================
    # REGISTRY-DRIVEN AUTO-WIRING (F7.7: Domain Events Compliance Audit)
    # =========================================================================
    # Instead of manually subscribing ~100 handlers to events, we use the
    # EVENT_REGISTRY as the single source of truth. This eliminates ~500 lines
    # of manual subscription code and prevents drift.
    #
    # For each event in EVENT_REGISTRY:
    #   1. Compute handler method name: handle_{workflow_name}_{phase}
    #   2. Subscribe handlers based on metadata.requires_* flags
    #
    # Benefits:
    #   - Adding new event: Just add to registry, tests enforce rest
    #   - No manual subscription management
    #   - Can't drift (tests fail if handler methods missing)
    #
    # NOTE: mypy shows arg-type errors because handler signatures are more specific
    # (e.g., Callable[[UserRegistrationAttempted], Awaitable[None]]) than the
    # EventHandler type alias (Callable[[DomainEvent], Awaitable[None]]). This is
    # correct by contravariance principle - handlers accepting specific events can
    # safely handle the base type. Runtime behavior is sound, so we suppress mypy
    # at file level (first line of this file).
    # =========================================================================

    logger = get_logger()

    for metadata in EVENT_REGISTRY:
        event_class = metadata.event_class

        # Compute handler method name from workflow_name + phase
        # E.g., "user_registration" + "attempted" = "handle_user_registration_attempted"
        method_name = f"handle_{metadata.workflow_name}_{metadata.phase.value}"

        # Subscribe handlers based on metadata requirements
        # Mode-dependent behavior:
        #   - STRICT (production): Crash if required handler missing
        #   - GRACEFUL (development): Skip missing handlers, log warning
        if metadata.requires_logging:
            handler_method = getattr(logging_handler, method_name, None)
            if not handler_method:
                if EVENTS_STRICT_MODE:
                    raise RuntimeError(
                        f"EVENTS_STRICT_MODE: Missing required logging handler\n"
                        f"Event: {event_class.__name__}\n"
                        f"Expected method: LoggingEventHandler.{method_name}\n\n"
                        f"Fix: Implement handler in src/infrastructure/events/handlers/logging_event_handler.py\n"
                        f"Or disable strict mode: Set EVENTS_STRICT_MODE=false in .env"
                    )
                logger.warning(
                    "Missing logging handler (graceful mode)",
                    event_class=event_class.__name__,
                    handler_method=method_name,
                )
            else:
                event_bus.subscribe(event_class, handler_method)

        if metadata.requires_audit:
            handler_method = getattr(audit_handler, method_name, None)
            if not handler_method:
                if EVENTS_STRICT_MODE:
                    raise RuntimeError(
                        f"EVENTS_STRICT_MODE: Missing required audit handler (COMPLIANCE CRITICAL)\n"
                        f"Event: {event_class.__name__}\n"
                        f"Expected method: AuditEventHandler.{method_name}\n\n"
                        f"Audit handlers are REQUIRED for PCI-DSS/SOC 2 compliance.\n"
                        f"Fix: Implement handler in src/infrastructure/events/handlers/audit_event_handler.py\n"
                        f"Or disable strict mode: Set EVENTS_STRICT_MODE=false in .env"
                    )
                logger.warning(
                    "Missing audit handler (graceful mode - COMPLIANCE RISK)",
                    event_class=event_class.__name__,
                    handler_method=method_name,
                )
            else:
                event_bus.subscribe(event_class, handler_method)

        if metadata.requires_email:
            handler_method = getattr(email_handler, method_name, None)
            if not handler_method:
                if EVENTS_STRICT_MODE:
                    raise RuntimeError(
                        f"EVENTS_STRICT_MODE: Missing required email handler\n"
                        f"Event: {event_class.__name__}\n"
                        f"Expected method: EmailEventHandler.{method_name}\n\n"
                        f"Fix: Implement handler in src/infrastructure/events/handlers/email_event_handler.py\n"
                        f"Or disable strict mode: Set EVENTS_STRICT_MODE=false in .env"
                    )
                logger.warning(
                    "Missing email handler (graceful mode)",
                    event_class=event_class.__name__,
                    handler_method=method_name,
                )
            else:
                event_bus.subscribe(event_class, handler_method)

        if metadata.requires_session:
            handler_method = getattr(session_handler, method_name, None)
            if not handler_method:
                if EVENTS_STRICT_MODE:
                    raise RuntimeError(
                        f"EVENTS_STRICT_MODE: Missing required session handler\n"
                        f"Event: {event_class.__name__}\n"
                        f"Expected method: SessionEventHandler.{method_name}\n\n"
                        f"Fix: Implement handler in src/infrastructure/events/handlers/session_event_handler.py\n"
                        f"Or disable strict mode: Set EVENTS_STRICT_MODE=false in .env"
                    )
                logger.warning(
                    "Missing session handler (graceful mode)",
                    event_class=event_class.__name__,
                    handler_method=method_name,
                )
            else:
                event_bus.subscribe(event_class, handler_method)

    # =========================================================================
    # SSE EVENT HANDLER WIRING (Registry-driven)
    # =========================================================================
    # SSE handler subscribes to domain events that have SSE mappings.
    # Same pattern as LoggingEventHandler and AuditEventHandler above.
    # SSE handler bridges domain events → Redis pub/sub → client streams.
    #
    # NOTE: DOMAIN_TO_SSE_MAPPING is initially empty (foundation only).
    # Mappings will be added by use case issues (Issue #1-6).
    # =========================================================================
    from src.core.container.sse import get_sse_publisher
    from src.domain.events.sse_registry import get_domain_event_to_sse_mapping
    from src.infrastructure.sse.sse_event_handler import SSEEventHandler

    sse_publisher = get_sse_publisher()
    sse_handler = SSEEventHandler(publisher=sse_publisher, logger=logger)

    # Subscribe SSE handler to all domain events that have SSE mappings
    domain_to_sse = get_domain_event_to_sse_mapping()
    for domain_event_class in domain_to_sse.keys():
        event_bus.subscribe(domain_event_class, sse_handler.handle)

    logger.debug(
        "SSE event handler wiring complete",
        mapped_events=len(domain_to_sse),
    )

    # =========================================================================
    # PORTFOLIO EVENT HANDLER WIRING (Manual subscription)
    # =========================================================================
    # Portfolio handler listens to AccountBalanceUpdated and AccountHoldingsUpdated,
    # recalculates net worth, and emits PortfolioNetWorthRecalculated.
    #
    # This is a REACTIVE AGGREGATION handler - it coordinates between multiple
    # events to emit a derived event. Manual subscription is used instead of
    # registry-driven auto-wiring because:
    # 1. Custom method names (handle_balance_updated, not handle_{workflow}_{phase})
    # 2. Queries repository + cache (stateful aggregation logic)
    # 3. Emits new derived event (coordination, not just logging/auditing)
    #
    # Pattern: Same as AuditEventHandler - takes database + event_bus, creates
    # sessions on-demand when handling events.
    # =========================================================================
    from src.application.event_handlers.portfolio_event_handler import (
        PortfolioEventHandler,
    )
    from src.core.container.infrastructure import get_cache
    from src.domain.events.portfolio_events import (
        AccountBalanceUpdated,
        AccountHoldingsUpdated,
    )

    portfolio_handler = PortfolioEventHandler(
        database=get_database(),
        cache=get_cache(),
        event_bus=event_bus,
        logger=logger,
    )

    # Subscribe to portfolio trigger events
    event_bus.subscribe(AccountBalanceUpdated, portfolio_handler.handle_balance_updated)
    event_bus.subscribe(
        AccountHoldingsUpdated, portfolio_handler.handle_holdings_updated
    )

    logger.debug("Portfolio event handler wiring complete")

    return event_bus