Skip to content

Models

SQLModel database models with relationships and validation.

All models are located in src/models/ and represent database tables. They use SQLModel (combining SQLAlchemy and Pydantic) for both ORM functionality and data validation.

User Model

src.models.user.User

Bases: DashtamBase

Application user model with JWT authentication support.

Represents a user who can authenticate and manage multiple financial provider instances. Supports JWT authentication with email/password, email verification, and account security features.

Attributes:

Name Type Description
email str

User's email address (unique, used for login).

name str

User's full name.

password_hash Optional[str]

Bcrypt hash of user's password.

email_verified bool

Whether email address has been verified.

email_verified_at Optional[datetime]

Timestamp of email verification.

failed_login_attempts int

Count of consecutive failed login attempts.

account_locked_until Optional[datetime]

Timestamp until which account is locked (after too many failures).

last_login_at Optional[datetime]

Timestamp of last successful login.

last_login_ip Optional[str]

IP address of last successful login.

is_active bool

Whether account is active (for soft deletion/suspension).

providers List[Provider]

List of provider instances owned by user.

refresh_tokens List[RefreshToken]

List of active refresh tokens for user sessions.

email_verification_tokens List[EmailVerificationToken]

List of email verification tokens.

password_reset_tokens List[PasswordResetToken]

List of password reset tokens.

Source code in src/models/user.py
class User(DashtamBase, table=True):
    """Application user model with JWT authentication support.

    Represents a user who can authenticate and manage multiple financial
    provider instances. Supports JWT authentication with email/password,
    email verification, and account security features.

    Attributes:
        email: User's email address (unique, used for login).
        name: User's full name.
        password_hash: Bcrypt hash of user's password.
        email_verified: Whether email address has been verified.
        email_verified_at: Timestamp of email verification.
        failed_login_attempts: Count of consecutive failed login attempts.
        account_locked_until: Timestamp until which account is locked (after too many failures).
        last_login_at: Timestamp of last successful login.
        last_login_ip: IP address of last successful login.
        is_active: Whether account is active (for soft deletion/suspension).
        providers: List of provider instances owned by user.
        refresh_tokens: List of active refresh tokens for user sessions.
        email_verification_tokens: List of email verification tokens.
        password_reset_tokens: List of password reset tokens.
    """

    __tablename__ = "users"

    # Basic info
    email: str = Field(
        sa_column=Column(String(255), unique=True, index=True, nullable=False),
        description="User's email address (unique, used for login)",
    )
    name: str = Field(description="User's full name")

    # Authentication
    password_hash: Optional[str] = Field(
        default=None,
        sa_column=Column(String(255), nullable=True),
        description="Bcrypt hash of user's password",
    )

    # Email verification
    email_verified: bool = Field(
        default=False,
        sa_column=Column(Boolean, nullable=False, server_default="false"),
        description="Whether email address has been verified",
    )
    email_verified_at: Optional[datetime] = Field(
        default=None,
        sa_type=DateTime(timezone=True),
        description="Timestamp of email verification",
    )

    # Account security
    failed_login_attempts: int = Field(
        default=0,
        sa_column=Column(Integer, nullable=False, server_default="0"),
        description="Count of consecutive failed login attempts",
    )
    account_locked_until: Optional[datetime] = Field(
        default=None,
        sa_type=DateTime(timezone=True),
        description="Timestamp until which account is locked",
    )

    # Login tracking
    last_login_at: Optional[datetime] = Field(
        default=None,
        sa_type=DateTime(timezone=True),
        description="Timestamp of last successful login",
    )
    last_login_ip: Optional[str] = Field(
        default=None,
        sa_column=Column(INET, nullable=True),
        description="IP address of last successful login",
    )

    # Account status
    is_active: bool = Field(
        default=True,
        description="Whether account is active (for soft deletion/suspension)",
    )

    # Token rotation (per-user version)
    min_token_version: int = Field(
        default=1,
        sa_column=Column(Integer, nullable=False, server_default="1"),
        description="Minimum token version for this user (rotation)",
    )

    # Relationships
    providers: List["Provider"] = Relationship(
        back_populates="user", cascade_delete=True
    )
    refresh_tokens: List["RefreshToken"] = Relationship(
        back_populates="user", cascade_delete=True
    )
    email_verification_tokens: List["EmailVerificationToken"] = Relationship(
        back_populates="user", cascade_delete=True
    )
    password_reset_tokens: List["PasswordResetToken"] = Relationship(
        back_populates="user", cascade_delete=True
    )

    # Validators to ensure timezone awareness
    @field_validator(
        "email_verified_at", "account_locked_until", "last_login_at", mode="before"
    )
    @classmethod
    def ensure_timezone_aware(cls, v: Optional[datetime]) -> Optional[datetime]:
        """Ensure datetime fields are timezone-aware (UTC)."""
        if v is None:
            return None
        if v.tzinfo is None:
            return v.replace(tzinfo=timezone.utc)
        return v.astimezone(timezone.utc)

    @property
    def is_locked(self) -> bool:
        """Check if account is currently locked."""
        if not self.account_locked_until:
            return False
        return datetime.now(timezone.utc) < self.account_locked_until

    @property
    def can_login(self) -> bool:
        """Check if user can log in (active, not locked)."""
        return self.is_active and not self.is_locked

    @property
    def active_providers_count(self) -> int:
        """Count of active provider connections."""
        if not self.providers:
            return 0
        return sum(1 for p in self.providers if p.is_connected)

    @property
    def display_name(self) -> str:
        """Get display name (name or email)."""
        return self.name or self.email.split("@")[0]

    def reset_failed_login_attempts(self) -> None:
        """Reset failed login attempts counter."""
        self.failed_login_attempts = 0
        self.account_locked_until = None

    def increment_failed_login_attempts(self) -> None:
        """Increment failed login attempts and lock account if threshold exceeded."""
        self.failed_login_attempts += 1

        # Lock account after 10 failed attempts (1 hour)
        if self.failed_login_attempts >= 10:
            self.account_locked_until = datetime.now(timezone.utc) + timedelta(hours=1)

