Aggregation — Scatter-Gather and Distributed Query Execution #
Infrastructure-level Aggregation is the mechanism by which a distributed system combines partial results from multiple nodes into a single authoritative answer. It is the coordination behind every distributed query: the read path of a distributed database, the search execution across shards, the windowed computation in a stream processor.
The defining characteristic of infrastructure Aggregation: the partial results must be combinable by a coordinator that does not hold all the data. The coordinator never sees the full dataset — only the summaries or answers that each node produces locally. The merge function must be correct on those summaries.
This is the same mathematical requirement as application-level Aggregation (Chapter 11): commutativity, associativity, and for some structures, idempotency. At the infrastructure level, the stakes are higher — a wrong merge function produces silently incorrect query results, not just an approximate count.
OpenSearch Scatter-Gather #
Every OpenSearch (and Elasticsearch) query executes as a two-phase scatter-gather across shards.
Query Phase #
The coordinating node sends the query to all shards (or the relevant shards, if routing is applied). Each shard executes the query locally against its Lucene index and returns:
- The top-N document IDs and scores that match the query.
- Aggregation partial results (partial bucket counts, partial metric values).
The coordinating node does not yet have the documents — only the IDs and scores.
Coordinating node receives: GET /orders/_search
{ "query": { "match": { "status": "pending" } }, "size": 10 }
Scatter to shard 0: match locally, return top-10 (doc_id, score)
Scatter to shard 1: match locally, return top-10 (doc_id, score)
Scatter to shard 2: match locally, return top-10 (doc_id, score)
Coordinating node merges: take global top-10 from 30 candidates
The merge function for top-N results: take the globally highest-scored documents across all shard results. This is a merge sort on score — O(N × shards) comparisons to find the global top-N.
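The query-phase merge can be sketched as a k-way merge over the per-shard hit lists. This is an illustrative Python sketch (not OpenSearch's implementation), assuming each shard returns its hits already sorted by descending score:

```python
import heapq

def merge_top_n(shard_results, n):
    """Merge per-shard top-N hit lists into a global top-N.

    Each shard result is a list of (score, doc_id) tuples sorted by
    descending score, so the coordinator's job is a k-way merge sort.
    """
    # heapq.merge walks the k sorted inputs lazily; reverse=True
    # matches the descending order of the inputs.
    merged = heapq.merge(*shard_results, reverse=True)
    return [hit for _, hit in zip(range(n), merged)]

shard_0 = [(9.1, "doc_12"), (7.4, "doc_45")]
shard_1 = [(8.8, "doc_3")]
shard_2 = [(9.5, "doc_7"), (6.2, "doc_22")]
merge_top_n([shard_0, shard_1, shard_2], 3)
# → [(9.5, 'doc_7'), (9.1, 'doc_12'), (8.8, 'doc_3')]
```

Because the inputs are sorted, the coordinator never needs to materialize all N × shards candidates at once; it stops as soon as the global top-N is filled.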
Fetch Phase #
Once the coordinating node knows the global top-N document IDs, it sends targeted fetch requests to the shards that own those documents to retrieve the full _source content.
After merge: global top-10 = {shard_0: [doc_12, doc_45], shard_1: [doc_3], shard_2: [doc_7, doc_22, ...]}
Fetch phase:
shard_0: GET doc_12, doc_45 → full source
shard_1: GET doc_3 → full source
shard_2: GET doc_7, doc_22, ... → full source
Coordinating node assembles final response.
The two-phase design avoids transmitting full document source in the query phase — only IDs and scores travel the network for the initial scatter-gather, reducing bandwidth for queries that match many documents but return few.
Aggregation Merge #
For aggregations (Chapter 6 of the search-index series), each shard computes partial aggregation results. The coordinating node merges them.
Terms aggregation merge problem: each shard returns the top-K terms by document count for its local data. The globally top-K terms may include terms that are not in any shard’s top-K locally. A term with 500 documents spread evenly across 10 shards (50 per shard) may not appear in any shard’s top-10, but is globally the 3rd most common term.
OpenSearch addresses this with shard_size: each shard returns more than K candidates (default: shard_size = size × 1.5 + 10). The coordinating node merges the larger candidate sets. Higher shard_size = more accurate results, more network and memory overhead.
{
  "aggs": {
    "top_categories": {
      "terms": {
        "field": "category",
        "size": 10,
        "shard_size": 50
      }
    }
  }
}
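The coordinator's merge for a terms aggregation is itself simple: sum the partial counts per term, then take the top-`size`. The accuracy problem lives entirely in what the shards truncate before the merge. A minimal Python sketch (the shard data here is invented for illustration):

```python
from collections import Counter

def merge_terms(shard_buckets, size):
    """Merge per-shard term buckets by summing counts, then take the
    global top-`size`. Each input dict is already truncated to that
    shard's local top candidates."""
    total = Counter()
    for buckets in shard_buckets:
        total.update(buckets)
    return total.most_common(size)

# A term truncated away on every shard never reaches the coordinator,
# which is why shard_size > size improves accuracy.
shard_a = {"books": 120, "toys": 90}   # local top-2; "food": 80 was cut
shard_b = {"toys": 110, "games": 95}   # local top-2; "food": 85 was cut
merge_terms([shard_a, shard_b], 2)
# → [('toys', 200), ('books', 120)]  — "food" (165 globally) is invisible
```

With a larger shard_size, "food" would have survived each shard's local truncation and won the global merge; the merge function itself was never the problem.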
Cardinality aggregation (HyperLogLog): each shard computes an HLL sketch of the distinct values. The coordinating node merges sketches via element-wise max. The merged sketch estimates the global cardinality. This is mergeable by design — the same property exploited in application-level cardinality estimation (Chapter 11).
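The sketch merge is just element-wise max over the register arrays. A minimal illustration, treating each HLL sketch as a plain list of register values:

```python
def merge_hll(sketch_a, sketch_b):
    """Merge two HyperLogLog sketches (register arrays of equal size)
    by element-wise max. The result equals the sketch that would have
    been built from the union of the two input streams."""
    assert len(sketch_a) == len(sketch_b)
    return [max(a, b) for a, b in zip(sketch_a, sketch_b)]

# Toy 4-register sketches (real sketches use 2^p registers).
merge_hll([3, 0, 5, 1], [2, 4, 1, 1])
# → [3, 4, 5, 1]
```

Element-wise max is commutative, associative, and idempotent, so the coordinator can merge shard sketches in any order, in any grouping, and even merge a duplicate response without skewing the estimate.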
Percentile aggregation (TDigest): each shard computes a TDigest of the value distribution. The coordinating node merges TDigests by concatenating their centroids (then re-clustering). The merged TDigest estimates global percentiles.
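A greatly simplified sketch of the centroid merge: concatenate both centroid lists, sort by mean, and greedily fuse neighbours. Real t-digest bounds each centroid's weight with a quantile-dependent scale function rather than the flat cap used here:

```python
def merge_tdigests(centroids_a, centroids_b, max_weight=10):
    """Simplified TDigest merge: concatenate (mean, weight) centroids,
    sort by mean, then fuse adjacent centroids while their combined
    weight stays under a cap (a stand-in for the real scale function)."""
    merged = sorted(centroids_a + centroids_b)
    out = [merged[0]]
    for mean, weight in merged[1:]:
        last_mean, last_weight = out[-1]
        if last_weight + weight <= max_weight:
            w = last_weight + weight
            # Weighted average keeps the fused centroid's mean exact.
            out[-1] = ((last_mean * last_weight + mean * weight) / w, w)
        else:
            out.append((mean, weight))
    return out

merged = merge_tdigests([(1.0, 4), (5.0, 6)], [(2.0, 3)])
# Total weight is conserved: the weights still sum to 4 + 6 + 3 = 13.
```

Conserving total weight is what keeps the merged digest's percentile estimates honest: every original observation is still represented, just clustered more coarsely.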
Cassandra Quorum Read: Coordinator-Driven Aggregation #
A Cassandra read at QUORUM involves the coordinator aggregating responses from multiple replicas.
Read Execution #
- The coordinator determines which nodes own replicas for the partition key.
- The coordinator sends the read to the R replicas required by the consistency level (for QUORUM at RF=3: 2 of the 3 replicas).
- The coordinator waits for R responses.
- The coordinator merges responses: for each column, keep the value with the highest timestamp (LWW merge).
- If the responses differ (replicas are out of sync), the coordinator initiates read repair — sending the corrected version to the lagging replica in the background.
Coordinator receives: SELECT * FROM orders WHERE order_id = 'abc'
Send to replica 1: order_id=abc → {status: 'shipped', ts: 1000}
Send to replica 2: order_id=abc → {status: 'processing', ts: 900}
Merge: status = 'shipped' (higher timestamp wins)
Read repair: send {status: 'shipped', ts: 1000} to replica 2 (background)
The merge function is column-level LWW. Cassandra does not return a “conflict” to the client — it silently resolves it using the merge function and corrects the lagging replica.
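The column-level LWW merge can be sketched as follows, representing each replica's row as a dict of column → (value, timestamp):

```python
def lww_merge(*replica_rows):
    """Column-level last-write-wins merge: for each column, keep the
    (value, timestamp) pair with the highest timestamp across replicas."""
    merged = {}
    for row in replica_rows:
        for col, (value, ts) in row.items():
            if col not in merged or ts > merged[col][1]:
                merged[col] = (value, ts)
    return merged

replica_1 = {"status": ("shipped", 1000)}
replica_2 = {"status": ("processing", 900)}
lww_merge(replica_1, replica_2)
# → {'status': ('shipped', 1000)}
```

This merge is commutative and associative, so the coordinator can fold replica responses in whatever order they arrive. (The sketch ignores timestamp ties; Cassandra breaks those deterministically so the merge stays order-independent.)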
Speculative Retry #
For latency-sensitive reads, Cassandra can issue a speculative retry: if the first replica does not respond within a threshold, the coordinator sends the read to a second replica without waiting for the first to fail. Whichever responds first is used; the slower response is discarded.
speculative_retry is a per-table option, set via CQL:
ALTER TABLE orders WITH speculative_retry = '99PERCENTILE';  -- retry if no response by the p99 latency
This is Aggregation in service of latency: the coordinator aggregates two parallel requests, taking the first response. It does not need to wait for all replicas — the first valid response is sufficient.
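The first-response-wins pattern can be sketched with two racing futures. This is an illustrative Python sketch of the coordinator's logic, not Cassandra code; `read_fn` and the threshold are assumptions:

```python
import concurrent.futures
import time

def read_with_speculative_retry(read_fn, replicas, threshold_s=0.05):
    """Send the read to the first replica; if it has not answered within
    the threshold, fire the same read at a second replica and return
    whichever response lands first."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        first = pool.submit(read_fn, replicas[0])
        done, _ = concurrent.futures.wait([first], timeout=threshold_s)
        if done:
            return first.result()  # fast path: no retry needed
        second = pool.submit(read_fn, replicas[1])
        done, _ = concurrent.futures.wait(
            [first, second],
            return_when=concurrent.futures.FIRST_COMPLETED)
        return done.pop().result()

def flaky_read(replica):
    # Hypothetical replica read: the "slow" replica stalls past the threshold.
    if replica == "slow":
        time.sleep(0.2)
        return "stale"
    return "fresh"

read_with_speculative_retry(flaky_read, ["slow", "fast"], threshold_s=0.01)
# → 'fresh'
```

Note the slower response is simply discarded; since a read has no side effects, racing two replicas costs extra load but never correctness.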
CockroachDB DistSQL: Distributed Query Fragments #
CockroachDB executes SQL queries using DistSQL — a distributed query execution engine that pushes computation close to the data.
Query Planning and Distribution #
For a query like SELECT region, COUNT(*) FROM orders GROUP BY region:
- The gateway node (the node the client connected to) receives the query.
- The query planner identifies which ranges hold the relevant data and which nodes are the leaseholders for those ranges.
- The planner creates a distributed query plan: a set of query fragments, each running on the node that holds the data.
- Each fragment executes its local computation (filter, partial aggregate) and streams results to the gateway.
- The gateway performs the final aggregation (merging partial counts by region) and returns the result.
Gateway node:
Range [a–m] leaseholder: node1 → fragment: SELECT region, COUNT(*) WHERE pk IN [a–m]
Range [m–z] leaseholder: node2 → fragment: SELECT region, COUNT(*) WHERE pk IN [m–z]
node1 returns: {region: 'US', count: 4500}, {region: 'EU', count: 2100}
node2 returns: {region: 'US', count: 3200}, {region: 'EU', count: 1800}, {region: 'APAC', count: 900}
Gateway merges:
US: 4500 + 3200 = 7700
EU: 2100 + 1800 = 3900
APAC: 900
Aggregation correctness: the partial aggregate (COUNT per region per fragment) is mergeable by sum. Not all SQL aggregates are mergeable this way:
- SUM, COUNT, MAX, MIN: mergeable (distributed by default)
- AVG: not directly mergeable (need SUM and COUNT separately, merge, then divide)
- COUNT DISTINCT: not exactly mergeable (use HyperLogLog approximation or exact tracking)
- MEDIAN, PERCENTILE: not exactly mergeable (TDigest approximation or full sort)
CockroachDB and other distributed SQL engines handle these by either serializing the non-mergeable aggregation to a single node or using approximate algorithms.
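The AVG case is worth spelling out, since it shows how a non-mergeable aggregate becomes mergeable by changing the partial state. A minimal sketch (illustrative, not DistSQL code):

```python
def partial_avg(values):
    """Per-fragment partial state for AVG: a (sum, count) pair."""
    return (sum(values), len(values))

def merge_avg(partials):
    """Merge the partial (sum, count) pairs, then divide once at the
    end. Merging per-fragment averages directly would weight every
    fragment equally and give the wrong answer."""
    total, count = 0, 0
    for s, c in partials:
        total, count = total + s, count + c
    return total / count

node1 = partial_avg([10, 20, 30])   # (60, 3)
node2 = partial_avg([100])          # (100, 1)
merge_avg([node1, node2])           # → 40.0
# Naive mean of means: (20 + 100) / 2 = 60.0  — wrong.
```

The per-fragment average threw away the count, which the merge needs; carrying (sum, count) instead makes the partial state sufficient for a globally correct answer.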
Kafka Streams: Windowed Aggregation #
Kafka Streams performs stateful aggregation over event streams. The aggregation is distributed across stream tasks, with each task owning a set of partitions.
Stateful Operators #
A count() or aggregate() operation in Kafka Streams is backed by a state store — a local RocksDB instance per task:
KStream<String, Order> orders = builder.stream("orders");
KTable<Windowed<String>, Long> orderCountByRegion = orders
    .groupBy((key, order) -> order.getRegion())
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("order-count-by-region"));
Each task processes a subset of partitions. The count for region “US” in a 5-minute window is stored in the state store of the task that owns the partition for “US” (determined by the groupBy key’s partition assignment).
There is no cross-task aggregation within Kafka Streams for a single output record. The partitioning by key ensures all events for “US” go to the same task — that task’s local state IS the global state for “US.” No merge needed.
When aggregation requires a repartition: if the groupBy key differs from the stream’s existing partition key, Kafka Streams performs a repartition — writing intermediate results to a repartitioned internal topic, then re-reading with the new key. This is expensive (two extra topic writes) but necessary to ensure all records for a key land on the same task.
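The co-location guarantee rests on deterministic key partitioning: every record with the same key maps to the same partition, and therefore the same task. A toy sketch (Kafka's default partitioner uses murmur2 on the serialized key; Python's `hash` is only a stand-in):

```python
def partition_for(key, num_partitions):
    """Map a key deterministically to a partition. Because every "US"
    record lands on the same partition, the single task that owns it
    holds the complete state for "US" — no cross-task merge needed.
    (hash() is a stand-in for Kafka's murmur2 partitioner.)"""
    return hash(key) % num_partitions

p = partition_for("US", 12)
assert all(partition_for("US", 12) == p for _ in range(100))
```

This is the design trade behind the repartition step: changing the grouping key invalidates the old placement, so records must be rewritten through an internal topic to restore the same-key-same-task invariant under the new key.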
Changelog and Fault Tolerance #
State stores are backed by a changelog topic in Kafka. Every state update is written to the changelog. On task failure:
- A new task is assigned the failed task’s partitions.
- The new task replays the changelog to reconstruct the state store.
- Only the delta since the last checkpoint needs replaying (Kafka Streams checkpoints state store offsets).
This is Suppression at the infrastructure level: the changelog ensures state is not lost and is reconstructed exactly, without double-counting on recovery.
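Replay is exact because each changelog record carries the full post-update value for its key, not a delta, so applying it twice is harmless. A minimal sketch of the recovery loop:

```python
def rebuild_state(changelog):
    """Replay a changelog of (key, value) records in order; the last
    write for each key wins. Because each record is a full value, not
    a delta, replaying a record twice cannot double-count."""
    store = {}
    for key, value in changelog:
        store[key] = value
    return store

changelog = [("US", 1), ("EU", 1), ("US", 2), ("US", 3)]
rebuild_state(changelog)
# → {'US': 3, 'EU': 1}
```

This full-value property is also what lets Kafka compact the changelog topic: older records for a key can be dropped without affecting the state the replay reconstructs.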
The Infrastructure Aggregation Contract #
Every infrastructure Aggregation mechanism requires the merge function to satisfy:
Commutativity: merge(shard_A, shard_B) = merge(shard_B, shard_A). The order in which shard results arrive does not matter.
Associativity: merge(merge(A, B), C) = merge(A, merge(B, C)). The coordinator can merge in any grouping — useful for hierarchical aggregation (intermediate aggregators).
Correctness on partial results: the partial result from each node must contain enough information for the merge to produce a globally correct answer. This is where aggregations like AVG fail — the per-shard average is not sufficient to compute the global average without knowing the per-shard count.
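The two algebraic properties are cheap to spot-check with randomized testing, which is a practical way to vet a custom merge function before deploying it. An illustrative sketch:

```python
import random

def check_merge_contract(merge, samples):
    """Spot-check commutativity and associativity of a merge function
    on random triples drawn from `samples`."""
    for _ in range(100):
        a, b, c = (random.choice(samples) for _ in range(3))
        assert merge(a, b) == merge(b, a)                      # commutative
        assert merge(merge(a, b), c) == merge(a, merge(b, c))  # associative

# COUNT's merge (sum of partial counts) passes both checks.
check_merge_contract(lambda x, y: x + y, [1, 5, 9, 12])
# A naive AVG merge, lambda x, y: (x + y) / 2, fails the associativity
# check — exactly the "correctness on partial results" failure above.
```

A passing spot-check is not a proof, but a failing one catches exactly the silent-wrong-answer class of bug this chapter warns about.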
| Aggregate | Mergeable? | Merge function |
|---|---|---|
| COUNT | Yes | Sum of partial counts |
| SUM | Yes | Sum of partial sums |
| MAX / MIN | Yes | Max / min of partial max/min |
| AVG | Not directly | Merge (sum, count) pairs, then divide |
| COUNT DISTINCT | Approximately | HyperLogLog element-wise max |
| PERCENTILE | Approximately | TDigest centroid merge |
| TOP-K | Approximately | Merge top-K candidates with shard_size inflation |