Cache¶
Redis cache with serialization, compression, TTL jitter, and fallback policy support.
Basic Usage¶
from redis_kit import Cache, ConnectionManager
conn = ConnectionManager(url="redis://localhost:6379/0")
cache = Cache(conn.sync_client, prefix="myapp:cache")
cache.set("user:1", {"name": "Alice"}, ttl="2h30m")
user = cache.get("user:1")
cache.delete("user:1")
TTL Management¶
cache.set("key", "value", ttl=3600)
cache.ttl("key") # Remaining seconds
cache.pttl("key") # Remaining milliseconds
cache.persist("key") # Remove expiration
cache.expire("key", 600) # Set a new 600-second TTL
TTLs may also be given as strings: "2h30m", "1d", "30s". Negative values raise ValueError, and the full string must be valid; trailing garbage such as "1hfoo" also raises ValueError.
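The parsing rules above (unit-suffixed segments, no gaps, no trailing garbage) can be sketched as follows. This is an illustrative implementation, not redis_kit's actual parser, and the helper name `parse_ttl` is an assumption:

```python
import re

# Sketch of duration-string parsing: "2h30m" -> 9000 seconds.
# Illustrative only; redis_kit's real parser may differ.
_SEGMENT = re.compile(r"(\d+)([dhms])")
_UNITS = {"d": 86400, "h": 3600, "m": 60, "s": 1}

def parse_ttl(spec: str) -> int:
    pos, total = 0, 0
    for match in _SEGMENT.finditer(spec):
        if match.start() != pos:  # a gap means unparseable characters
            raise ValueError(f"invalid TTL: {spec!r}")
        total += int(match.group(1)) * _UNITS[match.group(2)]
        pos = match.end()
    if pos != len(spec) or total == 0:  # rejects "1hfoo", "", "-5s"
        raise ValueError(f"invalid TTL: {spec!r}")
    return total
```

Note how a leading "-" never matches a segment start, so negative values fail the same gap check that rejects trailing garbage.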
Cache-Aside Pattern¶
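The pattern itself is read-through-on-miss: try the cache, fall back to the data source, and populate the cache on the way out. Sketched here with a plain dict standing in for Redis and a placeholder `query_user` loader (both are illustrative, not redis_kit API):

```python
store: dict = {}  # plain dict standing in for the Redis cache

def query_user(user_id: int) -> dict:
    # placeholder for the real database query
    return {"id": user_id, "name": "Alice"}

def get_user(user_id: int) -> dict:
    key = f"user:{user_id}"
    value = store.get(key)           # 1. try the cache
    if value is None:
        value = query_user(user_id)  # 2. on miss, hit the database
        store[key] = value           # 3. populate the cache for next time
    return value
```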
Batch Operations¶
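The batch methods get_many and set_many are named in the Hooks section below; their point is to move many keys in a single round trip instead of N individual GET/SET calls. A dict-based sketch of the semantics (the exact signatures here are assumptions):

```python
store: dict = {}  # dict stand-in for Redis

def set_many(mapping: dict) -> None:
    # one MSET-like call instead of N SETs
    store.update(mapping)

def get_many(keys: list) -> dict:
    # returns only the keys that were found, like an MGET with misses dropped
    return {k: store[k] for k in keys if k in store}
```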
Pattern-Based Operations (SCAN-based)¶
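SCAN walks the keyspace incrementally in small batches, so it does not block the server the way KEYS does. The matching side (Redis-style glob patterns) can be sketched with fnmatch over a dict; delete_pattern is named in the Hooks section, though this signature is an assumption:

```python
from fnmatch import fnmatch

store: dict = {}  # dict stand-in for the keyspace

def delete_pattern(pattern: str) -> int:
    # Redis SCAN yields keys cursor-by-cursor; a snapshot iteration
    # stands in for it here. Returns the number of keys deleted.
    victims = [k for k in list(store) if fnmatch(k, pattern)]
    for k in victims:
        del store[k]
    return len(victims)
```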
Bound Operations¶
user_cache = cache.bind("user:1")
user_cache.set({"name": "Alice"}, ttl=3600)
user_cache.get()
user_cache.ttl()
@cached Decorator¶
from redis_kit import cached
@cached(conn.sync_client, key="user:{user_id}", ttl="1h")
def get_user(user_id: int) -> dict:
    return db.query_user(user_id)
# Async (auto-detected)
@cached(conn.async_client, key="product:{pid}", ttl=3600)
async def get_product(pid: int) -> dict:
    return await db.query_product(pid)
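Mechanically, such a decorator formats the key template from the call's bound arguments, checks the cache, and stores the result with a TTL on a miss. A simplified pure-Python sketch, with a dict standing in for Redis (this is not redis_kit's implementation):

```python
import functools
import inspect
import time

store: dict = {}  # maps key -> (value, expires_at); stand-in for Redis

def cached_sketch(key: str, ttl: int):
    def decorator(fn):
        sig = inspect.signature(fn)
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            k = key.format(**bound.arguments)  # "user:{user_id}" -> "user:1"
            hit = store.get(k)
            if hit is not None and hit[1] > time.monotonic():
                return hit[0]                  # fresh hit: skip the function
            value = fn(*args, **kwargs)        # miss or expired: recompute
            store[k] = (value, time.monotonic() + ttl)
            return value
        return wrapper
    return decorator
```

Binding through `inspect.signature` is what lets the key template reference parameters by name whether they were passed positionally or as keywords.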
Error Degradation¶
When Redis is unavailable, on_error="execute" skips the cache and runs the function directly:
@cached(conn.sync_client, key="user:{user_id}", ttl="1h", on_error="execute")
def get_user(user_id: int) -> dict:
    return db.query_user(user_id)
Callable Key/TTL/Bypass¶
@cached(
    conn.sync_client,
    key=lambda uid: f"user:{uid}",
    ttl=lambda uid: 3600 if uid < 100 else 300,
    bypass=lambda uid, force=False: force,
)
def get_user(uid: int, force: bool = False) -> dict:
    ...
Cache Invalidation¶
@cached(conn.sync_client, key="user:{user_id}", ttl="1h")
def get_user(user_id: int) -> dict:
    return db.query_user(user_id)
# Invalidate cache for specific arguments
get_user.invalidate(user_id=1)
# Async version
@cached(conn.async_client, key="product:{pid}", ttl=3600)
async def get_product(pid: int) -> dict:
    return await db.query_product(pid)
await get_product.invalidate(pid=42)
Cache Penetration Protection (None Caching)¶
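Caching the fact that a key has no value prevents repeated misses from hammering the database. The trick is a sentinel that distinguishes "None was cached" from "nothing was cached"; sketched with a dict stand-in (names here are illustrative, not redis_kit API):

```python
_MISSING = object()   # distinguishes "cached None" from "not cached at all"
store: dict = {}      # dict stand-in for Redis
db_calls: list = []   # instrumentation for the sketch

def query_db(key):
    db_calls.append(key)
    return None       # the row genuinely does not exist

def get_or_load(key):
    hit = store.get(key, _MISSING)
    if hit is not _MISSING:
        return hit            # a cached None short-circuits: no DB round trip
    value = query_db(key)     # only the first miss reaches the database
    store[key] = value        # None is cached too, typically with a short TTL
    return value
```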
Fallback Policy¶
Configure degradation strategies for Redis connection failures instead of raising exceptions.
from redis_kit import Cache, FallbackPolicy
# Strategy 1: Silent degradation, return None
policy = FallbackPolicy(on_connection_error="return_none")
cache = Cache(conn.sync_client, prefix="myapp", fallback_policy=policy)
# Strategy 2: Custom callback
def my_fallback(command, key, error):
    return {"from": "local_cache"}
policy = FallbackPolicy(on_connection_error="callback", fallback=my_fallback)
cache = Cache(conn.sync_client, prefix="myapp", fallback_policy=policy)
See Exception Handling - Fallback Policy for details.
Hooks (Observability)¶
All Cache operations (get, set, delete, get_many, set_many, delete_pattern) run the full hook lifecycle: before, then after on success or error on failure. A failing hook never breaks the cache operation itself; its exception is logged instead.
from redis_kit import Cache
from redis_kit.observability import OpenTelemetryHook, MetricsCollector
cache = Cache(
    conn.sync_client,
    prefix="myapp",
    hooks=[OpenTelemetryHook(), MetricsCollector()],
)
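The before / after / error lifecycle with error isolation can be sketched as a wrapper; the hook method names and signatures below are assumptions, not redis_kit's actual protocol:

```python
import logging

class Hook:
    # Hypothetical hook interface; redis_kit's real protocol may differ.
    def before(self, op, key): ...
    def after(self, op, key, result): ...
    def error(self, op, key, exc): ...

def run_with_hooks(hooks, op, key, fn):
    for h in hooks:
        try:
            h.before(op, key)
        except Exception:
            logging.exception("hook failed")  # hook errors never break the op
    try:
        result = fn()
    except Exception as exc:
        for h in hooks:
            try:
                h.error(op, key, exc)
            except Exception:
                logging.exception("hook failed")
        raise
    for h in hooks:
        try:
            h.after(op, key, result)
        except Exception:
            logging.exception("hook failed")
    return result
```

Each hook call sits in its own try/except, which is what guarantees a broken observability hook only produces a log line rather than a failed cache operation.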