Cluster/Membership Substrate — Chitchat and Phi Accrual #
Quickwit clusters do not use ZooKeeper, etcd, or Consul for membership. Instead, they use Chitchat, a custom gossip protocol library that Quickwit’s team developed. Chitchat is inspired by Scuttlebutt reconciliation and uses Phi accrual failure detection (the same algorithm as Cassandra and Akka). It is the substrate through which nodes discover each other, share service availability, and propagate indexing task assignments.
Why Gossip? #
Gossip-based membership converges in O(log N) gossip rounds and has no single point of failure: every node eventually exchanges state with every other node, directly or through intermediaries. The trade-off is eventual consistency: for a short window (typically a few gossip intervals) different nodes can hold slightly different views of cluster membership. Quickwit accepts this trade-off because:
- The control plane (not individual nodes) makes authoritative routing decisions.
- Stale membership causes missed cache affinity (a performance issue), not data loss.
- The metastore provides the authoritative source for split and checkpoint state.
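The logarithmic convergence claim can be checked with a toy simulation. The sketch below is not Chitchat's protocol: it assumes a simplified push-only gossip in which every informed node tells one random peer per round, and `XorShift64` and `rounds_to_converge` are hypothetical names introduced only for this illustration.

```rust
/// Tiny xorshift PRNG so the sketch needs no external crates.
struct XorShift64(u64);

impl XorShift64 {
    fn next(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }
}

/// Simulates push gossip: each round, every informed node tells one random peer.
/// Returns the number of rounds until all `num_nodes` nodes are informed.
fn rounds_to_converge(num_nodes: usize, seed: u64) -> usize {
    assert!(num_nodes >= 1 && seed != 0);
    let mut rng = XorShift64(seed);
    let mut informed = vec![false; num_nodes];
    informed[0] = true; // node 0 starts with the update
    let mut num_informed = 1;
    let mut rounds = 0;
    while num_informed < num_nodes {
        // Snapshot the senders so nodes informed this round start gossiping next round.
        let senders: Vec<usize> = (0..num_nodes).filter(|&i| informed[i]).collect();
        for _sender in senders {
            // A sender may pick itself or an already informed peer; that gossip is wasted.
            let peer = (rng.next() % num_nodes as u64) as usize;
            if !informed[peer] {
                informed[peer] = true;
                num_informed += 1;
            }
        }
        rounds += 1;
    }
    rounds
}
```

Because the informed set can at most double per round, 1,024 nodes need at least 10 rounds in this model; calling `rounds_to_converge(1024, 42)` shows how close a random run gets to that bound.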
Cluster Struct #
#[derive(Clone)]
pub struct Cluster {
    cluster_id: String,
    self_chitchat_id: ChitchatId,
    pub gossip_listen_addr: SocketAddr,
    client_grpc_config: ClientGrpcConfig,
    gossip_interval: Duration,
    inner: Arc<RwLock<InnerCluster>>,
}
Cluster holds an InnerCluster behind an Arc<RwLock<...>>. The InnerCluster owns the ChitchatHandle (the running gossip instance) and maintains the current view of all cluster nodes. Because the inner field is RwLock-protected, reads (getting cluster state, listing nodes) run concurrently, while writes (processing gossip updates) require exclusive access.
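The locking pattern can be sketched with the standard library alone. This is a minimal illustration, not Quickwit's code: `InnerView` and `ClusterView` are hypothetical stand-ins for `InnerCluster` and `Cluster`, and `std::sync::RwLock` is used here in place of whatever lock type the real struct imports.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `InnerCluster`: a map of node id to gRPC address.
#[derive(Default)]
struct InnerView {
    nodes: BTreeMap<String, String>,
}

/// Cheap-to-clone handle, mirroring the `Arc<RwLock<...>>` shape of `Cluster`.
#[derive(Clone, Default)]
struct ClusterView {
    inner: Arc<RwLock<InnerView>>,
}

impl ClusterView {
    /// Write path: takes the exclusive lock, as applying a gossip update would.
    fn upsert_node(&self, node_id: &str, grpc_addr: &str) {
        let mut inner = self.inner.write().unwrap();
        inner.nodes.insert(node_id.to_string(), grpc_addr.to_string());
    }

    /// Read path: takes the shared lock, so many readers can proceed at once.
    fn node_ids(&self) -> Vec<String> {
        let inner = self.inner.read().unwrap();
        inner.nodes.keys().cloned().collect()
    }
}
```

Cloning a `ClusterView` copies only the `Arc`, so every clone observes the same underlying view; that is the property that makes a `#[derive(Clone)]` handle practical to pass between tasks.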
Chitchat Configuration #
use chitchat::{
    Chitchat, ChitchatConfig, ChitchatHandle, ChitchatId,
    ClusterStateSnapshot, FailureDetectorConfig,
    KeyChangeEvent, ListenerHandle, NodeState, spawn_chitchat,
};
ChitchatConfig sets up the gossip instance:
let chitchat_config = ChitchatConfig {
    cluster_id: cluster_id.clone(),
    chitchat_id: self_node.chitchat_id(),
    listen_addr: gossip_listen_addr,
    seed_nodes: peer_seed_addrs,
    failure_detector_config,
    gossip_interval,
    marked_for_deletion_grace_period: MARKED_FOR_DELETION_GRACE_PERIOD,
    catchup_callback: Some(Box::new(catchup_callback)),
    // ...
};
Key parameters:
- cluster_id: nodes with different cluster IDs ignore each other’s gossip, which prevents accidental cross-cluster contamination on shared networks.
- chitchat_id: a ChitchatId combining node_id (a stable string such as a hostname), a generation ID (the node’s start timestamp), and gossip_advertise_addr (the UDP socket address that peers gossip to). The generation ID distinguishes restarts of the same node_id.
- seed_nodes: the initial set of peer addresses used for bootstrap. Once gossip starts, a node learns about the rest of the cluster from its seeds.
- gossip_interval: how often a node initiates a gossip exchange. Typical value: 500ms to 1s.
- marked_for_deletion_grace_period: how long state marked for deletion is retained before it is purged:
const MARKED_FOR_DELETION_GRACE_PERIOD: Duration = if cfg!(any(test, feature = "testsuite")) {
    Duration::from_millis(2_500)
} else {
    Duration::from_secs(3_600 * 2) // 2 hours
};
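Two hours is far longer than any plausible pause or restart, so a node that falls behind or rejoins after extended downtime can still catch up before the retained state is purged. Test builds shorten the period to 2.5 seconds so that purging can be exercised quickly.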
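The role of the generation ID in chitchat_id can be sketched as follows. `NodeIdentity` and `is_superseded_by` are hypothetical names modeled on the description above; they are not chitchat's actual `ChitchatId` API.

```rust
use std::net::SocketAddr;

/// Hypothetical identity type for illustration; not chitchat's `ChitchatId`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct NodeIdentity {
    node_id: String,
    generation_id: u64,
    gossip_advertise_addr: SocketAddr,
}

impl NodeIdentity {
    /// True when `other` is a later incarnation of the same logical node.
    fn is_superseded_by(&self, other: &NodeIdentity) -> bool {
        self.node_id == other.node_id && other.generation_id > self.generation_id
    }
}
```

Two identities that share a `node_id` but differ in `generation_id` compare as unequal, so state gossiped by a previous incarnation is never mistaken for state from the restarted process.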
Phi Accrual Failure Detection #
FailureDetectorConfig configures the Phi accrual detector (borrowed from chitchat). Phi accrual computes a continuous “suspicion level” φ(t) for each node based on the observed inter-arrival distribution of heartbeats:
φ(t) = -log₁₀(P_later(t - t_last))
where P_later(t) is the probability that the next heartbeat will arrive more than t milliseconds after the previous one, estimated from a sliding window of recent inter-arrival times. A node is considered dead when φ exceeds a fixed threshold (typically 8–12). The detector is adaptive because the estimated distribution, not the threshold, differs per node: a node with irregular heartbeats (process pauses, network jitter) has a wider inter-arrival distribution, so its φ rises more slowly and it must stay silent for longer before being declared dead.
Chitchat uses this detector to mark nodes as dead in the cluster state, which the control plane reads to stop routing new work to them.
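A minimal sketch of the φ computation, under one loudly stated assumption: inter-arrival times are modeled as exponentially distributed, so P_later(t) = exp(-t / mean). This is a common simplification and not necessarily what chitchat's detector does internally; `PhiEstimator` is a hypothetical name.

```rust
use std::collections::VecDeque;

/// Sliding window of heartbeat inter-arrival times, in milliseconds.
struct PhiEstimator {
    intervals_ms: VecDeque<f64>,
    window_size: usize,
}

impl PhiEstimator {
    fn new(window_size: usize) -> Self {
        assert!(window_size > 0);
        PhiEstimator { intervals_ms: VecDeque::new(), window_size }
    }

    /// Records one observed interval, evicting the oldest sample when the window is full.
    fn record_interval(&mut self, interval_ms: f64) {
        if self.intervals_ms.len() == self.window_size {
            self.intervals_ms.pop_front();
        }
        self.intervals_ms.push_back(interval_ms);
    }

    /// Returns phi for `elapsed_ms` since the last heartbeat, or `None` without samples.
    /// Assumption: P_later(t) = exp(-t / mean), hence
    /// phi = -log10(exp(-t / mean)) = t / (mean * ln 10).
    fn phi(&self, elapsed_ms: f64) -> Option<f64> {
        if self.intervals_ms.is_empty() {
            return None;
        }
        let mean = self.intervals_ms.iter().sum::<f64>() / self.intervals_ms.len() as f64;
        Some(elapsed_ms / (mean * std::f64::consts::LN_10))
    }
}
```

Under this model, ten seconds of silence yields a much larger φ for a node that normally heartbeats every second than for one whose intervals alternate between one and five seconds, which is how a single fixed threshold can still treat the two nodes differently.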
ClusterMember: The Node State Model #
Each node publishes its state as key-value pairs in the Chitchat node state. Other nodes read this state to understand what services are available and what work is assigned:
pub struct ClusterMember {
    pub node_id: NodeId,
    pub generation_id: GenerationId,
    pub enabled_services: HashSet<QuickwitService>,
    pub gossip_advertise_addr: SocketAddr,
    pub grpc_advertise_addr: SocketAddr,
    pub indexing_tasks: Vec<IndexingTask>,
    pub indexing_cpu_capacity: CpuCapacity,
}
Keys in the Chitchat node state (from member.rs):
pub const ENABLED_SERVICES_KEY: &str = "enabled_services";
pub const GRPC_ADVERTISE_ADDR_KEY: &str = "grpc_advertise_addr";
pub const READINESS_KEY: &str = "readiness";
pub const AVAILABILITY_ZONE_KEY: &str = "availability_zone";
pub const PIPELINE_METRICS_PREFIX: &str = "pipeline_metrics:";
pub const INDEXING_TASK_PREFIX: &str = "indexer.task:";
Each indexing pipeline running on a node publishes its current metrics (num_docs_per_sec, CPU usage) under a key with the pipeline_metrics: prefix, while the indexing tasks assigned to the node are published under the indexer.task: prefix. The control plane reads these keys to track pipeline health across the cluster.
The readiness key (readiness) transitions from READINESS_VALUE_NOT_READY to READINESS_VALUE_READY when a node has finished initialization. Before readiness, the node is not assigned new work.
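Reading these keys back can be sketched with a plain string map standing in for the Chitchat node state. The sketch assumes that pipeline metrics live under keys starting with `PIPELINE_METRICS_PREFIX`; the `READY_VALUE` string, the `NodeKv` alias, and both helper functions are assumptions made for illustration, since the actual value of READINESS_VALUE_READY is not shown here.

```rust
use std::collections::HashMap;

const READINESS_KEY: &str = "readiness";
const PIPELINE_METRICS_PREFIX: &str = "pipeline_metrics:";
// Assumed value for illustration; the real READINESS_VALUE_READY constant is not shown above.
const READY_VALUE: &str = "READY";

/// A node's gossiped state, simplified to a plain string map.
type NodeKv = HashMap<String, String>;

/// A node may receive new work only once it has published the ready value.
fn is_ready(state: &NodeKv) -> bool {
    state.get(READINESS_KEY).map(String::as_str) == Some(READY_VALUE)
}

/// Returns the key suffixes (pipeline identifiers) of all published pipeline metrics, sorted.
fn pipeline_ids(state: &NodeKv) -> Vec<&str> {
    let mut ids: Vec<&str> = state
        .keys()
        .filter_map(|key| key.strip_prefix(PIPELINE_METRICS_PREFIX))
        .collect();
    ids.sort_unstable();
    ids
}
```

Prefix-based keys keep the state flat: a reader needs no schema, only the prefix constant, to enumerate one category of entries and ignore the rest.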
Cluster::join: Bootstrap Sequence #
pub async fn join(
    cluster_id: String,
    self_node: ClusterMember,
    gossip_listen_addr: SocketAddr,
    peer_seed_addrs: Vec<String>,
    gossip_interval: Duration,
    failure_detector_config: FailureDetectorConfig,
    transport: &dyn Transport,
    client_grpc_config: ClientGrpcConfig,
) -> anyhow::Result<Self> {
    info!(
        cluster_id=%cluster_id,
        node_id=%self_node.node_id,
        generation_id=self_node.generation_id.as_u64(),
        enabled_services=?self_node.enabled_services,
        // ...
        "joining cluster"
    );
    // 1. Start Chitchat (UDP gossip socket, failure detector, state machine)
    // 2. Set initial node state keys (services, grpc addr, readiness=NOT_READY)
    // 3. Start grpc_gossip catchup task
    // 4. Start metrics task
    // 5. Set readiness=READY after initialization
}
The join sequence ensures that other nodes see the new node as not-ready until it has fully initialized. This prevents the control plane from assigning pipelines to a node that hasn’t loaded its configuration yet.
grpc_gossip Catchup #
Gossip over UDP has a fundamental limitation: UDP packets can be lost or reordered, and large state payloads must be fragmented. For large cluster state deltas (e.g., a node joining with many indexing task assignments), Chitchat uses a gRPC catchup callback:
let (catchup_callback_tx, catchup_callback_rx) = watch::channel(());
let catchup_callback = move || {
    let _ = catchup_callback_tx.send(());
};
// ...
chitchat_config.catchup_callback = Some(Box::new(catchup_callback));
When Chitchat detects that its gossip state is significantly behind another node’s (e.g., when a new node joins), it triggers the catchup callback. Quickwit’s spawn_catchup_callback_task handles this:
// From grpc_gossip.rs
pub fn spawn_catchup_callback_task(
    cluster: Cluster,
    catchup_callback_rx: watch::Receiver<()>,
) { ... }
The catchup task opens a gRPC stream to the peer and downloads the full cluster state delta in a reliable, ordered fashion — bypassing UDP’s unreliability for large payloads. This is Quickwit’s equivalent of Scuttlebutt’s “anti-entropy” phase: gossip provides fast convergence for small updates; gRPC provides reliable delivery for large state transfers.
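The callback itself carries no data; it only wakes the catchup task. That signalling pattern can be sketched with the standard library, using a bounded channel of capacity one as a stand-in for the tokio watch channel shown earlier. `catchup_signal` is a hypothetical helper, and the stand-in only approximates watch semantics in that repeated signals collapse into a single pending wake-up.

```rust
use std::sync::mpsc::{sync_channel, Receiver};

/// Builds a wake-up callback plus the receiver a background catchup task would wait on.
fn catchup_signal() -> (impl Fn() + Send + 'static, Receiver<()>) {
    let (tx, rx) = sync_channel::<()>(1);
    let callback = move || {
        // If the one-slot buffer is full, a wake-up is already pending, so the
        // extra signal is dropped. A disconnected receiver is ignored as well.
        let _ = tx.try_send(());
    };
    (callback, rx)
}
```

Coalescing matters because one catchup run fetches the latest state regardless of how many times it was requested, so queuing every request would only cause redundant transfers.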
Cluster Change Events #
The Cluster wraps Chitchat’s key-change events into higher-level ClusterChange events:
// From change.rs (inferred)
pub enum ClusterChange {
    Add(ClusterNode),
    Update(ClusterNode),
    Remove(ClusterNode),
}
compute_cluster_change_events translates raw Chitchat KeyChangeEvents into these semantic events. The control plane subscribes to ClusterChangeStream to react to membership changes: when a node is added, schedule new indexing pipelines; when a node is removed, reassign its shards and pipelines.
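The translation into semantic events can be sketched as a diff between two membership snapshots. This is a simplification: the real function consumes key-change events instead of comparing snapshots, and `Node`, `Change`, and `diff_views` are hypothetical names that only mirror the shape of `ClusterNode` and `ClusterChange`.

```rust
use std::collections::BTreeMap;

/// Simplified node record; the real `ClusterNode` carries more than an address.
#[derive(Clone, Debug, PartialEq)]
struct Node {
    node_id: String,
    grpc_addr: String,
}

#[derive(Debug, PartialEq)]
enum Change {
    Add(Node),
    Update(Node),
    Remove(Node),
}

/// Diffs two membership views keyed by node id into semantic change events.
fn diff_views(
    previous: &BTreeMap<String, Node>,
    current: &BTreeMap<String, Node>,
) -> Vec<Change> {
    let mut changes = Vec::new();
    // Nodes present now: either new, modified, or unchanged.
    for (node_id, node) in current {
        match previous.get(node_id) {
            None => changes.push(Change::Add(node.clone())),
            Some(old) if old != node => changes.push(Change::Update(node.clone())),
            Some(_) => {} // unchanged, no event
        }
    }
    // Nodes that disappeared since the previous view.
    for (node_id, node) in previous {
        if !current.contains_key(node_id) {
            changes.push(Change::Remove(node.clone()));
        }
    }
    changes
}
```

A subscriber such as a scheduler can then match on the three variants without knowing anything about the underlying key-value encoding.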
Extra Liveness Predicate #
let extra_liveness_predicate = |node_state: &NodeState| {
    [ENABLED_SERVICES_KEY, GRPC_ADVERTISE_ADDR_KEY]
        .iter()
        .all(|key| node_state.contains_key(key))
};
Chitchat’s standard liveness check uses heartbeat timing (Phi accrual). The extra_liveness_predicate adds an application-level check: a node is only considered alive if it has published both its enabled_services and grpc_advertise_addr. This prevents routing to a node that is gossip-reachable but hasn’t finished publishing its service advertisement.
Summary #
The cluster substrate uses Chitchat, a custom gossip protocol with Phi accrual failure detection. Node state is published as key-value pairs in the Chitchat node state: services, gRPC address, readiness, and per-pipeline metrics. The control plane subscribes to ClusterChangeStream to react to membership changes. Large state catchup is handled via gRPC streams, not UDP gossip, for reliability. Phi accrual provides adaptive failure detection that tolerates process pauses and network jitter with few false positives. The 2-hour marked_for_deletion_grace_period retains state long enough for nodes that lag or rejoin after extended downtime to catch up before it is purged.