Attributes

is_locked property

is_locked: bool

Check if account is currently locked.

can_login property

can_login: bool

Check if user can log in (active, not locked).

active_providers_count property

active_providers_count: int

Count of active provider connections.

display_name property

display_name: str

Get display name (name or email).

Functions

ensure_timezone_aware classmethod

ensure_timezone_aware(
    v: Optional[datetime],
) -> Optional[datetime]

Ensure datetime fields are timezone-aware (UTC).

Source code in src/models/user.py
@field_validator(
    "email_verified_at", "account_locked_until", "last_login_at", mode="before"
)
@classmethod
def ensure_timezone_aware(cls, v: Optional[datetime]) -> Optional[datetime]:
    """Ensure datetime fields are timezone-aware (UTC)."""
    if v is None:
        return None
    if v.tzinfo is None:
        return v.replace(tzinfo=timezone.utc)
    return v.astimezone(timezone.utc)

reset_failed_login_attempts

reset_failed_login_attempts() -> None

Reset failed login attempts counter.

Source code in src/models/user.py
def reset_failed_login_attempts(self) -> None:
    """Reset failed login attempts counter."""
    self.failed_login_attempts = 0
    self.account_locked_until = None

increment_failed_login_attempts

increment_failed_login_attempts() -> None

Increment failed login attempts and lock account if threshold exceeded.

Source code in src/models/user.py
def increment_failed_login_attempts(self) -> None:
    """Increment failed login attempts and lock account if threshold exceeded."""
    self.failed_login_attempts += 1

    # Lock account after 10 failed attempts (1 hour)
    if self.failed_login_attempts >= 10:
        self.account_locked_until = datetime.now(timezone.utc) + timedelta(hours=1)

Provider Model

src.models.provider.Provider

Bases: DashtamBase

User's provider instances.

Each row represents a user's connection to a financial provider. Users can have multiple instances of the same provider with different aliases (e.g., "Schwab Personal" and "Schwab 401k").

Attributes:

Name Type Description
user_id UUID

The user who owns this provider instance.

provider_key str

The provider identifier (must match registry).

alias str

User's custom name for this instance.

is_active bool

Whether this instance is currently active.

metadata bool

Additional user-specific metadata.

Source code in src/models/provider.py
class Provider(DashtamBase, table=True):
    """User's provider instances.

    Each row represents a user's connection to a financial provider.
    Users can have multiple instances of the same provider with different
    aliases (e.g., "Schwab Personal" and "Schwab 401k").

    Attributes:
        user_id: The user who owns this provider instance.
        provider_key: The provider identifier (must match registry).
        alias: User's custom name for this instance.
        is_active: Whether this instance is currently active.
        metadata: Additional user-specific metadata.
    """

    __tablename__ = "providers"

    # Core fields
    user_id: UUID = Field(
        foreign_key="users.id",
        index=True,
        description="User who owns this provider instance",
    )

    provider_key: str = Field(
        index=True, description="Provider identifier (e.g., 'schwab', 'chase')"
    )

    alias: str = Field(
        description="User's custom name for this connection (e.g., 'Schwab Personal')"
    )

    # Additional fields
    provider_metadata: Dict[str, Any] = Field(
        default_factory=dict,
        sa_column=Column(JSON),
        description="User-specific metadata for this instance",
    )

    # Ensure unique aliases per user
    __table_args__ = (
        UniqueConstraint("user_id", "alias", name="unique_user_provider_alias"),
    )

    # Relationships
    user: Optional["User"] = Relationship(back_populates="providers")
    connection: Optional["ProviderConnection"] = Relationship(
        back_populates="provider",
        sa_relationship_kwargs={"uselist": False, "cascade": "all, delete-orphan"},
    )

    @field_validator("provider_key")
    @classmethod
    def validate_provider_key(cls, v: str) -> str:
        """Validate that provider_key exists in the registry.

        This validation happens at the application level.
        The actual check against the registry should be done in the service layer.
        """
        # Note: We can't import ProviderRegistry here due to circular imports
        # Validation should be done in the service/API layer
        return v.lower().strip()

    @property
    def is_connected(self) -> bool:
        """Check if this provider has an active connection."""
        return (
            self.connection is not None
            and self.connection.status == ProviderStatus.ACTIVE
        )

    @property
    def needs_reconnection(self) -> bool:
        """Check if this provider needs to be reconnected."""
        if not self.connection:
            return True
        return self.connection.status in [
            ProviderStatus.EXPIRED,
            ProviderStatus.REVOKED,
            ProviderStatus.ERROR,
        ]

    @property
    def display_status(self) -> str:
        """Get a user-friendly status string."""
        if not self.connection:
            return "Not Connected"
        return self.connection.status.value.title()

