Skip to content

domain.protocols.refresh_token_repository

src.domain.protocols.refresh_token_repository

RefreshTokenRepository protocol (port) for domain layer.

This protocol defines the interface for refresh token persistence that the domain layer needs. Infrastructure provides concrete implementations.

Following hexagonal architecture: - Domain defines what it needs (protocol/port) - Infrastructure provides implementation (adapter) - Domain has no knowledge of how tokens are stored

Reference
  • docs/architecture/authentication-architecture.md (Lines 174-233)

Classes

RefreshTokenData dataclass

Data transfer object for refresh token information.

Used by protocol methods to return token data without exposing infrastructure model classes to domain/application layers.

Token Breach Rotation

token_version: Version at time of creation (for validation) global_version_at_issuance: Global min version when issued (for grace period)

Source code in src/domain/protocols/refresh_token_repository.py
@dataclass
class RefreshTokenData:
    """Data transfer object for refresh token information.

    Used by protocol methods to return token data without
    exposing infrastructure model classes to domain/application layers.

    Token Breach Rotation:
        token_version: Version at time of creation (for validation)
        global_version_at_issuance: Global min version when issued (for grace period)
    """

    id: UUID
    user_id: UUID
    token_hash: str
    session_id: UUID
    expires_at: datetime
    revoked_at: datetime | None
    last_used_at: datetime | None
    rotation_count: int
    # Token breach rotation fields
    token_version: int = 1
    global_version_at_issuance: int = 1

RefreshTokenRepository

Bases: Protocol

Protocol for refresh token persistence operations.

Defines the contract that all refresh token repository implementations must satisfy. Used for token rotation and session management.

Token Lifecycle
  1. Created during login (30-day expiration)
  2. Validated during token refresh
  3. Rotated on every refresh (old deleted, new created)
  4. Revoked when session ends or password changes
Implementations
  • RefreshTokenRepository (SQLAlchemy): src/infrastructure/persistence/repositories/
Source code in src/domain/protocols/refresh_token_repository.py
class RefreshTokenRepository(Protocol):
    """Protocol for refresh token persistence operations.

    Defines the contract that all refresh token repository implementations
    must satisfy. Used for token rotation and session management.

    Token Lifecycle:
        1. Created during login (30-day expiration)
        2. Validated during token refresh
        3. Rotated on every refresh (old deleted, new created)
        4. Revoked when session ends or password changes

    Implementations:
        - RefreshTokenRepository (SQLAlchemy): src/infrastructure/persistence/repositories/
    """

    async def save(
        self,
        user_id: UUID,
        token_hash: str,
        session_id: UUID,
        expires_at: datetime,
        *,
        token_version: int = 1,
        global_version_at_issuance: int = 1,
    ) -> RefreshTokenData:
        """Create new refresh token.

        Args:
            user_id: User's unique identifier.
            token_hash: Bcrypt hash of the refresh token (never store plaintext).
            session_id: Associated session ID (F1.3 integration).
            expires_at: Token expiration timestamp (typically 30 days from now).
            token_version: Token version at issuance (for breach rotation).
            global_version_at_issuance: Global min version when issued (for grace period).

        Returns:
            Created RefreshTokenData with token info.
        """
        ...

    async def find_by_token_hash(self, token_hash: str) -> RefreshTokenData | None:
        """Find refresh token by hash.

        Only returns tokens that have NOT been revoked (revoked_at IS NULL).
        Does NOT check expiration - caller must verify expires_at.

        Args:
            token_hash: Bcrypt hash of the token to find.

        Returns:
            RefreshTokenData if found and not revoked, None otherwise.
        """
        ...

    async def find_by_id(self, token_id: UUID) -> RefreshTokenData | None:
        """Find refresh token by ID.

        Args:
            token_id: Token's unique identifier.

        Returns:
            RefreshTokenData if found, None otherwise.
        """
        ...

    async def update_last_used(self, token_id: UUID) -> None:
        """Update last_used_at timestamp.

        Tracks token usage for analytics and security monitoring.

        Args:
            token_id: Token's unique identifier.

        Raises:
            NoResultFound: If token with given ID doesn't exist.
        """
        ...

    async def delete(self, token_id: UUID) -> None:
        """Delete refresh token (for rotation).

        Used during token rotation: delete old token, create new one.

        Args:
            token_id: Token's unique identifier.

        Raises:
            NoResultFound: If token with given ID doesn't exist.
        """
        ...

    async def revoke_by_session(self, session_id: UUID) -> None:
        """Revoke all refresh tokens for a session.

        Called when session is explicitly logged out.

        Args:
            session_id: Session ID to revoke tokens for.
        """
        ...

    async def revoke_all_for_user(
        self,
        user_id: UUID,
        reason: str = "user_requested",
    ) -> None:
        """Revoke all refresh tokens for a user.

        Used when password changes (security) or user logs out of all devices.

        Args:
            user_id: User's unique identifier.
            reason: Reason for revocation (for audit trail).
                Common values: "password_changed", "user_requested", "admin_action"
        """
        ...

    async def find_by_token_verification(
        self,
        token: str,
        verify_fn: Callable[[str, str], bool],
    ) -> RefreshTokenData | None:
        """Find refresh token by verifying against stored hashes.

        Since bcrypt hashes are non-deterministic, we cannot look up by hash.
        This method iterates through active tokens and verifies each one.

        Args:
            token: Plain refresh token from user request.
            verify_fn: Function to verify token against hash (token, hash) -> bool.

        Returns:
            RefreshTokenData if found and verified, None otherwise.

        Note:
            For MVP this iterates all active tokens. For production scale,
            consider adding a deterministic lookup identifier.
        """
        ...
