Skip to content

infrastructure.cache.redis_adapter

src.infrastructure.cache.redis_adapter

Redis adapter implementing CacheProtocol.

This adapter provides Redis-specific implementation of the cache protocol defined in the domain layer. It wraps the Redis client and handles all Redis-specific operations and error mapping.

Architecture: implements CacheProtocol without inheritance (structural typing); maps Redis exceptions to CacheError with the proper ErrorCode; returns Result types for all operations; and follows a fail-open strategy for resilience.

Classes

RedisAdapter

Redis implementation of CacheProtocol.

This adapter wraps an async Redis client and implements all cache operations defined in CacheProtocol. It handles Redis-specific exceptions and maps them to domain-appropriate errors.

Note: Does NOT inherit from CacheProtocol (uses structural typing).

Attributes:

Name Type Description
_redis

Async Redis client instance.

Source code in src/infrastructure/cache/redis_adapter.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
class RedisAdapter:
    """Redis implementation of CacheProtocol.

    This adapter wraps an async Redis client and implements all cache
    operations defined in CacheProtocol. It handles Redis-specific
    exceptions and maps them to domain-appropriate errors.

    Note: Does NOT inherit from CacheProtocol (uses structural typing).

    Attributes:
        _redis: Async Redis client instance.
    """

    def __init__(self, redis_client: Redis) -> None:
        """Initialize Redis adapter.

        Args:
            redis_client: Async Redis client instance.
        """
        self._redis = redis_client

    async def get(self, key: str) -> Result[str | None, CacheError]:
        """Get value from Redis.

        Args:
            key: Cache key.

        Returns:
            Result with value if found, None if not found, or CacheError.
        """
        try:
            value = await self._redis.get(key)
            # Redis returns bytes or None
            if value is None:
                return Success(value=None)
            decoded = value.decode("utf-8") if isinstance(value, bytes) else value
            return Success(value=decoded)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
                    message=f"Failed to get key '{key}' from cache",
                    details={"key": key, "error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
                    message=f"Unexpected error getting key '{key}'",
                    details={"key": key, "error": str(e), "type": type(e).__name__},
                )
            )

    async def get_json(self, key: str) -> Result[dict[str, Any] | None, CacheError]:
        """Get JSON value from Redis.

        Args:
            key: Cache key.

        Returns:
            Result with parsed dict if found, None if not found, or CacheError.
        """
        result = await self.get(key)

        match result:
            case Success(value=None):
                return Success(value=None)
            case Success(value=val) if val is not None:
                try:
                    parsed = json.loads(val)
                    return Success(value=parsed)
                except json.JSONDecodeError as e:
                    return Failure(
                        error=CacheError(
                            code=ErrorCode.VALIDATION_FAILED,
                            infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
                            message=f"Failed to parse JSON for key '{key}'",
                            details={"key": key, "error": str(e)},
                        )
                    )
            case Failure(error=err):
                return Failure(error=err)
            case _:
                # Unreachable but needed for type checker
                return Success(value=None)

    async def set(
        self,
        key: str,
        value: str,
        ttl: int | None = None,
    ) -> Result[None, CacheError]:
        """Set value in Redis.

        Args:
            key: Cache key.
            value: Value to cache.
            ttl: Time to live in seconds (None = no expiration).

        Returns:
            Result with None on success, or CacheError.
        """
        try:
            if ttl is not None:
                await self._redis.setex(key, ttl, value)
            else:
                await self._redis.set(key, value)
            return Success(value=None)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
                    message=f"Failed to set key '{key}' in cache",
                    details={"key": key, "ttl": ttl, "error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
                    message=f"Unexpected error setting key '{key}'",
                    details={"key": key, "error": str(e), "type": type(e).__name__},
                )
            )

    async def set_json(
        self,
        key: str,
        value: dict[str, Any],
        ttl: int | None = None,
    ) -> Result[None, CacheError]:
        """Set JSON value in Redis.

        Args:
            key: Cache key.
            value: Dict to cache (will be JSON serialized).
            ttl: Time to live in seconds (None = no expiration).

        Returns:
            Result with None on success, or CacheError.
        """
        try:
            serialized = json.dumps(value)
            return await self.set(key, serialized, ttl)
        except (TypeError, ValueError) as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
                    message=f"Failed to serialize value for key '{key}'",
                    details={"key": key, "error": str(e)},
                )
            )

    async def delete(self, key: str) -> Result[bool, CacheError]:
        """Delete key from Redis.

        Args:
            key: Cache key to delete.

        Returns:
            Result with True if deleted, False if key didn't exist, or CacheError.
        """
        try:
            deleted_count = await self._redis.delete(key)
            return Success(value=deleted_count > 0)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
                    message=f"Failed to delete key '{key}' from cache",
                    details={"key": key, "error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
                    message=f"Unexpected error deleting key '{key}'",
                    details={"key": key, "error": str(e), "type": type(e).__name__},
                )
            )

    async def exists(self, key: str) -> Result[bool, CacheError]:
        """Check if key exists in Redis.

        Args:
            key: Cache key to check.

        Returns:
            Result with True if exists, False if not, or CacheError.
        """
        try:
            exists_count = await self._redis.exists(key)
            return Success(value=exists_count > 0)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
                    message=f"Failed to check existence of key '{key}'",
                    details={"key": key, "error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
                    message=f"Unexpected error checking key '{key}'",
                    details={"key": key, "error": str(e), "type": type(e).__name__},
                )
            )

    async def expire(self, key: str, seconds: int) -> Result[bool, CacheError]:
        """Set expiration on key in Redis.

        Args:
            key: Cache key.
            seconds: Seconds until expiration.

        Returns:
            Result with True if timeout set, False if key doesn't exist, or CacheError.
        """
        try:
            was_set = await self._redis.expire(key, seconds)
            return Success(value=bool(was_set))
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
                    message=f"Failed to set expiration on key '{key}'",
                    details={"key": key, "seconds": seconds, "error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
                    message=f"Unexpected error setting expiration on key '{key}'",
                    details={"key": key, "error": str(e), "type": type(e).__name__},
                )
            )

    async def ttl(self, key: str) -> Result[int | None, CacheError]:
        """Get time to live for key in Redis.

        Args:
            key: Cache key.

        Returns:
            Result with seconds until expiration, None if no TTL or key doesn't exist, or CacheError.
        """
        try:
            ttl_value = await self._redis.ttl(key)
            # Redis returns -2 if key doesn't exist, -1 if no expiration
            if ttl_value == -2 or ttl_value == -1:
                return Success(value=None)
            return Success(value=ttl_value)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
                    message=f"Failed to get TTL for key '{key}'",
                    details={"key": key, "error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
                    message=f"Unexpected error getting TTL for key '{key}'",
                    details={"key": key, "error": str(e), "type": type(e).__name__},
                )
            )

    async def increment(self, key: str, amount: int = 1) -> Result[int, CacheError]:
        """Increment value in Redis (atomic).

        Args:
            key: Cache key.
            amount: Amount to increment by.

        Returns:
            Result with new value after increment, or CacheError.
        """
        try:
            new_value = await self._redis.incrby(key, amount)
            return Success(value=new_value)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
                    message=f"Failed to increment key '{key}'",
                    details={"key": key, "amount": amount, "error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
                    message=f"Unexpected error incrementing key '{key}'",
                    details={"key": key, "error": str(e), "type": type(e).__name__},
                )
            )

    async def decrement(self, key: str, amount: int = 1) -> Result[int, CacheError]:
        """Decrement value in Redis (atomic).

        Args:
            key: Cache key.
            amount: Amount to decrement by.

        Returns:
            Result with new value after decrement, or CacheError.
        """
        try:
            new_value = await self._redis.decrby(key, amount)
            return Success(value=new_value)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
                    message=f"Failed to decrement key '{key}'",
                    details={"key": key, "amount": amount, "error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
                    message=f"Unexpected error decrementing key '{key}'",
                    details={"key": key, "error": str(e), "type": type(e).__name__},
                )
            )

    async def flush(self) -> Result[None, CacheError]:
        """Flush all keys from Redis.

        WARNING: Clears ALL cache data! Use only in tests.

        Returns:
            Result with None on success, or CacheError.
        """
        try:
            await self._redis.flushdb()
            return Success(value=None)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
                    message="Failed to flush cache",
                    details={"error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
                    message="Unexpected error flushing cache",
                    details={"error": str(e), "type": type(e).__name__},
                )
            )

    async def ping(self) -> Result[bool, CacheError]:
        """Check Redis connectivity (health check).

        Returns:
            Result with True if Redis is reachable, or CacheError.
        """
        try:
            # ping() returns True if successful
            # Type ignore due to redis.asyncio ping() return type ambiguity
            await self._redis.ping()  # type: ignore[misc]
            return Success(value=True)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_CONNECTION_ERROR,
                    message="Redis health check failed",
                    details={"error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_CONNECTION_ERROR,
                    message="Unexpected error during Redis health check",
                    details={"error": str(e), "type": type(e).__name__},
                )
            )

    async def delete_pattern(self, pattern: str) -> Result[int, CacheError]:
        """Delete all keys matching pattern.

        Uses Redis SCAN to find matching keys and DELETE to remove them.
        This is safer than KEYS + DELETE for large datasets.

        Args:
            pattern: Glob-style pattern (e.g., "authz:user123:*").

        Returns:
            Result with number of keys deleted, or CacheError.
        """
        try:
            deleted_count = 0
            # Use SCAN to iterate keys matching pattern (safer than KEYS)
            async for key in self._redis.scan_iter(match=pattern):
                await self._redis.delete(key)
                deleted_count += 1
            return Success(value=deleted_count)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
                    message=f"Failed to delete keys matching pattern '{pattern}'",
                    details={"pattern": pattern, "error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
                    message=f"Unexpected error deleting keys matching '{pattern}'",
                    details={
                        "pattern": pattern,
                        "error": str(e),
                        "type": type(e).__name__,
                    },
                )
            )

    async def get_many(
        self, keys: list[str]
    ) -> Result[dict[str, str | None], CacheError]:
        """Get multiple values from Redis (batch operation).

        Uses Redis MGET for efficient batch retrieval.

        Args:
            keys: List of cache keys to retrieve.

        Returns:
            Result with dict mapping keys to values (None for missing), or CacheError.
        """
        if not keys:
            return Success(value={})

        try:
            values = await self._redis.mget(keys)
            result_dict: dict[str, str | None] = {}
            for key, value in zip(keys, values):
                if value is None:
                    result_dict[key] = None
                else:
                    decoded = (
                        value.decode("utf-8") if isinstance(value, bytes) else value
                    )
                    result_dict[key] = decoded
            return Success(value=result_dict)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
                    message="Failed to get multiple keys from cache",
                    details={"keys": keys, "error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
                    message="Unexpected error getting multiple keys",
                    details={"keys": keys, "error": str(e), "type": type(e).__name__},
                )
            )

    async def set_many(
        self,
        mapping: dict[str, str],
        ttl: int | None = None,
    ) -> Result[None, CacheError]:
        """Set multiple values in Redis (batch operation).

        Uses Redis pipeline for atomic batch writes.

        Args:
            mapping: Dict of key->value pairs to cache.
            ttl: Time to live in seconds for all keys (None = no expiration).

        Returns:
            Result with None on success, or CacheError.
        """
        if not mapping:
            return Success(value=None)

        try:
            # Use pipeline for efficient batch operation
            async with self._redis.pipeline() as pipe:
                for key, value in mapping.items():
                    if ttl is not None:
                        pipe.setex(key, ttl, value)
                    else:
                        pipe.set(key, value)
                await pipe.execute()
            return Success(value=None)
        except RedisError as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
                    message="Failed to set multiple keys in cache",
                    details={"keys": list(mapping.keys()), "error": str(e)},
                )
            )
        except Exception as e:
            return Failure(
                error=CacheError(
                    code=ErrorCode.VALIDATION_FAILED,
                    infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
                    message="Unexpected error setting multiple keys",
                    details={
                        "keys": list(mapping.keys()),
                        "error": str(e),
                        "type": type(e).__name__,
                    },
                )
            )
