Redis Streams¶
Consumer group abstraction built on Redis Streams.
Producer¶
from redis_kit import StreamProducer
producer = StreamProducer(conn.sync_client, stream="orders", maxlen=10000)
msg_id = producer.add({"order_id": "123", "status": "created"})
print(producer.len())
Consumer -- Auto ACK¶
from redis_kit import StreamConsumer
consumer = StreamConsumer(
conn.sync_client, stream="orders",
group="processor", consumer_name="worker-1",
auto_ack=True,
)
consumer.ensure_group()
for message in consumer.listen(count=10, block=5000):
print(f"{message.id}: {message.data}")
# Auto-ACK after each iteration
Consumer -- Manual ACK¶
consumer = StreamConsumer(
conn.sync_client, stream="orders",
group="processor", consumer_name="worker-2",
auto_ack=False,
)
consumer.ensure_group()
for message in consumer.listen(count=10, block=5000):
try:
process(message.data)
message.ack()
except Exception:
pass # Recover via claim_stale later
Dead-Letter Recovery¶
stale = consumer.claim_stale(min_idle_ms=60000, count=10)
for msg in stale:
handle_dead_letter(msg)
msg.ack()
Pending Messages¶
pending = consumer.pending(count=10)
for entry in pending:
print(f"{entry['id']} idle {entry['idle_ms']}ms, delivered {entry['delivery_count']} times")
Async Usage¶
from redis_kit import AsyncStreamProducer, AsyncStreamConsumer
producer = AsyncStreamProducer(conn.async_client, stream="orders")
await producer.add({"order_id": "456"})
consumer = AsyncStreamConsumer(conn.async_client, stream="orders", group="g", consumer_name="w")
await consumer.ensure_group()
# Auto ACK
async for message in consumer.listen(count=10, block=5000):
await process(message.data)
# Manual ACK — use async_ack()
consumer = AsyncStreamConsumer(conn.async_client, stream="orders", group="g", consumer_name="w", auto_ack=False)
await consumer.ensure_group()
async for message in consumer.listen(count=10, block=5000):
try:
await process(message.data)
await message.async_ack() # Use async_ack() in async context
except Exception:
pass