Indexing Pipeline Substrate — The Eight-Actor Chain #
The indexing pipeline transforms raw documents from a source into committed, searchable splits on object storage. It is implemented as a chain of eight supervised actors in quickwit-indexing/src/actors/. Each actor has a bounded mailbox; backpressure propagates from the slowest stage upstream to the source.
The Pipeline Chain #
SourceActor
→ DocProcessor
→ Indexer
→ IndexSerializer
→ Packager
→ Uploader
→ Sequencer<Publisher>
→ Publisher
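The backpressure behavior of bounded mailboxes can be sketched with a standard bounded channel standing in for an actor mailbox. This is a model under assumptions, not Quickwit's quickwit-actors implementation; `run_chain` and its parameters are illustrative:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// A two-stage chain with a bounded queue: when the slow consumer's
// queue is full, `send` blocks the producer, propagating backpressure
// upstream, just as a full actor mailbox suspends the sending actor.
fn run_chain(num_batches: usize, capacity: usize) -> Vec<usize> {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let consumer = thread::spawn(move || {
        let mut processed = Vec::new();
        for batch in rx {
            // Simulate a slow downstream stage (e.g. Packager).
            thread::sleep(std::time::Duration::from_millis(1));
            processed.push(batch);
        }
        processed
    });
    for batch_id in 0..num_batches {
        // Blocks whenever the queue already holds `capacity` batches.
        tx.send(batch_id).unwrap();
    }
    drop(tx); // Close the channel so the consumer drains and exits.
    consumer.join().unwrap()
}
```

Batches arrive downstream in send order, and the producer never runs more than `capacity` batches ahead of the consumer.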
The IndexingPipelineHandles struct holds an ActorHandle for every actor, plus the source's mailbox:
struct IndexingPipelineHandles {
    source_mailbox: Mailbox<SourceActor>,
    source_handle: ActorHandle<SourceActor>,
    doc_processor: ActorHandle<DocProcessor>,
    indexer: ActorHandle<Indexer>,
    index_serializer: ActorHandle<IndexSerializer>,
    packager: ActorHandle<Packager>,
    uploader: ActorHandle<Uploader>,
    sequencer: ActorHandle<Sequencer<Publisher>>,
    publisher: ActorHandle<Publisher>,
    next_check_for_progress: Instant,
}
Stage 1: SourceActor #
SourceActor reads documents from the configured source (Kafka, Kinesis, file, ingest API). It produces RawDocBatch — a batch of raw bytes with a SourceCheckpoint tracking which positions in the source have been consumed. Sources are pluggable via the Source trait.
The source actor is unique: it is the only actor whose mailbox is held by IndexingPipeline directly (via source_mailbox). When IndexingPipeline needs to close a shard or update the source configuration (e.g., shard assignment change), it sends a message directly to the source.
Stage 2: DocProcessor #
DocProcessor receives RawDocBatch and parses documents:
pub struct DocProcessor {
    // DocMapper: converts JSON objects to Tantivy documents
    doc_mapper: Arc<DocMapper>,
    // Sink: the Indexer's mailbox
    indexer_mailbox: Mailbox<Indexer>,
    // Counters for observation
    counters: DocProcessorCounters,
    // ...
}
For each document in the batch, DocProcessor:
- Parses the raw bytes as JSON (or OTLP protobuf/JSON for traces and logs).
- Applies optional VRL (Vector Remap Language) transforms if configured.
- Calls doc_mapper.doc_from_json_obj to produce a TantivyDocument.
- Forwards ProcessedDocBatch to the Indexer.
Documents that fail parsing are counted as invalid and dropped; the source checkpoint still advances. This is a deliberate trade-off: one bad document should not stall the pipeline.
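A minimal sketch of this drop-and-advance policy, with a hypothetical shape check standing in for doc_mapper.doc_from_json_obj (the names `Counters` and `process_batch` are illustrative):

```rust
// Invalid documents increment a counter and are skipped, never
// retried, but the checkpoint position still moves past them.
struct Counters {
    valid: u64,
    invalid: u64,
}

fn process_batch(raw_docs: &[&str], checkpoint: &mut u64, counters: &mut Counters) -> Vec<String> {
    let mut processed = Vec::new();
    for raw in raw_docs {
        // Stand-in for real JSON parsing: accept only documents that
        // look like JSON objects.
        if raw.trim_start().starts_with('{') && raw.trim_end().ends_with('}') {
            counters.valid += 1;
            processed.push(raw.to_string());
        } else {
            counters.invalid += 1; // dropped, pipeline keeps moving
        }
        *checkpoint += 1; // checkpoint advances regardless
    }
    processed
}
```

The key property is visible in the last line of the loop: checkpoint advancement is unconditional, so one malformed document can never wedge the source.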
Stage 3: Indexer #
Indexer is the core indexing stage. It maintains a workbench of in-progress splits, partitioned by partition_id:
struct IndexerState {
    pipeline_id: IndexingPipelineId,
    metastore: MetastoreServiceClient,
    indexing_directory: TempDirectory,
    indexing_settings: IndexingSettings,
    publish_lock: PublishLock,
    publish_token_opt: Option<PublishToken>,
    schema: Schema,
    doc_mapping_uid: DocMappingUid,
    tokenizer_manager: TokenizerManager,
    max_num_partitions: NonZeroU32,
    index_settings: IndexSettings,
    cooperative_indexing_opt: Option<CooperativeIndexingCycle>,
}
For each ProcessedDocBatch, the indexer:
- Looks up or creates an IndexedSplitBuilder for the document's partition; the workbench is splits: FnvHashMap<u64, IndexedSplitBuilder>.
- Adds the Tantivy document to the IndexBuilder inside that split builder.
- Checks CommitTrigger conditions after each batch.
When a commit is triggered, the workbench is flushed: all active splits are finalized and sent downstream as IndexedSplitBatch.
Partition overflow: if the number of active partitions exceeds max_num_partitions, new documents with unrecognized partitions go to the OTHER_PARTITION_ID split. This bounds memory usage when there are many hot partitions.
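The overflow routing can be modeled as follows. The function name and the sentinel value of `OTHER_PARTITION_ID` are assumptions for illustration; Quickwit's actual constant may differ:

```rust
use std::collections::HashMap;

// Sentinel for the shared overflow split (assumed value).
const OTHER_PARTITION_ID: u64 = u64::MAX;

// Route a document to its partition's split builder, or to the shared
// overflow split once `max_num_partitions` distinct partitions exist.
fn route_partition(
    splits: &mut HashMap<u64, Vec<String>>,
    max_num_partitions: usize,
    partition_id: u64,
    doc: String,
) -> u64 {
    let target = if splits.contains_key(&partition_id) || splits.len() < max_num_partitions {
        partition_id
    } else {
        OTHER_PARTITION_ID
    };
    splits.entry(target).or_default().push(doc);
    target
}
```

Existing partitions keep their dedicated splits; only partitions first seen after the limit is hit share the overflow split, which bounds the number of concurrent split builders (and therefore memory).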
CommitTrigger: When to Flush #
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CommitTrigger {
    Drained,      // source mailbox is empty
    ForceCommit,  // explicit commit request
    MemoryLimit,  // RAM usage exceeded threshold
    NoMoreDocs,   // source is exhausted
    NumDocsLimit, // split_num_docs_limit reached
    Timeout,      // commit_timeout_secs elapsed
}
The commit timeout and doc limit come from IndexingSettings. Typical defaults: 60 seconds or 10 million documents. A MemoryLimit commit fires when the Tantivy index writer exceeds its heap budget. Drained fires when the source has no more documents immediately available — committing on drain bounds latency for low-volume indices.
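The limit-driven triggers can be sketched as a pure function over the workbench state. The precedence order shown here is an assumption (Quickwit's actual evaluation order may differ), and the Drained/NoMoreDocs/ForceCommit variants are driven by message flow rather than by thresholds, so they are omitted from the check:

```rust
use std::time::Duration;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CommitTrigger {
    Drained,      // source mailbox is empty
    ForceCommit,  // explicit commit request
    MemoryLimit,  // RAM usage exceeded threshold
    NoMoreDocs,   // source is exhausted
    NumDocsLimit, // split_num_docs_limit reached
    Timeout,      // commit_timeout_secs elapsed
}

// Decide whether the workbench should be flushed after a batch.
fn check_commit(
    num_docs: u64,
    num_docs_limit: u64,
    memory_usage_bytes: u64,
    memory_budget_bytes: u64,
    elapsed: Duration,
    commit_timeout: Duration,
) -> Option<CommitTrigger> {
    if memory_usage_bytes >= memory_budget_bytes {
        return Some(CommitTrigger::MemoryLimit);
    }
    if num_docs >= num_docs_limit {
        return Some(CommitTrigger::NumDocsLimit);
    }
    if elapsed >= commit_timeout {
        return Some(CommitTrigger::Timeout);
    }
    None
}
```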
PublishLock: Fencing Stale Splits #
PublishLock is a fencing mechanism to prevent stale pipeline generations from publishing splits after a restart:
// From models/publish_lock.rs (inferred)
pub struct PublishLock(Arc<AtomicBool>);

impl PublishLock {
    pub fn is_dead(&self) -> bool { ... }
    pub async fn acquire(&self) -> Option<PublishLockGuard> { ... }
}
When IndexingPipeline restarts after a failure, it creates a new PublishLock and a new PublishToken. Splits from the old pipeline generation still carry the old lock. Once that lock is marked dead, downstream actors such as the Packager check publish_lock.is_dead() and drop the stale splits:
// In packager.rs (excerpt; split_ids and packaged_splits are
// defined earlier in the handler)
for split in batch.splits {
    if batch.publish_lock.is_dead() {
        info!(split_ids=?split_ids, "Splits' publish lock is dead.");
        // Drop the remaining splits without packaging them.
        return Ok(());
    }
    let packaged_split = self.process_indexed_split(split, ctx).await?;
    packaged_splits.push(packaged_split);
}
This prevents a split from a crashed pipeline generation from racing with a split from the new generation at the Publisher.
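A minimal model of the fencing flag, assuming the inferred shape above. The real type also exposes an async acquire returning a guard; `kill` here is a stand-in for however the supervisor actually marks the lock dead:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Cloning shares the flag: every split of one pipeline generation
// carries a clone of the same lock, so killing it fences them all.
#[derive(Clone)]
pub struct PublishLock(Arc<AtomicBool>);

impl PublishLock {
    pub fn new() -> Self {
        PublishLock(Arc::new(AtomicBool::new(true)))
    }
    pub fn is_dead(&self) -> bool {
        !self.0.load(Ordering::Acquire)
    }
    // Called by the supervisor when the pipeline generation is retired.
    pub fn kill(&self) {
        self.0.store(false, Ordering::Release);
    }
}
```

Because the flag lives behind an Arc, killing the supervisor's handle is instantly visible to every in-flight split holding a clone, with no coordination message needed.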
Stage 4: IndexSerializer #
IndexSerializer calls index_writer.commit() on the Tantivy IndexWriter, converting the in-memory index into segment files in a temp directory. This is a blocking operation (disk I/O plus codec work), so IndexSerializer runs on the blocking thread pool. The output is an IndexedSplit with a finalized Tantivy index on disk.
Stage 5: Packager #
pub struct Packager {
    actor_name: &'static str,
    uploader_mailbox: Mailbox<Uploader>,
    tag_fields: Vec<NamedField>,
}

impl Actor for Packager {
    fn queue_capacity(&self) -> QueueCapacity {
        QueueCapacity::Bounded(1)
    }
    fn runtime_handle(&self) -> Handle {
        RuntimeType::Blocking.get_runtime_handle()
    }
}
Packager takes the finalized Tantivy segment, extracts tags (used for split pruning during search), and calls create_packaged_split to produce a PackagedSplit. The packaged split contains the split’s segment files and metadata needed for the bundle format.
The queue capacity of 1 creates backpressure: the IndexSerializer will block when sending to a full Packager mailbox. This throttles the indexer from generating splits faster than they can be packaged and uploaded.
Stage 6: Uploader #
Uploader is responsible for:
- Staging the split in the metastore (StageSplitsRequest): recording the split as Staged with its metadata before uploading its data.
- Building the split bundle via SplitPayloadBuilder::get_split_payload.
- Uploading the bundle to object storage via IndexingSplitStore.
- Sending the SplitsUpdate message downstream.
Upload concurrency is bounded by semaphores:
static CONCURRENT_UPLOAD_PERMITS_INDEX: OnceCell<Semaphore> = OnceCell::new();
static CONCURRENT_UPLOAD_PERMITS_MERGE: OnceCell<Semaphore> = OnceCell::new();
The indexing and merge pipelines share a configured max_concurrent_split_uploads budget split between them. This prevents a burst of large splits from exhausting object storage bandwidth.
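How the shared budget might be divided can be sketched as follows. The even half-and-half split and the at-least-one-permit floor are assumptions for illustration, not Quickwit's actual formula:

```rust
// Divide the configured max_concurrent_split_uploads between the
// indexing and merge pipelines, guaranteeing each at least one permit.
fn split_upload_budget(max_concurrent_split_uploads: usize) -> (usize, usize) {
    let merge_permits = (max_concurrent_split_uploads / 2).max(1);
    let index_permits = max_concurrent_split_uploads
        .saturating_sub(merge_permits)
        .max(1);
    (index_permits, merge_permits)
}
```

Each side's permit count would then seed its semaphore, so a burst of merge uploads cannot starve the indexing pipeline's uploads (or vice versa).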
Stage 7: Sequencer #
// SplitsUpdateMailbox variant for the indexing pipeline
SplitsUpdateMailbox::Sequencer(Mailbox<Sequencer<Publisher>>)
The Sequencer is a reordering buffer. The Uploader can upload multiple splits concurrently (up to the semaphore limit), so uploads may complete out of order. The Sequencer ensures that SplitsUpdate messages are forwarded to Publisher in the same order they were originally sent by the Uploader.
This ordering guarantee is critical: checkpoint advancement must be monotonic. If split B (advancing the checkpoint to position 1000) were published before split A (advancing it to position 500), the checkpoint would jump straight to 1000; if split A were then lost to a pipeline failure, its documents would be skipped forever, because the source has already truncated past them.
The merge pipeline does not use a Sequencer (it sends directly to Publisher) because merge splits do not carry source checkpoints — they are derived from already-committed splits.
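The reordering invariant can be modeled as a buffer keyed by sequence number. This is a simplified model of the behavior, not Quickwit's implementation (which sequences pending futures), and the type and method names are illustrative:

```rust
use std::collections::BTreeMap;

// A reordering buffer: messages tagged with the order in which they
// were sent are released strictly in that order, even when they
// arrive out of order (e.g. because uploads finish at different speeds).
struct ReorderBuffer<T> {
    next_seq: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> ReorderBuffer<T> {
    fn new() -> Self {
        ReorderBuffer { next_seq: 0, pending: BTreeMap::new() }
    }

    // Accept a completed message; return every message that is now
    // ready to be forwarded downstream, in sequence order.
    fn receive(&mut self, seq: u64, msg: T) -> Vec<T> {
        self.pending.insert(seq, msg);
        let mut ready = Vec::new();
        while let Some(msg) = self.pending.remove(&self.next_seq) {
            ready.push(msg);
            self.next_seq += 1;
        }
        ready
    }
}
```

A message that arrives early is parked until every lower-numbered message has been released, which is exactly the property that keeps checkpoint deltas monotonic at the Publisher.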
Stage 8: Publisher #
pub struct Publisher {
    publisher_type: PublisherType,
    metastore: MetastoreServiceClient,
    merge_planner_mailbox_opt: Option<Mailbox<MergePlanner>>,
    source_mailbox_opt: Option<Mailbox<SourceActor>>,
    counters: PublisherCounters,
}
Publisher calls metastore.publish_splits(PublishSplitsRequest) to atomically transition splits from Staged to Published and advance the source checkpoint. The metastore’s atomicity guarantee means either the split is published and the checkpoint advances, or neither happens — preventing partial commits.
After publishing, Publisher sends the new split list to MergePlanner (so it can schedule merges) and sends the checkpoint delta back to SourceActor (so the source can truncate its WAL).
IndexingPipeline: The Supervisor #
IndexingPipeline is itself an actor that supervises all eight pipeline actors:
pub struct IndexingPipeline {
    params: IndexingPipelineParams,
    previous_generations_statistics: IndexingStatistics,
    statistics: IndexingStatistics,
    handles_opt: Option<IndexingPipelineHandles>,
    kill_switch: KillSwitch,
    shard_ids: BTreeSet<ShardId>,
    _indexing_pipelines_gauge_guard: OwnedGaugeGuard,
}
Every second, SuperviseLoop runs healthcheck() across all supervisables(). If any actor is FailureOrUnhealthy, it:
- Triggers the KillSwitch to kill all actors.
- Marks the old PublishLock as dead.
- Schedules a Spawn message with exponential backoff (wait_duration_before_retry).
- On Spawn, spawns a fresh pipeline with a new PublishLock and an incremented retry count.
The spawn semaphore (SPAWN_PIPELINE_SEMAPHORE, capacity 10) prevents more than 10 pipelines from initializing simultaneously during a node restart.
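The backoff schedule can be sketched as below; the 2^n-seconds growth and the 600-second cap are assumptions for illustration, not Quickwit's actual constants:

```rust
use std::time::Duration;

// Delay before the n-th restart attempt: doubles with each failed
// generation, capped so a persistently failing pipeline retries at a
// bounded, predictable rate.
fn wait_duration_before_retry(num_retries: u32) -> Duration {
    let max_delay_secs = 600u64; // cap (assumed)
    let delay_secs = 2u64.saturating_pow(num_retries).min(max_delay_secs);
    Duration::from_secs(delay_secs)
}
```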
Summary #
The indexing pipeline is a chain of eight supervised actors with bounded mailboxes providing end-to-end backpressure. PublishLock fences stale pipeline generations from polluting the metastore. CommitTrigger drives split boundaries on timeout, doc count, memory, or drain events. Sequencer enforces monotonic checkpoint advancement despite concurrent uploads. IndexingPipeline supervises the chain with health checks and exponential-backoff restart.