Functions
__init__
__init__(redis_client: Redis) -> None

Parameters:

Name Type Description Default
redis_client Redis

Async Redis client instance.

required
Source code in src/infrastructure/cache/redis_adapter.py
def __init__(self, redis_client: Redis) -> None:
    """Store the Redis client that backs this adapter.

    Args:
        redis_client: Async Redis client instance.
    """
    # Kept on a private attribute; the adapter's operations call through it.
    self._redis = redis_client
get async
get(key: str) -> Result[str | None, CacheError]

Get value from Redis.

Parameters:

Name Type Description Default
key str

Cache key.

required

Returns:

Type Description
Result[str | None, CacheError]

Result with value if found, None if not found, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def get(self, key: str) -> Result[str | None, CacheError]:
    """Fetch the value stored under ``key``.

    Args:
        key: Cache key.

    Returns:
        Success wrapping the stored value (bytes replies are decoded as
        UTF-8), Success wrapping None when the client returns None, or
        Failure wrapping a CacheError when the lookup raises.
    """
    try:
        raw = await self._redis.get(key)
        if isinstance(raw, bytes):
            # Bytes replies are normalised to str for callers.
            return Success(value=raw.decode("utf-8"))
        # None (miss) and non-bytes values pass through unchanged.
        return Success(value=raw)
    except RedisError as exc:
        redis_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
            message=f"Failed to get key '{key}' from cache",
            details={"key": key, "error": str(exc)},
        )
        return Failure(error=redis_error)
    except Exception as exc:
        # Anything that is not a RedisError also records the exception type.
        unexpected_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
            message=f"Unexpected error getting key '{key}'",
            details={"key": key, "error": str(exc), "type": type(exc).__name__},
        )
        return Failure(error=unexpected_error)
