Consumer Reliability
Group Coordination: Liveness, Rebalance, and Static Membership #
A consumer group is not a passive subscription — it is a distributed coordination protocol. Every consumer in the group participates in an ongoing negotiation with the broker-side group coordinator: proving it is alive, claiming partition ownership, and committing its position. When this negotiation breaks down — through misconfigured timeouts, aggressive rebalances, or transient network pauses — the consumer stops processing and the group stalls. Understanding the mechanics of this protocol is what separates a consumer that is merely connected from one that is reliably processing.
Liveness: Two Timeouts, Two Failure Modes #
The group coordinator uses two independent timeouts to assess consumer health. Conflating them is a common source of spurious rebalances.
- session.timeout.ms: the maximum time the coordinator waits for a heartbeat before declaring the consumer dead. Heartbeats are sent by a background thread independently of message processing, so a consumer that is alive but not processing will still send heartbeats.
- max.poll.interval.ms: the maximum time between two consecutive calls to poll(). If the application takes too long processing a batch (a slow database write, a downstream API call, or a large batch) and misses this deadline, the consumer is evicted from the group even if its heartbeat thread is healthy.
These two timeouts create two distinct failure modes:
- Process crash: heartbeats stop. The coordinator detects the absence within session.timeout.ms and triggers a rebalance. Partitions are reassigned and recovery is fast.
- Processing stall: heartbeats continue, so the coordinator sees a healthy consumer, but poll() is not being called. After max.poll.interval.ms expires, the consumer is evicted. The application may not notice until its next offset commit fails with a CommitFailedException.
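The two failure modes can be told apart by which deadline was missed. The following is a toy model of the two checks, not Kafka client code; the class name, the enum, and the parameter names are all hypothetical.

```java
/** Hypothetical model of the two liveness checks; it only mirrors the rules described above. */
final class LivenessCheck {
    enum Verdict { HEALTHY, SESSION_EXPIRED, POLL_INTERVAL_EXCEEDED }

    static Verdict evaluate(long msSinceHeartbeat, long msSincePoll,
                            long sessionTimeoutMs, long maxPollIntervalMs) {
        if (msSinceHeartbeat > sessionTimeoutMs) {
            return Verdict.SESSION_EXPIRED;        // process crash or network partition
        }
        if (msSincePoll > maxPollIntervalMs) {
            return Verdict.POLL_INTERVAL_EXCEEDED; // heartbeats are fine, processing has stalled
        }
        return Verdict.HEALTHY;
    }
}
```

A consumer that heartbeats on time but has not polled for longer than max.poll.interval.ms yields POLL_INTERVAL_EXCEEDED, which is the case that heartbeat monitoring alone cannot see.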
The correct configuration is not to set both timeouts as low as possible. A low session.timeout.ms causes spurious rebalances during minor network hiccups. A low max.poll.interval.ms causes evictions during legitimate slow processing. The right approach is to size max.poll.interval.ms to the worst-case processing time for a single batch, and keep session.timeout.ms large enough to tolerate brief network delays: conventionally at least 3× heartbeat.interval.ms, so that several heartbeats can be missed before the session expires.
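As a rough sketch of that sizing rule, the helper below derives the timeouts from a measured worst case. The class name, the method, and the 2x headroom factor are assumptions made for illustration; the property keys are the standard consumer configuration names.

```java
import java.util.Properties;

final class LivenessConfig {
    /** Sizes the timeouts from caller-supplied measurements; the 2x headroom is an arbitrary margin. */
    static Properties build(int maxPollRecords, long worstCaseMsPerRecord, int heartbeatIntervalMs) {
        long worstCaseBatchMs = maxPollRecords * worstCaseMsPerRecord;
        long maxPollIntervalMs = worstCaseBatchMs * 2;  // room for one slow batch plus jitter
        int sessionTimeoutMs = heartbeatIntervalMs * 3; // tolerate a few missed heartbeats

        Properties props = new Properties();
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        props.setProperty("heartbeat.interval.ms", Integer.toString(heartbeatIntervalMs));
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }
}
```

With 500 records per poll at a worst case of 200 ms each and a 3-second heartbeat, this gives max.poll.interval.ms=200000 and session.timeout.ms=9000. Lowering max.poll.records is often the cheaper way to shrink the worst-case batch time.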
Rebalance Protocol: Eager vs Cooperative #
When the group membership changes — a consumer joins, leaves, or is evicted — the coordinator triggers a rebalance. How partitions are revoked and reassigned determines how long the group stops processing.
Eager rebalance (the default in older clients) is a stop-the-world protocol:
- All consumers receive a rebalance signal and revoke all their partitions.
- All consumers rejoin the group.
- The group leader (one of the consumers) computes the assignment, and the coordinator distributes it.
- Consumers resume processing.
Every rebalance causes a full processing pause across the entire group, regardless of how many partitions actually changed ownership.
Cooperative rebalance (incremental rebalance) is a two-phase protocol:
- All consumers rejoin while keeping their current partitions, and the assignor identifies which partitions need to move.
- Only the consumers holding those specific partitions revoke them.
- A second, smaller rebalance assigns the revoked partitions to their new owners.
- Consumers that retained their partitions never stop processing.
The tradeoff is two round-trips instead of one — slightly more coordination overhead per rebalance, but dramatically less processing disruption. For groups with many partitions and stable workloads, Cooperative rebalance is the correct default.
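To make the difference in disruption concrete, here is a small model that counts which partitions stop processing under each protocol. It is a simplification for illustration, not the actual assignor logic; the class and method names are hypothetical, and partitions are represented as plain integers mapped to an owner name.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class RevocationModel {
    /** Eager: every owned partition is revoked, whether or not its owner changes. */
    static Set<Integer> revokedEager(Map<Integer, String> before) {
        return new HashSet<>(before.keySet());
    }

    /** Cooperative: only partitions whose owner differs in the new assignment are revoked. */
    static Set<Integer> revokedCooperative(Map<Integer, String> before, Map<Integer, String> after) {
        Set<Integer> revoked = new HashSet<>();
        for (Map.Entry<Integer, String> e : before.entrySet()) {
            if (!e.getValue().equals(after.get(e.getKey()))) {
                revoked.add(e.getKey());
            }
        }
        return revoked;
    }
}
```

When a third consumer joins a four-partition group and takes one partition from each existing member, the eager model pauses all four partitions while the cooperative model pauses only the two that move. In the Java client, the cooperative protocol is selected by setting partition.assignment.strategy to org.apache.kafka.clients.consumer.CooperativeStickyAssignor.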
Static Membership: Eliminating Rebalances on Restart #
By default, every time a consumer restarts — a rolling deployment, a pod restart in Kubernetes, a process crash and recovery — it joins the group as a new member with a new member ID. This triggers a full rebalance even though the same logical consumer will end up with the same partitions.
Static membership (KIP-345) eliminates this. By assigning each consumer a stable group.instance.id, the coordinator recognizes a rejoining consumer as the same logical member. If the consumer returns within session.timeout.ms, it reclaims its previous partition assignment without triggering a rebalance. No processing interruption for the rest of the group.
The cost is explicit: a static member that stops sends no leave request, so its partitions stay assigned to it, unprocessed, until it returns or session.timeout.ms expires. A dynamic member that shuts down cleanly leaves the group at once and its partitions are reassigned immediately; a static member makes the coordinator wait out the timeout. For rolling deployments, this is acceptable. For unexpected crashes, session.timeout.ms must be tuned to balance rebalance avoidance against reassignment latency.
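A minimal configuration sketch, assuming each consumer has some stable identity available at startup (for example a StatefulSet pod name). The helper class and the way the instance ID is composed are illustrative choices; only the property keys come from the consumer configuration.

```java
import java.util.Properties;

final class StaticMembershipConfig {
    static Properties forInstance(String groupId, String stableInstanceName, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // Must be unique within the group and identical across restarts of the same instance.
        props.setProperty("group.instance.id", groupId + "-" + stableInstanceName);
        // Should cover the expected restart time; otherwise the restart still triggers a rebalance.
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }
}
```

Calling forInstance("orders", "consumer-0", 60000) yields group.instance.id=orders-consumer-0. A randomly generated ID would defeat the purpose, since the restarted process would look like a new member.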
Offset Management: Commits, Atomicity, and the Dual-Write Problem #
Reading a record is not the same as processing it. The consumer’s position in the log — its committed offset — is what survives a restart. If the offset is committed at the wrong time relative to the processing side effect, the application either reprocesses or skips records. Getting this right requires understanding what commit semantics the client actually provides.
Auto-Commit: Convenient and Wrong for Most Use Cases #
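enable.auto.commit=true commits the consumer's position at most every auto.commit.interval.ms. The commit is decoupled from the application's notion of "processed". In the Java client the commit is issued from inside poll() and covers everything the previous poll() returned, so the gap opens when records are handed to worker threads or processed asynchronously; clients that commit on a background timer can hit it even with inline processing. This creates a window where:
- Records have been returned by poll() and are still being processed.
- The auto-commit fires and advances the offset past those records.
- The consumer crashes.
- On restart, the committed offset is ahead of the last successfully processed record. Those records are skipped permanently.
Auto-commit gives at-least-once only if every record from one poll() is fully processed before the next poll() or close(), and even then a crash between commits replays up to auto.commit.interval.ms of records. When that condition does not hold, it degrades to at-most-once. It is appropriate only for workloads where occasional record loss or duplication is explicitly acceptable.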
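What a crash costs depends only on where the committed offset sits relative to the last fully processed record. The arithmetic is small enough to sketch; the class and method names below are hypothetical.

```java
final class CrashOutcome {
    /**
     * committedOffset:  the next offset the group will read after a restart.
     * processedThrough: the highest offset whose side effect completed (-1 if none).
     * Returns a positive count of skipped records, a negative count of replayed
     * records, or zero when the commit and the processing line up exactly.
     */
    static long skippedOrReplayed(long committedOffset, long processedThrough) {
        long nextUnprocessed = processedThrough + 1;
        return committedOffset - nextUnprocessed;
    }
}
```

A commit at offset 100 with processing complete only through offset 89 means ten records (90 to 99) are never processed. The reverse case, a commit at 90 with processing complete through 99, replays those ten records.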
Manual Commit: commitSync vs commitAsync #
Manual offset management gives the application control over when a commit occurs. Two methods, two tradeoffs:
- commitSync(): blocks until the broker confirms the commit. Guarantees the offset is durable before proceeding. Adds latency proportional to the broker round-trip on every commit.
- commitAsync(): sends the commit request without blocking. The application continues processing. If the commit fails, due to a rebalance or transient broker error, the failure is reported via a callback. The application must decide whether to retry.
The correct pattern for high-throughput at-least-once processing:
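- Use commitAsync() in the normal processing loop: non-blocking, low overhead.
- Use commitSync() in the finally block of the processing loop: guarantees a durable final commit on shutdown. The finally block does not run on a rebalance; to commit before partitions are revoked, call commitSync() from ConsumerRebalanceListener.onPartitionsRevoked().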
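try {
    while (running) {
        var records = consumer.poll(Duration.ofMillis(100));
        process(records);
        // Non-blocking commit of the offsets returned by the last poll(); failures are only logged.
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) log.warn("Commit failed", exception);
        });
    }
} finally {
    try {
        consumer.commitSync(); // blocking commit so the last batch is durable on shutdown
    } finally {
        consumer.close(); // always release the consumer, even if the final commit throws
    }
}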
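The loop above commits whatever the last poll() returned. To commit precise offsets when partitions are revoked, the application needs to know how far it actually got on each partition. One way to track that is sketched below. OffsetTracker is a hypothetical helper that keys on a bare partition number for brevity; real code would key on TopicPartition and pass the result to commitSync(Map<TopicPartition, OffsetAndMetadata>) from onPartitionsRevoked().

```java
import java.util.HashMap;
import java.util.Map;

final class OffsetTracker {
    private final Map<Integer, Long> nextOffsets = new HashMap<>();

    /** Call after a record's side effect has completed. */
    void markProcessed(int partition, long offset) {
        // The committed offset is the NEXT record to read, hence offset + 1.
        nextOffsets.merge(partition, offset + 1, Math::max);
    }

    /** Returns the offsets to commit for the revoked partitions and forgets them. */
    Map<Integer, Long> drain(Iterable<Integer> revokedPartitions) {
        Map<Integer, Long> toCommit = new HashMap<>();
        for (int p : revokedPartitions) {
            Long next = nextOffsets.remove(p);
            if (next != null) {
                toCommit.put(p, next);
            }
        }
        return toCommit;
    }
}
```

Draining only the revoked partitions matters under cooperative rebalancing, where the consumer keeps the rest of its assignment and continues to track progress on it.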
The Dual-Write Problem: Atomicity Between Offset and Side Effect #
The deepest correctness problem in consumer design is the dual-write problem: the offset commit and the processing side effect are two separate writes to two separate systems. They cannot be made atomic without additional coordination.
Consider a consumer that reads a record, writes derived state to a database, then commits the offset. Two failure modes:
- Crash after database write, before offset commit — on restart, the consumer reprocesses the record. The database write happens again. If the write is not idempotent, the state is corrupted.
- Crash after offset commit, before database write — on restart, the consumer skips the record. The database state is missing an update. No error is reported.
True end-to-end exactly-once requires coupling the offset commit with the side effect atomically. There are two practical approaches:
- Store offsets in the same transaction as the derived state — if the sink is a transactional database, write both the derived state and the offset in the same transaction. On restart, read the offset from the database. This eliminates the gap entirely.
- Use Kafka transactions: the consumer reads, the producer writes the derived result to an output topic, and the producer adds the consumed offsets to the same transaction (sendOffsetsToTransaction), so the output and the offset commit either both become visible or neither does. This is the consume-transform-produce pattern; it applies only when the side effect is itself a write to Kafka.
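The first approach can be illustrated without a database. In the sketch below, a synchronized method stands in for a database transaction: the derived state (a running total) and the per-partition offset change together or not at all. AtomicSink and its methods are hypothetical names, and an in-memory map is of course not durable; the point is only the shape of the logic.

```java
import java.util.HashMap;
import java.util.Map;

final class AtomicSink {
    private final Map<Integer, Long> nextOffset = new HashMap<>(); // stored alongside the state
    private long total;                                            // derived state: a running sum

    /** Applies the record and advances the offset in one critical section; returns false for replays. */
    synchronized boolean apply(int partition, long offset, long amount) {
        long expected = nextOffset.getOrDefault(partition, 0L);
        if (offset < expected) {
            return false; // already applied before the crash; skip the duplicate
        }
        total += amount;
        nextOffset.put(partition, offset + 1);
        return true;
    }

    synchronized long total() {
        return total;
    }

    /** On restart the consumer would seek() here rather than use the group's committed offset. */
    synchronized long resumeOffset(int partition) {
        return nextOffset.getOrDefault(partition, 0L);
    }
}
```

Replaying offset 1 after it was already applied leaves the total unchanged, which is what makes reprocessing safe. With a real database, the same check and update would run inside a single transaction.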
Lag, Backpressure, and Load Shedding #
Consumer lag is the delta between the partition’s high water mark and the consumer group’s committed offset. It represents latency in the consumption pipeline. But lag alone is not an incident — growing lag is.
The Lag Taxonomy #
Three lag patterns, three different root causes:
High lag, high throughput — the consumer is working hard but cannot keep up with the producer. The consumer is under-provisioned. Solution: add consumers (up to the partition count) or increase processing efficiency.
High lag, zero throughput — the consumer has stalled. It is down, stuck in a rebalance loop, or blocked on a poison pill. This is an availability incident. No amount of scaling helps — the consumer is not processing at all.
Growing lag, low broker throughput — the broker is starving fetch requests. Possible causes: disk I/O saturation, quota throttling, or network contention. The consumer is healthy; the bottleneck is upstream.
Correlating lag with throughput is the first diagnostic step. Lag without throughput context is noise.
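That correlation can be written down as a first-pass triage rule. The sketch below is a heuristic, not a diagnosis: the class name, the enum, and the 80% threshold are all assumptions, and a slow sink can also produce the low-throughput case, so the result only says where to look first.

```java
final class LagDiagnosis {
    enum Pattern { HEALTHY, STALLED, UNDER_PROVISIONED, UPSTREAM_BOUND }

    // Hypothetical threshold: at or above 80% of nominal throughput counts as "working hard".
    private static final double BUSY_FRACTION = 0.8;

    static Pattern classify(long lagDelta, double recordsPerSec, double nominalRecordsPerSec) {
        if (lagDelta <= 0) {
            return Pattern.HEALTHY;           // lag is flat or shrinking
        }
        if (recordsPerSec == 0.0) {
            return Pattern.STALLED;           // growing lag, nothing consumed
        }
        if (recordsPerSec >= BUSY_FRACTION * nominalRecordsPerSec) {
            return Pattern.UNDER_PROVISIONED; // consuming near capacity and still behind
        }
        return Pattern.UPSTREAM_BOUND;        // consuming slowly: check broker and network first
    }
}
```

Here lagDelta is the change in lag over the observation window, and nominalRecordsPerSec is whatever rate the consumer is known to sustain when healthy.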
Backpressure: The Pause/Resume API #
When a consumer falls behind — due to a slow downstream sink or processing bottleneck — continuing to fetch records fills memory without making progress. The pause/resume API provides explicit backpressure control:
// Pause fetching on overloaded partitions
consumer.pause(consumer.assignment());
// Resume when downstream has capacity
consumer.resume(consumer.assignment());
When a partition is paused, poll() returns no records for it, but the application must keep calling poll(): that keeps the consumer within max.poll.interval.ms while the background thread continues to heartbeat. The consumer stays in the group and keeps ownership of the partition, which is simply not fetched until explicitly resumed. This is the correct mechanism for propagating backpressure from a slow sink back to the consumer without triggering a rebalance.
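Deciding when to pause and resume is left to the application. A common approach is hysteresis on the depth of an internal work queue, so that the consumer does not flap between the two states. The gate below is a hypothetical helper, not part of the Kafka client; the caller would invoke consumer.pause() or consumer.resume() according to the returned action.

```java
final class BackpressureGate {
    enum Action { PAUSE, RESUME, NONE }

    private final int lowWatermark;
    private final int highWatermark;
    private boolean paused;

    BackpressureGate(int lowWatermark, int highWatermark) {
        if (lowWatermark >= highWatermark) {
            throw new IllegalArgumentException("lowWatermark must be below highWatermark");
        }
        this.lowWatermark = lowWatermark;
        this.highWatermark = highWatermark;
    }

    /** Reports a state change only when a watermark is crossed, never on every call. */
    Action onQueueDepth(int depth) {
        if (!paused && depth >= highWatermark) {
            paused = true;
            return Action.PAUSE;
        }
        if (paused && depth <= lowWatermark) {
            paused = false;
            return Action.RESUME;
        }
        return Action.NONE;
    }
}
```

With watermarks of 10 and 100, the gate pauses when the queue reaches 100 and resumes only once it has drained to 10; depths in between leave the current state alone.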
Poison Pills and the Dead-Letter Queue #
A poison pill is a record that causes the consumer to crash or loop on every processing attempt. The pattern is:
- Consumer fetches record at offset N.
- Processing throws an unhandled exception.
- Consumer restarts and resumes from the last committed offset, N.
- Consumer fetches offset N again. Same exception. Same crash.
The consumer is stuck. Lag grows. The rest of the topic is blocked behind one bad record.
The solution is a Dead-Letter Queue (DLQ): a separate topic where unprocessable records are routed instead of retried indefinitely. On a processing failure that exceeds a retry threshold, the consumer writes the record to the DLQ, commits the offset past it, and continues processing. The DLQ record is available for inspection, replay, or alerting without blocking the main stream.
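try {
    process(record);
} catch (UnrecoverableException e) {
    // Block until the broker acknowledges the DLQ write. get() throws checked exceptions if the
    // send fails, so the offset below is never committed for a record that did not reach the DLQ.
    dlqProducer.send(new ProducerRecord<>(DLQ_TOPIC, record.key(), record.value())).get();
    log.error("Sent to DLQ: offset={}, partition={}", record.offset(), record.partition());
}
// Commit past the record whether it was processed or dead-lettered, so the poison pill is not re-fetched
consumer.commitSync(Map.of(
    new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1)
));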
The DLQ pattern converts an availability incident — a stuck consumer — into a data quality incident that can be investigated asynchronously.
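The retry threshold mentioned above can be sketched independently of Kafka. The class below is a hypothetical helper with the processor and the dead-letter sink passed in as plain functions; it retries on any RuntimeException, which is a simplification, since real code would usually distinguish transient errors from permanent ones and back off between attempts.

```java
import java.util.function.Consumer;

final class RetryThenDeadLetter<R> {
    private final int maxAttempts;
    private final Consumer<R> processor;
    private final Consumer<R> deadLetterSink;

    RetryThenDeadLetter(int maxAttempts, Consumer<R> processor, Consumer<R> deadLetterSink) {
        this.maxAttempts = maxAttempts;
        this.processor = processor;
        this.deadLetterSink = deadLetterSink;
    }

    /** Returns true if processed, false if dead-lettered; either way the caller can commit past the record. */
    boolean handle(R record) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                processor.accept(record);
                return true;
            } catch (RuntimeException e) {
                // Swallow and retry; real code would log the failure and back off here.
            }
        }
        deadLetterSink.accept(record);
        return false;
    }
}
```

With maxAttempts set to 3, a record that always fails is tried three times and then handed to the sink exactly once, after which the main stream moves on.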