
The Read Path: Scatter-Gather and Distributed Relevance

A search query in OpenSearch is not sent to a single node. It fans out across every shard in the index, each shard independently ranks its local documents, and the results are merged by a coordinating node before being returned to the client. This scatter-gather model is what makes search horizontally scalable — but it introduces a fundamental distributed systems problem: relevance scores computed locally on a shard are not directly comparable to scores computed on another shard.

Two-Phase Query Execution: Query Then Fetch

[Figure: Scatter-gather: two-phase query execution]

Every search request executes in two phases: the query phase and the fetch phase.

Query phase — lightweight ranking pass.

  1. Coordinating node receives the search request.
  2. Coordinating node broadcasts the query to one copy (primary or replica) of every shard in the index.
  3. Each shard executes the query locally against its Lucene index, computing relevance scores for every matching document.
  4. Each shard returns a lightweight result: its local top-N document IDs and their scores, where N is from + size. No field values and no source documents are sent, only IDs and scores.
  5. Coordinating node merges all results into a global ranked list, keeping the top-N overall.

Fetch phase — full document retrieval.

  1. Coordinating node identifies which shards hold the winning documents from the global top-N.
  2. Coordinating node issues targeted fetch requests — only to the shards that hold the documents that made the final cut.
  3. Those shards retrieve the full _source documents and return them.
  4. Coordinating node assembles the final response.

The separation matters. In a 10-shard index with size: 10, the query phase collects 10 document IDs from each of 10 shards, 100 IDs in total, but the fetch phase retrieves only 10 full documents. Without this split, every shard would have to transmit the full body of each of its top-10 candidates, even though 90 of those 100 documents never reach the final result.
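The two phases can be sketched in a few lines of Python. This is a minimal in-memory model, not OpenSearch code: the shard contents, scores, and the helper names (query_phase, merge, fetch_targets) are made up for illustration.

```python
import heapq
from typing import NamedTuple


class ShardHit(NamedTuple):
    score: float
    shard: int
    doc_id: str


def query_phase(shard: int, scored_docs: dict[str, float], n: int) -> list[ShardHit]:
    """Each shard returns only its local top-n (score, shard, id) triples."""
    hits = [ShardHit(score, shard, doc_id) for doc_id, score in scored_docs.items()]
    return heapq.nlargest(n, hits)


def merge(per_shard: list[list[ShardHit]], n: int) -> list[ShardHit]:
    """The coordinating node keeps the global top-n across all shard responses."""
    return heapq.nlargest(n, (hit for hits in per_shard for hit in hits))


def fetch_targets(winners: list[ShardHit]) -> dict[int, list[str]]:
    """Group winning IDs by shard so only those shards are asked for _source."""
    targets: dict[int, list[str]] = {}
    for hit in winners:
        targets.setdefault(hit.shard, []).append(hit.doc_id)
    return targets


# Hypothetical per-shard scores for the matching documents.
shards = {
    0: {"a": 2.1, "b": 0.4, "c": 1.7},
    1: {"d": 3.0, "e": 0.2},
    2: {"f": 0.9, "g": 0.8},
}
local = [query_phase(s, docs, n=2) for s, docs in shards.items()]
winners = merge(local, n=2)
targets = fetch_targets(winners)
print(winners)
print(targets)
```

In this toy run, shard 2 takes part in the query phase but receives no fetch request, because none of its documents made the global top 2.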

The Distributed Scoring Problem: Local vs Global IDF

[Figure: Distributed IDF problem: local vs global term frequency]

BM25, the default relevance algorithm in OpenSearch, depends on Inverse Document Frequency (IDF): terms that appear in fewer documents are scored higher. In a distributed index, each shard has only a local view of document frequency. Term X may appear in 100 documents on shard 0 and in 2 documents on shard 1, in which case shard 1 scores term X much higher than shard 0 for otherwise identical documents.

Shard 0: 1000 docs total, "payment" in 200 docs → IDF = log(1000/200) = 1.61
Shard 1:  100 docs total, "payment" in   2 docs → IDF = log(100/2)   = 3.91

The same document, if it existed on both shards, would receive an IDF weight of 3.91 for "payment" on shard 1 and 1.61 on shard 0, roughly 2.4 times higher. The merge step on the coordinating node compares the resulting scores directly, so a document on shard 1 gets an unfair advantage.
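The numbers above use the simplified form ln(N / n). The sketch below reproduces them and, for comparison, also evaluates the smoothed IDF that Lucene's BM25 uses, ln(1 + (N - n + 0.5) / (n + 0.5)). The function names are illustrative; the shard statistics are the ones from the example.

```python
import math


def simple_idf(total_docs: int, doc_freq: int) -> float:
    """Simplified IDF used in the example above: ln(N / n)."""
    return math.log(total_docs / doc_freq)


def bm25_idf(total_docs: int, doc_freq: int) -> float:
    """Smoothed BM25 IDF: ln(1 + (N - n + 0.5) / (n + 0.5))."""
    return math.log(1 + (total_docs - doc_freq + 0.5) / (doc_freq + 0.5))


# (total documents, documents containing "payment") per shard.
shard_stats = {"shard 0": (1000, 200), "shard 1": (100, 2)}

for name, (n_docs, df) in shard_stats.items():
    print(f"{name}: simple={simple_idf(n_docs, df):.2f} bm25={bm25_idf(n_docs, df):.2f}")
```

Either formula gives the same picture: the shard where the term is locally rare assigns it a much larger weight.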

There are two ways to handle this:

dfs_query_then_fetch — adds a pre-phase where the coordinating node collects term statistics from all shards before executing the query, computes a single global IDF, and distributes it to all shards for scoring.

GET /orders/_search?search_type=dfs_query_then_fetch
{
  "query": { "match": { "description": "payment" } }
}

This produces globally correct scores at the cost of an extra round trip. Use it when ranking accuracy matters more than latency — recommendation engines, relevance-sensitive search, A/B testing relevance.
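Conceptually, the pre-phase sums the per-shard statistics and derives one IDF that every shard then uses. A minimal sketch, assuming the simplified ln(N / n) form and the two shards from the earlier example (global_idf is an illustrative name, not an OpenSearch function):

```python
import math


def global_idf(shard_stats: list[tuple[int, int]]) -> float:
    """Sum per-shard (doc_count, doc_freq) pairs, then compute a single IDF."""
    total_docs = sum(n_docs for n_docs, _ in shard_stats)
    total_df = sum(df for _, df in shard_stats)
    return math.log(total_docs / total_df)


# (total documents, documents containing "payment") for shard 0 and shard 1.
stats = [(1000, 200), (100, 2)]
idf = global_idf(stats)
print(f"global IDF = {idf:.2f}")  # ln(1100 / 202)
```

With a shared value of about 1.69, a document scores the same regardless of which shard happens to hold it.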

Many documents per shard, uniformly distributed: when each shard holds a large, evenly distributed sample of the corpus, local IDF approximates global IDF and the problem largely corrects itself. For small indexes or heavily skewed data, dfs_query_then_fetch is the correct choice.

The Deep Pagination Problem: Why from/size Breaks

The naive pagination model is from + size:

GET /orders/_search
{
  "from": 990,
  "size": 10
}

This looks like it fetches only hits 991 through 1,000. In reality, each shard must return its top from + size = 1,000 results to the coordinating node, which merges them and discards all but the last 10. For a 5-shard index requesting page 100 with size: 10, the coordinating node receives and sorts 5,000 document IDs to return 10.

The cost grows linearly with from, on every shard and on the coordinating node. OpenSearch enforces a default index.max_result_window of 10,000: any request where from + size exceeds it is rejected.
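The arithmetic is easy to check. The helpers below are illustrative only (the names are not part of any OpenSearch client); they compute how many candidates the coordinating node must merge and whether a request fits the default window.

```python
DEFAULT_MAX_RESULT_WINDOW = 10_000


def coordinator_candidates(from_: int, size: int, shards: int) -> int:
    """Each shard returns from + size candidates; the coordinator merges them all."""
    return (from_ + size) * shards


def within_result_window(from_: int, size: int,
                         max_result_window: int = DEFAULT_MAX_RESULT_WINDOW) -> bool:
    """A request is accepted only if from + size does not exceed the window."""
    return from_ + size <= max_result_window


print(coordinator_candidates(990, 10, shards=5))    # page 100 on 5 shards
print(coordinator_candidates(9990, 10, shards=5))   # deepest allowed page
print(within_result_window(9991, 10))               # one step too deep
```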

Two alternatives that scale:

search_after — stateless keyset pagination. Instead of an offset, you provide the sort values of the last document seen:

GET /orders/_search
{
  "size": 10,
  "sort": [{ "date": "asc" }, { "_id": "asc" }],
  "search_after": ["2024-01-15", "order_abc_123"]
}

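Each shard only needs to find documents that sort after the provided key, so there is no accumulation of prior pages and the cost per page stays the same at any depth. The sort must end in a unique tiebreaker (here _id); otherwise documents that share a sort value at a page boundary can be skipped. The constraint: you must paginate forward sequentially, and jumping to an arbitrary page is not possible.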
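The cursor logic can be modeled without a cluster. The following sketch treats a sorted list of (date, _id) tuples as a stand-in for the index and uses a binary search to find the first entry after the cursor; the data and the search_after_page helper are hypothetical.

```python
from bisect import bisect_right

# Hypothetical "index": (date, _id) sort keys in ascending order.
docs = sorted([
    ("2024-01-14", "order_a"),
    ("2024-01-15", "order_b"),
    ("2024-01-15", "order_c"),  # same date as order_b: the _id tiebreaker decides
    ("2024-01-16", "order_d"),
    ("2024-01-17", "order_e"),
])


def search_after_page(sorted_keys, size, search_after=None):
    """Return the next `size` keys strictly after the cursor (None = first page)."""
    start = 0 if search_after is None else bisect_right(sorted_keys, search_after)
    return sorted_keys[start:start + size]


pages = []
cursor = None
while True:
    page = search_after_page(docs, size=2, search_after=cursor)
    if not page:
        break
    pages.append(page)
    cursor = page[-1]  # sort values of the last hit become the next cursor

for number, page in enumerate(pages, start=1):
    print(number, page)
```

The page boundary falls between order_b and order_c, which share a date. Because the cursor includes the _id tiebreaker, order_c is neither skipped nor repeated.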

Point-in-Time (PIT) — for consistent pagination across a changing index. Without PIT, new documents indexed between page 1 and page 2 can shift result positions, causing documents to appear twice or be skipped entirely. A PIT creates a frozen view of the index:

POST /orders/_search/point_in_time?keep_alive=5m

GET /_search
{
  "size": 10,
  "pit": { "id": "<pit_id>", "keep_alive": "5m" },
  "sort": [{ "date": "asc" }, { "_id": "asc" }],
  "search_after": ["2024-01-15", "order_abc_123"]
}

The PIT holds open the index readers from the moment it was created. Subsequent writes do not affect the view. The cost: PIT contexts consume heap on the shard nodes for as long as they are kept alive. Always close PIT contexts explicitly when done.
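The effect of a frozen view can be shown with a toy model. The sketch below uses offset paging over a plain Python list, purely to make the shift visible; the document names are invented, and a tuple copy stands in for the point-in-time view.

```python
def page(view, from_, size):
    """Offset-based page over whatever view of the index is passed in."""
    return list(view[from_:from_ + size])


# Live index, newest first.
live = ["d5", "d4", "d3", "d2", "d1"]
page1 = page(live, 0, 2)
live.insert(0, "d6")              # a write lands between the two requests
page2 = page(live, 2, 2)
duplicated = set(page1) & set(page2)
print(page1, page2, duplicated)   # d4 shows up on both pages

# Same sequence, but both pages read from a snapshot taken up front.
live = ["d5", "d4", "d3", "d2", "d1"]
snapshot = tuple(live)            # stand-in for the frozen PIT view
pit_page1 = page(snapshot, 0, 2)
live.insert(0, "d6")              # the write does not touch the snapshot
pit_page2 = page(snapshot, 2, 2)
print(pit_page1, pit_page2)
```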

| Pagination method | Max depth | Consistency | Jump to page | Cost |
| --- | --- | --- | --- | --- |
| from/size | 10,000 (default) | No (live index shifts) | Yes | O((from + size) × shards) |
| search_after | Unlimited | No (live index shifts) | No | O(size × shards) |
| PIT + search_after | Unlimited | Yes (frozen view) | No | O(size × shards) + heap |

Replica Selection: Adaptive Routing

For each shard in a query, the coordinating node must choose between the primary and any available replicas. OpenSearch uses adaptive replica selection (ARS) — it tracks response times and queue depths across all copies of each shard and routes to the copy with the best recent performance. A replica on a lightly loaded node receives more queries than one on a saturated node, even if both are in-sync.

This is why adding replicas to a search-heavy index improves throughput non-linearly — the coordinating node distributes load across all copies dynamically rather than round-robining equally.
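The idea can be illustrated with a deliberately simplified ranking. The formula below (smoothed response time multiplied by one plus the queue size) is an assumption made up for this sketch and is not the formula OpenSearch uses; the node names and numbers are also hypothetical.

```python
from dataclasses import dataclass


@dataclass
class CopyStats:
    node: str
    ewma_response_ms: float  # smoothed recent response time
    queue_size: int          # pending search tasks on that node


def rank(stats: CopyStats) -> float:
    """Lower is better. Simplified stand-in for the real ARS ranking."""
    return stats.ewma_response_ms * (1 + stats.queue_size)


def pick_copy(copies: list[CopyStats]) -> CopyStats:
    """Route the shard request to the copy with the best (lowest) rank."""
    return min(copies, key=rank)


copies = [
    CopyStats("node-a", ewma_response_ms=40.0, queue_size=12),  # fast but saturated
    CopyStats("node-b", ewma_response_ms=55.0, queue_size=0),   # slower but idle
]
chosen = pick_copy(copies)
print(chosen.node)
```

Here the idle copy wins even though its raw response time is worse, which is the behavior the paragraph above describes.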

The preference parameter can override ARS:

GET /orders/_search?preference=_local
  • _local — prefer shards on the local node. Reduces network hop, may increase imbalance.
  • _only_local — only query local shards. Useful for analytics where staleness is acceptable.
  • A custom string — consistent hashing routes the same string to the same shard copy. Useful for query-level caching at the shard.
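The custom-string case boils down to a deterministic mapping from the string to one of the available copies. A minimal sketch of that idea, using SHA-256 as a stand-in hash (the hash function and the copy_for_preference helper are assumptions for illustration, not what OpenSearch uses internally):

```python
import hashlib


def copy_for_preference(preference: str, copies: list[str]) -> str:
    """Map a preference string to one shard copy, the same one every time."""
    digest = hashlib.sha256(preference.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(copies)
    return copies[index]


# Hypothetical copies of one shard: the primary and two replicas.
copies = ["node-a (primary)", "node-b (replica)", "node-c (replica)"]

first = copy_for_preference("user-42-session", copies)
second = copy_for_preference("user-42-session", copies)
print(first, first == second)
```

Because repeated requests with the same string land on the same copy, that copy's caches stay warm for those requests.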

Concurrent Segment Search: Parallelism Within a Shard

A shard is a Lucene index composed of multiple immutable segments. By default, a query on a shard executes serially across all its segments — one thread, sequential scan. For shards with many segments (common after high-throughput indexing before merging catches up), this serializes what could be parallel work.

Concurrent segment search parallelizes query execution across segments within a shard:

PUT /orders/_settings
{
  "index.search.concurrent_segment_search.enabled": true
}

Segments are grouped into slices, and each slice is searched on its own thread. The shard-level result is assembled from the per-slice results using the same kind of top-N merge that the coordinating node applies across shards. The tradeoff: more CPU per query in exchange for lower latency on shards with many segments. It is not beneficial for shards with only a few large segments, where the coordination and merge overhead outweighs the parallelism gain.
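Structurally, this is the scatter-gather pattern repeated inside one shard. The sketch below models it with a thread pool over in-memory "segments"; the segment contents and function names are invented, and Python threads only illustrate the shape of the work, not real CPU parallelism.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor


def search_segment(segment: dict[str, float], n: int) -> list[tuple[float, str]]:
    """Top-n (score, doc_id) pairs for a single segment."""
    return heapq.nlargest(n, ((score, doc_id) for doc_id, score in segment.items()))


def search_shard(segments: list[dict[str, float]], n: int,
                 max_workers: int = 4) -> list[tuple[float, str]]:
    """Search segments concurrently, then merge the partial top-n lists."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        partials = list(pool.map(lambda seg: search_segment(seg, n), segments))
    return heapq.nlargest(n, (hit for partial in partials for hit in partial))


# Hypothetical segments of one shard, with precomputed scores.
segments = [
    {"a": 1.0, "b": 2.5},
    {"c": 3.1},
    {"d": 0.2, "e": 2.9},
]
top = search_shard(segments, n=2)
print(top)
```

The final merge is an extra step that a serial scan does not need, which is why the gain disappears when a shard has only a handful of segments.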