Skip to content

domain.protocols.provider_connection_repository

src.domain.protocols.provider_connection_repository

ProviderConnectionRepository protocol for provider connection persistence.

Port (interface) for hexagonal architecture. Infrastructure layer implements this protocol.

Reference
  • docs/architecture/provider-domain-model.md

Classes

ProviderConnectionRepository

Bases: Protocol

Provider connection repository protocol (port).

Defines the interface for provider connection persistence operations. Infrastructure layer provides concrete implementation.

This is a Protocol (not ABC) for structural typing. Implementations don't need to inherit from this.

Methods:

Name Description
find_by_id

Retrieve connection by ID

find_by_user_id

Retrieve all connections for user

find_by_user_and_provider

Retrieve connections for user + provider

find_active_by_user

Retrieve active connections for user

save

Create or update connection

delete

Remove connection

Example Implementation

class PostgresProviderConnectionRepository: ... async def find_by_id(self, id: UUID) -> ProviderConnection | None: ... # Database logic here ... pass

Source code in src/domain/protocols/provider_connection_repository.py
class ProviderConnectionRepository(Protocol):
    """Provider connection repository protocol (port).

    Defines the interface for provider connection persistence operations.
    Infrastructure layer provides concrete implementation.

    This is a Protocol (not ABC) for structural typing.
    Implementations don't need to inherit from this.

    Methods:
        find_by_id: Retrieve connection by ID
        find_by_user_id: Retrieve all connections for user
        find_by_user_and_provider: Retrieve connections for user + provider
        find_active_by_user: Retrieve active connections for user
        save: Create or update connection
        delete: Remove connection

    Example Implementation:
        >>> class PostgresProviderConnectionRepository:
        ...     async def find_by_id(self, id: UUID) -> ProviderConnection | None:
        ...         # Database logic here
        ...         pass
    """

    async def find_by_id(self, connection_id: UUID) -> ProviderConnection | None:
        """Find connection by ID.

        Args:
            connection_id: Connection's unique identifier.

        Returns:
            ProviderConnection if found, None otherwise.

        Example:
            >>> conn = await repo.find_by_id(connection_id)
            >>> if conn:
            ...     print(conn.provider_slug)
        """
        ...

    async def find_by_user_id(self, user_id: UUID) -> list[ProviderConnection]:
        """Find all connections for a user.

        Returns connections in all statuses (including disconnected).

        Args:
            user_id: User's unique identifier.

        Returns:
            List of connections (empty if none found).

        Example:
            >>> connections = await repo.find_by_user_id(user_id)
            >>> for conn in connections:
            ...     print(f"{conn.provider_slug}: {conn.status.value}")
        """
        ...

    async def find_by_user_and_provider(
        self,
        user_id: UUID,
        provider_id: UUID,
    ) -> list[ProviderConnection]:
        """Find all connections for user + provider combination.

        User may have multiple connections to same provider (different accounts).

        Args:
            user_id: User's unique identifier.
            provider_id: Provider's unique identifier.

        Returns:
            List of connections (empty if none found).

        Example:
            >>> connections = await repo.find_by_user_and_provider(user_id, schwab_id)
            >>> # User might have multiple Schwab accounts
            >>> for conn in connections:
            ...     print(conn.alias)
        """
        ...

    async def find_active_by_user(self, user_id: UUID) -> list[ProviderConnection]:
        """Find all active connections for a user.

        Only returns connections with status=ACTIVE.

        Args:
            user_id: User's unique identifier.

        Returns:
            List of active connections (empty if none found).

        Example:
            >>> active = await repo.find_active_by_user(user_id)
            >>> for conn in active:
            ...     if conn.can_sync():
            ...         # Perform sync
        """
        ...

    async def save(self, connection: ProviderConnection) -> None:
        """Create or update connection in database.

        Uses upsert semantics - creates if not exists, updates if exists.

        Args:
            connection: ProviderConnection entity to persist.

        Raises:
            DatabaseError: If database operation fails.

        Example:
            >>> conn = ProviderConnection(...)
            >>> await repo.save(conn)
        """
        ...

    async def delete(self, connection_id: UUID) -> None:
        """Remove connection from database.

        Hard delete - permanently removes the record.
        For soft delete, use mark_disconnected() on the entity instead.

        Args:
            connection_id: Connection's unique identifier.

        Raises:
            NotFoundError: If connection doesn't exist.
            DatabaseError: If database operation fails.

        Note:
            Consider using mark_disconnected() for audit trail instead of delete.

        Example:
            >>> await repo.delete(connection_id)
        """
        ...

    async def find_expiring_soon(
        self,
        minutes: int = 30,
    ) -> list[ProviderConnection]:
        """Find connections with credentials expiring soon.

        Used by background job to proactively refresh credentials.

        Args:
            minutes: Time threshold in minutes (default 30).

        Returns:
            List of active connections with credentials expiring within threshold.

        Example:
            >>> expiring = await repo.find_expiring_soon(minutes=15)
            >>> for conn in expiring:
            ...     # Trigger refresh
        """
        ...