get_json async
get_json(
    key: str,
) -> Result[dict[str, Any] | None, CacheError]

Get JSON value from Redis.

Parameters:

Name Type Description Default
key str

Cache key.

required

Returns:

Type Description
Result[dict[str, Any] | None, CacheError]

Result with parsed dict if found, None if not found, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def get_json(self, key: str) -> Result[dict[str, Any] | None, CacheError]:
    """Get JSON value from Redis.

    Args:
        key: Cache key.

    Returns:
        Result with parsed dict if found, None if not found, or CacheError.
    """
    result = await self.get(key)

    match result:
        case Success(value=None):
            return Success(value=None)
        case Success(value=val) if val is not None:
            try:
                parsed = json.loads(val)
                return Success(value=parsed)
            except json.JSONDecodeError as e:
                return Failure(
                    error=CacheError(
                        code=ErrorCode.VALIDATION_FAILED,
                        infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
                        message=f"Failed to parse JSON for key '{key}'",
                        details={"key": key, "error": str(e)},
                    )
                )
        case Failure(error=err):
            return Failure(error=err)
        case _:
            # Unreachable but needed for type checker
            return Success(value=None)
set async
set(
    key: str, value: str, ttl: int | None = None
) -> Result[None, CacheError]

Set value in Redis.

