
Aggregations as Distributed Computation #

Aggregations are the analytics layer of a search index. They transform a set of matching documents into computed summaries — counts, sums, histograms, percentiles, unique value counts. The challenge is that this computation runs across shards in a distributed system, and some aggregation semantics that are trivially exact on a single machine become fundamentally approximate when distributed.

Understanding which aggregations are exact and which are approximate — and why — determines whether an analytics feature returns trustworthy numbers.

Two-Phase Distributed Execution #

Like the read path for search, aggregations execute in two phases.

Distributed aggregation execution: partial results per shard merged at coordinator

Phase 1 — per-shard partial computation. The coordinating node broadcasts the aggregation request to all shards. Each shard executes the aggregation against its local documents and returns a partial result: a partial bucket tree with counts and accumulated values for that shard’s data only.

Phase 2 — merge at coordinating node. The coordinator receives all partial results and merges them into the final aggregation. For exact aggregations (sum, min, max, count), the merge is lossless: sum(partial sums) = total sum. For approximate aggregations (cardinality, percentiles, terms), the merge introduces bounded error.

GET /orders/_search
{
  "size": 0,
  "aggs": {
    "total_revenue": { "sum": { "field": "amount" } },
    "by_status": {
      "terms": { "field": "status" },
      "aggs": {
        "avg_amount": { "avg": { "field": "amount" } }
      }
    }
  }
}

size: 0 suppresses the search hits — only the aggregation results are returned. Aggregations can run alongside a query or over the full index.
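
To make the two phases concrete, here is a minimal Python sketch of the shape of the computation (illustrative only, not OpenSearch internals): each shard reports a partial sum and count, and the coordinator combines them.

# Illustrative sketch of two-phase aggregation, not OpenSearch internals.
shards = [
    [120.0, 85.5, 40.0],   # "amount" values held by shard 0
    [300.0, 15.0],         # shard 1
    [99.9, 250.0, 10.0],   # shard 2
]

# Phase 1: each shard computes a partial result over its local documents.
partials = [{"sum": sum(docs), "count": len(docs)} for docs in shards]

# Phase 2: the coordinating node merges the partial results.
# For sum/count the merge is lossless: sum(partial sums) == total sum.
total_sum = sum(p["sum"] for p in partials)
total_count = sum(p["count"] for p in partials)
avg_amount = total_sum / total_count  # exact, because sum and count are exact

print(total_sum, total_count, avg_amount)

The same pattern covers min, max, and value_count; approximate aggregations differ only in what the per-shard partial result is (a sketch or a truncated term list) and how it merges.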

Bucket Aggregations: Partitioning Documents #

Bucket aggregations partition documents into groups. Each bucket holds a set of documents that satisfy the bucket’s criteria.

terms — group by unique values of a field. Returns the top-N values by document count.

{
  "terms": {
    "field": "status",
    "size": 10,
    "order": { "_count": "desc" }
  }
}

date_histogram — group by time intervals. Produces time-series data.

{
  "date_histogram": {
    "field": "created",
    "calendar_interval": "1d",
    "time_zone": "America/Los_Angeles"
  }
}

range — explicit numeric ranges, each a named bucket.

{
  "range": {
    "field": "amount",
    "ranges": [
      { "to": 100 },
      { "from": 100, "to": 500 },
      { "from": 500 }
    ]
  }
}

filters — arbitrary filter clauses as named buckets.

{
  "filters": {
    "filters": {
      "failed":    { "term": { "status": "failed" } },
      "retried":   { "term": { "status": "retried" } }
    }
  }
}

Any bucket aggregation can contain sub-aggregations — metrics or further nested buckets. This is the mechanism that enables nested analytics: “for each status, compute average amount and histogram of amounts.”
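
As a sketch of that nesting from a client's point of view, the request below asks, for each status, for the average amount and a histogram of amounts. The opensearch-py client, host, index name, and field names are assumptions for illustration.

# Sketch: nested bucket + metric aggregations from a client.
# Host, index name, and fields are assumed for illustration.
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

body = {
    "size": 0,
    "aggs": {
        "by_status": {
            "terms": {"field": "status", "size": 10},
            "aggs": {
                "avg_amount": {"avg": {"field": "amount"}},
                "amount_histogram": {
                    "histogram": {"field": "amount", "interval": 100}
                },
            },
        }
    },
}

response = client.search(index="orders", body=body)
for bucket in response["aggregations"]["by_status"]["buckets"]:
    print(bucket["key"], bucket["doc_count"], bucket["avg_amount"]["value"])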

Metric Aggregations: Computing Values #

Metric aggregations compute a single value or a set of values over a bucket’s documents.

Exact metrics — computed by accumulating values; merge is lossless:

| Aggregation | Description |
| --- | --- |
| sum | Sum of field values |
| min, max | Minimum/maximum value |
| value_count | Count of documents with the field |
| avg | Arithmetic mean |
| stats | All of the above in one response |
| extended_stats | Plus variance and standard deviation |

Approximate metrics — see the next two sections:

| Aggregation | Algorithm | Error bound |
| --- | --- | --- |
| cardinality | HyperLogLog++ | ~3% by default |
| percentiles | TDigest | Configurable compression |
| percentile_ranks | TDigest | Configurable compression |

The Cardinality Problem: HyperLogLog++ #

Exact unique-value counting (cardinality) requires storing every distinct value seen. For a user_id field with 50 million unique values across a multi-shard index, exact counting would require all shards to transmit 50M values to the coordinator for deduplication — prohibitive in memory and network cost.

OpenSearch uses HyperLogLog++ (HLL++), a probabilistic data structure that estimates cardinality with ~3% error using a fixed amount of memory (controlled by the precision_threshold parameter).

{
  "unique_users": {
    "cardinality": {
      "field": "user_id",
      "precision_threshold": 40000
    }
  }
}

precision_threshold sets the maximum number of distinct values below which the estimate is near-exact. Above this threshold, HLL++ applies its probabilistic approximation. The memory cost is proportional to precision_threshold, not to actual cardinality. Maximum value is 40,000.

The merge step: each shard returns an HLL++ sketch (a compact byte array). The coordinator merges sketches using the HLL++ union operation — the merged sketch represents the union of all distinct values seen across all shards.
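
A stripped-down sketch of plain HyperLogLog shows why the per-shard result stays small and mergeable. This is illustrative only: OpenSearch's HLL++ adds bias correction and a sparse encoding that the sketch omits.

# Simplified HyperLogLog, merged the way shards merge their sketches.
# Illustrative only: OpenSearch uses HyperLogLog++, which adds bias
# correction and a sparse encoding that this sketch omits.
import hashlib

P = 14                 # 2^14 = 16,384 registers per sketch
M = 1 << P

def make_sketch():
    return [0] * M

def add(sketch, value):
    h = int.from_bytes(hashlib.sha1(value.encode()).digest()[:8], "big")
    idx = h >> (64 - P)                        # top P bits pick a register
    rest = h & ((1 << (64 - P)) - 1)
    rank = (64 - P) - rest.bit_length() + 1    # leading-zero run length + 1
    sketch[idx] = max(sketch[idx], rank)

def merge(a, b):
    # Union of two sketches is the element-wise max of their registers.
    return [max(x, y) for x, y in zip(a, b)]

def estimate(sketch):
    alpha = 0.7213 / (1 + 1.079 / M)
    return alpha * M * M / sum(2.0 ** -r for r in sketch)

# Two "shards" with overlapping users; each builds only its own sketch.
shard_a, shard_b = make_sketch(), make_sketch()
for i in range(50_000):
    add(shard_a, f"user-{i}")
for i in range(25_000, 75_000):
    add(shard_b, f"user-{i}")

# The coordinator merges compact sketches, never the raw user ids.
print(round(estimate(merge(shard_a, shard_b))))   # close to 75,000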

What this means operationally: “count distinct users” in a dashboard will show a number that is accurate to within roughly 3%. If the aggregation reports 1,000,000 unique users, the true count typically falls somewhere between 970,000 and 1,030,000 (the error is a statistical expectation, not a hard bound). For dashboards and analytics, this is acceptable. For billing or compliance counts, use a different approach (an exact count from a transactional system, or pre-computed rollups).

The Percentiles Problem: TDigest #

Exact percentile computation requires sorting all values to find the Nth. Sorting 100M values distributed across 20 shards and transmitting them all to a coordinator is infeasible.

OpenSearch uses TDigest, a data structure that represents a distribution as a set of compressed centroids (each centroid holds a mean value and the weight of observations it represents). TDigest supports merging: each shard returns its centroid list; the coordinator merges them into a combined distribution and extracts percentiles.

{
  "response_time_percentiles": {
    "percentiles": {
      "field": "response_ms",
      "percents": [50, 90, 95, 99],
      "tdigest": { "compression": 200 }
    }
  }
}

compression controls the tradeoff between accuracy and memory. Higher compression → more centroids → better accuracy for extreme percentiles (p99, p99.9). Default is 100. For p50 accuracy, lower compression is acceptable. For SLO-grade p99 accuracy, use compression ≥ 200.

TDigest keeps its smallest centroids near the tails, so relative error is lowest at the extremes (p1, p99) and highest near the median. Even so, long-tailed latency distributions and the per-shard merge step can erode tail accuracy, which is why percentile aggregations for latency monitoring still warrant careful compression tuning.
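
A rough sketch of the centroid idea, with invented latency data and a deliberately crude per-shard summary, shows both the mergeability and why a finer summary helps. Real TDigest sizes centroids adaptively (smallest near the tails) and compresses them again on merge; this sketch does neither.

# Crude centroid-based percentile sketch (illustrative; real TDigest sizes
# centroids adaptively and re-compresses them when merging).
import random

def shard_centroids(values, max_centroids=20):
    # Per-shard summary: split the sorted values into equal-size chunks
    # and keep (mean, weight) per chunk. More centroids = finer summary.
    values = sorted(values)
    chunk = max(1, len(values) // max_centroids)
    centroids = []
    for i in range(0, len(values), chunk):
        part = values[i:i + chunk]
        centroids.append((sum(part) / len(part), len(part)))
    return centroids

def percentile(centroids, p):
    # Coordinator side: the "merge" here is plain concatenation; walk the
    # combined centroids in value order until the cumulative weight hits p%.
    centroids = sorted(centroids)
    target = p / 100 * sum(w for _, w in centroids)
    seen = 0
    for mean, weight in centroids:
        seen += weight
        if seen >= target:
            return mean
    return centroids[-1][0]

random.seed(0)
shard_a = [random.expovariate(1 / 100) for _ in range(10_000)]   # latencies, ms
shard_b = [random.expovariate(1 / 120) for _ in range(10_000)]

merged = shard_centroids(shard_a) + shard_centroids(shard_b)
print(round(percentile(merged, 50)), round(percentile(merged, 99)))

Raising max_centroids here plays the same role as raising compression in the real aggregation: finer per-shard summaries, better estimates, more bytes shipped to the coordinator.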

The Terms Accuracy Problem: shard_size #

The terms aggregation has a subtle accuracy hazard. In the distributed execution:

  1. Each shard returns its top-N terms by local count.
  2. The coordinator merges per-shard counts for each term.

A term that is globally #1 may not rank in the top-N on every shard. If the per-shard cutoff is five terms and the global #1 term ranks sixth on one shard, it won’t appear in that shard’s response, so the coordinator’s merged count for it is understated.

OpenSearch reports this via doc_count_error_upper_bound in the response: the maximum number of documents that could have been missed for any returned term.
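
A small sketch with invented per-shard counts shows how the understatement happens when each shard reports only its local top three:

# Sketch of the terms accuracy hazard with invented per-shard counts.
from collections import Counter

shard_counts = [
    Counter({"a": 50, "b": 40, "c": 35, "x": 30}),   # "x" is only 4th here
    Counter({"x": 45, "d": 30, "e": 20, "a": 10}),
    Counter({"x": 60, "a": 25, "f": 20, "b": 15}),
]

size = 3  # each shard reports only its local top-3

merged = Counter()
for counts in shard_counts:
    for term, count in counts.most_common(size):
        merged[term] += count

true_totals = sum(shard_counts, Counter())

print(merged.most_common(size))       # x: 105 (shard 0's 30 docs were never sent)
print(true_totals.most_common(size))  # x: 135 (the real global count)

With a per-shard cutoff of four in this toy example, shard 0 would have reported x and every merged count would be exact, which is exactly the lever shard_size provides.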

shard_size — control how many terms each shard returns, independently of the final size:

{
  "by_status": {
    "terms": {
      "field": "status",
      "size": 10,
      "shard_size": 100
    }
  }
}

Each shard returns 100 terms; the coordinator reduces to the top 10. More terms from each shard → lower doc_count_error_upper_bound → more accurate final counts. The default shard_size is size × 1.5 + 10.

For high-cardinality fields (product IDs, user IDs) where you want an accurate top-N across a distributed index, set shard_size to several times size. For low-cardinality fields (status codes, boolean flags) the default is fine — all shards will have all terms.

Pipeline Aggregations: Computing on Aggregation Output #

Pipeline aggregations operate on the output of other aggregations rather than on raw documents. They post-process bucket or metric results.

{
  "aggs": {
    "by_day": {
      "date_histogram": {
        "field": "created",
        "calendar_interval": "1d"
      },
      "aggs": {
        "daily_revenue": { "sum": { "field": "amount" } },
        "7d_moving_avg": {
          "moving_avg": {
            "buckets_path": "daily_revenue",
            "window": 7
          }
        }
      }
    },
    "max_daily_revenue": {
      "max_bucket": {
        "buckets_path": "by_day>daily_revenue"
      }
    }
  }
}

moving_avg smooths daily revenue across a rolling 7-day window. max_bucket finds the day with the highest revenue — a sibling pipeline aggregation that operates at the level of the parent date_histogram.

Common pipeline aggregations:

| Aggregation | Description |
| --- | --- |
| derivative | Rate of change between buckets |
| cumulative_sum | Running total |
| moving_avg | Rolling average |
| bucket_sort | Sort and paginate buckets |
| bucket_selector | Filter buckets by metric value |
| max_bucket, min_bucket, avg_bucket | Sibling aggregations across buckets |

Pipeline aggregations execute entirely on the coordinating node after all shard results are merged — they do not run per-shard. This means they cannot be used to reduce the data transmitted from shards, only to post-process what was already transmitted.
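
Because pipeline aggregations are just post-processing over already-merged buckets, their behaviour can be sketched in a few lines over invented daily totals:

# Sketch of coordinator-side pipeline post-processing over merged buckets
# (invented daily revenue values, not an OpenSearch response).
daily_revenue = [1200.0, 900.0, 1500.0, 1100.0, 1700.0, 1300.0, 1600.0, 2000.0]

# moving_avg-style rolling mean over a 7-bucket window
window = 7
moving_avg = [
    sum(daily_revenue[max(0, i - window + 1):i + 1]) / min(i + 1, window)
    for i in range(len(daily_revenue))
]

# cumulative_sum-style running total
cumulative, running = [], 0.0
for value in daily_revenue:
    running += value
    cumulative.append(running)

# max_bucket-style sibling aggregation: which bucket had the highest value
best_day = max(range(len(daily_revenue)), key=daily_revenue.__getitem__)

print(moving_avg[-1], cumulative[-1], best_day)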

Aggregation Performance: What Matters #

doc_values — aggregations and sorting on keyword, numeric, and date fields use doc values — a columnar, on-disk structure written at index time that enables efficient access to all values for a given field across a set of documents. If doc_values is disabled for a field, aggregations on that field load the values into memory from the fielddata cache — expensive and heap-consuming. Never disable doc_values on fields you aggregate on.
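
A toy contrast between row-oriented documents and a doc-values-like column (purely illustrative; real doc values are compressed on-disk Lucene structures) shows the access pattern an aggregation wants:

# Toy contrast: row-oriented documents vs a doc-values-like column.
# Real doc values are compressed, on-disk Lucene structures; this only
# shows the access pattern an aggregation needs.

# Row-oriented: summing "amount" means touching every whole document.
docs = [
    {"order_id": "o1", "status": "paid",   "amount": 120.0},
    {"order_id": "o2", "status": "failed", "amount": 85.5},
    {"order_id": "o3", "status": "paid",   "amount": 40.0},
]
total_from_rows = sum(doc["amount"] for doc in docs)

# Column-oriented: one contiguous array per field, indexed by internal doc id,
# so an aggregation scans exactly one array and nothing else.
amount_column = [120.0, 85.5, 40.0]
status_column = ["paid", "failed", "paid"]

total = sum(amount_column)
paid_total = sum(a for a, s in zip(amount_column, status_column) if s == "paid")
print(total_from_rows, total, paid_total)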

execution_hint: map vs execution_hint: global_ordinals — the terms aggregation uses global ordinals (a dense integer-to-term mapping for keyword fields, built by default the first time the field is aggregated on) for performance. For low-cardinality fields this is ideal. For fields with many unique values that are rarely aggregated, map hashes the terms directly and avoids the ordinal-building overhead.
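
A sketch of the difference, with hypothetical status values (real global ordinals are Lucene structures shared across segments and reused by later queries):

# Sketch: global_ordinals vs map execution for a terms aggregation.
# Hypothetical data; real global ordinals live in Lucene and are reused
# across queries once built.
from collections import Counter

status_values = ["paid", "failed", "paid", "retried", "paid", "failed"]

# global_ordinals: build a dense term -> int table once, then bucket by
# incrementing a plain array; cheap per document, but the table must exist.
ordinals = {term: i for i, term in enumerate(sorted(set(status_values)))}
counts = [0] * len(ordinals)
for term in status_values:
    counts[ordinals[term]] += 1

# map: hash the raw strings on every query; no table to build or keep warm.
map_counts = Counter(status_values)

print(counts, dict(map_counts))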

eager_global_ordinals — for fields that are aggregated on nearly every query, pre-building global ordinals at index refresh time eliminates the first-query latency spike:

PUT /orders/_mapping
{
  "properties": {
    "status": {
      "type": "keyword",
      "eager_global_ordinals": true
    }
  }
}