src.domain.events.sse_registry

SSE Event Registry - Single Source of Truth.

This registry catalogs all SSE event types and their mappings from domain events. Used for:
  • Container wiring (automated SSEEventHandler subscriptions)
  • Validation tests (verify no drift)
  • Documentation generation (always accurate)
  • Gap detection (missing mappings)

Architecture
  • Domain layer (no dependencies on infrastructure)
  • Imported by container for automated wiring
  • Verified by tests to catch drift

Adding new SSE mappings (separate issues, not foundation):
  1. Add mapping to DOMAIN_TO_SSE_MAPPING
  2. Run tests - they validate the mapping
  3. Container auto-wires SSEEventHandler subscriptions
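The steps above can be sketched as follows. This is a minimal, self-contained illustration, not the project's actual code: the enum member SYNC_ACCOUNTS_COMPLETED, the user_id and account_count fields, and the duplicate check are assumptions made so the example runs on its own. Only DomainToSSEMapping, DOMAIN_TO_SSE_MAPPING, and the AccountSyncSucceeded name come from this page.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Type
from uuid import UUID, uuid4


# Hypothetical stand-ins for the real domain types, so the sketch runs alone.
class SSEEventType(Enum):
    SYNC_ACCOUNTS_COMPLETED = "sync.accounts.completed"  # assumed member


@dataclass(frozen=True)
class DomainEvent:
    user_id: UUID  # assumed field


@dataclass(frozen=True)
class AccountSyncSucceeded(DomainEvent):
    account_count: int = 0  # assumed field


@dataclass(frozen=True)
class DomainToSSEMapping:
    domain_event_class: Type[DomainEvent]
    sse_event_type: SSEEventType
    payload_extractor: Callable[[DomainEvent], dict[str, Any]]
    user_id_extractor: Callable[[DomainEvent], Any]


# Step 1: add one entry to the mapping list.
DOMAIN_TO_SSE_MAPPING: list[DomainToSSEMapping] = [
    DomainToSSEMapping(
        domain_event_class=AccountSyncSucceeded,
        sse_event_type=SSEEventType.SYNC_ACCOUNTS_COMPLETED,
        payload_extractor=lambda e: {"account_count": e.account_count},
        user_id_extractor=lambda e: e.user_id,
    ),
]

# Step 2: one check a validation test might make (an assumption):
# no domain event class is mapped twice.
mapped_classes = [m.domain_event_class for m in DOMAIN_TO_SSE_MAPPING]
has_duplicates = len(mapped_classes) != len(set(mapped_classes))
```

Step 3 needs no code in the registry itself, since the container reads DOMAIN_TO_SSE_MAPPING when wiring subscriptions.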

Reference
  • docs/architecture/sse-architecture.md

Classes

SSEEventMetadata dataclass

Metadata for an SSE event type.

Attributes:

Name Type Description
event_type SSEEventType

The SSE event type enum value.

category SSEEventCategory

Event category for filtering.

description str

Human-readable description.

payload_fields list[str]

Expected fields in the event data payload.

Source code in src/domain/events/sse_registry.py
@dataclass(frozen=True)
class SSEEventMetadata:
    """Metadata for an SSE event type.

    Attributes:
        event_type: The SSE event type enum value.
        category: Event category for filtering.
        description: Human-readable description.
        payload_fields: Expected fields in the event data payload.
    """

    event_type: SSEEventType
    category: SSEEventCategory
    description: str
    payload_fields: list[str] = field(default_factory=list)
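
A short sketch of what frozen=True does and does not guarantee for this dataclass. The enum members and field values below are illustrative assumptions; the dataclass definition mirrors the one above.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from enum import Enum


# Hypothetical stand-ins for the real enums; member names are illustrative.
class SSEEventType(Enum):
    SYNC_ACCOUNTS_COMPLETED = "sync.accounts.completed"


class SSEEventCategory(Enum):
    DATA_SYNC = "data_sync"


@dataclass(frozen=True)
class SSEEventMetadata:
    event_type: SSEEventType
    category: SSEEventCategory
    description: str
    payload_fields: list[str] = field(default_factory=list)


meta = SSEEventMetadata(
    event_type=SSEEventType.SYNC_ACCOUNTS_COMPLETED,
    category=SSEEventCategory.DATA_SYNC,
    description="Account sync finished",
    payload_fields=["account_count"],
)

# Rebinding an attribute on a frozen dataclass raises FrozenInstanceError.
try:
    meta.description = "changed"  # type: ignore[misc]
    rebind_blocked = False
except FrozenInstanceError:
    rebind_blocked = True

# The list object itself is still mutable, so callers should treat
# payload_fields as read-only by convention.
meta.payload_fields.append("extra")
```

Because payload_fields is a list, instances are also unhashable even though the class is frozen; a tuple[str, ...] field would avoid both issues, though that would be a change to the definition shown here.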

DomainToSSEMapping dataclass

Mapping from a domain event to an SSE event.

Defines how a domain event is transformed into an SSE event for real-time client notification.

Attributes:

Name Type Description
domain_event_class Type[DomainEvent]

The domain event class to listen for.

sse_event_type SSEEventType

The SSE event type to emit.

payload_extractor Callable[[DomainEvent], dict[str, Any]]

Function to extract SSE payload from domain event. Takes domain_event and returns dict payload, matching the annotated type Callable[[DomainEvent], dict[str, Any]].

user_id_extractor Callable[[DomainEvent], Any]

Function to extract user_id from domain event. Takes domain_event and returns UUID.

Source code in src/domain/events/sse_registry.py
@dataclass(frozen=True)
class DomainToSSEMapping:
    """Mapping from a domain event to an SSE event.

    Defines how a domain event is transformed into an SSE event
    for real-time client notification.

    Attributes:
        domain_event_class: The domain event class to listen for.
        sse_event_type: The SSE event type to emit.
        payload_extractor: Function to extract SSE payload from domain event.
            Takes domain_event and returns dict payload.
        user_id_extractor: Function to extract user_id from domain event.
            Takes domain_event and returns UUID.
    """

    domain_event_class: Type[DomainEvent]
    sse_event_type: SSEEventType
    payload_extractor: Callable[[DomainEvent], dict[str, Any]]
    user_id_extractor: Callable[[DomainEvent], Any]  # Returns UUID

Functions

get_domain_event_to_sse_mapping

get_domain_event_to_sse_mapping() -> (
    dict[Type[DomainEvent], DomainToSSEMapping]
)

Get mapping from domain event class to SSE mapping.

Used by SSEEventHandler to determine if a domain event should trigger an SSE notification and how to transform it.

Returns:

Type Description
dict[Type[DomainEvent], DomainToSSEMapping]

Dict mapping domain event classes to their SSE mapping metadata.

Example

mapping = get_domain_event_to_sse_mapping()
if AccountSyncSucceeded in mapping:
    sse_mapping = mapping[AccountSyncSucceeded]
    # Transform and publish SSE event

Source code in src/domain/events/sse_registry.py
def get_domain_event_to_sse_mapping() -> dict[Type[DomainEvent], DomainToSSEMapping]:
    """Get mapping from domain event class to SSE mapping.

    Used by SSEEventHandler to determine if a domain event should
    trigger an SSE notification and how to transform it.

    Returns:
        Dict mapping domain event classes to their SSE mapping metadata.

    Example:
        >>> mapping = get_domain_event_to_sse_mapping()
        >>> if AccountSyncSucceeded in mapping:
        ...     sse_mapping = mapping[AccountSyncSucceeded]
        ...     # Transform and publish SSE event
    """
    return {m.domain_event_class: m for m in DOMAIN_TO_SSE_MAPPING}

get_sse_event_metadata

get_sse_event_metadata(
    event_type: SSEEventType,
) -> SSEEventMetadata | None

Get metadata for an SSE event type.

Parameters:

Name Type Description Default
event_type SSEEventType

The SSE event type to look up.

required

Returns:

Type Description
SSEEventMetadata | None

SSEEventMetadata if found, None otherwise.

Source code in src/domain/events/sse_registry.py
def get_sse_event_metadata(event_type: SSEEventType) -> SSEEventMetadata | None:
    """Get metadata for an SSE event type.

    Args:
        event_type: The SSE event type to look up.

    Returns:
        SSEEventMetadata if found, None otherwise.
    """
    for metadata in SSE_EVENT_REGISTRY:
        if metadata.event_type == event_type:
            return metadata
    return None

get_events_by_category

get_events_by_category(
    category: SSEEventCategory,
) -> list[SSEEventMetadata]

Get all SSE event metadata for a category.

Parameters:

Name Type Description Default
category SSEEventCategory

The category to filter by.

required

Returns:

Type Description
list[SSEEventMetadata]

List of SSEEventMetadata for events in that category.

Source code in src/domain/events/sse_registry.py
def get_events_by_category(category: SSEEventCategory) -> list[SSEEventMetadata]:
    """Get all SSE event metadata for a category.

    Args:
        category: The category to filter by.

    Returns:
        List of SSEEventMetadata for events in that category.
    """
    return [m for m in SSE_EVENT_REGISTRY if m.category == category]

get_all_sse_event_types

get_all_sse_event_types() -> list[SSEEventType]

Get list of all registered SSE event types.

Returns:

Type Description
list[SSEEventType]

List of all SSE event types in the registry.

Source code in src/domain/events/sse_registry.py
def get_all_sse_event_types() -> list[SSEEventType]:
    """Get list of all registered SSE event types.

    Returns:
        List of all SSE event types in the registry.
    """
    return [m.event_type for m in SSE_EVENT_REGISTRY]
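
The module overview lists gap detection as one use of the registry. A minimal sketch of such a check, assuming stand-in event types and a hand-written set of mapped types (in the real module these would come from SSE_EVENT_REGISTRY and DOMAIN_TO_SSE_MAPPING); find_unmapped and find_duplicates are hypothetical helper names.

```python
from enum import Enum


class SSEEventType(Enum):  # hypothetical members
    SYNC_ACCOUNTS_COMPLETED = "sync.accounts.completed"
    PROVIDER_TOKEN_EXPIRED = "provider.token.expired"
    AI_RESPONSE_CHUNK = "ai.response.chunk"


def get_all_sse_event_types() -> list[SSEEventType]:
    """Stand-in: pretend every enum member is registered."""
    return list(SSEEventType)


# Stand-in for {m.sse_event_type for m in DOMAIN_TO_SSE_MAPPING}.
mapped_types = {SSEEventType.SYNC_ACCOUNTS_COMPLETED}


def find_unmapped(
    all_types: list[SSEEventType], mapped: set[SSEEventType]
) -> list[SSEEventType]:
    """Registered SSE types that no domain event maps to yet."""
    return [t for t in all_types if t not in mapped]


def find_duplicates(all_types: list[SSEEventType]) -> list[SSEEventType]:
    """SSE types that appear more than once in the registry."""
    seen: set[SSEEventType] = set()
    dupes: list[SSEEventType] = []
    for t in all_types:
        if t in seen and t not in dupes:
            dupes.append(t)
        seen.add(t)
    return dupes


gaps = find_unmapped(get_all_sse_event_types(), mapped_types)
```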

get_registry_statistics

get_registry_statistics() -> dict[str, object]

Get statistics about the SSE event registry.

Returns:

Type Description
dict[str, object]

Dict with counts by category and totals. Keys: total_event_types (int), total_mappings (int), by_category (dict[str, int]).

Example

stats = get_registry_statistics()
print(stats)
{
    "total_event_types": 21,
    "total_mappings": 0,  # Until use case issues implemented
    "by_category": {
        "data_sync": 9,
        "provider": 4,
        "ai": 3,
        ...
    }
}

Source code in src/domain/events/sse_registry.py
def get_registry_statistics() -> dict[str, object]:
    """Get statistics about the SSE event registry.

    Returns:
        Dict with counts by category and totals.
        Keys: total_event_types (int), total_mappings (int), by_category (dict[str, int]).

    Example:
        >>> stats = get_registry_statistics()
        >>> print(stats)
        {
            "total_event_types": 21,
            "total_mappings": 0,  # Until use case issues implemented
            "by_category": {
                "data_sync": 9,
                "provider": 4,
                "ai": 3,
                ...
            }
        }
    """
    by_category: dict[str, int] = {}
    for metadata in SSE_EVENT_REGISTRY:
        cat_name = metadata.category.value
        by_category[cat_name] = by_category.get(cat_name, 0) + 1

    return {
        "total_event_types": len(SSE_EVENT_REGISTRY),
        "total_mappings": len(DOMAIN_TO_SSE_MAPPING),
        "by_category": by_category,
    }
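
One invariant a drift test could assert on these statistics is that the per-category counts add up to the total. The sketch below reproduces the function over a small stand-in registry; the registry entries, enum members, and the category_counts_are_consistent helper are assumptions for illustration.

```python
from dataclasses import dataclass
from enum import Enum


class SSEEventCategory(Enum):  # values follow the example output above
    DATA_SYNC = "data_sync"
    PROVIDER = "provider"


@dataclass(frozen=True)
class SSEEventMetadata:
    name: str  # simplified stand-in for event_type
    category: SSEEventCategory


# Hypothetical registry contents.
SSE_EVENT_REGISTRY = [
    SSEEventMetadata("sync.accounts.completed", SSEEventCategory.DATA_SYNC),
    SSEEventMetadata("sync.accounts.failed", SSEEventCategory.DATA_SYNC),
    SSEEventMetadata("provider.token.expired", SSEEventCategory.PROVIDER),
]
DOMAIN_TO_SSE_MAPPING: list[object] = []  # empty, as in the example output


def get_registry_statistics() -> dict[str, object]:
    by_category: dict[str, int] = {}
    for metadata in SSE_EVENT_REGISTRY:
        cat_name = metadata.category.value
        by_category[cat_name] = by_category.get(cat_name, 0) + 1
    return {
        "total_event_types": len(SSE_EVENT_REGISTRY),
        "total_mappings": len(DOMAIN_TO_SSE_MAPPING),
        "by_category": by_category,
    }


def category_counts_are_consistent(stats: dict[str, object]) -> bool:
    """Every event belongs to exactly one category, so counts must sum to the total."""
    by_category = stats["by_category"]
    if not isinstance(by_category, dict):
        return False
    return sum(by_category.values()) == stats["total_event_types"]


stats = get_registry_statistics()
```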