Skip to main content
  1. Quickwit Internals: A Substrate Decomposition/

Ingest Substrate — Ingester and mrecordlog WAL

Ingest Substrate — Ingester and mrecordlog #

The ingest substrate is the write path: it receives documents from producers (HTTP API, Kafka source, file source), durably buffers them in a WAL, and makes them available to the indexing pipeline. In Quickwit’s ingest v2 architecture, this substrate is embodied by the Ingester service in quickwit-ingest.

Architecture Overview #

The ingest substrate decouples producers from indexers via a per-shard WAL. A shard is a logical queue for a (index_uid, source_id, shard_id) triple. Producers write to shards; indexer pipelines read from shards and consume (advance checkpoints) as they index. The WAL provides durability: documents survive process crashes between the time they are written and the time the indexer commits a split to the metastore.

Ingester Struct #

#[derive(Clone)]
pub struct Ingester {
    self_node_id: NodeId,
    control_plane: ControlPlaneServiceClient,
    ingester_pool: IngesterPool,
    state: IngesterState,
    disk_capacity: ByteSize,
    memory_capacity: ByteSize,
    rate_limiter_settings: RateLimiterSettings,
    replication_factor: usize,
    reset_shards_permits: Arc<Semaphore>,
}

Key fields:

  • state: IngesterState — the mutable shard registry and WAL handle, wrapped in Arc<RwLock<...>> for concurrent access. All shard open/close/persist operations go through state.
  • ingester_pool: IngesterPool — a pool of gRPC clients to peer ingesters. Used for replication: when replication_factor > 1, the leader ingester forwards writes to follower ingesters via this pool.
  • disk_capacity / memory_capacity — capacity limits. The ingester tracks usage against these and signals to the control plane when it is near capacity, triggering shard reassignment.
  • reset_shards_permits: Arc<Semaphore> — a semaphore (capacity 1) ensuring only one reset_shards operation runs at a time. reset_shards is called to evict shards that are no longer assigned to this node; the semaphore prevents concurrent evictions from racing.

IngesterState and Shard Lifecycle #

IngesterState holds the WAL and the shard registry:

// Inferred from usage across ingester.rs and related modules
struct IngesterState {
    // mrecordlog multi-queue WAL, one queue per shard
    mrecordlog: MultiRecordLogAsync,
    // shard_id → ShardState
    shards: HashMap<QueueId, ShardState>,
    // current WAL disk usage
    disk_usage: ByteSize,
    // ...
}

Each ShardState tracks:

  • Leader/follower role: the leader ingester accepts writes and replicates to followers.
  • Shard status: Open (accepting writes), Closed (no new writes, indexer draining), Unavailable.
  • Rate limiter: per-shard rate limiting based on rate_limiter_settings.

On startup, IngesterState::load opens the WAL directory and reconstructs shard state from the WAL records already on disk. Background tasks are spawned:

BroadcastLocalShardsTask::spawn(cluster.clone(), weak_state.clone());
BroadcastIngesterCapacityScoreTask::spawn(cluster, weak_state.clone());
CloseIdleShardsTask::spawn(weak_state, idle_shard_timeout);
  • BroadcastLocalShardsTask: periodically advertises this node’s shard set to the cluster via Chitchat, so the control plane knows which ingester owns which shard.
  • BroadcastIngesterCapacityScoreTask: computes a capacity score (remaining disk + memory) and broadcasts it. The control plane uses scores to route new shard assignments to nodes with headroom.
  • CloseIdleShardsTask: closes shards that have received no writes for idle_shard_timeout. Closed shards stop accepting writes but remain in the WAL for the indexer to drain.

mrecordlog: The Multi-Queue WAL #

The WAL is provided by the mrecordlog crate (a dependency, not part of Quickwit’s main source). It implements a multi-queue, append-only log with sequential record IDs:

use mrecordlog::{MultiRecordLog, PersistAction, PersistPolicy, Record, ResourceUsage};

pub struct MultiRecordLogAsync {
    mrecordlog_opt: Option<MultiRecordLog>,
}

impl MultiRecordLogAsync {
    pub async fn open(directory_path: &Path) -> Result<Self, ReadRecordError> {
        Self::open_with_prefs(
            directory_path,
            PersistPolicy::Always(PersistAction::Flush),
        ).await
    }