Functions
save async
save(
    user_id: UUID,
    token_hash: str,
    session_id: UUID,
    expires_at: datetime,
    *,
    token_version: int = 1,
    global_version_at_issuance: int = 1
) -> RefreshTokenData

Create new refresh token.

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier.

required
token_hash str

Bcrypt hash of the refresh token (never store plaintext).

required
session_id UUID

Associated session ID (F1.3 integration).

required
expires_at datetime

Token expiration timestamp (typically 30 days from now).

required
token_version int

Token version at issuance (for breach rotation).

1
global_version_at_issuance int

Global min version when issued (for grace period).

1

Returns:

Type Description
RefreshTokenData

Created RefreshTokenData with token info.

Source code in src/domain/protocols/refresh_token_repository.py
async def save(
    self,
    user_id: UUID,
    token_hash: str,
    session_id: UUID,
    expires_at: datetime,
    *,
    token_version: int = 1,
    global_version_at_issuance: int = 1,
) -> RefreshTokenData:
    """Create new refresh token.

    Args:
        user_id: User's unique identifier.
        token_hash: Bcrypt hash of the refresh token (never store plaintext).
        session_id: Associated session ID (F1.3 integration).
        expires_at: Token expiration timestamp (typically 30 days from now).
        token_version: Token version at issuance (for breach rotation).
        global_version_at_issuance: Global min version when issued (for grace period).

    Returns:
        Created RefreshTokenData with token info.
    """
    ...
find_by_token_hash async
find_by_token_hash(
    token_hash: str,
) -> RefreshTokenData | None

Find refresh token by hash.

Only returns tokens that have NOT been revoked (revoked_at IS NULL). Does NOT check expiration - caller must verify expires_at.

Parameters:

Name Type Description Default
token_hash str

Bcrypt hash of the token to find.

required

Returns:

Type Description
RefreshTokenData | None

RefreshTokenData if found and not revoked, None otherwise.

Source code in src/domain/protocols/refresh_token_repository.py
async def find_by_token_hash(self, token_hash: str) -> RefreshTokenData | None:
    """Find refresh token by hash.

    Only returns tokens that have NOT been revoked (revoked_at IS NULL).
    Does NOT check expiration - caller must verify expires_at.

    Args:
        token_hash: Bcrypt hash of the token to find.

    Returns:
        RefreshTokenData if found and not revoked, None otherwise.
    """
    ...
find_by_id async
find_by_id(token_id: UUID) -> RefreshTokenData | None

Find refresh token by ID.

Parameters:

Name Type Description Default
token_id UUID

Token's unique identifier.

required

Returns:

Type Description
RefreshTokenData | None

RefreshTokenData if found, None otherwise.

