Skip to main content
  1. Quickwit Internals: A Substrate Decomposition/

The Actor Framework Substrate — quickwit-actors

The Actor Framework Substrate — quickwit-actors #

Every long-running component in Quickwit — sources, indexers, packagers, uploaders, publishers, merge planners — is an actor: a self-contained unit of state that receives typed messages from a mailbox, processes them one at a time, and can exit with a typed status. The quickwit-actors crate is the substrate that makes this possible. It is not a general-purpose actor runtime (not Actix, not Akka-style); it is a narrow, purpose-built framework tuned for Quickwit’s supervision and backpressure requirements.

The Actor Trait #

Every actor implements two traits: Actor (defines its state and lifecycle hooks) and Handler<M> (defines how it handles a specific message type M).

#[async_trait]
pub trait Actor: Send + Sized + 'static {
    type ObservableState: fmt::Debug + serde::Serialize + Send + Sync + Clone;

    fn name(&self) -> String {
        type_name::<Self>().to_string()
    }

    fn runtime_handle(&self) -> tokio::runtime::Handle {
        tokio::runtime::Handle::current()
    }

    fn yield_after_each_message(&self) -> bool {
        true
    }

    fn queue_capacity(&self) -> QueueCapacity {
        QueueCapacity::Unbounded
    }

    fn observable_state(&self) -> Self::ObservableState;

    async fn initialize(&mut self, _ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
        Ok(())
    }

    async fn on_drained_messages(&mut self, _ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
        Ok(())
    }

    async fn finalize(
        &mut self,
        _exit_status: &ActorExitStatus,
        _ctx: &ActorContext<Self>,
    ) -> anyhow::Result<()> {
        Ok(())
    }
}

Key design decisions:

  • ObservableState: every actor can expose a snapshot of its state. ActorHandle::observe() sends a high-priority Observe message that causes the actor to emit its current ObservableState via a watch::Sender. This is how the IndexingPipeline supervisor reads health counters from child actors without stopping them.
  • queue_capacity: actors declare their inbox size. Packager declares QueueCapacity::Bounded(1) — it will only hold one pending IndexedSplitBatch at a time. When the queue is full, the sender (send_message) awaits, providing natural backpressure upstream.
  • runtime_handle: actors can specify which tokio runtime thread pool they run on. Packager returns RuntimeType::Blocking.get_runtime_handle() because it does CPU-intensive segment merging that would stall the async runtime.
  • on_drained_messages: called when the mailbox is empty and the actor has no more work. Used by source actors to signal that a checkpoint is safe to commit.

The Handler<M> Trait #

#[async_trait]
pub trait Handler<M>: Actor {
    type Reply: Send + 'static;

    async fn handle(
        &mut self,
        message: M,
        ctx: &ActorContext<Self>,
    ) -> Result<Self::Reply, ActorExitStatus>;
}

An actor implements Handler<M> for each message type it accepts. If handle returns Err(ActorExitStatus::...), the actor stops processing messages and enters its finalization path. The exit status propagates to supervisors.

#[derive(Clone, Debug, Error)]
pub enum ActorExitStatus {
    #[error("success")]
    Success,
    #[error("quit")]
    Quit,
    #[error("downstream actor exited")]
    DownstreamClosed,
    #[error("killed")]
    Killed,
    #[error("failure(cause={0:?})")]
    Failure(Arc<anyhow::Error>),
    #[error("panicked")]
    Panicked,
}

DownstreamClosed is the key status for pipeline teardown: if the Uploader’s mailbox to the Sequencer is disconnected (because the Sequencer exited), the Uploader returns DownstreamClosed, which propagates backwards up the pipeline.

The Mailbox #

pub struct Mailbox<A: Actor> {
    inner: Arc<Inner<A>>,
    ref_count: Arc<AtomicUsize>,
}

struct Inner<A: Actor> {
    pub(crate) tx: Sender<Envelope<A>>,
    scheduler_client_opt: Option<SchedulerClient>,
    instance_id: String,
}

A Mailbox<A> is the send-side handle for communicating with an actor. It wraps a channel sender and tracks reference count: when the last Mailbox clone is dropped, the actor’s inbox is closed, signalling the actor to drain and exit.

The mailbox has two priority levels:

