Skip to content

Services

Business logic and service layer implementations.

All services are located in src/services/ and follow dependency injection patterns. Services handle business logic, external integrations, and orchestrate operations across models.

Authentication Service

src.services.auth_service.AuthService

Service for user authentication and management (orchestrator).

This service orchestrates authentication workflows by delegating to specialized services while maintaining a unified public API.

Workflows
  • Registration: Create user → Delegate verification to VerificationService
  • Verification: Delegate to VerificationService
  • Login: Verify credentials → Generate tokens → Create refresh token
  • Refresh: Validate refresh token → Generate new access token
  • Password Reset: Delegate to PasswordResetService

Attributes:

Name Type Description
session

Database session for async operations

password_service

Service for password operations (sync)

jwt_service

Service for JWT operations (sync)

verification_service

Service for email verification workflows

password_reset_service

Service for password reset workflows

Source code in src/services/auth_service.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
class AuthService:
    """Service for user authentication and management (orchestrator).

    This service orchestrates authentication workflows by delegating to
    specialized services while maintaining a unified public API.

    Workflows:
        - Registration: Create user → Delegate verification to VerificationService
        - Verification: Delegate to VerificationService
        - Login: Verify credentials → Generate tokens → Create refresh token
        - Refresh: Validate refresh token → Generate new access token
        - Password Reset: Delegate to PasswordResetService

    Attributes:
        session: Database session for async operations
        password_service: Service for password operations (sync)
        jwt_service: Service for JWT operations (sync)
        verification_service: Service for email verification workflows
        password_reset_service: Service for password reset workflows
    """

    def __init__(self, session: AsyncSession, cache: Optional[CacheBackend] = None):
        """Initialize auth service with dependencies.

        Args:
            session: Async database session
            cache: Cache backend for token blacklist (SOLID: Dependency Injection)

        Note:
            Development mode is automatically determined from settings.DEBUG.
            When DEBUG=True, emails are logged instead of sent.
            Cache is optional (defaults to singleton if not provided).
        """
        self.session = session
        self.settings = get_settings()
        self.password_service = PasswordService()
        self.jwt_service = JWTService()
        # Delegate to specialized services for verification and password reset
        self.verification_service = VerificationService(session)
        self.password_reset_service = PasswordResetService(session)
        # Email service for notifications
        self.email_service = EmailService(development_mode=self.settings.DEBUG)
        # Session management: geolocation for IP → location
        self.geolocation_service = get_geolocation_service()
        # Cache for token blacklist (immediate revocation)
        self.cache = cache or get_cache()

    async def register_user(
        self, email: str, password: str, name: Optional[str] = None
    ) -> User:
        """Register a new user with email verification.

        Creates a new user account with hashed password and sends
        verification email. User account is inactive until verified.

        Args:
            email: User's email address (unique)
            password: Plain text password (will be hashed)
            name: User's full name (optional)

        Returns:
            Created user instance (unverified)

        Raises:
            HTTPException: If email already exists or password is weak

        Example:
            >>> service = AuthService(session)
            >>> user = await service.register_user(
            ...     email="user@example.com",
            ...     password="SecurePass123!",
            ...     name="John Doe"
            ... )
            >>> user.is_verified
            False
        """
        # Check if email already exists
        result = await self.session.execute(select(User).where(User.email == email))
        existing_user = result.scalar_one_or_none()

        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered",
            )

        # Validate password strength
        is_valid, error_message = self.password_service.validate_password_strength(
            password
        )
        if not is_valid:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail=error_message
            )

        # Hash password
        password_hash = self.password_service.hash_password(password)

        # Create user
        user = User(
            email=email,
            password_hash=password_hash,
            name=name,
            is_active=True,  # Account is active but not verified
            email_verified=False,
        )

        self.session.add(user)
        await self.session.flush()  # Get user ID
        await self.session.refresh(user)

        # Delegate to VerificationService to create token and send email
        await self.verification_service.create_verification_token(
            user_id=user.id, email=email, user_name=name
        )

        await self.session.commit()

        logger.info(f"User registered: {email} (ID: {user.id})")
        return user

    async def verify_email(self, token: str) -> User:
        """Verify user's email address using verification token.

        Delegates to VerificationService for token validation and user activation.

        Args:
            token: Email verification token string

        Returns:
            Verified user instance

        Raises:
            HTTPException: If token is invalid or expired

        Example:
            >>> service = AuthService(session)
            >>> user = await service.verify_email("abc123def456")
            >>> user.is_verified
            True
        """
        # Delegate to VerificationService
        return await self.verification_service.verify_email(token)

    async def login(
        self,
        email: str,
        password: str,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None,
    ) -> Tuple[str, str, User]:
        """Authenticate user and generate JWT tokens with session tracking.

        Verifies credentials and generates both access and refresh tokens.
        Refresh token is stored in database with session metadata (location,
        device info, fingerprint) for session management.

        Args:
            email: User's email address
            password: Plain text password
            ip_address: Client IP address (for geolocation)
            user_agent: Client User-Agent header (for device fingerprinting)

        Returns:
            Tuple of (access_token, refresh_token, user)

        Raises:
            HTTPException: If credentials invalid or user not verified

        Example:
            >>> service = AuthService(session)
            >>> access, refresh, user = await service.login(
            ...     email="user@example.com",
            ...     password="SecurePass123!",
            ...     ip_address="203.0.113.1",
            ...     user_agent="Mozilla/5.0 ..."
            ... )
        """
        # Get user by email
        result = await self.session.execute(select(User).where(User.email == email))
        user = result.scalar_one_or_none()

        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid email or password",
            )

        # Verify password
        if not self.password_service.verify_password(password, user.password_hash):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid email or password",
            )

        # Check if user is active
        if not user.is_active:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN, detail="Account is disabled"
            )

        # Check if email is verified
        if not user.email_verified:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Email not verified. Please check your email for verification link.",
            )

        # Check if password needs rehashing (bcrypt rounds changed)
        if self.password_service.needs_rehash(user.password_hash):
            user.password_hash = self.password_service.hash_password(password)
            logger.info(f"Password rehashed for user {user.email}")

        # Update last login
        user.last_login_at = datetime.now(timezone.utc)

        # Generate opaque refresh token (stateful, hashed in DB) with session metadata
        # Pattern A: Opaque tokens, not JWT (industry standard)
        refresh_token, refresh_token_record = await self._create_refresh_token(
            user_id=user.id, ip_address=ip_address, user_agent=user_agent
        )

        # Generate JWT access token (stateless) with session link (jti claim) and version
        access_token = self.jwt_service.create_access_token(
            user_id=user.id,
            email=user.email,
            refresh_token_id=refresh_token_record.id,
            token_version=user.min_token_version,
        )

        await self.session.commit()

        logger.info(f"User logged in: {user.email} (ID: {user.id})")
        return access_token, refresh_token, user

    async def refresh_access_token(
        self,
        refresh_token: str,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None,
    ) -> str:
        """Generate new access token using opaque refresh token with session tracking.

        Validates the opaque refresh token by hashing and database lookup.
        Updates last_used_at on the refresh token for session tracking.
        Generates JWT with jti claim linking to refresh token (session ID).
        This is Pattern A (industry standard): JWT access + opaque refresh.

        Args:
            refresh_token: Opaque refresh token string (not JWT)
            ip_address: Client IP address (for activity tracking)
            user_agent: Client User-Agent header (for fingerprint validation)

        Returns:
            New JWT access token string

        Raises:
            HTTPException: If refresh token invalid, expired, or revoked

        Example:
            >>> service = AuthService(session)
            >>> new_access_token = await service.refresh_access_token(
            ...     refresh_token=refresh_token,
            ...     ip_address="203.0.113.1",
            ...     user_agent="Mozilla/5.0 ..."
            ... )
        """
        # Get all active (non-revoked) refresh tokens
        result = await self.session.execute(
            select(RefreshToken).where(RefreshToken.revoked_at.is_(None))
        )
        refresh_tokens = result.scalars().all()

        # Find matching token by comparing hashes (secure validation)
        refresh_token_record = None
        for token_record in refresh_tokens:
            if self.password_service.verify_password(
                refresh_token, token_record.token_hash
            ):
                refresh_token_record = token_record
                break

        if not refresh_token_record:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid or revoked refresh token",
            )

        # Check if token is blacklisted in cache (immediate revocation)
        # This provides instant revocation even if DB hasn't propagated yet
        blacklist_key = f"revoked_token:{refresh_token_record.id}"
        try:
            is_blacklisted = await self.cache.exists(blacklist_key)
            if is_blacklisted:
                logger.warning(
                    f"Blacklisted token used: {refresh_token_record.id} "
                    f"(user: {refresh_token_record.user_id})"
                )
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token has been revoked",
                )
        except (CacheError, RedisError) as e:
            # Log cache error but don't fail (DB check below is fallback)
            # Note: Don't catch HTTPException - let it propagate (security)
            logger.error(f"Cache blacklist check failed: {e}")

        # Check if token expired
        if datetime.now(timezone.utc) > refresh_token_record.expires_at:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Refresh token has expired",
            )

        # Get user
        result = await self.session.execute(
            select(User).where(User.id == refresh_token_record.user_id)
        )
        user = result.scalar_one_or_none()

        if not user or not user.is_active:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found or inactive",
            )

        # Validate token versions (hybrid rotation check)
        is_valid, failure_reason = await self._validate_token_versions(
            refresh_token_record, user
        )
        if not is_valid:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"Token has been rotated: {failure_reason}",
            )

        # Update last_used_at for session tracking
        refresh_token_record.last_used_at = datetime.now(timezone.utc)

        # Generate new JWT access token with jti claim (links to session) and version
        access_token = self.jwt_service.create_access_token(
            user_id=user.id,
            email=user.email,
            refresh_token_id=refresh_token_record.id,
            token_version=user.min_token_version,
        )

        await self.session.commit()

        logger.info(
            f"Access token refreshed for user: {user.email} (ID: {user.id}), "
            f"session: {refresh_token_record.id}"
        )
        return access_token

    async def logout(self, refresh_token: str) -> None:
        """Logout user by revoking opaque refresh token.

        Validates the opaque refresh token by hashing and database lookup,
        then marks it as revoked. Access tokens remain valid until expiration.
        This is Pattern A (industry standard): JWT access + opaque refresh.

        Args:
            refresh_token: Opaque refresh token string (not JWT)

        Raises:
            HTTPException: If refresh token invalid

        Example:
            >>> service = AuthService(session)
            >>> await service.logout(refresh_token)
        """
        # Get all active (non-revoked) refresh tokens
        result = await self.session.execute(
            select(RefreshToken).where(RefreshToken.revoked_at.is_(None))
        )
        refresh_tokens = result.scalars().all()

        # Find matching token by comparing hashes (secure validation)
        refresh_token_record = None
        for token_record in refresh_tokens:
            if self.password_service.verify_password(
                refresh_token, token_record.token_hash
            ):
                refresh_token_record = token_record
                break

        if refresh_token_record:
            # Revoke the token
            refresh_token_record.revoked_at = datetime.now(timezone.utc)
            refresh_token_record.is_revoked = True
            await self.session.commit()
            logger.info(
                f"Refresh token revoked for user: {refresh_token_record.user_id}"
            )
        else:
            # Don't reveal if token doesn't exist (security best practice)
            logger.warning("Logout attempted with invalid or already revoked token")

    async def request_password_reset(self, email: str) -> None:
        """Request password reset by sending reset email.

        Delegates to PasswordResetService for token generation and email.
        Always succeeds to prevent email enumeration attacks.

        Args:
            email: User's email address

        Example:
            >>> service = AuthService(session)
            >>> await service.request_password_reset("user@example.com")
        """
        # Delegate to PasswordResetService
        await self.password_reset_service.request_reset(email)

    async def reset_password(self, token: str, new_password: str) -> User:
        """Reset user password using reset token.

        Delegates to PasswordResetService for token validation, password update,
        session revocation, and confirmation email.

        Security: All active refresh tokens are revoked to ensure that any
        potentially compromised sessions are terminated. This forces users
        to log in again with the new password on all devices.

        Args:
            token: Password reset token string
            new_password: New plain text password

        Returns:
            User instance with updated password

        Raises:
            HTTPException: If token invalid/expired or password weak

        Example:
            >>> service = AuthService(session)
            >>> user = await service.reset_password(
            ...     token="xyz789",
            ...     new_password="NewSecurePass123!"
            ... )
        """
        # Delegate to PasswordResetService
        return await self.password_reset_service.reset_password(token, new_password)

    async def get_user_by_id(self, user_id: UUID) -> Optional[User]:
        """Get user by ID.

        Args:
            user_id: User's unique identifier

        Returns:
            User instance or None if not found

        Example:
            >>> service = AuthService(session)
            >>> user = await service.get_user_by_id(user_id)
        """
        result = await self.session.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def get_user_by_email(self, email: str) -> Optional[User]:
        """Get user by email address.

        Args:
            email: User's email address

        Returns:
            User instance or None if not found

        Example:
            >>> service = AuthService(session)
            >>> user = await service.get_user_by_email("user@example.com")
        """
        result = await self.session.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()

    async def update_user_profile(
        self, user_id: UUID, name: Optional[str] = None
    ) -> User:
        """Update user profile information.

        Args:
            user_id: User's unique identifier
            name: New name (optional)

        Returns:
            Updated user instance

        Raises:
            HTTPException: If user not found

        Example:
            >>> service = AuthService(session)
            >>> user = await service.update_user_profile(
            ...     user_id=user_id,
            ...     name="Jane Doe"
            ... )
        """
        user = await self.get_user_by_id(user_id)

        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
            )

        if name is not None:
            user.name = name

        await self.session.commit()
        await self.session.refresh(user)

        logger.info(f"Profile updated for user: {user.email} (ID: {user.id})")
        return user

    async def change_password(
        self, user_id: UUID, current_password: str, new_password: str
    ) -> User:
        """Change user password (requires current password).

        Args:
            user_id: User's unique identifier
            current_password: Current password for verification
            new_password: New password to set

        Returns:
            User instance with updated password

        Raises:
            HTTPException: If current password wrong or new password weak

        Example:
            >>> service = AuthService(session)
            >>> user = await service.change_password(
            ...     user_id=user_id,
            ...     current_password="OldPass123!",
            ...     new_password="NewSecurePass123!"
            ... )
        """
        user = await self.get_user_by_id(user_id)

        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
            )

        # Verify current password
        if not self.password_service.verify_password(
            current_password, user.password_hash
        ):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Current password is incorrect",
            )

        # Validate new password is not the same as current
        if self.password_service.verify_password(new_password, user.password_hash):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="New password cannot be the same as current password",
            )

        # Validate new password strength
        is_valid, error_message = self.password_service.validate_password_strength(
            new_password
        )
        if not is_valid:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail=error_message
            )

        # Update password
        user.password_hash = self.password_service.hash_password(new_password)

        # 🔒 SECURITY: Rotate all tokens (logout all other devices)
        # Uses TokenRotationService to increment min_token_version and invalidate all existing tokens
        # This prevents compromised sessions from remaining active after password change
        rotation_service = TokenRotationService(self.session)
        rotation_result = await rotation_service.rotate_user_tokens(
            user_id=user.id, reason="Password changed by user"
        )

        # Commit password change and token rotation together (atomic)
        await self.session.commit()

        logger.info(
            f"Password changed for user {user.email}: Rotated tokens (version {rotation_result.old_version}{rotation_result.new_version}, revoked {rotation_result.tokens_revoked} tokens)"
        )

        # Send notification email
        try:
            await self.email_service.send_password_changed_notification(
                to_email=user.email, user_name=user.name
            )
            logger.info(f"Password changed notification sent to {user.email}")
        except Exception as e:
            logger.error(f"Failed to send password changed email to {user.email}: {e}")

        return user

    # Private helper methods

    async def _create_refresh_token(
        self,
        user_id: UUID,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None,
    ) -> tuple[str, RefreshToken]:
        """Create opaque refresh token with session metadata (Pattern A + Session Management).

        Generates a random opaque token (not JWT), hashes it for storage,
        collects session metadata (location, device fingerprint), and returns
        both the plain token and the database record.

        Session Metadata:
        - location: Geolocation from IP (e.g., "San Francisco, USA")
        - fingerprint: SHA256 hash of device characteristics
        - is_trusted_device: False by default (future: device trust learning)

        Args:
            user_id: User's unique identifier
            ip_address: Client IP address (for geolocation)
            user_agent: Client User-Agent header (for fingerprinting)

        Returns:
            Tuple of (plain_token, token_record)
            - plain_token: Opaque random token to return to client
            - token_record: Database record with hashed token and session metadata

        Note:
            Graceful degradation: If IP/user_agent missing, uses default values:
            - location: "Unknown Location"
            - fingerprint: hash of empty string
        """
        # Generate random opaque token (like email verification)
        plain_token = secrets.token_urlsafe(32)

        # Hash token for storage (security best practice)
        token_hash = self.password_service.hash_password(plain_token)

        expires_at = datetime.now(timezone.utc) + timedelta(
            days=self.settings.REFRESH_TOKEN_EXPIRE_DAYS
        )

        # Session metadata: Get geolocation from IP address
        location = "Unknown Location"
        if ip_address:
            location = self.geolocation_service.get_location(ip_address)

        # Session metadata: Generate device fingerprint from user agent
        # Simple fingerprint: SHA256 of user_agent string
        # (Real fingerprinting uses more data, but requires Request object)
        fingerprint_string = user_agent or ""
        fingerprint = hashlib.sha256(fingerprint_string.encode("utf-8")).hexdigest()

        # Token versioning: Get current versions for hybrid rotation
        from src.models.security_config import SecurityConfig

        # Get global version
        result = await self.session.execute(select(SecurityConfig))
        global_config = result.scalar_one()

        # Get user's current version
        result = await self.session.execute(select(User).where(User.id == user_id))
        user = result.scalar_one()

        # Create refresh token record with hash, session metadata, and versions
        refresh_token = RefreshToken(
            user_id=user_id,
            token_hash=token_hash,
            expires_at=expires_at,
            location=location,
            fingerprint=fingerprint,
            is_trusted_device=False,  # Default: new sessions not trusted
            token_version=user.min_token_version,  # User's current version
            global_version_at_issuance=global_config.global_min_token_version,  # Global version at creation
        )

        self.session.add(refresh_token)
        await self.session.flush()
        await self.session.refresh(refresh_token)

        logger.debug(
            f"Refresh token created for user {user_id}: "
            f"location={location}, fingerprint={fingerprint[:8]}..."
        )

        return plain_token, refresh_token

    async def _validate_token_versions(
        self, token: RefreshToken, user: User
    ) -> tuple[bool, Optional[str]]:
        """Validate token against both global and per-user versions.

        Two-level validation:
        1. Global version check (for system-wide breaches)
        2. Per-user version check (for user-specific events)

        Both checks must pass for token to be valid.

        Args:
            token: RefreshToken to validate.
            user: User who owns the token.

        Returns:
            Tuple of (is_valid, failure_reason).

        Example:
            >>> is_valid, reason = await service._validate_token_versions(token, user)
            >>> if not is_valid:
            ...     raise HTTPException(status_code=401, detail=reason)
        """
        # Get current global version
        from src.models.security_config import SecurityConfig

        result = await self.session.execute(select(SecurityConfig))
        config = result.scalar_one()

        # Check global version (rare, extreme breach)
        if token.global_version_at_issuance < config.global_min_token_version:
            logger.warning(
                f"Token failed global version check: "
                f"token_global_v{token.global_version_at_issuance} < "
                f"required_v{config.global_min_token_version}, "
                f"token_id={token.id}"
            )
            return False, "GLOBAL_TOKEN_VERSION_TOO_OLD"

        # Check per-user version (common, targeted rotation)
        if token.token_version < user.min_token_version:
            logger.info(
                f"Token failed user version check: "
                f"token_v{token.token_version} < "
                f"min_v{user.min_token_version}, "
                f"user_id={user.id}"
            )
            return False, "USER_TOKEN_VERSION_TOO_OLD"

        return True, None