Parameters:

Name Type Description Default
key str

Cache key.

required
value str

Value to cache.

required
ttl int | None

Time to live in seconds (None = no expiration).

None

Returns:

Type Description
Result[None, CacheError]

Result with None on success, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def set(
    self,
    key: str,
    value: str,
    ttl: int | None = None,
) -> Result[None, CacheError]:
    """Store ``value`` under ``key``, optionally with an expiry.

    Args:
        key: Cache key.
        value: Value to cache.
        ttl: Time to live in seconds (None = no expiration).

    Returns:
        Success wrapping None once the write completes, or Failure wrapping
        a CacheError when the write raises.
    """
    try:
        if ttl is None:
            # No expiry requested: plain SET.
            await self._redis.set(key, value)
        else:
            # SETEX writes the value and its expiry together.
            await self._redis.setex(key, ttl, value)
        return Success(value=None)
    except RedisError as exc:
        redis_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
            message=f"Failed to set key '{key}' in cache",
            details={"key": key, "ttl": ttl, "error": str(exc)},
        )
        return Failure(error=redis_error)
    except Exception as exc:
        # Anything that is not a RedisError also records the exception type.
        unexpected_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
            message=f"Unexpected error setting key '{key}'",
            details={"key": key, "error": str(exc), "type": type(exc).__name__},
        )
        return Failure(error=unexpected_error)
