

Aggregation — Scatter-Gather and Distributed Query Execution #

Infrastructure-level Aggregation is the mechanism by which a distributed system combines partial results from multiple nodes into a single authoritative answer. It is the coordination behind every distributed query: the read path of a distributed database, the search execution across shards, the windowed computation in a stream processor.

The defining characteristic of infrastructure Aggregation: the partial results must be combinable by a coordinator that does not hold all the data. The coordinator never sees the full dataset — only the summaries or answers that each node produces locally. The merge function must be correct on those summaries.

This is the same mathematical requirement as application-level Aggregation (Chapter 11): commutativity, associativity, and for some structures, idempotency. At the infrastructure level, the stakes are higher — a wrong merge function produces silently incorrect query results, not just an approximate count.

OpenSearch Scatter-Gather #

Every OpenSearch (and Elasticsearch) query executes as a two-phase scatter-gather across shards.

Query Phase #

The coordinating node sends the query to all shards (or the relevant shards, if routing is applied). Each shard executes the query locally against its Lucene index and returns:

  • The top-N document IDs and scores that match the query.
  • Aggregation partial results (partial bucket counts, partial metric values).

The coordinating node does not yet have the documents — only the IDs and scores.

Coordinating node receives: GET /orders/_search
  { "query": { "match": { "status": "pending" } }, "size": 10 }

Scatter to shard 0: match locally, return top-10 (doc_id, score)
Scatter to shard 1: match locally, return top-10 (doc_id, score)
Scatter to shard 2: match locally, return top-10 (doc_id, score)

Coordinating node merges: take global top-10 from 30 candidates

The merge function for top-N results: take the globally highest-scored documents across all shard results. This is a merge sort on score — O(N × shards) comparisons to find the global top-N.
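The query-phase merge can be sketched in a few lines of illustrative Python (not OpenSearch's actual implementation):

```python
import heapq

def merge_top_n(shard_results, n):
    """Merge per-shard top-N (doc_id, score) lists into the global top-N.

    heapq.nlargest scans all N x shards candidates and keeps the n
    highest-scored, regardless of each shard's local ordering.
    """
    all_candidates = (hit for shard in shard_results for hit in shard)
    return heapq.nlargest(n, all_candidates, key=lambda hit: hit[1])

# Three shards, each returning its local top-3 as (doc_id, score)
shard_0 = [("doc_12", 9.1), ("doc_45", 7.2), ("doc_99", 1.0)]
shard_1 = [("doc_3", 8.5), ("doc_61", 2.2), ("doc_8", 1.9)]
shard_2 = [("doc_7", 9.8), ("doc_22", 6.0), ("doc_31", 5.5)]

top3 = merge_top_n([shard_0, shard_1, shard_2], 3)
# Global top-3: doc_7 (9.8), doc_12 (9.1), doc_3 (8.5)
```

Note that the coordinator only handles `(doc_id, score)` pairs here; the full documents are retrieved later, in the fetch phase.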

Fetch Phase #

Once the coordinating node knows the global top-N document IDs, it sends targeted fetch requests to the shards that own those documents to retrieve the full _source content.

After merge: global top-10 = {shard_0: [doc_12, doc_45], shard_1: [doc_3], shard_2: [doc_7, doc_22, ...]}

Fetch phase:
  shard_0: GET doc_12, doc_45 → full source
  shard_1: GET doc_3 → full source
  shard_2: GET doc_7, doc_22, ... → full source

Coordinating node assembles final response.

The two-phase design avoids transmitting full document source in the query phase — only IDs and scores travel the network for the initial scatter-gather, reducing bandwidth for queries that match many documents but return few.

Aggregation Merge #

For aggregations (Chapter 6 of the search-index series), each shard computes partial aggregation results. The coordinating node merges them.

Terms aggregation merge problem: each shard returns the top-K terms by document count for its local data. The globally top-K terms may include terms that are not in any shard’s top-K locally. A term with 500 documents spread evenly across 10 shards (50 per shard) may not appear in any shard’s top-10, but is globally the 3rd most common term.

OpenSearch addresses this with shard_size: each shard returns more than K candidates (default: shard_size = size × 1.5 + 10). The coordinating node merges the larger candidate sets. Higher shard_size = more accurate results, more network and memory overhead.

{
  "aggs": {
    "top_categories": {
      "terms": {
        "field": "category",
        "size": 10,
        "shard_size": 50
      }
    }
  }
}
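The accuracy problem and the `shard_size` fix can be demonstrated with a toy merge (illustrative Python, not OpenSearch's implementation):

```python
from collections import Counter

def merge_terms(shard_buckets, size):
    """Merge per-shard term buckets by summing counts, then take top-`size`."""
    merged = Counter()
    for buckets in shard_buckets:
        merged.update(buckets)
    return merged.most_common(size)

# Two shards; "blue" is never a local #1 but is the global #1.
shard_a = Counter({"red": 10, "blue": 8, "green": 2})
shard_b = Counter({"green": 9, "blue": 8, "red": 1})

# shard_size=1: each shard sends only its local top-1 -- "blue" is lost.
naive = merge_terms([dict(shard_a.most_common(1)),
                     dict(shard_b.most_common(1))], 1)

# shard_size=3: each shard sends three candidates -- "blue" (16) wins.
accurate = merge_terms([dict(shard_a.most_common(3)),
                        dict(shard_b.most_common(3))], 1)
```

With `shard_size=1` the merge reports `red` (10) as the top term; with `shard_size=3` it correctly reports `blue` (16).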

Cardinality aggregation (HyperLogLog): each shard computes an HLL sketch of the distinct values. The coordinating node merges sketches by taking the register-wise maximum. The merged sketch estimates the global cardinality. This is mergeable by design — the same property exploited in application-level cardinality estimation (Chapter 11).
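The HLL merge is the simplest of the three (a sketch with toy register arrays, not a full HLL implementation):

```python
def merge_hll(sketch_a, sketch_b):
    """Merge two HyperLogLog sketches (lists of register values) by taking
    the register-wise maximum. The result is identical to the sketch that
    would have been computed over the union of both inputs."""
    assert len(sketch_a) == len(sketch_b)
    return [max(a, b) for a, b in zip(sketch_a, sketch_b)]

# Toy 8-register sketches from two shards
shard_a = [3, 0, 5, 1, 0, 2, 4, 0]
shard_b = [1, 2, 5, 0, 6, 2, 1, 3]

merged = merge_hll(shard_a, shard_b)

# The merge is commutative and idempotent:
assert merged == merge_hll(shard_b, shard_a)
assert merge_hll(merged, merged) == merged
```

Idempotency is what makes HLL safe even if a shard's response is accidentally merged twice.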

Percentile aggregation (TDigest): each shard computes a TDigest of the value distribution. The coordinating node merges TDigests by concatenating their centroids (then re-clustering). The merged TDigest estimates global percentiles.
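A crude sketch of the TDigest-style merge — centroids as (mean, weight) pairs, with the re-clustering step that a real TDigest performs omitted for clarity:

```python
def merge_digests(*digests):
    """Concatenate centroid lists from each shard, sorted by mean.
    (A real TDigest would also re-cluster adjacent centroids to bound
    the digest's size; skipped here.)"""
    return sorted((c for d in digests for c in d), key=lambda c: c[0])

def estimate_percentile(centroids, q):
    """Estimate the q-th percentile by walking cumulative centroid weight."""
    total = sum(weight for _, weight in centroids)
    target = q / 100 * total
    seen = 0.0
    for mean, weight in centroids:
        seen += weight
        if seen >= target:
            return mean
    return centroids[-1][0]

# Per-shard centroids: (mean, weight)
shard_a = [(10.0, 50), (20.0, 30), (95.0, 20)]
shard_b = [(12.0, 40), (50.0, 40), (99.0, 20)]

merged = merge_digests(shard_a, shard_b)
p50 = estimate_percentile(merged, 50)  # median estimate over both shards
```

The key point is that centroids carry weights, so the merged structure retains enough information to estimate global percentiles — unlike merging per-shard percentile values directly, which is not correct.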

Cassandra Quorum Read: Coordinator-Driven Aggregation #

A Cassandra read at QUORUM involves the coordinator aggregating responses from multiple replicas.

Read Execution #

  1. The coordinator determines which nodes own replicas for the partition key.
  2. The coordinator sends a data request to one replica and digest requests to the others, contacting enough replicas to satisfy the consistency level (for QUORUM at RF=3: 2 nodes).
  3. The coordinator waits for R responses.
  4. The coordinator merges responses: for each column, keep the value with the highest timestamp (LWW merge).
  5. If the responses differ (replicas are out of sync), the coordinator initiates read repair — sending the corrected version to the lagging replica in the background.

Coordinator receives: SELECT * FROM orders WHERE order_id = 'abc'

Send to replica 1: order_id=abc → {status: 'shipped', ts: 1000}
Send to replica 2: order_id=abc → {status: 'processing', ts: 900}

Merge: status = 'shipped' (higher timestamp wins)

Read repair: send {status: 'shipped', ts: 1000} to replica 2 (background)

The merge function is column-level LWW. Cassandra does not return a “conflict” to the client — it silently resolves it using the merge function and corrects the lagging replica.
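Column-level LWW is a small merge function (an illustrative sketch, not Cassandra's implementation — real Cassandra timestamps are microsecond write times and ties break by value):

```python
def merge_row(responses):
    """Column-level last-write-wins: for each column, keep the
    (value, timestamp) pair with the highest timestamp."""
    merged = {}
    for row in responses:
        for column, (value, ts) in row.items():
            if column not in merged or ts > merged[column][1]:
                merged[column] = (value, ts)
    return merged

replica_1 = {"status": ("shipped", 1000), "total": (42, 500)}
replica_2 = {"status": ("processing", 900), "total": (42, 500)}

resolved = merge_row([replica_1, replica_2])
# status resolves to 'shipped' (ts 1000 > 900); total is already in sync
```

The same `resolved` row is what read repair then pushes to the lagging replica.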

Speculative Retry #

For latency-sensitive reads, Cassandra can issue a speculative retry: if the first replica does not respond within a threshold, the coordinator sends the read to a second replica without waiting for the first to fail. Whichever responds first is used; the slower response is discarded.

Per-table setting (speculative_retry is a table option set via CQL, not a cassandra.yaml setting):
  ALTER TABLE orders WITH speculative_retry = '99percentile';  -- Retry if response time > p99

This is Aggregation in service of latency: the coordinator aggregates two parallel requests, taking the first response. It does not need to wait for all replicas — the first valid response is sufficient.
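The first-response-wins pattern can be sketched with threads (a toy model with simulated replica latencies, not Cassandra's internals):

```python
import concurrent.futures
import time

def read_replica(replica_id, delay):
    """Stand-in for a replica read with a given response latency."""
    time.sleep(delay)
    return f"response-from-{replica_id}"

def speculative_read(primary_delay, secondary_delay, threshold):
    """Send to one replica; if it misses the latency threshold, also send
    to a second replica and take whichever responds first."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(read_replica, 1, primary_delay)]
        done, _ = concurrent.futures.wait(futures, timeout=threshold)
        if not done:  # primary is slow -- speculate on a second replica
            futures.append(pool.submit(read_replica, 2, secondary_delay))
        done, _ = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        return next(iter(done)).result()

# Primary would take 0.5s; threshold is 0.05s, so replica 2 (0.01s) wins.
result = speculative_read(0.5, 0.01, 0.05)
```

The slow primary's response is simply discarded when it eventually arrives — correctness is unaffected because any single replica's answer is acceptable at this consistency level.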

CockroachDB DistSQL: Distributed Query Fragments #

CockroachDB executes SQL queries using DistSQL — a distributed query execution engine that pushes computation close to the data.

Query Planning and Distribution #

For a query like SELECT region, COUNT(*) FROM orders GROUP BY region:

  1. The gateway node (the node the client connected to) receives the query.
  2. The query planner identifies which ranges hold the relevant data and which nodes are the leaseholders for those ranges.
  3. The planner creates a distributed query plan: a set of query fragments, each running on the node that holds the data.
  4. Each fragment executes its local computation (filter, partial aggregate) and streams results to the gateway.
  5. The gateway performs the final aggregation (merging partial counts by region) and returns the result.

Gateway node:
  Range [a–m] leaseholder: node1 → fragment: SELECT region, COUNT(*) WHERE pk IN [a–m]
  Range [m–z] leaseholder: node2 → fragment: SELECT region, COUNT(*) WHERE pk IN [m–z]

  node1 returns: {region: 'US', count: 4500}, {region: 'EU', count: 2100}
  node2 returns: {region: 'US', count: 3200}, {region: 'EU', count: 1800}, {region: 'APAC', count: 900}

Gateway merges:
  US: 4500 + 3200 = 7700
  EU: 2100 + 1800 = 3900
  APAC: 900
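The gateway's merge step above is a sum of partial counts per group key (illustrative Python, not DistSQL itself):

```python
from collections import Counter

def merge_partial_counts(fragments):
    """Sum per-region partial counts streamed from each query fragment."""
    merged = Counter()
    for partial in fragments:
        merged.update(partial)
    return dict(merged)

# Partial results from the two leaseholder nodes in the example above
node1 = {"US": 4500, "EU": 2100}
node2 = {"US": 3200, "EU": 1800, "APAC": 900}

totals = merge_partial_counts([node1, node2])
# {"US": 7700, "EU": 3900, "APAC": 900}
```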

Aggregation correctness: the partial aggregate (COUNT per region per fragment) is mergeable by sum. Not all SQL aggregates are mergeable this way:

  • SUM, COUNT, MAX, MIN: mergeable (distributed by default)
  • AVG: not directly mergeable (need SUM and COUNT separately, merge, then divide)
  • COUNT DISTINCT: not exactly mergeable (use HyperLogLog approximation or exact tracking)
  • MEDIAN, PERCENTILE: not exactly mergeable (TDigest approximation or full sort)

CockroachDB and other distributed SQL engines handle these by either serializing the non-mergeable aggregation to a single node or using approximate algorithms.
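The AVG case is worth making concrete: shipping (sum, count) pairs merges correctly, while averaging per-node averages does not (an illustrative sketch, not any engine's implementation):

```python
def local_avg_state(values):
    """Each node ships (sum, count), not its local average."""
    return (sum(values), len(values))

def merge_avg_states(states):
    """Merge (sum, count) pairs, then divide once at the end."""
    total, count = 0, 0
    for s, c in states:
        total += s
        count += c
    return total / count

node1 = [10, 20, 30]   # local avg 20, state (60, 3)
node2 = [100]          # local avg 100, state (100, 1)

correct = merge_avg_states([local_avg_state(node1), local_avg_state(node2)])
naive = (20 + 100) / 2  # averaging the averages

# correct is 40.0 (160 / 4); naive is 60.0 -- wrong, because it
# weights the single-value node equally with the three-value node.
```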

Kafka Streams: Windowed Aggregation #

Kafka Streams performs stateful aggregation over event streams. The aggregation is distributed across stream tasks, with each task owning a set of partitions.

Stateful Operators #

A count() or aggregate() operation in Kafka Streams is backed by a state store — a local RocksDB instance per task:

KStream<String, Order> orders = builder.stream("orders");

KTable<Windowed<String>, Long> orderCountByRegion = orders
    .groupBy((key, order) -> order.getRegion())
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("order-count-by-region"));

Each task processes a subset of partitions. The count for region “US” in a 5-minute window is stored in the state store of the task that owns the partition for “US” (determined by the groupBy key’s partition assignment).

There is no cross-task aggregation within Kafka Streams for a single output record. The partitioning by key ensures all events for “US” go to the same task — that task’s local state IS the global state for “US.” No merge needed.

When aggregation requires a repartition: if the groupBy key differs from the stream’s existing partition key, Kafka Streams performs a repartition — writing intermediate results to a repartitioned internal topic, then re-reading with the new key. This is expensive (two extra topic writes) but necessary to ensure all records for a key land on the same task.

Changelog and Fault Tolerance #

State stores are backed by a changelog topic in Kafka. Every state update is written to the changelog. On task failure:

  1. A new task is assigned the failed task’s partitions.
  2. The new task replays the changelog to reconstruct the state store.
  3. Only the delta since the last checkpoint needs replaying (Kafka Streams checkpoints state store offsets).

This is Suppression at the infrastructure level: the changelog ensures state is not lost and is reconstructed exactly, without double-counting on recovery.
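Why replay does not double-count: changelog records are absolute puts keyed by state-store key, not increments, so reapplying them is idempotent. A toy model of checkpointed recovery (illustrative Python, not Kafka Streams internals):

```python
def apply(state, record):
    """Apply one changelog record (key, value) -- last write wins per key."""
    key, value = record
    state[key] = value
    return state

changelog = []  # stand-in for the Kafka changelog topic
state = {}

# Normal processing: every state update is also appended to the changelog.
for update in [("US", 1), ("EU", 1), ("US", 2), ("US", 3)]:
    apply(state, update)
    changelog.append(update)

checkpoint_offset = 2  # updates at offsets 0-1 are durable in the local store

# Recovery: a new task restores the checkpointed store, then replays only
# the delta since the checkpoint.
recovered = {"US": 1, "EU": 1}  # state as of the checkpoint
for record in changelog[checkpoint_offset:]:
    apply(recovered, record)

# recovered now matches the pre-failure state exactly
```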

The Infrastructure Aggregation Contract #

Every infrastructure Aggregation mechanism requires the merge function to satisfy:

Commutativity: merge(shard_A, shard_B) = merge(shard_B, shard_A). The order in which shard results arrive does not matter.

Associativity: merge(merge(A, B), C) = merge(A, merge(B, C)). The coordinator can merge in any grouping — useful for hierarchical aggregation (intermediate aggregators).

Correctness on partial results: the partial result from each node must contain enough information for the merge to produce a globally correct answer. This is where aggregations like AVG fail — the per-shard average is not sufficient to compute the global average without knowing the per-shard count.
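Both algebraic properties can be spot-checked mechanically before trusting a merge function in production. A minimal property check over concrete partial results (illustrative, not exhaustive — a real test would also generate random inputs):

```python
import itertools

def merge(a, b):
    """Candidate merge function: pointwise sum of partial counts."""
    return {k: a.get(k, 0) + b.get(k, 0) for k in set(a) | set(b)}

def check_contract(merge_fn, partials):
    """Spot-check commutativity and associativity on concrete partials."""
    for a, b in itertools.permutations(partials, 2):
        assert merge_fn(a, b) == merge_fn(b, a), "not commutative"
    for a, b, c in itertools.permutations(partials, 3):
        assert merge_fn(merge_fn(a, b), c) == merge_fn(a, merge_fn(b, c)), \
            "not associative"
    return True

shards = [{"US": 4500, "EU": 2100}, {"US": 3200, "APAC": 900}, {"EU": 7}]
ok = check_contract(merge, shards)
```

A merge like "average of averages" fails this check's correctness requirement even though it is commutative — which is why the contract also demands correctness on partial results, not just the two algebraic laws.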

| Aggregate | Mergeable? | Merge function |
| --- | --- | --- |
| COUNT | Yes | Sum of partial counts |
| SUM | Yes | Sum of partial sums |
| MAX / MIN | Yes | Max / min of partial max/min |
| AVG | Not directly | Merge (sum, count) pairs, then divide |
| COUNT DISTINCT | Approximately | HyperLogLog register-wise max |
| PERCENTILE | Approximately | TDigest centroid merge |
| TOP-K | Approximately | Merge top-K candidates with shard_size inflation |