Functions

__init__

__init__(
    session: AsyncSession,
    cache: Optional[CacheBackend] = None,
)

Initialize auth service with dependencies.

Parameters:

Name Type Description Default
session AsyncSession

Async database session

required
cache Optional[CacheBackend]

Cache backend for token blacklist (SOLID: Dependency Injection)

None
Note

Development mode is automatically determined from settings.DEBUG. When DEBUG=True, emails are logged instead of sent. Cache is optional (defaults to singleton if not provided).

Source code in src/services/auth_service.py
def __init__(self, session: AsyncSession, cache: Optional[CacheBackend] = None):
    """Initialize auth service with dependencies.

    Args:
        session: Async database session
        cache: Cache backend for token blacklist (SOLID: Dependency Injection)

    Note:
        Development mode is automatically determined from settings.DEBUG.
        When DEBUG=True, emails are logged instead of sent.
        Cache is optional (defaults to singleton if not provided).
    """
    self.session = session
    self.settings = get_settings()
    self.password_service = PasswordService()
    self.jwt_service = JWTService()
    # Delegate to specialized services for verification and password reset
    self.verification_service = VerificationService(session)
    self.password_reset_service = PasswordResetService(session)
    # Email service for notifications
    self.email_service = EmailService(development_mode=self.settings.DEBUG)
    # Session management: geolocation for IP → location
    self.geolocation_service = get_geolocation_service()
    # Cache for token blacklist (immediate revocation)
    self.cache = cache or get_cache()

register_user async

register_user(
    email: str, password: str, name: Optional[str] = None
) -> User

Register a new user with email verification.

Creates a new user account with hashed password and sends verification email. User account is inactive until verified.

Parameters:

Name Type Description Default
email str

User's email address (unique)

required
password str

Plain text password (will be hashed)

required
name Optional[str]

User's full name (optional)

None

Returns:

Type Description
User

Created user instance (unverified)

Raises:

Type Description
HTTPException

If email already exists or password is weak

Example

service = AuthService(session) user = await service.register_user( ... email="user@example.com", ... password="SecurePass123!", ... name="John Doe" ... ) user.is_verified False

Source code in src/services/auth_service.py
async def register_user(
    self, email: str, password: str, name: Optional[str] = None
) -> User:
    """Register a new user with email verification.

    Creates a new user account with hashed password and sends
    verification email. User account is inactive until verified.

    Args:
        email: User's email address (unique)
        password: Plain text password (will be hashed)
        name: User's full name (optional)

    Returns:
        Created user instance (unverified)

    Raises:
        HTTPException: If email already exists or password is weak

    Example:
        >>> service = AuthService(session)
        >>> user = await service.register_user(
        ...     email="user@example.com",
        ...     password="SecurePass123!",
        ...     name="John Doe"
        ... )
        >>> user.is_verified
        False
    """
    # Check if email already exists
    result = await self.session.execute(select(User).where(User.email == email))
    existing_user = result.scalar_one_or_none()

    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    # Validate password strength
    is_valid, error_message = self.password_service.validate_password_strength(
        password
    )
    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=error_message
        )

    # Hash password
    password_hash = self.password_service.hash_password(password)

    # Create user
    user = User(
        email=email,
        password_hash=password_hash,
        name=name,
        is_active=True,  # Account is active but not verified
        email_verified=False,
    )

    self.session.add(user)
    await self.session.flush()  # Get user ID
    await self.session.refresh(user)

    # Delegate to VerificationService to create token and send email
    await self.verification_service.create_verification_token(
        user_id=user.id, email=email, user_name=name
    )

    await self.session.commit()

    logger.info(f"User registered: {email} (ID: {user.id})")
    return user

verify_email async

verify_email(token: str) -> User

Verify user's email address using verification token.

Delegates to VerificationService for token validation and user activation.

Parameters:

Name Type Description Default
token str

Email verification token string

required

Returns:

Type Description
User

Verified user instance

Raises:

Type Description
HTTPException

If token is invalid or expired

Example

service = AuthService(session) user = await service.verify_email("abc123def456") user.is_verified True

Source code in src/services/auth_service.py
async def verify_email(self, token: str) -> User:
    """Verify user's email address using verification token.

    Delegates to VerificationService for token validation and user activation.

    Args:
        token: Email verification token string

    Returns:
        Verified user instance

    Raises:
        HTTPException: If token is invalid or expired

    Example:
        >>> service = AuthService(session)
        >>> user = await service.verify_email("abc123def456")
        >>> user.is_verified
        True
    """
    # Delegate to VerificationService
    return await self.verification_service.verify_email(token)

login async

login(
    email: str,
    password: str,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
) -> Tuple[str, str, User]

Authenticate user and generate JWT tokens with session tracking.

Verifies credentials and generates both access and refresh tokens. Refresh token is stored in database with session metadata (location, device info, fingerprint) for session management.

Parameters:

Name Type Description Default
email str

User's email address

required
password str

Plain text password

required
ip_address Optional[str]

Client IP address (for geolocation)

None
user_agent Optional[str]

Client User-Agent header (for device fingerprinting)

None

Returns:

Type Description
Tuple[str, str, User]

Tuple of (access_token, refresh_token, user)

Raises:

Type Description
HTTPException

If credentials invalid or user not verified

Example

service = AuthService(session) access, refresh, user = await service.login( ... email="user@example.com", ... password="SecurePass123!", ... ip_address="203.0.113.1", ... user_agent="Mozilla/5.0 ..." ... )

Source code in src/services/auth_service.py
async def login(
    self,
    email: str,
    password: str,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
) -> Tuple[str, str, User]:
    """Authenticate user and generate JWT tokens with session tracking.

    Verifies credentials and generates both access and refresh tokens.
    Refresh token is stored in database with session metadata (location,
    device info, fingerprint) for session management.

    Args:
        email: User's email address
        password: Plain text password
        ip_address: Client IP address (for geolocation)
        user_agent: Client User-Agent header (for device fingerprinting)

    Returns:
        Tuple of (access_token, refresh_token, user)

    Raises:
        HTTPException: If credentials invalid or user not verified

    Example:
        >>> service = AuthService(session)
        >>> access, refresh, user = await service.login(
        ...     email="user@example.com",
        ...     password="SecurePass123!",
        ...     ip_address="203.0.113.1",
        ...     user_agent="Mozilla/5.0 ..."
        ... )
    """
    # Get user by email
    result = await self.session.execute(select(User).where(User.email == email))
    user = result.scalar_one_or_none()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
        )

    # Verify password
    if not self.password_service.verify_password(password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
        )

    # Check if user is active
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Account is disabled"
        )

    # Check if email is verified
    if not user.email_verified:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Email not verified. Please check your email for verification link.",
        )

    # Check if password needs rehashing (bcrypt rounds changed)
    if self.password_service.needs_rehash(user.password_hash):
        user.password_hash = self.password_service.hash_password(password)
        logger.info(f"Password rehashed for user {user.email}")

    # Update last login
    user.last_login_at = datetime.now(timezone.utc)

    # Generate opaque refresh token (stateful, hashed in DB) with session metadata
    # Pattern A: Opaque tokens, not JWT (industry standard)
    refresh_token, refresh_token_record = await self._create_refresh_token(
        user_id=user.id, ip_address=ip_address, user_agent=user_agent
    )

    # Generate JWT access token (stateless) with session link (jti claim) and version
    access_token = self.jwt_service.create_access_token(
        user_id=user.id,
        email=user.email,
        refresh_token_id=refresh_token_record.id,
        token_version=user.min_token_version,
    )

    await self.session.commit()

    logger.info(f"User logged in: {user.email} (ID: {user.id})")
    return access_token, refresh_token, user

refresh_access_token async

refresh_access_token(
    refresh_token: str,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
) -> str

Generate new access token using opaque refresh token with session tracking.

Validates the opaque refresh token by hashing and database lookup. Updates last_used_at on the refresh token for session tracking. Generates JWT with jti claim linking to refresh token (session ID). This is Pattern A (industry standard): JWT access + opaque refresh.

Parameters:

Name Type Description Default
refresh_token str

Opaque refresh token string (not JWT)

required
ip_address Optional[str]

Client IP address (for activity tracking)

None
user_agent Optional[str]

Client User-Agent header (for fingerprint validation)

None

Returns:

Type Description
str

New JWT access token string

Raises:

Type Description
HTTPException

If refresh token invalid, expired, or revoked

Example

service = AuthService(session) new_access_token = await service.refresh_access_token( ... refresh_token=refresh_token, ... ip_address="203.0.113.1", ... user_agent="Mozilla/5.0 ..." ... )

Source code in src/services/auth_service.py
async def refresh_access_token(
    self,
    refresh_token: str,
    ip_address: Optional[str] = None,
    user_agent: Optional[str] = None,
) -> str:
    """Generate new access token using opaque refresh token with session tracking.

    Validates the opaque refresh token by hashing and database lookup.
    Updates last_used_at on the refresh token for session tracking.
    Generates JWT with jti claim linking to refresh token (session ID).
    This is Pattern A (industry standard): JWT access + opaque refresh.

    Args:
        refresh_token: Opaque refresh token string (not JWT)
        ip_address: Client IP address (for activity tracking)
        user_agent: Client User-Agent header (for fingerprint validation)

    Returns:
        New JWT access token string

    Raises:
        HTTPException: If refresh token invalid, expired, or revoked

    Example:
        >>> service = AuthService(session)
        >>> new_access_token = await service.refresh_access_token(
        ...     refresh_token=refresh_token,
        ...     ip_address="203.0.113.1",
        ...     user_agent="Mozilla/5.0 ..."
        ... )
    """
    # Get all active (non-revoked) refresh tokens
    result = await self.session.execute(
        select(RefreshToken).where(RefreshToken.revoked_at.is_(None))
    )
    refresh_tokens = result.scalars().all()

    # Find matching token by comparing hashes (secure validation)
    refresh_token_record = None
    for token_record in refresh_tokens:
        if self.password_service.verify_password(
            refresh_token, token_record.token_hash
        ):
            refresh_token_record = token_record
            break

    if not refresh_token_record:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or revoked refresh token",
        )

    # Check if token is blacklisted in cache (immediate revocation)
    # This provides instant revocation even if DB hasn't propagated yet
    blacklist_key = f"revoked_token:{refresh_token_record.id}"
    try:
        is_blacklisted = await self.cache.exists(blacklist_key)
        if is_blacklisted:
            logger.warning(
                f"Blacklisted token used: {refresh_token_record.id} "
                f"(user: {refresh_token_record.user_id})"
            )
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token has been revoked",
            )
    except (CacheError, RedisError) as e:
        # Log cache error but don't fail (DB check below is fallback)
        # Note: Don't catch HTTPException - let it propagate (security)
        logger.error(f"Cache blacklist check failed: {e}")

    # Check if token expired
    if datetime.now(timezone.utc) > refresh_token_record.expires_at:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Refresh token has expired",
        )

    # Get user
    result = await self.session.execute(
        select(User).where(User.id == refresh_token_record.user_id)
    )
    user = result.scalar_one_or_none()

    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive",
        )

    # Validate token versions (hybrid rotation check)
    is_valid, failure_reason = await self._validate_token_versions(
        refresh_token_record, user
    )
    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Token has been rotated: {failure_reason}",
        )

    # Update last_used_at for session tracking
    refresh_token_record.last_used_at = datetime.now(timezone.utc)

    # Generate new JWT access token with jti claim (links to session) and version
    access_token = self.jwt_service.create_access_token(
        user_id=user.id,
        email=user.email,
        refresh_token_id=refresh_token_record.id,
        token_version=user.min_token_version,
    )

    await self.session.commit()

    logger.info(
        f"Access token refreshed for user: {user.email} (ID: {user.id}), "
        f"session: {refresh_token_record.id}"
    )
    return access_token

logout async

logout(refresh_token: str) -> None

Logout user by revoking opaque refresh token.

Validates the opaque refresh token by hashing and database lookup, then marks it as revoked. Access tokens remain valid until expiration. This is Pattern A (industry standard): JWT access + opaque refresh.

Parameters:

Name Type Description Default
refresh_token str

Opaque refresh token string (not JWT)

required

Raises:

Type Description
HTTPException

If refresh token invalid

Example

service = AuthService(session) await service.logout(refresh_token)

Source code in src/services/auth_service.py
async def logout(self, refresh_token: str) -> None:
    """Logout user by revoking opaque refresh token.

    Validates the opaque refresh token by hashing and database lookup,
    then marks it as revoked. Access tokens remain valid until expiration.
    This is Pattern A (industry standard): JWT access + opaque refresh.

    Args:
        refresh_token: Opaque refresh token string (not JWT)

    Raises:
        HTTPException: If refresh token invalid

    Example:
        >>> service = AuthService(session)
        >>> await service.logout(refresh_token)
    """
    # Get all active (non-revoked) refresh tokens
    result = await self.session.execute(
        select(RefreshToken).where(RefreshToken.revoked_at.is_(None))
    )
    refresh_tokens = result.scalars().all()

    # Find matching token by comparing hashes (secure validation)
    refresh_token_record = None
    for token_record in refresh_tokens:
        if self.password_service.verify_password(
            refresh_token, token_record.token_hash
        ):
            refresh_token_record = token_record
            break

    if refresh_token_record:
        # Revoke the token
        refresh_token_record.revoked_at = datetime.now(timezone.utc)
        refresh_token_record.is_revoked = True
        await self.session.commit()
        logger.info(
            f"Refresh token revoked for user: {refresh_token_record.user_id}"
        )
    else:
        # Don't reveal if token doesn't exist (security best practice)
        logger.warning("Logout attempted with invalid or already revoked token")

request_password_reset async

request_password_reset(email: str) -> None

Request password reset by sending reset email.

Delegates to PasswordResetService for token generation and email. Always succeeds to prevent email enumeration attacks.

Parameters:

Name Type Description Default
email str

User's email address

required
Example

service = AuthService(session) await service.request_password_reset("user@example.com")

Source code in src/services/auth_service.py
async def request_password_reset(self, email: str) -> None:
    """Request password reset by sending reset email.

    Delegates to PasswordResetService for token generation and email.
    Always succeeds to prevent email enumeration attacks.

    Args:
        email: User's email address

    Example:
        >>> service = AuthService(session)
        >>> await service.request_password_reset("user@example.com")
    """
    # Delegate to PasswordResetService
    await self.password_reset_service.request_reset(email)

reset_password async

reset_password(token: str, new_password: str) -> User

Reset user password using reset token.

Delegates to PasswordResetService for token validation, password update, session revocation, and confirmation email.

Security: All active refresh tokens are revoked to ensure that any potentially compromised sessions are terminated. This forces users to log in again with the new password on all devices.

Parameters:

Name Type Description Default
token str

Password reset token string

required
new_password str

New plain text password

required

Returns:

Type Description
User

User instance with updated password

Raises:

Type Description
HTTPException

If token invalid/expired or password weak

Example

service = AuthService(session) user = await service.reset_password( ... token="xyz789", ... new_password="NewSecurePass123!" ... )

Source code in src/services/auth_service.py
async def reset_password(self, token: str, new_password: str) -> User:
    """Reset user password using reset token.

    Delegates to PasswordResetService for token validation, password update,
    session revocation, and confirmation email.

    Security: All active refresh tokens are revoked to ensure that any
    potentially compromised sessions are terminated. This forces users
    to log in again with the new password on all devices.

    Args:
        token: Password reset token string
        new_password: New plain text password

    Returns:
        User instance with updated password

    Raises:
        HTTPException: If token invalid/expired or password weak

    Example:
        >>> service = AuthService(session)
        >>> user = await service.reset_password(
        ...     token="xyz789",
        ...     new_password="NewSecurePass123!"
        ... )
    """
    # Delegate to PasswordResetService
    return await self.password_reset_service.reset_password(token, new_password)

get_user_by_id async

get_user_by_id(user_id: UUID) -> Optional[User]

Get user by ID.

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier

required

Returns:

Type Description
Optional[User]

User instance or None if not found

Example

service = AuthService(session) user = await service.get_user_by_id(user_id)

Source code in src/services/auth_service.py
async def get_user_by_id(self, user_id: UUID) -> Optional[User]:
    """Get user by ID.

    Args:
        user_id: User's unique identifier

    Returns:
        User instance or None if not found

    Example:
        >>> service = AuthService(session)
        >>> user = await service.get_user_by_id(user_id)
    """
    result = await self.session.execute(select(User).where(User.id == user_id))
    return result.scalar_one_or_none()

get_user_by_email async

get_user_by_email(email: str) -> Optional[User]

Get user by email address.

Parameters:

Name Type Description Default
email str

User's email address

required

Returns:

Type Description
Optional[User]

User instance or None if not found

Example

service = AuthService(session) user = await service.get_user_by_email("user@example.com")

Source code in src/services/auth_service.py
async def get_user_by_email(self, email: str) -> Optional[User]:
    """Get user by email address.

    Args:
        email: User's email address

    Returns:
        User instance or None if not found

    Example:
        >>> service = AuthService(session)
        >>> user = await service.get_user_by_email("user@example.com")
    """
    result = await self.session.execute(select(User).where(User.email == email))
    return result.scalar_one_or_none()

update_user_profile async

update_user_profile(
    user_id: UUID, name: Optional[str] = None
) -> User

Update user profile information.

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier

required
name Optional[str]

New name (optional)

None

Returns:

Type Description
User

Updated user instance

Raises:

Type Description
HTTPException

If user not found

Example

service = AuthService(session) user = await service.update_user_profile( ... user_id=user_id, ... name="Jane Doe" ... )

Source code in src/services/auth_service.py
async def update_user_profile(
    self, user_id: UUID, name: Optional[str] = None
) -> User:
    """Update user profile information.

    Args:
        user_id: User's unique identifier
        name: New name (optional)

    Returns:
        Updated user instance

    Raises:
        HTTPException: If user not found

    Example:
        >>> service = AuthService(session)
        >>> user = await service.update_user_profile(
        ...     user_id=user_id,
        ...     name="Jane Doe"
        ... )
    """
    user = await self.get_user_by_id(user_id)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )

    if name is not None:
        user.name = name

    await self.session.commit()
    await self.session.refresh(user)

    logger.info(f"Profile updated for user: {user.email} (ID: {user.id})")
    return user

change_password async

change_password(
    user_id: UUID, current_password: str, new_password: str
) -> User

Change user password (requires current password).

Parameters:

Name Type Description Default
user_id UUID

User's unique identifier

required
current_password str

Current password for verification

required
new_password str

New password to set

required

Returns:

Type Description
User

User instance with updated password

Raises:

Type Description
HTTPException

If current password wrong or new password weak

Example

service = AuthService(session) user = await service.change_password( ... user_id=user_id, ... current_password="OldPass123!", ... new_password="NewSecurePass123!" ... )