    async fn run_operation<F, T>(&mut self, operation: F) -> T
    where
        F: FnOnce(&mut MultiRecordLog) -> T + Send + 'static,
        T: Send + 'static,
    {
        let mut mrecordlog = self.take();
        let join_res: Result<(T, MultiRecordLog), JoinError> =
            tokio::task::spawn_blocking(move || {
                let res = operation(&mut mrecordlog);
                (res, mrecordlog)
            })
            .await;
        // ...
    }
}

Key properties:

  • Multi-queue: each shard has its own logical queue identified by a QueueId (a string). All queues share a single WAL file directory, but records are tagged with their queue ID.
  • Sequential record IDs: each record appended to a queue gets a monotonically increasing u64 position. This position is used as the checkpoint: when the indexer has indexed and committed splits up to position N, it advances the checkpoint to N. The ingester can then truncate (delete) WAL records below N.
  • spawn_blocking wrapper: mrecordlog is a synchronous library (file I/O, fsync). MultiRecordLogAsync wraps every operation in tokio::task::spawn_blocking to prevent blocking the async runtime. If the WAL is poisoned (blocked I/O), the wrapper calls std::process::abort() rather than hanging the node.
  • PersistPolicy::Always(PersistAction::Flush): every append is flushed (but not necessarily fsynced) immediately. This trades some throughput for low latency durability.

Persist Request Flow #

When a producer writes documents:

  1. Producer sends a PersistRequest to the ingester’s gRPC endpoint.
  2. Ingester::persist validates the shard is Open and the rate limit is not exceeded.
  3. Records are appended to mrecordlog for the shard’s queue.
  4. If replication_factor > 1: the leader ingester sends a ReplicateRequest to follower ingesters via ingester_pool. The follower appends the same records to its local WAL.
  5. Once a majority (leader + enough followers) have acknowledged, the leader responds to the producer.

The timeout for persist requests is tight:

pub(super) const PERSIST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "testsuite")) {
    Duration::from_millis(500)
} else {
    Duration::from_secs(6)
};

Six seconds. If the WAL write or replication stalls beyond this, the request times out with IngestV2Error::Timeout. Producers are expected to retry.

Batch Size #

const DEFAULT_BATCH_NUM_BYTES: usize = 1024 * 1024; // 1 MiB

fn get_batch_num_bytes() -> usize {
    static BATCH_NUM_BYTES_CELL: OnceCell<usize> = OnceCell::new();
    *BATCH_NUM_BYTES_CELL.get_or_init(|| {
        quickwit_common::get_from_env("QW_INGEST_BATCH_NUM_BYTES", DEFAULT_BATCH_NUM_BYTES, false)
    })
}

Documents are buffered into 1 MiB batches before being written to the WAL and sent to the indexing pipeline. This batching amortizes WAL write overhead and reduces the number of messages flowing through the actor pipeline.

Shard Truncation and Checkpoint Advancement #

The indexer pipeline reads from the WAL via a source actor. As the Publisher commits splits to the metastore, it sends checkpoint updates back to the source. The source advances its position in the WAL and calls mrecordlog.truncate(queue_id, up_to_position), freeing disk space.

This is the ingest substrate’s durability guarantee: documents are retained in the WAL until the metastore confirms the corresponding split is published. Only then is it safe to truncate.

Reset Shards and Capacity Management #

The control plane periodically reconciles which shards each ingester should own. If a shard is no longer assigned to this node (e.g., due to rebalancing), the ingester runs reset_shards:

// Semaphore ensures at most one concurrent reset
let _permit = self.reset_shards_permits.acquire().await;
// MIN_RESET_SHARDS_INTERVAL prevents thrashing

The minimum interval between resets is 60 seconds in production to prevent thrashing from rapid shard reassignments.

Summary #

The ingest substrate buffers documents durably in a per-shard WAL (mrecordlog) before handing them to the indexing pipeline. The Ingester manages shard lifecycle (open/close/idle timeout), enforces rate limits, replicates to followers for fault tolerance, and broadcasts capacity scores to the cluster. The WAL is truncated only after the indexer confirms a split is published to the metastore, providing an end-to-end durability guarantee. All WAL I/O is offloaded to spawn_blocking to keep the async runtime unblocked.