Skip to content

infrastructure.persistence.repositories.user_repository

src.infrastructure.persistence.repositories.user_repository

UserRepository - SQLAlchemy implementation of UserRepository protocol.

Adapter for hexagonal architecture. Maps between domain User entities and database UserModel.

Classes

UserRepository

SQLAlchemy implementation of UserRepository protocol.

This is an adapter that implements the UserRepository port. It handles the mapping between domain User entities and database UserModel.

This class does NOT inherit from UserRepository protocol (Protocol uses structural typing).

Attributes:

Name Type Description
session

SQLAlchemy async session for database operations.

Example

async with get_session() as session:
    repo = UserRepository(session)
    user = await repo.find_by_email("user@example.com")

Source code in src/infrastructure/persistence/repositories/user_repository.py
class UserRepository:
    """SQLAlchemy implementation of UserRepository protocol.

    This is an adapter that implements the UserRepository port.
    It handles the mapping between domain User entities and database UserModel.

    This class does NOT inherit from UserRepository protocol (Protocol uses structural typing).

    Attributes:
        session: SQLAlchemy async session for database operations.

    Example:
        >>> async with get_session() as session:
        ...     repo = UserRepository(session)
        ...     user = await repo.find_by_email("user@example.com")
    """

    def __init__(self, session: AsyncSession) -> None:
        """Initialize repository with database session.

        Args:
            session: SQLAlchemy async session.
        """
        self.session = session

    async def find_by_id(self, user_id: UUID) -> User | None:
        """Find user by ID.

        Args:
            user_id: User's unique identifier.

        Returns:
            Domain User entity if found, None otherwise.
        """
        stmt = select(UserModel).where(UserModel.id == user_id)
        result = await self.session.execute(stmt)
        user_model = result.scalar_one_or_none()

        if user_model is None:
            return None

        return self._to_domain(user_model)

    async def find_by_email(self, email: str) -> User | None:
        """Find user by email address.

        Email comparison is case-insensitive using PostgreSQL ILIKE.

        Args:
            email: User's email address (case-insensitive).

        Returns:
            Domain User entity if found, None otherwise.
        """
        stmt = select(UserModel).where(UserModel.email.ilike(email))
        result = await self.session.execute(stmt)
        user_model = result.scalar_one_or_none()

        if user_model is None:
            return None

        return self._to_domain(user_model)

    async def save(self, user: User) -> None:
        """Create new user in database.

        Args:
            user: Domain User entity to persist.

        Raises:
            IntegrityError: If email already exists (caught by SQLAlchemy).
        """
        user_model = self._to_model(user)
        self.session.add(user_model)
        await self.session.commit()
        await self.session.refresh(user_model)

    async def update(self, user: User) -> None:
        """Update existing user in database.

        Args:
            user: Domain User entity with updated fields.

        Raises:
            NoResultFound: If user doesn't exist (caught by SQLAlchemy).
        """
        stmt = select(UserModel).where(UserModel.id == user.id)
        result = await self.session.execute(stmt)
        user_model = result.scalar_one()

        # Update fields from domain entity
        user_model.email = user.email
        user_model.password_hash = user.password_hash
        user_model.is_verified = user.is_verified
        user_model.is_active = user.is_active
        user_model.failed_login_attempts = user.failed_login_attempts
        user_model.locked_until = user.locked_until
        user_model.updated_at = user.updated_at
        user_model.min_token_version = user.min_token_version

        await self.session.commit()
        await self.session.refresh(user_model)

    async def delete(self, user_id: UUID) -> None:
        """Delete user (soft delete - sets is_active=False).

        Args:
            user_id: User's unique identifier.

        Raises:
            NoResultFound: If user doesn't exist (caught by SQLAlchemy).
        """
        stmt = select(UserModel).where(UserModel.id == user_id)
        result = await self.session.execute(stmt)
        user_model = result.scalar_one()

        # Soft delete
        user_model.is_active = False

        await self.session.commit()
        await self.session.refresh(user_model)

    async def exists_by_email(self, email: str) -> bool:
        """Check if user with email exists.

        Args:
            email: Email address to check (case-insensitive).

        Returns:
            True if user exists, False otherwise.
        """
        stmt = select(UserModel.id).where(UserModel.email.ilike(email))
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none() is not None

    async def update_password(self, user_id: UUID, password_hash: str) -> None:
        """Update user's password hash.

        Args:
            user_id: User's unique identifier.
            password_hash: New bcrypt password hash.
        """
        stmt = select(UserModel).where(UserModel.id == user_id)
        result = await self.session.execute(stmt)
        user_model = result.scalar_one()

        user_model.password_hash = password_hash
        user_model.updated_at = datetime.now(UTC)

        await self.session.commit()

    async def verify_email(self, user_id: UUID) -> None:
        """Mark user's email as verified.

        Args:
            user_id: User's unique identifier.
        """
        stmt = select(UserModel).where(UserModel.id == user_id)
        result = await self.session.execute(stmt)
        user_model = result.scalar_one()

        user_model.is_verified = True
        user_model.updated_at = datetime.now(UTC)

        await self.session.commit()

    def _to_domain(self, user_model: UserModel) -> User:
        """Convert database model to domain entity.

        Args:
            user_model: SQLAlchemy UserModel instance.

        Returns:
            Domain User entity.
        """
        return User(
            id=user_model.id,
            email=user_model.email,
            password_hash=user_model.password_hash,
            is_verified=user_model.is_verified,
            is_active=user_model.is_active,
            failed_login_attempts=user_model.failed_login_attempts,
            locked_until=user_model.locked_until,
            created_at=user_model.created_at,
            updated_at=user_model.updated_at,
            min_token_version=user_model.min_token_version,
        )

    def _to_model(self, user: User) -> UserModel:
        """Convert domain entity to database model.

        Args:
            user: Domain User entity.

        Returns:
            SQLAlchemy UserModel instance.
        """
        return UserModel(
            id=user.id,
            email=user.email,
            password_hash=user.password_hash,
            is_verified=user.is_verified,
            is_active=user.is_active,
            failed_login_attempts=user.failed_login_attempts,
            locked_until=user.locked_until,
            created_at=user.created_at,
            updated_at=user.updated_at,
            min_token_version=user.min_token_version,
        )
