Balance Tracking Architecture¶
Balance snapshot domain model for historical portfolio tracking.
Overview¶
The Balance Tracking system captures point-in-time snapshots of account balances during sync operations. These snapshots enable portfolio value tracking over time, performance calculations, and historical balance charts.
Core Principle¶
Snapshots are IMMUTABLE historical records. Once created, they are never modified. If a correction is needed, create a new snapshot.
Design Goals¶
- Immutability: Frozen snapshots that cannot be altered after creation
- Time-Series Friendly: Optimized for date-range queries and charting
- Source Tracking: Know exactly when and why each snapshot was captured
- Value Breakdown: Track total balance, holdings value, and cash separately
Domain Model¶
Entity: BalanceSnapshot¶
BalanceSnapshot (frozen=True)
├── id: UUID # Unique identifier
├── account_id: UUID # FK to Account
├── balance: Money # Total account balance
├── available_balance: Money | None # Available for trading/withdrawal
├── holdings_value: Money | None # Total securities value
├── cash_value: Money | None # Cash/money market balance
├── currency: str # ISO 4217 code
├── source: SnapshotSource # How/why captured
├── provider_metadata: dict | None # Raw provider data at capture
├── captured_at: datetime # When balance was captured
└── created_at: datetime # Record creation timestamp
Note: BalanceSnapshot is a frozen dataclass (frozen=True) - all attributes are immutable after creation.
Value Breakdown¶
┌─────────────────────────────────────────┐
│ Total Balance │
│ (balance.amount) │
├───────────────────┬─────────────────────┤
│ Holdings Value │ Cash Value │
│ (holdings_value) │ (cash_value) │
│ │ │
│ Stocks, ETFs, │ Money Market, │
│ Bonds, etc. │ Sweep Account │
└───────────────────┴─────────────────────┘
Relationship: balance ≈ holdings_value + cash_value
SnapshotSource Enum¶
class SnapshotSource(StrEnum):
"""Source/trigger of a balance snapshot."""
ACCOUNT_SYNC = "account_sync"
"""Captured during account data sync."""
HOLDINGS_SYNC = "holdings_sync"
"""Captured during holdings sync operation."""
MANUAL_SYNC = "manual_sync"
"""User-initiated sync request."""
SCHEDULED_SYNC = "scheduled_sync"
"""Automated background sync job."""
INITIAL_CONNECTION = "initial_connection"
"""First sync after provider connection."""
# Query Methods
def is_automated(self) -> bool:
"""Check if snapshot was captured automatically."""
return self in {ACCOUNT_SYNC, HOLDINGS_SYNC, SCHEDULED_SYNC}
def is_user_initiated(self) -> bool:
"""Check if snapshot was triggered by user action."""
return self in {MANUAL_SYNC, INITIAL_CONNECTION}
When Snapshots Are Created¶
| Operation | Source | Trigger |
|---|---|---|
| Account sync | ACCOUNT_SYNC |
During SyncAccountsHandler |
| Holdings sync | HOLDINGS_SYNC |
During SyncHoldingsHandler |
| User clicks "Refresh" | MANUAL_SYNC |
Manual refresh button |
| Background job | SCHEDULED_SYNC |
Cron/scheduler |
| New connection | INITIAL_CONNECTION |
After OAuth callback |
Query Methods¶
class BalanceSnapshot:
def has_value_breakdown(self) -> bool:
"""Check if both holdings_value and cash_value are present."""
return self.holdings_value is not None and self.cash_value is not None
def get_holdings_percentage(self) -> float | None:
"""Calculate percentage of portfolio in holdings.
Returns:
Percentage (0-100) or None if breakdown unavailable.
"""
def get_cash_percentage(self) -> float | None:
"""Calculate percentage of portfolio in cash."""
def is_automated_capture(self) -> bool:
"""Check if snapshot was captured automatically."""
return self.source.is_automated()
def is_user_initiated_capture(self) -> bool:
"""Check if snapshot was triggered by user."""
return self.source.is_user_initiated()
def calculate_change_from(
self, previous: "BalanceSnapshot"
) -> tuple[Money, float] | None:
"""Calculate change from previous snapshot.
Returns:
Tuple of (change_amount, change_percent), or None if currencies differ.
"""
Repository Protocol¶
class BalanceSnapshotRepository(Protocol):
"""Port for balance snapshot persistence."""
async def find_by_id(self, snapshot_id: UUID) -> BalanceSnapshot | None:
"""Find snapshot by ID."""
async def find_by_account_id(
self,
account_id: UUID,
limit: int | None = None,
) -> list[BalanceSnapshot]:
"""Find snapshots for account (newest first)."""
async def find_by_account_and_date_range(
self,
account_id: UUID,
start_date: datetime,
end_date: datetime,
) -> list[BalanceSnapshot]:
"""Find snapshots within date range (chronological)."""
async def find_latest_by_account(
self, account_id: UUID
) -> BalanceSnapshot | None:
"""Find most recent snapshot for account."""
async def find_latest_by_user(
self, user_id: UUID
) -> list[BalanceSnapshot]:
"""Find latest snapshot for each of user's accounts."""
async def save(self, snapshot: BalanceSnapshot) -> None:
"""Persist snapshot (insert only - no updates)."""
Use Cases¶
1. Portfolio Value Chart¶
Query balance history over a date range to build a line chart:
query = GetBalanceHistory(
account_id=account_id,
user_id=user_id,
start_date=datetime(2024, 1, 1),
end_date=datetime(2024, 12, 31),
)
result = await handler.handle(query)
# Result contains:
# - snapshots: List ordered chronologically
# - start_balance: First snapshot balance
# - end_balance: Last snapshot balance
# - total_change_amount: end - start
# - total_change_percent: percentage change
2. Portfolio Summary Dashboard¶
Get latest balance for all user's accounts:
query = GetLatestBalanceSnapshots(user_id=user_id)
result = await handler.handle(query)
# Result contains:
# - snapshots: One per account (most recent)
# - total_count: Number of accounts
# - total_balance_by_currency: Aggregated balances
3. Performance Calculation¶
Calculate daily/weekly/monthly returns:
# Get first and last snapshot in period
first = snapshots[0]
last = snapshots[-1]
change, percent = last.calculate_change_from(first)
print(f"Period return: {change.amount} ({percent:.2f}%)")
4. Holdings vs Cash Allocation¶
Track asset allocation over time:
for snapshot in snapshots:
if snapshot.has_value_breakdown():
holdings_pct = snapshot.get_holdings_percentage()
cash_pct = snapshot.get_cash_percentage()
print(f"{snapshot.captured_at}: {holdings_pct}% holdings, {cash_pct}% cash")
Capture Flow¶
sequenceDiagram
participant Handler as Sync Handler
participant Account as Account Entity
participant Snapshot as BalanceSnapshot
participant Repo as SnapshotRepository
Handler->>Account: Fetch updated account data
Handler->>Handler: Calculate holdings_value (if syncing holdings)
Handler->>Snapshot: Create snapshot
Note over Snapshot: frozen=True (immutable)
Handler->>Repo: save(snapshot)
Repo-->>Handler: Success
Snapshot Creation in Handlers¶
# In SyncAccountsHandler or SyncHoldingsHandler
from uuid_extensions import uuid7
from src.domain.entities import BalanceSnapshot
from src.domain.enums import SnapshotSource
# After syncing data, capture snapshot
snapshot = BalanceSnapshot(
id=uuid7(),
account_id=account.id,
balance=account.balance,
available_balance=account.available_balance,
holdings_value=total_holdings_market_value, # From sum of holdings
cash_value=account.cash_balance,
currency=account.currency,
source=SnapshotSource.HOLDINGS_SYNC,
provider_metadata={"sync_id": sync_id, "provider": connection.provider_slug},
)
await snapshot_repo.save(snapshot)
Validation Rules¶
BalanceSnapshot Entity¶
id: Required, valid UUIDaccount_id: Required, valid UUID (references Account)balance: Required, Money value objectcurrency: Required, 3-letter ISO codesource: Required, valid SnapshotSource
Currency Consistency¶
All monetary fields must have matching currency:
balance.currency == currencyavailable_balance.currency == currency(if present)holdings_value.currency == currency(if present)cash_value.currency == currency(if present)
File Structure¶
src/domain/
├── entities/
│ └── balance_snapshot.py # BalanceSnapshot entity
├── enums/
│ └── snapshot_source.py # SnapshotSource enum
├── errors/
│ └── balance_snapshot_error.py # Error constants
└── protocols/
└── balance_snapshot_repository.py # Repository protocol
src/infrastructure/
└── persistence/
├── models/
│ └── balance_snapshot.py # SQLAlchemy model
└── repositories/
└── balance_snapshot_repository.py # PostgreSQL adapter
src/application/
├── queries/
│ └── balance_snapshot_queries.py # Query definitions
└── handlers/
└── balance_snapshot_handlers.py # Query handlers
tests/
├── unit/
│ └── test_domain_balance_snapshot.py
└── integration/
└── test_balance_snapshot_repository.py
Testing Strategy¶
Unit Tests (~40+ tests)¶
BalanceSnapshot Entity:
- Entity creation with valid/invalid fields
- Frozen immutability (cannot modify after creation)
- Currency consistency validation
- Query methods:
has_value_breakdown(),get_holdings_percentage(),get_cash_percentage() calculate_change_from()with various scenarios
SnapshotSource Enum:
- All enum values
is_automated()classificationis_user_initiated()classification
Integration Tests (~20+ tests)¶
Repository:
- Save and retrieve
- Query by account_id
- Query by date range
- Find latest by account
- Find latest by user (aggregation)
Handler Tests:
- GetBalanceHistory with date filtering
- GetLatestBalanceSnapshots aggregation
- ListBalanceSnapshotsByAccount pagination
Coverage Target¶
- Domain layer: 95%+
- Repository: 70%+
- Handlers: 85%+
Performance Considerations¶
Indexing¶
-- Essential indexes for balance_snapshots table
CREATE INDEX idx_balance_snapshots_account_id
ON balance_snapshots(account_id);
CREATE INDEX idx_balance_snapshots_captured_at
ON balance_snapshots(captured_at DESC);
CREATE INDEX idx_balance_snapshots_account_captured
ON balance_snapshots(account_id, captured_at DESC);
Query Optimization¶
- Date range queries: Use
captured_atindex for efficient time-series access - Latest per account: Single query with window function or subquery
- Aggregations: Sum at application layer to avoid complex SQL
Data Volume¶
Snapshot frequency determines storage growth:
| Frequency | Snapshots/Account/Year | Storage Estimate |
|---|---|---|
| Daily | ~365 | ~50KB |
| On sync only | ~50-100 | ~10KB |
| Real-time | ~2000+ | ~250KB |
Recommendation: Capture on sync only (not real-time) to balance accuracy with storage.
Security Considerations¶
- User Isolation: All queries filter by user_id (enforced at handler level)
- Balance Privacy: Financial data never logged with actual amounts
- Audit Trail: Snapshot creation logged with structured logging
- Provider Metadata: Contains debugging info, not exposed to clients
Created: 2025-12-26 | Last Updated: 2026-01-10