Cross-Service SSE Publishing
This guide explains how dashtam-jobs can publish Server-Sent Events (SSE) to clients connected to dashtam-api.
Overview
The API's SSE infrastructure uses Redis pub/sub for real-time event delivery. Background jobs can publish events directly to the same Redis channels, and connected clients receive them instantly.
┌─────────────────┐                    ┌─────────────────┐
│  dashtam-jobs   │                    │   dashtam-api   │
│                 │                    │                 │
│ Task publishes  │                    │ Client connected│
│ SSE event       │                    │ via GET /events │
│       ↓         │                    │       ↑         │
│ Redis PUBLISH   │────── Redis ──────▶│ RedisSSE-       │
│ to user channel │                    │ Subscriber      │
│                 │                    │ delivers event  │
└─────────────────┘                    └─────────────────┘
Key insight: Jobs don't need to call an API endpoint. They publish directly to Redis, and the API's existing subscriber infrastructure delivers to clients.
Channel Naming Convention
The API uses these Redis channel patterns:
| Channel | Pattern | Purpose |
|---|---|---|
| User channel | sse:user:{user_id} | Events for a specific user |
| Broadcast | sse:broadcast | Events for all connected users |
Examples:
sse:user:01234567-89ab-cdef-0123-456789abcdef
sse:broadcast
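A small helper can keep channel names consistent across tasks. The sketch below assumes only the two patterns in the table above; the names `user_channel` and `broadcast_channel` are illustrative and are not taken from the API's channel_keys.py.

```python
from uuid import UUID

# Prefix shared by every SSE channel, per the table above.
CHANNEL_PREFIX = "sse"


def user_channel(user_id: UUID) -> str:
    """Return the per-user channel name, e.g. sse:user:<uuid>."""
    return f"{CHANNEL_PREFIX}:user:{user_id}"


def broadcast_channel() -> str:
    """Return the channel delivered to all connected users."""
    return f"{CHANNEL_PREFIX}:broadcast"
```

Building names in one place makes a mismatch with the API's patterns less likely than formatting strings inside each task.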
Event JSON Format
Events must be JSON with this exact structure:
{
  "event_id": "01958f12-3456-7890-abcd-ef0123456789",
  "event_type": "provider.token.expiring",
  "user_id": "01234567-89ab-cdef-0123-456789abcdef",
  "data": {
    "connection_id": "abcd1234-...",
    "provider_slug": "plaid",
    "expires_at": "2026-01-21T00:00:00+00:00"
  },
  "occurred_at": "2026-01-20T02:30:00+00:00"
}
| Field | Type | Description |
|---|---|---|
| event_id | UUID string | Unique identifier (UUID v7 recommended for ordering) |
| event_type | string | SSE event type (see below) |
| user_id | UUID string | Target user |
| data | object | Event-specific payload |
| occurred_at | ISO 8601 | When the event occurred (UTC) |
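Because the structure must match exactly, it can help to check an event before publishing it. The following is a minimal sketch of such a check based only on the field table above. `validate_event` and `REQUIRED_FIELDS` are hypothetical names, and the API's own validation may be stricter or different.

```python
import json
from datetime import datetime
from uuid import UUID

REQUIRED_FIELDS = ("event_id", "event_type", "user_id", "data", "occurred_at")


def validate_event(raw: str) -> dict:
    """Parse an event JSON string and check it against the field table.

    Raises ValueError if a field is missing or malformed.
    """
    event = json.loads(raw)  # JSONDecodeError is a ValueError subclass
    if not isinstance(event, dict):
        raise ValueError("event must be a JSON object")

    missing = [name for name in REQUIRED_FIELDS if name not in event]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")

    for name in ("event_id", "event_type", "user_id", "occurred_at"):
        if not isinstance(event[name], str) or not event[name]:
            raise ValueError(f"{name} must be a non-empty string")

    # UUID() raises ValueError on a malformed UUID string.
    UUID(event["event_id"])
    UUID(event["user_id"])

    if not isinstance(event["data"], dict):
        raise ValueError("data must be an object")

    # Timestamps must carry an offset so consumers can interpret them as UTC.
    occurred_at = datetime.fromisoformat(event["occurred_at"])
    if occurred_at.tzinfo is None:
        raise ValueError("occurred_at must include a UTC offset")

    return event
```

A task could run this against `json.dumps(event)` in tests to catch a renamed or missing field before it reaches a client.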
Available Event Types
Jobs should only publish events that make sense as background notifications:
| Event Type | Category | Description |
|---|---|---|
| provider.token.expiring | provider | Token expires within threshold |
| security.session.expiring | security | Session expires within threshold |
Note: The other provider events (provider.token.refreshed, provider.token.failed, provider.disconnected) and the remaining security events (other security.session.* events, security.password.*, security.login.failed) are triggered by user actions in the API, not by background jobs.
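One way to enforce this restriction in dashtam-jobs is an allowlist checked before publishing. This is a sketch built from the two event types listed above; `JOB_EVENT_TYPES` and `ensure_job_event_type` are hypothetical names, not part of the API.

```python
# Event types that background jobs may publish, per the table above.
JOB_EVENT_TYPES = frozenset(
    {
        "provider.token.expiring",
        "security.session.expiring",
    }
)


def ensure_job_event_type(event_type: str) -> str:
    """Return event_type unchanged, or raise ValueError if jobs must not publish it."""
    if event_type not in JOB_EVENT_TYPES:
        raise ValueError(f"event type not allowed for background jobs: {event_type}")
    return event_type
```

Calling this at the top of a publish method turns a mistyped or API-only event type into an immediate error rather than an event that clients may ignore.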
Implementation
1. SSE Publisher Utility
Create a simple publisher in dashtam-jobs:
# src/dashtam_jobs/infrastructure/sse_publisher.py
import json
from datetime import UTC, datetime
from uuid import UUID

from redis.asyncio import Redis
from uuid_extensions import uuid7


class SSEPublisher:
    """Publish SSE events to dashtam-api's Redis channels.

    This allows background jobs to send real-time notifications
    to connected clients without calling an API endpoint.
    """

    CHANNEL_PREFIX = "sse"

    def __init__(self, redis_client: Redis) -> None:
        self._redis = redis_client

    async def publish_to_user(
        self,
        user_id: UUID,
        event_type: str,
        data: dict,
    ) -> None:
        """Publish SSE event to a specific user.

        Args:
            user_id: Target user's UUID.
            event_type: SSE event type (e.g., "provider.token.expiring").
            data: Event payload (will be JSON serialized).
        """
        channel = f"{self.CHANNEL_PREFIX}:user:{user_id}"
        event = {
            "event_id": str(uuid7()),
            "event_type": event_type,
            "user_id": str(user_id),
            "data": data,
            "occurred_at": datetime.now(UTC).isoformat(),
        }
        await self._redis.publish(channel, json.dumps(event))

    async def broadcast(
        self,
        event_type: str,
        data: dict,
        system_user_id: UUID | None = None,
    ) -> None:
        """Broadcast SSE event to all connected users.

        Args:
            event_type: SSE event type.
            data: Event payload.
            system_user_id: Optional user ID for event (use system UUID if none).
        """
        channel = f"{self.CHANNEL_PREFIX}:broadcast"
        event = {
            "event_id": str(uuid7()),
            "event_type": event_type,
            "user_id": str(system_user_id) if system_user_id else "00000000-0000-0000-0000-000000000000",
            "data": data,
            "occurred_at": datetime.now(UTC).isoformat(),
        }
        await self._redis.publish(channel, json.dumps(event))
2. Worker Startup Integration
Add the SSE publisher to the worker state:
# src/dashtam_jobs/broker.py
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    # ... existing database setup ...

    # SSE publisher for sending notifications to API clients
    from redis.asyncio import Redis

    from dashtam_jobs.infrastructure.sse_publisher import SSEPublisher

    redis_client = Redis.from_url(_settings.redis_url)
    state.sse_publisher = SSEPublisher(redis_client)


@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def shutdown(state: TaskiqState) -> None:
    # ... existing cleanup ...
    if hasattr(state, "sse_publisher"):
        await state.sse_publisher._redis.aclose()
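The shutdown hook above reaches into the publisher's private `_redis` attribute. One possible alternative, sketched here under the assumption that the publisher owns its client, is a public `aclose()` method. `ClosablePublisher` and `AsyncClosable` are hypothetical names used to keep the example self-contained. In practice the method would be added to SSEPublisher itself.

```python
import asyncio
from typing import Protocol


class AsyncClosable(Protocol):
    """Anything with an async aclose(), such as a redis.asyncio client."""

    async def aclose(self) -> None: ...


class ClosablePublisher:
    """Hypothetical publisher variant that exposes its client's shutdown."""

    def __init__(self, redis_client: AsyncClosable) -> None:
        self._redis = redis_client

    async def aclose(self) -> None:
        """Release the underlying Redis connection(s)."""
        await self._redis.aclose()
```

With a method like this, the shutdown hook could call `await state.sse_publisher.aclose()` and would no longer depend on the publisher's internals.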
3. Using in Tasks
# src/dashtam_jobs/tasks/tokens.py
@broker.task(...)
async def check_expiring_tokens(
    context: Annotated[Context, TaskiqDepends()],
) -> dict[str, Any]:
    """Check for expiring tokens and notify users via SSE."""
    async with context.state.database.get_session() as session:
        repo = ProviderConnectionRepository(session)
        expiring = await repo.find_expiring_soon(hours=24)

    sse_publisher = context.state.sse_publisher
    for conn in expiring:
        # Publish SSE event directly to user's channel
        await sse_publisher.publish_to_user(
            user_id=conn.user_id,
            event_type="provider.token.expiring",
            data={
                "connection_id": str(conn.id),
                "provider_slug": conn.provider_slug,
                "expires_at": conn.credentials_expires_at.isoformat(),
            },
        )

    # `total` (the number of connections examined) is not computed in this
    # excerpt; the surrounding task code must define it before this return.
    return {"checked": total, "expiring": len(expiring), "notified": len(expiring)}
Configuration
Jobs need access to the same Redis that the API uses for SSE. Configure via environment:
# env/.env.dev
# Use the API's Redis (or a shared Redis instance)
# This must be the same Redis the API uses for SSE pub/sub
REDIS_URL=redis://dashtam-dev-redis:6379/0
Important: If jobs use a separate Redis instance, SSE events won't reach API clients.
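A cheap guard against this misconfiguration is to compare the two URLs at startup. The sketch below is only a heuristic: it compares host and port, so it cannot detect two hostnames that resolve to the same server, and it does not handle `unix://` socket URLs. The database number is deliberately ignored because Redis pub/sub channels are not scoped to a logical database. `redis_endpoint` and `same_redis_server` are hypothetical helper names.

```python
from urllib.parse import urlsplit

DEFAULT_REDIS_PORT = 6379


def redis_endpoint(url: str) -> tuple[str, int]:
    """Reduce a redis:// or rediss:// URL to a (host, port) pair."""
    parts = urlsplit(url)
    host = parts.hostname or "localhost"
    port = parts.port or DEFAULT_REDIS_PORT
    return host, port


def same_redis_server(jobs_url: str, api_url: str) -> bool:
    """Return True if both URLs name the same host and port.

    The /<db> path is ignored: pub/sub messages cross logical databases.
    """
    return redis_endpoint(jobs_url) == redis_endpoint(api_url)
```

A worker could log a warning at startup when `same_redis_server` returns False, provided the API's Redis URL is also available to the jobs service, which is an assumption about the deployment.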
Testing
Unit Test (Mock Redis)
import json
from unittest.mock import AsyncMock
from uuid import UUID

import pytest

from dashtam_jobs.infrastructure.sse_publisher import SSEPublisher


@pytest.mark.anyio
async def test_sse_publisher_publishes_to_correct_channel():
    mock_redis = AsyncMock()
    publisher = SSEPublisher(mock_redis)
    user_id = UUID("01234567-89ab-cdef-0123-456789abcdef")

    await publisher.publish_to_user(
        user_id=user_id,
        event_type="provider.token.expiring",
        data={"connection_id": "abc"},
    )

    mock_redis.publish.assert_called_once()
    call_args = mock_redis.publish.call_args

    # Verify channel
    assert call_args[0][0] == f"sse:user:{user_id}"

    # Verify event structure
    event = json.loads(call_args[0][1])
    assert event["event_type"] == "provider.token.expiring"
    assert event["user_id"] == str(user_id)
    assert event["data"]["connection_id"] == "abc"
Integration Test (Real Redis)
import json

import pytest
from uuid_extensions import uuid7

from dashtam_jobs.infrastructure.sse_publisher import SSEPublisher


@pytest.mark.integration
@pytest.mark.anyio
async def test_sse_event_reaches_subscriber(redis_client):
    """Verify event published by jobs reaches API's subscriber pattern."""
    publisher = SSEPublisher(redis_client)
    user_id = uuid7()
    channel = f"sse:user:{user_id}"

    # Subscribe (simulating API's RedisSSESubscriber)
    pubsub = redis_client.pubsub()
    await pubsub.subscribe(channel)

    # Publish event
    await publisher.publish_to_user(
        user_id=user_id,
        event_type="provider.token.expiring",
        data={"test": True},
    )

    # Receive event. The first read may consume the subscribe confirmation
    # and return None, so poll a few times before giving up.
    message = None
    for _ in range(5):
        message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1)
        if message is not None:
            break

    assert message is not None
    event = json.loads(message["data"])
    assert event["event_type"] == "provider.token.expiring"

    await pubsub.unsubscribe(channel)
Error Handling
Follow the fail-open pattern (consistent with the API's SSEPublisher):
async def publish_to_user(self, ...) -> None:
    try:
        await self._redis.publish(channel, json.dumps(event))
        logger.info("sse_event_published", user_id=str(user_id), event_type=event_type)
    except RedisError as e:
        # Fail-open: log but don't raise
        logger.warning(
            "sse_publish_failed",
            user_id=str(user_id),
            event_type=event_type,
            error=str(e),
        )
Why fail-open? SSE notifications are best-effort. A failed notification shouldn't crash the job or prevent other work.
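A fail-open publish can still tell the caller whether delivery was attempted successfully, which lets a task report an accurate `notified` count. The following is a minimal sketch of that idea, not the API's implementation. `publish_fail_open` is a hypothetical helper, it uses the standard `logging` module rather than the structured logger shown above, and the `RedisError` fallback class exists only so the example runs where redis-py is not installed.

```python
import asyncio
import json
import logging
from collections.abc import Awaitable, Callable

try:
    from redis.exceptions import RedisError
except ImportError:  # assumption: stand-in so the sketch runs without redis-py

    class RedisError(Exception):
        """Placeholder for redis.exceptions.RedisError."""


logger = logging.getLogger(__name__)


async def publish_fail_open(
    publish: Callable[[str, str], Awaitable[object]],
    channel: str,
    event: dict,
) -> bool:
    """Publish an event; return False instead of raising on Redis errors."""
    try:
        await publish(channel, json.dumps(event))
    except RedisError as exc:
        # Fail-open: log the failure and let the job continue.
        logger.warning("sse_publish_failed channel=%s error=%s", channel, exc)
        return False
    return True
```

A task could pass `redis_client.publish` as the `publish` argument and sum the returned booleans, so that `notified` reflects events that Redis actually accepted.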
Reference
- API SSE Architecture: dashtam-api/docs/architecture/sse-architecture.md
- API Channel Keys: dashtam-api/src/infrastructure/sse/channel_keys.py
- API Publisher: dashtam-api/src/infrastructure/sse/redis_publisher.py
- SSE Event Types: dashtam-api/src/domain/events/sse_event.py
Created: 2026-01-20 | Last Updated: 2026-01-20