Skip to content

infrastructure.persistence.repositories.email_verification_token_repository

src.infrastructure.persistence.repositories.email_verification_token_repository

EmailVerificationTokenRepository - SQLAlchemy implementation for email verification token persistence.

Handles CRUD operations for email verification tokens with expiration checks.

Classes

EmailVerificationTokenRepository

SQLAlchemy implementation for email verification token persistence.

Manages email verification tokens with support for:

- Token creation and storage
- Token validation (lookup by token string)
- One-time use enforcement (mark as used)

Attributes:

Name Type Description
session

SQLAlchemy async session for database operations.

Example

>>> async with get_session() as session:
...     repo = EmailVerificationTokenRepository(session)
...     token = await repo.find_by_token("abc123...")

Source code in src/infrastructure/persistence/repositories/email_verification_token_repository.py
class EmailVerificationTokenRepository:
    """SQLAlchemy implementation for email verification token persistence.

    Manages email verification tokens with support for:
    - Token creation and storage
    - Token validation (lookup by token string)
    - One-time use enforcement (mark as used)

    Attributes:
        session: SQLAlchemy async session for database operations.

    Example:
        >>> async with get_session() as session:
        ...     repo = EmailVerificationTokenRepository(session)
        ...     token = await repo.find_by_token("abc123...")
    """

    def __init__(self, session: AsyncSession) -> None:
        """Initialize repository with database session.

        Args:
            session: SQLAlchemy async session.
        """
        self.session = session

    async def save(
        self,
        user_id: UUID,
        token: str,
        expires_at: datetime,
    ) -> EmailVerificationTokenData:
        """Create new email verification token in database.

        Args:
            user_id: User's unique identifier.
            token: Random hex token (64 characters).
            expires_at: Token expiration timestamp (24 hours).

        Returns:
            Created EmailVerificationTokenData.
        """
        token_model = EmailVerificationToken(
            user_id=user_id,
            token=token,
            expires_at=expires_at,
        )
        self.session.add(token_model)
        await self.session.commit()
        await self.session.refresh(token_model)
        return _to_data(token_model)

    async def find_by_token(self, token: str) -> EmailVerificationTokenData | None:
        """Find email verification token by token string.

        Args:
            token: The verification token string.

        Returns:
            EmailVerificationTokenData if found and not used, None otherwise.
        """
        stmt = (
            select(EmailVerificationToken)
            .where(EmailVerificationToken.token == token)
            .where(EmailVerificationToken.used_at.is_(None))
        )
        result = await self.session.execute(stmt)
        model = result.scalar_one_or_none()
        return _to_data(model) if model else None

    async def mark_as_used(self, token_id: UUID) -> None:
        """Mark email verification token as used.

        Args:
            token_id: Token's unique identifier.
        """
        stmt = select(EmailVerificationToken).where(
            EmailVerificationToken.id == token_id
        )
        result = await self.session.execute(stmt)
        token = result.scalar_one()

        token.used_at = datetime.now(UTC)
        await self.session.commit()

    async def delete_expired_tokens(self) -> int:
        """Delete expired email verification tokens.

        Cleanup task to remove old tokens (typically run daily).

        Returns:
            Number of tokens deleted.
        """
        stmt = select(EmailVerificationToken).where(
            EmailVerificationToken.expires_at < datetime.now(UTC)
        )
        result = await self.session.execute(stmt)
        tokens = result.scalars().all()

        count = len(tokens)
        for token in tokens:
            await self.session.delete(token)

        await self.session.commit()
        return count

    async def find_by_user_id(self, user_id: UUID) -> list[EmailVerificationTokenData]:
        """Find all email verification tokens for a user.

        Useful for debugging or admin views.

        Args:
            user_id: User's unique identifier.

        Returns:
            List of EmailVerificationTokenData.
        """
        stmt = (
            select(EmailVerificationToken)
            .where(EmailVerificationToken.user_id == user_id)
            .order_by(EmailVerificationToken.created_at.desc())
        )
        result = await self.session.execute(stmt)
        return [_to_data(model) for model in result.scalars().all()]
Functions
__init__
__init__(session: AsyncSession) -> None

Parameters:

Name Type Description Default
session AsyncSession

SQLAlchemy async session.

required
Source code in src/infrastructure/persistence/repositories/email_verification_token_repository.py
def __init__(self, session: AsyncSession) -> None:
    """Bind the repository to the database session it will operate on.

    Args:
        session: SQLAlchemy async session, stored as ``self.session``.
    """
    # Every other repository method reads the session from this attribute.
    self.session = session
save async
save(
    user_id: UUID, token: str, expires_at: datetime
) -> EmailVerificationTokenData

Create new email verification token in database.

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier.

required
token str