Attributes

is_connected property

is_connected: bool

Check if this provider has an active connection.

needs_reconnection property

needs_reconnection: bool

Check if this provider needs to be reconnected.

display_status property

display_status: str

Get a user-friendly status string.

Functions

validate_provider_key classmethod

validate_provider_key(v: str) -> str

Validate that provider_key exists in the registry.

This validation happens at the application level. The actual check against the registry should be done in the service layer.

Source code in src/models/provider.py
@field_validator("provider_key")
@classmethod
def validate_provider_key(cls, v: str) -> str:
    """Validate that provider_key exists in the registry.

    This validation happens at the application level.
    The actual check against the registry should be done in the service layer.
    """
    # Note: We can't import ProviderRegistry here due to circular imports
    # Validation should be done in the service/API layer
    return v.lower().strip()

ProviderConnection Model

src.models.provider.ProviderConnection

Bases: DashtamBase

Connection details for a provider instance.

Each provider instance has exactly one connection that manages the authentication state and sync schedule.

Attributes:

Name Type Description
provider_id UUID

The provider instance this connection belongs to.

status ProviderStatus

Current status of the connection.

connected_at Optional[datetime]

When the connection was established.

last_sync_at Optional[datetime]

When data was last synced.

next_sync_at Optional[datetime]

When the next sync should occur.

sync_frequency_minutes int

How often to sync data.

error_message Optional[str]

Last error message if any.

error_count int

Number of consecutive errors.

accounts_count int

Number of accounts accessible.

accounts_list List[str]

List of account IDs accessible.

