domain.protocols.provider_protocol¶
src.domain.protocols.provider_protocol
¶
ProviderProtocol for financial provider adapters.
Port (interface) for hexagonal architecture. Infrastructure layer implements this protocol for each provider (Schwab, Chase, etc.).
This protocol defines the contract that all financial providers must implement, regardless of their authentication mechanism (OAuth, API key, etc.) or API structure.
Design Principles
- Auth-agnostic: Protocol receives credentials as opaque dict, provider extracts what it needs
- Minimal interface: Only data-fetching methods required (slug, fetch_accounts, etc.)
- Auth-specific methods (OAuth token exchange, API key validation) are provider implementation details
Methods return Result types following railway-oriented programming pattern. See docs/architecture/error-handling-architecture.md for details.
Credentials Dict Structure (by CredentialType): - OAUTH2: {"access_token": ..., "refresh_token": ..., "token_type": ..., "scope": ...} - API_KEY: {"api_key": ..., "api_secret": ...} - LINK_TOKEN: {"access_token": ..., "item_id": ..., "institution_id": ...} - CERTIFICATE: {"client_certificate": ..., "private_key": ..., "certificate_chain": ...}
Reference
- docs/architecture/provider-integration-architecture.md
Classes¶
OAuthTokens
dataclass
¶
OAuth tokens returned by provider after authentication.
Returned from exchange_code_for_tokens() and refresh_access_token().
Attributes:
| Name | Type | Description |
|---|---|---|
access_token |
str
|
Bearer token for API authentication. |
refresh_token |
str | None
|
Token for obtaining new access tokens. May be None if provider doesn't rotate tokens on refresh. |
expires_in |
int
|
Seconds until access_token expires. |
token_type |
str
|
Token type, typically "Bearer". |
scope |
str | None
|
OAuth scope granted (provider-specific). |
Example
tokens = await provider.exchange_code_for_tokens(code) print(f"Token expires in {tokens.expires_in} seconds")
Source code in src/domain/protocols/provider_protocol.py
ProviderAccountData
dataclass
¶
Account data as returned by provider (before mapping to domain).
Provider adapters return this; mappers convert to Account entity. This intermediate type decouples provider response format from domain model.
Attributes:
| Name | Type | Description |
|---|---|---|
provider_account_id |
str
|
Provider's unique account identifier. |
account_number_masked |
str
|
Masked account number for display (e.g., "**1234"). |
name |
str
|
Account name from provider. |
account_type |
str
|
Provider's account type string (mapped by AccountMapper). |
balance |
Decimal
|
Current account balance. |
available_balance |
Decimal | None
|
Available balance if different from balance. |
currency |
str
|
ISO 4217 currency code (e.g., "USD"). |
is_active |
bool
|
Whether account is active on provider. |
raw_data |
dict[str, Any] | None
|
Full provider response for metadata/debugging. |
Example
accounts = await provider.fetch_accounts(access_token) for account in accounts: ... domain_account = mapper.to_entity(account, connection_id)
Source code in src/domain/protocols/provider_protocol.py
ProviderHoldingData
dataclass
¶
Holding (position) data as returned by provider (before mapping to domain).
Provider adapters return this; mappers convert to Holding entity. This intermediate type decouples provider response format from domain model.
Attributes:
| Name | Type | Description |
|---|---|---|
provider_holding_id |
str
|
Provider's unique identifier for this position. |
symbol |
str
|
Security ticker symbol (e.g., "AAPL"). |
security_name |
str
|
Full security name (e.g., "Apple Inc."). |
asset_type |
str
|
Security asset type (equity, etf, option, etc.). |
quantity |
Decimal
|
Number of shares/units held. |
cost_basis |
Decimal
|
Total cost paid for this position. |
market_value |
Decimal
|
Current market value of the position. |
currency |
str
|
ISO 4217 currency code (e.g., "USD"). |
average_price |
Decimal | None
|
Average price per share (optional). |
current_price |
Decimal | None
|
Current market price per share (optional). |
raw_data |
dict[str, Any] | None
|
Full provider response for metadata/debugging. |
Example
holdings = await provider.fetch_holdings(access_token, account_id) for holding in holdings: ... domain_holding = mapper.to_entity(holding, account_id)
Source code in src/domain/protocols/provider_protocol.py
ProviderTransactionData
dataclass
¶
Transaction data as returned by provider (before mapping to domain).
Provider adapters return this; mappers convert to Transaction entity. This intermediate type decouples provider response format from domain model.
Attributes:
| Name | Type | Description |
|---|---|---|
provider_transaction_id |
str
|
Provider's unique transaction identifier. |
transaction_type |
str
|
Provider's transaction type string. |
subtype |
str | None
|
Provider's transaction subtype (if applicable). |
amount |
Decimal
|
Transaction amount (positive=credit, negative=debit). |
currency |
str
|
ISO 4217 currency code. |
description |
str
|
Human-readable transaction description. |
transaction_date |
date
|
Date transaction occurred. |
settlement_date |
date | None
|
Date funds/securities settled (if applicable). |
status |
str
|
Provider's transaction status string. |
symbol |
str | None
|
Security ticker symbol (for trades). |
security_name |
str | None
|
Full security name (for trades). |
asset_type |
str | None
|
Security asset type (for trades). |
quantity |
Decimal | None
|
Number of shares/units (for trades). |
unit_price |
Decimal | None
|
Price per share/unit (for trades). |
commission |
Decimal | None
|
Trading commission (for trades). |
raw_data |
dict[str, Any] | None
|
Full provider response for metadata/debugging. |
Example
transactions = await provider.fetch_transactions( ... access_token, account_id, start_date, end_date ... ) for txn in transactions: ... domain_txn = mapper.to_entity(txn, account_id)
Source code in src/domain/protocols/provider_protocol.py
ProviderProtocol
¶
Bases: Protocol
Protocol (port) for financial provider adapters.
Each financial provider (Schwab, Alpaca, Chase, etc.) implements this protocol to integrate with Dashtam. The protocol is auth-agnostic: credentials are passed as an opaque dict, and each provider extracts what it needs based on its authentication mechanism.
This is a Protocol (not ABC) for structural typing. Implementations don't need to inherit from this.
All methods return Result types (railway-oriented programming): - Success(data) on successful API calls - Failure(ProviderError) on failures
Auth-Specific Methods
OAuth providers (Schwab) implement additional methods like
exchange_code_for_tokens() and refresh_access_token(), but these
are NOT part of this protocol - they are provider implementation details.
Sync handlers only need the fetch_* methods.
Provider implementations live in
src/infrastructure/providers/{provider_slug}/
Example Implementation (OAuth - Schwab): >>> class SchwabProvider: ... @property ... def slug(self) -> str: ... return "schwab" ... ... async def fetch_accounts( ... self, credentials: dict[str, Any] ... ) -> Result[list[ProviderAccountData], ProviderError]: ... access_token = credentials.get("access_token") ... # Use access_token to call Schwab API ... ...
Example Implementation (API Key - Alpaca): >>> class AlpacaProvider: ... @property ... def slug(self) -> str: ... return "alpaca" ... ... async def fetch_accounts( ... self, credentials: dict[str, Any] ... ) -> Result[list[ProviderAccountData], ProviderError]: ... api_key = credentials.get("api_key") ... api_secret = credentials.get("api_secret") ... # Use api_key/api_secret to call Alpaca API ... ...
Reference
- docs/architecture/provider-integration-architecture.md
- docs/architecture/error-handling-architecture.md
Source code in src/domain/protocols/provider_protocol.py
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 | |
Attributes¶
slug
property
¶
Unique provider identifier.
Used for routing, configuration, and database storage. Must be lowercase, alphanumeric with underscores.
Returns:
| Type | Description |
|---|---|
str
|
Provider slug (e.g., "schwab", "alpaca", "chase"). |
Example
provider.slug 'schwab'
Functions¶
fetch_accounts
async
¶
Fetch all accounts for the authenticated user.
Returns account data in provider's format. Use AccountMapper to convert to domain Account entities.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
credentials
|
dict[str, Any]
|
Decrypted credentials dict. Structure depends on credential_type (OAuth2, API Key, etc.). Provider extracts what it needs (e.g., access_token, api_key, api_secret). |
required |
Returns:
| Name | Type | Description |
|---|---|---|
Success |
list[ProviderAccountData]
|
Account data (empty list if no accounts). |
Failure |
ProviderAuthenticationError
|
If credentials are invalid/expired. |
Failure |
ProviderUnavailableError
|
If provider API is unreachable. |
Failure |
ProviderRateLimitError
|
If rate limit exceeded. |
Example
credentials = {"access_token": "..."} # OAuth
OR: credentials = {"api_key": "...", "api_secret": "..."} # API Key¶
result = await provider.fetch_accounts(credentials) match result: ... case Success(accounts): ... for account_data in accounts: ... account = mapper.to_entity(account_data, connection_id) ... await account_repo.save(account) ... case Failure(error): ... logger.error(f"Failed to fetch accounts: {error.message}")
Source code in src/domain/protocols/provider_protocol.py
fetch_holdings
async
¶
fetch_holdings(
credentials: dict[str, Any], provider_account_id: str
) -> Result[list[ProviderHoldingData], ProviderError]
Fetch holdings (positions) for a specific account.
Returns holding data in provider's format. Use HoldingMapper to convert to domain Holding entities.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
credentials
|
dict[str, Any]
|
Decrypted credentials dict. Provider extracts what it needs. |
required |
provider_account_id
|
str
|
Provider's account identifier (from ProviderAccountData). |
required |
Returns:
| Name | Type | Description |
|---|---|---|
Success |
list[ProviderHoldingData]
|
Holding data (empty list if none). |
Failure |
ProviderAuthenticationError
|
If credentials are invalid/expired. |
Failure |
ProviderUnavailableError
|
If provider API is unreachable. |
Failure |
ProviderRateLimitError
|
If rate limit exceeded. |
Example
result = await provider.fetch_holdings(credentials, "12345") match result: ... case Success(holdings): ... for holding_data in holdings: ... holding = mapper.to_entity(holding_data, account_id) ... await holding_repo.save(holding) ... case Failure(error): ... logger.error(f"Failed to fetch holdings: {error.message}")
Source code in src/domain/protocols/provider_protocol.py
fetch_transactions
async
¶
fetch_transactions(
credentials: dict[str, Any],
provider_account_id: str,
start_date: date | None = None,
end_date: date | None = None,
) -> Result[list[ProviderTransactionData], ProviderError]
Fetch transactions for a specific account.
Returns transaction data in provider's format. Use TransactionMapper to convert to domain Transaction entities.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
credentials
|
dict[str, Any]
|
Decrypted credentials dict. Provider extracts what it needs. |
required |
provider_account_id
|
str
|
Provider's account identifier (from ProviderAccountData). |
required |
start_date
|
date | None
|
Beginning of date range (default: provider-specific, often 30 days). |
None
|
end_date
|
date | None
|
End of date range (default: today). |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
Success |
list[ProviderTransactionData]
|
Transaction data (empty list if none). |
Failure |
ProviderAuthenticationError
|
If credentials are invalid/expired. |
Failure |
ProviderUnavailableError
|
If provider API is unreachable. |
Failure |
ProviderRateLimitError
|
If rate limit exceeded. |
Example
result = await provider.fetch_transactions( ... credentials, ... "12345", ... start_date=date(2024, 1, 1), ... end_date=date(2024, 12, 31), ... ) match result: ... case Success(transactions): ... for txn_data in transactions: ... txn = mapper.to_entity(txn_data, account_id) ... await txn_repo.save(txn) ... case Failure(error): ... logger.error(f"Failed to fetch transactions: {error.message}")
Source code in src/domain/protocols/provider_protocol.py
OAuthProviderProtocol
¶
Bases: ProviderProtocol, Protocol
Extended protocol for OAuth 2.0 providers.
Extends ProviderProtocol with OAuth-specific methods for token exchange and refresh. Only OAuth providers (Schwab, Chase, etc.) implement this.
API Key providers (Alpaca) do NOT implement this protocol - they only implement the base ProviderProtocol.
Usage
- Use ProviderProtocol for sync handlers (auth-agnostic)
- Use OAuthProviderProtocol for OAuth callbacks/token refresh
Example
In OAuth callback handler:¶
provider: OAuthProviderProtocol = get_oauth_provider("schwab") result = await provider.exchange_code_for_tokens(code)
Source code in src/domain/protocols/provider_protocol.py
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 | |
Functions¶
exchange_code_for_tokens
async
¶
Exchange OAuth authorization code for access and refresh tokens.
Called after user completes OAuth consent flow and is redirected back with an authorization code.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
authorization_code
|
str
|
Code from OAuth callback query parameter. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
Success |
OAuthTokens
|
With access_token, refresh_token, and expiration. |
Failure |
ProviderAuthenticationError
|
If code is invalid or expired. |
Failure |
ProviderUnavailableError
|
If provider API is unreachable. |
Example
result = await provider.exchange_code_for_tokens(code) match result: ... case Success(tokens): ... encrypted = encryption_service.encrypt({ ... "access_token": tokens.access_token, ... "refresh_token": tokens.refresh_token, ... }) ... case Failure(error): ... logger.error(f"Token exchange failed: {error.message}")
Source code in src/domain/protocols/provider_protocol.py
refresh_access_token
async
¶
Refresh access token using refresh token.
Called when access token is expired or about to expire. May return a new refresh token if provider rotates tokens.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
refresh_token
|
str
|
Current refresh token. |
required |
Returns:
| Name | Type | Description |
|---|---|---|
Success |
OAuthTokens
|
With new access_token. refresh_token is: - None if provider doesn't rotate tokens - New token if provider rotated - Same token if provider returns same token |
Failure |
ProviderAuthenticationError
|
If refresh token is invalid/expired. User must re-authenticate via OAuth flow. |
Failure |
ProviderUnavailableError
|
If provider API is unreachable. |
Example
result = await provider.refresh_access_token(refresh_token) match result: ... case Success(new_tokens): ... if new_tokens.refresh_token: ... # Provider rotated token, must update storage ... new_refresh = new_tokens.refresh_token ... else: ... # Keep existing refresh token ... new_refresh = refresh_token ... case Failure(error): ... # User must re-authenticate ... logger.warning(f"Token refresh failed: {error.message}")