Skip to content

infrastructure.persistence.repositories.security_config_repository

src.infrastructure.persistence.repositories.security_config_repository

SecurityConfigRepository - SQLAlchemy implementation.

Adapter for hexagonal architecture. Maps between domain SecurityConfig entity and database SecurityConfig model.

Classes

SecurityConfigRepository

SQLAlchemy implementation of SecurityConfigRepository protocol.

This is an adapter that implements the SecurityConfigRepository port. It handles the mapping between domain SecurityConfig entity and database model.

Note

SecurityConfig is a singleton table (only one row with id=1). Use get_or_create_default() to ensure the config exists.

Attributes:

Name Type Description
session

SQLAlchemy async session for database operations.

Example

async with get_session() as session: ... repo = SecurityConfigRepository(session) ... config = await repo.get_or_create_default() ... print(config.global_min_token_version)

Source code in src/infrastructure/persistence/repositories/security_config_repository.py
class SecurityConfigRepository:
    """SQLAlchemy implementation of SecurityConfigRepository protocol.

    This is an adapter that implements the SecurityConfigRepository port.
    It handles the mapping between domain SecurityConfig entity and database model.

    Note:
        SecurityConfig is a singleton table (only one row with id=1).
        Use get_or_create_default() to ensure the config exists.

    Attributes:
        session: SQLAlchemy async session for database operations.

    Example:
        >>> async with get_session() as session:
        ...     repo = SecurityConfigRepository(session)
        ...     config = await repo.get_or_create_default()
        ...     print(config.global_min_token_version)
    """

    def __init__(
        self,
        session: AsyncSession,
        cache: CacheProtocol | None = None,
        cache_keys: CacheKeys | None = None,
    ) -> None:
        """Initialize repository with database session.

        Args:
            session: SQLAlchemy async session.
            cache: Optional cache for invalidation.
            cache_keys: Optional cache key builder.
        """
        self.session = session
        self._cache = cache
        self._cache_keys = cache_keys

    async def get(self) -> SecurityConfig | None:
        """Get the security configuration.

        Returns the singleton security config row.

        Returns:
            SecurityConfig if exists, None if not initialized.
        """
        stmt = select(SecurityConfigModel).where(SecurityConfigModel.id == 1)
        result = await self.session.execute(stmt)
        config_model = result.scalar_one_or_none()

        if config_model is None:
            return None

        return self._to_domain(config_model)

    async def get_or_create_default(self) -> SecurityConfig:
        """Get security config or create with defaults.

        Ensures the singleton config row exists.
        Creates with default values if missing.

        Returns:
            SecurityConfig: The singleton configuration.

        Default values:
            - global_min_token_version: 1
            - grace_period_seconds: 300 (5 minutes)
        """
        config = await self.get()
        if config is not None:
            return config

        # Create default config
        config_model = SecurityConfigModel(
            id=1,
            global_min_token_version=1,
            grace_period_seconds=300,
            last_rotation_at=None,
            last_rotation_reason=None,
        )
        self.session.add(config_model)
        await self.session.commit()
        await self.session.refresh(config_model)

        return self._to_domain(config_model)

    async def update_global_version(
        self,
        new_version: int,
        reason: str,
        rotation_time: datetime,
    ) -> SecurityConfig:
        """Update global minimum token version.

        Used to trigger global token rotation (invalidate all tokens
        with version below new_version).

        Args:
            new_version: New global minimum token version.
            reason: Reason for rotation (for audit trail).
            rotation_time: When rotation was triggered.

        Returns:
            Updated SecurityConfig.

        Raises:
            ValueError: If new_version <= current version.
        """
        stmt = select(SecurityConfigModel).where(SecurityConfigModel.id == 1)
        result = await self.session.execute(stmt)
        config_model = result.scalar_one_or_none()

        if config_model is None:
            # Create default first, then update
            config_model = SecurityConfigModel(
                id=1,
                global_min_token_version=1,
                grace_period_seconds=300,
            )
            self.session.add(config_model)
            await self.session.flush()

        if new_version <= config_model.global_min_token_version:
            raise ValueError(
                f"New version ({new_version}) must be greater than "
                f"current version ({config_model.global_min_token_version})"
            )

        config_model.global_min_token_version = new_version
        config_model.last_rotation_at = rotation_time
        config_model.last_rotation_reason = reason

        await self.session.commit()
        await self.session.refresh(config_model)

        # Phase 7: Invalidate security config cache
        await self._invalidate_security_cache()

        return self._to_domain(config_model)

    async def update_grace_period(
        self,
        grace_period_seconds: int,
    ) -> SecurityConfig:
        """Update grace period for token rotation.

        Args:
            grace_period_seconds: New grace period in seconds.

        Returns:
            Updated SecurityConfig.

        Raises:
            ValueError: If grace_period_seconds < 0.
        """
        if grace_period_seconds < 0:
            raise ValueError("Grace period cannot be negative")

        stmt = select(SecurityConfigModel).where(SecurityConfigModel.id == 1)
        result = await self.session.execute(stmt)
        config_model = result.scalar_one_or_none()

        if config_model is None:
            # Create default first, then update
            config_model = SecurityConfigModel(
                id=1,
                global_min_token_version=1,
                grace_period_seconds=grace_period_seconds,
            )
            self.session.add(config_model)
        else:
            config_model.grace_period_seconds = grace_period_seconds

        await self.session.commit()
        await self.session.refresh(config_model)

        return self._to_domain(config_model)

    def _to_domain(self, config_model: SecurityConfigModel) -> SecurityConfig:
        """Convert database model to domain entity.

        Args:
            config_model: SQLAlchemy SecurityConfigModel instance.

        Returns:
            Domain SecurityConfig entity.
        """
        return SecurityConfig(
            id=config_model.id,
            global_min_token_version=config_model.global_min_token_version,
            grace_period_seconds=config_model.grace_period_seconds,
            last_rotation_at=config_model.last_rotation_at,
            last_rotation_reason=config_model.last_rotation_reason,
            created_at=config_model.created_at,
            updated_at=config_model.updated_at,
        )

    async def _invalidate_security_cache(self) -> None:
        """Invalidate security config cache.

        Called after global_min_token_version is updated.
        Ensures cached security config is removed for immediate propagation.
        """
        if self._cache and self._cache_keys:
            try:
                cache_key = self._cache_keys.security_global_version()
                await self._cache.delete(cache_key)
            except Exception as e:
                # Fail-open: Cache invalidation failure should not block response
                logger.warning(
                    "security_config_cache_invalidation_error",
                    extra={"error": str(e)},
                )