Source code in src/models/provider.py
class ProviderConnection(DashtamBase, table=True):
    """Connection details for a provider instance.

    Each provider instance has exactly one connection that manages the
    authentication state and sync schedule.

    Attributes:
        provider_id: The provider instance this connection belongs to.
        status: Current status of the connection.
        connected_at: When the connection was established.
        last_sync_at: When data was last synced.
        next_sync_at: When the next sync should occur.
        sync_frequency_minutes: How often to sync data.
        error_message: Last error message if any.
        error_count: Number of consecutive errors.
        accounts_count: Number of accounts accessible.
        accounts_list: List of account IDs accessible.
    """

    __tablename__ = "provider_connections"

    # One-to-one with Provider
    provider_id: UUID = Field(
        foreign_key="providers.id",
        unique=True,
        index=True,
        description="The provider instance this connection belongs to",
    )

    # Connection status
    status: ProviderStatus = Field(
        default=ProviderStatus.PENDING,
        index=True,
        description="Current connection status",
    )

    connected_at: Optional[datetime] = Field(
        default=None,
        sa_type=DateTime(timezone=True),
        description="When connection was first established",
    )

    # Sync tracking
    last_sync_at: Optional[datetime] = Field(
        default=None,
        sa_type=DateTime(timezone=True),
        description="Last successful data sync",
    )

    next_sync_at: Optional[datetime] = Field(
        default=None,
        sa_type=DateTime(timezone=True),
        index=True,
        description="Next scheduled sync",
    )

    sync_frequency_minutes: int = Field(default=60, description="Minutes between syncs")

    # Error tracking
    error_message: Optional[str] = Field(
        default=None, sa_column=Column(Text), description="Last error message if any"
    )

    error_count: int = Field(default=0, description="Consecutive error count")

    # Account information
    accounts_count: int = Field(default=0, description="Number of accounts accessible")

    accounts_list: List[str] = Field(
        default_factory=list,
        sa_column=Column(JSON),
        description="List of account IDs accessible",
    )

    # Relationships
    provider: Provider = Relationship(back_populates="connection")
    token: Optional["ProviderToken"] = Relationship(
        back_populates="connection",
        sa_relationship_kwargs={"uselist": False, "cascade": "all, delete-orphan"},
    )
    audit_logs: List["ProviderAuditLog"] = Relationship(
        back_populates="connection", cascade_delete=True
    )

    # Validators to ensure timezone awareness
    @field_validator("connected_at", "last_sync_at", "next_sync_at", mode="before")
    @classmethod
    def ensure_timezone_aware(cls, v: Optional[datetime]) -> Optional[datetime]:
        """Ensure datetime fields are timezone-aware (UTC)."""
        if v is None:
            return None
        if v.tzinfo is None:
            return v.replace(tzinfo=timezone.utc)
        return v.astimezone(timezone.utc)

    def mark_connected(self) -> None:
        """Mark the connection as successfully connected."""
        self.status = ProviderStatus.ACTIVE
        self.connected_at = datetime.now(timezone.utc)
        self.error_count = 0
        self.error_message = None
        self.schedule_next_sync()

    def schedule_next_sync(self) -> None:
        """Schedule the next sync based on frequency."""
        self.next_sync_at = datetime.now(timezone.utc) + timedelta(
            minutes=self.sync_frequency_minutes
        )

    def mark_sync_successful(self, accounts: Optional[List[str]] = None) -> None:
        """Mark a sync as successful and schedule next one.

        Args:
            accounts: Optional list of account IDs that were synced.
        """
        self.last_sync_at = datetime.now(timezone.utc)
        self.error_count = 0
        self.error_message = None
        self.status = ProviderStatus.ACTIVE

        if accounts is not None:
            self.accounts_list = accounts
            self.accounts_count = len(accounts)

        self.schedule_next_sync()

    def mark_sync_failed(self, error_message: str) -> None:
        """Mark a sync as failed and increment error count.

        Args:
            error_message: Description of the error that occurred.
        """
        self.error_count += 1
        self.error_message = error_message

        # Set status to error after 3 consecutive failures
        if self.error_count >= 3:
            self.status = ProviderStatus.ERROR

        # Schedule next sync with exponential backoff
        backoff_minutes = min(
            self.sync_frequency_minutes * (2**self.error_count),
            1440,  # Max 24 hours
        )
        self.next_sync_at = datetime.now(timezone.utc) + timedelta(
            minutes=backoff_minutes
        )

Functions

ensure_timezone_aware classmethod

ensure_timezone_aware(
    v: Optional[datetime],
) -> Optional[datetime]

Ensure datetime fields are timezone-aware (UTC).

Source code in src/models/provider.py
@field_validator("connected_at", "last_sync_at", "next_sync_at", mode="before")
@classmethod
def ensure_timezone_aware(cls, v: Optional[datetime]) -> Optional[datetime]:
    """Ensure datetime fields are timezone-aware (UTC)."""
    if v is None:
        return None
    if v.tzinfo is None:
        return v.replace(tzinfo=timezone.utc)
    return v.astimezone(timezone.utc)

mark_connected

mark_connected() -> None

Mark the connection as successfully connected.

Source code in src/models/provider.py
def mark_connected(self) -> None:
    """Mark the connection as successfully connected."""
    self.status = ProviderStatus.ACTIVE
    self.connected_at = datetime.now(timezone.utc)
    self.error_count = 0
    self.error_message = None
    self.schedule_next_sync()

schedule_next_sync

schedule_next_sync() -> None

Schedule the next sync based on frequency.

Source code in src/models/provider.py
def schedule_next_sync(self) -> None:
    """Schedule the next sync based on frequency."""
    self.next_sync_at = datetime.now(timezone.utc) + timedelta(
        minutes=self.sync_frequency_minutes
    )

mark_sync_successful

mark_sync_successful(
    accounts: Optional[List[str]] = None,
) -> None

Mark a sync as successful and schedule next one.

Parameters:

Name Type Description Default
accounts Optional[List[str]]

Optional list of account IDs that were synced.

None
Source code in src/models/provider.py
def mark_sync_successful(self, accounts: Optional[List[str]] = None) -> None:
    """Mark a sync as successful and schedule next one.

    Args:
        accounts: Optional list of account IDs that were synced.
    """
    self.last_sync_at = datetime.now(timezone.utc)
    self.error_count = 0
    self.error_message = None
    self.status = ProviderStatus.ACTIVE

    if accounts is not None:
        self.accounts_list = accounts
        self.accounts_count = len(accounts)

    self.schedule_next_sync()

mark_sync_failed

mark_sync_failed(error_message: str) -> None

Mark a sync as failed and increment error count.

Parameters:

Name Type Description Default
error_message str

Description of the error that occurred.