set_json async
set_json(
    key: str, value: dict[str, Any], ttl: int | None = None
) -> Result[None, CacheError]

Set JSON value in Redis.

Parameters:

Name Type Description Default
key str

Cache key.

required
value dict[str, Any]

Dict to cache (will be JSON serialized).

required
ttl int | None

Time to live in seconds (None = no expiration).

None

Returns:

Type Description
Result[None, CacheError]

Result with None on success, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def set_json(
    self,
    key: str,
    value: dict[str, Any],
    ttl: int | None = None,
) -> Result[None, CacheError]:
    """Serialize ``value`` to JSON and store it under ``key``.

    Args:
        key: Cache key.
        value: Dict to cache (will be JSON serialized).
        ttl: Time to live in seconds (None = no expiration).

    Returns:
        The result of ``set`` for the serialized text, or Failure wrapping
        a CacheError when a TypeError/ValueError is raised.
    """
    try:
        payload = json.dumps(value)
        outcome = await self.set(key, payload, ttl)
        return outcome
    except (TypeError, ValueError) as exc:
        serialization_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
            message=f"Failed to serialize value for key '{key}'",
            details={"key": key, "error": str(exc)},
        )
        return Failure(error=serialization_error)
delete async
delete(key: str) -> Result[bool, CacheError]

Delete key from Redis.

Parameters:

Name Type Description Default
key str

Cache key to delete.

required

Returns:

Type Description
Result[bool, CacheError]

Result with True if deleted, False if key didn't exist, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def delete(self, key: str) -> Result[bool, CacheError]:
    """Remove ``key`` from Redis.

    Args:
        key: Cache key to delete.

    Returns:
        Success wrapping True when the client reports at least one key
        removed, Success wrapping False otherwise, or Failure wrapping a
        CacheError when the delete raises.
    """
    try:
        removed = await self._redis.delete(key)
        # The client reports a count; a positive count means the key existed.
        was_deleted = removed > 0
        return Success(value=was_deleted)
    except RedisError as exc:
        redis_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
            message=f"Failed to delete key '{key}' from cache",
            details={"key": key, "error": str(exc)},
        )
        return Failure(error=redis_error)
    except Exception as exc:
        # Anything that is not a RedisError also records the exception type.
        unexpected_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
            message=f"Unexpected error deleting key '{key}'",
            details={"key": key, "error": str(exc), "type": type(exc).__name__},
        )
        return Failure(error=unexpected_error)
exists async
exists(key: str) -> Result[bool, CacheError]

Check if key exists in Redis.

Parameters:

Name Type Description Default
key str

Cache key to check.

required

Returns:

Type Description
Result[bool, CacheError]

Result with True if exists, False if not, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def exists(self, key: str) -> Result[bool, CacheError]:
    """Report whether a key is present in Redis.

    Args:
        key: Cache key to look up.

    Returns:
        Success(True) when Redis reports a positive count for the key,
        Success(False) when it reports zero, or Failure carrying a
        CacheError if the call raised.
    """
    try:
        found = await self._redis.exists(key)
        return Success(value=found > 0)
    except RedisError as exc:
        # Error raised by the Redis client itself.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
            message=f"Failed to check existence of key '{key}'",
            details={"key": key, "error": str(exc)},
        )
        return Failure(error=cache_error)
    except Exception as exc:
        # Anything else: also record the exception type for diagnosis.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
            message=f"Unexpected error checking key '{key}'",
            details={"key": key, "error": str(exc), "type": type(exc).__name__},
        )
        return Failure(error=cache_error)
expire async
expire(key: str, seconds: int) -> Result[bool, CacheError]

Set expiration on key in Redis.

Parameters:

Name Type Description Default
key str

Cache key.

required
seconds int

Seconds until expiration.

required

Returns:

Type Description
Result[bool, CacheError]

