Skip to content

application.queries.handlers.get_provider_handler

src.application.queries.handlers.get_provider_handler

GetProviderConnection query handler.

Handles requests to retrieve a single provider connection. Returns DTO (not domain entity) to prevent leaking domain to presentation.

Architecture: - Application layer handler (orchestrates data retrieval) - Returns Result[DTO, str] (explicit error handling) - NO domain events (queries are side-effect free)

Reference
  • docs/architecture/cqrs-pattern.md

Classes

ProviderConnectionResult dataclass

Single provider connection result DTO.

Represents a provider connection for API response. Does NOT include sensitive data (credentials).

Attributes:

Name Type Description
id UUID

Connection unique identifier.

user_id UUID

Owning user's ID.

provider_id UUID

Provider registry ID.

provider_slug str

Provider identifier (e.g., "schwab").

alias str | None

User-defined nickname (if set).

status ConnectionStatus

Current connection status.

is_connected bool

Whether connection is usable.

needs_reauthentication bool

Whether user needs to re-authenticate.

connected_at datetime | None

When connection was first established.

last_sync_at datetime | None

Last successful sync timestamp.

created_at datetime

Record creation timestamp.

updated_at datetime

Last modification timestamp.

Source code in src/application/queries/handlers/get_provider_handler.py
@dataclass
class ProviderConnectionResult:
    """Single provider connection result DTO.

    Represents a provider connection for API response.
    Does NOT include sensitive data (credentials).

    Attributes:
        id: Connection unique identifier.
        user_id: Owning user's ID.
        provider_id: Provider registry ID.
        provider_slug: Provider identifier (e.g., "schwab").
        alias: User-defined nickname (if set).
        status: Current connection status.
        is_connected: Whether connection is usable.
        needs_reauthentication: Whether user needs to re-authenticate.
        connected_at: When connection was first established.
        last_sync_at: Last successful sync timestamp.
        created_at: Record creation timestamp.
        updated_at: Last modification timestamp.
    """

    id: UUID
    user_id: UUID
    provider_id: UUID
    provider_slug: str
    alias: str | None
    status: ConnectionStatus
    is_connected: bool
    needs_reauthentication: bool
    connected_at: datetime | None
    last_sync_at: datetime | None
    created_at: datetime
    updated_at: datetime

GetProviderConnectionError

GetProviderConnection-specific errors.

Source code in src/application/queries/handlers/get_provider_handler.py
class GetProviderConnectionError:
    """GetProviderConnection-specific errors."""

    CONNECTION_NOT_FOUND = "Connection not found"
    NOT_OWNED_BY_USER = "Connection not owned by user"

GetProviderConnectionHandler

Handler for GetProviderConnection query.

Retrieves a single provider connection by ID with ownership check. Uses cache-first strategy for performance.

Dependencies (injected via constructor): - ProviderConnectionRepository: For data retrieval - ProviderConnectionCache: For fast lookups

