
Search Root Substrate — Scatter-Gather and Job Placement #

The search root is the coordinator for distributed search. It receives a SearchRequest, determines which splits are relevant, assigns those splits to searcher nodes, fans out LeafSearchRequests in parallel, collects and merges partial results, and returns a SearchResponse. The implementation lives in quickwit-search/src/root.rs.

SearchJob: The Unit of Work #

#[derive(Debug, Clone, PartialEq)]
pub struct SearchJob {
    pub index_uid: IndexUid,
    cost: usize,
    pub offsets: SplitIdAndFooterOffsets,
}

impl Job for SearchJob {
    fn split_id(&self) -> &str {
        &self.offsets.split_id
    }

    fn cost(&self) -> usize {
        self.cost
    }
}

A SearchJob represents searching one split. The cost is an estimate of the work required, derived from split metadata:

impl<'a> From<&'a SplitMetadata> for SearchJob {
    fn from(split_metadata: &'a SplitMetadata) -> Self {
        SearchJob {
            index_uid: split_metadata.index_uid.clone(),
            cost: compute_split_cost(split_metadata),
            offsets: extract_split_and_footer_offsets(split_metadata),
        }
    }
}

compute_split_cost estimates cost from the split’s document count and size. SplitIdAndFooterOffsets carries the split ID and the byte range of the footer in the .split file — this is passed to leaf nodes so they can fetch the footer without querying the metastore.
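To illustrate how such a cost heuristic behaves, here is a minimal sketch. The struct name and the scaling factor are assumptions for illustration, not Quickwit's actual formula; the only property that matters for placement is that cost grows with split size and never reaches zero.

```rust
// Illustrative stand-in for SplitMetadata: only the field the sketch needs.
struct SplitMetadataLite {
    num_docs: usize,
}

// Assumed heuristic: one cost unit per ~1000 documents, with a floor of 1 so
// even a tiny split carries a minimal scheduling weight.
fn compute_split_cost(split: &SplitMetadataLite) -> usize {
    (split.num_docs / 1_000).max(1)
}

fn main() {
    let small = SplitMetadataLite { num_docs: 500 };
    let large = SplitMetadataLite { num_docs: 5_000_000 };
    assert_eq!(compute_split_cost(&small), 1);
    assert_eq!(compute_split_cost(&large), 5_000);
    println!("small={} large={}", compute_split_cost(&small), compute_split_cost(&large));
}
```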

Split Pruning Before Job Creation #

Before creating jobs, the root prunes the split list. list_relevant_splits queries the metastore for published splits and applies:

  • Time range pruning: if the query has a time filter, splits whose time range does not overlap are excluded.
  • Tag pruning: if the query’s QueryAst contains term constraints, splits whose tag set doesn’t intersect are excluded.
  • Tombstone filtering: splits in ScheduledForDelete state are excluded.

This can eliminate the majority of splits for time-bounded queries, reducing both network fan-out and leaf-node work.
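The time-range check above is an interval-overlap filter. A sketch, where SplitInfo and its field names are illustrative stand-ins for the real metadata types:

```rust
// Illustrative split record: each split covers an inclusive [time_start, time_end]
// range recorded in its metadata.
#[derive(Debug, PartialEq)]
struct SplitInfo {
    split_id: String,
    time_start: i64,
    time_end: i64,
}

/// Keep only splits whose time range overlaps the query window [query_start, query_end).
fn prune_by_time_range(splits: Vec<SplitInfo>, query_start: i64, query_end: i64) -> Vec<SplitInfo> {
    splits
        .into_iter()
        .filter(|s| s.time_end >= query_start && s.time_start < query_end)
        .collect()
}

fn main() {
    let splits = vec![
        SplitInfo { split_id: "old".into(), time_start: 0, time_end: 99 },
        SplitInfo { split_id: "hot".into(), time_start: 100, time_end: 199 },
    ];
    // A query over [150, 300) only overlaps the "hot" split.
    let kept = prune_by_time_range(splits, 150, 300);
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0].split_id, "hot");
}
```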

SearchJobPlacer: Rendezvous Hashing #

#[derive(Clone, Default)]
pub struct SearchJobPlacer {
    searcher_pool: SearcherPool,
}

impl SearchJobPlacer {
    pub async fn best_nodes_per_affinity(
        &self,
        affinity_key: &[u8],
    ) -> impl Iterator<Item = SearchServiceClient> {
        let mut nodes: Vec<SocketAddrAndClient> = self
            .searcher_pool
            .pairs()
            .into_iter()
            .map(|(socket_addr, client)| SocketAddrAndClient { socket_addr, client })
            .collect();
        sort_by_rendez_vous_hash(&mut nodes[..], affinity_key);
        nodes.into_iter().map(|n| n.client)
    }
}

sort_by_rendez_vous_hash implements rendezvous hashing (also called highest-random-weight hashing). For each node, it computes H(node_id, affinity_key) and sorts nodes by descending hash value. The node with the highest hash for a given affinity_key (the split ID) is the primary assignee.

Rendezvous hashing has a key property: when a node is added or removed, only ~1/N of the splits change their preferred node; every other split keeps the same primary assignee, because the hash scores of the surviving nodes are unchanged. This means the node-local SplitCache (chapter 4) maintains high hit rates as the cluster scales.
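A minimal sketch of the technique, using std's DefaultHasher rather than Quickwit's actual hash function. It also demonstrates the stability property: removing a node that is not the primary leaves the primary for a given key unchanged.

```rust
use std::cmp::Reverse;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Highest-random-weight score: hash the (node, key) pair together.
fn hrw_score(node_id: &str, affinity_key: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    node_id.hash(&mut hasher);
    affinity_key.hash(&mut hasher);
    hasher.finish()
}

/// Sort nodes by descending score for this key; index 0 is the primary assignee.
fn sort_by_rendezvous_hash(nodes: &mut [&str], affinity_key: &[u8]) {
    nodes.sort_by_key(|node| Reverse(hrw_score(node, affinity_key)));
}

fn main() {
    let mut nodes = ["node-a:7280", "node-b:7280", "node-c:7280"];
    sort_by_rendezvous_hash(&mut nodes, b"split-01");
    let primary = nodes[0];
    let lowest = nodes[2];

    // Remove a non-primary node: the remaining scores are unchanged, so the
    // primary for "split-01" stays the same.
    let mut survivors: Vec<&str> = nodes.iter().copied().filter(|&n| n != lowest).collect();
    sort_by_rendezvous_hash(&mut survivors, b"split-01");
    assert_eq!(survivors[0], primary);
    println!("primary for split-01: {primary}");
}
```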

The SearchJobPlacer also handles split caching reports:

#[async_trait]
impl EventSubscriber<ReportSplitsRequest> for SearchJobPlacer {
    async fn handle_event(&mut self, evt: ReportSplitsRequest) {
        // For each split that a node reports having cached locally:
        // use node_affinity() to find the preferred node for that split
        // and send the ReportSplit to that node
        for report_split in evt.report_splits {
            let node_addr = nodes.keys()
                .max_by_key(|addr| {
                    node_affinity(SocketAddrLegacyHash(addr), &report_split.split_id)
                });
            // route report to that node
        }
    }
}

When a node caches a split locally, it reports this via ReportSplitsRequest. The SearchJobPlacer routes the report to the node with the highest affinity for that split, so that node knows to prefer itself for queries on that split. This implements soft cache-affinity routing: the rendezvous primary is preferred, but fallback nodes remain available if the primary is overloaded.

assign_jobs: The LPT Algorithm #

pub async fn assign_jobs<J: Job>(
    &self,
    mut jobs: Vec<J>,
    excluded_addrs: &HashSet<SocketAddr>,
) -> anyhow::Result<impl Iterator<Item = (SearchServiceClient, Vec<J>)> + use<J>> {
    // Sort jobs by descending cost (Longest Processing Time first)
    jobs.sort_unstable_by(Job::compare_cost);

    let mut candidate_nodes: Vec<CandidateNode> = all_nodes
        .into_iter()
        .map(|(grpc_addr, client)| CandidateNode { grpc_addr, client, load: 0 })
        .collect();

    const ALLOWED_DIFFERENCE: usize = 105; // 5% disparity allowed
    // ...
}

assign_jobs uses the Longest Processing Time (LPT) greedy algorithm:

  1. Sort jobs by descending cost.
  2. For each job (in cost order), assign it to the node with the lowest current load, subject to the Rendezvous hash affinity constraint.

LPT is a classic greedy approximation that keeps the makespan (the time until the slowest node finishes) within 4/3 of optimal, and it targets exactly the situation scatter-gather search creates: all nodes start simultaneously. The 5% disparity tolerance (ALLOWED_DIFFERENCE = 105) prevents over-optimization: a job can stay on a strongly cache-affine node instead of moving to a slightly less loaded one, as long as the load imbalance stays within 5%.
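The two steps can be sketched without the affinity constraint and disparity tolerance, so this is plain LPT rather than the full assign_jobs logic:

```rust
/// Plain LPT: sort job costs descending, then always place the next job on
/// the node with the lowest current load. Returns the jobs per node.
fn assign_lpt(mut job_costs: Vec<usize>, num_nodes: usize) -> Vec<Vec<usize>> {
    job_costs.sort_unstable_by(|a, b| b.cmp(a)); // descending cost
    let mut loads = vec![0usize; num_nodes];
    let mut assignments: Vec<Vec<usize>> = vec![Vec::new(); num_nodes];
    for cost in job_costs {
        // Index of the least-loaded node (first one on ties).
        let (idx, _) = loads.iter().enumerate().min_by_key(|&(_, load)| *load).unwrap();
        loads[idx] += cost;
        assignments[idx].push(cost);
    }
    assignments
}

fn main() {
    // Total cost 20 across 2 nodes: LPT balances this instance perfectly (10/10).
    let parts = assign_lpt(vec![7, 5, 4, 3, 1], 2);
    let loads: Vec<usize> = parts.iter().map(|p| p.iter().sum()).collect();
    assert_eq!(loads, vec![10, 10]);
    println!("per-node loads: {loads:?}");
}
```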

Scatter-Gather Fan-Out #

let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect();
let assigned_leaf_search_jobs = cluster_client
    .search_job_placer
    .assign_jobs(jobs, &HashSet::default())
    .await?;

let mut leaf_request_tasks = Vec::new();
for (client, client_jobs) in assigned_leaf_search_jobs {
    let leaf_request = jobs_to_leaf_request(
        search_request,
        indexes_metas_for_leaf_search,
        client_jobs,
    )?;
    leaf_request_tasks.push(cluster_client.leaf_search(leaf_request, client.clone()));
}

let leaf_search_responses: Vec<LeafSearchResponse> =
    try_join_all(leaf_request_tasks).await?;

All leaf requests are issued concurrently via try_join_all. Each LeafSearchRequest carries the list of SplitIdAndFooterOffsets assigned to that node, the query, and the sort/aggregation specification. The leaf processes all its assigned splits and returns a LeafSearchResponse with partial hits and partial aggregation results.

If any leaf fails, try_join_all returns the first error. The root can retry failed leaves on a different node (using excluded_addrs in the next assign_jobs call).

Merge Collector #

After receiving all LeafSearchResponses, the root merges them:

let merge_collector = make_merge_collector(search_request, &aggregations)?;
let merged_response = merge_collector.merge_fruits(leaf_results)?;

make_merge_collector creates a collector appropriate for the query:

  • Top-K hits: merge sorted partial hit lists, keep the global top K.
  • Aggregations: merge IntermediateAggregationResults from all leaves.
  • Count: sum up document counts.

The merge is CPU-bound but not I/O-bound — all data is already in memory in the LeafSearchResponses. Tantivy’s aggregation framework handles the merging of intermediate aggregation trees.
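The top-K hit merge can be sketched with a bounded min-heap. Real PartialHits carry split IDs and document addresses; here a hit is reduced to its sort value, and the heap never holds more than K entries:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge per-leaf hit lists and keep the global top K sort values, descending.
fn merge_top_k(leaf_hits: Vec<Vec<u64>>, k: usize) -> Vec<u64> {
    // Min-heap (via Reverse) bounded at k entries over all partial hits.
    let mut heap: BinaryHeap<Reverse<u64>> = BinaryHeap::with_capacity(k + 1);
    for hits in leaf_hits {
        for score in hits {
            heap.push(Reverse(score));
            if heap.len() > k {
                heap.pop(); // evict the current minimum
            }
        }
    }
    let mut top: Vec<u64> = heap.into_iter().map(|Reverse(score)| score).collect();
    top.sort_unstable_by(|a, b| b.cmp(a));
    top
}

fn main() {
    let leaves = vec![vec![90, 40, 10], vec![80, 70], vec![95, 5]];
    assert_eq!(merge_top_k(leaves, 3), vec![95, 90, 80]);
}
```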

FetchDocs: The Second Phase #

Quickwit uses a two-phase search (like Elasticsearch’s query_then_fetch):

Phase 1 (leaf search): each leaf returns PartialHits — document IDs and sort values, but not the stored document fields. This minimizes network transfer: typically only a small top-K subset of documents per split is returned.

Phase 2 (fetch docs): the root collects the global top-K PartialHits, groups them by split (as FetchDocsJob), and sends FetchDocsRequests to the relevant nodes:

pub struct FetchDocsJob {
    index_uid: IndexUid,
    offsets: SplitIdAndFooterOffsets,
    pub partial_hits: Vec<PartialHit>,
}

impl Job for FetchDocsJob {
    fn split_id(&self) -> &str { &self.offsets.split_id }
    fn cost(&self) -> usize { self.partial_hits.len() }
}

FetchDocsJob cost is the number of partial hits, not the split size — fetching 3 documents from a large split is cheaper than fetching 300 from a small one. The same assign_jobs + Rendezvous hashing assigns fetch-doc jobs to nodes.
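Grouping the global top-K hits into per-split fetch jobs can be sketched as follows. PartialHitLite and its fields are illustrative stand-ins, not Quickwit's actual types:

```rust
use std::collections::HashMap;

// Illustrative partial hit: which split it came from and a document id.
#[derive(Debug, Clone)]
struct PartialHitLite {
    split_id: String,
    doc_id: u32,
}

/// Group the global top-K hits by split, so each split's documents can be
/// fetched with a single request. The per-split hit count is the job cost.
fn group_hits_by_split(hits: Vec<PartialHitLite>) -> HashMap<String, Vec<u32>> {
    let mut jobs: HashMap<String, Vec<u32>> = HashMap::new();
    for hit in hits {
        jobs.entry(hit.split_id).or_default().push(hit.doc_id);
    }
    jobs
}

fn main() {
    let hits = vec![
        PartialHitLite { split_id: "a".into(), doc_id: 1 },
        PartialHitLite { split_id: "b".into(), doc_id: 7 },
        PartialHitLite { split_id: "a".into(), doc_id: 3 },
    ];
    let jobs = group_hits_by_split(hits);
    // Job costs: split "a" -> 2 hits, split "b" -> 1 hit.
    assert_eq!(jobs["a"], vec![1, 3]);
    assert_eq!(jobs["b"], vec![7]);
}
```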

Metadata Count Optimization #

let leaf_search_responses = if is_metadata_count_request(search_request) {
    get_count_from_metadata(split_metadatas)
} else {
    // ... full scatter-gather
};

If the query is a pure count query with no filters beyond the split’s time range, the count can be read directly from split metadata without scanning any documents. get_count_from_metadata sums SplitMetadata::num_docs across the relevant splits — zero leaf I/O.
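Under that condition the count path reduces to a metadata sum, sketched here with an illustrative struct in place of SplitMetadata:

```rust
// Illustrative stand-in for SplitMetadata carrying only the document count.
struct SplitMetaLite {
    num_docs: u64,
}

/// Sum per-split document counts: the whole count query, with zero leaf I/O.
fn get_count_from_metadata(splits: &[SplitMetaLite]) -> u64 {
    splits.iter().map(|s| s.num_docs).sum()
}

fn main() {
    let splits = [
        SplitMetaLite { num_docs: 1_000 },
        SplitMetaLite { num_docs: 250 },
    ];
    assert_eq!(get_count_from_metadata(&splits), 1_250);
}
```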

Summary #

The search root substrate implements scatter-gather search with Rendezvous hashing for split affinity, LPT for balanced load distribution, and try_join_all for concurrent leaf fan-out. Split pruning (time range, tags) reduces the job set before placement. The two-phase query (leaf search → fetch docs) minimizes network transfer. Pure count queries short-circuit the entire scatter-gather via metadata. The SearchJobPlacer doubles as a cache-affinity routing layer, directing split cache reports to the Rendezvous-preferred node to maximize SplitCache hit rates.