src.domain.protocols.provider_protocol

ProviderProtocol for financial provider adapters.

Port (interface) for hexagonal architecture. Infrastructure layer implements this protocol for each provider (Schwab, Chase, etc.).

This protocol defines the contract that all financial providers must implement, regardless of their authentication mechanism (OAuth, API key, etc.) or API structure.

Design Principles
  • Auth-agnostic: Protocol receives credentials as opaque dict, provider extracts what it needs
  • Minimal interface: Only data-fetching methods required (slug, fetch_accounts, etc.)
  • Auth-specific methods (OAuth token exchange, API key validation) are provider implementation details

Methods return Result types following railway-oriented programming pattern. See docs/architecture/error-handling-architecture.md for details.

Credentials Dict Structure (by CredentialType):
  • OAUTH2: {"access_token": ..., "refresh_token": ..., "token_type": ..., "scope": ...}
  • API_KEY: {"api_key": ..., "api_secret": ...}
  • LINK_TOKEN: {"access_token": ..., "item_id": ..., "institution_id": ...}
  • CERTIFICATE: {"client_certificate": ..., "private_key": ..., "certificate_chain": ...}
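As an illustration of "provider extracts what it needs", an adapter might check the opaque dict for the keys it relies on before calling the provider API. The helper `missing_keys` and the choice of which keys count as required are assumptions made for this sketch, not part of the module.

```python
from typing import Any


def missing_keys(credentials: dict[str, Any], required: tuple[str, ...]) -> list[str]:
    """Return the required keys that are absent or empty in the credentials dict.

    Hypothetical helper: each adapter decides which keys it actually needs.
    """
    return [key for key in required if not credentials.get(key)]


# Example dicts shaped like the OAUTH2 and API_KEY structures listed above.
oauth_credentials = {"access_token": "example-token", "refresh_token": None}
api_key_credentials = {"api_key": "example-key"}

print(missing_keys(oauth_credentials, ("access_token",)))            # []
print(missing_keys(api_key_credentials, ("api_key", "api_secret")))  # ['api_secret']
```

An adapter could turn a non-empty result into a `Failure(ProviderAuthenticationError)` instead of attempting the request.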

Reference
  • docs/architecture/provider-integration-architecture.md

Classes

OAuthTokens dataclass

OAuth tokens returned by provider after authentication.

Returned from exchange_code_for_tokens() and refresh_access_token().

Attributes:

Name Type Description
access_token str

Bearer token for API authentication.

refresh_token str | None

Token for obtaining new access tokens. May be None if provider doesn't rotate tokens on refresh.

expires_in int

Seconds until access_token expires.

token_type str

Token type, typically "Bearer".

scope str | None

OAuth scope granted (provider-specific).

Example

>>> result = await provider.exchange_code_for_tokens(code)
>>> match result:
...     case Success(tokens):
...         print(f"Token expires in {tokens.expires_in} seconds")

Source code in src/domain/protocols/provider_protocol.py
@dataclass(frozen=True, kw_only=True)
class OAuthTokens:
    """OAuth tokens returned by provider after authentication.

    Returned from exchange_code_for_tokens() and refresh_access_token().

    Attributes:
        access_token: Bearer token for API authentication.
        refresh_token: Token for obtaining new access tokens. May be None if
            provider doesn't rotate tokens on refresh.
        expires_in: Seconds until access_token expires.
        token_type: Token type, typically "Bearer".
        scope: OAuth scope granted (provider-specific).

    Example:
        >>> result = await provider.exchange_code_for_tokens(code)
        >>> match result:
        ...     case Success(tokens):
        ...         print(f"Token expires in {tokens.expires_in} seconds")
    """

    access_token: str
    refresh_token: str | None
    expires_in: int
    token_type: str = "Bearer"
    scope: str | None = None

ProviderAccountData dataclass

Account data as returned by provider (before mapping to domain).

Provider adapters return this; mappers convert to Account entity. This intermediate type decouples provider response format from domain model.

Attributes:

Name Type Description
provider_account_id str

Provider's unique account identifier.

account_number_masked str

Masked account number for display (e.g., "****1234").

name str

Account name from provider.

account_type str

Provider's account type string (mapped by AccountMapper).

balance Decimal

Current account balance.

available_balance Decimal | None

Available balance if different from balance.

currency str

ISO 4217 currency code (e.g., "USD").

is_active bool

Whether account is active on provider.

raw_data dict[str, Any] | None

Full provider response for metadata/debugging.

Example

>>> result = await provider.fetch_accounts(credentials)
>>> match result:
...     case Success(accounts):
...         for account in accounts:
...             domain_account = mapper.to_entity(account, connection_id)

Source code in src/domain/protocols/provider_protocol.py
@dataclass(frozen=True, kw_only=True)
class ProviderAccountData:
    """Account data as returned by provider (before mapping to domain).

    Provider adapters return this; mappers convert to Account entity.
    This intermediate type decouples provider response format from domain model.

    Attributes:
        provider_account_id: Provider's unique account identifier.
        account_number_masked: Masked account number for display (e.g., "****1234").
        name: Account name from provider.
        account_type: Provider's account type string (mapped by AccountMapper).
        balance: Current account balance.
        available_balance: Available balance if different from balance.
        currency: ISO 4217 currency code (e.g., "USD").
        is_active: Whether account is active on provider.
        raw_data: Full provider response for metadata/debugging.

    Example:
        >>> result = await provider.fetch_accounts(credentials)
        >>> match result:
        ...     case Success(accounts):
        ...         for account in accounts:
        ...             domain_account = mapper.to_entity(account, connection_id)
    """

    provider_account_id: str
    account_number_masked: str
    name: str
    account_type: str
    balance: Decimal
    currency: str
    available_balance: Decimal | None = None
    is_active: bool = True
    raw_data: dict[str, Any] | None = None

ProviderHoldingData dataclass

Holding (position) data as returned by provider (before mapping to domain).

Provider adapters return this; mappers convert to Holding entity. This intermediate type decouples provider response format from domain model.

Attributes:

Name Type Description
provider_holding_id str

Provider's unique identifier for this position.

symbol str

Security ticker symbol (e.g., "AAPL").

security_name str

Full security name (e.g., "Apple Inc.").

asset_type str

Security asset type (equity, etf, option, etc.).

quantity Decimal

Number of shares/units held.

cost_basis Decimal

Total cost paid for this position.

market_value Decimal

Current market value of the position.

currency str

ISO 4217 currency code (e.g., "USD").

average_price Decimal | None

Average price per share (optional).

current_price Decimal | None

Current market price per share (optional).

raw_data dict[str, Any] | None

Full provider response for metadata/debugging.

Example

>>> result = await provider.fetch_holdings(credentials, provider_account_id)
>>> match result:
...     case Success(holdings):
...         for holding in holdings:
...             domain_holding = mapper.to_entity(holding, account_id)

Source code in src/domain/protocols/provider_protocol.py
@dataclass(frozen=True, kw_only=True)
class ProviderHoldingData:
    """Holding (position) data as returned by provider (before mapping to domain).

    Provider adapters return this; mappers convert to Holding entity.
    This intermediate type decouples provider response format from domain model.

    Attributes:
        provider_holding_id: Provider's unique identifier for this position.
        symbol: Security ticker symbol (e.g., "AAPL").
        security_name: Full security name (e.g., "Apple Inc.").
        asset_type: Security asset type (equity, etf, option, etc.).
        quantity: Number of shares/units held.
        cost_basis: Total cost paid for this position.
        market_value: Current market value of the position.
        currency: ISO 4217 currency code (e.g., "USD").
        average_price: Average price per share (optional).
        current_price: Current market price per share (optional).
        raw_data: Full provider response for metadata/debugging.

    Example:
        >>> result = await provider.fetch_holdings(credentials, provider_account_id)
        >>> match result:
        ...     case Success(holdings):
        ...         for holding in holdings:
        ...             domain_holding = mapper.to_entity(holding, account_id)
    """

    provider_holding_id: str
    symbol: str
    security_name: str
    asset_type: str
    quantity: Decimal
    cost_basis: Decimal
    market_value: Decimal
    currency: str
    average_price: Decimal | None = None
    current_price: Decimal | None = None
    raw_data: dict[str, Any] | None = None

ProviderTransactionData dataclass

Transaction data as returned by provider (before mapping to domain).

Provider adapters return this; mappers convert to Transaction entity. This intermediate type decouples provider response format from domain model.

Attributes:

Name Type Description
provider_transaction_id str

Provider's unique transaction identifier.

transaction_type str

Provider's transaction type string.

subtype str | None

Provider's transaction subtype (if applicable).

amount Decimal

Transaction amount (positive=credit, negative=debit).

currency str

ISO 4217 currency code.

description str

Human-readable transaction description.

transaction_date date

Date transaction occurred.

settlement_date date | None

Date funds/securities settled (if applicable).

status str

Provider's transaction status string.

symbol str | None

Security ticker symbol (for trades).

security_name str | None

Full security name (for trades).

asset_type str | None

Security asset type (for trades).

quantity Decimal | None

Number of shares/units (for trades).

unit_price Decimal | None

Price per share/unit (for trades).

commission Decimal | None

Trading commission (for trades).

raw_data dict[str, Any] | None

Full provider response for metadata/debugging.

Example

>>> result = await provider.fetch_transactions(
...     credentials, provider_account_id, start_date, end_date
... )
>>> match result:
...     case Success(transactions):
...         for txn in transactions:
...             domain_txn = mapper.to_entity(txn, account_id)

Source code in src/domain/protocols/provider_protocol.py
@dataclass(frozen=True, kw_only=True)
class ProviderTransactionData:
    """Transaction data as returned by provider (before mapping to domain).

    Provider adapters return this; mappers convert to Transaction entity.
    This intermediate type decouples provider response format from domain model.

    Attributes:
        provider_transaction_id: Provider's unique transaction identifier.
        transaction_type: Provider's transaction type string.
        subtype: Provider's transaction subtype (if applicable).
        amount: Transaction amount (positive=credit, negative=debit).
        currency: ISO 4217 currency code.
        description: Human-readable transaction description.
        transaction_date: Date transaction occurred.
        settlement_date: Date funds/securities settled (if applicable).
        status: Provider's transaction status string.
        symbol: Security ticker symbol (for trades).
        security_name: Full security name (for trades).
        asset_type: Security asset type (for trades).
        quantity: Number of shares/units (for trades).
        unit_price: Price per share/unit (for trades).
        commission: Trading commission (for trades).
        raw_data: Full provider response for metadata/debugging.

    Example:
        >>> result = await provider.fetch_transactions(
        ...     credentials, provider_account_id, start_date, end_date
        ... )
        >>> match result:
        ...     case Success(transactions):
        ...         for txn in transactions:
        ...             domain_txn = mapper.to_entity(txn, account_id)
    """

    provider_transaction_id: str
    transaction_type: str
    amount: Decimal
    currency: str
    description: str
    transaction_date: date
    status: str
    subtype: str | None = None
    settlement_date: date | None = None
    # Security details (for trades)
    symbol: str | None = None
    security_name: str | None = None
    asset_type: str | None = None
    quantity: Decimal | None = None
    unit_price: Decimal | None = None
    commission: Decimal | None = None
    # Metadata
    raw_data: dict[str, Any] | None = None

ProviderProtocol

Bases: Protocol

Protocol (port) for financial provider adapters.

Each financial provider (Schwab, Alpaca, Chase, etc.) implements this protocol to integrate with Dashtam. The protocol is auth-agnostic: credentials are passed as an opaque dict, and each provider extracts what it needs based on its authentication mechanism.

This is a Protocol (not ABC) for structural typing. Implementations don't need to inherit from this.
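Structural typing can be illustrated with a reduced stand-in protocol. `HasSlug`, `ExampleProvider`, and `NotAProvider` are invented for this sketch, and `@runtime_checkable` is added only so the match can be demonstrated with `isinstance`; the listing for ProviderProtocol does not show that decorator, so in the project the check would normally be done by a static type checker.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class HasSlug(Protocol):
    """Simplified stand-in for ProviderProtocol, reduced to the slug property."""

    @property
    def slug(self) -> str: ...


class ExampleProvider:
    """Does not inherit from HasSlug, yet satisfies it structurally."""

    @property
    def slug(self) -> str:
        return "example"


class NotAProvider:
    """Lacks a slug attribute, so it does not satisfy the protocol."""


print(isinstance(ExampleProvider(), HasSlug))  # True
print(isinstance(NotAProvider(), HasSlug))     # False
```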

All methods return Result types (railway-oriented programming):
  • Success(data) on successful API calls
  • Failure(ProviderError) on failures

Auth-Specific Methods

OAuth providers (Schwab) implement additional methods like exchange_code_for_tokens() and refresh_access_token(), but these are NOT part of this protocol - they are provider implementation details. Sync handlers only need the fetch_* methods.

Provider implementations live in src/infrastructure/providers/{provider_slug}/

Example Implementation (OAuth - Schwab):

>>> class SchwabProvider:
...     @property
...     def slug(self) -> str:
...         return "schwab"
...
...     async def fetch_accounts(
...         self, credentials: dict[str, Any]
...     ) -> Result[list[ProviderAccountData], ProviderError]:
...         access_token = credentials.get("access_token")
...         # Use access_token to call Schwab API
...         ...

Example Implementation (API Key - Alpaca):

>>> class AlpacaProvider:
...     @property
...     def slug(self) -> str:
...         return "alpaca"
...
...     async def fetch_accounts(
...         self, credentials: dict[str, Any]
...     ) -> Result[list[ProviderAccountData], ProviderError]:
...         api_key = credentials.get("api_key")
...         api_secret = credentials.get("api_secret")
...         # Use api_key/api_secret to call Alpaca API
...         ...

Reference
  • docs/architecture/provider-integration-architecture.md
  • docs/architecture/error-handling-architecture.md
Source code in src/domain/protocols/provider_protocol.py
class ProviderProtocol(Protocol):
    """Protocol (port) for financial provider adapters.

    Each financial provider (Schwab, Alpaca, Chase, etc.) implements this
    protocol to integrate with Dashtam. The protocol is **auth-agnostic**:
    credentials are passed as an opaque dict, and each provider extracts
    what it needs based on its authentication mechanism.

    This is a Protocol (not ABC) for structural typing.
    Implementations don't need to inherit from this.

    All methods return Result types (railway-oriented programming):
    - Success(data) on successful API calls
    - Failure(ProviderError) on failures

    Auth-Specific Methods:
        OAuth providers (Schwab) implement additional methods like
        `exchange_code_for_tokens()` and `refresh_access_token()`, but these
        are NOT part of this protocol - they are provider implementation details.
        Sync handlers only need the fetch_* methods.

    Provider implementations live in:
        src/infrastructure/providers/{provider_slug}/

    Example Implementation (OAuth - Schwab):
        >>> class SchwabProvider:
        ...     @property
        ...     def slug(self) -> str:
        ...         return "schwab"
        ...
        ...     async def fetch_accounts(
        ...         self, credentials: dict[str, Any]
        ...     ) -> Result[list[ProviderAccountData], ProviderError]:
        ...         access_token = credentials.get("access_token")
        ...         # Use access_token to call Schwab API
        ...         ...

    Example Implementation (API Key - Alpaca):
        >>> class AlpacaProvider:
        ...     @property
        ...     def slug(self) -> str:
        ...         return "alpaca"
        ...
        ...     async def fetch_accounts(
        ...         self, credentials: dict[str, Any]
        ...     ) -> Result[list[ProviderAccountData], ProviderError]:
        ...         api_key = credentials.get("api_key")
        ...         api_secret = credentials.get("api_secret")
        ...         # Use api_key/api_secret to call Alpaca API
        ...         ...

    Reference:
        - docs/architecture/provider-integration-architecture.md
        - docs/architecture/error-handling-architecture.md
    """

    @property
    def slug(self) -> str:
        """Unique provider identifier.

        Used for routing, configuration, and database storage.
        Must be lowercase, alphanumeric with underscores.

        Returns:
            Provider slug (e.g., "schwab", "alpaca", "chase").

        Example:
            >>> provider.slug
            'schwab'
        """
        ...

    async def fetch_accounts(
        self,
        credentials: dict[str, Any],
    ) -> "Result[list[ProviderAccountData], ProviderError]":
        """Fetch all accounts for the authenticated user.

        Returns account data in provider's format. Use AccountMapper
        to convert to domain Account entities.

        Args:
            credentials: Decrypted credentials dict. Structure depends on
                credential_type (OAuth2, API Key, etc.). Provider extracts
                what it needs (e.g., access_token, api_key, api_secret).

        Returns:
            Success(list[ProviderAccountData]): Account data (empty list if no accounts).
            Failure(ProviderAuthenticationError): If credentials are invalid/expired.
            Failure(ProviderUnavailableError): If provider API is unreachable.
            Failure(ProviderRateLimitError): If rate limit exceeded.

        Example:
            >>> credentials = {"access_token": "..."}  # OAuth
            >>> # OR: credentials = {"api_key": "...", "api_secret": "..."}  # API Key
            >>> result = await provider.fetch_accounts(credentials)
            >>> match result:
            ...     case Success(accounts):
            ...         for account_data in accounts:
            ...             account = mapper.to_entity(account_data, connection_id)
            ...             await account_repo.save(account)
            ...     case Failure(error):
            ...         logger.error(f"Failed to fetch accounts: {error.message}")
        """
        ...

    async def fetch_holdings(
        self,
        credentials: dict[str, Any],
        provider_account_id: str,
    ) -> "Result[list[ProviderHoldingData], ProviderError]":
        """Fetch holdings (positions) for a specific account.

        Returns holding data in provider's format. Use HoldingMapper
        to convert to domain Holding entities.

        Args:
            credentials: Decrypted credentials dict. Provider extracts what it needs.
            provider_account_id: Provider's account identifier (from ProviderAccountData).

        Returns:
            Success(list[ProviderHoldingData]): Holding data (empty list if none).
            Failure(ProviderAuthenticationError): If credentials are invalid/expired.
            Failure(ProviderUnavailableError): If provider API is unreachable.
            Failure(ProviderRateLimitError): If rate limit exceeded.

        Example:
            >>> result = await provider.fetch_holdings(credentials, "12345")
            >>> match result:
            ...     case Success(holdings):
            ...         for holding_data in holdings:
            ...             holding = mapper.to_entity(holding_data, account_id)
            ...             await holding_repo.save(holding)
            ...     case Failure(error):
            ...         logger.error(f"Failed to fetch holdings: {error.message}")
        """
        ...

    async def fetch_transactions(
        self,
        credentials: dict[str, Any],
        provider_account_id: str,
        start_date: date | None = None,
        end_date: date | None = None,
    ) -> "Result[list[ProviderTransactionData], ProviderError]":
        """Fetch transactions for a specific account.

        Returns transaction data in provider's format. Use TransactionMapper
        to convert to domain Transaction entities.

        Args:
            credentials: Decrypted credentials dict. Provider extracts what it needs.
            provider_account_id: Provider's account identifier (from ProviderAccountData).
            start_date: Beginning of date range (default: provider-specific, often 30 days).
            end_date: End of date range (default: today).

        Returns:
            Success(list[ProviderTransactionData]): Transaction data (empty list if none).
            Failure(ProviderAuthenticationError): If credentials are invalid/expired.
            Failure(ProviderUnavailableError): If provider API is unreachable.
            Failure(ProviderRateLimitError): If rate limit exceeded.

        Example:
            >>> result = await provider.fetch_transactions(
            ...     credentials,
            ...     "12345",
            ...     start_date=date(2024, 1, 1),
            ...     end_date=date(2024, 12, 31),
            ... )
            >>> match result:
            ...     case Success(transactions):
            ...         for txn_data in transactions:
            ...             txn = mapper.to_entity(txn_data, account_id)
            ...             await txn_repo.save(txn)
            ...     case Failure(error):
            ...         logger.error(f"Failed to fetch transactions: {error.message}")
        """
        ...
Attributes
slug property
slug: str

Unique provider identifier.

Used for routing, configuration, and database storage. Must be lowercase, alphanumeric with underscores.

Returns:

Type Description
str

Provider slug (e.g., "schwab", "alpaca", "chase").

Example

>>> provider.slug
'schwab'

Functions
fetch_accounts async
fetch_accounts(
    credentials: dict[str, Any],
) -> Result[list[ProviderAccountData], ProviderError]

Fetch all accounts for the authenticated user.

Returns account data in provider's format. Use AccountMapper to convert to domain Account entities.

Parameters:

Name Type Description Default
credentials dict[str, Any]

Decrypted credentials dict. Structure depends on credential_type (OAuth2, API Key, etc.). Provider extracts what it needs (e.g., access_token, api_key, api_secret).

required

Returns:

Name Type Description
Success list[ProviderAccountData]

Account data (empty list if no accounts).

Failure ProviderAuthenticationError

If credentials are invalid/expired.

Failure ProviderUnavailableError

If provider API is unreachable.

Failure ProviderRateLimitError

If rate limit exceeded.

Example

>>> credentials = {"access_token": "..."}  # OAuth
>>> # OR: credentials = {"api_key": "...", "api_secret": "..."}  # API Key
>>> result = await provider.fetch_accounts(credentials)
>>> match result:
...     case Success(accounts):
...         for account_data in accounts:
...             account = mapper.to_entity(account_data, connection_id)
...             await account_repo.save(account)
...     case Failure(error):
...         logger.error(f"Failed to fetch accounts: {error.message}")

Source code in src/domain/protocols/provider_protocol.py
async def fetch_accounts(
    self,
    credentials: dict[str, Any],
) -> "Result[list[ProviderAccountData], ProviderError]":
    """Fetch all accounts for the authenticated user.

    Returns account data in provider's format. Use AccountMapper
    to convert to domain Account entities.

    Args:
        credentials: Decrypted credentials dict. Structure depends on
            credential_type (OAuth2, API Key, etc.). Provider extracts
            what it needs (e.g., access_token, api_key, api_secret).

    Returns:
        Success(list[ProviderAccountData]): Account data (empty list if no accounts).
        Failure(ProviderAuthenticationError): If credentials are invalid/expired.
        Failure(ProviderUnavailableError): If provider API is unreachable.
        Failure(ProviderRateLimitError): If rate limit exceeded.

    Example:
        >>> credentials = {"access_token": "..."}  # OAuth
        >>> # OR: credentials = {"api_key": "...", "api_secret": "..."}  # API Key
        >>> result = await provider.fetch_accounts(credentials)
        >>> match result:
        ...     case Success(accounts):
        ...         for account_data in accounts:
        ...             account = mapper.to_entity(account_data, connection_id)
        ...             await account_repo.save(account)
        ...     case Failure(error):
        ...         logger.error(f"Failed to fetch accounts: {error.message}")
    """
    ...
fetch_holdings async
fetch_holdings(
    credentials: dict[str, Any], provider_account_id: str
) -> Result[list[ProviderHoldingData], ProviderError]

Fetch holdings (positions) for a specific account.

Returns holding data in provider's format. Use HoldingMapper to convert to domain Holding entities.

Parameters:

Name Type Description Default
credentials dict[str, Any]

Decrypted credentials dict. Provider extracts what it needs.

required
provider_account_id str

Provider's account identifier (from ProviderAccountData).

required

Returns:

Name Type Description
Success list[ProviderHoldingData]

Holding data (empty list if none).

Failure ProviderAuthenticationError

If credentials are invalid/expired.

Failure ProviderUnavailableError

If provider API is unreachable.

Failure ProviderRateLimitError

If rate limit exceeded.

Example

>>> result = await provider.fetch_holdings(credentials, "12345")
>>> match result:
...     case Success(holdings):
...         for holding_data in holdings:
...             holding = mapper.to_entity(holding_data, account_id)
...             await holding_repo.save(holding)
...     case Failure(error):
...         logger.error(f"Failed to fetch holdings: {error.message}")

Source code in src/domain/protocols/provider_protocol.py
async def fetch_holdings(
    self,
    credentials: dict[str, Any],
    provider_account_id: str,
) -> "Result[list[ProviderHoldingData], ProviderError]":
    """Fetch holdings (positions) for a specific account.

    Returns holding data in provider's format. Use HoldingMapper
    to convert to domain Holding entities.

    Args:
        credentials: Decrypted credentials dict. Provider extracts what it needs.
        provider_account_id: Provider's account identifier (from ProviderAccountData).

    Returns:
        Success(list[ProviderHoldingData]): Holding data (empty list if none).
        Failure(ProviderAuthenticationError): If credentials are invalid/expired.
        Failure(ProviderUnavailableError): If provider API is unreachable.
        Failure(ProviderRateLimitError): If rate limit exceeded.

    Example:
        >>> result = await provider.fetch_holdings(credentials, "12345")
        >>> match result:
        ...     case Success(holdings):
        ...         for holding_data in holdings:
        ...             holding = mapper.to_entity(holding_data, account_id)
        ...             await holding_repo.save(holding)
        ...     case Failure(error):
        ...         logger.error(f"Failed to fetch holdings: {error.message}")
    """
    ...
fetch_transactions async
fetch_transactions(
    credentials: dict[str, Any],
    provider_account_id: str,
    start_date: date | None = None,
    end_date: date | None = None,
) -> Result[list[ProviderTransactionData], ProviderError]

Fetch transactions for a specific account.

Returns transaction data in provider's format. Use TransactionMapper to convert to domain Transaction entities.

Parameters:

Name Type Description Default
credentials dict[str, Any]

Decrypted credentials dict. Provider extracts what it needs.

required
provider_account_id str

Provider's account identifier (from ProviderAccountData).

required
start_date date | None

Beginning of date range (default: provider-specific, often 30 days).

None
end_date date | None

End of date range (default: today).

None

Returns:

Name Type Description
Success list[ProviderTransactionData]

Transaction data (empty list if none).

Failure ProviderAuthenticationError

If credentials are invalid/expired.

Failure ProviderUnavailableError

If provider API is unreachable.

Failure ProviderRateLimitError

If rate limit exceeded.

Example

>>> result = await provider.fetch_transactions(
...     credentials,
...     "12345",
...     start_date=date(2024, 1, 1),
...     end_date=date(2024, 12, 31),
... )
>>> match result:
...     case Success(transactions):
...         for txn_data in transactions:
...             txn = mapper.to_entity(txn_data, account_id)
...             await txn_repo.save(txn)
...     case Failure(error):
...         logger.error(f"Failed to fetch transactions: {error.message}")

Source code in src/domain/protocols/provider_protocol.py
async def fetch_transactions(
    self,
    credentials: dict[str, Any],
    provider_account_id: str,
    start_date: date | None = None,
    end_date: date | None = None,
) -> "Result[list[ProviderTransactionData], ProviderError]":
    """Fetch transactions for a specific account.

    Returns transaction data in provider's format. Use TransactionMapper
    to convert to domain Transaction entities.

    Args:
        credentials: Decrypted credentials dict. Provider extracts what it needs.
        provider_account_id: Provider's account identifier (from ProviderAccountData).
        start_date: Beginning of date range (default: provider-specific, often 30 days).
        end_date: End of date range (default: today).

    Returns:
        Success(list[ProviderTransactionData]): Transaction data (empty list if none).
        Failure(ProviderAuthenticationError): If credentials are invalid/expired.
        Failure(ProviderUnavailableError): If provider API is unreachable.
        Failure(ProviderRateLimitError): If rate limit exceeded.

    Example:
        >>> result = await provider.fetch_transactions(
        ...     credentials,
        ...     "12345",
        ...     start_date=date(2024, 1, 1),
        ...     end_date=date(2024, 12, 31),
        ... )
        >>> match result:
        ...     case Success(transactions):
        ...         for txn_data in transactions:
        ...             txn = mapper.to_entity(txn_data, account_id)
        ...             await txn_repo.save(txn)
        ...     case Failure(error):
        ...         logger.error(f"Failed to fetch transactions: {error.message}")
    """
    ...

OAuthProviderProtocol

Bases: ProviderProtocol, Protocol

Extended protocol for OAuth 2.0 providers.

Extends ProviderProtocol with OAuth-specific methods for token exchange and refresh. Only OAuth providers (Schwab, Chase, etc.) implement this.

API Key providers (Alpaca) do NOT implement this protocol - they only implement the base ProviderProtocol.

Usage
  • Use ProviderProtocol for sync handlers (auth-agnostic)
  • Use OAuthProviderProtocol for OAuth callbacks/token refresh
Example

>>> # In OAuth callback handler:
>>> provider: OAuthProviderProtocol = get_oauth_provider("schwab")
>>> result = await provider.exchange_code_for_tokens(code)

Source code in src/domain/protocols/provider_protocol.py
class OAuthProviderProtocol(ProviderProtocol, Protocol):
    """Extended protocol for OAuth 2.0 providers.

    Extends ProviderProtocol with OAuth-specific methods for token exchange
    and refresh. Only OAuth providers (Schwab, Chase, etc.) implement this.

    API Key providers (Alpaca) do NOT implement this protocol - they only
    implement the base ProviderProtocol.

    Usage:
        - Use ProviderProtocol for sync handlers (auth-agnostic)
        - Use OAuthProviderProtocol for OAuth callbacks/token refresh

    Example:
        >>> # In OAuth callback handler:
        >>> provider: OAuthProviderProtocol = get_oauth_provider("schwab")
        >>> result = await provider.exchange_code_for_tokens(code)
    """

    async def exchange_code_for_tokens(
        self,
        authorization_code: str,
    ) -> "Result[OAuthTokens, ProviderError]":
        """Exchange OAuth authorization code for access and refresh tokens.

        Called after user completes OAuth consent flow and is redirected
        back with an authorization code.

        Args:
            authorization_code: Code from OAuth callback query parameter.

        Returns:
            Success(OAuthTokens): With access_token, refresh_token, and expiration.
            Failure(ProviderAuthenticationError): If code is invalid or expired.
            Failure(ProviderUnavailableError): If provider API is unreachable.

        Example:
            >>> result = await provider.exchange_code_for_tokens(code)
            >>> match result:
            ...     case Success(tokens):
            ...         encrypted = encryption_service.encrypt({
            ...             "access_token": tokens.access_token,
            ...             "refresh_token": tokens.refresh_token,
            ...         })
            ...     case Failure(error):
            ...         logger.error(f"Token exchange failed: {error.message}")
        """
        ...

    async def refresh_access_token(
        self,
        refresh_token: str,
    ) -> "Result[OAuthTokens, ProviderError]":
        """Refresh access token using refresh token.

        Called when access token is expired or about to expire.
        May return a new refresh token if provider rotates tokens.

        Args:
            refresh_token: Current refresh token.

        Returns:
            Success(OAuthTokens): With new access_token. refresh_token is:
                - None if provider doesn't rotate tokens
                - New token if provider rotated
                - Same token if provider returns same token
            Failure(ProviderAuthenticationError): If refresh token is invalid/expired.
                User must re-authenticate via OAuth flow.
            Failure(ProviderUnavailableError): If provider API is unreachable.

        Example:
            >>> result = await provider.refresh_access_token(refresh_token)
            >>> match result:
            ...     case Success(new_tokens):
            ...         if new_tokens.refresh_token:
            ...             # Provider rotated token, must update storage
            ...             new_refresh = new_tokens.refresh_token
            ...         else:
            ...             # Keep existing refresh token
            ...             new_refresh = refresh_token
            ...     case Failure(error):
            ...         # Re-authenticate on ProviderAuthenticationError; retry later if unavailable
            ...         logger.warning(f"Token refresh failed: {error.message}")
        """
        ...
Functions
exchange_code_for_tokens async
exchange_code_for_tokens(
    authorization_code: str,
) -> Result[OAuthTokens, ProviderError]

Exchange OAuth authorization code for access and refresh tokens.

Called after user completes OAuth consent flow and is redirected back with an authorization code.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| authorization_code | str | Code from OAuth callback query parameter. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Success | OAuthTokens | With access_token, refresh_token, and expiration. |
| Failure | ProviderAuthenticationError | If code is invalid or expired. |
| Failure | ProviderUnavailableError | If provider API is unreachable. |

Example

result = await provider.exchange_code_for_tokens(code)
match result:
    case Success(tokens):
        encrypted = encryption_service.encrypt({
            "access_token": tokens.access_token,
            "refresh_token": tokens.refresh_token,
        })
    case Failure(error):
        logger.error(f"Token exchange failed: {error.message}")

Source code in src/domain/protocols/provider_protocol.py
async def exchange_code_for_tokens(
    self,
    authorization_code: str,
) -> "Result[OAuthTokens, ProviderError]":
    """Exchange OAuth authorization code for access and refresh tokens.

    Called after user completes OAuth consent flow and is redirected
    back with an authorization code.

    Args:
        authorization_code: Code from OAuth callback query parameter.

    Returns:
        Success(OAuthTokens): With access_token, refresh_token, and expiration.
        Failure(ProviderAuthenticationError): If code is invalid or expired.
        Failure(ProviderUnavailableError): If provider API is unreachable.

    Example:
        >>> result = await provider.exchange_code_for_tokens(code)
        >>> match result:
        ...     case Success(tokens):
        ...         encrypted = encryption_service.encrypt({
        ...             "access_token": tokens.access_token,
        ...             "refresh_token": tokens.refresh_token,
        ...         })
        ...     case Failure(error):
        ...         logger.error(f"Token exchange failed: {error.message}")
    """
    ...
refresh_access_token async
refresh_access_token(
    refresh_token: str,
) -> Result[OAuthTokens, ProviderError]

Refresh access token using refresh token.

Called when access token is expired or about to expire. May return a new refresh token if provider rotates tokens.
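One way to decide that a token is "about to expire" is to compare the current time against the issue time plus `expires_in`, minus a safety margin. This is a minimal sketch: the `needs_refresh` helper, the `issued_at` bookkeeping, and the 60-second default margin are all assumptions, not part of the protocol.

```python
from datetime import datetime, timedelta, timezone


def needs_refresh(
    issued_at: datetime,
    expires_in: int,
    now: datetime,
    skew_seconds: int = 60,
) -> bool:
    """Return True if the access token is expired or within the safety margin.

    Hypothetical helper: `issued_at` is when the tokens were obtained and
    `expires_in` is the OAuthTokens.expires_in value in seconds.
    """
    expires_at = issued_at + timedelta(seconds=expires_in)
    # Refresh slightly early so in-flight requests do not hit an expired token.
    return now >= expires_at - timedelta(seconds=skew_seconds)


issued = datetime(2024, 1, 1, tzinfo=timezone.utc)
still_fresh = needs_refresh(issued, 1800, issued + timedelta(minutes=10))
nearly_expired = needs_refresh(issued, 1800, issued + timedelta(minutes=29, seconds=30))
```

Passing `now` explicitly keeps the check deterministic in tests; a caller would typically supply `datetime.now(timezone.utc)`.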

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| refresh_token | str | Current refresh token. | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| Success | OAuthTokens | With new access_token. refresh_token is None if the provider doesn't rotate tokens, a new token if the provider rotated it, or the same token if the provider returns it unchanged. |
| Failure | ProviderAuthenticationError | If refresh token is invalid/expired. User must re-authenticate via OAuth flow. |
| Failure | ProviderUnavailableError | If provider API is unreachable. |

Example

result = await provider.refresh_access_token(refresh_token)
match result:
    case Success(new_tokens):
        if new_tokens.refresh_token:
            # Provider returned a refresh token (possibly rotated), so update storage
            new_refresh = new_tokens.refresh_token
        else:
            # Keep existing refresh token
            new_refresh = refresh_token
    case Failure(error):
        # Re-authenticate on ProviderAuthenticationError; retry later if unavailable
        logger.warning(f"Token refresh failed: {error.message}")

Source code in src/domain/protocols/provider_protocol.py
async def refresh_access_token(
    self,
    refresh_token: str,
) -> "Result[OAuthTokens, ProviderError]":
    """Refresh access token using refresh token.

    Called when access token is expired or about to expire.
    May return a new refresh token if provider rotates tokens.

    Args:
        refresh_token: Current refresh token.

    Returns:
        Success(OAuthTokens): With new access_token. refresh_token is:
            - None if provider doesn't rotate tokens
            - New token if provider rotated
            - Same token if provider returns same token
        Failure(ProviderAuthenticationError): If refresh token is invalid/expired.
            User must re-authenticate via OAuth flow.
        Failure(ProviderUnavailableError): If provider API is unreachable.

    Example:
        >>> result = await provider.refresh_access_token(refresh_token)
        >>> match result:
        ...     case Success(new_tokens):
        ...         if new_tokens.refresh_token:
        ...             # Provider rotated token, must update storage
        ...             new_refresh = new_tokens.refresh_token
        ...         else:
        ...             # Keep existing refresh token
        ...             new_refresh = refresh_token
        ...     case Failure(error):
        ...         # Re-authenticate on ProviderAuthenticationError; retry later if unavailable
        ...         logger.warning(f"Token refresh failed: {error.message}")
    """
    ...