domain.protocols.sse_subscriber_protocol¶
src.domain.protocols.sse_subscriber_protocol
¶
SSE Subscriber Protocol for domain layer.
This module defines the interface for subscribing to SSE event streams. Infrastructure adapters (Redis) implement this protocol to provide real-time event consumption for connected clients.
Architecture
- Protocol-based (structural typing, no inheritance)
- Async generator for streaming events
- Category filtering for client subscriptions
- Last-Event-ID support for reconnection replay
Reference
- docs/architecture/sse-architecture.md
Classes¶
SSESubscriberProtocol
¶
Bases: Protocol
Protocol for subscribing to SSE event streams.
Infrastructure adapters (Redis pub/sub) implement this protocol without inheritance. Used by the SSE endpoint to stream events to connected clients.
Design
- Async generator for event streaming
- Category-based filtering
- Last-Event-ID replay for reconnection
- Request-scoped (new instance per SSE connection)
Example
subscriber: SSESubscriberProtocol = get_sse_subscriber() async for event in subscriber.subscribe( ... user_id=user_id, ... categories=["data_sync", "provider"], ... ): ... yield event.to_sse_format()
Source code in src/domain/protocols/sse_subscriber_protocol.py
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | |
Functions¶
subscribe
async
¶
Subscribe to user's SSE event stream.
Returns an async generator that yields SSE events as they arrive. Subscribes to both user-specific and broadcast channels.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
user_id
|
UUID
|
User ID to subscribe to. |
required |
categories
|
list[str] | None
|
Optional list of categories to filter. If None, all events are yielded. Valid: "data_sync", "provider", "ai", "import", "portfolio", "security" |
None
|
Yields:
| Name | Type | Description |
|---|---|---|
SSEEvent |
AsyncIterator[SSEEvent]
|
Events matching the subscription criteria. |
Note
- Runs until client disconnects or server shuts down
- Filters by category if specified
- Listens to both user channel and broadcast channel
Example
async for event in subscriber.subscribe( ... user_id=user_id, ... categories=["data_sync"], ... ): ... # Only sync events for this user ... yield event.to_sse_format()
Source code in src/domain/protocols/sse_subscriber_protocol.py
get_missed_events
async
¶
get_missed_events(
user_id: UUID,
last_event_id: UUID,
categories: list[str] | None = None,
) -> list[SSEEvent]
Get events missed since last_event_id (reconnection replay).
When a client reconnects with Last-Event-ID header, this method retrieves events that were published while the client was disconnected.
Requires retention to be enabled (sse_enable_retention=True).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
user_id
|
UUID
|
User ID to get events for. |
required |
last_event_id
|
UUID
|
Last event ID received by client. |
required |
categories
|
list[str] | None
|
Optional category filter. |
None
|
Returns:
| Type | Description |
|---|---|
list[SSEEvent]
|
List of SSEEvent objects published after last_event_id. |
list[SSEEvent]
|
Empty list if retention disabled or no events found. |
Note
- Returns empty list if retention is disabled
- Events are returned in chronological order
- Only returns events within retention window (TTL)
Example
missed = await subscriber.get_missed_events( ... user_id=user_id, ... last_event_id=UUID("01234567-..."), ... categories=["data_sync"], ... ) for event in missed: ... yield event.to_sse_format()
Source code in src/domain/protocols/sse_subscriber_protocol.py
filter_by_categories
¶
Check if event matches category filter.
Helper method for filtering events by category.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event
|
SSEEvent
|
Event to check. |
required |
categories
|
list[str] | None
|
List of category strings to match. If None, returns True (no filter). |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if event matches filter, False otherwise. |
Example
if subscriber.filter_by_categories(event, ["data_sync"]): ... yield event.to_sse_format()
Source code in src/domain/protocols/sse_subscriber_protocol.py
validate_categories
staticmethod
¶
Validate and convert category strings to enum values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
categories
|
list[str] | None
|
List of category strings from query params. |
required |
Returns:
| Type | Description |
|---|---|
list[SSEEventCategory]
|
List of valid SSEEventCategory enum values. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If any category string is invalid. |
Example
valid = SSESubscriberProtocol.validate_categories(["data_sync", "ai"])
Returns [SSEEventCategory.DATA_SYNC, SSEEventCategory.AI]¶