Skip to content

infrastructure.persistence.repositories.transaction_repository

src.infrastructure.persistence.repositories.transaction_repository

TransactionRepository - SQLAlchemy implementation of TransactionRepository protocol.

Adapter for hexagonal architecture. Maps between domain Transaction entities and database TransactionModel.

Reference
  • docs/architecture/repository-pattern.md
  • src/domain/entities/transaction.py

Classes

TransactionRepository

SQLAlchemy implementation of TransactionRepository protocol.

This is an adapter that implements the TransactionRepository port. It handles the mapping between domain Transaction entities and database TransactionModel.

This class does NOT inherit from the protocol (Protocol uses structural typing).

Attributes:

Name Type Description
session

SQLAlchemy async session for database operations.

Example

    async with get_session() as session:
        repo = TransactionRepository(session)
        transaction = await repo.find_by_id(transaction_id)

Source code in src/infrastructure/persistence/repositories/transaction_repository.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
class TransactionRepository:
    """SQLAlchemy implementation of TransactionRepository protocol.

    This is an adapter that implements the TransactionRepository port.
    It handles the mapping between domain Transaction entities and
    database TransactionModel.

    This class does NOT inherit from the protocol (Protocol uses structural typing).

    Note:
        ``save``, ``save_many`` and ``delete`` commit the session themselves.

    Attributes:
        session: SQLAlchemy async session for database operations.

    Example:
        >>> async with get_session() as session:
        ...     repo = TransactionRepository(session)
        ...     transaction = await repo.find_by_id(transaction_id)
    """

    def __init__(self, session: AsyncSession) -> None:
        """Initialize repository with database session.

        Args:
            session: SQLAlchemy async session.
        """
        self.session = session

    async def find_by_id(self, transaction_id: UUID) -> Transaction | None:
        """Find transaction by ID.

        Args:
            transaction_id: Transaction's unique identifier.

        Returns:
            Domain Transaction entity if found, None otherwise.
        """
        stmt = select(TransactionModel).where(TransactionModel.id == transaction_id)
        result = await self.session.execute(stmt)
        model = result.scalar_one_or_none()

        if model is None:
            return None

        return self._to_domain(model)

    async def find_by_account_id(
        self,
        account_id: UUID,
        limit: int = 50,
        offset: int = 0,
    ) -> list[Transaction]:
        """Find all transactions for an account with pagination.

        Returns transactions ordered by transaction_date DESC (most recent
        first). Rows sharing a transaction_date are further ordered by id so
        that consecutive pages neither skip nor repeat rows.

        Args:
            account_id: Account identifier to query.
            limit: Maximum number of transactions to return.
            offset: Number of transactions to skip.

        Returns:
            List of transactions (empty if none found).
        """
        stmt = (
            select(TransactionModel)
            .where(TransactionModel.account_id == account_id)
            .order_by(
                TransactionModel.transaction_date.desc(),
                # Tie-breaker: without a unique trailing sort key the order of
                # rows with equal transaction_date is unspecified, which makes
                # LIMIT/OFFSET pages unstable.
                TransactionModel.id.desc(),
            )
            .limit(limit)
            .offset(offset)
        )
        result = await self.session.execute(stmt)
        models = result.scalars().all()

        return [self._to_domain(model) for model in models]

    async def find_by_account_and_type(
        self,
        account_id: UUID,
        transaction_type: TransactionType,
        limit: int = 50,
    ) -> list[Transaction]:
        """Find transactions by account and type.

        Args:
            account_id: Account identifier to query.
            transaction_type: Type of transactions to retrieve.
            limit: Maximum number of transactions to return.

        Returns:
            List of transactions matching the type (empty if none found).
            Ordered by transaction_date DESC, then id DESC as a tie-breaker.
        """
        stmt = (
            select(TransactionModel)
            .where(
                TransactionModel.account_id == account_id,
                TransactionModel.transaction_type == transaction_type.value,
            )
            .order_by(
                TransactionModel.transaction_date.desc(),
                # Tie-breaker so the rows cut off by LIMIT are deterministic.
                TransactionModel.id.desc(),
            )
            .limit(limit)
        )
        result = await self.session.execute(stmt)
        models = result.scalars().all()

        return [self._to_domain(model) for model in models]

    async def find_by_date_range(
        self,
        account_id: UUID,
        start_date: date,
        end_date: date,
    ) -> list[Transaction]:
        """Find transactions within a date range.

        Queries by transaction_date (not created_at).

        Args:
            account_id: Account identifier to query.
            start_date: Start of date range (inclusive).
            end_date: End of date range (inclusive).

        Returns:
            List of transactions within date range (empty if none found).
            Ordered by transaction_date ASC (chronological).
        """
        stmt = (
            select(TransactionModel)
            .where(
                TransactionModel.account_id == account_id,
                TransactionModel.transaction_date >= start_date,
                TransactionModel.transaction_date <= end_date,
            )
            .order_by(TransactionModel.transaction_date.asc())
        )
        result = await self.session.execute(stmt)
        models = result.scalars().all()

        return [self._to_domain(model) for model in models]

    async def find_by_provider_transaction_id(
        self,
        account_id: UUID,
        provider_transaction_id: str,
    ) -> Transaction | None:
        """Find transaction by provider's unique ID.

        Used for deduplication during sync operations.

        Args:
            account_id: Account identifier (scope to account).
            provider_transaction_id: Provider's unique transaction identifier.

        Returns:
            Transaction entity if found, None otherwise.
        """
        stmt = select(TransactionModel).where(
            TransactionModel.account_id == account_id,
            TransactionModel.provider_transaction_id == provider_transaction_id,
        )
        result = await self.session.execute(stmt)
        model = result.scalar_one_or_none()

        if model is None:
            return None

        return self._to_domain(model)

    async def find_security_transactions(
        self,
        account_id: UUID,
        symbol: str,
        limit: int = 50,
    ) -> list[Transaction]:
        """Find all transactions for a specific security.

        Filters by the symbol column only; no explicit transaction-type
        filter is applied.
        NOTE(review): this returns only TRADE rows if only trades carry a
        symbol - confirm against the data model.

        Args:
            account_id: Account identifier to query.
            symbol: Security ticker symbol.
            limit: Maximum number of transactions to return.

        Returns:
            List of transactions for the symbol (empty if none found).
            Ordered by transaction_date DESC, then id DESC as a tie-breaker.
        """
        stmt = (
            select(TransactionModel)
            .where(
                TransactionModel.account_id == account_id,
                TransactionModel.symbol == symbol,
            )
            .order_by(
                TransactionModel.transaction_date.desc(),
                # Tie-breaker so the rows cut off by LIMIT are deterministic.
                TransactionModel.id.desc(),
            )
            .limit(limit)
        )
        result = await self.session.execute(stmt)
        models = result.scalars().all()

        return [self._to_domain(model) for model in models]

    async def save(self, transaction: Transaction) -> None:
        """Save a single transaction.

        Creates a new row or updates the existing one. Existence is decided
        by the transaction's ``id`` (not by provider_transaction_id). The
        session is committed before returning.

        Args:
            transaction: Transaction entity to save.
        """
        # Check if a row with this primary key already exists
        stmt = select(TransactionModel).where(TransactionModel.id == transaction.id)
        result = await self.session.execute(stmt)
        existing = result.scalar_one_or_none()

        if existing is None:
            # Create new
            model = self._to_model(transaction)
            self.session.add(model)
        else:
            # Update existing
            self._update_model(existing, transaction)

        await self.session.commit()

    async def save_many(self, transactions: list[Transaction]) -> None:
        """Save multiple transactions with a single lookup and a single commit.

        Loads every already-persisted row for the given ids with one SELECT,
        then updates those rows in place and adds the remaining transactions
        to the session. If the same id appears more than once in
        ``transactions``, later entries overwrite earlier ones instead of
        producing two pending rows with the same primary key.

        Args:
            transactions: List of transaction entities to save. An empty list
                is a no-op (nothing is queried or committed).
        """
        if not transactions:
            return

        # De-duplicate ids (order-preserving) before building the IN clause
        transaction_ids = list(dict.fromkeys(t.id for t in transactions))
        stmt = select(TransactionModel).where(TransactionModel.id.in_(transaction_ids))
        result = await self.session.execute(stmt)
        models_by_id = {model.id: model for model in result.scalars().all()}

        for transaction in transactions:
            known = models_by_id.get(transaction.id)
            if known is not None:
                # Update a persisted row, or a row added earlier in this batch
                self._update_model(known, transaction)
            else:
                # Create new and remember it so a repeated id updates it
                model = self._to_model(transaction)
                self.session.add(model)
                models_by_id[transaction.id] = model

        await self.session.commit()

    async def delete(self, transaction_id: UUID) -> None:
        """Delete a transaction.

        Hard delete - permanently removes the record and commits.

        Args:
            transaction_id: Transaction's unique identifier.

        Raises:
            NoResultFound: If transaction doesn't exist.
        """
        stmt = select(TransactionModel).where(TransactionModel.id == transaction_id)
        result = await self.session.execute(stmt)
        model = result.scalar_one()  # Raises NoResultFound if not found

        await self.session.delete(model)
        await self.session.commit()

    # =========================================================================
    # Entity ↔ Model Mapping (Private Methods)
    # =========================================================================

    def _to_domain(self, model: TransactionModel) -> Transaction:
        """Convert database model to domain entity.

        Reconstructs Money value objects from separate amount/currency columns;
        unit_price and commission reuse the row's single ``currency`` column.
        Converts stored enum values back to domain enums.

        Args:
            model: SQLAlchemy TransactionModel instance.

        Returns:
            Domain Transaction entity.
        """
        # Reconstruct Money for amount (required)
        amount = Money(amount=model.amount, currency=model.currency)

        # Reconstruct Money for unit_price if present
        unit_price: Money | None = None
        if model.unit_price_amount is not None:
            unit_price = Money(
                amount=model.unit_price_amount,
                currency=model.currency,
            )

        # Reconstruct Money for commission if present
        commission: Money | None = None
        if model.commission_amount is not None:
            commission = Money(
                amount=model.commission_amount,
                currency=model.currency,
            )

        # Convert asset_type string to enum if present
        asset_type: AssetType | None = None
        if model.asset_type is not None:
            asset_type = AssetType(model.asset_type)

        return Transaction(
            id=model.id,
            account_id=model.account_id,
            provider_transaction_id=model.provider_transaction_id,
            transaction_type=TransactionType(model.transaction_type),
            subtype=TransactionSubtype(model.subtype),
            status=TransactionStatus(model.status),
            amount=amount,
            description=model.description,
            asset_type=asset_type,
            symbol=model.symbol,
            security_name=model.security_name,
            quantity=model.quantity,
            unit_price=unit_price,
            commission=commission,
            transaction_date=model.transaction_date,
            settlement_date=model.settlement_date,
            provider_metadata=model.provider_metadata,
            created_at=model.created_at,
            updated_at=model.updated_at,
        )

    def _to_model(self, entity: Transaction) -> TransactionModel:
        """Convert domain entity to database model.

        Extracts amounts from Money value objects and stores enums as their
        ``.value``. Only the currency of ``entity.amount`` is persisted.

        Args:
            entity: Domain Transaction entity.

        Returns:
            SQLAlchemy TransactionModel instance.
        """
        return TransactionModel(
            id=entity.id,
            account_id=entity.account_id,
            provider_transaction_id=entity.provider_transaction_id,
            transaction_type=entity.transaction_type.value,
            subtype=entity.subtype.value,
            status=entity.status.value,
            amount=entity.amount.amount,
            currency=entity.amount.currency,
            description=entity.description,
            # Explicit None check, matching unit_price/commission below
            asset_type=(
                entity.asset_type.value if entity.asset_type is not None else None
            ),
            symbol=entity.symbol,
            security_name=entity.security_name,
            quantity=entity.quantity,
            unit_price_amount=(
                entity.unit_price.amount if entity.unit_price is not None else None
            ),
            commission_amount=(
                entity.commission.amount if entity.commission is not None else None
            ),
            transaction_date=entity.transaction_date,
            settlement_date=entity.settlement_date,
            provider_metadata=entity.provider_metadata,
            created_at=entity.created_at,
            updated_at=entity.updated_at,
        )

    def _update_model(self, model: TransactionModel, entity: Transaction) -> None:
        """Update existing model with entity data.

        Used for upsert operations (update existing records). ``id`` and
        ``created_at`` on the model are left untouched.

        Args:
            model: Existing SQLAlchemy TransactionModel instance.
            entity: Domain Transaction entity with new data.
        """
        model.account_id = entity.account_id
        model.provider_transaction_id = entity.provider_transaction_id
        model.transaction_type = entity.transaction_type.value
        model.subtype = entity.subtype.value
        model.status = entity.status.value
        model.amount = entity.amount.amount
        model.currency = entity.amount.currency
        model.description = entity.description
        # Explicit None check, matching unit_price/commission below
        model.asset_type = (
            entity.asset_type.value if entity.asset_type is not None else None
        )
        model.symbol = entity.symbol
        model.security_name = entity.security_name
        model.quantity = entity.quantity
        model.unit_price_amount = (
            entity.unit_price.amount if entity.unit_price is not None else None
        )
        model.commission_amount = (
            entity.commission.amount if entity.commission is not None else None
        )
        model.transaction_date = entity.transaction_date
        model.settlement_date = entity.settlement_date
        model.provider_metadata = entity.provider_metadata
        model.updated_at = entity.updated_at
Functions
__init__
__init__(session: AsyncSession) -> None

Parameters:

Name Type Description Default
session AsyncSession

SQLAlchemy async session.

required
Source code in src/infrastructure/persistence/repositories/transaction_repository.py
def __init__(self, session: AsyncSession) -> None:
    """Bind the repository to a database session.

    Args:
        session: SQLAlchemy async session; stored on the instance as
            ``self.session``.
    """
    self.session = session
find_by_id async
find_by_id(transaction_id: UUID) -> Transaction | None

Find transaction by ID.

Parameters:

Name Type Description Default
transaction_id UUID

Transaction's unique identifier.

required

Returns:

Type Description
Transaction | None

Domain Transaction entity if found, None otherwise.

Source code in src/infrastructure/persistence/repositories/transaction_repository.py
async def find_by_id(self, transaction_id: UUID) -> Transaction | None:
    """Look up a single transaction by its primary key.

    Args:
        transaction_id: Transaction's unique identifier.

    Returns:
        The mapped domain Transaction, or None when no row matches.
    """
    query = select(TransactionModel).where(TransactionModel.id == transaction_id)
    row = (await self.session.execute(query)).scalar_one_or_none()

    # Map to the domain only when a row was actually found
    return self._to_domain(row) if row is not None else None
find_by_account_id async
find_by_account_id(
    account_id: UUID, limit: int = 50, offset: int = 0
) -> list[Transaction]

Find all transactions for an account with pagination.

Returns transactions ordered by transaction_date DESC (most recent first).

Parameters:

Name Type Description Default
account_id UUID

Account identifier to query.

required
limit int

Maximum number of transactions to return.

50
offset int

Number of transactions to skip.

0

Returns:

Type Description
list[Transaction]

List of transactions (empty if none found).

Source code in src/infrastructure/persistence/repositories/transaction_repository.py
async def find_by_account_id(
    self,
    account_id: UUID,
    limit: int = 50,
    offset: int = 0,
) -> list[Transaction]:
    """Find all transactions for an account with pagination.

    Returns transactions ordered by transaction_date DESC (most recent
    first). Rows sharing a transaction_date are further ordered by id so
    that consecutive pages neither skip nor repeat rows.

    Args:
        account_id: Account identifier to query.
        limit: Maximum number of transactions to return.
        offset: Number of transactions to skip.

    Returns:
        List of transactions (empty if none found).
    """
    stmt = (
        select(TransactionModel)
        .where(TransactionModel.account_id == account_id)
        .order_by(
            TransactionModel.transaction_date.desc(),
            # Tie-breaker: without a unique trailing sort key the order of
            # rows with equal transaction_date is unspecified, which makes
            # LIMIT/OFFSET pages unstable.
            TransactionModel.id.desc(),
        )
        .limit(limit)
        .offset(offset)
    )
    result = await self.session.execute(stmt)
    models = result.scalars().all()

    return [self._to_domain(model) for model in models]
find_by_account_and_type async
find_by_account_and_type(
    account_id: UUID,
    transaction_type: TransactionType,
    limit: int = 50,
) -> list[Transaction]

Find transactions by account and type.

Parameters:

Name Type Description Default
account_id UUID

Account identifier to query.

required
transaction_type TransactionType

Type of transactions to retrieve.

required
limit int

Maximum number of transactions to return.

50

Returns:

Type Description
list[Transaction]

List of transactions matching the type (empty if none found).

list[Transaction]

Ordered by transaction_date DESC.

Source code in src/infrastructure/persistence/repositories/transaction_repository.py
async def find_by_account_and_type(
    self,
    account_id: UUID,
    transaction_type: TransactionType,
    limit: int = 50,
) -> list[Transaction]:
    """Find transactions by account and type.

    Args:
        account_id: Account identifier to query.
        transaction_type: Type of transactions to retrieve; compared against
            the stored column via ``transaction_type.value``.
        limit: Maximum number of transactions to return.

    Returns:
        List of transactions matching the type (empty if none found).
        Ordered by transaction_date DESC, then id DESC as a tie-breaker.
    """
    stmt = (
        select(TransactionModel)
        .where(
            TransactionModel.account_id == account_id,
            TransactionModel.transaction_type == transaction_type.value,
        )
        .order_by(
            TransactionModel.transaction_date.desc(),
            # Tie-breaker so the rows cut off by LIMIT are deterministic.
            TransactionModel.id.desc(),
        )
        .limit(limit)
    )
    result = await self.session.execute(stmt)
    models = result.scalars().all()

    return [self._to_domain(model) for model in models]
find_by_date_range async
find_by_date_range(
    account_id: UUID, start_date: date, end_date: date
) -> list[Transaction]

Find transactions within a date range.

Queries by transaction_date (not created_at).

Parameters:

Name Type Description Default
account_id UUID

Account identifier to query.

required
start_date date

Start of date range (inclusive).

required
end_date date

End of date range (inclusive).

required

Returns:

Type Description
list[Transaction]

List of transactions within date range (empty if none found).

list[Transaction]

Ordered by transaction_date ASC (chronological).

Source code in src/infrastructure/persistence/repositories/transaction_repository.py
async def find_by_date_range(
    self,
    account_id: UUID,
    start_date: date,
    end_date: date,
) -> list[Transaction]:
    """Return an account's transactions whose transaction_date is in a range.

    The filter is applied to transaction_date (not created_at) and both
    bounds are inclusive.

    Args:
        account_id: Account identifier to query.
        start_date: First date included in the range.
        end_date: Last date included in the range.

    Returns:
        Matching transactions in chronological order (transaction_date ASC);
        an empty list if nothing matches.
    """
    # Collect the filter criteria first, then assemble the statement
    criteria = (
        TransactionModel.account_id == account_id,
        TransactionModel.transaction_date >= start_date,
        TransactionModel.transaction_date <= end_date,
    )
    query = (
        select(TransactionModel)
        .where(*criteria)
        .order_by(TransactionModel.transaction_date.asc())
    )
    rows = (await self.session.execute(query)).scalars().all()

    return [self._to_domain(row) for row in rows]
find_by_provider_transaction_id async
find_by_provider_transaction_id(
    account_id: UUID, provider_transaction_id: str
) -> Transaction | None

Find transaction by provider's unique ID.

Used for deduplication during sync operations.

Parameters:

Name Type Description Default
account_id UUID

Account identifier (scope to account).

required
provider_transaction_id str

Provider's unique transaction identifier.

required

Returns:

Type Description
Transaction | None

Transaction entity if found, None otherwise.

Source code in src/infrastructure/persistence/repositories/transaction_repository.py
async def find_by_provider_transaction_id(
    self,
    account_id: UUID,
    provider_transaction_id: str,
) -> Transaction | None:
    """Look up a transaction by the provider-assigned identifier.

    Used for deduplication during sync operations. The lookup is scoped to
    one account.

    Args:
        account_id: Account identifier the search is restricted to.
        provider_transaction_id: Provider's unique transaction identifier.

    Returns:
        The mapped domain Transaction, or None when no row matches.
    """
    same_account = TransactionModel.account_id == account_id
    same_provider_id = (
        TransactionModel.provider_transaction_id == provider_transaction_id
    )
    query = select(TransactionModel).where(same_account, same_provider_id)
    row = (await self.session.execute(query)).scalar_one_or_none()

    return self._to_domain(row) if row is not None else None
find_security_transactions async
find_security_transactions(
    account_id: UUID, symbol: str, limit: int = 50
) -> list[Transaction]

Find all transactions for a specific security.

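Filters by the symbol field only; no explicit transaction-type filter is applied, so the result is limited to TRADE transactions only insofar as only trades carry a symbol.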

Parameters:

Name Type Description Default
account_id UUID

Account identifier to query.

required
symbol str

Security ticker symbol.

required
limit int

Maximum number of transactions to return.

50

Returns:

Type Description
list[Transaction]

List of trade transactions for the symbol (empty if none found).

list[Transaction]

Ordered by transaction_date DESC.

Source code in src/infrastructure/persistence/repositories/transaction_repository.py
async def find_security_transactions(
    self,
    account_id: UUID,
    symbol: str,
    limit: int = 50,
) -> list[Transaction]:
    """Find all transactions for a specific security.

    Filters by the symbol column only; no explicit transaction-type filter
    is applied.
    NOTE(review): this returns only TRADE rows if only trades carry a
    symbol - confirm against the data model.

    Args:
        account_id: Account identifier to query.
        symbol: Security ticker symbol.
        limit: Maximum number of transactions to return.

    Returns:
        List of transactions for the symbol (empty if none found).
        Ordered by transaction_date DESC, then id DESC as a tie-breaker.
    """
    stmt = (
        select(TransactionModel)
        .where(
            TransactionModel.account_id == account_id,
            TransactionModel.symbol == symbol,
        )
        .order_by(
            TransactionModel.transaction_date.desc(),
            # Tie-breaker so the rows cut off by LIMIT are deterministic.
            TransactionModel.id.desc(),
        )
        .limit(limit)
    )
    result = await self.session.execute(stmt)
    models = result.scalars().all()

    return [self._to_domain(model) for model in models]
save async
save(transaction: Transaction) -> None

Save a single transaction.

Creates a new transaction or updates the existing one. Existence is determined by the transaction's id (primary key), not by provider_transaction_id.

Parameters:

Name Type Description Default
transaction Transaction

Transaction entity to save.

required
Source code in src/infrastructure/persistence/repositories/transaction_repository.py
async def save(self, transaction: Transaction) -> None:
    """Insert or update one transaction, then commit.

    A row whose ``id`` equals ``transaction.id`` is updated in place;
    otherwise a new row is added to the session.

    Args:
        transaction: Transaction entity to save.
    """
    lookup = select(TransactionModel).where(TransactionModel.id == transaction.id)
    persisted = (await self.session.execute(lookup)).scalar_one_or_none()

    if persisted is not None:
        # Row already exists: copy the entity's fields onto it
        self._update_model(persisted, transaction)
    else:
        # No row yet: stage a fresh model for insertion
        self.session.add(self._to_model(transaction))

    await self.session.commit()
save_many async
save_many(transactions: list[Transaction]) -> None

Save multiple transactions in bulk.

Efficient for provider sync operations that fetch many transactions at once. Loads all already-persisted rows with a single SELECT, then updates or adds each transaction through the session and commits once.

Parameters:

Name Type Description Default
transactions list[Transaction]

List of transaction entities to save.

required
Source code in src/infrastructure/persistence/repositories/transaction_repository.py
async def save_many(self, transactions: list[Transaction]) -> None:
    """Save multiple transactions with a single lookup and a single commit.

    Loads every already-persisted row for the given ids with one SELECT,
    then updates those rows in place and adds the remaining transactions to
    the session. If the same id appears more than once in ``transactions``,
    later entries overwrite earlier ones instead of producing two pending
    rows with the same primary key.

    Args:
        transactions: List of transaction entities to save. An empty list is
            a no-op (nothing is queried or committed).
    """
    if not transactions:
        return

    # De-duplicate ids (order-preserving) before building the IN clause
    transaction_ids = list(dict.fromkeys(t.id for t in transactions))
    stmt = select(TransactionModel).where(TransactionModel.id.in_(transaction_ids))
    result = await self.session.execute(stmt)
    models_by_id = {model.id: model for model in result.scalars().all()}

    for transaction in transactions:
        known = models_by_id.get(transaction.id)
        if known is not None:
            # Update a persisted row, or a row added earlier in this batch
            self._update_model(known, transaction)
        else:
            # Create new and remember it so a repeated id updates it
            model = self._to_model(transaction)
            self.session.add(model)
            models_by_id[transaction.id] = model

    await self.session.commit()
delete async
delete(transaction_id: UUID) -> None

Delete a transaction.

Hard delete - permanently removes the record.

Parameters:

Name Type Description Default
transaction_id UUID

Transaction's unique identifier.

required

Raises:

Type Description
NoResultFound

If transaction doesn't exist.

Source code in src/infrastructure/persistence/repositories/transaction_repository.py
async def delete(self, transaction_id: UUID) -> None:
    """Permanently remove a transaction (hard delete) and commit.

    Args:
        transaction_id: Transaction's unique identifier.

    Raises:
        NoResultFound: If transaction doesn't exist.
    """
    lookup = select(TransactionModel).where(TransactionModel.id == transaction_id)
    # scalar_one() raises NoResultFound when the id is unknown
    target = (await self.session.execute(lookup)).scalar_one()

    await self.session.delete(target)
    await self.session.commit()