Result with True if timeout set, False if key doesn't exist, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def expire(self, key: str, seconds: int) -> Result[bool, CacheError]:
    """Attach an expiration timeout to a key in Redis.

    Args:
        key: Cache key.
        seconds: Number of seconds until the key should expire.

    Returns:
        Success holding the truthiness of Redis' EXPIRE reply (True if
        the timeout was set, False otherwise), or Failure carrying a
        CacheError if the call raised.
    """
    try:
        applied = await self._redis.expire(key, seconds)
        return Success(value=bool(applied))
    except RedisError as exc:
        # Error raised by the Redis client itself.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
            message=f"Failed to set expiration on key '{key}'",
            details={"key": key, "seconds": seconds, "error": str(exc)},
        )
        return Failure(error=cache_error)
    except Exception as exc:
        # Anything else: also record the exception type for diagnosis.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
            message=f"Unexpected error setting expiration on key '{key}'",
            details={"key": key, "error": str(exc), "type": type(exc).__name__},
        )
        return Failure(error=cache_error)
ttl async
ttl(key: str) -> Result[int | None, CacheError]

Get time to live for key in Redis.

Parameters:

Name Type Description Default
key str

Cache key.

required

Returns:

Type Description
Result[int | None, CacheError]

Result with seconds until expiration, None if no TTL or key doesn't exist, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def ttl(self, key: str) -> Result[int | None, CacheError]:
    """Look up the remaining time to live of a key in Redis.

    Args:
        key: Cache key.

    Returns:
        Success with the remaining seconds, Success(None) when Redis
        answers with one of its sentinel values (-2 or -1), or Failure
        carrying a CacheError if the call raised.
    """
    try:
        remaining = await self._redis.ttl(key)
        # Sentinels: -2 means the key is missing, -1 means it has no expiry.
        return Success(value=None if remaining in (-2, -1) else remaining)
    except RedisError as exc:
        # Error raised by the Redis client itself.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
            message=f"Failed to get TTL for key '{key}'",
            details={"key": key, "error": str(exc)},
        )
        return Failure(error=cache_error)
    except Exception as exc:
        # Anything else: also record the exception type for diagnosis.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
            message=f"Unexpected error getting TTL for key '{key}'",
            details={"key": key, "error": str(exc), "type": type(exc).__name__},
        )
        return Failure(error=cache_error)
increment async
increment(
    key: str, amount: int = 1
) -> Result[int, CacheError]

Increment value in Redis (atomic).

Parameters:

Name Type Description Default
key str

Cache key.

required
amount int

Amount to increment by.

1

Returns:

Type Description
Result[int, CacheError]

Result with new value after increment, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def increment(self, key: str, amount: int = 1) -> Result[int, CacheError]:
    """Add ``amount`` to the integer stored at a key via Redis INCRBY.

    Args:
        key: Cache key.
        amount: Amount to add (defaults to 1).

    Returns:
        Success with the value reported by Redis after the increment,
        or Failure carrying a CacheError if the call raised.
    """
    try:
        updated = await self._redis.incrby(key, amount)
        return Success(value=updated)
    except RedisError as exc:
        # Error raised by the Redis client itself.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
            message=f"Failed to increment key '{key}'",
            details={"key": key, "amount": amount, "error": str(exc)},
        )
        return Failure(error=cache_error)
    except Exception as exc:
        # Anything else: also record the exception type for diagnosis.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
            message=f"Unexpected error incrementing key '{key}'",
            details={"key": key, "error": str(exc), "type": type(exc).__name__},
        )
        return Failure(error=cache_error)
decrement async
decrement(
    key: str, amount: int = 1
) -> Result[int, CacheError]

Decrement value in Redis (atomic).

Parameters:

Name Type Description Default
key str

Cache key.

required
amount int

Amount to decrement by.

1

Returns:

Type Description
Result[int, CacheError]

Result with new value after decrement, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def decrement(self, key: str, amount: int = 1) -> Result[int, CacheError]:
    """Subtract ``amount`` from the integer stored at a key via Redis DECRBY.

    Args:
        key: Cache key.
        amount: Amount to subtract (defaults to 1).

    Returns:
        Success with the value reported by Redis after the decrement,
        or Failure carrying a CacheError if the call raised.
    """
    try:
        updated = await self._redis.decrby(key, amount)
        return Success(value=updated)
    except RedisError as exc:
        # Error raised by the Redis client itself.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
            message=f"Failed to decrement key '{key}'",
            details={"key": key, "amount": amount, "error": str(exc)},
        )
        return Failure(error=cache_error)
    except Exception as exc:
        # Anything else: also record the exception type for diagnosis.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
            message=f"Unexpected error decrementing key '{key}'",
            details={"key": key, "error": str(exc), "type": type(exc).__name__},
        )
        return Failure(error=cache_error)
