# Streams Consumer Group Patterns Implement reliable message processing with Redis Streams consumer groups, handling failure recovery, poison pills, and memory management—the operational patterns that production systems require beyond basic XREADGROUP usage. > **Prerequisite:** See [Streams and Event Sourcing](streams-event-sourcing.md) for Stream basics and consumer group introduction. ## The Mental Model Problem Most Stream adoption failures stem from treating Streams like Lists. Key differences: | Behavior | List (LPUSH/RPOP) | Stream (XADD/XREADGROUP) | |----------|-------------------|--------------------------| | Read effect | Destructive (removes item) | Non-destructive (moves cursor) | | Failure handling | Data lost if consumer crashes | Data stays until ACKed | | Cleanup | Automatic | Explicit (XTRIM) | A Stream is an **immutable log**, not a work queue. The queue state (pending vs. completed) lives in Consumer Group metadata, not in the data itself. ## The Pending Entries List (PEL) When a consumer reads via XREADGROUP, messages enter the Pending Entries List. They stay pending until explicitly acknowledged with XACK. **The hidden cost:** The PEL is a separate Radix tree structure. Millions of unacknowledged messages consume significant memory and degrade XREADGROUP performance. Monitor PEL size: XINFO GROUPS mystream The `pending` field should be small relative to throughput. If it grows while processing appears healthy, consumers are failing to ACK. ## The Startup Recovery Pattern A critical anti-pattern: consumers that only request `>` (new messages). **The Problem:** XREADGROUP GROUP mygroup worker-1 STREAMS mystream > If worker-1 crashes after receiving messages but before ACKing, those messages become zombies—stuck in the PEL, never redelivered. **The Correct Pattern (two-phase startup):** ```python def consume(): # Phase 1: Process any previously assigned but unacked messages while True: messages = redis.xreadgroup('mygroup', 'worker-1', {'mystream': '0'}, # Read PEL history count=100) if not messages or not messages[0][1]: break # PEL is empty process_and_ack(messages) # Phase 2: Read new messages while True: messages = redis.xreadgroup('mygroup', 'worker-1', {'mystream': '>'}, # New messages only block=5000, count=100) process_and_ack(messages) ``` Using ID `0` queries the PEL for messages already assigned to this consumer. Only after clearing the backlog do you switch to `>`. ## Automated Recovery with XAUTOCLAIM When a consumer dies permanently, its messages remain orphaned. Other consumers must claim them. **Legacy approach (complex):** XPENDING mystream mygroup # Find stale messages XCLAIM mystream mygroup worker-2 60000 # Claim one by one **Modern pattern (Redis 6.2+):** XAUTOCLAIM mystream mygroup worker-2 60000 0-0 COUNT 10 This atomically scans and claims up to 10 messages idle for over 60 seconds. Implement a background "janitor" coroutine in every consumer: ```python async def janitor(): while True: await asyncio.sleep(30) # Run every 30 seconds redis.xautoclaim('mystream', 'mygroup', 'worker-1', min_idle_time=60000, # 60 second threshold start='0-0', count=50) ``` This creates decentralized work-stealing—consumers automatically pick up slack from failed peers. ## Poison Pill Handling A poison pill is a message that consistently crashes consumers (malformed JSON, missing required fields, etc.). **The Infinite Loop:** 1. Consumer A reads message, crashes 2. Message becomes idle, Consumer B auto-claims it 3. Consumer B crashes 4. Loop continues until all consumers are dead **Solution: Track delivery count and use a Dead Letter Queue** XPENDING mystream mygroup - + 10 consumer-1 Returns entries with a `times-delivered` field. Implement DLQ logic: ```python MAX_RETRIES = 3 def process_message(msg_id, data): pending_info = redis.xpending_range('mystream', 'mygroup', msg_id, msg_id, 1) if pending_info and pending_info[0]['times_delivered'] > MAX_RETRIES: # Move to DLQ instead of processing redis.xadd('mystream-dlq', {'original_id': msg_id, **data}) redis.xack('mystream', 'mygroup', msg_id) return try: do_work(data) redis.xack('mystream', 'mygroup', msg_id) except Exception: # Don't ACK - will be retried raise ``` Redis has no built-in DLQ—this pattern must be implemented in application code. ## Memory Management: XDEL vs XTRIM **The Anti-Pattern:** Deleting messages after ACK with XDEL. # Don't do this XACK mystream mygroup 1234567890-0 XDEL mystream 1234567890-0 **Why it's harmful:** Streams use Radix trees with macro-nodes (listpacks). XDEL marks entries as deleted but doesn't free memory until the entire macro-node is empty. Heavy XDEL usage creates "Swiss cheese" fragmentation. **The Correct Pattern:** 1. Use XACK to mark messages as processed (affects Consumer Group state only) 2. Use XTRIM to enforce retention policy (frees memory efficiently) ``` # Add with automatic trimming XADD mystream MAXLEN ~ 100000 * field value # Or periodically trim XTRIM mystream MAXLEN ~ 100000 ``` The `~` enables approximate trimming—Redis trims whole macro-nodes rather than exact counts, dramatically reducing CPU overhead. **MINID trimming (Redis 6.2+):** Trim by time instead of count: XTRIM mystream MINID ~ 1609459200000-0 Removes entries older than the specified ID (timestamp-based). ## Blocking Read Pitfalls Blocking XREADGROUP can exhaust connection pools. **The Deadlock Scenario:** ```python # Connection pool size: 10 # Worker threads: 10 # Each worker does: redis.xreadgroup('mygroup', 'worker-N', {'mystream': '>'}, block=5000) ``` All 10 connections are now blocked. Any other code needing Redis (even a simple GET) waits forever for a connection. **Solutions:** 1. **Dedicated connections:** Use separate connection instances for blocking consumers 2. **Shorter block times:** Use 2-3 second blocks instead of infinite (`BLOCK 0`) 3. **Timeout coordination:** Redis BLOCK timeout must be shorter than client socket timeout ## Lag Monitoring "Lag" in Streams is complex: **Ingestion Lag:** Difference between newest Stream ID and last-delivered-id XINFO GROUPS mystream # Check 'last-delivered-id' vs stream's last ID **Processing Lag:** PEL size—messages delivered but not ACKed XINFO GROUPS mystream # Check 'pending' count Low ingestion lag with high PEL size indicates processing bottlenecks or ACK failures, not delivery problems. ## Client-Side Sharding for Scale A single Stream lives on one Redis shard. For extreme throughput (>100k ops/sec), the Stream becomes a hot spot. **Pattern:** Partition by hash: ```python def get_stream_key(user_id): shard = hash(user_id) % 16 return f'events:{{shard{shard}}}' # Producer redis.xadd(get_stream_key(user_id), {'event': 'login', 'user': user_id}) # Consumer groups needed for each shard for i in range(16): redis.xgroup_create(f'events:{{shard{i}}}', 'processors', mkstream=True) ``` This mimics Kafka partitions but requires manual implementation. ## Comparison with Kafka | Aspect | Redis Streams | Apache Kafka | |--------|---------------|--------------| | Storage | RAM (hours/days) | Disk (months/years) | | Latency | Sub-millisecond | Milliseconds | | Partitioning | Manual (client-side) | Native | | Consumer rebalancing | Manual (XAUTOCLAIM) | Automatic | | Operational complexity | Low | High | Use Redis Streams for real-time inter-service communication and active job queues. Use Kafka for event sourcing with long retention and analytics pipelines. ## Source Redis Streams documentation, production post-mortems, and "You're Probably Thinking About Redis Streams Wrong" (redis.io/blog).