Functions
__init__
__init__(session: AsyncSession) -> None

Parameters:

Name Type Description Default
session AsyncSession

SQLAlchemy async session.

required
Source code in src/infrastructure/persistence/repositories/user_repository.py
def __init__(self, session: AsyncSession) -> None:
    """Bind the repository to a database session.

    Args:
        session: SQLAlchemy async session stored on the instance as
            ``self.session``.
    """
    self.session = session
find_by_id async
find_by_id(user_id: UUID) -> User | None

Find user by ID.

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier.

required

Returns:

Type Description
User | None

Domain User entity if found, None otherwise.

Source code in src/infrastructure/persistence/repositories/user_repository.py
async def find_by_id(self, user_id: UUID) -> User | None:
    """Look up a single user by primary key.

    Args:
        user_id: User's unique identifier.

    Returns:
        The matching row converted to a domain User, or None when no row
        has that ID.
    """
    query = select(UserModel).where(UserModel.id == user_id)
    record = (await self.session.execute(query)).scalar_one_or_none()
    return None if record is None else self._to_domain(record)
find_by_email async
find_by_email(email: str) -> User | None

Find user by email address.

Email comparison is case-insensitive using PostgreSQL ILIKE.

Parameters:

Name Type Description Default
email str

User's email address (case-insensitive).

required

Returns:

Type Description
User | None

Domain User entity if found, None otherwise.

Source code in src/infrastructure/persistence/repositories/user_repository.py
async def find_by_email(self, email: str) -> User | None:
    """Find user by email address.

    Email comparison is case-insensitive using ILIKE. LIKE wildcards in
    ``email`` are escaped, so an address such as ``john_doe@example.com``
    only matches itself (ignoring case).

    Args:
        email: User's email address (case-insensitive).

    Returns:
        Domain User entity if found, None otherwise.
    """
    # Escape the escape character first, then the two LIKE wildcards, so the
    # address is compared literally rather than as a pattern.
    pattern = email.replace("/", "//").replace("%", "/%").replace("_", "/_")
    stmt = select(UserModel).where(UserModel.email.ilike(pattern, escape="/"))
    result = await self.session.execute(stmt)
    user_model = result.scalar_one_or_none()

    if user_model is None:
        return None

    return self._to_domain(user_model)
save async
save(user: User) -> None

Create new user in database.

Parameters:

Name Type Description Default
user User

Domain User entity to persist.

required

Raises:

Type Description
IntegrityError

If email already exists (raised by SQLAlchemy on commit).

Source code in src/infrastructure/persistence/repositories/user_repository.py
async def save(self, user: User) -> None:
    """Insert a new user row and commit it.

    Args:
        user: Domain User entity to persist.

    Raises:
        IntegrityError: Propagated from the commit when a database
            constraint is violated (e.g. a duplicate email, assuming the
            column is unique).
    """
    db = self.session
    record = self._to_model(user)
    db.add(record)
    await db.commit()
    # Reload the row after the commit.
    await db.refresh(record)
update async
update(user: User) -> None

