

Indexing Pipeline Substrate — The Eight-Actor Chain #

The indexing pipeline transforms raw documents from a source into committed, searchable splits on object storage. It is implemented as a chain of eight supervised actors in quickwit-indexing/src/actors/. Each actor has a bounded mailbox; backpressure propagates from the slowest stage upstream to the source.

The Pipeline Chain #

SourceActor
    → DocProcessor
    → Indexer
    → IndexSerializer
    → Packager
    → Uploader
    → Sequencer<Publisher>
    → Publisher

The IndexingPipelineHandles struct holds an ActorHandle for every actor, plus the source's mailbox:

struct IndexingPipelineHandles {
    source_mailbox: Mailbox<SourceActor>,
    source_handle: ActorHandle<SourceActor>,
    doc_processor: ActorHandle<DocProcessor>,
    indexer: ActorHandle<Indexer>,
    index_serializer: ActorHandle<IndexSerializer>,
    packager: ActorHandle<Packager>,
    uploader: ActorHandle<Uploader>,
    sequencer: ActorHandle<Sequencer<Publisher>>,
    publisher: ActorHandle<Publisher>,
    next_check_for_progress: Instant,
}

Stage 1: SourceActor #

SourceActor reads documents from the configured source (Kafka, Kinesis, file, ingest API). It produces RawDocBatch — a batch of raw bytes with a SourceCheckpoint tracking which positions in the source have been consumed. Sources are pluggable via the Source trait.

The source actor is unique: it is the only actor whose mailbox is held by IndexingPipeline directly (via source_mailbox). When IndexingPipeline needs to close a shard or update the source configuration (e.g., shard assignment change), it sends a message directly to the source.

Stage 2: DocProcessor #

DocProcessor receives RawDocBatch and parses documents:

pub struct DocProcessor {
    // DocMapper: converts JSON objects to Tantivy documents
    doc_mapper: Arc<DocMapper>,
    // Sink: the Indexer's mailbox
    indexer_mailbox: Mailbox<Indexer>,
    // Counters for observation
    counters: DocProcessorCounters,
    // ...
}

For each document in the batch, DocProcessor:

  1. Parses the raw bytes as JSON (or OTLP protobuf/JSON for traces and logs).
  2. Applies optional VRL (Vector Remap Language) transforms if configured.
  3. Calls doc_mapper.doc_from_json_obj to produce a TantivyDocument.
  4. Forwards ProcessedDocBatch to the Indexer.

Documents that fail parsing are counted as invalid and dropped; the source checkpoint still advances. This is a deliberate trade-off: one bad document should not stall the pipeline.

Stage 3: Indexer #

Indexer is the core indexing stage. It maintains a workbench of in-progress splits, partitioned by partition_id:

struct IndexerState {
    pipeline_id: IndexingPipelineId,
    metastore: MetastoreServiceClient,
    indexing_directory: TempDirectory,
    indexing_settings: IndexingSettings,
    publish_lock: PublishLock,
    publish_token_opt: Option<PublishToken>,
    schema: Schema,
    doc_mapping_uid: DocMappingUid,
    tokenizer_manager: TokenizerManager,
    max_num_partitions: NonZeroU32,
    index_settings: IndexSettings,
    cooperative_indexing_opt: Option<CooperativeIndexingCycle>,
}

For each ProcessedDocBatch, the indexer:

  1. Looks up or creates an IndexedSplitBuilder for the document’s partition: splits: FnvHashMap<u64, IndexedSplitBuilder>.
  2. Adds the Tantivy document to the IndexBuilder inside that split builder.
  3. Checks CommitTrigger conditions after each batch.

When a commit is triggered, the workbench is flushed: all active splits are finalized and sent downstream as IndexedSplitBatch.

Partition overflow: if the number of active partitions exceeds max_num_partitions, new documents with unrecognized partitions go to the OTHER_PARTITION_ID split. This bounds memory usage when there are many hot partitions.

CommitTrigger: When to Flush #

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CommitTrigger {
    Drained,        // source mailbox is empty
    ForceCommit,    // explicit commit request
    MemoryLimit,    // RAM usage exceeded threshold
    NoMoreDocs,     // source is exhausted
    NumDocsLimit,   // split_num_docs_limit reached
    Timeout,        // commit_timeout_secs elapsed
}

The commit timeout and doc limit come from IndexingSettings. Typical defaults: 60 seconds or 10 million documents. A MemoryLimit commit fires when the Tantivy index writer exceeds its heap budget. Drained fires when the source has no more documents immediately available — committing on drain bounds latency for low-volume indices.

PublishLock: Fencing Stale Splits #

PublishLock is a fencing mechanism to prevent stale pipeline generations from publishing splits after a restart:

// From models/publish_lock.rs (inferred)
pub struct PublishLock(Arc<AtomicBool>);

impl PublishLock {
    pub fn is_dead(&self) -> bool { ... }
    pub async fn acquire(&self) -> Option<PublishLockGuard> { ... }
}

When IndexingPipeline restarts (after a failure), it creates a new PublishLock and a new PublishToken. The old pipeline generation’s splits carry the old lock. Once the old lock is marked dead, the Packager and Publisher check publish_lock.is_dead() on each batch and drop any split that carries the dead lock:

// In packager.rs (excerpt; split_ids bound here for the log line)
let split_ids: Vec<String> = batch
    .splits
    .iter()
    .map(|split| split.split_id().to_string())
    .collect();