Functions
__init__
__init__(
    session: AsyncSession,
    cache: CacheProtocol | None = None,
    cache_keys: CacheKeys | None = None,
) -> None

Parameters:

Name Type Description Default
session AsyncSession

SQLAlchemy async session.

required
cache CacheProtocol | None

Optional cache for invalidation.

None
cache_keys CacheKeys | None

Optional cache key builder.

None
Source code in src/infrastructure/persistence/repositories/security_config_repository.py
def __init__(
    self,
    session: AsyncSession,
    cache: CacheProtocol | None = None,
    cache_keys: CacheKeys | None = None,
) -> None:
    """Initialize repository with database session.

    Args:
        session: SQLAlchemy async session.
        cache: Optional cache for invalidation.
        cache_keys: Optional cache key builder.
    """
    self.session = session
    self._cache = cache
    self._cache_keys = cache_keys
get async
get() -> SecurityConfig | None

Get the security configuration.

Returns the singleton security config row.

Returns:

Type Description
SecurityConfig | None

SecurityConfig if exists, None if not initialized.

Source code in src/infrastructure/persistence/repositories/security_config_repository.py
async def get(self) -> SecurityConfig | None:
    """Get the security configuration.

    Returns the singleton security config row.

    Returns:
        SecurityConfig if exists, None if not initialized.
    """
    stmt = select(SecurityConfigModel).where(SecurityConfigModel.id == 1)
    result = await self.session.execute(stmt)
    config_model = result.scalar_one_or_none()

    if config_model is None:
        return None

    return self._to_domain(config_model)
get_or_create_default async
get_or_create_default() -> SecurityConfig

Get security config or create with defaults.

Ensures the singleton config row exists. Creates with default values if missing.

Returns:

Name Type Description
SecurityConfig SecurityConfig

The singleton configuration.

Default values
  • global_min_token_version: 1
  • grace_period_seconds: 300 (5 minutes)