Random hex token (64 characters).

required
expires_at datetime

Token expiration timestamp (24 hours).

required

Returns:

Type Description
EmailVerificationTokenData

Created EmailVerificationTokenData.

Source code in src/infrastructure/persistence/repositories/email_verification_token_repository.py
async def save(
    self,
    user_id: UUID,
    token: str,
    expires_at: datetime,
) -> EmailVerificationTokenData:
    """Insert a new email verification token row and commit it.

    Args:
        user_id: Identifier of the user the token is issued to.
        token: Verification token string to store.
        expires_at: Timestamp after which the token is expired.

    Returns:
        The stored row converted to EmailVerificationTokenData.
    """
    db = self.session
    record = EmailVerificationToken(
        user_id=user_id,
        token=token,
        expires_at=expires_at,
    )
    db.add(record)
    await db.commit()
    # Reload after commit so database-populated columns are present.
    await db.refresh(record)
    return _to_data(record)
find_by_token async
find_by_token(
    token: str,
) -> EmailVerificationTokenData | None

Find email verification token by token string.

Parameters:

Name Type Description Default
token str

The verification token string.

required

Returns:

Type Description
EmailVerificationTokenData | None

EmailVerificationTokenData if found and not used, None otherwise.

Source code in src/infrastructure/persistence/repositories/email_verification_token_repository.py
async def find_by_token(self, token: str) -> EmailVerificationTokenData | None:
    """Look up an unused email verification token by its string value.

    Args:
        token: The verification token string.

    Returns:
        EmailVerificationTokenData when a row with this token exists and
        its ``used_at`` is NULL; otherwise None.
    """
    # Both criteria are ANDed: token must match and must not be used yet.
    query = select(EmailVerificationToken).where(
        EmailVerificationToken.token == token,
        EmailVerificationToken.used_at.is_(None),
    )
    rows = await self.session.execute(query)
    match = rows.scalar_one_or_none()
    if not match:
        return None
    return _to_data(match)
mark_as_used async
mark_as_used(token_id: UUID) -> None

Mark email verification token as used.

Parameters:

Name Type Description Default
token_id UUID

Token's unique identifier.

required
Source code in src/infrastructure/persistence/repositories/email_verification_token_repository.py
async def mark_as_used(self, token_id: UUID) -> None:
    """Stamp a token's ``used_at`` with the current UTC time and commit.

    Args:
        token_id: Primary key of the token row.

    Raises:
        Whatever ``Result.scalar_one()`` raises when the lookup does not
        yield exactly one row.
    """
    query = select(EmailVerificationToken).where(
        EmailVerificationToken.id == token_id
    )
    record = (await self.session.execute(query)).scalar_one()
    # Mutating the loaded row lets the commit persist the timestamp.
    record.used_at = datetime.now(UTC)
    await self.session.commit()
delete_expired_tokens async
delete_expired_tokens() -> int

Delete expired email verification tokens.

Cleanup task to remove old tokens (typically run daily).

Returns:

Type Description
int

Number of tokens deleted.

Source code in src/infrastructure/persistence/repositories/email_verification_token_repository.py
async def delete_expired_tokens(self) -> int:
    """Remove every token whose ``expires_at`` is earlier than now (UTC).

    Intended as a periodic cleanup job.

    Returns:
        How many token rows were deleted.
    """
    cutoff = datetime.now(UTC)
    query = select(EmailVerificationToken).where(
        EmailVerificationToken.expires_at < cutoff
    )
    expired = (await self.session.execute(query)).scalars().all()

    # Delete through the session one row at a time, then commit once.
    for record in expired:
        await self.session.delete(record)
    await self.session.commit()

    return len(expired)
find_by_user_id async
find_by_user_id(
    user_id: UUID,
) -> list[EmailVerificationTokenData]

Find all email verification tokens for a user.

Useful for debugging or admin views.

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier.

required

Returns:

Type Description
list[EmailVerificationTokenData]

List of EmailVerificationTokenData.

Source code in src/infrastructure/persistence/repositories/email_verification_token_repository.py
async def find_by_user_id(self, user_id: UUID) -> list[EmailVerificationTokenData]:
    """Return all of a user's email verification tokens, newest first.

    Handy for debugging or admin views.

    Args:
        user_id: Identifier of the user whose tokens are wanted.

    Returns:
        List of EmailVerificationTokenData ordered by ``created_at``
        descending; empty when the user has no tokens.
    """
    newest_first = EmailVerificationToken.created_at.desc()
    query = (
        select(EmailVerificationToken)
        .where(EmailVerificationToken.user_id == user_id)
        .order_by(newest_first)
    )
    rows = (await self.session.execute(query)).scalars().all()
    return list(map(_to_data, rows))