flush async
flush() -> Result[None, CacheError]

Flush all keys from Redis.

WARNING: Clears ALL cache data! Use only in tests.

Returns:

Type Description
Result[None, CacheError]

Result with None on success, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def flush(self) -> Result[None, CacheError]:
    """Wipe the current Redis database via FLUSHDB.

    WARNING: Clears ALL cache data! Use only in tests.

    Returns:
        Success(None) once the flush completes, or Failure carrying a
        CacheError if the call raised.
    """
    try:
        await self._redis.flushdb()
        return Success(value=None)
    except RedisError as exc:
        # Error raised by the Redis client itself.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
            message="Failed to flush cache",
            details={"error": str(exc)},
        )
        return Failure(error=cache_error)
    except Exception as exc:
        # Anything else: also record the exception type for diagnosis.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
            message="Unexpected error flushing cache",
            details={"error": str(exc), "type": type(exc).__name__},
        )
        return Failure(error=cache_error)
ping async
ping() -> Result[bool, CacheError]

Check Redis connectivity (health check).

Returns:

Type Description
Result[bool, CacheError]

Result with True if Redis is reachable, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def ping(self) -> Result[bool, CacheError]:
    """Probe Redis with PING as a health check.

    Returns:
        Success(True) when the PING call completes without raising, or
        Failure carrying a CacheError if it raised.
    """
    try:
        # The reply itself is not inspected; completing without an
        # exception is treated as "reachable".
        # Type ignore due to redis.asyncio ping() return type ambiguity
        await self._redis.ping()  # type: ignore[misc]
        return Success(value=True)
    except RedisError as exc:
        # Error raised by the Redis client itself.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_CONNECTION_ERROR,
            message="Redis health check failed",
            details={"error": str(exc)},
        )
        return Failure(error=cache_error)
    except Exception as exc:
        # Anything else: also record the exception type for diagnosis.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_CONNECTION_ERROR,
            message="Unexpected error during Redis health check",
            details={"error": str(exc), "type": type(exc).__name__},
        )
        return Failure(error=cache_error)
delete_pattern async
delete_pattern(pattern: str) -> Result[int, CacheError]

Delete all keys matching pattern.

Uses Redis SCAN to find matching keys and DELETE to remove them. This is safer than KEYS + DELETE for large datasets.

Parameters:

Name Type Description Default
pattern str

Glob-style pattern (e.g., "authz:user123:*").

required

Returns:

Type Description
Result[int, CacheError]

Result with number of keys deleted, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def delete_pattern(self, pattern: str) -> Result[int, CacheError]:
    """Delete all keys matching pattern.

    Iterates matching keys with SCAN (via ``scan_iter``) and issues one
    DELETE per key, instead of running KEYS over the whole keyspace.

    The returned count is the sum of what each DELETE call reports, not
    the number of keys SCAN yielded. A key that SCAN yields more than
    once, or that is already gone (e.g. expired) by the time its DELETE
    runs, is therefore not counted as deleted.

    Args:
        pattern: Glob-style pattern (e.g., "authz:user123:*").

    Returns:
        Result with number of keys actually deleted, or CacheError.
        If an error occurs part-way through the iteration, keys removed
        before the error stay removed and a Failure is returned.
    """
    try:
        deleted_count = 0
        # Use SCAN to iterate keys matching pattern (safer than KEYS)
        async for key in self._redis.scan_iter(match=pattern):
            # DELETE reports how many keys it removed (0 or 1 for a single
            # key); trust that figure rather than assuming every yielded
            # key was still present.
            deleted_count += int(await self._redis.delete(key))
        return Success(value=deleted_count)
    except RedisError as e:
        return Failure(
            error=CacheError(
                code=ErrorCode.VALIDATION_FAILED,
                infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
                message=f"Failed to delete keys matching pattern '{pattern}'",
                details={"pattern": pattern, "error": str(e)},
            )
        )
    except Exception as e:
        return Failure(
            error=CacheError(
                code=ErrorCode.VALIDATION_FAILED,
                infrastructure_code=InfrastructureErrorCode.CACHE_DELETE_ERROR,
                message=f"Unexpected error deleting keys matching '{pattern}'",
                details={
                    "pattern": pattern,
                    "error": str(e),
                    "type": type(e).__name__,
                },
            )
        )