#[derive(Copy, Clone)]
pub(crate) enum Priority {
    High,
    Low,
}
  • Low priority: send_message — normal messages (indexed batches, packaged splits).
  • High priority: send_message_with_high_priority — control messages (Observe, Command::Pause, Command::Resume, Command::Quit, Command::Nudge).

High-priority messages jump ahead of the low-priority queue. This is how ActorHandle::observe() gets a timely state snapshot even when an actor has a full inbox of data messages.

Backpressure: send_message is async — it awaits when the inbox is full (bounded capacity). try_send_message is non-blocking and returns TrySendError if full. This creates the backpressure chain: Packager has capacity 1 → Indexer awaits when sending → DocProcessor awaits when sending → SourceActor awaits → ingest rate throttles.

ActorContext and KillSwitch #

pub struct ActorContext<A: Actor> {
    inner: Arc<ActorContextInner<A>>,
}

ActorContext is passed to every handle and lifecycle method. It provides:

  • ctx.mailbox() — the actor’s own mailbox (for self-referential scheduling).
  • ctx.spawn_actor() — spawn a child actor within the supervisor’s universe.
  • ctx.kill_switch() — a shared KillSwitch that can be triggered to kill the entire pipeline.
  • ctx.progress() — a progress tracker used by the supervision loop to detect stuck actors.
  • ctx.record_progress() — called by actors to signal liveness to their supervisor.

KillSwitch: a shared Arc<AtomicBool>. When triggered (via kill_switch.kill()), all actors in the pipeline check it on their next message loop iteration and exit with ActorExitStatus::Killed. This is how IndexingPipeline tears down all actors atomically: it triggers the KillSwitch, then sends a Nudge to unblock any actor waiting on a full downstream mailbox.

ActorHandle and Health Monitoring #

pub struct ActorHandle<A: Actor> {
    actor_context: ActorContext<A>,
    last_state: watch::Receiver<A::ObservableState>,
    join_handle: ActorJoinHandle,
}

pub trait Supervisable {
    fn name(&self) -> &str;
    fn check_health(&self, check_for_progress: bool) -> Health;
    fn state(&self) -> ActorState;
}

pub enum Health {
    Healthy,
    FailureOrUnhealthy,
    Success,
}

ActorHandle is the supervisor-side handle. The IndexingPipeline holds ActorHandle for all eight pipeline actors. Every second, its SuperviseLoop handler calls:

fn healthcheck(&self, check_for_progress: bool) -> Health {
    let mut healthy_actors: Vec<&str> = Default::default();
    let mut failure_or_unhealthy_actors: Vec<&str> = Default::default();
    // ...
    for supervisable in self.supervisables() {
        match supervisable.check_health(check_for_progress) {
            Health::Healthy => healthy_actors.push(supervisable.name()),
            Health::FailureOrUnhealthy => failure_or_unhealthy_actors.push(supervisable.name()),
            Health::Success => {} // actor completed normally
        }
    }
}

If any actor is FailureOrUnhealthy, IndexingPipeline kills the entire pipeline via KillSwitch and schedules a Spawn with exponential backoff:

pub(crate) fn wait_duration_before_retry(retry_count: usize) -> Duration {
    let max_power = (retry_count as u32).min(31);
    Duration::from_secs(2u64.pow(max_power)).min(MAX_RETRY_DELAY)
}

const MAX_RETRY_DELAY: Duration = Duration::from_secs(600); // 10 min.

The retry counter is preserved across restarts on IndexingPipeline, so repeated failures back off up to 10 minutes.

Spawn Semaphore and Isolation #

static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);

Both IndexingPipeline and MergePipeline use this global semaphore to limit concurrent pipeline spawns to 10. Spawning a pipeline hits the metastore heavily (loading split metadata, source configurations, schema) and the semaphore prevents thundering herd during node restart when many pipelines initialize simultaneously.

Summary #

quickwit-actors is a purpose-built supervised actor substrate. Actors declare their inbox capacity (providing backpressure), their tokio runtime (separating CPU and IO work), and their observable state (enabling non-intrusive health monitoring). The KillSwitch provides atomic pipeline teardown. ActorHandle exposes health via the Supervisable trait, driving the IndexingPipeline’s supervise-and-retry loop with exponential backoff. Every substrate in Quickwit — indexing, merging, ingest — is built as a chain of actors connected by typed mailboxes.