Streams Consumer Group Patterns

Implement reliable message processing with Redis Streams consumer groups, handling failure recovery, poison pills, and memory management—the operational patterns that production systems require beyond basic XREADGROUP usage.

Prerequisite: See Streams and Event Sourcing for Stream basics and an introduction to consumer groups.

The Mental Model Problem

Most Stream adoption failures stem from treating Streams like Lists. Key differences:

| Behavior | List (LPUSH/RPOP) | Stream (XADD/XREADGROUP) |
| --- | --- | --- |
| Read effect | Destructive (removes item) | Non-destructive (moves cursor) |
| Failure handling | Data lost if consumer crashes | Data stays until ACKed |
| Cleanup | Automatic | Explicit (XTRIM) |

A Stream is an immutable log, not a work queue. The queue state (pending vs. completed) lives in Consumer Group metadata, not in the data itself.
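
A tiny illustration of that distinction, assuming a redis-py client named redis (as in the later snippets) and example stream/group names:

msg_id = redis.xadd('mystream', {'task': 'demo'})
redis.xgroup_create('mystream', 'mygroup', id='0')

redis.xreadgroup('mygroup', 'worker-1', {'mystream': '>'}, count=1)
print(redis.xlen('mystream'))                        # Still 1: reading never removes data
print(redis.xinfo_groups('mystream')[0]['pending'])  # 1: "in flight" lives in group metadata

redis.xack('mystream', 'mygroup', msg_id)
print(redis.xlen('mystream'))                        # Still 1: ACK only clears the PEL entry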

The Pending Entries List (PEL)

When a consumer reads via XREADGROUP, messages enter the Pending Entries List. They stay pending until explicitly acknowledged with XACK.

The hidden cost: The PEL is a separate Radix tree structure. Millions of unacknowledged messages consume significant memory and degrade XREADGROUP performance.

Monitor PEL size:

XINFO GROUPS mystream

The pending field should be small relative to throughput. If it grows while processing appears healthy, consumers are failing to ACK.
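
A minimal check with redis-py, assuming a client named redis as in the other snippets and an arbitrary alert threshold:

PENDING_ALERT_THRESHOLD = 10_000  # tune to your throughput

for group in redis.xinfo_groups('mystream'):
    # 'pending' is the group's total PEL size across all consumers
    if group['pending'] > PENDING_ALERT_THRESHOLD:
        print(f"Group {group['name']}: {group['pending']} unacknowledged messages")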

The Startup Recovery Pattern

A critical anti-pattern: consumers that only request > (new messages).

The Problem:

XREADGROUP GROUP mygroup worker-1 STREAMS mystream >

If worker-1 crashes after receiving messages but before ACKing, those messages become zombies—stuck in the PEL, never redelivered.

The Correct Pattern (two-phase startup):

def consume():
    # Phase 1: Process any previously assigned but unacked messages
    while True:
        messages = redis.xreadgroup('mygroup', 'worker-1',
                                    {'mystream': '0'},  # Read PEL history
                                    count=100)
        if not messages or not messages[0][1]:
            break  # PEL is empty
        process_and_ack(messages)

    # Phase 2: Read new messages
    while True:
        messages = redis.xreadgroup('mygroup', 'worker-1',
                                    {'mystream': '>'},  # New messages only
                                    block=5000, count=100)
        if messages:  # A block timeout returns an empty result
            process_and_ack(messages)

Using ID 0 queries the PEL for messages already assigned to this consumer. Only after clearing the backlog do you switch to >.

Automated Recovery with XAUTOCLAIM

When a consumer dies permanently, its messages remain orphaned. Other consumers must claim them.

Legacy approach (complex):

XPENDING mystream mygroup    # Find stale messages
XCLAIM mystream mygroup worker-2 60000 <message-id>   # Claim one by one

Modern pattern (Redis 6.2+):

XAUTOCLAIM mystream mygroup worker-2 60000 0-0 COUNT 10

This atomically scans and claims up to 10 messages idle for over 60 seconds. Implement a background "janitor" coroutine in every consumer:

import asyncio

async def janitor():
    while True:
        await asyncio.sleep(30)  # Run every 30 seconds
        # Atomically claim messages idle for 60+ seconds; the reply includes
        # the claimed entries, which this consumer should then process and ACK
        redis.xautoclaim('mystream', 'mygroup', 'worker-1',
                         min_idle_time=60000,  # 60 second threshold
                         start_id='0-0', count=50)

This creates decentralized work-stealing—consumers automatically pick up slack from failed peers.
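
One way to wire this together, assuming an async consumer loop (consume_loop is a hypothetical async version of consume above):

import asyncio

async def main():
    # Run the consumer loop and the janitor concurrently;
    # if either crashes, gather propagates the exception and the worker restarts
    await asyncio.gather(consume_loop(), janitor())

asyncio.run(main())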

Poison Pill Handling

A poison pill is a message that consistently crashes consumers (malformed JSON, missing required fields, etc.).

The Infinite Loop:

  1. Consumer A reads message, crashes
  2. Message becomes idle, Consumer B auto-claims it
  3. Consumer B crashes
  4. Loop continues until all consumers are dead

Solution: Track delivery count and use a Dead Letter Queue

XPENDING mystream mygroup - + 10 consumer-1

Returns entries with a times-delivered field. Implement DLQ logic:

MAX_RETRIES = 3

def process_message(msg_id, data):
    pending_info = redis.xpending_range('mystream', 'mygroup',
                                        msg_id, msg_id, 1)
    if pending_info and pending_info[0]['times_delivered'] > MAX_RETRIES:
        # Move to DLQ instead of processing
        redis.xadd('mystream-dlq', {'original_id': msg_id, **data})
        redis.xack('mystream', 'mygroup', msg_id)
        return

    try:
        do_work(data)
        redis.xack('mystream', 'mygroup', msg_id)
    except Exception:
        # Don't ACK - will be retried
        raise

Redis has no built-in DLQ—this pattern must be implemented in application code.
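
Because the DLQ is just another Stream, a small replay helper is enough once the underlying bug is fixed. A sketch, assuming the mystream-dlq key from the snippet above and a client created with decode_responses=True:

def replay_dlq(batch=100):
    # Page through the oldest dead letters and re-enqueue them
    for msg_id, fields in redis.xrange('mystream-dlq', count=batch):
        fields.pop('original_id', None)      # Drop the bookkeeping field
        redis.xadd('mystream', fields)       # Re-enqueue with a fresh ID
        redis.xdel('mystream-dlq', msg_id)   # Acceptable here: the DLQ should stay tiny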

Memory Management: XDEL vs XTRIM

The Anti-Pattern: Deleting messages after ACK with XDEL.

# Don't do this
XACK mystream mygroup 1234567890-0
XDEL mystream 1234567890-0

Why it's harmful: Streams use Radix trees with macro-nodes (listpacks). XDEL marks entries as deleted but doesn't free memory until the entire macro-node is empty. Heavy XDEL usage creates "Swiss cheese" fragmentation.

The Correct Pattern:

  1. Use XACK to mark messages as processed (affects Consumer Group state only)
  2. Use XTRIM to enforce retention policy (frees memory efficiently)

# Add with automatic trimming
XADD mystream MAXLEN ~ 100000 * field value

# Or periodically trim
XTRIM mystream MAXLEN ~ 100000

The ~ enables approximate trimming—Redis trims whole macro-nodes rather than exact counts, dramatically reducing CPU overhead.

MINID trimming (Redis 6.2+): Trim by time instead of count:

XTRIM mystream MINID ~ 1609459200000-0

Removes all entries with IDs lower than the given one; since Stream IDs begin with a millisecond timestamp, this is effectively time-based retention.
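
The same policies in redis-py, assuming the client from the earlier snippets (the retention window is an example value):

import time

# Trim on write: keep roughly the latest 100k entries
redis.xadd('mystream', {'field': 'value'}, maxlen=100000, approximate=True)

# Or trim by age: drop entries older than 24 hours (IDs begin with a millisecond timestamp)
cutoff_ms = int((time.time() - 24 * 3600) * 1000)
redis.xtrim('mystream', minid=cutoff_ms, approximate=True)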

Blocking Read Pitfalls

Blocking XREADGROUP can exhaust connection pools.

The Deadlock Scenario:

# Connection pool size: 10
# Worker threads: 10
# Each worker does:
redis.xreadgroup('mygroup', 'worker-N', {'mystream': '>'}, block=5000)

All 10 connections are now blocked. Any other code needing Redis (even a simple GET) must wait for a free connection, and if the workers re-block as soon as each call returns, it can wait indefinitely.

Solutions:

  1. Dedicated connections: Use separate connection instances for blocking consumers
  2. Shorter block times: Use 2-3 second blocks instead of infinite (BLOCK 0)
  3. Timeout coordination: the Redis BLOCK timeout must be shorter than the client socket timeout (points 1 and 3 are sketched below)
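
A sketch of points 1 and 3 with redis-py (timeouts and names are illustrative):

from redis import Redis

# General-purpose client for ordinary request/response commands
app_redis = Redis()

# Dedicated client for the blocking consumer; its socket timeout stays longer
# than the BLOCK timeout so the server replies before the socket gives up
consumer_redis = Redis(socket_timeout=10)

messages = consumer_redis.xreadgroup('mygroup', 'worker-1',
                                     {'mystream': '>'},
                                     block=3000, count=100)  # 3s block < 10s socket timeout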

Lag Monitoring

"Lag" in Streams is complex:

Ingestion Lag: Difference between newest Stream ID and last-delivered-id

XINFO GROUPS mystream
# Check 'last-delivered-id' vs stream's last ID

Processing Lag: PEL size—messages delivered but not ACKed

XINFO GROUPS mystream
# Check 'pending' count

Low ingestion lag with high PEL size indicates processing bottlenecks or ACK failures, not delivery problems.
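
A combined check, assuming the client from the earlier snippets was created with decode_responses=True (the millisecond parts of the IDs give an approximate ingestion lag):

def stream_lag(stream, group_name):
    last_id = redis.xinfo_stream(stream)['last-generated-id']
    for group in redis.xinfo_groups(stream):
        if group['name'] == group_name:
            delivered_id = group['last-delivered-id']
            ingestion_lag_ms = int(last_id.split('-')[0]) - int(delivered_id.split('-')[0])
            return {'ingestion_lag_ms': ingestion_lag_ms,   # delivery falling behind producers
                    'processing_lag': group['pending']}     # delivered but not yet ACKed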

Client-Side Sharding for Scale

A single Stream lives on one Redis shard. For extreme throughput (>100k ops/sec), the Stream becomes a hot spot.

Pattern: Partition by hash:

import zlib

def get_stream_key(user_id):
    # Use a stable hash: Python's built-in hash() is randomized per process,
    # so producers and consumers could disagree on a user's shard
    shard = zlib.crc32(str(user_id).encode()) % 16
    return f'events:{{shard{shard}}}'  # {shardN} hash tag pins the key to one Cluster slot

# Producer
redis.xadd(get_stream_key(user_id), {'event': 'login', 'user': user_id})

# A consumer group is needed for each shard
for i in range(16):
    redis.xgroup_create(f'events:{{shard{i}}}', 'processors', mkstream=True)

This mimics Kafka partitions but requires manual implementation.
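
A consumer-side sketch, assuming the 16 shard streams and the processors group created above; in Redis Cluster each shard key hashes to its own slot, so each is read with a separate call:

def consume_all_shards(consumer_name):
    while True:
        for i in range(16):
            stream = f'events:{{shard{i}}}'
            # Short block so one idle shard does not starve the others
            messages = redis.xreadgroup('processors', consumer_name,
                                        {stream: '>'}, block=100, count=50)
            if messages:
                process_and_ack(messages)

On a single non-cluster instance, all 16 keys can instead be passed to one XREADGROUP call.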

Comparison with Kafka

| Aspect | Redis Streams | Apache Kafka |
| --- | --- | --- |
| Storage | RAM (hours/days) | Disk (months/years) |
| Latency | Sub-millisecond | Milliseconds |
| Partitioning | Manual (client-side) | Native |
| Consumer rebalancing | Manual (XAUTOCLAIM) | Automatic |
| Operational complexity | Low | High |

Use Redis Streams for real-time inter-service communication and active job queues. Use Kafka for event sourcing with long retention and analytics pipelines.

Source

Redis Streams documentation, production post-mortems, and "You're Probably Thinking About Redis Streams Wrong" (redis.io/blog).