get_many async
get_many(
    keys: list[str],
) -> Result[dict[str, str | None], CacheError]

Get multiple values from Redis (batch operation).

Uses Redis MGET for efficient batch retrieval.

Parameters:

Name Type Description Default
keys list[str]

List of cache keys to retrieve.

required

Returns:

Type Description
Result[dict[str, str | None], CacheError]

Result with dict mapping keys to values (None for missing), or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def get_many(
    self, keys: list[str]
) -> Result[dict[str, str | None], CacheError]:
    """Fetch several keys from Redis in one MGET call.

    Args:
        keys: Cache keys to retrieve. An empty list short-circuits to an
            empty mapping without contacting Redis.

    Returns:
        Success with a dict pairing each requested key with its value
        (bytes values are decoded as UTF-8; missing keys map to None),
        or Failure carrying a CacheError if the call raised.
    """
    if not keys:
        return Success(value={})

    try:
        raw_values = await self._redis.mget(keys)
        # None is not bytes, so missing keys pass through unchanged.
        decoded_by_key: dict[str, str | None] = {
            name: raw.decode("utf-8") if isinstance(raw, bytes) else raw
            for name, raw in zip(keys, raw_values)
        }
        return Success(value=decoded_by_key)
    except RedisError as exc:
        # Error raised by the Redis client itself.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
            message="Failed to get multiple keys from cache",
            details={"keys": keys, "error": str(exc)},
        )
        return Failure(error=cache_error)
    except Exception as exc:
        # Anything else (including decode errors): record the type too.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_GET_ERROR,
            message="Unexpected error getting multiple keys",
            details={"keys": keys, "error": str(exc), "type": type(exc).__name__},
        )
        return Failure(error=cache_error)
set_many async
set_many(
    mapping: dict[str, str], ttl: int | None = None
) -> Result[None, CacheError]

Set multiple values in Redis (batch operation).

Uses Redis pipeline for atomic batch writes.

Parameters:

Name Type Description Default
mapping dict[str, str]

Dict of key->value pairs to cache.

required
ttl int | None

Time to live in seconds for all keys (None = no expiration).

None

Returns:

Type Description
Result[None, CacheError]

Result with None on success, or CacheError.

Source code in src/infrastructure/cache/redis_adapter.py
async def set_many(
    self,
    mapping: dict[str, str],
    ttl: int | None = None,
) -> Result[None, CacheError]:
    """Write several key/value pairs to Redis through a pipeline.

    One SET (or SETEX when ``ttl`` is given) is queued per entry and the
    whole batch is sent with a single ``execute()`` call.

    Args:
        mapping: Dict of key->value pairs to cache. An empty dict
            short-circuits to success without contacting Redis.
        ttl: Time to live in seconds applied to every key
            (None = no expiration).

    Returns:
        Success(None) once the pipeline has executed, or Failure
        carrying a CacheError if anything raised.
    """
    if not mapping:
        return Success(value=None)

    try:
        async with self._redis.pipeline() as pipe:
            for name, payload in mapping.items():
                # Pipeline commands are only queued here; nothing is sent
                # until execute() below.
                if ttl is None:
                    pipe.set(name, payload)
                else:
                    pipe.setex(name, ttl, payload)
            await pipe.execute()
        return Success(value=None)
    except RedisError as exc:
        # Error raised by the Redis client itself.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
            message="Failed to set multiple keys in cache",
            details={"keys": list(mapping), "error": str(exc)},
        )
        return Failure(error=cache_error)
    except Exception as exc:
        # Anything else: also record the exception type for diagnosis.
        cache_error = CacheError(
            code=ErrorCode.VALIDATION_FAILED,
            infrastructure_code=InfrastructureErrorCode.CACHE_SET_ERROR,
            message="Unexpected error setting multiple keys",
            details={
                "keys": list(mapping),
                "error": str(exc),
                "type": type(exc).__name__,
            },
        )
        return Failure(error=cache_error)