Update existing user in database.

Parameters:

Name Type Description Default
user User

Domain User entity with updated fields.

required

Raises:

Type Description
NoResultFound

If user doesn't exist (raised by SQLAlchemy).

Source code in src/infrastructure/persistence/repositories/user_repository.py
async def update(self, user: User) -> None:
    """Overwrite the stored row for ``user`` with the entity's current state.

    ``id`` and ``created_at`` are not modified; every other mapped field is
    copied from the domain entity before committing.

    Args:
        user: Domain User entity with updated fields.

    Raises:
        NoResultFound: Propagated from ``scalar_one()`` when no row has
            ``user.id``.
    """
    db = self.session
    query = select(UserModel).where(UserModel.id == user.id)
    record = (await db.execute(query)).scalar_one()

    # Fields copied verbatim from the domain entity, in assignment order.
    copied_fields = (
        "email",
        "password_hash",
        "is_verified",
        "is_active",
        "failed_login_attempts",
        "locked_until",
        "updated_at",
        "min_token_version",
    )
    for field_name in copied_fields:
        setattr(record, field_name, getattr(user, field_name))

    await db.commit()
    await db.refresh(record)
delete async
delete(user_id: UUID) -> None

Delete user (soft delete - sets is_active=False).

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier.

required

Raises:

Type Description
NoResultFound

If user doesn't exist (raised by SQLAlchemy).

Source code in src/infrastructure/persistence/repositories/user_repository.py
async def delete(self, user_id: UUID) -> None:
    """Soft-delete a user by clearing its ``is_active`` flag.

    The row itself stays in the table.

    Args:
        user_id: User's unique identifier.

    Raises:
        NoResultFound: Propagated from ``scalar_one()`` when no row has
            this ID.
    """
    db = self.session
    query = select(UserModel).where(UserModel.id == user_id)
    record = (await db.execute(query)).scalar_one()

    # Deactivate instead of removing the row.
    record.is_active = False

    await db.commit()
    await db.refresh(record)
exists_by_email async
exists_by_email(email: str) -> bool

Check if user with email exists.

Parameters:

Name Type Description Default
email str

Email address to check (case-insensitive).

required

Returns:

Type Description
bool

True if user exists, False otherwise.

Source code in src/infrastructure/persistence/repositories/user_repository.py
async def exists_by_email(self, email: str) -> bool:
    """Check if user with email exists.

    The comparison is case-insensitive (ILIKE). LIKE wildcards in ``email``
    are escaped so the address is matched literally.

    Args:
        email: Email address to check (case-insensitive).

    Returns:
        True if user exists, False otherwise.
    """
    # Escape the escape character first, then the two LIKE wildcards.
    pattern = email.replace("/", "//").replace("%", "/%").replace("_", "/_")
    # LIMIT 1: one matching row is enough to answer, and it keeps
    # scalar_one_or_none() from raising if several rows match.
    stmt = (
        select(UserModel.id)
        .where(UserModel.email.ilike(pattern, escape="/"))
        .limit(1)
    )
    result = await self.session.execute(stmt)
    return result.scalar_one_or_none() is not None
update_password async
update_password(user_id: UUID, password_hash: str) -> None

Update user's password hash.

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier.

required
password_hash str

New bcrypt password hash.

required
Source code in src/infrastructure/persistence/repositories/user_repository.py
async def update_password(self, user_id: UUID, password_hash: str) -> None:
    """Store a new password hash for the user and commit.

    ``updated_at`` is set to the current UTC time in the same commit.

    Args:
        user_id: User's unique identifier.
        password_hash: New bcrypt password hash.

    Raises:
        NoResultFound: Propagated from ``scalar_one()`` when no row has
            this ID.
    """
    db = self.session
    query = select(UserModel).where(UserModel.id == user_id)
    record = (await db.execute(query)).scalar_one()

    record.password_hash = password_hash
    record.updated_at = datetime.now(UTC)

    await db.commit()
verify_email async
verify_email(user_id: UUID) -> None

Mark user's email as verified.

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier.

required
Source code in src/infrastructure/persistence/repositories/user_repository.py
async def verify_email(self, user_id: UUID) -> None:
    """Flag the user's email as verified and commit.

    ``updated_at`` is set to the current UTC time in the same commit.

    Args:
        user_id: User's unique identifier.

    Raises:
        NoResultFound: Propagated from ``scalar_one()`` when no row has
            this ID.
    """
    db = self.session
    query = select(UserModel).where(UserModel.id == user_id)
    record = (await db.execute(query)).scalar_one()

    record.is_verified = True
    record.updated_at = datetime.now(UTC)

    await db.commit()