Source code in src/application/queries/handlers/get_provider_handler.py
class GetProviderConnectionHandler:
    """Handler for GetProviderConnection query.

    Retrieves a single provider connection by ID with ownership check.
    Uses cache-first strategy for performance.

    Dependencies (injected via constructor):
        - ProviderConnectionRepository: For data retrieval
        - ProviderConnectionCache: For fast lookups
    """

    def __init__(
        self,
        connection_repo: ProviderConnectionRepository,
        connection_cache: ProviderConnectionCache,
    ) -> None:
        """Initialize handler with dependencies.

        Args:
            connection_repo: Provider connection repository.
            connection_cache: Provider connection cache.
        """
        self._connection_repo = connection_repo
        self._connection_cache = connection_cache

    async def handle(
        self, query: GetProviderConnection
    ) -> Result[ProviderConnectionResult, str]:
        """Handle GetProviderConnection query.

        Uses cache-first strategy:
        1. Try cache
        2. Fall back to database
        3. Populate cache on miss

        Args:
            query: GetProviderConnection query with connection and user IDs.

        Returns:
            Success(ProviderConnectionResult): Connection found and owned by user.
            Failure(error): Connection not found or not owned by user.
        """
        # Step 1: Try cache first
        connection = await self._connection_cache.get(query.connection_id)

        # Step 2: Fall back to database
        if connection is None:
            connection = await self._connection_repo.find_by_id(query.connection_id)

            # Populate cache on miss (if found)
            if connection is not None:
                await self._connection_cache.set(connection)

        # Verify exists
        if connection is None:
            return Failure(error=GetProviderConnectionError.CONNECTION_NOT_FOUND)

        # Verify ownership
        if connection.user_id != query.user_id:
            return Failure(error=GetProviderConnectionError.NOT_OWNED_BY_USER)

        # Map to DTO (no domain entity in response)
        dto = ProviderConnectionResult(
            id=connection.id,
            user_id=connection.user_id,
            provider_id=connection.provider_id,
            provider_slug=connection.provider_slug,
            alias=connection.alias,
            status=connection.status,
            is_connected=connection.is_connected(),
            needs_reauthentication=connection.needs_reauthentication(),
            connected_at=connection.connected_at,
            last_sync_at=connection.last_sync_at,
            created_at=connection.created_at,
            updated_at=connection.updated_at,
        )

        return Success(value=dto)
Functions
__init__
__init__(
    connection_repo: ProviderConnectionRepository,
    connection_cache: ProviderConnectionCache,
) -> None

Parameters:

Name Type Description Default
connection_repo ProviderConnectionRepository

Provider connection repository.

required
connection_cache ProviderConnectionCache

Provider connection cache.

required
Source code in src/application/queries/handlers/get_provider_handler.py
def __init__(
    self,
    connection_repo: ProviderConnectionRepository,
    connection_cache: ProviderConnectionCache,
) -> None:
    """Initialize handler with dependencies.

    Args:
        connection_repo: Provider connection repository.
        connection_cache: Provider connection cache.
    """
    self._connection_repo = connection_repo
    self._connection_cache = connection_cache
handle async
handle(
    query: GetProviderConnection,
) -> Result[ProviderConnectionResult, str]

Handle GetProviderConnection query.

Uses cache-first strategy: 1. Try cache 2. Fall back to database 3. Populate cache on miss

Parameters:

Name Type Description Default
query GetProviderConnection

GetProviderConnection query with connection and user IDs.

required

Returns:

Name Type Description
Success ProviderConnectionResult

Connection found and owned by user.

Failure error

Connection not found or not owned by user.

Source code in src/application/queries/handlers/get_provider_handler.py
async def handle(
    self, query: GetProviderConnection
) -> Result[ProviderConnectionResult, str]:
    """Handle GetProviderConnection query.

    Uses cache-first strategy:
    1. Try cache
    2. Fall back to database
    3. Populate cache on miss

    Args:
        query: GetProviderConnection query with connection and user IDs.

    Returns:
        Success(ProviderConnectionResult): Connection found and owned by user.
        Failure(error): Connection not found or not owned by user.
    """
    # Step 1: Try cache first
    connection = await self._connection_cache.get(query.connection_id)

    # Step 2: Fall back to database
    if connection is None:
        connection = await self._connection_repo.find_by_id(query.connection_id)

        # Populate cache on miss (if found)
        if connection is not None:
            await self._connection_cache.set(connection)

    # Verify exists
    if connection is None:
        return Failure(error=GetProviderConnectionError.CONNECTION_NOT_FOUND)

    # Verify ownership
    if connection.user_id != query.user_id:
        return Failure(error=GetProviderConnectionError.NOT_OWNED_BY_USER)

    # Map to DTO (no domain entity in response)
    dto = ProviderConnectionResult(
        id=connection.id,
        user_id=connection.user_id,
        provider_id=connection.provider_id,
        provider_slug=connection.provider_slug,
        alias=connection.alias,
        status=connection.status,
        is_connected=connection.is_connected(),
        needs_reauthentication=connection.needs_reauthentication(),
        connected_at=connection.connected_at,
        last_sync_at=connection.last_sync_at,
        created_at=connection.created_at,
        updated_at=connection.updated_at,
    )

    return Success(value=dto)