Functions
find_by_id async
find_by_id(
    connection_id: UUID,
) -> ProviderConnection | None

Find connection by ID.

Parameters:

Name Type Description Default
connection_id UUID

Connection's unique identifier.

required

Returns:

Type Description
ProviderConnection | None

ProviderConnection if found, None otherwise.

Example

conn = await repo.find_by_id(connection_id) if conn: ... print(conn.provider_slug)

Source code in src/domain/protocols/provider_connection_repository.py
async def find_by_id(self, connection_id: UUID) -> ProviderConnection | None:
    """Find connection by ID.

    Args:
        connection_id: Connection's unique identifier.

    Returns:
        ProviderConnection if found, None otherwise.

    Example:
        >>> conn = await repo.find_by_id(connection_id)
        >>> if conn:
        ...     print(conn.provider_slug)
    """
    ...
find_by_user_id async
find_by_user_id(user_id: UUID) -> list[ProviderConnection]

Find all connections for a user.

Returns connections in all statuses (including disconnected).

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier.

required

Returns:

Type Description
list[ProviderConnection]

List of connections (empty if none found).

Example

connections = await repo.find_by_user_id(user_id) for conn in connections: ... print(f"{conn.provider_slug}: {conn.status.value}")

Source code in src/domain/protocols/provider_connection_repository.py
async def find_by_user_id(self, user_id: UUID) -> list[ProviderConnection]:
    """Find all connections for a user.

    Returns connections in all statuses (including disconnected).

    Args:
        user_id: User's unique identifier.

    Returns:
        List of connections (empty if none found).

    Example:
        >>> connections = await repo.find_by_user_id(user_id)
        >>> for conn in connections:
        ...     print(f"{conn.provider_slug}: {conn.status.value}")
    """
    ...
find_by_user_and_provider async
find_by_user_and_provider(
    user_id: UUID, provider_id: UUID
) -> list[ProviderConnection]

Find all connections for user + provider combination.

User may have multiple connections to same provider (different accounts).

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier.

required
provider_id UUID

Provider's unique identifier.

required

Returns:

Type Description
list[ProviderConnection]

List of connections (empty if none found).

Example

connections = await repo.find_by_user_and_provider(user_id, schwab_id)

User might have multiple Schwab accounts

for conn in connections: ... print(conn.alias)

Source code in src/domain/protocols/provider_connection_repository.py
async def find_by_user_and_provider(
    self,
    user_id: UUID,
    provider_id: UUID,
) -> list[ProviderConnection]:
    """Find all connections for user + provider combination.

    User may have multiple connections to same provider (different accounts).

    Args:
        user_id: User's unique identifier.
        provider_id: Provider's unique identifier.

    Returns:
        List of connections (empty if none found).

    Example:
        >>> connections = await repo.find_by_user_and_provider(user_id, schwab_id)
        >>> # User might have multiple Schwab accounts
        >>> for conn in connections:
        ...     print(conn.alias)
    """
    ...