Source code in src/services/auth_service.py
async def change_password(
    self, user_id: UUID, current_password: str, new_password: str
) -> User:
    """Change user password (requires current password).

    Args:
        user_id: User's unique identifier
        current_password: Current password for verification
        new_password: New password to set

    Returns:
        User instance with updated password

    Raises:
        HTTPException: If current password wrong or new password weak

    Example:
        >>> service = AuthService(session)
        >>> user = await service.change_password(
        ...     user_id=user_id,
        ...     current_password="OldPass123!",
        ...     new_password="NewSecurePass123!"
        ... )
    """
    user = await self.get_user_by_id(user_id)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )

    # Verify current password
    if not self.password_service.verify_password(
        current_password, user.password_hash
    ):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Current password is incorrect",
        )

    # Validate new password is not the same as current
    if self.password_service.verify_password(new_password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="New password cannot be the same as current password",
        )

    # Validate new password strength
    is_valid, error_message = self.password_service.validate_password_strength(
        new_password
    )
    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=error_message
        )

    # Update password
    user.password_hash = self.password_service.hash_password(new_password)

    # 🔒 SECURITY: Rotate all tokens (logout all other devices)
    # Uses TokenRotationService to increment min_token_version and invalidate all existing tokens
    # This prevents compromised sessions from remaining active after password change
    rotation_service = TokenRotationService(self.session)
    rotation_result = await rotation_service.rotate_user_tokens(
        user_id=user.id, reason="Password changed by user"
    )

    # Commit password change and token rotation together (atomic)
    await self.session.commit()

    logger.info(
        f"Password changed for user {user.email}: Rotated tokens (version {rotation_result.old_version}{rotation_result.new_version}, revoked {rotation_result.tokens_revoked} tokens)"
    )

    # Send notification email
    try:
        await self.email_service.send_password_changed_notification(
            to_email=user.email, user_name=user.name
        )
        logger.info(f"Password changed notification sent to {user.email}")
    except Exception as e:
        logger.error(f"Failed to send password changed email to {user.email}: {e}")

    return user

Token Service

src.services.token_service.TokenService

Service for managing OAuth tokens throughout their lifecycle.

This service provides a high-level interface for token operations, handling encryption, refresh logic, and audit logging automatically.

Source code in src/services/token_service.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
class TokenService:
    """Service for managing OAuth tokens throughout their lifecycle.

    This service provides a high-level interface for token operations,
    handling encryption, refresh logic, and audit logging automatically.
    """

    def __init__(self, session: AsyncSession):
        """Initialize the token service.

        Args:
            session: Database session for operations.
        """
        self.session = session
        self.encryption = get_encryption_service()

    async def store_initial_tokens(
        self,
        provider_id: UUID,
        tokens: Dict[str, Any],
        user_id: UUID,
        request_info: Optional[Dict[str, str]] = None,
    ) -> ProviderToken:
        """Store initial tokens after successful OAuth authentication.

        This method is called after the OAuth callback to store the initial
        tokens received from the provider.

        Args:
            provider_id: The provider instance ID.
            tokens: Token response from provider containing:
                - access_token: The access token
                - refresh_token: Optional refresh token
                - expires_in: Token lifetime in seconds
                - id_token: Optional JWT ID token
                - scope: Optional granted scopes
            user_id: User who authorized the connection.
            request_info: Optional request metadata (IP, user agent).

        Returns:
            The created ProviderToken instance.

        Raises:
            ValueError: If provider or connection not found.
        """
        # Get provider and connection
        from sqlalchemy.orm import selectinload
        from sqlmodel import select

        # Load provider with connection and token relationships
        result = await self.session.execute(
            select(Provider)
            .options(
                selectinload(Provider.connection).selectinload(ProviderConnection.token)
            )
            .where(Provider.id == provider_id)
        )
        provider = result.scalar_one_or_none()
        if not provider:
            raise ValueError(f"Provider {provider_id} not found")

        if not provider.connection:
            # Create connection if it doesn't exist
            connection = ProviderConnection(
                provider_id=provider_id, status=ProviderStatus.PENDING
            )
            self.session.add(connection)
            await self.session.flush()
        else:
            connection = provider.connection

        # Check if token already exists
        existing_token = connection.token if connection else None

        # Encrypt tokens
        encrypted_access = self.encryption.encrypt(tokens["access_token"])
        encrypted_refresh = None
        if tokens.get("refresh_token"):
            encrypted_refresh = self.encryption.encrypt(tokens["refresh_token"])

        # Calculate expiration
        expires_at = None
        if tokens.get("expires_in"):
            expires_at = datetime.now(timezone.utc) + timedelta(
                seconds=int(tokens["expires_in"])
            )

        if existing_token:
            # Update existing token
            existing_token.access_token_encrypted = encrypted_access
            if encrypted_refresh:
                existing_token.refresh_token_encrypted = encrypted_refresh
            existing_token.expires_at = expires_at
            existing_token.id_token = tokens.get("id_token")
            existing_token.scope = tokens.get("scope")
            existing_token.updated_at = datetime.now(timezone.utc)
            token = existing_token
            action = "token_updated"
        else:
            # Create new token
            token = ProviderToken(
                connection_id=connection.id,
                access_token_encrypted=encrypted_access,
                refresh_token_encrypted=encrypted_refresh,
                expires_at=expires_at,
                id_token=tokens.get("id_token"),
                token_type=tokens.get("token_type", "Bearer"),
                scope=tokens.get("scope"),
            )
            self.session.add(token)
            action = "token_created"

        # Update connection status
        connection.mark_connected()

        # Create audit log
        audit_log = ProviderAuditLog(
            connection_id=connection.id,
            user_id=user_id,
            action=action,
            details={
                "provider_key": provider.provider_key,
                "alias": provider.alias,
                "has_refresh_token": bool(encrypted_refresh),
                "expires_in": tokens.get("expires_in"),
                "scope": tokens.get("scope"),
            },
            ip_address=request_info.get("ip_address") if request_info else None,
            user_agent=request_info.get("user_agent") if request_info else None,
        )
        self.session.add(audit_log)

        # Flush to persist changes in current transaction
        await self.session.flush()
        logger.info(f"Stored tokens for provider {provider.alias} (ID: {provider_id})")

        return token

    async def get_valid_access_token(self, provider_id: UUID, user_id: UUID) -> str:
        """Get a valid access token for a provider, refreshing if necessary.

        This is the main method used by providers to get tokens. It handles:
        - Decryption of stored tokens
        - Checking expiration
        - Automatic refresh if expired or expiring soon
        - Audit logging

        Args:
            provider_id: The provider instance ID.
            user_id: The user requesting the token.

        Returns:
            Valid decrypted access token.

        Raises:
            ValueError: If provider, connection, or token not found.
            Exception: If token refresh fails.
        """
        # Get provider with connection and token
        from sqlalchemy.orm import selectinload
        from sqlmodel import select

        result = await self.session.execute(
            select(Provider)
            .options(
                selectinload(Provider.connection).selectinload(ProviderConnection.token)
            )
            .where(Provider.id == provider_id)
        )
        provider = result.scalar_one_or_none()
        if not provider:
            raise ValueError(f"Provider {provider_id} not found")

        if not provider.connection:
            raise ValueError(f"Provider {provider.alias} is not connected")

        connection = provider.connection
        if not connection.token:
            raise ValueError(f"No tokens found for provider {provider.alias}")

        token = connection.token

        # Check if token needs refresh
        if token.needs_refresh:
            logger.info(f"Token for {provider.alias} needs refresh")
            token = await self.refresh_token(provider_id, user_id)

        # Decrypt and return access token
        access_token = self.encryption.decrypt(token.access_token_encrypted)
        return access_token

    async def refresh_token(self, provider_id: UUID, user_id: UUID) -> ProviderToken:
        """Refresh an expired or expiring access token.

        Uses the refresh token to obtain a new access token from the provider.
        Handles token rotation if the provider sends a new refresh token.

        Args:
            provider_id: The provider instance ID.
            user_id: The user requesting the refresh.

        Returns:
            Updated ProviderToken instance.

        Raises:
            ValueError: If no refresh token available.
            Exception: If refresh fails.
        """
        # Get provider and token
        from sqlalchemy.orm import selectinload
        from sqlmodel import select

        result = await self.session.execute(
            select(Provider)
            .options(
                selectinload(Provider.connection).selectinload(ProviderConnection.token)
            )
            .where(Provider.id == provider_id)
        )
        provider = result.scalar_one_or_none()
        if not provider or not provider.connection or not provider.connection.token:
            raise ValueError(f"Invalid provider state for {provider_id}")

        connection = provider.connection
        token = connection.token

        # Check if we have a refresh token
        if not token.refresh_token_encrypted:
            raise ValueError(f"No refresh token available for {provider.alias}")

        # Decrypt refresh token
        refresh_token = self.encryption.decrypt(token.refresh_token_encrypted)

        # Get provider implementation
        provider_impl = ProviderRegistry.create_provider_instance(provider.provider_key)

        try:
            # Call provider's refresh method
            new_tokens = await provider_impl.refresh_authentication(refresh_token)

            # Encrypt new access token
            encrypted_access = self.encryption.encrypt(new_tokens["access_token"])

            # Handle token rotation with improved detection and logging
            encrypted_refresh = (
                token.refresh_token_encrypted
            )  # Keep existing by default
            token_rotated = False
            rotation_type = "none"

            if new_tokens.get("refresh_token"):
                new_refresh = new_tokens["refresh_token"]

                if new_refresh != refresh_token:
                    # Provider sent a NEW refresh token (rotation occurred)
                    encrypted_refresh = self.encryption.encrypt(new_refresh)
                    token_rotated = True
                    rotation_type = "rotated"
                    logger.info(
                        f"Token rotation detected for {provider.alias}: "
                        f"Provider sent new refresh token"
                    )
                else:
                    # Provider sent SAME refresh token (unusual but valid)
                    # Keep existing encrypted version
                    rotation_type = "same_token_returned"
                    logger.debug(
                        f"Provider {provider.alias} returned same refresh token "
                        f"(no rotation)"
                    )
            else:
                # Provider did NOT send refresh token (most common for no rotation)
                rotation_type = "not_included"
                logger.debug(
                    f"Provider {provider.alias} did not include refresh_token in response "
                    f"(no rotation, keeping existing token)"
                )

            # Update token
            token.update_tokens(
                access_token_encrypted=encrypted_access,
                refresh_token_encrypted=encrypted_refresh,
                expires_in=new_tokens.get("expires_in"),
                id_token=new_tokens.get("id_token"),
            )

            # Update connection status
            connection.status = ProviderStatus.ACTIVE
            connection.error_count = 0
            connection.error_message = None

            # Create audit log with rotation details
            audit_log = ProviderAuditLog(
                connection_id=connection.id,
                user_id=user_id,
                action="token_refreshed",
                details={
                    "provider_key": provider.provider_key,
                    "alias": provider.alias,
                    "refresh_count": token.refresh_count,
                    "token_rotated": token_rotated,
                    "rotation_type": rotation_type,  # rotated, not_included, same_token_returned
                },
            )
            self.session.add(audit_log)

            # Flush to persist changes in current transaction
            await self.session.flush()
            logger.info(
                f"Refreshed token for {provider.alias} (refresh #{token.refresh_count})"
            )

            return token

        except Exception as e:
            # Log refresh failure
            error_message = str(e)
            connection.error_count += 1
            connection.error_message = f"Token refresh failed: {error_message}"

            if connection.error_count >= 3:
                connection.status = ProviderStatus.ERROR

            # Create audit log for failure
            audit_log = ProviderAuditLog(
                connection_id=connection.id,
                user_id=user_id,
                action="token_refresh_failed",
                details={
                    "provider_key": provider.provider_key,
                    "alias": provider.alias,
                    "error": error_message,
                    "error_count": connection.error_count,
                },
            )
            self.session.add(audit_log)

            # Flush to persist error state in current transaction
            await self.session.flush()

            logger.error(
                f"Failed to refresh token for {provider.alias}: {error_message}"
            )
            raise Exception(
                f"Token refresh failed for {provider.alias}: {error_message}"
            ) from e

    async def revoke_tokens(
        self,
        provider_id: UUID,
        user_id: UUID,
        request_info: Optional[Dict[str, str]] = None,
    ) -> None:
        """Revoke tokens and disconnect a provider.

        This removes the stored tokens and marks the connection as revoked.
        The provider instance itself is preserved for potential reconnection.

        Args:
            provider_id: The provider instance ID.
            user_id: The user revoking the connection.
            request_info: Optional request metadata.
        """
        # Get provider
        from sqlalchemy.orm import selectinload
        from sqlmodel import select

        result = await self.session.execute(
            select(Provider)
            .options(
                selectinload(Provider.connection).selectinload(ProviderConnection.token)
            )
            .where(Provider.id == provider_id)
        )
        provider = result.scalar_one_or_none()
        if not provider:
            raise ValueError(f"Provider {provider_id} not found")

        if provider.connection:
            connection = provider.connection

            # Delete token if exists
            if connection.token:
                await self.session.delete(connection.token)

            # Update connection status
            connection.status = ProviderStatus.REVOKED
            connection.error_message = "Connection revoked by user"

            # Create audit log
            audit_log = ProviderAuditLog(
                connection_id=connection.id,
                user_id=user_id,
                action="connection_revoked",
                details={
                    "provider_key": provider.provider_key,
                    "alias": provider.alias,
                },
                ip_address=request_info.get("ip_address") if request_info else None,
                user_agent=request_info.get("user_agent") if request_info else None,
            )
            self.session.add(audit_log)

            # Flush to persist changes in current transaction
            await self.session.flush()
            logger.info(f"Revoked tokens for provider {provider.alias}")

    async def get_token_info(self, provider_id: UUID) -> Optional[Dict[str, Any]]:
        """Get information about stored tokens without decrypting.

        Useful for checking token status without exposing sensitive data.

        Args:
            provider_id: The provider instance ID.

        Returns:
            Dictionary with token metadata, or None if no tokens.
        """
        from sqlalchemy.orm import selectinload
        from sqlmodel import select

        result = await self.session.execute(
            select(Provider)
            .options(
                selectinload(Provider.connection).selectinload(ProviderConnection.token)
            )
            .where(Provider.id == provider_id)
        )
        provider = result.scalar_one_or_none()
        if not provider or not provider.connection or not provider.connection.token:
            return None

        token = provider.connection.token

        return {
            "has_access_token": bool(token.access_token_encrypted),
            "has_refresh_token": bool(token.refresh_token_encrypted),
            "has_id_token": bool(token.id_token),
            "expires_at": token.expires_at.isoformat() if token.expires_at else None,
            "is_expired": token.is_expired,
            "is_expiring_soon": token.is_expiring_soon,
            "needs_refresh": token.needs_refresh,
            "refresh_count": token.refresh_count,
            "last_refreshed_at": token.last_refreshed_at.isoformat()
            if token.last_refreshed_at
            else None,
            "scope": token.scope,
        }

Functions

__init__

__init__(session: AsyncSession)

Initialize the token service.

Parameters:

Name Type Description Default
session AsyncSession

Database session for operations.

required
Source code in src/services/token_service.py
def __init__(self, session: AsyncSession):
    """Initialize the token service.

    Args:
        session: Database session for operations.
    """
    self.session = session
    self.encryption = get_encryption_service()

store_initial_tokens async

store_initial_tokens(
    provider_id: UUID,
    tokens: Dict[str, Any],
    user_id: UUID,
    request_info: Optional[Dict[str, str]] = None,
) -> ProviderToken

Store initial tokens after successful OAuth authentication.

This method is called after the OAuth callback to store the initial tokens received from the provider.

Parameters:

Name Type Description Default
provider_id UUID

The provider instance ID.

required
tokens Dict[str, Any]

Token response from provider containing: - access_token: The access token - refresh_token: Optional refresh token - expires_in: Token lifetime in seconds - id_token: Optional JWT ID token - scope: Optional granted scopes

required
user_id UUID

User who authorized the connection.

required
request_info Optional[Dict[str, str]]

Optional request metadata (IP, user agent).

None

Returns:

Type Description
ProviderToken

The created ProviderToken instance.

Raises:

Type Description
ValueError

If provider or connection not found.

Source code in src/services/token_service.py
async def store_initial_tokens(
    self,
    provider_id: UUID,
    tokens: Dict[str, Any],
    user_id: UUID,
    request_info: Optional[Dict[str, str]] = None,
) -> ProviderToken:
    """Store initial tokens after successful OAuth authentication.

    This method is called after the OAuth callback to store the initial
    tokens received from the provider.

    Args:
        provider_id: The provider instance ID.
        tokens: Token response from provider containing:
            - access_token: The access token
            - refresh_token: Optional refresh token
            - expires_in: Token lifetime in seconds
            - id_token: Optional JWT ID token
            - scope: Optional granted scopes
        user_id: User who authorized the connection.
        request_info: Optional request metadata (IP, user agent).

    Returns:
        The created ProviderToken instance.

    Raises:
        ValueError: If provider or connection not found.
    """
    # Get provider and connection
    from sqlalchemy.orm import selectinload
    from sqlmodel import select

    # Load provider with connection and token relationships
    result = await self.session.execute(
        select(Provider)
        .options(
            selectinload(Provider.connection).selectinload(ProviderConnection.token)
        )
        .where(Provider.id == provider_id)
    )
    provider = result.scalar_one_or_none()
    if not provider:
        raise ValueError(f"Provider {provider_id} not found")

    if not provider.connection:
        # Create connection if it doesn't exist
        connection = ProviderConnection(
            provider_id=provider_id, status=ProviderStatus.PENDING
        )
        self.session.add(connection)
        await self.session.flush()
    else:
        connection = provider.connection

    # Check if token already exists
    existing_token = connection.token if connection else None

    # Encrypt tokens
    encrypted_access = self.encryption.encrypt(tokens["access_token"])
    encrypted_refresh = None
    if tokens.get("refresh_token"):
        encrypted_refresh = self.encryption.encrypt(tokens["refresh_token"])

    # Calculate expiration
    expires_at = None
    if tokens.get("expires_in"):
        expires_at = datetime.now(timezone.utc) + timedelta(
            seconds=int(tokens["expires_in"])
        )

    if existing_token:
        # Update existing token
        existing_token.access_token_encrypted = encrypted_access
        if encrypted_refresh:
            existing_token.refresh_token_encrypted = encrypted_refresh
        existing_token.expires_at = expires_at
        existing_token.id_token = tokens.get("id_token")
        existing_token.scope = tokens.get("scope")
        existing_token.updated_at = datetime.now(timezone.utc)
        token = existing_token
        action = "token_updated"
    else:
        # Create new token
        token = ProviderToken(
            connection_id=connection.id,
            access_token_encrypted=encrypted_access,
            refresh_token_encrypted=encrypted_refresh,
            expires_at=expires_at,
            id_token=tokens.get("id_token"),
            token_type=tokens.get("token_type", "Bearer"),
            scope=tokens.get("scope"),
        )
        self.session.add(token)
        action = "token_created"

    # Update connection status
    connection.mark_connected()

    # Create audit log
    audit_log = ProviderAuditLog(
        connection_id=connection.id,
        user_id=user_id,
        action=action,
        details={
            "provider_key": provider.provider_key,
            "alias": provider.alias,
            "has_refresh_token": bool(encrypted_refresh),
            "expires_in": tokens.get("expires_in"),
            "scope": tokens.get("scope"),
        },
        ip_address=request_info.get("ip_address") if request_info else None,
        user_agent=request_info.get("user_agent") if request_info else None,
    )
    self.session.add(audit_log)

    # Flush to persist changes in current transaction
    await self.session.flush()
    logger.info(f"Stored tokens for provider {provider.alias} (ID: {provider_id})")

    return token