required
Source code in src/models/provider.py
def mark_sync_failed(self, error_message: str) -> None:
    """Mark a sync as failed and increment error count.

    Args:
        error_message: Description of the error that occurred.
    """
    self.error_count += 1
    self.error_message = error_message

    # Set status to error after 3 consecutive failures
    if self.error_count >= 3:
        self.status = ProviderStatus.ERROR

    # Schedule next sync with exponential backoff
    backoff_minutes = min(
        self.sync_frequency_minutes * (2**self.error_count),
        1440,  # Max 24 hours
    )
    self.next_sync_at = datetime.now(timezone.utc) + timedelta(
        minutes=backoff_minutes
    )

ProviderToken Model

src.models.provider.ProviderToken

Bases: DashtamBase

OAuth tokens for a provider connection.

Each connection has exactly one token record that stores encrypted OAuth credentials. Tokens are automatically refreshed when expired.

The tokens are encrypted before storage for security. The encryption is handled by the TokenService layer.

Attributes:

Name Type Description
connection_id UUID

The connection this token belongs to.

access_token_encrypted str

Encrypted OAuth access token.

refresh_token_encrypted Optional[str]

Encrypted OAuth refresh token.

id_token Optional[str]

JWT ID token if provided (for identifying unique connections).

token_type str

Type of token (usually 'Bearer').

expires_at Optional[datetime]

When the access token expires.

scope Optional[str]

OAuth scopes granted.

last_refreshed_at Optional[datetime]

When token was last refreshed.

refresh_count int

Number of times refreshed.

Source code in src/models/provider.py
class ProviderToken(DashtamBase, table=True):
    """OAuth tokens for a provider connection.

    Each connection has exactly one token record that stores encrypted
    OAuth credentials. Tokens are automatically refreshed when expired.

    The tokens are encrypted before storage for security. The encryption
    is handled by the TokenService layer.

    Attributes:
        connection_id: The connection this token belongs to.
        access_token_encrypted: Encrypted OAuth access token.
        refresh_token_encrypted: Encrypted OAuth refresh token.
        id_token: JWT ID token if provided (for identifying unique connections).
        token_type: Type of token (usually 'Bearer').
        expires_at: When the access token expires.
        scope: OAuth scopes granted.
        last_refreshed_at: When token was last refreshed.
        refresh_count: Number of times refreshed.
    """

    __tablename__ = "provider_tokens"

    # One-to-one with ProviderConnection
    connection_id: UUID = Field(
        foreign_key="provider_connections.id",
        unique=True,
        index=True,
        description="The connection this token belongs to",
    )

    # Encrypted token data
    access_token_encrypted: str = Field(
        sa_column=Column(Text), description="Encrypted OAuth access token"
    )

    refresh_token_encrypted: Optional[str] = Field(
        default=None,
        sa_column=Column(Text),
        description="Encrypted OAuth refresh token",
    )

    # ID token for provider identification (not encrypted as it's already a JWT)
    id_token: Optional[str] = Field(
        default=None,
        sa_column=Column(Text),
        description="JWT ID token from provider (used for deduplication)",
    )

    # Token metadata
    token_type: str = Field(default="Bearer", description="Type of token")

    expires_at: Optional[datetime] = Field(
        default=None,
        sa_type=DateTime(timezone=True),
        index=True,
        description="Access token expiration",
    )

    scope: Optional[str] = Field(default=None, description="OAuth scopes granted")

    # Refresh tracking
    last_refreshed_at: Optional[datetime] = Field(
        default=None,
        sa_type=DateTime(timezone=True),
        description="Last refresh timestamp",
    )

    refresh_count: int = Field(default=0, description="Number of refreshes")

    # Relationships
    connection: ProviderConnection = Relationship(back_populates="token")

    # Validators to ensure timezone awareness
    @field_validator("expires_at", "last_refreshed_at", mode="before")
    @classmethod
    def ensure_timezone_aware(cls, v: Optional[datetime]) -> Optional[datetime]:
        """Ensure datetime fields are timezone-aware (UTC)."""
        if v is None:
            return None
        if v.tzinfo is None:
            return v.replace(tzinfo=timezone.utc)
        return v.astimezone(timezone.utc)

    @property
    def is_expired(self) -> bool:
        """Check if the access token is expired.

        Returns:
            True if the token has expired, False otherwise.
        """
        if not self.expires_at:
            return False
        return datetime.now(timezone.utc) >= self.expires_at

    @property
    def is_expiring_soon(self) -> bool:
        """Check if token expires within 5 minutes.

        Returns:
            True if the token expires within 5 minutes, False otherwise.
        """
        if not self.expires_at:
            return False
        return datetime.now(timezone.utc) >= (self.expires_at - timedelta(minutes=5))

    @property
    def needs_refresh(self) -> bool:
        """Check if token should be refreshed."""
        return self.is_expired or self.is_expiring_soon

    def update_tokens(
        self,
        access_token_encrypted: str,
        refresh_token_encrypted: Optional[str] = None,
        expires_in: Optional[int] = None,
        id_token: Optional[str] = None,
    ) -> None:
        """Update tokens from a refresh response.

        Handles token rotation if the provider sends a new refresh token.

        Args:
            access_token_encrypted: New encrypted access token.
            refresh_token_encrypted: New encrypted refresh token (if rotated).
            expires_in: Token lifetime in seconds.
            id_token: New ID token if provided.
        """
        self.access_token_encrypted = access_token_encrypted

        # Handle token rotation - update refresh token if provider sends new one
        if refresh_token_encrypted:
            self.refresh_token_encrypted = refresh_token_encrypted

        if expires_in:
            self.expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in)

        if id_token:
            self.id_token = id_token

        self.last_refreshed_at = datetime.now(timezone.utc)
        self.refresh_count += 1

