Suppression — Exactly-Once at the Infrastructure Layer #
Infrastructure systems face duplication at every layer: producers retry failed sends, consumers reprocess after crashes, coordinators re-send commands to recovering replicas. Suppression at the infrastructure level is the set of mechanisms by which the platform itself — not the application — absorbs these duplicates, presenting an exactly-once abstraction to the layer above.
The distinction from application-level Suppression (Chapter 12): infrastructure Suppression is built into the protocol. The Kafka broker deduplicates producer retries transparently. The etcd watch stream delivers each event exactly once regardless of client disconnects. The Raft log never applies the same entry twice to the state machine. Applications built on these platforms inherit the guarantee without implementing deduplication themselves.
The Duplication Problem in Message Systems #
A producer sends a message. The broker receives it, appends it to the log, but crashes before sending the acknowledgment. The producer times out and retries. The broker (now recovered, or a new leader) receives the same message again and appends it a second time.
Producer → Broker: message M, sequence=1
Broker: append M to log at offset 100
Broker: crash before ACK
Producer: timeout → retry
New leader Broker: receives message M, sequence=1 again
Without suppression: append M to log at offset 101 (DUPLICATE)
This is the fundamental at-least-once delivery problem. The producer correctly retried — it had no acknowledgment. The broker incorrectly appended — it had no record of the first delivery.
Kafka Idempotent Producer #
Kafka’s idempotent producer (introduced in Kafka 0.11, enabled by enable.idempotence=true) solves exactly this problem at the broker level.
Producer ID and Sequence Numbers #
When an idempotent producer initializes, the broker assigns it a Producer ID (PID) — a unique 64-bit integer. The PID is associated with a producer epoch to handle producer restarts (each restart gets a new epoch, invalidating in-flight messages from the previous epoch).
Each message batch is tagged with (PID, epoch, partition, sequence_number). The sequence number is monotonically increasing per (PID, partition):
Producer PID=42, epoch=1:
Batch to partition 0: sequence=0 → offset 100
Batch to partition 0: sequence=1 → offset 101
Batch to partition 0: sequence=1 (retry of the unacknowledged batch after timeout) → REJECTED as duplicate, ACK returned
The broker maintains, per (PID, partition): last_committed_sequence. On receiving a batch:
- sequence == last_committed_sequence + 1: accept, append, update state.
- sequence <= last_committed_sequence: duplicate, reject silently, return success (the producer’s retry will get the correct ACK).
- sequence > last_committed_sequence + 1: out-of-order, return error (producer should not retry out of order).
What this guarantees: each message batch is written to the log exactly once, even under producer retries, leader failovers, and network duplicates. The guarantee is per (PID, partition, sequence) — a specific batch from a specific producer to a specific partition appears at most once in the log.
What this does not guarantee by itself: ordering across partitions. Two messages from the same producer to different partitions are independently sequenced. Exactly-once across partitions requires transactions.
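The broker-side check can be modeled as a small sketch: per-(PID, partition) state with the three cases above. This is illustrative only, not Kafka's implementation; the names (`broker`, `receive`, `result`) are hypothetical:

```go
package main

import "fmt"

// producerKey scopes deduplication: one sequence stream per (PID, partition).
type producerKey struct {
	pid       int64
	partition int32
}

type result int

const (
	accept     result = iota // next expected sequence: append to log
	duplicate                // already committed: ack again, do not append
	outOfOrder               // gap in sequence: reject with error
)

// broker tracks last_committed_sequence per (PID, partition).
type broker struct {
	lastCommitted map[producerKey]int64
}

func newBroker() *broker {
	return &broker{lastCommitted: make(map[producerKey]int64)}
}

func (b *broker) receive(pid int64, partition int32, seq int64) result {
	key := producerKey{pid, partition}
	last, seen := b.lastCommitted[key]
	if !seen {
		last = -1 // nothing committed yet; sequence numbers start at 0
	}
	switch {
	case seq == last+1:
		b.lastCommitted[key] = seq // accept: append and advance state
		return accept
	case seq <= last:
		return duplicate // retry absorbed: suppressed silently
	default:
		return outOfOrder
	}
}

func main() {
	b := newBroker()
	fmt.Println(b.receive(42, 0, 0)) // 0 (accept)
	fmt.Println(b.receive(42, 0, 1)) // 0 (accept)
	fmt.Println(b.receive(42, 0, 1)) // 1 (duplicate: retry absorbed)
	fmt.Println(b.receive(42, 0, 3)) // 2 (outOfOrder: gap)
}
```

The duplicate case returning success, not an error, is what makes the retry invisible to the producer.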
Kafka Transactions #
Kafka transactions extend the idempotent producer to atomic multi-partition writes — either all writes in a transaction are committed (visible to consumers with isolation.level=read_committed) or none are.
producer.initTransactions();   // registers transactional.id, obtains PID + epoch
producer.beginTransaction();
try {
    producer.send(new ProducerRecord<>("output-topic", key, processedValue));
    // Commit the consumer's input offset atomically with the output write
    producer.sendOffsetsToTransaction(consumerOffsets, consumerGroupMetadata);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();   // output marked aborted; offset not advanced
}
sendOffsetsToTransaction atomically commits the consumer offset alongside the output message. If the producer crashes after writing the output but before committing the offset, on recovery:
- The coordinator checks for open transactions from the crashed PID+epoch.
- The transaction is aborted (the output message is marked as aborted in the log).
- The consumer offset is not advanced.
- The consumer reprocesses the input and re-produces the output.
Consumers with isolation.level=read_committed skip aborted records — they only see committed transactions. This achieves exactly-once stream processing: the output message appears exactly once in the output topic.
Transaction Coordinator #
Transactions are coordinated by a transaction coordinator — a Kafka broker that owns the transaction log partition for a given transactional.id. The transaction coordinator:
- Tracks the state of each transaction (begin, prepare-commit, committed, aborted).
- Manages PID + epoch assignment.
- Handles recovery of incomplete transactions after producer crash.
The transaction log is itself a Kafka topic (__transaction_state), replicated across brokers like any other Kafka topic. Transaction state is durable.
The two-phase commit within Kafka: Kafka transactions use a two-phase protocol internally:
- Prepare: the coordinator writes a PrepareCommit marker to the transaction log, then writes an EndTransaction(COMMIT) marker to each affected partition.
- Commit: the coordinator writes Committed to the transaction log. The transaction is complete.
If the coordinator crashes after PrepareCommit but before Committed, recovery replays the prepare and completes the commit. The transaction is never left in an indeterminate state — the PrepareCommit marker is sufficient for recovery to commit.
This avoids the 2PC blocking problem (Chapter 4) because the coordinator state is replicated in Kafka itself — a new coordinator leader can take over from the transaction log.
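The recovery rule (PrepareCommit is the point of no return) can be modeled as a small replay function over the transaction log. This is an illustrative sketch, not Kafka's internal API; `recoverTxns` and the record names are hypothetical:

```go
package main

import "fmt"

// txnState mirrors the kinds of records in the coordinator's transaction log.
type txnState string

const (
	begin         txnState = "Begin"
	prepareCommit txnState = "PrepareCommit"
	committed     txnState = "Committed"
	aborted       txnState = "Aborted"
)

// recoverTxns replays the transaction log and decides, per transactional.id,
// what a new coordinator must do. Anything prepared but not completed is
// rolled forward; anything begun but never prepared is rolled back.
func recoverTxns(txnLog map[string][]txnState) map[string]string {
	actions := make(map[string]string)
	for txnID, records := range txnLog {
		last := records[len(records)-1]
		switch last {
		case prepareCommit:
			actions[txnID] = "complete commit" // re-send markers, write Committed
		case begin:
			actions[txnID] = "abort" // never prepared: safe to roll back
		default:
			actions[txnID] = "none" // already committed or aborted
		}
	}
	return actions
}

func main() {
	txnLog := map[string][]txnState{
		"etl-1": {begin, prepareCommit},            // crashed after prepare
		"etl-2": {begin},                           // crashed before prepare
		"etl-3": {begin, prepareCommit, committed}, // completed cleanly
	}
	actions := recoverTxns(txnLog)
	fmt.Println(actions["etl-1"]) // complete commit
	fmt.Println(actions["etl-2"]) // abort
	fmt.Println(actions["etl-3"]) // none
}
```

The asymmetry is the key design point: prepare-without-commit always rolls forward, so no transaction is ever stuck waiting on a crashed coordinator.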
etcd Watch: Exactly-Once Event Delivery #
etcd’s Watch service delivers every key event (PUT, DELETE) to subscribers exactly once, in revision order, without gaps — even when the client disconnects and reconnects.
Revision-Based Delivery #
Every mutation to the etcd cluster increments the cluster-wide revision counter (Chapter 6). Watch events carry the revision at which they occurred:
watcher := client.Watch(ctx, "/config/", clientv3.WithPrefix())
for resp := range watcher {
for _, event := range resp.Events {
fmt.Printf("rev=%d type=%s key=%s\n",
event.Kv.ModRevision, event.Type, event.Kv.Key)
}
}
If the client disconnects (network failure, restart), it can resume from the last seen revision — no events are missed:
// Resume from last processed revision
lastSeenRevision := loadFromCheckpoint()
watcher := client.Watch(ctx, "/config/",
clientv3.WithPrefix(),
clientv3.WithRev(lastSeenRevision + 1))
etcd guarantees: all events at revision ≥ startRevision will be delivered in order, with no gaps, as long as the revision is within the compaction window. Events before the compaction window are lost — clients must do a full sync (List + Watch from the sync revision) if they fall behind compaction.
Why This Is Suppression #
The revision counter is a deduplication mechanism. Each event has a unique, monotonically increasing revision. A client that receives the same watch event twice (e.g., due to a gRPC stream reconnect mid-batch) can detect and discard the duplicate by checking: “have I already processed revision R?”
lastProcessed := int64(0)
for resp := range watcher {
for _, event := range resp.Events {
if event.Kv.ModRevision <= lastProcessed {
continue // Duplicate — already processed this revision
}
process(event)
lastProcessed = event.Kv.ModRevision
checkpoint(lastProcessed)
}
}
Kubernetes controllers implement exactly this pattern: the informer framework tracks the last seen resource version (etcd revision) and resumes from that point after reconnect. No controller action is triggered twice for the same event.
Compaction and the Watch Window #
etcd retains all historical revisions by default, but this grows storage without bound. The compaction operation removes revisions older than a specified revision:
// Compact revisions older than current - 1000
resp, _ := client.Get(ctx, "/")
currentRevision := resp.Header.Revision
client.Compact(ctx, currentRevision - 1000)
After compaction, a client watching from a revision before the compaction point receives a compacted error — it must re-sync from scratch. Kubernetes handles this with a re-list: fetch the full current state, then watch from the current revision.
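The reconnect decision reduces to a comparison against the compaction boundary. A minimal sketch, assuming etcd's rule that a watch may start at any revision at or above the compaction revision; `resumePlan` is a hypothetical helper, not part of the clientv3 API:

```go
package main

import "fmt"

// resumePlan decides how a watcher recovers after a disconnect.
// lastSeen is the last processed revision; compactRevision is the
// boundary below which history has been discarded.
func resumePlan(lastSeen, compactRevision int64) string {
	if lastSeen+1 >= compactRevision {
		// History from lastSeen+1 onward is still retained: resume in place.
		return "watch"
	}
	// The requested revision was compacted away: full re-sync required.
	return "relist"
}

func main() {
	fmt.Println(resumePlan(1500, 1000)) // watch: still within the window
	fmt.Println(resumePlan(500, 1000))  // relist: fell behind compaction
}
```

The re-list path is why slow consumers are the expensive case: they pay a full state transfer instead of an incremental event stream.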
Raft Log Deduplication #
The Raft consensus protocol (Chapter 3) provides Suppression at the log replication level: the same log entry is never applied to the state machine twice.
Log Matching Invariant #
Raft’s log matching invariant ensures that if two log entries at the same index have the same term, they are identical — and all preceding entries are also identical:
Leader log: [index=1, term=1, op=SET x=1] [index=2, term=1, op=SET y=2] [index=3, term=2, op=DEL z]
Follower log: must be identical to leader up to the committed index
A leader may resend the same AppendEntries entry (for example, after a timeout), but the follower deduplicates: if it already holds an entry at index i with the same term, it does not re-append or re-apply it. If the terms differ (the follower has a conflicting entry from a stale leader), the entry is overwritten; this is a correction, not a duplicate.
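The follower-side handling that makes AppendEntries idempotent can be sketched as a simplified receiver (a model of the Raft receiver rules, not a complete implementation):

```go
package main

import "fmt"

type entry struct {
	term    int64
	command string
}

// appendEntries applies incoming entries starting after prevIndex (0-based).
// Entries already present with a matching term are skipped (deduplication);
// a conflicting entry (same index, different term) truncates the stale suffix.
func appendEntries(log []entry, prevIndex int, incoming []entry) []entry {
	for i, e := range incoming {
		idx := prevIndex + 1 + i
		if idx < len(log) {
			if log[idx].term == e.term {
				continue // same (index, term): identical entry, skip
			}
			log = log[:idx] // conflict: drop the stale suffix
		}
		log = append(log, e)
	}
	return log
}

func main() {
	log := []entry{{1, "SET x=1"}, {1, "SET y=2"}}
	// Leader resends both entries plus one new one: duplicates are skipped.
	log = appendEntries(log, -1, []entry{{1, "SET x=1"}, {1, "SET y=2"}, {2, "DEL z"}})
	fmt.Println(len(log)) // 3: nothing appended twice
}
```

The `continue` branch is the Suppression: a resent entry at a matching (index, term) leaves the log untouched, so retries by the leader are free.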
State Machine Application #
The Raft state machine applies log entries in strict log index order, exactly once:
type RaftStateMachine struct {
lastApplied int64
}
func (sm *RaftStateMachine) Apply(entries []LogEntry) {
for _, entry := range entries {
if entry.Index <= sm.lastApplied {
// Already applied — skip (should not happen in correct Raft)
continue
}
sm.applyToState(entry.Command)
sm.lastApplied = entry.Index
}
}
lastApplied is the Suppression mechanism: if an entry at index i has already been applied, any future delivery of the same entry (impossible in correct Raft, but guarded against defensively) is a no-op.
etcd builds on this: every key-value operation in etcd is a Raft log entry. The Raft layer ensures exactly-once application to the etcd state machine. The MVCC layer (tracking all historical revisions) is built on top of a state machine that never applies duplicates.
At-Least-Once vs Exactly-Once: Infrastructure Boundary #
Infrastructure Suppression shifts the burden of exactly-once semantics from the application to the platform:
| Layer | Delivery guarantee | Suppression mechanism |
|---|---|---|
| TCP | At-least-once (retransmission) | Sequence numbers, ACK tracking |
| Kafka (no idempotence) | At-least-once | None — application must deduplicate |
| Kafka (idempotent producer) | Exactly-once per partition | PID + sequence number at broker |
| Kafka (transactions) | Exactly-once across partitions | Transactional protocol + read_committed isolation |
| etcd watch | Exactly-once per revision | Revision-based ordering, resume-from-revision |
| Raft log application | Exactly-once | lastApplied index, log matching invariant |
The cost of exactly-once at the infrastructure level: latency and throughput. Idempotent producers require broker-side sequence tracking. Transactions require two-phase coordination with the transaction coordinator. Kafka’s exactly-once throughput is lower than at-least-once because each transaction adds coordination overhead (begin, prepare, commit markers in the log).
The tradeoff is the same as at every layer: exactly-once is more expensive than at-least-once. The infrastructure provides the option; the application chooses whether to pay the cost.
Practical Checklist #
| Requirement | Kafka configuration |
|---|---|
| No duplicate messages in log | enable.idempotence=true |
| Atomic write across multiple partitions | transactional.id + begin/commit |
| Consumer sees committed messages only | isolation.level=read_committed |
| Exactly-once stream processing | Kafka Streams with processing.guarantee=exactly_once_v2 |
| Consume → process → produce atomic | sendOffsetsToTransaction in transaction scope |
| etcd watch no missed events | WithRev(lastSeenRevision + 1) on reconnect |
| etcd watch fell behind compaction | Re-list + watch from current revision |