The Actor Framework Substrate — quickwit-actors #
Every long-running component in Quickwit — sources, indexers, packagers, uploaders, publishers, merge planners — is an actor: a self-contained unit of state that receives typed messages from a mailbox, processes them one at a time, and can exit with a typed status. The quickwit-actors crate is the substrate that makes this possible. It is not a general-purpose actor runtime (not Actix, not Akka-style); it is a narrow, purpose-built framework tuned for Quickwit’s supervision and backpressure requirements.
The Actor Trait #
Every actor implements two traits: Actor (defines its state and lifecycle hooks) and Handler<M> (defines how it handles a specific message type M).
#[async_trait]
pub trait Actor: Send + Sized + 'static {
type ObservableState: fmt::Debug + serde::Serialize + Send + Sync + Clone;
fn name(&self) -> String {
type_name::<Self>().to_string()
}
fn runtime_handle(&self) -> tokio::runtime::Handle {
tokio::runtime::Handle::current()
}
fn yield_after_each_message(&self) -> bool {
true
}
fn queue_capacity(&self) -> QueueCapacity {
QueueCapacity::Unbounded
}
fn observable_state(&self) -> Self::ObservableState;
async fn initialize(&mut self, _ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
Ok(())
}
async fn on_drained_messages(&mut self, _ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
Ok(())
}
async fn finalize(
&mut self,
_exit_status: &ActorExitStatus,
_ctx: &ActorContext<Self>,
) -> anyhow::Result<()> {
Ok(())
}
}
Key design decisions:
- ObservableState: every actor can expose a snapshot of its state. ActorHandle::observe() sends a high-priority Observe message that causes the actor to emit its current ObservableState via a watch::Sender. This is how the IndexingPipeline supervisor reads health counters from child actors without stopping them.
- queue_capacity: actors declare their inbox size. Packager declares QueueCapacity::Bounded(1) — it will only hold one pending IndexedSplitBatch at a time. When the queue is full, the sender (send_message) awaits, providing natural backpressure upstream.
- runtime_handle: actors can specify which tokio runtime thread pool they run on. Packager returns RuntimeType::Blocking.get_runtime_handle() because it does CPU-intensive segment merging that would stall the async runtime.
- on_drained_messages: called when the mailbox is empty and the actor has no more work. Used by source actors to signal that a checkpoint is safe to commit.
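The ObservableState idea can be approximated without the crate: the actor continuously publishes a snapshot that a supervisor may read at any moment. A minimal std-only sketch, with a plain atomic counter standing in for the richer watch-channel machinery (spawn_worker is a hypothetical name):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical "observable state": the actor refreshes a published counter
// after each message, and a supervisor reads it without pausing the actor.
fn spawn_worker(observed: Arc<AtomicU64>, messages: u64) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for _ in 0..messages {
            // ... handle one message, then refresh the published state ...
            observed.fetch_add(1, Ordering::Relaxed);
        }
    })
}

fn main() {
    let observed = Arc::new(AtomicU64::new(0));
    let worker = spawn_worker(Arc::clone(&observed), 1000);
    // The supervisor may observe a partial count while the actor runs...
    let mid_flight = observed.load(Ordering::Relaxed);
    assert!(mid_flight <= 1000);
    worker.join().unwrap();
    // ...and the final count once the actor has drained its work.
    assert_eq!(observed.load(Ordering::Relaxed), 1000);
}
```

The real mechanism differs in one important way: the snapshot is emitted by the actor itself in response to an Observe message, so it is always internally consistent, which a bag of independent atomics would not guarantee.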
The Handler<M> Trait #
#[async_trait]
pub trait Handler<M>: Actor {
type Reply: Send + 'static;
async fn handle(
&mut self,
message: M,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus>;
}
An actor implements Handler<M> for each message type it accepts. If handle returns Err(ActorExitStatus::...), the actor stops processing messages and enters its finalization path. The exit status propagates to supervisors.
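The handle-or-exit contract can be illustrated with a stripped-down, synchronous analogue. The Counter actor, Msg enum, and run loop below are hypothetical; the real trait is async and generic over the message type:

```rust
// Simplified stand-in for ActorExitStatus.
#[derive(Debug, PartialEq)]
#[allow(dead_code)]
enum ExitStatus {
    Success,
    Failure(String),
}

struct Counter {
    total: u64,
}

enum Msg {
    Add(u64),
    Quit,
}

impl Counter {
    // Mirrors Handler::handle: Ok(reply) keeps the actor running,
    // Err(status) stops message processing and enters finalization.
    fn handle(&mut self, msg: Msg) -> Result<u64, ExitStatus> {
        match msg {
            Msg::Add(n) => {
                self.total += n;
                Ok(self.total)
            }
            Msg::Quit => Err(ExitStatus::Success),
        }
    }
}

// A toy message loop: process until a handler returns an exit status.
fn run(msgs: Vec<Msg>) -> (u64, ExitStatus) {
    let mut actor = Counter { total: 0 };
    for msg in msgs {
        if let Err(status) = actor.handle(msg) {
            return (actor.total, status);
        }
    }
    (actor.total, ExitStatus::Success)
}

fn main() {
    let (total, status) = run(vec![Msg::Add(2), Msg::Add(3), Msg::Quit, Msg::Add(10)]);
    assert_eq!(total, 5); // Quit stops the loop before Add(10) is processed
    assert_eq!(status, ExitStatus::Success);
}
```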
#[derive(Clone, Debug, Error)]
pub enum ActorExitStatus {
#[error("success")]
Success,
#[error("quit")]
Quit,
#[error("downstream actor exited")]
DownstreamClosed,
#[error("killed")]
Killed,
#[error("failure(cause={0:?})")]
Failure(Arc<anyhow::Error>),
#[error("panicked")]
Panicked,
}
DownstreamClosed is the key status for pipeline teardown: if the Uploader’s mailbox to the Sequencer is disconnected (because the Sequencer exited), the Uploader returns DownstreamClosed, which propagates upstream through the pipeline.
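This condition is easy to demonstrate with a std channel, whose send fails exactly when the receiving end has been dropped. The forward helper and single-variant enum below are hypothetical stand-ins:

```rust
use std::sync::mpsc;

// Hypothetical mapping of a closed downstream channel to an exit status.
#[derive(Debug, PartialEq)]
enum ExitStatus {
    DownstreamClosed,
}

fn forward(tx: &mpsc::Sender<&'static str>, msg: &'static str) -> Result<(), ExitStatus> {
    // send fails iff the receiver has been dropped: the same condition
    // the framework translates into DownstreamClosed.
    tx.send(msg).map_err(|_| ExitStatus::DownstreamClosed)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    assert!(forward(&tx, "split-1").is_ok());
    drop(rx); // the downstream actor exits, dropping its mailbox receiver
    assert_eq!(forward(&tx, "split-2"), Err(ExitStatus::DownstreamClosed));
}
```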
The Mailbox #
pub struct Mailbox<A: Actor> {
inner: Arc<Inner<A>>,
ref_count: Arc<AtomicUsize>,
}
struct Inner<A: Actor> {
pub(crate) tx: Sender<Envelope<A>>,
scheduler_client_opt: Option<SchedulerClient>,
instance_id: String,
}
A Mailbox<A> is the send-side handle for communicating with an actor. It wraps a channel sender and tracks a reference count: when the last Mailbox clone is dropped, the actor’s inbox is closed, signalling the actor to drain its remaining messages and exit.
The mailbox has two priority levels:
#[derive(Copy, Clone)]
pub(crate) enum Priority {
High,
Low,
}
- Low priority: send_message — normal messages (indexed batches, packaged splits).
- High priority: send_message_with_high_priority — control messages (Observe, Command::Pause, Command::Resume, Command::Quit, Command::Nudge).
High-priority messages jump ahead of the low-priority queue. This is how ActorHandle::observe() gets a timely state snapshot even when an actor has a full inbox of data messages.
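A toy two-queue mailbox shows the effect: the high-priority queue is always drained first. The next_message helper is hypothetical, not the crate's actual envelope dispatch:

```rust
use std::sync::mpsc::{channel, Receiver};

// Toy two-queue mailbox: always drain high-priority messages first,
// falling back to the low-priority queue only when high is empty.
fn next_message(high: &Receiver<String>, low: &Receiver<String>) -> Option<String> {
    high.try_recv().or_else(|_| low.try_recv()).ok()
}

fn main() {
    let (high_tx, high_rx) = channel();
    let (low_tx, low_rx) = channel();
    low_tx.send("batch-1".to_string()).unwrap();
    low_tx.send("batch-2".to_string()).unwrap();
    high_tx.send("Observe".to_string()).unwrap(); // sent last...
    // ...but delivered first, ahead of the queued data messages.
    assert_eq!(next_message(&high_rx, &low_rx).as_deref(), Some("Observe"));
    assert_eq!(next_message(&high_rx, &low_rx).as_deref(), Some("batch-1"));
    assert_eq!(next_message(&high_rx, &low_rx).as_deref(), Some("batch-2"));
}
```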
Backpressure: send_message is async — it awaits when the inbox is full (bounded capacity). try_send_message is non-blocking and returns TrySendError if full. This creates the backpressure chain: Packager has capacity 1 → Indexer awaits when sending → DocProcessor awaits when sending → SourceActor awaits → ingest rate throttles.
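The bounded-inbox behaviour can be reproduced with std's sync_channel: a capacity-1 channel behaves like QueueCapacity::Bounded(1) for non-blocking sends (with try_send standing in for try_send_message, and a blocking send where the async send_message would await):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // A bounded inbox of capacity 1, like Packager's QueueCapacity::Bounded(1).
    let (tx, rx) = sync_channel::<&str>(1);
    tx.try_send("batch-1").unwrap(); // fills the single slot
    // A second non-blocking send fails; the async send_message would await here.
    assert!(matches!(tx.try_send("batch-2"), Err(TrySendError::Full(_))));
    rx.recv().unwrap(); // downstream consumes one message...
    assert!(tx.try_send("batch-2").is_ok()); // ...and the slot frees up
}
```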
ActorContext and KillSwitch #
pub struct ActorContext<A: Actor> {
inner: Arc<ActorContextInner<A>>,
}
ActorContext is passed to every handle and lifecycle method. It provides:
- ctx.mailbox() — the actor’s own mailbox (for self-referential scheduling).
- ctx.spawn_actor() — spawn a child actor within the supervisor’s universe.
- ctx.kill_switch() — a shared KillSwitch that can be triggered to kill the entire pipeline.
- ctx.progress() — a progress tracker used by the supervision loop to detect stuck actors.
- ctx.record_progress() — called by actors to signal liveness to their supervisor.
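The progress/liveness handshake can be sketched as a flag the actor sets and the supervisor reads-and-resets. The Progress type below is a hypothetical, much simpler stand-in for the real tracker:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Toy progress tracker: the actor flips a flag after useful work;
// the supervisor swaps it back to false on each health check.
#[derive(Clone)]
struct Progress(Arc<AtomicBool>);

impl Progress {
    fn new() -> Self {
        // Start "true" so a freshly spawned actor gets one grace period.
        Progress(Arc::new(AtomicBool::new(true)))
    }
    fn record_progress(&self) {
        self.0.store(true, Ordering::Relaxed);
    }
    // Returns whether any progress happened since the last check, then resets.
    fn registered_activity_since_last_check(&self) -> bool {
        self.0.swap(false, Ordering::Relaxed)
    }
}

fn main() {
    let progress = Progress::new();
    assert!(progress.registered_activity_since_last_check()); // initial grace
    assert!(!progress.registered_activity_since_last_check()); // stuck?
    progress.record_progress(); // the actor handles a message
    assert!(progress.registered_activity_since_last_check()); // healthy again
}
```

Two consecutive checks with no recorded progress are what mark an actor as stuck in this sketch; the real supervision loop applies the same read-and-reset idea on its periodic tick.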
KillSwitch: a shared Arc<AtomicBool>. When triggered (via kill_switch.kill()), all actors in the pipeline check it on their next message loop iteration and exit with ActorExitStatus::Killed. This is how IndexingPipeline tears down all actors atomically: it triggers the KillSwitch, then sends a Nudge to unblock any actor waiting on a full downstream mailbox.
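A minimal std-only sketch of that pattern, assuming the check-on-each-loop-iteration behaviour described above (the KillSwitch type here is a toy, not the crate's):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Minimal kill-switch: a shared flag every actor polls between messages.
#[derive(Clone)]
struct KillSwitch(Arc<AtomicBool>);

impl KillSwitch {
    fn new() -> Self {
        KillSwitch(Arc::new(AtomicBool::new(false)))
    }
    fn kill(&self) {
        self.0.store(true, Ordering::Relaxed);
    }
    fn is_dead(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }
}

fn main() {
    let kill_switch = KillSwitch::new();
    let ks = kill_switch.clone();
    let actor = thread::spawn(move || {
        let mut processed = 0u32;
        loop {
            // Checked once per message-loop iteration, like the real actors.
            if ks.is_dead() {
                return processed;
            }
            processed += 1;
            if processed == 100 {
                ks.kill(); // simulate the supervisor triggering teardown
            }
        }
    });
    assert_eq!(actor.join().unwrap(), 100);
    assert!(kill_switch.is_dead());
}
```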
ActorHandle and Health Monitoring #
pub struct ActorHandle<A: Actor> {
actor_context: ActorContext<A>,
last_state: watch::Receiver<A::ObservableState>,
join_handle: ActorJoinHandle,
}
pub trait Supervisable {
fn name(&self) -> &str;
fn check_health(&self, check_for_progress: bool) -> Health;
fn state(&self) -> ActorState;
}
pub enum Health {
Healthy,
FailureOrUnhealthy,
Success,
}
ActorHandle is the supervisor-side handle. The IndexingPipeline holds ActorHandle for all eight pipeline actors. Every second, its SuperviseLoop handler calls:
fn healthcheck(&self, check_for_progress: bool) -> Health {
let mut healthy_actors: Vec<&str> = Default::default();
let mut failure_or_unhealthy_actors: Vec<&str> = Default::default();
// ...
for supervisable in self.supervisables() {
match supervisable.check_health(check_for_progress) {
Health::Healthy => healthy_actors.push(supervisable.name()),
Health::FailureOrUnhealthy => failure_or_unhealthy_actors.push(supervisable.name()),
Health::Success => {} // actor completed normally
}
}
}
If any actor is FailureOrUnhealthy, IndexingPipeline kills the entire pipeline via KillSwitch and schedules a Spawn with exponential backoff:
pub(crate) fn wait_duration_before_retry(retry_count: usize) -> Duration {
let max_power = (retry_count as u32).min(31);
Duration::from_secs(2u64.pow(max_power)).min(MAX_RETRY_DELAY)
}
const MAX_RETRY_DELAY: Duration = Duration::from_secs(600); // 10 min.
The retry counter is preserved across restarts of the IndexingPipeline, so repeated failures back off until the delay caps at 10 minutes.
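The arithmetic is easy to verify standalone: the delay doubles per retry and saturates at the 600-second cap once 2^retry_count exceeds it (from retry 10 onward, since 2^10 = 1024 s):

```rust
use std::time::Duration;

const MAX_RETRY_DELAY: Duration = Duration::from_secs(600); // 10 min

// Same arithmetic as wait_duration_before_retry quoted above.
fn wait_duration_before_retry(retry_count: usize) -> Duration {
    let max_power = (retry_count as u32).min(31);
    Duration::from_secs(2u64.pow(max_power)).min(MAX_RETRY_DELAY)
}

fn main() {
    assert_eq!(wait_duration_before_retry(0), Duration::from_secs(1));
    assert_eq!(wait_duration_before_retry(4), Duration::from_secs(16));
    assert_eq!(wait_duration_before_retry(9), Duration::from_secs(512));
    // 2^10 = 1024 s exceeds the cap, so the delay saturates at 600 s.
    assert_eq!(wait_duration_before_retry(10), Duration::from_secs(600));
    // The min(31) guard keeps 2u64.pow from overflowing for huge counters.
    assert_eq!(wait_duration_before_retry(1000), Duration::from_secs(600));
}
```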
Spawn Semaphore and Isolation #
static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);
Both IndexingPipeline and MergePipeline use this global semaphore to limit concurrent pipeline spawns to 10. Spawning a pipeline hits the metastore heavily (loading split metadata, source configurations, schema) and the semaphore prevents thundering herd during node restart when many pipelines initialize simultaneously.
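std has no semaphore type, but the permit-limiting behaviour can be sketched with a Mutex and Condvar. This is a toy, blocking stand-in for tokio::sync::Semaphore, just to show how a fixed permit count throttles concurrent spawns:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Toy counting semaphore: acquire blocks until a permit is available.
struct Semaphore {
    permits: Mutex<usize>,
    cvar: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore { permits: Mutex::new(permits), cvar: Condvar::new() }
    }
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.cvar.wait(permits).unwrap();
        }
        *permits -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cvar.notify_one();
    }
}

fn main() {
    // Cap concurrent "pipeline spawns" at 2; 8 threads contend for permits.
    let semaphore = Arc::new(Semaphore::new(2));
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let sem = Arc::clone(&semaphore);
            thread::spawn(move || {
                sem.acquire();
                // ... expensive metastore calls would happen here ...
                sem.release();
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(*semaphore.permits.lock().unwrap(), 2); // all permits returned
}
```

The real code additionally holds the permit via an RAII guard so it is released even if spawning fails partway through.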
Summary #
quickwit-actors is a purpose-built supervised actor substrate. Actors declare their inbox capacity (providing backpressure), their tokio runtime (separating CPU and IO work), and their observable state (enabling non-intrusive health monitoring). The KillSwitch provides atomic pipeline teardown. ActorHandle exposes health via the Supervisable trait, driving the IndexingPipeline’s supervise-and-retry loop with exponential backoff. Every substrate in Quickwit — indexing, merging, ingest — is built as a chain of actors connected by typed mailboxes.