Source code in src/infrastructure/persistence/repositories/security_config_repository.py
async def get_or_create_default(self) -> SecurityConfig:
    """Get security config or create with defaults.

    Ensures the singleton config row exists.
    Creates with default values if missing.

    Returns:
        SecurityConfig: The singleton configuration.

    Default values:
        - global_min_token_version: 1
        - grace_period_seconds: 300 (5 minutes)
    """
    config = await self.get()
    if config is not None:
        return config

    # Create default config
    config_model = SecurityConfigModel(
        id=1,
        global_min_token_version=1,
        grace_period_seconds=300,
        last_rotation_at=None,
        last_rotation_reason=None,
    )
    self.session.add(config_model)
    await self.session.commit()
    await self.session.refresh(config_model)

    return self._to_domain(config_model)
update_global_version async
update_global_version(
    new_version: int, reason: str, rotation_time: datetime
) -> SecurityConfig

Update global minimum token version.

Used to trigger global token rotation (invalidate all tokens with version below new_version).

Parameters:

Name Type Description Default
new_version int

New global minimum token version.

required
reason str

Reason for rotation (for audit trail).

required
rotation_time datetime

When rotation was triggered.

required

Returns:

Type Description
SecurityConfig

Updated SecurityConfig.

Raises:

Type Description
ValueError

If new_version <= current version.

Source code in src/infrastructure/persistence/repositories/security_config_repository.py
async def update_global_version(
    self,
    new_version: int,
    reason: str,
    rotation_time: datetime,
) -> SecurityConfig:
    """Update global minimum token version.

    Used to trigger global token rotation (invalidate all tokens
    with version below new_version).

    Args:
        new_version: New global minimum token version.
        reason: Reason for rotation (for audit trail).
        rotation_time: When rotation was triggered.

    Returns:
        Updated SecurityConfig.

    Raises:
        ValueError: If new_version <= current version.
    """
    stmt = select(SecurityConfigModel).where(SecurityConfigModel.id == 1)
    result = await self.session.execute(stmt)
    config_model = result.scalar_one_or_none()

    if config_model is None:
        # Create default first, then update
        config_model = SecurityConfigModel(
            id=1,
            global_min_token_version=1,
            grace_period_seconds=300,
        )
        self.session.add(config_model)
        await self.session.flush()

    if new_version <= config_model.global_min_token_version:
        raise ValueError(
            f"New version ({new_version}) must be greater than "
            f"current version ({config_model.global_min_token_version})"
        )

    config_model.global_min_token_version = new_version
    config_model.last_rotation_at = rotation_time
    config_model.last_rotation_reason = reason

    await self.session.commit()
    await self.session.refresh(config_model)

    # Phase 7: Invalidate security config cache
    await self._invalidate_security_cache()

    return self._to_domain(config_model)
update_grace_period async
update_grace_period(
    grace_period_seconds: int,
) -> SecurityConfig

Update grace period for token rotation.

Parameters:

Name Type Description Default
grace_period_seconds int

New grace period in seconds.

required

Returns:

Type Description
SecurityConfig

Updated SecurityConfig.

Raises:

Type Description
ValueError

If grace_period_seconds < 0.

Source code in src/infrastructure/persistence/repositories/security_config_repository.py
async def update_grace_period(
    self,
    grace_period_seconds: int,
) -> SecurityConfig:
    """Update grace period for token rotation.

    Args:
        grace_period_seconds: New grace period in seconds.

    Returns:
        Updated SecurityConfig.

    Raises:
        ValueError: If grace_period_seconds < 0.
    """
    if grace_period_seconds < 0:
        raise ValueError("Grace period cannot be negative")

    stmt = select(SecurityConfigModel).where(SecurityConfigModel.id == 1)
    result = await self.session.execute(stmt)
    config_model = result.scalar_one_or_none()

    if config_model is None:
        # Create default first, then update
        config_model = SecurityConfigModel(
            id=1,
            global_min_token_version=1,
            grace_period_seconds=grace_period_seconds,
        )
        self.session.add(config_model)
    else:
        config_model.grace_period_seconds = grace_period_seconds

    await self.session.commit()
    await self.session.refresh(config_model)

    return self._to_domain(config_model)