Attributes

is_expired property

is_expired: bool

Check if the access token is expired.

Returns:

Type Description
bool

True if the token has expired, False otherwise.

is_expiring_soon property

is_expiring_soon: bool

Check if token expires within 5 minutes.

Returns:

Type Description
bool

True if the token expires within 5 minutes, False otherwise.

needs_refresh property

needs_refresh: bool

Check if token should be refreshed.

Functions

ensure_timezone_aware classmethod

ensure_timezone_aware(
    v: Optional[datetime],
) -> Optional[datetime]

Ensure datetime fields are timezone-aware (UTC).

Source code in src/models/provider.py
@field_validator("expires_at", "last_refreshed_at", mode="before")
@classmethod
def ensure_timezone_aware(cls, v: Optional[datetime]) -> Optional[datetime]:
    """Ensure datetime fields are timezone-aware (UTC)."""
    if v is None:
        return None
    if v.tzinfo is None:
        return v.replace(tzinfo=timezone.utc)
    return v.astimezone(timezone.utc)

update_tokens

update_tokens(
    access_token_encrypted: str,
    refresh_token_encrypted: Optional[str] = None,
    expires_in: Optional[int] = None,
    id_token: Optional[str] = None,
) -> None

Update tokens from a refresh response.

Handles token rotation if the provider sends a new refresh token.

Parameters:

Name Type Description Default
access_token_encrypted str

New encrypted access token.

required
refresh_token_encrypted Optional[str]

New encrypted refresh token (if rotated).

None
expires_in Optional[int]

Token lifetime in seconds.

None
id_token Optional[str]

New ID token if provided.

None
Source code in src/models/provider.py
def update_tokens(
    self,
    access_token_encrypted: str,
    refresh_token_encrypted: Optional[str] = None,
    expires_in: Optional[int] = None,
    id_token: Optional[str] = None,
) -> None:
    """Update tokens from a refresh response.

    Handles token rotation if the provider sends a new refresh token.

    Args:
        access_token_encrypted: New encrypted access token.
        refresh_token_encrypted: New encrypted refresh token (if rotated).
        expires_in: Token lifetime in seconds.
        id_token: New ID token if provided.
    """
    self.access_token_encrypted = access_token_encrypted

    # Handle token rotation - update refresh token if provider sends new one
    if refresh_token_encrypted:
        self.refresh_token_encrypted = refresh_token_encrypted

    if expires_in:
        self.expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in)

    if id_token:
        self.id_token = id_token

    self.last_refreshed_at = datetime.now(timezone.utc)
    self.refresh_count += 1

RefreshToken Model

src.models.user.RefreshToken

Bases: DashtamBase

Refresh token for JWT authentication with session management.

Stores refresh tokens with rotation support. Each token is hashed before storage and can be revoked for logout or security events. Each refresh token represents a user session on a specific device.

Session Management: - Users can view all active sessions (devices) - Users can revoke sessions individually or in bulk - Email alerts sent for new sessions from new devices/locations - Device fingerprinting for session hijacking detection

Attributes:

Name Type Description
user_id UUID

ID of user who owns this token.

token_hash str

Bcrypt hash of the refresh token.

expires_at datetime

Token expiration timestamp (30 days).

revoked_at Optional[datetime]

Timestamp when token was revoked (logout).

is_revoked bool

Whether token has been revoked.

device_info Optional[str]

Information about device/browser.

ip_address Optional[str]

IP address where token was issued.

user_agent Optional[str]

User agent string of client.

last_used_at Optional[datetime]

Timestamp of last token use (refresh).

location Optional[str]

User-friendly location from IP geolocation.

fingerprint Optional[str]

SHA256 hash of device fingerprint.

is_trusted_device bool

User-marked trusted device flag.

user User

User who owns this token.

