Quorum Queue Storage Substrate — Ra and rabbit_fifo
Quorum queues replace rabbit_variable_queue and rabbit_amqqueue_process with a fundamentally different storage substrate: Ra, RabbitMQ’s Raft implementation, running the rabbit_fifo state machine. Every enqueue, checkout, settle (ack), return, and discard (nack) is a Raft command that is replicated to a majority of queue members before it is applied and acknowledged.
Ra: Erlang Raft
Ra is a standalone Raft library written in Erlang. RabbitMQ embeds it as the consensus engine for two uses:
- Quorum queues: each queue is a Ra cluster (one Ra server per member node).
- Khepri: the metadata store is a single Ra cluster across all broker nodes.
A Ra server is an Erlang process running a Raft state machine. The state machine is provided by the application — for quorum queues, that is rabbit_fifo. For Khepri, it is Khepri’s own tree state machine.
Ra system: quorum queues run in the quorum_queues Ra system. The WAL (ra_log_wal) for this system is named ?RA_WAL_NAME = ra_log_wal. All quorum queues on a node share one WAL process — commands from all queues are written to the same append-only WAL file before being applied.
rabbit_fifo: The Queue State Machine
rabbit_fifo is the Ra state machine that implements queue semantics. Its state is a functional data structure that Ra replicates across members and snapshots periodically.
Key state in rabbit_fifo:
-record(rabbit_fifo, {
    name,           %% queue resource name
    cfg,            %% queue config (max-length, dead-letter, delivery-limit, etc.)
    messages,       %% lqueue: ordered sequence of messages
    consumers,      %% #{ConsumerKey => consumer_state}
    enqueuers,      %% #{Pid => enqueuer_state}: tracks producer seq_nos
    service_queue,  %% queue of consumer keys ready to receive
    returns,        %% messages awaiting requeue
    ...
}).
rabbit_fifo is a pure functional state machine: apply(Meta, Command, State) → {State', Reply, Effects}. Ra calls apply for each committed log entry. The Effects list describes the side effects Ra should execute after the entry is applied (send deliveries to consumers, send confirms to producers, emit metrics); apply itself performs no I/O.
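To make the callback shape concrete, here is a minimal sketch of a toy machine in the same style. It is not rabbit_fifo: the module name toy_fifo, the command tuples, and the effect payloads are all hypothetical, and the state is a plain map. The only thing it is meant to show is that apply/3 is a pure function returning the new state, a reply, and a list of effects, in Ra's {State, Reply, Effects} ordering.

```erlang
-module(toy_fifo).
-export([init/0, apply/3]).

%% Hypothetical toy machine (not rabbit_fifo): the state is a map
%% holding a single FIFO of messages.
init() ->
    #{messages => queue:new()}.

%% Pure function: no message sends, no I/O. Side effects are only
%% described in the returned Effects list.
apply(_Meta, {enqueue, From, Seq, Msg}, #{messages := Q} = State) ->
    Effects = [{send_msg, From, {enqueued, Seq}}],
    {State#{messages := queue:in(Msg, Q)}, ok, Effects};
apply(_Meta, {dequeue, To}, #{messages := Q0} = State) ->
    case queue:out(Q0) of
        {{value, Msg}, Q1} ->
            {State#{messages := Q1}, {ok, Msg},
             [{send_msg, To, {deliver, Msg}}]};
        {empty, _} ->
            %% Nothing to deliver: state unchanged, no effects.
            {State, empty, []}
    end.
```

Because the function is deterministic, every member that applies the same committed commands in the same order ends up with the same state, which is what makes snapshotting and replay safe.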
Enqueue: The Producer Path
When rabbit_channel publishes to a quorum queue:
%% In rabbit_quorum_queue.erl (simplified)
deliver(QName, Msg, QState) ->
    {ok, QState1} = rabbit_fifo_client:enqueue(QName, Msg, QState),
    {ok, QState1}.
rabbit_fifo_client:enqueue/3 is the client-side abstraction over Ra:
%% rabbit_fifo_client.erl (simplified; client state shown as a map for brevity)
enqueue(QName, Msg, State) ->
    Seq = maps:get(next_enqueue_seq, State),
    LeaderId = maps:get(leader, State),
    Command = rabbit_fifo:make_enqueue(self(), Seq, Msg),
    ra:pipeline_command(LeaderId, Command),
    {ok, State#{next_enqueue_seq => Seq + 1, ...}}.
ra:pipeline_command/2 sends the command to the Ra leader without blocking the caller. The leader then:
- Appends the command to the Raft log.
- Replicates the entry to followers (acknowledgment from a majority is required).
- Applies the committed entry to the rabbit_fifo state machine.
- Returns an effect: {send_msg, EnqueuerPid, {enqueued, Seq}}.
When the channel process receives {enqueued, Seq}, it resolves the publisher confirm for that sequence number.
Enqueuer seq_no: rabbit_fifo_client assigns a monotonically increasing sequence number to each enqueue. rabbit_fifo:apply deduplicates: if it sees the same {EnqueuerPid, Seq} twice (due to leader change + retry), it discards the duplicate. This is the quorum queue’s exactly-once enqueue guarantee.
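The deduplication rule can be sketched as follows. This is an illustrative model under stated assumptions, not the rabbit_fifo code: the module enq_dedup, the map-based state, and the return tags ok, duplicate, and out_of_order are hypothetical. It assumes each enqueuer starts at sequence number 1 and that the machine only accepts the next expected number.

```erlang
-module(enq_dedup).
-export([new/0, enqueue/4]).

%% Hypothetical state: next expected seq per enqueuer, plus the messages.
new() ->
    #{enqueuers => #{}, messages => []}.

%% Accept a command only if it carries the next expected sequence
%% number for that enqueuer; a lower number means it was already applied.
enqueue(Pid, Seq, Msg, #{enqueuers := Enqs, messages := Msgs} = State) ->
    Next = maps:get(Pid, Enqs, 1),
    if
        Seq =:= Next ->
            {ok, State#{enqueuers := Enqs#{Pid => Next + 1},
                        messages := Msgs ++ [Msg]}};
        Seq < Next ->
            %% Retry of an already applied command: drop it.
            {duplicate, State};
        true ->
            %% Gap in the sequence: this sketch simply refuses it.
            {out_of_order, State}
    end.
```

A retried command after a leader change therefore leaves the state untouched, which is what prevents the same publish from being stored twice.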
Checkout: The Consumer Path
When a consumer subscribes to a quorum queue:
%% rabbit_quorum_queue.erl (simplified)
%% ConsumerTag, Mode and ConsumerMeta are taken from ConsumerSpec (extraction elided)
consume(Q, ConsumerSpec, QState) ->
    rabbit_fifo_client:checkout(ConsumerTag, Mode, ConsumerMeta, QState).
rabbit_fifo_client:checkout/4 sends a checkout command to the Ra leader. The rabbit_fifo state machine records the consumer and begins generating delivery effects.
As messages are applied to the state machine, rabbit_fifo checks if any consumer has credit available:
%% In rabbit_fifo apply:
{State1, Effects} = maybe_checkout_next(State),
%% maybe_checkout_next: for each consumer with credit,
%% dequeue next message, decrement credit, add delivery effect
The delivery effect is: {send_msg, ConsumerChannelPid, {deliver, ConsumerTag, DeliveryTag, Msg}}. Ra’s effect machinery sends this message to the channel process after the log entry is applied.
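The credit-driven loop described by the maybe_checkout_next comment can be sketched as below. This is a simplified model, not the rabbit_fifo implementation: the module checkout_sketch, the list-based consumer representation, and the {deliver, Tag, Id, Msg} effect shape are assumptions chosen for illustration. Consumers with credit are served round-robin until either messages or credit run out.

```erlang
-module(checkout_sketch).
-export([checkout/3]).

%% Messages:  list of messages, oldest first.
%% Consumers: list of {ConsumerTag, Credit} in service order.
%% NextId:    next delivery id to assign.
%% Returns {RemainingMessages, Consumers, Effects}.
checkout(Messages, Consumers, NextId) ->
    checkout(Messages, Consumers, NextId, [], []).

checkout([Msg | Rest], [{Tag, Credit} | Cs], Id, Done, Effs) when Credit > 0 ->
    Eff = {deliver, Tag, Id, Msg},
    %% Decrement credit and move the consumer to the back of the
    %% service order so deliveries rotate between consumers.
    checkout(Rest, Cs ++ [{Tag, Credit - 1}], Id + 1, Done, [Eff | Effs]);
checkout(Msgs, [{_, 0} = C | Cs], Id, Done, Effs) when Msgs =/= [] ->
    %% Consumer has no credit: set it aside and try the next one.
    checkout(Msgs, Cs, Id, [C | Done], Effs);
checkout(Msgs, Cs, _Id, Done, Effs) ->
    %% Out of messages or out of consumers with credit.
    {Msgs, lists:reverse(Done) ++ Cs, lists:reverse(Effs)}.
```

For example, checkout_sketch:checkout([m1, m2, m3], [{a, 1}, {b, 1}], 0) delivers m1 to a and m2 to b, then stops with m3 still queued because both consumers are out of credit.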
Settle (Ack) and Return (Nack)
When rabbit_channel receives basic.ack:
%% rabbit_quorum_queue.erl
settle(Q, CTag, MsgIds, QState) ->
    rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, QState).
rabbit_fifo_client:settle/3 pipelines a settle command to Ra. rabbit_fifo:apply marks the messages as settled, freeing the delivery slot and allowing the consumer’s credit to be replenished.
For basic.nack with requeue=true:
return(Q, CTag, MsgIds, QState) ->
    rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, QState).
rabbit_fifo:apply for a return command puts the messages back into the returns lqueue. They are re-delivered on the next checkout cycle.
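The difference between settling and returning can be sketched with a toy model of one consumer's checked-out set. The module ack_paths and its map-based state are hypothetical and much simpler than the real rabbit_fifo bookkeeping; the sketch only shows that settle removes messages for good, while return moves them to the returns sequence for redelivery.

```erlang
-module(ack_paths).
-export([new/1, settle/2, return/2]).

%% Checked: #{MsgId => Msg}, the messages currently held by one consumer.
new(Checked) ->
    #{checked => Checked, returns => []}.

%% Settle: the messages are done; drop them from the checked-out set.
settle(MsgIds, #{checked := C} = S) ->
    S#{checked := maps:without(MsgIds, C)}.

%% Return: move the messages to `returns` so a later checkout can
%% redeliver them. Unknown ids are skipped by the generator pattern.
return(MsgIds, #{checked := C, returns := R} = S) ->
    Returned = [Msg || Id <- MsgIds, {ok, Msg} <- [maps:find(Id, C)]],
    S#{checked := maps:without(MsgIds, C), returns := R ++ Returned}.
```

In both cases the consumer's delivery slot is freed, so the state machine can hand it further messages on the next checkout pass.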
For basic.nack with requeue=false (discard or dead-letter):
discard(Q, CTag, MsgIds, QState) ->
    rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, QState).
rabbit_fifo:apply for discard removes the messages from the consumer’s checked-out set without requeueing them. If the queue has a dead-letter exchange configured, the messages are dead-lettered with reason rejected; otherwise they are dropped.
Delivery Limit: Quorum Queue Poison Message Protection
The x-delivery-limit argument sets the maximum number of delivery attempts before a message is dead-lettered:
%% In rabbit_fifo:apply for checkout (simplified pseudocode):
DeliveryCount = get_delivery_count(Msg),
case Cfg#cfg.delivery_limit of
    undefined ->
        deliver_message(Msg, Consumer, State);
    Limit when DeliveryCount >= Limit ->
        %% Dead-letter instead of deliver
        dead_letter_message(Msg, delivery_limit, State);
    _ ->
        deliver_message(Msg, Consumer, State)
end.
The delivery count is stored in the message metadata and is incremented each time the message is returned to the queue for redelivery. Because that metadata is part of the Raft-replicated state, the count is preserved across leader changes: a message that has failed 5 times is counted as having failed 5 times on every member, not just on the node that was leader at the time of delivery.
This is more robust than classic queue poison message handling, which requires application-level tracking via x-death headers.
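The decision in the pseudocode above can be written as a small runnable function. This is a sketch that mirrors the pseudocode's >= comparison, not the broker's code; the module delivery_limit and both function names are hypothetical. attempts/2 simulates a consumer that always fails and counts how many deliveries happen before the message is dead-lettered.

```erlang
-module(delivery_limit).
-export([route/2, attempts/2]).

%% Limit is `undefined` (no limit configured) or a non-negative integer.
route(_Count, undefined) ->
    deliver;
route(Count, Limit) when Count >= Limit ->
    {dead_letter, delivery_limit};
route(_Count, _Limit) ->
    deliver.

%% Number of deliveries a permanently failing message receives,
%% starting from delivery count Count, before it is dead-lettered.
attempts(Count, Limit) when is_integer(Limit) ->
    case route(Count, Limit) of
        deliver -> 1 + attempts(Count + 1, Limit);
        {dead_letter, _} -> 0
    end.
```

Under these assumptions, a limit of 3 gives a poison message three delivery attempts before it is routed to dead-lettering.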
rabbit_fifo_client: Pipelining and Leader Tracking
rabbit_fifo_client is the channel-side client that manages:
- Leader discovery: knows which Ra node is the current leader. On leader change (Raft election), automatically re-routes commands to the new leader.
- Pipelining: ra:pipeline_command/2 is non-blocking. Multiple commands can be in flight simultaneously.
- Soft limit: rabbit_fifo_client tracks the number of outstanding (unconfirmed) enqueue commands. If the count exceeds a soft limit (quorum_commands_soft_limit), new publishes are throttled.
- Event handling: rabbit_fifo_client:handle_ra_event/4 processes Ra events (confirmations, deliveries, leader changes) received by the channel process.
%% In rabbit_quorum_queue.erl
handle_event(QName, {ra_event, LeaderId, Evt}, QState) ->
    rabbit_fifo_client:handle_ra_event(QName, LeaderId, Evt, QState).
Ra events arrive as Erlang messages to the channel process and are handled in rabbit_channel:handle_info.
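The soft-limit bookkeeping can be sketched as follows. This is a minimal model, assuming the client keeps a map of unconfirmed sequence numbers; the module soft_limit, the slow tag, and the applied/2 function are hypothetical names and not the rabbit_fifo_client API.

```erlang
-module(soft_limit).
-export([new/1, enqueue/1, applied/2]).

%% Hypothetical client state: next sequence number, the set of
%% unconfirmed enqueues, and the configured soft limit.
new(Limit) ->
    #{next_seq => 1, pending => #{}, soft_limit => Limit}.

%% Record a pipelined enqueue. Returns {ok | slow, Seq, State}; `slow`
%% signals that the caller should throttle publishing.
enqueue(#{next_seq := Seq, pending := P0, soft_limit := L} = S) ->
    P = P0#{Seq => unconfirmed},
    Tag = case maps:size(P) > L of
              true -> slow;
              false -> ok
          end,
    {Tag, Seq, S#{next_seq := Seq + 1, pending := P}}.

%% Called when the leader reports that the given sequence numbers
%% were applied: they no longer count against the limit.
applied(Seqs, #{pending := P} = S) ->
    S#{pending := maps:without(Seqs, P)}.
```

The limit is soft in the sense that the command is still sent; the tag only tells the caller to slow down until confirmations drain the pending set.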
Ra WAL and Fsync
Every ra:pipeline_command/2 call eventually writes to the Ra WAL (ra_log_wal). The WAL is shared across all quorum queues on the node. It is an append-only file with periodic fsyncs.
The WAL batches writes: multiple commands from multiple queues are written in a single pwrite call, then a single fsync. This amortizes disk latency across all queues, similar to how PostgreSQL’s WAL writer batches commits.
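The batching idea can be illustrated with a sketch that turns a batch of pending entries from several queues into one buffer and one set of per-queue notifications. It is not the ra_log_wal implementation: the module wal_batch, the {QueueId, Index, Payload} entry shape, and the 32-bit length-prefix framing are assumptions made for illustration. A real WAL would write the returned iodata with a single write call, sync once, and only then notify the writers.

```erlang
-module(wal_batch).
-export([flush/1]).

%% Pending: list of {QueueId, Index, Payload :: binary()} collected
%% from many queues since the last flush.
%% Returns {IoData, Written}:
%%   IoData  - one buffer for a single write followed by a single sync
%%   Written - #{QueueId => HighestIndex} to report back to each queue
flush(Pending) ->
    %% Hypothetical framing: 32-bit length prefix, then the payload.
    IoData = [[<<(byte_size(P)):32>>, P] || {_Q, _I, P} <- Pending],
    Written = lists:foldl(
                fun({Q, I, _}, Acc) ->
                        maps:update_with(Q, fun(Prev) -> max(Prev, I) end, I, Acc)
                end, #{}, Pending),
    {IoData, Written}.
```

The cost of the sync is paid once per batch rather than once per command, so the per-command cost shrinks as more queues write concurrently.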
Ack timing: the publisher confirm is sent to the producer after:
- The enqueue command is written to the WAL on the leader.
- Enough followers have written the entry to their WALs that a majority of members (leader included) hold it.
- The entry is applied to the rabbit_fifo state machine.
This is why quorum queues have higher latency than classic queues for durable messages: they require a majority WAL write + apply, not just a single-node disk write.
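The majority condition can be sketched as a function over the highest log index each member has durably written. This is a generic Raft-style calculation under simplifying assumptions, not Ra's code: the module commit_index is hypothetical, the list must be non-empty and include the leader, and the Raft rule that the entry must belong to the leader's current term is left out.

```erlang
-module(commit_index).
-export([majority_index/1]).

%% WrittenIndexes: highest durably written log index per member,
%% leader included. Returns the highest index that a majority of
%% members have written.
majority_index(WrittenIndexes) ->
    %% Sort descending; the entry at position Majority is held by at
    %% least Majority members.
    Sorted = lists:sort(fun(A, B) -> A >= B end, WrittenIndexes),
    Majority = length(Sorted) div 2 + 1,
    lists:nth(Majority, Sorted).
```

With three members at indexes 10, 8, and 5, the result is 8: entries up to 8 can be applied and confirmed without waiting for the slowest member.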
Process Structure vs Classic Queues
| Aspect | Classic queue | Quorum queue |
|---|---|---|
| Process | rabbit_amqqueue_process | Ra server (ra_server process) |
| State machine | rabbit_variable_queue | rabbit_fifo |
| Replication | None (mirroring deprecated) | Raft (majority WAL) |
| Supervisor | rabbit_amqqueue_sup | Ra’s own supervision tree |
| Disk I/O | VQ index + msg_store | Ra WAL + Ra segment files |
| Snapshot | VQ terms file | Ra snapshot (periodic) |
| Delivery tracking | ram_pending_ack in VQ | Checked-out messages in rabbit_fifo state |
| Poison message | Manual (x-death headers) | x-delivery-limit (broker-enforced) |
Summary
Quorum queues replace the classic queue process with a Ra Raft server running rabbit_fifo. Every enqueue, settle, return, and discard is a Raft command — it is replicated to a majority before acknowledgment. rabbit_fifo_client manages leader tracking, pipelining, and soft-limit backpressure on the channel side. The Ra WAL is shared across all queues on the node, amortizing fsync cost. x-delivery-limit provides broker-enforced poison message protection using delivery counts tracked in the replicated state machine. Publisher confirms are sent after majority WAL write + state machine application.