get_valid_access_token async

get_valid_access_token(
    provider_id: UUID, user_id: UUID
) -> str

Get a valid access token for a provider, refreshing if necessary.

This is the main method used by providers to get tokens. It handles: - Decryption of stored tokens - Checking expiration - Automatic refresh if expired or expiring soon - Audit logging

Parameters:

Name Type Description Default
provider_id UUID

The provider instance ID.

required
user_id UUID

The user requesting the token.

required

Returns:

Type Description
str

Valid decrypted access token.

Raises:

Type Description
ValueError

If provider, connection, or token not found.

Exception

If token refresh fails.

Source code in src/services/token_service.py
async def get_valid_access_token(self, provider_id: UUID, user_id: UUID) -> str:
    """Get a valid access token for a provider, refreshing if necessary.

    This is the main method used by providers to get tokens. It handles:
    - Decryption of stored tokens
    - Checking expiration
    - Automatic refresh if expired or expiring soon
    - Audit logging

    Args:
        provider_id: The provider instance ID.
        user_id: The user requesting the token.

    Returns:
        Valid decrypted access token.

    Raises:
        ValueError: If provider, connection, or token not found.
        Exception: If token refresh fails.
    """
    # Get provider with connection and token
    from sqlalchemy.orm import selectinload
    from sqlmodel import select

    result = await self.session.execute(
        select(Provider)
        .options(
            selectinload(Provider.connection).selectinload(ProviderConnection.token)
        )
        .where(Provider.id == provider_id)
    )
    provider = result.scalar_one_or_none()
    if not provider:
        raise ValueError(f"Provider {provider_id} not found")

    if not provider.connection:
        raise ValueError(f"Provider {provider.alias} is not connected")

    connection = provider.connection
    if not connection.token:
        raise ValueError(f"No tokens found for provider {provider.alias}")

    token = connection.token

    # Check if token needs refresh
    if token.needs_refresh:
        logger.info(f"Token for {provider.alias} needs refresh")
        token = await self.refresh_token(provider_id, user_id)

    # Decrypt and return access token
    access_token = self.encryption.decrypt(token.access_token_encrypted)
    return access_token

refresh_token async

refresh_token(
    provider_id: UUID, user_id: UUID
) -> ProviderToken

Refresh an expired or expiring access token.

Uses the refresh token to obtain a new access token from the provider. Handles token rotation if the provider sends a new refresh token.

Parameters:

Name Type Description Default
provider_id UUID

The provider instance ID.

required
user_id UUID

The user requesting the refresh.

required

Returns:

Type Description
ProviderToken

Updated ProviderToken instance.

Raises:

Type Description
ValueError

If no refresh token available.

Exception

If refresh fails.

Source code in src/services/token_service.py
async def refresh_token(self, provider_id: UUID, user_id: UUID) -> ProviderToken:
    """Refresh an expired or expiring access token.

    Uses the refresh token to obtain a new access token from the provider.
    Handles token rotation if the provider sends a new refresh token.

    Args:
        provider_id: The provider instance ID.
        user_id: The user requesting the refresh.

    Returns:
        Updated ProviderToken instance.

    Raises:
        ValueError: If no refresh token available.
        Exception: If refresh fails.
    """
    # Get provider and token
    from sqlalchemy.orm import selectinload
    from sqlmodel import select

    result = await self.session.execute(
        select(Provider)
        .options(
            selectinload(Provider.connection).selectinload(ProviderConnection.token)
        )
        .where(Provider.id == provider_id)
    )
    provider = result.scalar_one_or_none()
    if not provider or not provider.connection or not provider.connection.token:
        raise ValueError(f"Invalid provider state for {provider_id}")

    connection = provider.connection
    token = connection.token

    # Check if we have a refresh token
    if not token.refresh_token_encrypted:
        raise ValueError(f"No refresh token available for {provider.alias}")

    # Decrypt refresh token
    refresh_token = self.encryption.decrypt(token.refresh_token_encrypted)

    # Get provider implementation
    provider_impl = ProviderRegistry.create_provider_instance(provider.provider_key)

    try:
        # Call provider's refresh method
        new_tokens = await provider_impl.refresh_authentication(refresh_token)

        # Encrypt new access token
        encrypted_access = self.encryption.encrypt(new_tokens["access_token"])

        # Handle token rotation with improved detection and logging
        encrypted_refresh = (
            token.refresh_token_encrypted
        )  # Keep existing by default
        token_rotated = False
        rotation_type = "none"

        if new_tokens.get("refresh_token"):
            new_refresh = new_tokens["refresh_token"]

            if new_refresh != refresh_token:
                # Provider sent a NEW refresh token (rotation occurred)
                encrypted_refresh = self.encryption.encrypt(new_refresh)
                token_rotated = True
                rotation_type = "rotated"
                logger.info(
                    f"Token rotation detected for {provider.alias}: "
                    f"Provider sent new refresh token"
                )
            else:
                # Provider sent SAME refresh token (unusual but valid)
                # Keep existing encrypted version
                rotation_type = "same_token_returned"
                logger.debug(
                    f"Provider {provider.alias} returned same refresh token "
                    f"(no rotation)"
                )
        else:
            # Provider did NOT send refresh token (most common for no rotation)
            rotation_type = "not_included"
            logger.debug(
                f"Provider {provider.alias} did not include refresh_token in response "
                f"(no rotation, keeping existing token)"
            )

        # Update token
        token.update_tokens(
            access_token_encrypted=encrypted_access,
            refresh_token_encrypted=encrypted_refresh,
            expires_in=new_tokens.get("expires_in"),
            id_token=new_tokens.get("id_token"),
        )

        # Update connection status
        connection.status = ProviderStatus.ACTIVE
        connection.error_count = 0
        connection.error_message = None

        # Create audit log with rotation details
        audit_log = ProviderAuditLog(
            connection_id=connection.id,
            user_id=user_id,
            action="token_refreshed",
            details={
                "provider_key": provider.provider_key,
                "alias": provider.alias,
                "refresh_count": token.refresh_count,
                "token_rotated": token_rotated,
                "rotation_type": rotation_type,  # rotated, not_included, same_token_returned
            },
        )
        self.session.add(audit_log)

        # Flush to persist changes in current transaction
        await self.session.flush()
        logger.info(
            f"Refreshed token for {provider.alias} (refresh #{token.refresh_count})"
        )

        return token

    except Exception as e:
        # Log refresh failure
        error_message = str(e)
        connection.error_count += 1
        connection.error_message = f"Token refresh failed: {error_message}"

        if connection.error_count >= 3:
            connection.status = ProviderStatus.ERROR

        # Create audit log for failure
        audit_log = ProviderAuditLog(
            connection_id=connection.id,
            user_id=user_id,
            action="token_refresh_failed",
            details={
                "provider_key": provider.provider_key,
                "alias": provider.alias,
                "error": error_message,
                "error_count": connection.error_count,
            },
        )
        self.session.add(audit_log)

        # Flush to persist error state in current transaction
        await self.session.flush()

        logger.error(
            f"Failed to refresh token for {provider.alias}: {error_message}"
        )
        raise Exception(
            f"Token refresh failed for {provider.alias}: {error_message}"
        ) from e

revoke_tokens async

revoke_tokens(
    provider_id: UUID,
    user_id: UUID,
    request_info: Optional[Dict[str, str]] = None,
) -> None

Revoke tokens and disconnect a provider.

This removes the stored tokens and marks the connection as revoked. The provider instance itself is preserved for potential reconnection.

Parameters:

Name Type Description Default
provider_id UUID

The provider instance ID.

required
user_id UUID

The user revoking the connection.

required
request_info Optional[Dict[str, str]]

Optional request metadata.

None
Source code in src/services/token_service.py
async def revoke_tokens(
    self,
    provider_id: UUID,
    user_id: UUID,
    request_info: Optional[Dict[str, str]] = None,
) -> None:
    """Revoke tokens and disconnect a provider.

    This removes the stored tokens and marks the connection as revoked.
    The provider instance itself is preserved for potential reconnection.

    Args:
        provider_id: The provider instance ID.
        user_id: The user revoking the connection.
        request_info: Optional request metadata.
    """
    # Get provider
    from sqlalchemy.orm import selectinload
    from sqlmodel import select

    result = await self.session.execute(
        select(Provider)
        .options(
            selectinload(Provider.connection).selectinload(ProviderConnection.token)
        )
        .where(Provider.id == provider_id)
    )
    provider = result.scalar_one_or_none()
    if not provider:
        raise ValueError(f"Provider {provider_id} not found")

    if provider.connection:
        connection = provider.connection

        # Delete token if exists
        if connection.token:
            await self.session.delete(connection.token)

        # Update connection status
        connection.status = ProviderStatus.REVOKED
        connection.error_message = "Connection revoked by user"

        # Create audit log
        audit_log = ProviderAuditLog(
            connection_id=connection.id,
            user_id=user_id,
            action="connection_revoked",
            details={
                "provider_key": provider.provider_key,
                "alias": provider.alias,
            },
            ip_address=request_info.get("ip_address") if request_info else None,
            user_agent=request_info.get("user_agent") if request_info else None,
        )
        self.session.add(audit_log)

        # Flush to persist changes in current transaction
        await self.session.flush()
        logger.info(f"Revoked tokens for provider {provider.alias}")

get_token_info async

get_token_info(
    provider_id: UUID,
) -> Optional[Dict[str, Any]]

Get information about stored tokens without decrypting.

Useful for checking token status without exposing sensitive data.

Parameters:

Name Type Description Default
provider_id UUID

The provider instance ID.

required

Returns:

Type Description
Optional[Dict[str, Any]]

Dictionary with token metadata, or None if no tokens.

Source code in src/services/token_service.py
async def get_token_info(self, provider_id: UUID) -> Optional[Dict[str, Any]]:
    """Get information about stored tokens without decrypting.

    Useful for checking token status without exposing sensitive data.

    Args:
        provider_id: The provider instance ID.

    Returns:
        Dictionary with token metadata, or None if no tokens.
    """
    from sqlalchemy.orm import selectinload
    from sqlmodel import select

    result = await self.session.execute(
        select(Provider)
        .options(
            selectinload(Provider.connection).selectinload(ProviderConnection.token)
        )
        .where(Provider.id == provider_id)
    )
    provider = result.scalar_one_or_none()
    if not provider or not provider.connection or not provider.connection.token:
        return None

    token = provider.connection.token

    return {
        "has_access_token": bool(token.access_token_encrypted),
        "has_refresh_token": bool(token.refresh_token_encrypted),
        "has_id_token": bool(token.id_token),
        "expires_at": token.expires_at.isoformat() if token.expires_at else None,
        "is_expired": token.is_expired,
        "is_expiring_soon": token.is_expiring_soon,
        "needs_refresh": token.needs_refresh,
        "refresh_count": token.refresh_count,
        "last_refreshed_at": token.last_refreshed_at.isoformat()
        if token.last_refreshed_at
        else None,
        "scope": token.scope,
    }

JWT Service

src.services.jwt_service.JWTService

Service for JWT token operations.

This service provides JWT encoding and decoding functionality using python-jose. All methods are synchronous because JWT operations are pure CPU work with no I/O operations.

Token Types
  • Access Token: Short-lived (30 min default), used for API authentication
  • Refresh Token: Long-lived (30 days default), used to get new access tokens
Token Claims
  • sub (subject): User ID
  • email: User email address
  • type: Token type (access/refresh)
  • exp (expiration): Token expiration timestamp
  • iat (issued at): Token creation timestamp
  • jti (JWT ID): Unique token identifier (for refresh tokens)

Attributes:

Name Type Description
secret_key

Secret key for signing tokens

algorithm

Signing algorithm (HS256)

access_token_expire_minutes

Access token TTL

refresh_token_expire_days

Refresh token TTL

