SSE Event Registry Architecture¶
Overview¶
Purpose¶
The SSE Event Registry is a metadata-driven catalog that serves as the single source of truth for all Server-Sent Events (SSE) event types and their mappings from domain events. It follows the Registry Pattern established by the Domain Events Registry, Route Registry, and Validation Registry.
Key Benefits:
- Single source of truth - All SSE event types cataloged in one place
- Self-documenting - Metadata includes descriptions, payload fields, categories
- Self-enforcing - Compliance tests fail if registry incomplete
- Zero drift - Can't add SSE event type without registry entry
- Easy discovery - Helper functions for accessing metadata
- Auto-wiring ready - Mappings enable automatic domain→SSE event bridging
Problem Statement¶
Before Registry Pattern:
- Scattered definitions (Gap ⚠️): SSE event types defined ad-hoc across codebase
- No catalog (Gap ⚠️): No central place to see all SSE events
- No metadata (Gap ⚠️): Event types lack descriptions, expected payloads
- Manual wiring (Gap ⚠️): Must manually wire each domain event to SSE
- Inconsistent naming (Gap ⚠️): Event type names vary without standard
The Gap: Without a registry, developers must manually search for SSE events, lack payload documentation, and have no automated way to ensure complete coverage.
Solution¶
With Registry Pattern:
# src/domain/events/sse_registry.py - Single source of truth
SSE_EVENT_REGISTRY: list[SSEEventMetadata] = [
SSEEventMetadata(
event_type=SSEEventType.SYNC_ACCOUNTS_COMPLETED,
category=SSEEventCategory.DATA_SYNC,
description="Account sync operation completed successfully",
payload_fields=["connection_id", "provider_slug", "account_count"],
),
# ... 28 more event types
]
Benefits:
- ✅ All 29 SSE event types cataloged with complete metadata
- ✅ 6 categories for client-side filtering
- ✅ Self-enforcing compliance tests
- ✅ Helper functions for easy access
- ✅ Domain→SSE mapping support for automated bridging
- ✅ Zero drift - tests fail if metadata incomplete
Architecture Components¶
1. SSEEventType (Enum)¶
Purpose: Enumerate all valid SSE event types sent to clients.
Location: src/domain/events/sse_event.py
Structure:
class SSEEventType(StrEnum):
"""All SSE event types (Single Source of Truth).
Naming convention: {category}.{resource}.{action}
Examples: sync.accounts.completed, provider.token.expiring
"""
# Data Sync (9 types)
SYNC_ACCOUNTS_STARTED = "sync.accounts.started"
SYNC_ACCOUNTS_COMPLETED = "sync.accounts.completed"
SYNC_ACCOUNTS_FAILED = "sync.accounts.failed"
SYNC_TRANSACTIONS_STARTED = "sync.transactions.started"
# ... more
# Provider (4 types)
PROVIDER_TOKEN_EXPIRING = "provider.token.expiring"
PROVIDER_TOKEN_REFRESHED = "provider.token.refreshed"
# ... more
# AI (3 types), Import (4 types), Portfolio (3 types), Security (6 types)
Naming Convention:
- Use dot notation:
{category}.{resource}.{action} - Lowercase with dots separating segments
- Descriptive actions:
started,completed,failed,expiring
2. SSEEventCategory (Enum)¶
Purpose: Categorize SSE events for client-side filtering.
Location: src/domain/events/sse_event.py
Structure:
class SSEEventCategory(StrEnum):
"""Categories for SSE event filtering."""
DATA_SYNC = "data_sync" # Account/transaction/holdings sync
PROVIDER = "provider" # Provider connection health
AI = "ai" # AI assistant streaming
IMPORT = "import" # File import progress
PORTFOLIO = "portfolio" # Balance/holdings updates
SECURITY = "security" # Session/security alerts
Client Usage:
3. SSEEventMetadata (Dataclass)¶
Purpose: Immutable metadata container for a single SSE event type.
Location: src/domain/events/sse_registry.py
Structure:
@dataclass(frozen=True)
class SSEEventMetadata:
"""Metadata for an SSE event type.
Attributes:
event_type: The SSE event type enum value.
category: Event category for filtering.
description: Human-readable description.
payload_fields: Expected fields in the event data payload.
"""
event_type: SSEEventType
category: SSEEventCategory
description: str
payload_fields: list[str] = field(default_factory=list)
Key Properties:
- Immutable (
frozen=True) - Prevents accidental modification - Type-safe - All fields have type hints
- Self-documenting - Description and payload_fields included
- Category-aware - Links to filtering category
4. SSE_EVENT_REGISTRY (Constant)¶
Purpose: The registry itself - single source of truth for all SSE event metadata.
Location: src/domain/events/sse_registry.py
Structure:
SSE_EVENT_REGISTRY: list[SSEEventMetadata] = [
# Data Sync Events (9)
SSEEventMetadata(
event_type=SSEEventType.SYNC_ACCOUNTS_STARTED,
category=SSEEventCategory.DATA_SYNC,
description="Account sync operation started",
payload_fields=["connection_id", "provider_slug"],
),
SSEEventMetadata(
event_type=SSEEventType.SYNC_ACCOUNTS_COMPLETED,
category=SSEEventCategory.DATA_SYNC,
description="Account sync operation completed successfully",
payload_fields=["connection_id", "provider_slug", "account_count"],
),
# ... 27 more entries
]
Event Type Distribution:
| Category | Count | Event Types |
|---|---|---|
| DATA_SYNC | 9 | Accounts, Transactions, Holdings (started/completed/failed) |
| PROVIDER | 4 | Token expiring/refreshed/failed, Disconnected |
| AI | 3 | Response chunk, Tool executing, Response complete |
| IMPORT | 4 | Started, Progress, Completed, Failed |
| PORTFOLIO | 3 | Balance updated, Holdings updated, Net worth updated |
| SECURITY | 6 | Session new/revoked/suspicious, Password changed, Login failed |
5. DomainToSSEMapping (Dataclass)¶
Purpose: Define how domain events are transformed to SSE events.
Location: src/domain/events/sse_registry.py
Structure:
@dataclass(frozen=True)
class DomainToSSEMapping:
"""Mapping from a domain event to an SSE event.
Attributes:
domain_event_class: The domain event class to listen for.
sse_event_type: The SSE event type to emit.
payload_extractor: Function to extract SSE payload from domain event.
user_id_extractor: Function to extract user_id from domain event.
"""
domain_event_class: Type[DomainEvent]
sse_event_type: SSEEventType
payload_extractor: Callable[[DomainEvent], dict[str, Any]]
user_id_extractor: Callable[[DomainEvent], UUID]
Usage (added by use case issues):
DOMAIN_TO_SSE_MAPPING: list[DomainToSSEMapping] = [
DomainToSSEMapping(
domain_event_class=AccountSyncSucceeded,
sse_event_type=SSEEventType.SYNC_ACCOUNTS_COMPLETED,
payload_extractor=lambda e: {
"connection_id": str(e.connection_id),
"provider_slug": e.provider_slug,
"account_count": e.account_count,
},
user_id_extractor=lambda e: e.user_id,
),
# ... more mappings
]
6. Helper Functions¶
Purpose: Convenient access to registry data.
Location: src/domain/events/sse_registry.py
get_sse_event_metadata()¶
def get_sse_event_metadata(event_type: SSEEventType) -> SSEEventMetadata | None:
"""Get metadata for an SSE event type.
Args:
event_type: The SSE event type to look up.
Returns:
SSEEventMetadata if found, None otherwise.
Example:
>>> meta = get_sse_event_metadata(SSEEventType.SYNC_ACCOUNTS_COMPLETED)
>>> print(meta.description)
"Account sync operation completed successfully"
"""
get_events_by_category()¶
def get_events_by_category(category: SSEEventCategory) -> list[SSEEventMetadata]:
"""Get all SSE event metadata for a category.
Args:
category: The category to filter by.
Returns:
List of SSEEventMetadata for events in that category.
Example:
>>> data_sync = get_events_by_category(SSEEventCategory.DATA_SYNC)
>>> print(f"Data sync events: {len(data_sync)}") # 9
"""
get_all_sse_event_types()¶
def get_all_sse_event_types() -> list[SSEEventType]:
"""Get list of all registered SSE event types.
Returns:
List of all SSE event types in the registry.
"""
get_registry_statistics()¶
def get_registry_statistics() -> dict[str, int]:
"""Get statistics about the SSE event registry.
Returns:
Dict with counts by category and totals.
Example:
>>> stats = get_registry_statistics()
>>> print(stats)
{
"total_event_types": 29,
"total_mappings": 24,
"by_category": {
"data_sync": 9,
"provider": 4,
...
}
}
"""
get_domain_event_to_sse_mapping()¶
def get_domain_event_to_sse_mapping() -> dict[Type[DomainEvent], DomainToSSEMapping]:
"""Get mapping from domain event class to SSE mapping.
Used by SSEEventHandler to determine if a domain event should
trigger an SSE notification.
Returns:
Dict mapping domain event classes to their SSE mapping metadata.
"""
Category-to-Event Type Mapping¶
The registry maintains a separate mapping from SSEEventType to SSEEventCategory:
Location: src/domain/events/sse_event.py
_EVENT_TYPE_TO_CATEGORY: dict[SSEEventType, SSEEventCategory] = {
# Data Sync
SSEEventType.SYNC_ACCOUNTS_STARTED: SSEEventCategory.DATA_SYNC,
SSEEventType.SYNC_ACCOUNTS_COMPLETED: SSEEventCategory.DATA_SYNC,
# ... all 29 mappings
}
def get_category_for_event_type(event_type: SSEEventType) -> SSEEventCategory:
"""Get the category for an SSE event type."""
return _EVENT_TYPE_TO_CATEGORY[event_type]
Compliance: Tests verify that:
- Every
SSEEventTypehas a category mapping - Registry metadata category matches
_EVENT_TYPE_TO_CATEGORY
Compliance Tests¶
Location: tests/unit/test_domain_sse_registry_compliance.py
Registry Completeness¶
def test_all_sse_event_types_have_registry_metadata():
"""CRITICAL: Every SSEEventType must have metadata in SSE_EVENT_REGISTRY."""
registered_types = {m.event_type for m in SSE_EVENT_REGISTRY}
for event_type in SSEEventType:
assert event_type in registered_types
Category Consistency¶
def test_registry_category_matches_event_type_mapping():
"""CRITICAL: Registry category must match _EVENT_TYPE_TO_CATEGORY."""
for meta in SSE_EVENT_REGISTRY:
expected = get_category_for_event_type(meta.event_type)
assert meta.category == expected
Statistics Accuracy¶
def test_statistics_expected_counts():
"""Test expected counts for each category."""
stats = get_registry_statistics()
expected = {
"data_sync": 9,
"provider": 4,
"ai": 3,
"import": 4,
"portfolio": 3,
"security": 6,
}
for cat_name, count in expected.items():
assert stats["by_category"][cat_name] == count
No Duplicates¶
def test_no_duplicate_event_types_in_registry():
"""Each event type should appear exactly once."""
event_types = [m.event_type for m in SSE_EVENT_REGISTRY]
assert len(event_types) == len(set(event_types))
Adding New SSE Event Types¶
Process¶
- Add enum value in
SSEEventType(sse_event.py) - Add category mapping in
_EVENT_TYPE_TO_CATEGORY(sse_event.py) - Add registry entry in
SSE_EVENT_REGISTRY(sse_registry.py) - Run tests - compliance tests validate completeness
- Add domain mapping (optional) in
DOMAIN_TO_SSE_MAPPING
Example: Adding New Event Type¶
# Step 1: Add to SSEEventType enum (sse_event.py)
class SSEEventType(StrEnum):
# ... existing types
EXPORT_COMPLETED = "export.completed"
# Step 2: Add category mapping (sse_event.py)
_EVENT_TYPE_TO_CATEGORY = {
# ... existing mappings
SSEEventType.EXPORT_COMPLETED: SSEEventCategory.IMPORT, # or new category
}
# Step 3: Add registry entry (sse_registry.py)
SSE_EVENT_REGISTRY.append(
SSEEventMetadata(
event_type=SSEEventType.EXPORT_COMPLETED,
category=SSEEventCategory.IMPORT,
description="File export operation completed successfully",
payload_fields=["file_name", "file_url", "record_count"],
)
)
# Step 4: Run tests
pytest tests/unit/test_domain_sse_registry_compliance.py -v
SSEEvent Wire Format¶
The registry defines event types that are serialized to SSE wire format:
Location: src/domain/events/sse_event.py
@dataclass(frozen=True, kw_only=True, slots=True)
class SSEEvent:
"""Server-Sent Event data structure."""
event_type: SSEEventType
user_id: UUID
data: dict[str, Any]
event_id: UUID = field(default_factory=uuid7)
occurred_at: datetime = field(default_factory=lambda: datetime.now(UTC))
def to_sse_format(self) -> str:
"""Serialize to SSE wire format."""
return (
f"id: {self.event_id}\n"
f"event: {self.event_type.value}\n"
f"data: {json.dumps(self.data)}\n\n"
)
Example Output:
id: 019447f2-3a5b-7c8d-9e0f-1a2b3c4d5e6f
event: sync.accounts.completed
data: {"connection_id": "abc123", "account_count": 3}
Related Documentation¶
- SSE Architecture - Complete SSE system design
- Domain Events Registry - Domain events registry pattern
- Route Registry - API route registration pattern
- Validation Registry - Validation rules registry
Created: 2026-01-18 | Last Updated: 2026-01-18