infrastructure.providers.encryption_service
src.infrastructure.providers.encryption_service
Encryption service for provider credentials.
Provides AES-256-GCM encryption for secure storage of OAuth tokens and other sensitive provider credentials.
Security Properties
- Confidentiality: Only the holder of the key can decrypt
- Integrity: Tampering is detected via the GCM authentication tag
- Uniqueness: A random IV per encryption prevents pattern analysis (identical plaintexts yield different ciphertexts)
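The three properties can be observed directly with the `AESGCM` primitive from the `cryptography` package. This is a minimal sketch, not the service's implementation; the throwaway key, the sample payload, and all variable names are assumptions made for illustration.

```python
import os

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

key = AESGCM.generate_key(bit_length=256)  # throwaway 32-byte key for the demo
aesgcm = AESGCM(key)
plaintext = b'{"access_token": "abc123"}'

# Uniqueness: a fresh 12-byte IV per call gives different ciphertexts
# for the same plaintext.
iv_a, iv_b = os.urandom(12), os.urandom(12)
ct_a = aesgcm.encrypt(iv_a, plaintext, None)
ct_b = aesgcm.encrypt(iv_b, plaintext, None)
differs = ct_a != ct_b

# Integrity: flipping a single bit makes decryption fail with InvalidTag.
tampered = bytes([ct_a[0] ^ 0x01]) + ct_a[1:]
try:
    aesgcm.decrypt(iv_a, tampered, None)
    tamper_detected = False
except InvalidTag:
    tamper_detected = True

# Confidentiality: a different key cannot decrypt the data.
other = AESGCM(AESGCM.generate_key(bit_length=256))
try:
    other.decrypt(iv_a, ct_a, None)
    wrong_key_rejected = False
except InvalidTag:
    wrong_key_rejected = True

# The right key and IV recover the original plaintext.
roundtrip_ok = aesgcm.decrypt(iv_a, ct_a, None) == plaintext
```

Note that `AESGCM.encrypt` returns the ciphertext with the 16-byte tag already appended, so the output is always 16 bytes longer than the input.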
Architecture
- Infrastructure adapter (catches cryptography exceptions)
- Returns Result types (railway-oriented programming)
- Uses domain error codes (ErrorCode enum)
Reference
- docs/architecture/provider-integration-architecture.md
- docs/architecture/error-handling-architecture.md
Classes
DecryptionError
dataclass
Bases: EncryptionError
Decryption failure.
Occurs when:
- Wrong encryption key
- Data has been tampered with
- Invalid encrypted data format
Source code in src/domain/protocols/encryption_protocol.py
EncryptionError
dataclass
Bases: DomainError
Base encryption error.
Used when encryption or decryption fails. Does NOT inherit from Exception; it is returned inside Result types rather than raised.
Source code in src/domain/protocols/encryption_protocol.py
EncryptionKeyError
dataclass
Bases: EncryptionError
Invalid encryption key.
Occurs when the key doesn't meet requirements (for example, it is not exactly 32 bytes long).
Source code in src/domain/protocols/encryption_protocol.py
SerializationError
dataclass
Bases: EncryptionError
Serialization/deserialization failure.
Occurs when data cannot be serialized to JSON or decrypted data cannot be parsed as JSON.
Source code in src/domain/protocols/encryption_protocol.py
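The four error classes form a small dataclass hierarchy that is carried in Result values instead of being raised. A minimal sketch of that shape follows; the `message` field is an assumption (the real `DomainError` may carry more, such as an `ErrorCode`), and these definitions are stand-ins, not the project's source.

```python
from dataclasses import dataclass


@dataclass
class DomainError:
    # Hypothetical field; the real base class may define additional ones.
    message: str


@dataclass
class EncryptionError(DomainError):
    """Base encryption error."""


@dataclass
class EncryptionKeyError(EncryptionError):
    """Invalid encryption key."""


@dataclass
class DecryptionError(EncryptionError):
    """Decryption failure."""


@dataclass
class SerializationError(EncryptionError):
    """Serialization/deserialization failure."""


error = DecryptionError(message="authentication tag mismatch")

# Every specific error can be handled as a generic EncryptionError...
is_encryption_error = isinstance(error, EncryptionError)
# ...but none of them is an Exception, so it cannot be raised by accident.
is_exception = isinstance(error, Exception)
```

Because the errors are plain values, a caller that ignores a failure gets no traceback; the Result type is what forces the failure branch to be handled.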
EncryptionService
AES-256-GCM encryption service for provider credentials.
Encrypts and decrypts credential dictionaries (containing access tokens, refresh tokens, etc.) to/from bytes for secure database storage.
Format
Encrypted bytes = IV (12 bytes) || ciphertext || auth_tag (16 bytes)
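The layout above implies a fixed overhead of 28 bytes (12 + 16) on top of the ciphertext, which has the same length as the JSON plaintext. The following sketch splits a stored blob into its three parts; `split_encrypted` is a hypothetical helper written for illustration, not part of the service.

```python
IV_SIZE = 12   # bytes, per the format above
TAG_SIZE = 16  # bytes, per the format above


def split_encrypted(blob: bytes) -> tuple[bytes, bytes, bytes]:
    """Split IV || ciphertext || auth_tag into its three parts."""
    if len(blob) < IV_SIZE + TAG_SIZE:
        raise ValueError("encrypted data is too short to contain IV and tag")
    iv = blob[:IV_SIZE]
    ciphertext = blob[IV_SIZE:-TAG_SIZE]
    auth_tag = blob[-TAG_SIZE:]
    return iv, ciphertext, auth_tag


# Dummy bytes stand in for real encrypted output.
blob = bytes(range(12)) + b"payload" + b"T" * 16
iv, ciphertext, auth_tag = split_encrypted(blob)
```

When decrypting with the `cryptography` package's `AESGCM`, only the IV needs to be separated, because its `decrypt` method expects the ciphertext and tag as one byte string.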
Usage

    from src.core.config import get_settings

    service = EncryptionService.create(get_settings().encryption_key)
    match service:
        case Success(svc):
            # Encrypt credentials
            result = svc.encrypt({"access_token": "abc123"})
        case Failure(error):
            # Handle invalid key
            ...

Thread Safety
This service is thread-safe. The AESGCM instance can be used concurrently from multiple threads.
Reference
- NIST SP 800-38D (GCM specification)
- docs/architecture/provider-integration-architecture.md
Source code in src/infrastructure/providers/encryption_service.py
Functions
__init__
Use the EncryptionService.create() factory instead of constructing the service directly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| aesgcm | AESGCM | Pre-initialized AESGCM cipher instance. | required |
Source code in src/infrastructure/providers/encryption_service.py
create
classmethod
Create encryption service with validated key.
Factory method that validates the encryption key and returns a Result type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | bytes | 32-byte (256-bit) encryption key. | required |
Returns:
| Type | Description |
|---|---|
| Result[EncryptionService, EncryptionKeyError] | Success(EncryptionService) if key is valid. |
| Result[EncryptionService, EncryptionKeyError] | Failure(EncryptionKeyError) if key is invalid. |
Example

    import os

    key = os.urandom(32)
    match EncryptionService.create(key):
        case Success(service):
            ...  # Use service
        case Failure(error):
            logger.error(f"Invalid key: {error.message}")

Source code in src/infrastructure/providers/encryption_service.py
encrypt
Encrypt credentials dictionary to bytes.
Serializes the dictionary to JSON, then encrypts using AES-256-GCM with a random IV. The IV is prepended to the ciphertext.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | dict[str, Any] | Credentials dictionary to encrypt. Must be JSON-serializable. Typically contains access_token, refresh_token, etc. | required |
Returns:
| Type | Description |
|---|---|
| Result[bytes, EncryptionError] | Success(bytes) with encrypted data in format: IV (12 bytes) \|\| ciphertext \|\| auth_tag (16 bytes) |
| Result[bytes, EncryptionError] | Failure(SerializationError) if data cannot be serialized. |
| Result[bytes, EncryptionError] | Failure(EncryptionError) if encryption fails. |
Example

    credentials = {
        "access_token": "abc123",
        "refresh_token": "xyz789",
    }
    match service.encrypt(credentials):
        case Success(encrypted):
            ...  # Store encrypted bytes in database
        case Failure(error):
            ...  # Handle error

Source code in src/infrastructure/providers/encryption_service.py
decrypt
Decrypt bytes back to credentials dictionary.
Extracts the IV from the first 12 bytes, then decrypts and authenticates the remaining bytes (ciphertext plus 16-byte auth tag) using AES-256-GCM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| encrypted | bytes | Encrypted bytes from encrypt(). | required |
Returns:
| Type | Description |
|---|---|
| Result[dict[str, Any], EncryptionError] | Success(dict) with original credentials dictionary. |
| Result[dict[str, Any], EncryptionError] | Failure(DecryptionError) if decryption fails (wrong key, tampered). |
| Result[dict[str, Any], EncryptionError] | Failure(SerializationError) if decrypted data is not valid JSON. |
Example

    match service.decrypt(encrypted_bytes):
        case Success(credentials):
            access_token = credentials["access_token"]
        case Failure(error):
            ...  # Handle error; may need user to re-authenticate