Source code in src/domain/protocols/refresh_token_repository.py
async def find_by_id(self, token_id: UUID) -> RefreshTokenData | None:
    """Find refresh token by ID.

    Args:
        token_id: Token's unique identifier.

    Returns:
        RefreshTokenData if found, None otherwise.
    """
    ...
update_last_used async
update_last_used(token_id: UUID) -> None

Update last_used_at timestamp.

Tracks token usage for analytics and security monitoring.

Parameters:

Name Type Description Default
token_id UUID

Token's unique identifier.

required

Raises:

Type Description
NoResultFound

If token with given ID doesn't exist.

Source code in src/domain/protocols/refresh_token_repository.py
async def update_last_used(self, token_id: UUID) -> None:
    """Update last_used_at timestamp.

    Tracks token usage for analytics and security monitoring.

    Args:
        token_id: Token's unique identifier.

    Raises:
        NoResultFound: If token with given ID doesn't exist.
    """
    ...
delete async
delete(token_id: UUID) -> None

Delete refresh token (for rotation).

Used during token rotation: delete old token, create new one.

Parameters:

Name Type Description Default
token_id UUID

Token's unique identifier.

required

Raises:

Type Description
NoResultFound

If token with given ID doesn't exist.

Source code in src/domain/protocols/refresh_token_repository.py
async def delete(self, token_id: UUID) -> None:
    """Delete refresh token (for rotation).

    Used during token rotation: delete old token, create new one.

    Args:
        token_id: Token's unique identifier.

    Raises:
        NoResultFound: If token with given ID doesn't exist.
    """
    ...
revoke_by_session async
revoke_by_session(session_id: UUID) -> None

Revoke all refresh tokens for a session.

Called when session is explicitly logged out.

Parameters:

Name Type Description Default
session_id UUID

Session ID to revoke tokens for.

required
Source code in src/domain/protocols/refresh_token_repository.py
async def revoke_by_session(self, session_id: UUID) -> None:
    """Revoke all refresh tokens for a session.

    Called when session is explicitly logged out.

    Args:
        session_id: Session ID to revoke tokens for.
    """
    ...
revoke_all_for_user async
revoke_all_for_user(
    user_id: UUID, reason: str = "user_requested"
) -> None

Revoke all refresh tokens for a user.

Used when password changes (security) or user logs out of all devices.

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier.

required
reason str

Reason for revocation (for audit trail). Common values: "password_changed", "user_requested", "admin_action"

'user_requested'
Source code in src/domain/protocols/refresh_token_repository.py
async def revoke_all_for_user(
    self,
    user_id: UUID,
    reason: str = "user_requested",
) -> None:
    """Revoke all refresh tokens for a user.

    Used when password changes (security) or user logs out of all devices.

    Args:
        user_id: User's unique identifier.
        reason: Reason for revocation (for audit trail).
            Common values: "password_changed", "user_requested", "admin_action"
    """
    ...
find_by_token_verification async
find_by_token_verification(
    token: str, verify_fn: Callable[[str, str], bool]
) -> RefreshTokenData | None

Find refresh token by verifying against stored hashes.

Since bcrypt hashes are non-deterministic, we cannot look up by hash. This method iterates through active tokens and verifies each one.

Parameters:

Name Type Description Default
token str

Plain refresh token from user request.

required
verify_fn Callable[[str, str], bool]

Function to verify token against hash (token, hash) -> bool.

required

Returns:

Type Description
RefreshTokenData | None

RefreshTokenData if found and verified, None otherwise.

Note

For MVP this iterates all active tokens. For production scale, consider adding a deterministic lookup identifier.

Source code in src/domain/protocols/refresh_token_repository.py
async def find_by_token_verification(
    self,
    token: str,
    verify_fn: Callable[[str, str], bool],
) -> RefreshTokenData | None:
    """Find refresh token by verifying against stored hashes.

    Since bcrypt hashes are non-deterministic, we cannot look up by hash.
    This method iterates through active tokens and verifies each one.

    Args:
        token: Plain refresh token from user request.
        verify_fn: Function to verify token against hash (token, hash) -> bool.

    Returns:
        RefreshTokenData if found and verified, None otherwise.

    Note:
        For MVP this iterates all active tokens. For production scale,
        consider adding a deterministic lookup identifier.
    """
    ...