for split in batch.splits {
    if batch.publish_lock.is_dead() {
        info!(split_ids=?split_ids, "Splits' publish lock is dead.");
        return Ok(());
    }
    let packaged_split = self.process_indexed_split(split, ctx).await?;
    packaged_splits.push(packaged_split);
}

This prevents a split from a crashed pipeline generation from racing with a split from the new generation at the Publisher.

Stage 4: IndexSerializer #

IndexSerializer calls index_writer.commit() on the Tantivy IndexWriter, converting the in-memory index into segment files on a temp directory. This is a blocking operation (disk I/O + codec), so IndexSerializer runs on the blocking thread pool. The output is an IndexedSplit with a finalized Tantivy index on disk.

Stage 5: Packager #

pub struct Packager {
    actor_name: &'static str,
    uploader_mailbox: Mailbox<Uploader>,
    tag_fields: Vec<NamedField>,
}

impl Actor for Packager {
    fn queue_capacity(&self) -> QueueCapacity {
        QueueCapacity::Bounded(1)
    }

    fn runtime_handle(&self) -> Handle {
        RuntimeType::Blocking.get_runtime_handle()
    }
}

Packager takes the finalized Tantivy segment, extracts tags (used for split pruning during search), and calls create_packaged_split to produce a PackagedSplit. The packaged split contains the split’s segment files and metadata needed for the bundle format.

The queue capacity of 1 creates backpressure: the IndexSerializer will block when sending to a full Packager mailbox. This prevents the indexer from generating splits faster than they can be packaged and uploaded.

Stage 6: Uploader #

Uploader is responsible for:

  1. Staging the split in the metastore (StageSplitsRequest) — recording the split as Staged with its metadata before uploading its data.
  2. Building the split bundle via SplitPayloadBuilder::get_split_payload.
  3. Uploading the bundle to object storage via IndexingSplitStore.
  4. Sending the SplitsUpdate message downstream.

Upload concurrency is bounded by semaphores:

static CONCURRENT_UPLOAD_PERMITS_INDEX: OnceCell<Semaphore> = OnceCell::new();
static CONCURRENT_UPLOAD_PERMITS_MERGE: OnceCell<Semaphore> = OnceCell::new();

The indexing and merge pipelines share a configured max_concurrent_split_uploads budget split between them. This prevents a burst of large splits from exhausting object storage bandwidth.

Stage 7: Sequencer #

// SplitsUpdateMailbox variant for the indexing pipeline
SplitsUpdateMailbox::Sequencer(Mailbox<Sequencer<Publisher>>)

The Sequencer is a reordering buffer. The Uploader can upload multiple splits concurrently (up to the semaphore limit), so uploads may complete out of order. The Sequencer ensures that SplitsUpdate messages are forwarded to Publisher in the same order they were originally sent by the Uploader.

This ordering guarantee is critical: checkpoint advancement must be monotonic. If split B (checkpointing position 1000) were published before split A (checkpointing position 500), the checkpoint would jump straight to 1000; split A could then never publish, and the documents it covers would be lost.

The merge pipeline does not use a Sequencer (it sends directly to Publisher) because merge splits do not carry source checkpoints — they are derived from already-committed splits.

Stage 8: Publisher #

pub struct Publisher {
    publisher_type: PublisherType,
    metastore: MetastoreServiceClient,
    merge_planner_mailbox_opt: Option<Mailbox<MergePlanner>>,
    source_mailbox_opt: Option<Mailbox<SourceActor>>,
    counters: PublisherCounters,
}

Publisher calls metastore.publish_splits(PublishSplitsRequest) to atomically transition splits from Staged to Published and advance the source checkpoint. The metastore’s atomicity guarantee means either the split is published and the checkpoint advances, or neither happens — preventing partial commits.

After publishing, Publisher sends the new split list to MergePlanner (so it can schedule merges) and sends the checkpoint delta back to SourceActor (so the source can truncate its WAL).

IndexingPipeline: The Supervisor #

IndexingPipeline is itself an actor that supervises all eight pipeline actors:

pub struct IndexingPipeline {
    params: IndexingPipelineParams,
    previous_generations_statistics: IndexingStatistics,
    statistics: IndexingStatistics,
    handles_opt: Option<IndexingPipelineHandles>,
    kill_switch: KillSwitch,
    shard_ids: BTreeSet<ShardId>,
    _indexing_pipelines_gauge_guard: OwnedGaugeGuard,
}

Every second, SuperviseLoop runs healthcheck() across all supervisables(). If any actor is FailureOrUnhealthy, it:

  1. Triggers the KillSwitch to kill all actors.
  2. Marks the old PublishLock as dead.
  3. Schedules a Spawn message with exponential backoff (wait_duration_before_retry).
  4. On Spawn, spawns a fresh pipeline with a new PublishLock and incremented retry count.

The spawn semaphore (SPAWN_PIPELINE_SEMAPHORE, capacity 10) prevents more than 10 pipelines from initializing simultaneously during a node restart.

Summary #

The indexing pipeline is a chain of eight supervised actors with bounded mailboxes providing end-to-end backpressure. PublishLock fences stale pipeline generations from polluting the metastore. CommitTrigger drives split boundaries on timeout, doc count, memory, or drain events. Sequencer enforces monotonic checkpoint advancement despite concurrent uploads. IndexingPipeline supervises the chain with health checks and exponential-backoff restart.