Implement reliable message processing with Redis Streams consumer groups, handling failure recovery, poison pills, and memory management—the operational patterns that production systems require beyond basic XREADGROUP usage.
Prerequisite: See Streams and Event Sourcing for Stream basics and an introduction to consumer groups.
Most Stream adoption failures stem from treating Streams like Lists. Key differences:
| Behavior | List (LPUSH/RPOP) | Stream (XADD/XREADGROUP) |
|---|---|---|
| Read effect | Destructive (removes item) | Non-destructive (moves cursor) |
| Failure handling | Data lost if consumer crashes | Data stays until ACKed |
| Cleanup | Automatic | Explicit (XTRIM) |
A Stream is an immutable log, not a work queue. The queue state (pending vs. completed) lives in Consumer Group metadata, not in the data itself.
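A minimal sketch of this lifecycle, assuming redis is a connected redis-py client (decode_responses=True) and handle() is a stand-in for application logic:
# Producer appends to the log; nothing is ever "popped"
redis.xadd('mystream', {'event': 'signup', 'user': '42'})

# Create the group once (id='0' = deliver from the beginning;
# running this twice raises BUSYGROUP)
redis.xgroup_create('mystream', 'mygroup', id='0', mkstream=True)

# Consumer reads: the entry is now pending for worker-1, not removed
for stream_name, messages in redis.xreadgroup('mygroup', 'worker-1',
                                              {'mystream': '>'}, count=10):
    for msg_id, fields in messages:
        handle(fields)                             # application logic
        redis.xack('mystream', 'mygroup', msg_id)  # only now leaves the PEL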
When a consumer reads via XREADGROUP, messages enter the Pending Entries List (PEL). They stay pending until explicitly acknowledged with XACK.
The hidden cost: The PEL is a separate Radix tree structure. Millions of unacknowledged messages consume significant memory and degrade XREADGROUP performance.
Monitor PEL size:
XINFO GROUPS mystream
The pending field should be small relative to throughput. If it grows while processing appears healthy, consumers are failing to ACK.
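A small check along these lines, using the same redis-py client; the alert threshold is an illustrative value, not a rule:
PEL_ALERT_THRESHOLD = 10_000  # choose relative to your normal throughput

for group in redis.xinfo_groups('mystream'):
    if group['pending'] > PEL_ALERT_THRESHOLD:
        print(f"group {group['name']}: {group['pending']} messages un-ACKed")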
A critical anti-pattern: consumers that only request > (new messages).
The Problem:
XREADGROUP GROUP mygroup worker-1 STREAMS mystream >
If worker-1 crashes after receiving messages but before ACKing, those messages become zombies—stuck in the PEL, never redelivered.
The Correct Pattern (two-phase startup):
def consume():
    # Phase 1: Process any previously assigned but unacked messages
    while True:
        messages = redis.xreadgroup('mygroup', 'worker-1',
                                    {'mystream': '0'},  # Read PEL history
                                    count=100)
        if not messages or not messages[0][1]:
            break  # PEL is empty
        process_and_ack(messages)

    # Phase 2: Read new messages
    while True:
        messages = redis.xreadgroup('mygroup', 'worker-1',
                                    {'mystream': '>'},  # New messages only
                                    block=5000, count=100)
        if messages:
            process_and_ack(messages)
Using ID 0 queries the PEL for messages already assigned to this consumer. Only after clearing the backlog do you switch to >.
When a consumer dies permanently, its messages remain orphaned. Other consumers must claim them.
Legacy approach (complex):
XPENDING mystream mygroup # Find stale messages
XCLAIM mystream mygroup worker-2 60000 <message-id> # Claim one by one
Modern pattern (Redis 6.2+):
XAUTOCLAIM mystream mygroup worker-2 60000 0-0 COUNT 10
This atomically scans and claims up to 10 messages idle for over 60 seconds. Implement a background "janitor" coroutine in every consumer:
import asyncio

async def janitor():
    while True:
        await asyncio.sleep(30)  # Run every 30 seconds
        # Claim entries idle for over 60s from dead peers; they move into
        # worker-1's PEL (with an async client such as redis.asyncio, await this)
        redis.xautoclaim('mystream', 'mygroup', 'worker-1',
                         min_idle_time=60000,  # 60 second threshold
                         start_id='0-0', count=50)
This creates decentralized work-stealing—consumers automatically pick up slack from failed peers.
A poison pill is a message that consistently crashes consumers (malformed JSON, missing required fields, etc.).
The Infinite Loop: a consumer reads the message, crashes before ACKing, the message stays in the PEL, gets redelivered (or auto-claimed by a peer), and crashes the next consumer, repeating indefinitely.
Solution: Track the delivery count and use a Dead Letter Queue (DLQ)
XPENDING mystream mygroup - + 10 consumer-1
Returns entries with a times-delivered field. Implement DLQ logic:
MAX_RETRIES = 3

def process_message(msg_id, data):
    pending_info = redis.xpending_range('mystream', 'mygroup',
                                        msg_id, msg_id, 1)
    if pending_info and pending_info[0]['times_delivered'] > MAX_RETRIES:
        # Move to DLQ instead of processing
        redis.xadd('mystream-dlq', {'original_id': msg_id, **data})
        redis.xack('mystream', 'mygroup', msg_id)
        return
    try:
        do_work(data)
        redis.xack('mystream', 'mygroup', msg_id)
    except Exception:
        # Don't ACK - will be retried
        raise
Redis has no built-in DLQ—this pattern must be implemented in application code.
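Inspecting and re-driving the DLQ is equally manual; a sketch using the stream names from the example above:
# Inspect dead letters
for msg_id, fields in redis.xrange('mystream-dlq', count=20):
    print(msg_id, fields)

# Re-drive once the consumer bug is fixed
for msg_id, fields in redis.xrange('mystream-dlq'):
    fields.pop('original_id', None)   # strip the bookkeeping field
    redis.xadd('mystream', fields)
redis.delete('mystream-dlq')          # drop the DLQ after re-queuing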
The Anti-Pattern: Deleting messages after ACK with XDEL.
# Don't do this
XACK mystream mygroup 1234567890-0
XDEL mystream 1234567890-0
Why it's harmful: Streams use Radix trees with macro-nodes (listpacks). XDEL marks entries as deleted but doesn't free memory until the entire macro-node is empty. Heavy XDEL usage creates "Swiss cheese" fragmentation.
The Correct Pattern:
# Add with automatic trimming
XADD mystream MAXLEN ~ 100000 * field value
# Or periodically trim
XTRIM mystream MAXLEN ~ 100000
The ~ enables approximate trimming—Redis trims whole macro-nodes rather than exact counts, dramatically reducing CPU overhead.
MINID trimming (Redis 6.2+): Trim by time instead of count:
XTRIM mystream MINID ~ 1609459200000-0
Removes entries older than the specified ID (timestamp-based).
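In redis-py, both trimming styles look roughly like this; the 24-hour retention window is an example value:
import time

# Cap by count on every append (approximate=True maps to the ~ flag)
redis.xadd('mystream', {'field': 'value'}, maxlen=100_000, approximate=True)

# Or trim by age: drop entries older than 24 hours
cutoff_ms = int((time.time() - 24 * 3600) * 1000)
redis.xtrim('mystream', minid=f'{cutoff_ms}-0', approximate=True)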
Blocking XREADGROUP can exhaust connection pools.
The Deadlock Scenario:
# Connection pool size: 10
# Worker threads: 10
# Each worker does:
redis.xreadgroup('mygroup', 'worker-N', {'mystream': '>'}, block=5000)
All 10 connections are now blocked. Any other code needing Redis (even a simple GET) waits forever for a connection.
Solutions: dedicate a connection (or a small, separate pool) to blocking XREADGROUP calls, and prefer a finite BLOCK timeout over BLOCK 0 so blocked readers periodically release their connections, as sketched below.
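A sketch of the dedicated-connection approach; the pool size, timeout, and process_and_ack() reuse the earlier examples' assumptions:
import redis

# Shared pool for ordinary commands (GET, XADD, XACK, ...)
shared_client = redis.Redis(
    connection_pool=redis.ConnectionPool(max_connections=10))

# One dedicated connection per blocking consumer, outside the shared pool
blocking_client = redis.Redis(single_connection_client=True)

while True:
    # Finite timeout: the call returns periodically even with no traffic
    messages = blocking_client.xreadgroup('mygroup', 'worker-1',
                                          {'mystream': '>'},
                                          block=5000, count=100)
    if messages:
        process_and_ack(messages)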
"Lag" in Streams is complex:
Ingestion Lag: Difference between the newest Stream ID and the group's last-delivered-id
XINFO GROUPS mystream
# Check 'last-delivered-id' vs stream's last ID
Processing Lag: PEL size—messages delivered but not ACKed
XINFO GROUPS mystream
# Check 'pending' count
Low ingestion lag with high PEL size indicates processing bottlenecks or ACK failures, not delivery problems.
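A combined check, assuming decode_responses=True so the XINFO reply keys come back as the strings shown:
def stream_lag(stream, group_name):
    info = redis.xinfo_stream(stream)
    group = next(g for g in redis.xinfo_groups(stream)
                 if g['name'] == group_name)
    return {
        'newest_id': info['last-generated-id'],        # head of the Stream
        'last_delivered': group['last-delivered-id'],  # ingestion lag marker
        'pending': group['pending'],                   # processing lag (PEL size)
    }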
A single Stream lives on one Redis shard. For extreme throughput (>100k ops/sec), the Stream becomes a hot spot.
Pattern: Partition by hash:
import zlib

def get_stream_key(user_id):
    # zlib.crc32 is stable across processes (built-in hash() is randomized)
    shard = zlib.crc32(str(user_id).encode()) % 16
    return f'events:{{shard{shard}}}'

# Producer
redis.xadd(get_stream_key(user_id), {'event': 'login', 'user': user_id})

# Consumer groups needed for each shard
for i in range(16):
    redis.xgroup_create(f'events:{{shard{i}}}', 'processors', mkstream=True)
This mimics Kafka partitions but requires manual implementation.
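On the consumer side, a single worker can still service every shard in one call, since XREADGROUP accepts multiple streams; handle() is again a stand-in for application logic:
streams = {f'events:{{shard{i}}}': '>' for i in range(16)}

while True:
    results = redis.xreadgroup('processors', 'worker-1', streams,
                               block=5000, count=100)
    for stream_name, messages in results or []:
        for msg_id, fields in messages:
            handle(fields)
            redis.xack(stream_name, 'processors', msg_id)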
| Aspect | Redis Streams | Apache Kafka |
|---|---|---|
| Storage | RAM (hours/days) | Disk (months/years) |
| Latency | Sub-millisecond | Milliseconds |
| Partitioning | Manual (client-side) | Native |
| Consumer rebalancing | Manual (XAUTOCLAIM) | Automatic |
| Operational complexity | Low | High |
Use Redis Streams for real-time inter-service communication and active job queues. Use Kafka for event sourcing with long retention and analytics pipelines.
Redis Streams documentation, production post-mortems, and "You're Probably Thinking About Redis Streams Wrong" (redis.io/blog).