
Assignment — Consistent Hashing and Work Distribution #

Assignment is the coordination mechanism that distributes work, data, or responsibility across a set of participants. Unlike Selection (one winner from many) or Agreement (all commit or all abort), Assignment is about routing: given an item, which node handles it?

The correctness requirements for Assignment:

  • Stability: the same key maps to the same node across time, unless membership changes.
  • Minimal disruption: when membership changes, only the minimum necessary assignments move. In a 10-node cluster, removing one node should redistribute only ~1/10 of the keys, not rehash everything.
  • Load balance: assignments distribute approximately equally, so no node becomes a hotspot.

These requirements are in tension. A simple modular hash (hash(key) % N) is stable and balanced but fails at minimal disruption: removing one node changes the divisor, which remaps nearly all keys.
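
The remapping cost of modular hashing is easy to measure with a toy sketch (the key names are made up, and md5 stands in for whatever hash a real system would use):

```python
# Count how many of 10,000 sample keys change owners when
# hash(key) % N goes from N=10 nodes to N=9 nodes.
import hashlib

def node_for(key: str, n: int) -> int:
    # Stable hash (md5) so results are reproducible across runs.
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % n

keys = [f"key{i}" for i in range(10_000)]
moved = sum(1 for k in keys if node_for(k, 10) != node_for(k, 9))
print(f"{moved / len(keys):.0%} of keys moved")
```

For uniform hashes, a key keeps its owner only when its hash agrees modulo both 10 and 9, which happens about 10% of the time, so roughly 90% of keys move.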

Consistent Hashing #

Consistent hashing ring with virtual nodes

Consistent hashing (Karger et al., 1997) solves the minimal disruption problem by placing both keys and nodes on a circular hash ring.

The Hash Ring #

The ring is the set of all possible hash values — typically a 64-bit or 128-bit integer space arranged as a circle from 0 to 2^64 − 1. Each node is placed on the ring by hashing its identifier (typically its IP address or a configured token). Each key is placed on the ring by hashing the key.

Routing rule: a key is assigned to the first node clockwise from its position on the ring.

Ring:  0 ─────────────────────────────── 2^64

Nodes: A at position 10, B at position 40, C at position 70

Key positions:
  key1 → hash(key1) = 15 → first node clockwise from 15 = B (at 40)
  key2 → hash(key2) = 55 → first node clockwise from 55 = C (at 70)
  key3 → hash(key3) = 80 → wraps around → A (at 10)
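
The routing rule reduces to a binary search over sorted node positions. A minimal toy sketch (not any particular system's implementation; md5 stands in for a real ring hash):

```python
# Toy hash ring: nodes sorted by position, lookup = first node clockwise.
import bisect
import hashlib

RING_SIZE = 2**64

def ring_pos(s: str) -> int:
    return int(hashlib.md5(s.encode()).hexdigest(), 16) % RING_SIZE

class HashRing:
    def __init__(self, nodes):
        # (position, node) pairs sorted by position around the ring
        self.points = sorted((ring_pos(n), n) for n in nodes)
        self.positions = [p for p, _ in self.points]

    def lookup(self, key: str) -> str:
        i = bisect.bisect_right(self.positions, ring_pos(key))
        # i == len(points) means we walked past the last node: wrap to the first
        return self.points[i % len(self.points)][1]

ring = HashRing(["A", "B", "C"])
# Stability: the same key always routes to the same node.
assert ring.lookup("key1") == ring.lookup("key1")
```

The wrap-around at `i % len(self.points)` implements the circular topology: a key hashed past the last node belongs to the first node on the ring.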

Membership Change with O(1/N) Redistribution #

When a node D is added at position 25:

Before: key1 (hash=15) → B
After:  key1 (hash=15) → D (new node at 25, clockwise from 15)

Only keys in the range (10, 25] move to D — those that were previously routed to B (the next clockwise node). All other keys are unaffected. On average, 1/N of keys move when a node is added or removed.

This is consistent hashing’s core property: an O(1/N) fraction of keys moves on membership change, versus the O(1) fraction (nearly all keys) remapped by modular hashing.
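
The O(1/N) claim can be checked empirically with a toy sketch (node and key names are arbitrary, and md5 stands in for a real ring hash):

```python
import bisect
import hashlib

def ring_pos(s):
    return int(hashlib.md5(s.encode()).hexdigest(), 16) % (2**64)

def owner(key, nodes):
    points = sorted((ring_pos(n), n) for n in nodes)
    positions = [p for p, _ in points]
    i = bisect.bisect_right(positions, ring_pos(key))
    return points[i % len(points)][1]  # wrap around at the end of the ring

keys = [f"key{i}" for i in range(10_000)]
before = [f"node{i}" for i in range(10)]
after = before + ["node10"]  # add an 11th node

moved = sum(1 for k in keys if owner(k, before) != owner(k, after))
print(f"{moved / len(keys):.1%} of keys moved")
```

With one position per node the moved fraction varies around 1/11, since segment sizes are uneven; virtual nodes (next section) tighten this toward the average.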

Virtual Nodes (Vnodes) #

A single node placement per physical node leads to uneven load:

  • Node positions on the ring are determined by hash(node_id). With only N positions, statistical variance causes some nodes to own much larger ring segments than others.
  • When a node fails, all its load moves to the single next clockwise node — a hot spot.
  • New hardware with different capacity cannot be assigned proportionally more ring segments.

Virtual nodes solve this by assigning each physical node multiple positions on the ring (from a handful to a few hundred per physical node, depending on the system and version).

Physical node A → vnodes at positions: 5, 23, 67, 89, 102, 145, ...
Physical node B → vnodes at positions: 12, 34, 71, 93, 115, 167, ...
Physical node C → vnodes at positions: 8, 41, 78, 100, 129, 178, ...

With 256 vnodes per node and N physical nodes, each physical node owns 256 ring segments. Load is distributed across 256 intervals rather than 1. When node A fails, its 256 vnodes’ key ranges are distributed across all remaining nodes (each takes some vnodes), spreading the load instead of concentrating it on a single neighbor.

Cassandra has used virtual nodes by default since version 1.2. The num_tokens configuration controls the number of vnodes per node; the long-standing default of 256 was lowered to 16 in Cassandra 4.0 to reduce repair and streaming overhead.
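
The load-smoothing effect of vnodes shows up clearly in a toy simulation (illustrative only; the hash, node counts, and key sample are assumptions):

```python
import bisect
import hashlib
from collections import Counter

def ring_pos(s):
    return int(hashlib.md5(s.encode()).hexdigest(), 16) % (2**64)

def build_ring(nodes, vnodes_per_node):
    # Each physical node claims several ring positions, one per virtual node.
    points = sorted((ring_pos(f"{n}#vnode{v}"), n)
                    for n in nodes for v in range(vnodes_per_node))
    return [p for p, _ in points], [n for _, n in points]

def owner(positions, names, key):
    i = bisect.bisect_right(positions, ring_pos(key))
    return names[i % len(names)]

nodes = [f"node{i}" for i in range(10)]
keys = [f"key{i}" for i in range(20_000)]
ratios = {}
for vnodes in (1, 128):
    positions, names = build_ring(nodes, vnodes)
    load = Counter(owner(positions, names, k) for k in keys)
    # Hottest node's load relative to a perfectly even share.
    ratios[vnodes] = max(load.values()) / (len(keys) / len(nodes))
    print(f"{vnodes:4d} vnode(s) per node: hottest node at {ratios[vnodes]:.2f}x fair share")
```

With a single position per node, the hottest node typically carries a multiple of its fair share; with 128 vnodes the ratio drops close to 1.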

Cassandra’s Assignment Model #

Cassandra uses consistent hashing for data assignment, with an additional layer for replication and a coordinator model for request routing.

Partitioner and Token Assignment #

Each key is hashed by the partitioner to produce a token in the hash ring space:

  • Murmur3Partitioner (default since Cassandra 1.2): uses MurmurHash3 on the partition key. Token range: -2^63 to 2^63 − 1.
  • RandomPartitioner (legacy): uses MD5. Not recommended for new clusters.
  • ByteOrderedPartitioner: orders keys by raw bytes. Allows efficient range scans but creates sequential hotspots (all writes to recent timestamps hit the same token range).
cqlsh> SELECT token(id), id FROM orders LIMIT 5;
 system.token(id) | id
------------------+--------------------------------------
  -9188...        | 3c7a...
  -8923...        | 1d2b...

Replication #

Cassandra does not store each key on a single node. The replication factor (RF) determines how many nodes store each key’s replica. For RF=3, each key is stored on the primary node (the first clockwise from the token) and on the next 2 nodes clockwise.

The replication strategy controls how replicas are placed across datacenters:

  • SimpleStrategy: places replicas on the next N clockwise nodes, ignoring rack and datacenter topology.
  • NetworkTopologyStrategy: distributes replicas across datacenters according to a per-datacenter replica count, and within each datacenter places replicas on distinct racks where possible.
NetworkTopologyStrategy: {'dc1': 3, 'dc2': 2}

This places 3 replicas in dc1 (on distinct racks where possible) and 2 in dc2. A write at LOCAL_QUORUM in dc1 must be acknowledged by 2 of the 3 dc1 replicas.
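
Quorum sizes follow from majority arithmetic; a one-line sketch:

```python
def quorum(replicas: int) -> int:
    # Smallest integer strictly greater than half the replicas.
    return replicas // 2 + 1

# QUORUM over all 5 replicas of {'dc1': 3, 'dc2': 2}:
print(quorum(5))  # 3
# LOCAL_QUORUM within dc1 (3 replicas):
print(quorum(3))  # 2
```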

The Coordinator Pattern #

A Cassandra client connects to any node in the cluster. That node becomes the coordinator for the request — it has no special role in the cluster (not a leader, not elected) but it takes responsibility for:

  1. Computing the partition key token to determine which nodes own replicas.
  2. Sending the write or read to all relevant replica nodes.
  3. Collecting responses and returning the result when the required quorum is met.
  4. Handling hinted handoff: if a replica is unavailable, the coordinator stores the write locally as a hint and replays it to the replica when it recovers.
Client → Coordinator (any node)
         Coordinator → Replica 1 (primary)
         Coordinator → Replica 2
         Coordinator → Replica 3
         Wait for QUORUM (2 of 3) ACKs
         Return to client

The coordinator pattern distributes request handling across all nodes — there is no central bottleneck for reads or writes. Client libraries (DataStax Java driver, gocql) implement token-aware routing: they compute the token themselves and send requests directly to a replica node, which then acts as coordinator. This avoids one extra network hop.

Rebalancing Under Membership Change #

When a node joins:

  1. It is assigned tokens (vnodes) via configuration or auto-assignment.
  2. For each token range it takes ownership of, it streams the relevant data from the current owner.
  3. Once streaming completes, the node is marked NORMAL and begins accepting traffic.
  4. The previous owner can optionally run nodetool cleanup to remove data it no longer owns.

When a node leaves (decommissions):

  1. The node streams its data to the nodes that will take ownership of its token ranges.
  2. Once streaming completes, it removes itself from the ring.

When a node fails (crashes without decommissioning):

  1. Cassandra continues serving requests using remaining replicas (if quorum is still achievable).
  2. Hinted handoff temporarily stores writes for the failed node.
  3. If the failure is long-lived, an operator runs nodetool removenode to remove the failed node and trigger data redistribution.
  4. Repair (nodetool repair) uses Merkle tree anti-entropy to detect and fix diverged replicas after recovery.

The critical operational requirement: never remove a node from the ring before its data has been streamed to remaining nodes. If a node is removed before streaming completes, some keys may have fewer than RF replicas, and reads at QUORUM may fail.

Kafka Partition Assignment #

Kafka’s Assignment mechanism is different from Cassandra’s consistent hashing. Kafka uses explicit partition assignment managed by the group coordinator.

Partition Assignment in Consumer Groups #

Each Kafka topic is divided into a fixed number of partitions at creation time. A consumer group consumes the topic by distributing partitions across group members.

The assignment protocol:

  1. When a consumer joins or leaves, the group coordinator (a Kafka broker) triggers a rebalance.
  2. The coordinator elects a group leader — typically the first consumer to join.
  3. The group leader runs the assignor (a pluggable assignment strategy) on the full member list and partition list, and sends the result back to the coordinator.
  4. The coordinator distributes assignments to all group members.

Assignment strategies:

  • RangeAssignor (default): assigns partitions in range order to consumers in alphabetical order. Consumer 0 gets partitions 0, 1; consumer 1 gets 2, 3. Can create uneven load if topics have different numbers of partitions.
  • RoundRobinAssignor: distributes partitions round-robin across consumers. More even distribution when topics have different partition counts.
  • StickyAssignor: minimizes partition movement on rebalance. Preserves existing assignments as much as possible, moving only the partitions that must move to achieve balance.
  • CooperativeStickyAssignor: uses incremental rebalance (COOPERATIVE protocol). Instead of stopping all consumption during rebalance, only revokes the specific partitions that are moving. Other partitions continue being consumed.
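
To make the strategies concrete, here is a toy RangeAssignor-style function for a single topic (a simplified sketch, not Kafka's actual implementation; consumer names are made up):

```python
def range_assign(partitions: list[int], consumers: list[str]) -> dict[str, list[int]]:
    # Sort consumers for a deterministic assignment, as RangeAssignor does.
    consumers = sorted(consumers)
    per, extra = divmod(len(partitions), len(consumers))
    out, start = {}, 0
    for i, c in enumerate(consumers):
        count = per + (1 if i < extra else 0)  # first `extra` consumers get one more
        out[c] = partitions[start:start + count]
        start += count
    return out

print(range_assign([0, 1, 2, 3], ["c1", "c0"]))
# {'c0': [0, 1], 'c1': [2, 3]}
```

The uneven-load problem mentioned above appears when partition counts don't divide evenly: with 3 partitions and 2 consumers, the first consumer in sort order always takes the extra partition, and that bias repeats per topic.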

The Stop-the-World Rebalance Problem #

The original Kafka rebalance protocol (EAGER) is stop-the-world:

  1. All consumers in the group revoke all their assigned partitions.
  2. All consumers rejoin.
  3. Assignment is recalculated.
  4. Partitions are redistributed.

During the rebalance window (which can be seconds to tens of seconds for large groups), no messages are consumed. For high-throughput topics, this creates a processing backlog.

Incremental cooperative rebalance (COOPERATIVE protocol, KIP-429) solves this:

  1. Only the partitions being reassigned are revoked.
  2. Consumers continue processing their remaining partitions during rebalance.
  3. New assignments are given out after revocations complete.

No global stop-the-world pause. Consumers processing unaffected partitions never pause.

Static Membership #

For consumers that restart frequently (e.g., stream-processing jobs redeployed with every release), an EAGER rebalance on every restart causes unnecessary partition shuffles.

Static membership (KIP-345) allows each consumer to declare a group.instance.id. Kafka tracks this ID and does not trigger rebalance when a static member temporarily disconnects. If the consumer rejoins with the same group.instance.id before the session.timeout.ms expires, it gets back its previous assignments.
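
In configuration terms, static membership looks like this (shown as a librdkafka/confluent-kafka-style properties dict; the broker address, group name, and instance ID are placeholder values):

```python
# Consumer configuration enabling static membership (KIP-345).
consumer_config = {
    "bootstrap.servers": "broker1:9092",  # placeholder address
    "group.id": "order-processors",       # hypothetical group name
    "group.instance.id": "worker-1",      # static member ID, stable across restarts
    "session.timeout.ms": 30000,          # rejoin within this window to keep assignments
}
```

Each instance of the job needs its own stable group.instance.id (e.g., derived from the pod or host name); reusing one ID across two live consumers fences one of them out.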

Range-Based Assignment: etcd and CockroachDB #

For systems that need range-based routing (where keys can be read with range scans), consistent hashing is the wrong mechanism. Consistent hashing distributes keys uniformly, but a range scan on a consistently-hashed keyspace requires touching all nodes.

Range-based assignment divides the keyspace into contiguous ranges and assigns each range to a node or a group of replicas:

Key range         → Assigned replica group
["", "g")         → {node1, node2, node3}
["g", "m")        → {node4, node5, node6}
["m", "s")        → {node7, node8, node9}
["s", ∞)          → {node1, node2, node3}

A range scan from “apple” to “cherry” touches only the first range group — no scatter-gather across all nodes.
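
Range routing is a binary search over the split points. A minimal sketch mirroring the table above (the node names are illustrative):

```python
import bisect

# Split points dividing the keyspace into four contiguous ranges.
split_points = ["g", "m", "s"]
replica_groups = [
    {"node1", "node2", "node3"},  # ["", "g")
    {"node4", "node5", "node6"},  # ["g", "m")
    {"node7", "node8", "node9"},  # ["m", "s")
    {"node1", "node2", "node3"},  # ["s", end of keyspace)
]

def replicas_for(key: str) -> set:
    # bisect_right finds how many split points the key is >= to,
    # which is exactly the index of its range.
    return replica_groups[bisect.bisect_right(split_points, key)]

# A scan from "apple" to "cherry" stays inside one replica group:
assert replicas_for("apple") == replicas_for("cherry")
```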

CockroachDB ranges: each range is ~512 MB by default. When a range exceeds the size threshold, it splits. Each range is replicated via Raft. The leaseholder replica (usually the same replica as the Raft leader for that range) serves all reads and writes for that range. A table with many ranges across many nodes achieves parallelism by having leaseholders distributed across the cluster.

etcd’s key range: etcd uses a flat key-value store with a single Raft log. For large-scale use cases (Kubernetes clusters with millions of keys), etcd can be sharded into multiple etcd clusters, with a proxy routing requests by key prefix. But this is rare: a single etcd cluster handles thousands of requests per second.

Hotspot Mitigation #

Consistent hashing provides uniform load in expectation, but skewed workloads (popular keys, time-series with sequential tokens) create hotspots.

Sequential key hotspots: time-series workloads often use a time bucket as the partition key, so every write in the current window targets the same partition and therefore the same replica set (with an order-preserving partitioner, sequential timestamps additionally land on one contiguous token range). Mitigation: add a random prefix to the key (shard_id = random(0, N); key = f"{shard_id}:{timestamp}"), then scatter reads across all shards.
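
The write/read split of the random-prefix mitigation, as a small sketch (the shard count and key format are assumptions, and the timestamp is a made-up example):

```python
import random

N_SHARDS = 8  # assumed shard count; size it to the write hotspot

def write_key(timestamp: str) -> str:
    # Writes pick a random shard, spreading the hot time window
    # across N_SHARDS distinct partition keys.
    return f"{random.randrange(N_SHARDS)}:{timestamp}"

def read_keys(timestamp: str) -> list[str]:
    # Reads must scatter-gather across every shard of the same window.
    return [f"{s}:{timestamp}" for s in range(N_SHARDS)]

k = write_key("2024-06-01T12:00")
assert k in read_keys("2024-06-01T12:00")
```

The trade is explicit: writes fan out over N shards, while every read fans in from N partitions, so N should stay small enough that the read amplification is acceptable.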

Popular key hotspots: a single key receives a disproportionate fraction of requests. Mitigation: cache at the application layer; replicate the key to multiple nodes with a client-side random routing policy.

Imbalanced vnodes: if a physical node has fewer vnodes than its neighbors (due to misconfiguration or migration), it owns a disproportionately small ring segment. Mitigation: configure num_tokens uniformly across the cluster; fixing a misconfigured node requires decommissioning it and re-bootstrapping with the correct token count, since tokens cannot be changed on a live node.

Compaction-triggered hotspots: Cassandra’s compaction (merging SSTables) is CPU and I/O intensive. If compaction lags, read latency increases on affected nodes. Mitigation: monitor compaction queue depth (nodetool compactionstats); use STCS vs LCS vs TWCS compaction strategy based on workload pattern (TWCS for time-series that expire).

Assignment Properties Summary #

| System | Assignment mechanism | Disruption on membership change | Range scans |
|---|---|---|---|
| Cassandra | Consistent hashing + vnodes | O(1/N) keys move | No — use table design to avoid |
| Kafka consumer group | Explicit assignor (range/round-robin/sticky) | Rebalance pause (EAGER) or minimal (COOPERATIVE) | N/A — partitions, not keys |
| Redis Cluster | 16384 hash slots, CRC16 routing | Slot migration | No — hash slots break locality |
| CockroachDB | Range-based, Raft-replicated per range | Only affected range moves | Yes — primary use case |
| DynamoDB | Consistent hashing + internal sharding | Transparent (managed service) | Secondary index required |