跳转至

Redis Streams

基于 Redis Streams 的生产者与消费者组抽象,提供同步和异步两套 API。

生产者

from redis_kit import StreamProducer

producer = StreamProducer(conn.sync_client, stream="orders", maxlen=10000)
msg_id = producer.add({"order_id": "123", "status": "created"})
print(producer.len())

消费者 — 自动 ACK

from redis_kit import StreamConsumer

consumer = StreamConsumer(
    conn.sync_client, stream="orders",
    group="processor", consumer_name="worker-1",
    auto_ack=True,
)
consumer.ensure_group()

for message in consumer.listen(count=10, block=5000):
    print(f"{message.id}: {message.data}")
    # Auto-ACK after each iteration

消费者 — 手动 ACK

consumer = StreamConsumer(
    conn.sync_client, stream="orders",
    group="processor", consumer_name="worker-2",
    auto_ack=False,
)
consumer.ensure_group()

for message in consumer.listen(count=10, block=5000):
    try:
        process(message.data)
        message.ack()
    except Exception:
        pass  # Recover via claim_stale later

死信恢复(认领长时间空闲且未确认的消息)

stale = consumer.claim_stale(min_idle_ms=60000, count=10)
for msg in stale:
    handle_dead_letter(msg)
    msg.ack()

待处理消息

pending = consumer.pending(count=10)
for entry in pending:
    print(f"{entry['id']} idle {entry['idle_ms']}ms, delivered {entry['delivery_count']} times")

异步用法

from redis_kit import AsyncStreamProducer, AsyncStreamConsumer

producer = AsyncStreamProducer(conn.async_client, stream="orders")
await producer.add({"order_id": "456"})

consumer = AsyncStreamConsumer(conn.async_client, stream="orders", group="g", consumer_name="w")
await consumer.ensure_group()

# 自动 ACK
async for message in consumer.listen(count=10, block=5000):
    await process(message.data)

# 手动 ACK — 使用 async_ack()
consumer = AsyncStreamConsumer(conn.async_client, stream="orders", group="g", consumer_name="w", auto_ack=False)
await consumer.ensure_group()

async for message in consumer.listen(count=10, block=5000):
    try:
        await process(message.data)
        await message.async_ack()  # 异步环境使用 async_ack()
    except Exception:
        pass