Source code in src/models/auth.py
class RefreshToken(DashtamBase, table=True):
    """Refresh token for JWT authentication with session management.

    Stores refresh tokens with rotation support. Each token is hashed
    before storage and can be revoked for logout or security events.
    Each refresh token represents a user session on a specific device.

    Session Management:
    - Users can view all active sessions (devices)
    - Users can revoke sessions individually or in bulk
    - Email alerts sent for new sessions from new devices/locations
    - Device fingerprinting for session hijacking detection

    Attributes:
        user_id: ID of user who owns this token.
        token_hash: Bcrypt hash of the refresh token.
        expires_at: Token expiration timestamp (30 days).
        revoked_at: Timestamp when token was revoked (logout).
        is_revoked: Whether token has been revoked.
        device_info: Information about device/browser.
        ip_address: IP address where token was issued.
        user_agent: User agent string of client.
        last_used_at: Timestamp of last token use (refresh).
        location: User-friendly location from IP geolocation.
        fingerprint: SHA256 hash of device fingerprint.
        is_trusted_device: User-marked trusted device flag.
        user: User who owns this token.
    """

    __tablename__ = "refresh_tokens"

    user_id: UUID = Field(
        foreign_key="users.id",
        nullable=False,
        index=True,
        ondelete="CASCADE",
        description="ID of user who owns this token",
    )
    token_hash: str = Field(
        sa_column=Column(String(255), nullable=False, unique=True, index=True),
        description="Bcrypt hash of the refresh token",
    )
    expires_at: datetime = Field(
        sa_type=DateTime(timezone=True),
        nullable=False,
        description="Token expiration timestamp",
    )
    revoked_at: Optional[datetime] = Field(
        default=None,
        sa_type=DateTime(timezone=True),
        description="Timestamp when token was revoked",
    )
    is_revoked: bool = Field(
        default=False,
        sa_column=Column(Boolean, nullable=False, server_default="false", index=True),
        description="Whether token has been revoked",
    )
    device_info: Optional[str] = Field(
        default=None,
        sa_column=Column(Text, nullable=True),
        description="Information about device/browser",
    )
    ip_address: Optional[str] = Field(
        default=None,
        sa_column=Column(INET, nullable=True),
        description="IP address where token was issued",
    )
    user_agent: Optional[str] = Field(
        default=None,
        sa_column=Column(Text, nullable=True),
        description="User agent string of client",
    )
    last_used_at: Optional[datetime] = Field(
        default=None,
        sa_type=DateTime(timezone=True),
        description="Timestamp of last token use",
    )

    # Session management fields
    location: Optional[str] = Field(
        default=None,
        sa_column=Column(String(255), nullable=True),
        description="User-friendly location from IP geolocation (e.g., 'San Francisco, USA')",
    )
    fingerprint: Optional[str] = Field(
        default=None,
        sa_column=Column(String(64), nullable=True, index=True),
        description="SHA256 hash of device fingerprint (browser + OS + screen + timezone)",
    )
    is_trusted_device: bool = Field(
        default=False,
        sa_column=Column(Boolean, nullable=False, server_default="false"),
        description="User-marked trusted device (future: extended session TTL)",
    )

    # Token versioning (hybrid approach)
    token_version: int = Field(
        default=1,
        sa_column=Column(Integer, nullable=False, server_default="1", index=True),
        description="User's token version at issuance time",
    )
    global_version_at_issuance: int = Field(
        default=1,
        sa_column=Column(Integer, nullable=False, server_default="1", index=True),
        description="Global token version at issuance time",
    )

    # Relationships
    user: "User" = Relationship(back_populates="refresh_tokens")

    @field_validator("expires_at", "revoked_at", "last_used_at", mode="before")
    @classmethod
    def ensure_timezone_aware(cls, v: Optional[datetime]) -> Optional[datetime]:
        """Ensure datetime fields are timezone-aware (UTC)."""
        if v is None:
            return None
        if v.tzinfo is None:
            return v.replace(tzinfo=timezone.utc)
        return v.astimezone(timezone.utc)

    @property
    def is_expired(self) -> bool:
        """Check if token has expired."""
        return datetime.now(timezone.utc) > self.expires_at

    @property
    def is_valid(self) -> bool:
        """Check if token is valid (not expired, not revoked)."""
        return not self.is_expired and not self.is_revoked

    def revoke(self) -> None:
        """Revoke this token."""
        self.is_revoked = True
        self.revoked_at = datetime.now(timezone.utc)

Attributes

is_expired property

is_expired: bool

Check if token has expired.

is_valid property

is_valid: bool

Check if token is valid (not expired, not revoked).

Functions

ensure_timezone_aware classmethod

ensure_timezone_aware(
    v: Optional[datetime],
) -> Optional[datetime]

Ensure datetime fields are timezone-aware (UTC).