find_active_by_user async
find_active_by_user(
    user_id: UUID,
) -> list[ProviderConnection]

Find all active connections for a user.

Only returns connections with status=ACTIVE.

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier.

required

Returns:

Type Description
list[ProviderConnection]

List of active connections (empty if none found).

Example

active = await repo.find_active_by_user(user_id) for conn in active: ... if conn.can_sync(): ... # Perform sync

Source code in src/domain/protocols/provider_connection_repository.py
async def find_active_by_user(self, user_id: UUID) -> list[ProviderConnection]:
    """Find all active connections for a user.

    Only returns connections with status=ACTIVE.

    Args:
        user_id: User's unique identifier.

    Returns:
        List of active connections (empty if none found).

    Example:
        >>> active = await repo.find_active_by_user(user_id)
        >>> for conn in active:
        ...     if conn.can_sync():
        ...         # Perform sync
    """
    ...
save async
save(connection: ProviderConnection) -> None

Create or update connection in database.

Uses upsert semantics - creates if not exists, updates if exists.

Parameters:

Name Type Description Default
connection ProviderConnection

ProviderConnection entity to persist.

required

Raises:

Type Description
DatabaseError

If database operation fails.

Example

conn = ProviderConnection(...) await repo.save(conn)

Source code in src/domain/protocols/provider_connection_repository.py
async def save(self, connection: ProviderConnection) -> None:
    """Create or update connection in database.

    Uses upsert semantics - creates if not exists, updates if exists.

    Args:
        connection: ProviderConnection entity to persist.

    Raises:
        DatabaseError: If database operation fails.

    Example:
        >>> conn = ProviderConnection(...)
        >>> await repo.save(conn)
    """
    ...
delete async
delete(connection_id: UUID) -> None

Remove connection from database.

Hard delete - permanently removes the record. For soft delete, use mark_disconnected() on the entity instead.

Parameters:

Name Type Description Default
connection_id UUID

Connection's unique identifier.

required

Raises:

Type Description
NotFoundError

If connection doesn't exist.

DatabaseError

If database operation fails.

Note

Consider using mark_disconnected() for audit trail instead of delete.

Example

await repo.delete(connection_id)

Source code in src/domain/protocols/provider_connection_repository.py
async def delete(self, connection_id: UUID) -> None:
    """Remove connection from database.

    Hard delete - permanently removes the record.
    For soft delete, use mark_disconnected() on the entity instead.

    Args:
        connection_id: Connection's unique identifier.

    Raises:
        NotFoundError: If connection doesn't exist.
        DatabaseError: If database operation fails.

    Note:
        Consider using mark_disconnected() for audit trail instead of delete.

    Example:
        >>> await repo.delete(connection_id)
    """
    ...
find_expiring_soon async
find_expiring_soon(
    minutes: int = 30,
) -> list[ProviderConnection]

Find connections with credentials expiring soon.

Used by background job to proactively refresh credentials.

Parameters:

Name Type Description Default
minutes int

Time threshold in minutes (default 30).

30

Returns:

Type Description
list[ProviderConnection]

List of active connections with credentials expiring within threshold.

Example

expiring = await repo.find_expiring_soon(minutes=15) for conn in expiring: ... # Trigger refresh

Source code in src/domain/protocols/provider_connection_repository.py
async def find_expiring_soon(
    self,
    minutes: int = 30,
) -> list[ProviderConnection]:
    """Find connections with credentials expiring soon.

    Used by background job to proactively refresh credentials.

    Args:
        minutes: Time threshold in minutes (default 30).

    Returns:
        List of active connections with credentials expiring within threshold.

    Example:
        >>> expiring = await repo.find_expiring_soon(minutes=15)
        >>> for conn in expiring:
        ...     # Trigger refresh
    """
    ...