Source code in src/services/jwt_service.py
class JWTService:
    """Service for JWT token operations.

    This service provides JWT encoding and decoding functionality
    using python-jose. All methods are synchronous because JWT
    operations are pure CPU work with no I/O operations.

    Token Types:
        - Access Token: Short-lived (30 min default), used for API authentication
        - Refresh Token: Long-lived (30 days default), used to get new access tokens

    Token Claims:
        - sub (subject): User ID
        - email: User email address
        - type: Token type (access/refresh)
        - exp (expiration): Token expiration timestamp
        - iat (issued at): Token creation timestamp
        - jti (JWT ID): Unique token identifier (for refresh tokens)

    Attributes:
        secret_key: Secret key for signing tokens
        algorithm: Signing algorithm (HS256)
        access_token_expire_minutes: Access token TTL
        refresh_token_expire_days: Refresh token TTL
    """

    def __init__(self):
        """Initialize JWT service with configuration from settings."""
        settings = get_settings()
        self.secret_key = settings.SECRET_KEY
        self.algorithm = settings.ALGORITHM
        self.access_token_expire_minutes = settings.ACCESS_TOKEN_EXPIRE_MINUTES
        self.refresh_token_expire_days = settings.REFRESH_TOKEN_EXPIRE_DAYS

    def create_access_token(
        self,
        user_id: UUID,
        email: str,
        refresh_token_id: Optional[UUID] = None,
        token_version: Optional[int] = None,
        additional_claims: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Create a new access token for user authentication.

        Access tokens are short-lived (default: 30 minutes) and used
        for authenticating API requests. They contain user identity
        information but should not contain sensitive data.

        Args:
            user_id: Unique user identifier
            email: User's email address
            refresh_token_id: Optional refresh token UUID (adds jti claim for session management)
            token_version: Optional user token version (for rotation validation)
            additional_claims: Optional additional claims to include

        Returns:
            Encoded JWT access token string

        Example:
            >>> service = JWTService()
            >>> from uuid import uuid4
            >>> user_id = uuid4()
            >>> token = service.create_access_token(user_id, "user@example.com", token_version=1)
            >>> len(token) > 0
            True

        Notes:
            - Adding refresh_token_id as jti enables current session detection
            - Adding token_version enables rotation-based invalidation
            - Existing tokens without jti/version still work (graceful degradation)
            - jti = JWT ID (RFC 7519 standard claim)
        """
        now = datetime.now(UTC)
        expire = now + timedelta(minutes=self.access_token_expire_minutes)

        # Base claims for access token
        claims = {
            "sub": str(user_id),
            "email": email,
            "type": "access",
            "exp": expire,
            "iat": now,
        }

        # Add token version if provided (for rotation validation)
        if token_version is not None:
            claims["version"] = token_version

        # Add jti claim if refresh_token_id provided (links access token to session)
        if refresh_token_id:
            claims["jti"] = str(refresh_token_id)

        # Add any additional claims
        if additional_claims:
            claims.update(additional_claims)

        # Encode and return token
        return jwt.encode(claims, self.secret_key, algorithm=self.algorithm)

    def create_refresh_token(self, user_id: UUID, jti: UUID) -> str:
        """Create a new refresh token (DEPRECATED).

        **DEPRECATED**: This method is deprecated. The application now uses
        opaque (random) refresh tokens instead of JWT refresh tokens.
        This follows industry standard Pattern A (JWT access + opaque refresh).

        Refresh tokens should be generated using secrets.token_urlsafe()
        and stored as hashed values in the database. See AuthService._create_refresh_token().

        This method is kept for backwards compatibility with existing tests only.

        Args:
            user_id: Unique user identifier
            jti: Unique token identifier

        Returns:
            Encoded JWT refresh token string (for testing only)

        Note:
            DO NOT use this for production authentication flows.
            Use AuthService._create_refresh_token() instead.
        """
        now = datetime.now(UTC)
        expire = now + timedelta(days=self.refresh_token_expire_days)

        claims = {
            "sub": str(user_id),
            "type": "refresh",
            "jti": str(jti),  # Unique token ID for database tracking
            "exp": expire,
            "iat": now,
        }

        return jwt.encode(claims, self.secret_key, algorithm=self.algorithm)

    def decode_token(self, token: str) -> Dict[str, Any]:
        """Decode and validate a JWT token.

        This method decodes the token, verifies its signature, and
        checks that it hasn't expired. If validation fails, a
        JWTError exception is raised.

        Args:
            token: Encoded JWT token string

        Returns:
            Dictionary of token claims (sub, email, type, exp, etc.)

        Raises:
            JWTError: If token is invalid, expired, or signature doesn't match

        Example:
            >>> service = JWTService()
            >>> from uuid import uuid4
            >>> user_id = uuid4()
            >>> token = service.create_access_token(user_id, "user@example.com")
            >>> claims = service.decode_token(token)
            >>> claims["email"]
            'user@example.com'
            >>> claims["type"]
            'access'
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            return payload
        except JWTError as e:
            # Re-raise with more context
            raise JWTError(f"Token validation failed: {str(e)}")

    def verify_token_type(self, token: str, expected_type: str) -> Dict[str, Any]:
        """Decode token and verify it's of the expected type.

        Use this to ensure tokens are being used correctly:
        - Access tokens for API authentication
        - Refresh tokens for token refresh

        Args:
            token: Encoded JWT token string
            expected_type: Expected token type ("access" or "refresh")

        Returns:
            Dictionary of token claims

        Raises:
            JWTError: If token is invalid or wrong type

        Example:
            >>> service = JWTService()
            >>> from uuid import uuid4
            >>> user_id = uuid4()
            >>> token = service.create_access_token(user_id, "user@example.com")
            >>> claims = service.verify_token_type(token, "access")
            >>> claims["type"]
            'access'

            >>> service.verify_token_type(token, "refresh")
            Traceback (most recent call last):
            ...
            jose.exceptions.JWTError: Invalid token type: expected refresh, got access
        """
        payload = self.decode_token(token)

        token_type = payload.get("type")
        if token_type != expected_type:
            raise JWTError(
                f"Invalid token type: expected {expected_type}, got {token_type}"
            )

        return payload

    def get_user_id_from_token(self, token: str) -> UUID:
        """Extract user ID from a valid token.

        Convenience method to get user ID without manually parsing claims.

        Args:
            token: Encoded JWT token string

        Returns:
            User ID as UUID

        Raises:
            JWTError: If token is invalid or missing subject claim

        Example:
            >>> service = JWTService()
            >>> from uuid import uuid4
            >>> user_id = uuid4()
            >>> token = service.create_access_token(user_id, "user@example.com")
            >>> extracted_id = service.get_user_id_from_token(token)
            >>> extracted_id == user_id
            True
        """
        payload = self.decode_token(token)

        user_id_str = payload.get("sub")
        if not user_id_str:
            raise JWTError("Token missing subject (user ID) claim")

        try:
            return UUID(user_id_str)
        except ValueError as e:
            raise JWTError(f"Invalid user ID format in token: {str(e)}")

    def get_token_jti(self, token: str) -> UUID:
        """Extract JWT ID (jti) from a refresh token.

        The jti claim is used to track refresh tokens in the database
        and enable token revocation.

        Args:
            token: Encoded refresh token string

        Returns:
            Token ID as UUID

        Raises:
            JWTError: If token is invalid or missing jti claim

        Example:
            >>> service = JWTService()
            >>> from uuid import uuid4
            >>> user_id = uuid4()
            >>> jti = uuid4()
            >>> token = service.create_refresh_token(user_id, jti)
            >>> extracted_jti = service.get_token_jti(token)
            >>> extracted_jti == jti
            True
        """
        # Verify it's a refresh token
        payload = self.verify_token_type(token, "refresh")

        jti_str = payload.get("jti")
        if not jti_str:
            raise JWTError("Refresh token missing jti (token ID) claim")

        try:
            return UUID(jti_str)
        except ValueError as e:
            raise JWTError(f"Invalid token ID format: {str(e)}")

    def is_token_expired(self, token: str) -> bool:
        """Check if a token is expired without raising an exception.

        Useful for checking token status without catching exceptions.

        Args:
            token: Encoded JWT token string

        Returns:
            True if token is expired, False if still valid

        Note:
            If the token is malformed or has invalid signature,
            this will return True (treating invalid as expired).

        Example:
            >>> service = JWTService()
            >>> from uuid import uuid4
            >>> user_id = uuid4()
            >>> token = service.create_access_token(user_id, "user@example.com")
            >>> service.is_token_expired(token)
            False
        """
        try:
            payload = self.decode_token(token)

            # Check if exp claim exists
            exp = payload.get("exp")
            if not exp:
                return True

            # Compare expiration time with current time
            exp_datetime = datetime.fromtimestamp(exp, UTC)
            return datetime.now(UTC) >= exp_datetime

        except JWTError:
            # Treat any decode error as expired
            return True

    def get_token_expiration(self, token: str) -> Optional[datetime]:
        """Get the expiration time of a token.

        Args:
            token: Encoded JWT token string

        Returns:
            Expiration datetime, or None if token is invalid

        Example:
            >>> service = JWTService()
            >>> from uuid import uuid4
            >>> user_id = uuid4()
            >>> token = service.create_access_token(user_id, "user@example.com")
            >>> exp = service.get_token_expiration(token)
            >>> exp is not None
            True
        """
        try:
            payload = self.decode_token(token)
            exp = payload.get("exp")

            if exp:
                return datetime.fromtimestamp(exp, UTC)
            return None

        except JWTError:
            return None

Functions

__init__

__init__()

Initialize JWT service with configuration from settings.

Source code in src/services/jwt_service.py
def __init__(self):
    """Initialize JWT service with configuration from settings."""
    settings = get_settings()
    self.secret_key = settings.SECRET_KEY
    self.algorithm = settings.ALGORITHM
    self.access_token_expire_minutes = settings.ACCESS_TOKEN_EXPIRE_MINUTES
    self.refresh_token_expire_days = settings.REFRESH_TOKEN_EXPIRE_DAYS

create_access_token

create_access_token(
    user_id: UUID,
    email: str,
    refresh_token_id: Optional[UUID] = None,
    token_version: Optional[int] = None,
    additional_claims: Optional[Dict[str, Any]] = None,
) -> str

Create a new access token for user authentication.

Access tokens are short-lived (default: 30 minutes) and used for authenticating API requests. They contain user identity information but should not contain sensitive data.

Parameters:

Name Type Description Default
user_id UUID

Unique user identifier

required
email str

User's email address

required
refresh_token_id Optional[UUID]

Optional refresh token UUID (adds jti claim for session management)

None
token_version Optional[int]

Optional user token version (for rotation validation)

None
additional_claims Optional[Dict[str, Any]]

Optional additional claims to include

None

Returns:

Type Description
str

Encoded JWT access token string

Example

service = JWTService() from uuid import uuid4 user_id = uuid4() token = service.create_access_token(user_id, "user@example.com", token_version=1) len(token) > 0 True

Notes
  • Adding refresh_token_id as jti enables current session detection
  • Adding token_version enables rotation-based invalidation
  • Existing tokens without jti/version still work (graceful degradation)
  • jti = JWT ID (RFC 7519 standard claim)
Source code in src/services/jwt_service.py
def create_access_token(
    self,
    user_id: UUID,
    email: str,
    refresh_token_id: Optional[UUID] = None,
    token_version: Optional[int] = None,
    additional_claims: Optional[Dict[str, Any]] = None,
) -> str:
    """Create a new access token for user authentication.

    Access tokens are short-lived (default: 30 minutes) and used
    for authenticating API requests. They contain user identity
    information but should not contain sensitive data.

    Args:
        user_id: Unique user identifier
        email: User's email address
        refresh_token_id: Optional refresh token UUID (adds jti claim for session management)
        token_version: Optional user token version (for rotation validation)
        additional_claims: Optional additional claims to include

    Returns:
        Encoded JWT access token string

    Example:
        >>> service = JWTService()
        >>> from uuid import uuid4
        >>> user_id = uuid4()
        >>> token = service.create_access_token(user_id, "user@example.com", token_version=1)
        >>> len(token) > 0
        True

    Notes:
        - Adding refresh_token_id as jti enables current session detection
        - Adding token_version enables rotation-based invalidation
        - Existing tokens without jti/version still work (graceful degradation)
        - jti = JWT ID (RFC 7519 standard claim)
    """
    now = datetime.now(UTC)
    expire = now + timedelta(minutes=self.access_token_expire_minutes)

    # Base claims for access token
    claims = {
        "sub": str(user_id),
        "email": email,
        "type": "access",
        "exp": expire,
        "iat": now,
    }

    # Add token version if provided (for rotation validation)
    if token_version is not None:
        claims["version"] = token_version

    # Add jti claim if refresh_token_id provided (links access token to session)
    if refresh_token_id:
        claims["jti"] = str(refresh_token_id)

    # Add any additional claims
    if additional_claims:
        claims.update(additional_claims)

    # Encode and return token
    return jwt.encode(claims, self.secret_key, algorithm=self.algorithm)

create_refresh_token

create_refresh_token(user_id: UUID, jti: UUID) -> str

Create a new refresh token (DEPRECATED).

DEPRECATED: This method is deprecated. The application now uses opaque (random) refresh tokens instead of JWT refresh tokens. This follows industry standard Pattern A (JWT access + opaque refresh).

Refresh tokens should be generated using secrets.token_urlsafe() and stored as hashed values in the database. See AuthService._create_refresh_token().

This method is kept for backwards compatibility with existing tests only.

Parameters:

Name Type Description Default
user_id UUID

Unique user identifier

required
jti UUID

Unique token identifier

required

Returns:

Type Description
str

Encoded JWT refresh token string (for testing only)

Note

DO NOT use this for production authentication flows. Use AuthService._create_refresh_token() instead.

Source code in src/services/jwt_service.py
def create_refresh_token(self, user_id: UUID, jti: UUID) -> str:
    """Create a new refresh token (DEPRECATED).

    **DEPRECATED**: This method is deprecated. The application now uses
    opaque (random) refresh tokens instead of JWT refresh tokens.
    This follows industry standard Pattern A (JWT access + opaque refresh).

    Refresh tokens should be generated using secrets.token_urlsafe()
    and stored as hashed values in the database. See AuthService._create_refresh_token().

    This method is kept for backwards compatibility with existing tests only.

    Args:
        user_id: Unique user identifier
        jti: Unique token identifier

    Returns:
        Encoded JWT refresh token string (for testing only)

    Note:
        DO NOT use this for production authentication flows.
        Use AuthService._create_refresh_token() instead.
    """
    now = datetime.now(UTC)
    expire = now + timedelta(days=self.refresh_token_expire_days)

    claims = {
        "sub": str(user_id),
        "type": "refresh",
        "jti": str(jti),  # Unique token ID for database tracking
        "exp": expire,
        "iat": now,
    }

    return jwt.encode(claims, self.secret_key, algorithm=self.algorithm)

decode_token

decode_token(token: str) -> Dict[str, Any]

Decode and validate a JWT token.

This method decodes the token, verifies its signature, and checks that it hasn't expired. If validation fails, a JWTError exception is raised.

Parameters:

Name Type Description Default
token str

Encoded JWT token string

required

Returns:

Type Description
Dict[str, Any]

Dictionary of token claims (sub, email, type, exp, etc.)

Raises:

Type Description
JWTError

If token is invalid, expired, or signature doesn't match

Example

service = JWTService() from uuid import uuid4 user_id = uuid4() token = service.create_access_token(user_id, "user@example.com") claims = service.decode_token(token) claims["email"] 'user@example.com' claims["type"] 'access'

Source code in src/services/jwt_service.py
def decode_token(self, token: str) -> Dict[str, Any]:
    """Decode and validate a JWT token.

    This method decodes the token, verifies its signature, and
    checks that it hasn't expired. If validation fails, a
    JWTError exception is raised.

    Args:
        token: Encoded JWT token string

    Returns:
        Dictionary of token claims (sub, email, type, exp, etc.)

    Raises:
        JWTError: If token is invalid, expired, or signature doesn't match

    Example:
        >>> service = JWTService()
        >>> from uuid import uuid4
        >>> user_id = uuid4()
        >>> token = service.create_access_token(user_id, "user@example.com")
        >>> claims = service.decode_token(token)
        >>> claims["email"]
        'user@example.com'
        >>> claims["type"]
        'access'
    """
    try:
        payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        return payload
    except JWTError as e:
        # Re-raise with more context
        raise JWTError(f"Token validation failed: {str(e)}")

verify_token_type

verify_token_type(
    token: str, expected_type: str
) -> Dict[str, Any]

Decode token and verify it's of the expected type.

Use this to ensure tokens are being used correctly: - Access tokens for API authentication - Refresh tokens for token refresh

Parameters:

Name Type Description Default
token str

Encoded JWT token string

required
expected_type str

Expected token type ("access" or "refresh")

required

Returns:

Type Description
Dict[str, Any]

Dictionary of token claims

Raises:

Type Description
JWTError

If token is invalid or wrong type

Example

service = JWTService() from uuid import uuid4 user_id = uuid4() token = service.create_access_token(user_id, "user@example.com") claims = service.verify_token_type(token, "access") claims["type"] 'access'

service.verify_token_type(token, "refresh") Traceback (most recent call last): ... jose.exceptions.JWTError: Invalid token type: expected refresh, got access

Source code in src/services/jwt_service.py
def verify_token_type(self, token: str, expected_type: str) -> Dict[str, Any]:
    """Decode token and verify it's of the expected type.

    Use this to ensure tokens are being used correctly:
    - Access tokens for API authentication
    - Refresh tokens for token refresh

    Args:
        token: Encoded JWT token string
        expected_type: Expected token type ("access" or "refresh")

    Returns:
        Dictionary of token claims

    Raises:
        JWTError: If token is invalid or wrong type

    Example:
        >>> service = JWTService()
        >>> from uuid import uuid4
        >>> user_id = uuid4()
        >>> token = service.create_access_token(user_id, "user@example.com")
        >>> claims = service.verify_token_type(token, "access")
        >>> claims["type"]
        'access'

        >>> service.verify_token_type(token, "refresh")
        Traceback (most recent call last):
        ...
        jose.exceptions.JWTError: Invalid token type: expected refresh, got access
    """
    payload = self.decode_token(token)

    token_type = payload.get("type")
    if token_type != expected_type:
        raise JWTError(
            f"Invalid token type: expected {expected_type}, got {token_type}"
        )

    return payload

get_user_id_from_token

get_user_id_from_token(token: str) -> UUID

Extract user ID from a valid token.

Convenience method to get user ID without manually parsing claims.

Parameters:

Name Type Description Default
token str

Encoded JWT token string

required

Returns:

Type Description
UUID

User ID as UUID

Raises:

Type Description
JWTError

If token is invalid or missing subject claim

Example

service = JWTService() from uuid import uuid4 user_id = uuid4() token = service.create_access_token(user_id, "user@example.com") extracted_id = service.get_user_id_from_token(token) extracted_id == user_id True

Source code in src/services/jwt_service.py
def get_user_id_from_token(self, token: str) -> UUID:
    """Extract user ID from a valid token.

    Convenience method to get user ID without manually parsing claims.

    Args:
        token: Encoded JWT token string

    Returns:
        User ID as UUID

    Raises:
        JWTError: If token is invalid or missing subject claim

    Example:
        >>> service = JWTService()
        >>> from uuid import uuid4
        >>> user_id = uuid4()
        >>> token = service.create_access_token(user_id, "user@example.com")
        >>> extracted_id = service.get_user_id_from_token(token)
        >>> extracted_id == user_id
        True
    """
    payload = self.decode_token(token)

    user_id_str = payload.get("sub")
    if not user_id_str:
        raise JWTError("Token missing subject (user ID) claim")

    try:
        return UUID(user_id_str)
    except ValueError as e:
        raise JWTError(f"Invalid user ID format in token: {str(e)}")

get_token_jti

get_token_jti(token: str) -> UUID

Extract JWT ID (jti) from a refresh token.

The jti claim is used to track refresh tokens in the database and enable token revocation.

Parameters:

Name Type Description Default
token str

Encoded refresh token string

required

Returns:

Type Description
UUID

Token ID as UUID

Raises:

Type Description
JWTError

If token is invalid or missing jti claim

Example

service = JWTService() from uuid import uuid4 user_id = uuid4() jti = uuid4() token = service.create_refresh_token(user_id, jti) extracted_jti = service.get_token_jti(token) extracted_jti == jti True

Source code in src/services/jwt_service.py
def get_token_jti(self, token: str) -> UUID:
    """Extract JWT ID (jti) from a refresh token.

    The jti claim is used to track refresh tokens in the database
    and enable token revocation.

    Args:
        token: Encoded refresh token string

    Returns:
        Token ID as UUID

    Raises:
        JWTError: If token is invalid or missing jti claim

    Example:
        >>> service = JWTService()
        >>> from uuid import uuid4
        >>> user_id = uuid4()
        >>> jti = uuid4()
        >>> token = service.create_refresh_token(user_id, jti)
        >>> extracted_jti = service.get_token_jti(token)
        >>> extracted_jti == jti
        True
    """
    # Verify it's a refresh token
    payload = self.verify_token_type(token, "refresh")

    jti_str = payload.get("jti")
    if not jti_str:
        raise JWTError("Refresh token missing jti (token ID) claim")

    try:
        return UUID(jti_str)
    except ValueError as e:
        raise JWTError(f"Invalid token ID format: {str(e)}")

is_token_expired

is_token_expired(token: str) -> bool

Check if a token is expired without raising an exception.

Useful for checking token status without catching exceptions.

Parameters:

Name Type Description Default
token str

Encoded JWT token string

required

Returns:

Type Description
bool

True if token is expired, False if still valid

Note

If the token is malformed or has invalid signature, this will return True (treating invalid as expired).

Example

service = JWTService() from uuid import uuid4 user_id = uuid4() token = service.create_access_token(user_id, "user@example.com") service.is_token_expired(token) False

Source code in src/services/jwt_service.py
def is_token_expired(self, token: str) -> bool:
    """Check if a token is expired without raising an exception.

    Useful for checking token status without catching exceptions.

    Args:
        token: Encoded JWT token string

    Returns:
        True if token is expired, False if still valid

    Note:
        If the token is malformed or has invalid signature,
        this will return True (treating invalid as expired).

    Example:
        >>> service = JWTService()
        >>> from uuid import uuid4
        >>> user_id = uuid4()
        >>> token = service.create_access_token(user_id, "user@example.com")
        >>> service.is_token_expired(token)
        False
    """
    try:
        payload = self.decode_token(token)

        # Check if exp claim exists
        exp = payload.get("exp")
        if not exp:
            return True

        # Compare expiration time with current time
        exp_datetime = datetime.fromtimestamp(exp, UTC)
        return datetime.now(UTC) >= exp_datetime

    except JWTError:
        # Treat any decode error as expired
        return True

get_token_expiration

get_token_expiration(token: str) -> Optional[datetime]

Get the expiration time of a token.

Parameters:

Name Type Description Default
token str

Encoded JWT token string

required

Returns:

Type Description
Optional[datetime]

Expiration datetime, or None if token is invalid

Example

service = JWTService() from uuid import uuid4 user_id = uuid4() token = service.create_access_token(user_id, "user@example.com") exp = service.get_token_expiration(token) exp is not None True

Source code in src/services/jwt_service.py
def get_token_expiration(self, token: str) -> Optional[datetime]:
    """Get the expiration time of a token.

    Args:
        token: Encoded JWT token string

    Returns:
        Expiration datetime, or None if token is invalid

    Example:
        >>> service = JWTService()
        >>> from uuid import uuid4
        >>> user_id = uuid4()
        >>> token = service.create_access_token(user_id, "user@example.com")
        >>> exp = service.get_token_expiration(token)
        >>> exp is not None
        True
    """
    try:
        payload = self.decode_token(token)
        exp = payload.get("exp")

        if exp:
            return datetime.fromtimestamp(exp, UTC)
        return None

    except JWTError:
        return None

Password Service

src.services.password_service.PasswordService

Service for password operations using bcrypt.

This service provides secure password hashing and verification using bcrypt with configurable rounds. All methods are synchronous because bcrypt is CPU-bound and passlib is a synchronous library.

Attributes:

Name Type Description
pwd_context

Passlib CryptContext configured with bcrypt

min_length

Minimum password length (default: 8)

require_uppercase

Whether uppercase letter is required (default: True)

require_lowercase

Whether lowercase letter is required (default: True)

require_digit

Whether digit is required (default: True)

require_special

Whether special character is required (default: True)

Source code in src/services/password_service.py
class PasswordService:
    """Service for password operations using bcrypt.

    This service provides secure password hashing and verification using
    bcrypt with configurable rounds. All methods are synchronous because
    bcrypt is CPU-bound and passlib is a synchronous library.

    Attributes:
        pwd_context: Passlib CryptContext configured with bcrypt
        min_length: Minimum password length (default: 8)
        require_uppercase: Whether uppercase letter is required (default: True)
        require_lowercase: Whether lowercase letter is required (default: True)
        require_digit: Whether digit is required (default: True)
        require_special: Whether special character is required (default: True)
    """

    def __init__(self):
        """Initialize password service with bcrypt configuration.

        Bcrypt rounds are loaded from config (default: 12).
        Higher rounds = more secure but slower (exponential).
        - 10 rounds: ~100ms
        - 12 rounds: ~300ms (recommended)
        - 14 rounds: ~1200ms
        """
        settings = get_settings()
        self.bcrypt_rounds = getattr(settings, "BCRYPT_ROUNDS", 12)

        # Password strength requirements
        self.min_length = 8
        self.require_uppercase = True
        self.require_lowercase = True
        self.require_digit = True
        self.require_special = True

        # Special characters allowed in passwords
        self.special_chars = "!@#$%^&*()_+-=[]{}|;:,.<>?"

    def hash_password(self, password: str) -> str:
        """Hash a password using bcrypt.

        This operation is CPU-intensive (~300ms with 12 rounds) but
        synchronous. It can be called directly from async code without
        blocking the event loop improperly.

        Note: Bcrypt has a 72-byte maximum password length. Passwords are
        automatically truncated to 72 bytes before hashing. This is safe
        because 72 bytes provides sufficient entropy.

        Args:
            password: Plain text password to hash

        Returns:
            Hashed password string (includes salt and algorithm info)

        Example:
            >>> service = PasswordService()
            >>> hashed = service.hash_password("SecurePass123!")
            >>> hashed.startswith("$2b$")
            True
        """
        # Bcrypt has a 72-byte limit, truncate if needed
        # This is safe as 72 bytes provides sufficient entropy
        password_bytes = password.encode("utf-8")[:72]
        salt = bcrypt.gensalt(rounds=self.bcrypt_rounds)
        hashed = bcrypt.hashpw(password_bytes, salt)
        return hashed.decode("utf-8")

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Verify a password against its hash.

        Uses constant-time comparison to prevent timing attacks.

        Note: Bcrypt has a 72-byte maximum password length. Passwords are
        automatically truncated to 72 bytes before verification to match
        the hashing behavior.

        Args:
            plain_password: Plain text password to verify
            hashed_password: Hashed password to compare against

        Returns:
            True if password matches, False otherwise

        Example:
            >>> service = PasswordService()
            >>> hashed = service.hash_password("SecurePass123!")
            >>> service.verify_password("SecurePass123!", hashed)
            True
            >>> service.verify_password("WrongPassword", hashed)
            False
        """
        # Truncate to 72 bytes to match hashing behavior
        password_bytes = plain_password.encode("utf-8")[:72]
        hashed_bytes = hashed_password.encode("utf-8")
        return bcrypt.checkpw(password_bytes, hashed_bytes)

    def validate_password_strength(self, password: str) -> Tuple[bool, str]:
        """Validate password meets strength requirements.

        Requirements (all must be met):
        - Minimum length (default: 8 characters)
        - At least one uppercase letter
        - At least one lowercase letter
        - At least one digit
        - At least one special character

        Args:
            password: Password to validate

        Returns:
            Tuple of (is_valid: bool, error_message: str)
            If valid, error_message is empty string
            If invalid, error_message describes the issue

        Example:
            >>> service = PasswordService()
            >>> valid, msg = service.validate_password_strength("weak")
            >>> valid
            False
            >>> msg
            'Password must be at least 8 characters long'

            >>> valid, msg = service.validate_password_strength("SecurePass123!")
            >>> valid
            True
            >>> msg
            ''
        """
        # Check minimum length
        if len(password) < self.min_length:
            return False, f"Password must be at least {self.min_length} characters long"

        # Check for uppercase letter
        if self.require_uppercase and not re.search(r"[A-Z]", password):
            return False, "Password must contain at least one uppercase letter"

        # Check for lowercase letter
        if self.require_lowercase and not re.search(r"[a-z]", password):
            return False, "Password must contain at least one lowercase letter"

        # Check for digit
        if self.require_digit and not re.search(r"\d", password):
            return False, "Password must contain at least one digit"

        # Check for special character
        if self.require_special:
            special_char_pattern = f"[{re.escape(self.special_chars)}]"
            if not re.search(special_char_pattern, password):
                return (
                    False,
                    f"Password must contain at least one special character ({self.special_chars})",
                )

        return True, ""

    def needs_rehash(self, hashed_password: str) -> bool:
        """Check if a password hash needs to be updated.

        This happens when:
        - The bcrypt rounds configuration has changed
        - The hashing algorithm has been deprecated
        - The hash format is outdated

        If this returns True, you should re-hash the password
        with the current settings after successful login.

        Args:
            hashed_password: Existing password hash to check

        Returns:
            True if hash should be regenerated, False otherwise

        Example:
            >>> service = PasswordService()
            >>> hashed = service.hash_password("SecurePass123!")
            >>> service.needs_rehash(hashed)
            False
        """
        # Extract the number of rounds from the bcrypt hash
        # Format: $2b$12$...
        try:
            parts = hashed_password.split("$")
            if len(parts) >= 3 and parts[1] in ("2a", "2b", "2y"):
                current_rounds = int(parts[2])
                return current_rounds != self.bcrypt_rounds
        except (ValueError, IndexError):
            pass
        return False

    def generate_random_password(self, length: int = 16) -> str:
        """Generate a cryptographically secure random password.

        Useful for:
        - Temporary passwords during account creation
        - Password reset flows
        - Testing

        The generated password will meet all strength requirements.

        Args:
            length: Length of password to generate (default: 16)
                   Must be >= min_length (8)

        Returns:
            Randomly generated password meeting all requirements

        Raises:
            ValueError: If length < min_length

        Example:
            >>> service = PasswordService()
            >>> password = service.generate_random_password(16)
            >>> len(password)
            16
            >>> service.validate_password_strength(password)[0]
            True
        """
        if length < self.min_length:
            raise ValueError(
                f"Password length must be at least {self.min_length}, got {length}"
            )

        # Character sets
        uppercase = string.ascii_uppercase
        lowercase = string.ascii_lowercase
        digits = string.digits
        special = self.special_chars

        # Ensure at least one character from each required set
        password_chars = []

        if self.require_uppercase:
            password_chars.append(secrets.choice(uppercase))
        if self.require_lowercase:
            password_chars.append(secrets.choice(lowercase))
        if self.require_digit:
            password_chars.append(secrets.choice(digits))
        if self.require_special:
            password_chars.append(secrets.choice(special))

        # Fill remaining length with random characters from all sets
        all_chars = uppercase + lowercase + digits + special
        remaining_length = length - len(password_chars)
        password_chars.extend(
            secrets.choice(all_chars) for _ in range(remaining_length)
        )

        # Shuffle to avoid predictable patterns
        # Use secrets.SystemRandom for cryptographically secure shuffle
        rng = secrets.SystemRandom()
        rng.shuffle(password_chars)

        password = "".join(password_chars)

        # Verify the generated password meets requirements
        # (should always pass, but defensive programming)
        is_valid, error_msg = self.validate_password_strength(password)
        if not is_valid:
            # Recursively try again (should never happen)
            return self.generate_random_password(length)

        return password

    def get_password_requirements_text(self) -> str:
        """Get human-readable password requirements.

        Useful for displaying requirements to users during
        registration or password change flows.

        Returns:
            Formatted string describing password requirements

        Example:
            >>> service = PasswordService()
            >>> print(service.get_password_requirements_text())
            Password must:
            - Be at least 8 characters long
            - Contain at least one uppercase letter
            - Contain at least one lowercase letter
            - Contain at least one digit
            - Contain at least one special character (!@#$%^&*()_+-=[]{}|;:,.<>?)
        """
        requirements = ["Password must:"]
        requirements.append(f"- Be at least {self.min_length} characters long")

        if self.require_uppercase:
            requirements.append("- Contain at least one uppercase letter")
        if self.require_lowercase:
            requirements.append("- Contain at least one lowercase letter")
        if self.require_digit:
            requirements.append("- Contain at least one digit")
        if self.require_special:
            requirements.append(
                f"- Contain at least one special character ({self.special_chars})"
            )

        return "\n".join(requirements)

Functions

__init__

__init__()

Initialize password service with bcrypt configuration.

Bcrypt rounds are loaded from config (default: 12). Higher rounds = more secure but slower (exponential). - 10 rounds: ~100ms - 12 rounds: ~300ms (recommended) - 14 rounds: ~1200ms

Source code in src/services/password_service.py
def __init__(self):
    """Initialize password service with bcrypt configuration.

    Bcrypt rounds are loaded from config (default: 12).
    Higher rounds = more secure but slower (exponential).
    - 10 rounds: ~100ms
    - 12 rounds: ~300ms (recommended)
    - 14 rounds: ~1200ms
    """
    settings = get_settings()
    self.bcrypt_rounds = getattr(settings, "BCRYPT_ROUNDS", 12)

    # Password strength requirements
    self.min_length = 8
    self.require_uppercase = True
    self.require_lowercase = True
    self.require_digit = True
    self.require_special = True

    # Special characters allowed in passwords
    self.special_chars = "!@#$%^&*()_+-=[]{}|;:,.<>?"

hash_password

hash_password(password: str) -> str

Hash a password using bcrypt.

This operation is CPU-intensive (~300ms with 12 rounds) but synchronous. It can be called directly from async code without blocking the event loop improperly.

Note: Bcrypt has a 72-byte maximum password length. Passwords are automatically truncated to 72 bytes before hashing. This is safe because 72 bytes provides sufficient entropy.

Parameters:

Name Type Description Default
password str

Plain text password to hash

required

Returns:

Type Description
str

Hashed password string (includes salt and algorithm info)

Example

service = PasswordService() hashed = service.hash_password("SecurePass123!") hashed.startswith("$2b$") True

Source code in src/services/password_service.py
def hash_password(self, password: str) -> str:
    """Hash a password using bcrypt.

    This operation is CPU-intensive (~300ms with 12 rounds) but
    synchronous. It can be called directly from async code without
    blocking the event loop improperly.

    Note: Bcrypt has a 72-byte maximum password length. Passwords are
    automatically truncated to 72 bytes before hashing. This is safe
    because 72 bytes provides sufficient entropy.

    Args:
        password: Plain text password to hash

    Returns:
        Hashed password string (includes salt and algorithm info)

    Example:
        >>> service = PasswordService()
        >>> hashed = service.hash_password("SecurePass123!")
        >>> hashed.startswith("$2b$")
        True
    """
    # Bcrypt has a 72-byte limit, truncate if needed
    # This is safe as 72 bytes provides sufficient entropy
    password_bytes = password.encode("utf-8")[:72]
    salt = bcrypt.gensalt(rounds=self.bcrypt_rounds)
    hashed = bcrypt.hashpw(password_bytes, salt)
    return hashed.decode("utf-8")

verify_password

verify_password(
    plain_password: str, hashed_password: str
) -> bool

Verify a password against its hash.

Uses constant-time comparison to prevent timing attacks.

Note: Bcrypt has a 72-byte maximum password length. Passwords are automatically truncated to 72 bytes before verification to match the hashing behavior.

Parameters:

Name Type Description Default
plain_password str

Plain text password to verify

required
hashed_password str

Hashed password to compare against

required

Returns:

Type Description
bool

True if password matches, False otherwise

Example

service = PasswordService() hashed = service.hash_password("SecurePass123!") service.verify_password("SecurePass123!", hashed) True service.verify_password("WrongPassword", hashed) False

Source code in src/services/password_service.py
def verify_password(self, plain_password: str, hashed_password: str) -> bool:
    """Verify a password against its hash.

    Uses constant-time comparison to prevent timing attacks.

    Note: Bcrypt has a 72-byte maximum password length. Passwords are
    automatically truncated to 72 bytes before verification to match
    the hashing behavior.

    Args:
        plain_password: Plain text password to verify
        hashed_password: Hashed password to compare against

    Returns:
        True if password matches, False otherwise

    Example:
        >>> service = PasswordService()
        >>> hashed = service.hash_password("SecurePass123!")
        >>> service.verify_password("SecurePass123!", hashed)
        True
        >>> service.verify_password("WrongPassword", hashed)
        False
    """
    # Truncate to 72 bytes to match hashing behavior
    password_bytes = plain_password.encode("utf-8")[:72]
    hashed_bytes = hashed_password.encode("utf-8")
    return bcrypt.checkpw(password_bytes, hashed_bytes)

validate_password_strength

validate_password_strength(
    password: str,
) -> Tuple[bool, str]

Validate password meets strength requirements.

Requirements (all must be met): - Minimum length (default: 8 characters) - At least one uppercase letter - At least one lowercase letter - At least one digit - At least one special character

Parameters:

Name Type Description Default
password str

Password to validate

required

Returns:

Type Description
bool

Tuple of (is_valid: bool, error_message: str)

str

If valid, error_message is empty string

Tuple[bool, str]

If invalid, error_message describes the issue

Example

service = PasswordService() valid, msg = service.validate_password_strength("weak") valid False msg 'Password must be at least 8 characters long'

valid, msg = service.validate_password_strength("SecurePass123!") valid True msg ''

Source code in src/services/password_service.py
def validate_password_strength(self, password: str) -> Tuple[bool, str]:
    """Validate password meets strength requirements.

    Requirements (all must be met):
    - Minimum length (default: 8 characters)
    - At least one uppercase letter
    - At least one lowercase letter
    - At least one digit
    - At least one special character

    Args:
        password: Password to validate

    Returns:
        Tuple of (is_valid: bool, error_message: str)
        If valid, error_message is empty string
        If invalid, error_message describes the issue

    Example:
        >>> service = PasswordService()
        >>> valid, msg = service.validate_password_strength("weak")
        >>> valid
        False
        >>> msg
        'Password must be at least 8 characters long'

        >>> valid, msg = service.validate_password_strength("SecurePass123!")
        >>> valid
        True
        >>> msg
        ''
    """
    # Check minimum length
    if len(password) < self.min_length:
        return False, f"Password must be at least {self.min_length} characters long"

    # Check for uppercase letter
    if self.require_uppercase and not re.search(r"[A-Z]", password):
        return False, "Password must contain at least one uppercase letter"

    # Check for lowercase letter
    if self.require_lowercase and not re.search(r"[a-z]", password):
        return False, "Password must contain at least one lowercase letter"

    # Check for digit
    if self.require_digit and not re.search(r"\d", password):
        return False, "Password must contain at least one digit"

    # Check for special character
    if self.require_special:
        special_char_pattern = f"[{re.escape(self.special_chars)}]"
        if not re.search(special_char_pattern, password):
            return (
                False,
                f"Password must contain at least one special character ({self.special_chars})",
            )

    return True, ""

needs_rehash

needs_rehash(hashed_password: str) -> bool

Check if a password hash needs to be updated.

This happens when: - The bcrypt rounds configuration has changed - The hashing algorithm has been deprecated - The hash format is outdated

If this returns True, you should re-hash the password with the current settings after successful login.

Parameters:

Name Type Description Default
hashed_password str

Existing password hash to check

required

Returns:

Type Description
bool

True if hash should be regenerated, False otherwise

Example

service = PasswordService() hashed = service.hash_password("SecurePass123!") service.needs_rehash(hashed) False

Source code in src/services/password_service.py
def needs_rehash(self, hashed_password: str) -> bool:
    """Check if a password hash needs to be updated.

    This happens when:
    - The bcrypt rounds configuration has changed
    - The hashing algorithm has been deprecated
    - The hash format is outdated

    If this returns True, you should re-hash the password
    with the current settings after successful login.

    Args:
        hashed_password: Existing password hash to check

    Returns:
        True if hash should be regenerated, False otherwise

    Example:
        >>> service = PasswordService()
        >>> hashed = service.hash_password("SecurePass123!")
        >>> service.needs_rehash(hashed)
        False
    """
    # Extract the number of rounds from the bcrypt hash
    # Format: $2b$12$...
    try:
        parts = hashed_password.split("$")
        if len(parts) >= 3 and parts[1] in ("2a", "2b", "2y"):
            current_rounds = int(parts[2])
            return current_rounds != self.bcrypt_rounds
    except (ValueError, IndexError):
        pass
    return False

generate_random_password

generate_random_password(length: int = 16) -> str

Generate a cryptographically secure random password.

Useful for: - Temporary passwords during account creation - Password reset flows - Testing

The generated password will meet all strength requirements.

Parameters:

Name Type Description Default
length int

Length of password to generate (default: 16) Must be >= min_length (8)

16

Returns:

Type Description
str

Randomly generated password meeting all requirements

Raises:

Type Description
ValueError

If length < min_length

Example

service = PasswordService() password = service.generate_random_password(16) len(password) 16 service.validate_password_strength(password)[0] True

Source code in src/services/password_service.py
def generate_random_password(self, length: int = 16) -> str:
    """Generate a cryptographically secure random password.

    Useful for:
    - Temporary passwords during account creation
    - Password reset flows
    - Testing

    The generated password will meet all strength requirements.

    Args:
        length: Length of password to generate (default: 16)
               Must be >= min_length (8)

    Returns:
        Randomly generated password meeting all requirements

    Raises:
        ValueError: If length < min_length

    Example:
        >>> service = PasswordService()
        >>> password = service.generate_random_password(16)
        >>> len(password)
        16
        >>> service.validate_password_strength(password)[0]
        True
    """
    if length < self.min_length:
        raise ValueError(
            f"Password length must be at least {self.min_length}, got {length}"
        )

    # Character sets
    uppercase = string.ascii_uppercase
    lowercase = string.ascii_lowercase
    digits = string.digits
    special = self.special_chars

    # Ensure at least one character from each required set
    password_chars = []

    if self.require_uppercase:
        password_chars.append(secrets.choice(uppercase))
    if self.require_lowercase:
        password_chars.append(secrets.choice(lowercase))
    if self.require_digit:
        password_chars.append(secrets.choice(digits))
    if self.require_special:
        password_chars.append(secrets.choice(special))

    # Fill remaining length with random characters from all sets
    all_chars = uppercase + lowercase + digits + special
    remaining_length = length - len(password_chars)
    password_chars.extend(
        secrets.choice(all_chars) for _ in range(remaining_length)
    )

    # Shuffle to avoid predictable patterns
    # Use secrets.SystemRandom for cryptographically secure shuffle
    rng = secrets.SystemRandom()
    rng.shuffle(password_chars)

    password = "".join(password_chars)

    # Verify the generated password meets requirements
    # (should always pass, but defensive programming)
    is_valid, error_msg = self.validate_password_strength(password)
    if not is_valid:
        # Recursively try again (should never happen)
        return self.generate_random_password(length)

    return password

get_password_requirements_text

get_password_requirements_text() -> str

Get human-readable password requirements.

Useful for displaying requirements to users during registration or password change flows.

Returns:

Type Description
str

Formatted string describing password requirements

Example

service = PasswordService() print(service.get_password_requirements_text()) Password must: - Be at least 8 characters long - Contain at least one uppercase letter - Contain at least one lowercase letter - Contain at least one digit - Contain at least one special character (!@#$%^&*()_+-=[]{}|;:,.<>?)

Source code in src/services/password_service.py
def get_password_requirements_text(self) -> str:
    """Get human-readable password requirements.

    Useful for displaying requirements to users during
    registration or password change flows.

    Returns:
        Formatted string describing password requirements

    Example:
        >>> service = PasswordService()
        >>> print(service.get_password_requirements_text())
        Password must:
        - Be at least 8 characters long
        - Contain at least one uppercase letter
        - Contain at least one lowercase letter
        - Contain at least one digit
        - Contain at least one special character (!@#$%^&*()_+-=[]{}|;:,.<>?)
    """
    requirements = ["Password must:"]
    requirements.append(f"- Be at least {self.min_length} characters long")

    if self.require_uppercase:
        requirements.append("- Contain at least one uppercase letter")
    if self.require_lowercase:
        requirements.append("- Contain at least one lowercase letter")
    if self.require_digit:
        requirements.append("- Contain at least one digit")
    if self.require_special:
        requirements.append(
            f"- Contain at least one special character ({self.special_chars})"
        )

    return "\n".join(requirements)

Email Service

src.services.email_service.EmailService

Service for sending emails via AWS SES.

This service provides async email sending functionality using AWS SES. In development mode, emails can be logged instead of sent for testing.

Features
  • HTML email support with plain text fallback
  • Template-based emails for common scenarios
  • Development mode (logs emails instead of sending)
  • Error handling with graceful degradation
  • AWS SES integration with proper credentials

Attributes:

Name Type Description
ses_client

Boto3 SES client

from_email

Configured sender email address

from_name

Configured sender display name

development_mode

Whether to log emails instead of sending

Source code in src/services/email_service.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
class EmailService:
    """Service for sending emails via AWS SES.

    This service provides async email sending functionality using AWS SES.
    In development mode, emails can be logged instead of sent for testing.

    Features:
        - HTML email support with plain text fallback
        - Template-based emails for common scenarios
        - Development mode (logs emails instead of sending)
        - Error handling with graceful degradation
        - AWS SES integration with proper credentials

    Attributes:
        ses_client: Boto3 SES client
        from_email: Configured sender email address
        from_name: Configured sender display name
        development_mode: Whether to log emails instead of sending
    """

    def __init__(self, development_mode: bool = False):
        """Initialize email service with AWS SES configuration.

        Args:
            development_mode: If True, log emails instead of sending.
                            Useful for local development without AWS credentials.
        """
        settings = get_settings()

        self.from_email = settings.SES_FROM_EMAIL
        self.from_name = settings.SES_FROM_NAME
        self.development_mode = development_mode

        # Initialize SES client only if not in development mode
        if not self.development_mode:
            try:
                self.ses_client = boto3.client(
                    "ses",
                    region_name=settings.AWS_REGION,
                    aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
                    aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
                )
                logger.info("AWS SES client initialized successfully")
            except Exception as e:
                logger.error(f"Failed to initialize AWS SES client: {e}")
                # Fall back to development mode if AWS credentials are invalid
                self.development_mode = True
                logger.warning(
                    "Falling back to development mode (emails will be logged)"
                )
        else:
            self.ses_client = None
            logger.info("Email service in development mode (emails will be logged)")

    async def send_email(
        self,
        to_email: str,
        subject: str,
        html_body: str,
        text_body: Optional[str] = None,
    ) -> bool:
        """Send an email via AWS SES or log it in development mode.

        Args:
            to_email: Recipient email address
            subject: Email subject line
            html_body: HTML email body
            text_body: Plain text fallback (optional)

        Returns:
            True if email sent successfully (or logged in dev mode), False otherwise

        Example:
            >>> service = EmailService()
            >>> success = await service.send_email(
            ...     to_email="user@example.com",
            ...     subject="Welcome!",
            ...     html_body="<h1>Welcome to Dashtam!</h1>",
            ...     text_body="Welcome to Dashtam!"
            ... )
            >>> success
            True
        """
        # Development mode: log email instead of sending
        if self.development_mode:
            logger.info("=" * 80)
            logger.info("📧 EMAIL (Development Mode - Not Sent)")
            logger.info("=" * 80)
            logger.info(f"From: {self.from_name} <{self.from_email}>")
            logger.info(f"To: {to_email}")
            logger.info(f"Subject: {subject}")
            logger.info("-" * 80)
            logger.info("HTML Body:")
            logger.info(html_body[:500] + "..." if len(html_body) > 500 else html_body)
            if text_body:
                logger.info("-" * 80)
                logger.info("Text Body:")
                logger.info(
                    text_body[:500] + "..." if len(text_body) > 500 else text_body
                )
            logger.info("=" * 80)
            return True

        # Production mode: send via AWS SES
        try:
            # Prepare email body
            body_data: Dict[str, Any] = {
                "Html": {
                    "Charset": "UTF-8",
                    "Data": html_body,
                }
            }

            # Add text fallback if provided
            if text_body:
                body_data["Text"] = {
                    "Charset": "UTF-8",
                    "Data": text_body,
                }

            # Send email via SES
            response = self.ses_client.send_email(
                Source=f"{self.from_name} <{self.from_email}>",
                Destination={
                    "ToAddresses": [to_email],
                },
                Message={
                    "Subject": {
                        "Charset": "UTF-8",
                        "Data": subject,
                    },
                    "Body": body_data,
                },
            )

            message_id = response.get("MessageId", "unknown")
            logger.info(
                f"Email sent successfully to {to_email} (MessageId: {message_id})"
            )
            return True

        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            error_message = e.response["Error"]["Message"]
            logger.error(
                f"AWS SES error sending email to {to_email}: {error_code} - {error_message}"
            )
            return False

        except Exception as e:
            logger.error(f"Unexpected error sending email to {to_email}: {e}")
            return False

    async def send_verification_email(
        self, to_email: str, verification_token: str, user_name: Optional[str] = None
    ) -> bool:
        """Send email verification email to user.

        Args:
            to_email: User's email address
            verification_token: Unique verification token
            user_name: User's name for personalization (optional)

        Returns:
            True if email sent successfully, False otherwise

        Example:
            >>> service = EmailService()
            >>> success = await service.send_verification_email(
            ...     to_email="user@example.com",
            ...     verification_token="abc123def456",
            ...     user_name="John Doe"
            ... )
        """
        settings = get_settings()

        # Generate verification URL
        # TODO: Update this with actual frontend URL when available
        verification_url = (
            f"https://localhost:3000/verify-email?token={verification_token}"
        )

        greeting = f"Hi {user_name}," if user_name else "Hello,"

        subject = "Verify Your Dashtam Account"

        html_body = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
    <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
        <h2 style="color: #4A90E2;">Welcome to Dashtam!</h2>

        <p>{greeting}</p>

        <p>Thank you for signing up for Dashtam, your secure financial data aggregation platform.</p>

        <p>To complete your registration and verify your email address, please click the button below:</p>

        <div style="text-align: center; margin: 30px 0;">
            <a href="{verification_url}" 
               style="background-color: #4A90E2; 
                      color: white; 
                      padding: 12px 30px; 
                      text-decoration: none; 
                      border-radius: 5px; 
                      display: inline-block;">
                Verify Email Address
            </a>
        </div>

        <p>Or copy and paste this link into your browser:</p>
        <p style="word-break: break-all; color: #4A90E2;">{verification_url}</p>

        <p style="margin-top: 30px; color: #666; font-size: 14px;">
            This verification link will expire in {settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_HOURS} hours.
        </p>

        <p style="color: #666; font-size: 14px;">
            If you didn't create a Dashtam account, please ignore this email.
        </p>

        <hr style="border: none; border-top: 1px solid #eee; margin: 30px 0;">

        <p style="color: #999; font-size: 12px; text-align: center;">
            © {datetime.now(UTC).year} Dashtam. All rights reserved.
        </p>
    </div>
</body>
</html>
"""

        text_body = f"""
{greeting}

Thank you for signing up for Dashtam, your secure financial data aggregation platform.

To complete your registration and verify your email address, please visit:
{verification_url}

This verification link will expire in {settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_HOURS} hours.

If you didn't create a Dashtam account, please ignore this email.

© {datetime.now(UTC).year} Dashtam. All rights reserved.
"""

        return await self.send_email(
            to_email=to_email, subject=subject, html_body=html_body, text_body=text_body
        )

    async def send_password_reset_email(
        self, to_email: str, reset_token: str, user_name: Optional[str] = None
    ) -> bool:
        """Send password reset email to user.

        Args:
            to_email: User's email address
            reset_token: Unique password reset token
            user_name: User's name for personalization (optional)

        Returns:
            True if email sent successfully, False otherwise

        Example:
            >>> service = EmailService()
            >>> success = await service.send_password_reset_email(
            ...     to_email="user@example.com",
            ...     reset_token="xyz789abc123",
            ...     user_name="John Doe"
            ... )
        """
        settings = get_settings()

        # Generate reset URL
        # TODO: Update this with actual frontend URL when available
        reset_url = f"https://localhost:3000/reset-password?token={reset_token}"

        greeting = f"Hi {user_name}," if user_name else "Hello,"

        subject = "Reset Your Dashtam Password"

        html_body = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
    <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
        <h2 style="color: #4A90E2;">Password Reset Request</h2>

        <p>{greeting}</p>

        <p>We received a request to reset the password for your Dashtam account.</p>

        <p>To reset your password, please click the button below:</p>

        <div style="text-align: center; margin: 30px 0;">
            <a href="{reset_url}" 
               style="background-color: #4A90E2; 
                      color: white; 
                      padding: 12px 30px; 
                      text-decoration: none; 
                      border-radius: 5px; 
                      display: inline-block;">
                Reset Password
            </a>
        </div>

        <p>Or copy and paste this link into your browser:</p>
        <p style="word-break: break-all; color: #4A90E2;">{reset_url}</p>

        <p style="margin-top: 30px; color: #666; font-size: 14px;">
            This password reset link will expire in {settings.PASSWORD_RESET_TOKEN_EXPIRE_HOURS} hour(s).
        </p>

        <p style="color: #E74C3C; font-weight: bold;">
            If you didn't request a password reset, please ignore this email and your password will remain unchanged.
        </p>

        <hr style="border: none; border-top: 1px solid #eee; margin: 30px 0;">

        <p style="color: #999; font-size: 12px; text-align: center;">
            © {datetime.now(UTC).year} Dashtam. All rights reserved.
        </p>
    </div>
</body>
</html>
"""

        text_body = f"""
{greeting}

We received a request to reset the password for your Dashtam account.

To reset your password, please visit:
{reset_url}

This password reset link will expire in {settings.PASSWORD_RESET_TOKEN_EXPIRE_HOURS} hour(s).

If you didn't request a password reset, please ignore this email and your password will remain unchanged.

© {datetime.now(UTC).year} Dashtam. All rights reserved.
"""

        return await self.send_email(
            to_email=to_email, subject=subject, html_body=html_body, text_body=text_body
        )

    async def send_welcome_email(
        self, to_email: str, user_name: Optional[str] = None
    ) -> bool:
        """Send welcome email to newly registered and verified user.

        Args:
            to_email: User's email address
            user_name: User's name for personalization (optional)

        Returns:
            True if email sent successfully, False otherwise

        Example:
            >>> service = EmailService()
            >>> success = await service.send_welcome_email(
            ...     to_email="user@example.com",
            ...     user_name="John Doe"
            ... )
        """
        greeting = f"Hi {user_name}," if user_name else "Hello,"

        subject = "Welcome to Dashtam!"

        html_body = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
    <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
        <h2 style="color: #4A90E2;">Welcome to Dashtam! 🎉</h2>

        <p>{greeting}</p>

        <p>Your email has been verified and your account is now active!</p>

        <p>With Dashtam, you can securely:</p>
        <ul>
            <li>Connect your financial accounts from multiple institutions</li>
            <li>View all your accounts in one unified dashboard</li>
            <li>Access transaction history and financial data</li>
            <li>Keep your financial information secure with bank-level encryption</li>
        </ul>

        <p>Ready to get started? Log in to your account and connect your first financial institution.</p>

        <div style="text-align: center; margin: 30px 0;">
            <a href="https://localhost:3000/login" 
               style="background-color: #4A90E2; 
                      color: white; 
                      padding: 12px 30px; 
                      text-decoration: none; 
                      border-radius: 5px; 
                      display: inline-block;">
                Go to Dashboard
            </a>
        </div>

        <p>If you have any questions or need assistance, feel free to reach out to our support team.</p>

        <hr style="border: none; border-top: 1px solid #eee; margin: 30px 0;">

        <p style="color: #999; font-size: 12px; text-align: center;">
            © {datetime.now(UTC).year} Dashtam. All rights reserved.
        </p>
    </div>
</body>
</html>
"""

        text_body = f"""
{greeting}

Your email has been verified and your account is now active!

With Dashtam, you can securely:
- Connect your financial accounts from multiple institutions
- View all your accounts in one unified dashboard
- Access transaction history and financial data
- Keep your financial information secure with bank-level encryption

Ready to get started? Log in to your account and connect your first financial institution at:
https://localhost:3000/login

If you have any questions or need assistance, feel free to reach out to our support team.

© {datetime.now(UTC).year} Dashtam. All rights reserved.
"""

        return await self.send_email(
            to_email=to_email, subject=subject, html_body=html_body, text_body=text_body
        )

    async def send_password_changed_notification(
        self, to_email: str, user_name: Optional[str] = None
    ) -> bool:
        """Send notification email after password is successfully changed.

        Args:
            to_email: User's email address
            user_name: User's name for personalization (optional)

        Returns:
            True if email sent successfully, False otherwise

        Example:
            >>> service = EmailService()
            >>> success = await service.send_password_changed_notification(
            ...     to_email="user@example.com",
            ...     user_name="John Doe"
            ... )
        """
        greeting = f"Hi {user_name}," if user_name else "Hello,"

        subject = "Your Dashtam Password Was Changed"

        html_body = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
    <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
        <h2 style="color: #4A90E2;">Password Changed Successfully</h2>

        <p>{greeting}</p>

        <p>This is a confirmation that the password for your Dashtam account was successfully changed.</p>

        <p style="color: #E74C3C; font-weight: bold; margin-top: 20px;">
            If you did not make this change, please contact our support team immediately.
        </p>

        <div style="text-align: center; margin: 30px 0;">
            <a href="mailto:support@dashtam.com" 
               style="background-color: #E74C3C; 
                      color: white; 
                      padding: 12px 30px; 
                      text-decoration: none; 
                      border-radius: 5px; 
                      display: inline-block;">
                Contact Support
            </a>
        </div>

        <p style="color: #666; font-size: 14px;">
            Changed at: {datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S")} UTC
        </p>

        <hr style="border: none; border-top: 1px solid #eee; margin: 30px 0;">

        <p style="color: #999; font-size: 12px; text-align: center;">
            © {datetime.now(UTC).year} Dashtam. All rights reserved.
        </p>
    </div>
</body>
</html>
"""

        text_body = f"""
{greeting}

This is a confirmation that the password for your Dashtam account was successfully changed.

If you did not make this change, please contact our support team immediately at support@dashtam.com

Changed at: {datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S")} UTC

© {datetime.now(UTC).year} Dashtam. All rights reserved.
"""

        return await self.send_email(
            to_email=to_email, subject=subject, html_body=html_body, text_body=text_body
        )

Functions

__init__

__init__(development_mode: bool = False)

Initialize email service with AWS SES configuration.

Parameters:

Name Type Description Default
development_mode bool

If True, log emails instead of sending. Useful for local development without AWS credentials.

False
Source code in src/services/email_service.py
def __init__(self, development_mode: bool = False):
    """Initialize email service with AWS SES configuration.

    Args:
        development_mode: If True, log emails instead of sending.
                        Useful for local development without AWS credentials.
    """
    settings = get_settings()

    self.from_email = settings.SES_FROM_EMAIL
    self.from_name = settings.SES_FROM_NAME
    self.development_mode = development_mode

    # Initialize SES client only if not in development mode
    if not self.development_mode:
        try:
            self.ses_client = boto3.client(
                "ses",
                region_name=settings.AWS_REGION,
                aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
                aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            )
            logger.info("AWS SES client initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize AWS SES client: {e}")
            # Fall back to development mode if AWS credentials are invalid
            self.development_mode = True
            logger.warning(
                "Falling back to development mode (emails will be logged)"
            )
    else:
        self.ses_client = None
        logger.info("Email service in development mode (emails will be logged)")

send_email async

send_email(
    to_email: str,
    subject: str,
    html_body: str,
    text_body: Optional[str] = None,
) -> bool

Send an email via AWS SES or log it in development mode.

Parameters:

Name Type Description Default
to_email str

Recipient email address

required
subject str

Email subject line

required
html_body str

HTML email body

required
text_body Optional[str]

Plain text fallback (optional)

None

Returns:

Type Description
bool

True if email sent successfully (or logged in dev mode), False otherwise

Example

service = EmailService() success = await service.send_email( ... to_email="user@example.com", ... subject="Welcome!", ... html_body="

Welcome to Dashtam!

", ... text_body="Welcome to Dashtam!" ... ) success True

Source code in src/services/email_service.py
async def send_email(
    self,
    to_email: str,
    subject: str,
    html_body: str,
    text_body: Optional[str] = None,
) -> bool:
    """Send an email via AWS SES or log it in development mode.

    Args:
        to_email: Recipient email address
        subject: Email subject line
        html_body: HTML email body
        text_body: Plain text fallback (optional)

    Returns:
        True if email sent successfully (or logged in dev mode), False otherwise

    Example:
        >>> service = EmailService()
        >>> success = await service.send_email(
        ...     to_email="user@example.com",
        ...     subject="Welcome!",
        ...     html_body="<h1>Welcome to Dashtam!</h1>",
        ...     text_body="Welcome to Dashtam!"
        ... )
        >>> success
        True
    """
    # Development mode: log email instead of sending
    if self.development_mode:
        logger.info("=" * 80)
        logger.info("📧 EMAIL (Development Mode - Not Sent)")
        logger.info("=" * 80)
        logger.info(f"From: {self.from_name} <{self.from_email}>")
        logger.info(f"To: {to_email}")
        logger.info(f"Subject: {subject}")
        logger.info("-" * 80)
        logger.info("HTML Body:")
        logger.info(html_body[:500] + "..." if len(html_body) > 500 else html_body)
        if text_body:
            logger.info("-" * 80)
            logger.info("Text Body:")
            logger.info(
                text_body[:500] + "..." if len(text_body) > 500 else text_body
            )
        logger.info("=" * 80)
        return True

    # Production mode: send via AWS SES
    try:
        # Prepare email body
        body_data: Dict[str, Any] = {
            "Html": {
                "Charset": "UTF-8",
                "Data": html_body,
            }
        }

        # Add text fallback if provided
        if text_body:
            body_data["Text"] = {
                "Charset": "UTF-8",
                "Data": text_body,
            }

        # Send email via SES
        response = self.ses_client.send_email(
            Source=f"{self.from_name} <{self.from_email}>",
            Destination={
                "ToAddresses": [to_email],
            },
            Message={
                "Subject": {
                    "Charset": "UTF-8",
                    "Data": subject,
                },
                "Body": body_data,
            },
        )

        message_id = response.get("MessageId", "unknown")
        logger.info(
            f"Email sent successfully to {to_email} (MessageId: {message_id})"
        )
        return True

    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        logger.error(
            f"AWS SES error sending email to {to_email}: {error_code} - {error_message}"
        )
        return False

    except Exception as e:
        logger.error(f"Unexpected error sending email to {to_email}: {e}")
        return False

send_verification_email async

send_verification_email(
    to_email: str,
    verification_token: str,
    user_name: Optional[str] = None,
) -> bool

Send email verification email to user.

Parameters:

Name Type Description Default
to_email str

User's email address

required
verification_token str

Unique verification token

required
user_name Optional[str]

User's name for personalization (optional)

None

Returns:

Type Description
bool

True if email sent successfully, False otherwise

Example

service = EmailService() success = await service.send_verification_email( ... to_email="user@example.com", ... verification_token="abc123def456", ... user_name="John Doe" ... )

Source code in src/services/email_service.py
    async def send_verification_email(
        self, to_email: str, verification_token: str, user_name: Optional[str] = None
    ) -> bool:
        """Send email verification email to user.

        Args:
            to_email: User's email address
            verification_token: Unique verification token
            user_name: User's name for personalization (optional)

        Returns:
            True if email sent successfully, False otherwise

        Example:
            >>> service = EmailService()
            >>> success = await service.send_verification_email(
            ...     to_email="user@example.com",
            ...     verification_token="abc123def456",
            ...     user_name="John Doe"
            ... )
        """
        settings = get_settings()

        # Generate verification URL
        # TODO: Update this with actual frontend URL when available
        verification_url = (
            f"https://localhost:3000/verify-email?token={verification_token}"
        )

        greeting = f"Hi {user_name}," if user_name else "Hello,"

        subject = "Verify Your Dashtam Account"

        html_body = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
    <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
        <h2 style="color: #4A90E2;">Welcome to Dashtam!</h2>

        <p>{greeting}</p>

        <p>Thank you for signing up for Dashtam, your secure financial data aggregation platform.</p>

        <p>To complete your registration and verify your email address, please click the button below:</p>

        <div style="text-align: center; margin: 30px 0;">
            <a href="{verification_url}" 
               style="background-color: #4A90E2; 
                      color: white; 
                      padding: 12px 30px; 
                      text-decoration: none; 
                      border-radius: 5px; 
                      display: inline-block;">
                Verify Email Address
            </a>
        </div>

        <p>Or copy and paste this link into your browser:</p>
        <p style="word-break: break-all; color: #4A90E2;">{verification_url}</p>

        <p style="margin-top: 30px; color: #666; font-size: 14px;">
            This verification link will expire in {settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_HOURS} hours.
        </p>

        <p style="color: #666; font-size: 14px;">
            If you didn't create a Dashtam account, please ignore this email.
        </p>

        <hr style="border: none; border-top: 1px solid #eee; margin: 30px 0;">

        <p style="color: #999; font-size: 12px; text-align: center;">
            © {datetime.now(UTC).year} Dashtam. All rights reserved.
        </p>
    </div>
</body>
</html>
"""

        text_body = f"""
{greeting}

Thank you for signing up for Dashtam, your secure financial data aggregation platform.

To complete your registration and verify your email address, please visit:
{verification_url}

This verification link will expire in {settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_HOURS} hours.

If you didn't create a Dashtam account, please ignore this email.

© {datetime.now(UTC).year} Dashtam. All rights reserved.
"""

        return await self.send_email(
            to_email=to_email, subject=subject, html_body=html_body, text_body=text_body
        )

send_password_reset_email async

send_password_reset_email(
    to_email: str,
    reset_token: str,
    user_name: Optional[str] = None,
) -> bool

Send password reset email to user.

Parameters:

Name Type Description Default
to_email str

User's email address

required
reset_token str

Unique password reset token

required
user_name Optional[str]

User's name for personalization (optional)

None

Returns:

Type Description
bool

True if email sent successfully, False otherwise

Example

service = EmailService() success = await service.send_password_reset_email( ... to_email="user@example.com", ... reset_token="xyz789abc123", ... user_name="John Doe" ... )

Source code in src/services/email_service.py
    async def send_password_reset_email(
        self, to_email: str, reset_token: str, user_name: Optional[str] = None
    ) -> bool:
        """Send password reset email to user.

        Args:
            to_email: User's email address
            reset_token: Unique password reset token
            user_name: User's name for personalization (optional)

        Returns:
            True if email sent successfully, False otherwise

        Example:
            >>> service = EmailService()
            >>> success = await service.send_password_reset_email(
            ...     to_email="user@example.com",
            ...     reset_token="xyz789abc123",
            ...     user_name="John Doe"
            ... )
        """
        settings = get_settings()

        # Generate reset URL
        # TODO: Update this with actual frontend URL when available
        reset_url = f"https://localhost:3000/reset-password?token={reset_token}"

        greeting = f"Hi {user_name}," if user_name else "Hello,"

        subject = "Reset Your Dashtam Password"

        html_body = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
    <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
        <h2 style="color: #4A90E2;">Password Reset Request</h2>

        <p>{greeting}</p>

        <p>We received a request to reset the password for your Dashtam account.</p>

        <p>To reset your password, please click the button below:</p>

        <div style="text-align: center; margin: 30px 0;">
            <a href="{reset_url}" 
               style="background-color: #4A90E2; 
                      color: white; 
                      padding: 12px 30px; 
                      text-decoration: none; 
                      border-radius: 5px; 
                      display: inline-block;">
                Reset Password
            </a>
        </div>

        <p>Or copy and paste this link into your browser:</p>
        <p style="word-break: break-all; color: #4A90E2;">{reset_url}</p>

        <p style="margin-top: 30px; color: #666; font-size: 14px;">
            This password reset link will expire in {settings.PASSWORD_RESET_TOKEN_EXPIRE_HOURS} hour(s).
        </p>

        <p style="color: #E74C3C; font-weight: bold;">
            If you didn't request a password reset, please ignore this email and your password will remain unchanged.
        </p>

        <hr style="border: none; border-top: 1px solid #eee; margin: 30px 0;">

        <p style="color: #999; font-size: 12px; text-align: center;">
            © {datetime.now(UTC).year} Dashtam. All rights reserved.
        </p>
    </div>
</body>
</html>
"""

        text_body = f"""
{greeting}

We received a request to reset the password for your Dashtam account.

To reset your password, please visit:
{reset_url}

This password reset link will expire in {settings.PASSWORD_RESET_TOKEN_EXPIRE_HOURS} hour(s).

If you didn't request a password reset, please ignore this email and your password will remain unchanged.

© {datetime.now(UTC).year} Dashtam. All rights reserved.
"""

        return await self.send_email(
            to_email=to_email, subject=subject, html_body=html_body, text_body=text_body
        )

send_welcome_email async

send_welcome_email(
    to_email: str, user_name: Optional[str] = None
) -> bool

Send welcome email to newly registered and verified user.

Parameters:

Name Type Description Default
to_email str

User's email address

required
user_name Optional[str]

User's name for personalization (optional)

None

Returns:

Type Description
bool

True if email sent successfully, False otherwise

Example

service = EmailService() success = await service.send_welcome_email( ... to_email="user@example.com", ... user_name="John Doe" ... )

Source code in src/services/email_service.py
    async def send_welcome_email(
        self, to_email: str, user_name: Optional[str] = None
    ) -> bool:
        """Send welcome email to newly registered and verified user.

        Args:
            to_email: User's email address
            user_name: User's name for personalization (optional)

        Returns:
            True if email sent successfully, False otherwise

        Example:
            >>> service = EmailService()
            >>> success = await service.send_welcome_email(
            ...     to_email="user@example.com",
            ...     user_name="John Doe"
            ... )
        """
        greeting = f"Hi {user_name}," if user_name else "Hello,"

        subject = "Welcome to Dashtam!"

        html_body = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
    <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
        <h2 style="color: #4A90E2;">Welcome to Dashtam! 🎉</h2>

        <p>{greeting}</p>

        <p>Your email has been verified and your account is now active!</p>

        <p>With Dashtam, you can securely:</p>
        <ul>
            <li>Connect your financial accounts from multiple institutions</li>
            <li>View all your accounts in one unified dashboard</li>
            <li>Access transaction history and financial data</li>
            <li>Keep your financial information secure with bank-level encryption</li>
        </ul>

        <p>Ready to get started? Log in to your account and connect your first financial institution.</p>

        <div style="text-align: center; margin: 30px 0;">
            <a href="https://localhost:3000/login" 
               style="background-color: #4A90E2; 
                      color: white; 
                      padding: 12px 30px; 
                      text-decoration: none; 
                      border-radius: 5px; 
                      display: inline-block;">
                Go to Dashboard
            </a>
        </div>

        <p>If you have any questions or need assistance, feel free to reach out to our support team.</p>

        <hr style="border: none; border-top: 1px solid #eee; margin: 30px 0;">

        <p style="color: #999; font-size: 12px; text-align: center;">
            © {datetime.now(UTC).year} Dashtam. All rights reserved.
        </p>
    </div>
</body>
</html>
"""

        text_body = f"""
{greeting}

Your email has been verified and your account is now active!

With Dashtam, you can securely:
- Connect your financial accounts from multiple institutions
- View all your accounts in one unified dashboard
- Access transaction history and financial data
- Keep your financial information secure with bank-level encryption

Ready to get started? Log in to your account and connect your first financial institution at:
https://localhost:3000/login

If you have any questions or need assistance, feel free to reach out to our support team.

© {datetime.now(UTC).year} Dashtam. All rights reserved.
"""

        return await self.send_email(
            to_email=to_email, subject=subject, html_body=html_body, text_body=text_body
        )

send_password_changed_notification async

send_password_changed_notification(
    to_email: str, user_name: Optional[str] = None
) -> bool

Send notification email after password is successfully changed.

Parameters:

Name Type Description Default
to_email str

User's email address

required
user_name Optional[str]

User's name for personalization (optional)

None

Returns:

Type Description
bool

True if email sent successfully, False otherwise

Example

service = EmailService() success = await service.send_password_changed_notification( ... to_email="user@example.com", ... user_name="John Doe" ... )

Source code in src/services/email_service.py
    async def send_password_changed_notification(
        self, to_email: str, user_name: Optional[str] = None
    ) -> bool:
        """Send notification email after password is successfully changed.

        Args:
            to_email: User's email address
            user_name: User's name for personalization (optional)

        Returns:
            True if email sent successfully, False otherwise

        Example:
            >>> service = EmailService()
            >>> success = await service.send_password_changed_notification(
            ...     to_email="user@example.com",
            ...     user_name="John Doe"
            ... )
        """
        greeting = f"Hi {user_name}," if user_name else "Hello,"

        subject = "Your Dashtam Password Was Changed"

        html_body = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
    <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
        <h2 style="color: #4A90E2;">Password Changed Successfully</h2>

        <p>{greeting}</p>

        <p>This is a confirmation that the password for your Dashtam account was successfully changed.</p>

        <p style="color: #E74C3C; font-weight: bold; margin-top: 20px;">
            If you did not make this change, please contact our support team immediately.
        </p>

        <div style="text-align: center; margin: 30px 0;">
            <a href="mailto:support@dashtam.com" 
               style="background-color: #E74C3C; 
                      color: white; 
                      padding: 12px 30px; 
                      text-decoration: none; 
                      border-radius: 5px; 
                      display: inline-block;">
                Contact Support
            </a>
        </div>

        <p style="color: #666; font-size: 14px;">
            Changed at: {datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S")} UTC
        </p>

        <hr style="border: none; border-top: 1px solid #eee; margin: 30px 0;">

        <p style="color: #999; font-size: 12px; text-align: center;">
            © {datetime.now(UTC).year} Dashtam. All rights reserved.
        </p>
    </div>
</body>
</html>
"""

        text_body = f"""
{greeting}

This is a confirmation that the password for your Dashtam account was successfully changed.

If you did not make this change, please contact our support team immediately at support@dashtam.com

Changed at: {datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S")} UTC

© {datetime.now(UTC).year} Dashtam. All rights reserved.
"""

        return await self.send_email(
            to_email=to_email, subject=subject, html_body=html_body, text_body=text_body
        )

Encryption Service

src.services.encryption.EncryptionService

Service for encrypting and decrypting sensitive data.

This service provides a simple interface for encrypting strings (like OAuth tokens) before storing them in the database and decrypting them when needed.

The encryption key is derived from the application's SECRET_KEY or can be set via ENCRYPTION_KEY environment variable.

Source code in src/services/encryption.py
class EncryptionService:
    """Service for encrypting and decrypting sensitive data.

    This service provides a simple interface for encrypting strings
    (like OAuth tokens) before storing them in the database and
    decrypting them when needed.

    The encryption key is derived from the application's SECRET_KEY
    or can be set via ENCRYPTION_KEY environment variable.
    """

    _instance: Optional["EncryptionService"] = None
    _cipher: Optional[Fernet] = None

    def __new__(cls) -> "EncryptionService":
        """Singleton pattern to ensure one encryption service instance."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialize the encryption service with a cipher."""
        if self._cipher is None:
            self._cipher = self._create_cipher()

    def _create_cipher(self) -> Fernet:
        """Create a Fernet cipher for encryption/decryption.

        The cipher uses either:
        1. ENCRYPTION_KEY from environment (if set)
        2. Derived key from SECRET_KEY (for development)

        Returns:
            Fernet cipher instance.
        """
        # Check for explicit encryption key
        encryption_key = os.getenv("ENCRYPTION_KEY")

        if encryption_key:
            # Use provided encryption key
            try:
                # Ensure it's properly formatted
                if not encryption_key.startswith("gAAAAA"):
                    # If it's not a Fernet key, try to use it as is
                    encryption_key = base64.urlsafe_b64encode(
                        encryption_key.encode()[:32].ljust(32, b"0")
                    )
                else:
                    encryption_key = encryption_key.encode()

                cipher = Fernet(encryption_key)
                logger.info("Using provided ENCRYPTION_KEY")
                return cipher
            except Exception as e:
                logger.warning(f"Invalid ENCRYPTION_KEY, generating new one: {e}")

        # Derive key from SECRET_KEY for development
        if hasattr(settings, "SECRET_KEY"):
            # Use PBKDF2 to derive a key from the secret
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=32,
                salt=b"dashtam-token-salt",  # Fixed salt for consistency
                iterations=100000,
            )
            key = base64.urlsafe_b64encode(kdf.derive(settings.SECRET_KEY.encode()))
            logger.info("Using key derived from SECRET_KEY")
            return Fernet(key)

        # Generate a new key if nothing else is available
        key = Fernet.generate_key()
        logger.warning(
            "Generated new encryption key. Set ENCRYPTION_KEY in production!"
        )
        logger.debug(f"Generated key: {key.decode()}")
        return Fernet(key)

    def encrypt(self, plaintext: str) -> str:
        """Encrypt a string value.

        Args:
            plaintext: The string to encrypt.

        Returns:
            Base64-encoded encrypted string.

        Raises:
            Exception: If encryption fails.
        """
        if not plaintext:
            return ""

        try:
            encrypted_bytes = self._cipher.encrypt(plaintext.encode())
            return encrypted_bytes.decode("utf-8")
        except Exception as e:
            logger.error(f"Encryption failed: {e}")
            raise Exception("Failed to encrypt data") from e

    def decrypt(self, ciphertext: str) -> str:
        """Decrypt an encrypted string.

        Args:
            ciphertext: The encrypted string to decrypt.

        Returns:
            The original plaintext string.

        Raises:
            Exception: If decryption fails.
        """
        if not ciphertext:
            return ""

        try:
            decrypted_bytes = self._cipher.decrypt(ciphertext.encode())
            return decrypted_bytes.decode("utf-8")
        except Exception as e:
            logger.error(f"Decryption failed: {e}")
            raise Exception("Failed to decrypt data") from e

    def encrypt_dict(self, data: dict) -> dict:
        """Encrypt all string values in a dictionary.

        Useful for encrypting multiple tokens at once.

        Args:
            data: Dictionary with string values to encrypt.

        Returns:
            Dictionary with encrypted values.
        """
        encrypted = {}
        for key, value in data.items():
            if isinstance(value, str):
                encrypted[key] = self.encrypt(value)
            else:
                encrypted[key] = value
        return encrypted

    def decrypt_dict(self, data: dict) -> dict:
        """Decrypt all string values in a dictionary.

        Args:
            data: Dictionary with encrypted string values.

        Returns:
            Dictionary with decrypted values.
        """
        decrypted = {}
        for key, value in data.items():
            if isinstance(value, str) and value:
                try:
                    decrypted[key] = self.decrypt(value)
                except Exception:
                    # If decryption fails, keep original value
                    decrypted[key] = value
            else:
                decrypted[key] = value
        return decrypted

    def is_encrypted(self, value: str) -> bool:
        """Check if a string appears to be encrypted.

        This is a heuristic check based on Fernet token format.

        Args:
            value: String to check.

        Returns:
            True if the string appears to be encrypted.
        """
        if not value:
            return False

        # Fernet tokens start with "gAAAAA"
        if value.startswith("gAAAAA"):
            return True

        # Try to decrypt - if it fails, it's not encrypted
        try:
            self.decrypt(value)
            return True
        except Exception:
            return False

    @classmethod
    def get_instance(cls) -> "EncryptionService":
        """Get the singleton instance of the encryption service.

        Returns:
            The encryption service instance.
        """
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

Functions

__new__

__new__() -> EncryptionService

Singleton pattern to ensure one encryption service instance.

Source code in src/services/encryption.py
def __new__(cls) -> "EncryptionService":
    """Singleton pattern to ensure one encryption service instance."""
    if cls._instance is None:
        cls._instance = super().__new__(cls)
    return cls._instance

__init__

__init__()

Initialize the encryption service with a cipher.

Source code in src/services/encryption.py
def __init__(self):
    """Initialize the encryption service with a cipher."""
    if self._cipher is None:
        self._cipher = self._create_cipher()

encrypt

encrypt(plaintext: str) -> str

Encrypt a string value.

Parameters:

Name Type Description Default
plaintext str

The string to encrypt.

required

Returns:

Type Description
str

Base64-encoded encrypted string.

Raises:

Type Description
Exception

If encryption fails.

Source code in src/services/encryption.py
def encrypt(self, plaintext: str) -> str:
    """Encrypt a string value.

    Args:
        plaintext: The string to encrypt.

    Returns:
        Base64-encoded encrypted string.

    Raises:
        Exception: If encryption fails.
    """
    if not plaintext:
        return ""

    try:
        encrypted_bytes = self._cipher.encrypt(plaintext.encode())
        return encrypted_bytes.decode("utf-8")
    except Exception as e:
        logger.error(f"Encryption failed: {e}")
        raise Exception("Failed to encrypt data") from e

decrypt

decrypt(ciphertext: str) -> str

Decrypt an encrypted string.

Parameters:

Name Type Description Default
ciphertext str

The encrypted string to decrypt.

required

Returns:

Type Description
str

The original plaintext string.

Raises:

Type Description
Exception

If decryption fails.

Source code in src/services/encryption.py
def decrypt(self, ciphertext: str) -> str:
    """Decrypt an encrypted string.

    Args:
        ciphertext: The encrypted string to decrypt.

    Returns:
        The original plaintext string.

    Raises:
        Exception: If decryption fails.
    """
    if not ciphertext:
        return ""

    try:
        decrypted_bytes = self._cipher.decrypt(ciphertext.encode())
        return decrypted_bytes.decode("utf-8")
    except Exception as e:
        logger.error(f"Decryption failed: {e}")
        raise Exception("Failed to decrypt data") from e

encrypt_dict

encrypt_dict(data: dict) -> dict

Encrypt all string values in a dictionary.

Useful for encrypting multiple tokens at once.

Parameters:

Name Type Description Default
data dict

Dictionary with string values to encrypt.

required

Returns:

Type Description
dict

Dictionary with encrypted values.

Source code in src/services/encryption.py
def encrypt_dict(self, data: dict) -> dict:
    """Encrypt all string values in a dictionary.

    Useful for encrypting multiple tokens at once.

    Args:
        data: Dictionary with string values to encrypt.

    Returns:
        Dictionary with encrypted values.
    """
    encrypted = {}
    for key, value in data.items():
        if isinstance(value, str):
            encrypted[key] = self.encrypt(value)
        else:
            encrypted[key] = value
    return encrypted

decrypt_dict

decrypt_dict(data: dict) -> dict

Decrypt all string values in a dictionary.

Parameters:

Name Type Description Default
data dict

Dictionary with encrypted string values.

required

Returns:

Type Description
dict

Dictionary with decrypted values.

Source code in src/services/encryption.py
def decrypt_dict(self, data: dict) -> dict:
    """Decrypt all string values in a dictionary.

    Args:
        data: Dictionary with encrypted string values.

    Returns:
        Dictionary with decrypted values.
    """
    decrypted = {}
    for key, value in data.items():
        if isinstance(value, str) and value:
            try:
                decrypted[key] = self.decrypt(value)
            except Exception:
                # If decryption fails, keep original value
                decrypted[key] = value
        else:
            decrypted[key] = value
    return decrypted

is_encrypted

is_encrypted(value: str) -> bool

Check if a string appears to be encrypted.

This is a heuristic check based on Fernet token format.

Parameters:

Name Type Description Default
value str

String to check.

required

Returns:

Type Description
bool

True if the string appears to be encrypted.

Source code in src/services/encryption.py
def is_encrypted(self, value: str) -> bool:
    """Check if a string appears to be encrypted.

    This is a heuristic check based on Fernet token format.

    Args:
        value: String to check.

    Returns:
        True if the string appears to be encrypted.
    """
    if not value:
        return False

    # Fernet tokens start with "gAAAAA"
    if value.startswith("gAAAAA"):
        return True

    # Try to decrypt - if it fails, it's not encrypted
    try:
        self.decrypt(value)
        return True
    except Exception:
        return False

get_instance classmethod

get_instance() -> EncryptionService

Get the singleton instance of the encryption service.

Returns:

Type Description
EncryptionService

The encryption service instance.

Source code in src/services/encryption.py
@classmethod
def get_instance(cls) -> "EncryptionService":
    """Get the singleton instance of the encryption service.

    Returns:
        The encryption service instance.
    """
    if cls._instance is None:
        cls._instance = cls()
    return cls._instance