Source code in src/models/auth.py
@field_validator("expires_at", "revoked_at", "last_used_at", mode="before")
@classmethod
def ensure_timezone_aware(cls, v: Optional[datetime]) -> Optional[datetime]:
    """Ensure datetime fields are timezone-aware (UTC)."""
    if v is None:
        return None
    if v.tzinfo is None:
        return v.replace(tzinfo=timezone.utc)
    return v.astimezone(timezone.utc)

revoke

revoke() -> None

Revoke this token.

Source code in src/models/auth.py
def revoke(self) -> None:
    """Revoke this token."""
    self.is_revoked = True
    self.revoked_at = datetime.now(timezone.utc)

ProviderAuditLog Model

src.models.provider.ProviderAuditLog

Bases: DashtamBase

Audit log for provider operations.

Tracks all significant operations on provider connections for debugging, security, and compliance purposes. This provides a complete audit trail of all provider-related activities.

Attributes:

Name Type Description
connection_id UUID

The connection this log entry relates to.

user_id UUID

User who performed the action.

action str

Type of action performed.

details Dict[str, Any]

Additional details about the action.

ip_address Optional[str]

IP address of the request.

user_agent Optional[str]

User agent string.

Source code in src/models/provider.py
class ProviderAuditLog(DashtamBase, table=True):
    """Audit log for provider operations.

    Tracks all significant operations on provider connections for debugging,
    security, and compliance purposes. This provides a complete audit trail
    of all provider-related activities.

    Attributes:
        connection_id: The connection this log entry relates to.
        user_id: User who performed the action.
        action: Type of action performed.
        details: Additional details about the action.
        ip_address: IP address of the request.
        user_agent: User agent string.
    """

    __tablename__ = "provider_audit_logs"

    # References
    connection_id: UUID = Field(
        foreign_key="provider_connections.id",
        index=True,
        description="Connection this log relates to",
    )

    user_id: UUID = Field(
        foreign_key="users.id", index=True, description="User who performed the action"
    )

    # Log details
    action: str = Field(index=True, description="Action performed")

    # Common action types
    # - provider_created: New provider instance created
    # - connection_initiated: OAuth flow started
    # - token_created: Initial tokens stored
    # - token_refreshed: Access token refreshed
    # - token_refresh_failed: Refresh attempt failed
    # - sync_started: Data sync initiated
    # - sync_completed: Data sync successful
    # - sync_failed: Data sync failed
    # - connection_revoked: User revoked connection
    # - connection_error: Connection entered error state

    details: Dict[str, Any] = Field(
        default_factory=dict,
        sa_column=Column(JSON),
        description="Additional action details",
    )

    # Request information
    ip_address: Optional[str] = Field(
        default=None, description="IP address of the request"
    )

    user_agent: Optional[str] = Field(
        default=None, sa_column=Column(Text), description="User agent of the request"
    )

    # Relationships
    connection: ProviderConnection = Relationship(back_populates="audit_logs")

    @classmethod
    def log_action(
        cls,
        connection_id: UUID,
        user_id: UUID,
        action: str,
        details: Optional[Dict[str, Any]] = None,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None,
    ) -> "ProviderAuditLog":
        """Factory method to create an audit log entry.

        Args:
            connection_id: The connection this action relates to.
            user_id: User who performed the action.
            action: Description of the action.
            details: Additional context about the action.
            ip_address: Client IP address if available.
            user_agent: Client user agent if available.

        Returns:
            New audit log entry (not yet persisted).
        """
        return cls(
            connection_id=connection_id,
            user_id=user_id,
            action=action,
            details=details or {},
            ip_address=ip_address,
            user_agent=user_agent,
        )

Functions

log_action classmethod

log_action(
    connection_id: UUID,
    user_id: UUID,
    action: str,
    details: Optional[Dict[str, Any]] = None,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
) -> ProviderAuditLog

Factory method to create an audit log entry.

Parameters:

Name Type Description Default
connection_id UUID

The connection this action relates to.

required
user_id UUID

User who performed the action.

required
action str

Description of the action.

required
details Optional[Dict[str, Any]]

Additional context about the action.

None
ip_address Optional[str]

Client IP address if available.

None
user_agent Optional[str]

Client user agent if available.

None

Returns:

Type Description
ProviderAuditLog

New audit log entry (not yet persisted).

Source code in src/models/provider.py
@classmethod
def log_action(
    cls,
    connection_id: UUID,
    user_id: UUID,
    action: str,
    details: Optional[Dict[str, Any]] = None,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
) -> "ProviderAuditLog":
    """Factory method to create an audit log entry.

    Args:
        connection_id: The connection this action relates to.
        user_id: User who performed the action.
        action: Description of the action.
        details: Additional context about the action.
        ip_address: Client IP address if available.
        user_agent: Client user agent if available.

    Returns:
        New audit log entry (not yet persisted).
    """
    return cls(
        connection_id=connection_id,
        user_id=user_id,
        action=action,
        details=details or {},
        ip_address=ip_address,
        user_agent=user_agent,
    )