Classic Queue Storage Substrate — rabbit_variable_queue #
rabbit_variable_queue (VQ) is the storage engine for classic queues. It manages the split between in-memory and on-disk message storage, coordinates two separate disk stores (index and body), tracks delivery state (pending acks, redelivery), and exposes a clean interface to rabbit_amqqueue_process via the rabbit_backing_queue behaviour.
The rabbit_backing_queue Behaviour #
VQ is pluggable via the rabbit_backing_queue behaviour. The interface:
-callback publish(Msg, MsgProps, IsDelivered, ChPid, State) -> State.
-callback publish_delivered(Msg, MsgProps, ChPid, State) -> {SeqId, State}.
-callback fetch(AckRequired, State) -> {FetchResult, State}.
-callback ack([SeqId], State) -> {[MsgId], State}.
-callback requeue([SeqId], State) -> {[MsgId], State}.
-callback drop(AckRequired, State) -> {FetchResult, State}.
-callback len(State) -> non_neg_integer().
-callback is_empty(State) -> boolean().
rabbit_amqqueue_process calls these callbacks without knowing whether VQ is storing messages in memory, on disk, or both. Other implementations of this behaviour exist: rabbit_priority_queue wraps multiple VQ instances (one per priority level), and rabbit_volatile_queue is a purely in-memory backing queue.
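The shape of this interface can be modeled outside Erlang. A minimal Python sketch (the class, names, and object-held state are illustrative, not RabbitMQ code; the real callbacks thread an explicit `State` value instead) of what the owner process programs against:

```python
from collections import deque

class InMemoryBackingQueue:
    """Toy model of the rabbit_backing_queue callback shape: publish,
    fetch (which parks the message in a pending-ack set), ack, len."""

    def __init__(self):
        self.next_seq_id = 0
        self.ready = deque()       # [(seq_id, msg)] awaiting delivery
        self.pending_ack = {}      # seq_id -> msg, delivered but unacked

    def publish(self, msg):
        seq_id = self.next_seq_id
        self.next_seq_id += 1
        self.ready.append((seq_id, msg))
        return seq_id

    def fetch(self, ack_required=True):
        if not self.ready:
            return None
        seq_id, msg = self.ready.popleft()
        if ack_required:
            self.pending_ack[seq_id] = msg   # held until basic.ack
        return seq_id, msg

    def ack(self, seq_ids):
        # Returns the acked messages, mirroring ack/2 returning MsgIds.
        return [self.pending_ack.pop(s) for s in seq_ids if s in self.pending_ack]

    def __len__(self):
        return len(self.ready)
```

The owner never inspects the internals; it only calls the functions and keeps whatever state they return, which is what makes the backing queue pluggable.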
The Ram/Disk Duality #
The vq record has two key positions:
-record(vq, {
q1, %% Q1: in-memory, not yet index-referenced
q2, %% Q2: in-memory, index-referenced (next to be paged out to delta)
delta, %% Tail of the queue, fully on disk
q3, %% Q3: partially loaded from disk (index-referenced)
q4, %% Q4: in-memory, ready for immediate delivery
next_seq_id, %% SeqId counter
redeliver_seq_id, %% everything before this has been delivered at least once
delivery_count, %% for AMQP 1.0 delivery-count header
ram_pending_ack, %% #{SeqId => Msg}: delivered but not acked, in RAM
disk_pending_ack, %% #{SeqId => MsgId}: delivered but not acked, on disk
...
}).
The queue is conceptually split into five zones: four in-memory queues (Q1–Q4) and the on-disk delta region between Q2 and Q3. Messages flow: Q1 → Q2 → delta (disk) → Q3 → Q4 → delivered.
The key rule from the source comments:
Messages are never written back to disk after they have been read into memory. The queue is designed to avoid keeping too much to begin with. At a minimum 1 message is kept and at a maximum the semi-arbitrary number 2048.
The memory limit (2048 messages in RAM at peak) is enforced by paging: when RAM message count exceeds the limit, messages are moved from Q1/Q2 to the delta (on-disk) region.
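The paging rule can be sketched as a small model, assuming a simplified two-region layout (one RAM list standing in for Q1/Q2, one list standing in for the delta segment files; the batch size and names are illustrative):

```python
class PagingQueue:
    """Sketch of the paging rule: RAM holds at most ram_limit messages;
    the newest overflow is appended to an on-disk region in publish
    order, and is read back in batches when the RAM region drains."""

    def __init__(self, ram_limit=2048):   # 2048 is the peak cited above
        self.ram_limit = ram_limit
        self.ram = []    # oldest messages, ready for delivery
        self.disk = []   # delta-style region holding the newer tail

    def publish(self, msg):
        self.ram.append(msg)
        # Page out everything beyond the limit, keeping at least one
        # message in RAM, and preserving FIFO order on disk.
        if len(self.ram) > self.ram_limit:
            keep = max(self.ram_limit, 1)
            self.disk.extend(self.ram[keep:])
            del self.ram[keep:]

    def fetch(self):
        if not self.ram and self.disk:
            # Load a batch back from disk (delta -> Q3 in the real code).
            batch, self.disk = self.disk[:16], self.disk[16:]
            self.ram.extend(batch)
        return self.ram.pop(0) if self.ram else None
```

Because everything on disk is strictly newer than everything in RAM, draining RAM first and then refilling from disk preserves FIFO order.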
Two Disk Stores #
Messages are persisted across two separate storage systems:
Queue Index v2 (rabbit_classic_queue_index_v2): stores per-message metadata per queue — {SeqId, MsgId, IsPersistent, IsDelivered, ExpiryTime}. Stored in segment files on disk ({QueueDir}/idx/). The index is the sequence-ordered spine of the queue: it tracks what messages exist, their position, and their delivery state, but not the message body.
Message body stores — two options based on message size:
- rabbit_classic_queue_store_v2: per-queue store for small messages (size below qi_msgs_embed_below, which is configurable). Bodies are stored in the queue’s own directory.
- rabbit_msg_store: per-vhost shared store for larger messages. Two instances run per vhost:
  - persistent_msg_store: durable messages that survive restarts.
  - transient_msg_store: non-persistent messages (deleted on restart).
The size threshold splits messages between the per-queue store and the shared vhost store:
Message arrives:
size < qi_msgs_embed_below:
→ queue index records SeqId + metadata
→ body stored in rabbit_classic_queue_store_v2 (per-queue file)
size >= qi_msgs_embed_below:
→ queue index records SeqId + metadata + MsgId reference
→ body stored in rabbit_msg_store (shared vhost store, keyed by MsgId)
The shared rabbit_msg_store is reference-counted — if the same message body is published to multiple queues (via fanout), it is stored once and each queue holds a reference. On ack, the reference count decrements; the body is deleted when count reaches zero.
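Reference counting in the shared store can be sketched as follows. This is a model, not the broker's implementation: the msg_id derivation here (SHA-256 of the body) is only an illustration of "content hash as key", and the real store works against files, not a dict:

```python
import hashlib

class SharedMsgStore:
    """Sketch of the reference-counted shared store: a body fanned out
    to N queues is written once under its content-derived msg_id; each
    queue's ack drops one reference; the body is deleted at zero."""

    def __init__(self):
        self.bodies = {}     # msg_id -> body (stands in for store files)
        self.refcount = {}   # msg_id -> number of queues referencing it

    def write(self, body):
        msg_id = hashlib.sha256(body).hexdigest()
        if msg_id not in self.bodies:
            self.bodies[msg_id] = body        # stored once, even on fanout
        self.refcount[msg_id] = self.refcount.get(msg_id, 0) + 1
        return msg_id

    def remove(self, msg_id):
        # Called when a queue acks: decrement, delete when unreferenced.
        self.refcount[msg_id] -= 1
        if self.refcount[msg_id] == 0:
            del self.bodies[msg_id]
            del self.refcount[msg_id]
```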
SeqId: The Queue’s Internal Ordering Key #
next_seq_id is a monotonically increasing counter per queue. Every published message gets a seq_id. The seq_id is the queue’s internal sequence number — distinct from:
- The AMQP delivery_tag (channel-scoped, given to the consumer on delivery)
- The message_id (application-defined, in message properties)
- The msg_id (content hash used as the key in rabbit_msg_store)
The seq_id determines message order and is used for:
- Ack tracking: ram_pending_ack maps seq_id → message for in-memory acks
- Redelivery detection: redeliver_seq_id — any seq_id below this was delivered at least once
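The redelivery cursor can be modeled in a few lines. This is a sketch of the rule stated above, not VQ's actual bookkeeping; the method names are illustrative:

```python
class RedeliveryTracker:
    """Model of redeliver_seq_id: a cursor such that every seq_id below
    it has been handed to a consumer at least once."""

    def __init__(self):
        self.next_seq_id = 0        # monotonically increasing, per queue
        self.redeliver_seq_id = 0

    def publish(self):
        seq_id = self.next_seq_id
        self.next_seq_id += 1
        return seq_id

    def requeue(self, seq_ids):
        # Requeued messages were delivered at least once; advance the
        # cursor past the highest requeued seq_id so they are flagged
        # as redelivered on their next delivery.
        self.redeliver_seq_id = max(self.redeliver_seq_id, max(seq_ids) + 1)

    def is_redelivered(self, seq_id):
        return seq_id < self.redeliver_seq_id
```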
Pending Acks: ram_pending_ack and disk_pending_ack #
When a message is delivered to a consumer, it is moved from the queue’s ready set to the pending-ack set:
%% ram_pending_ack: #{SeqId => {Msg, MsgProps}} — full message in memory
%% disk_pending_ack: #{SeqId => MsgId} — only reference, body on disk
The split between ram_pending_ack and disk_pending_ack is a memory optimization: if the message body was already paged to disk before delivery, we only need to keep the MsgId reference in the pending-ack set, not the full body. The full body is only loaded when the message needs to be requeued (nack with requeue=true).
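The split and the requeue path can be sketched as follows, assuming a simple dict stands in for the disk stores (names and the `deliver` signature are illustrative):

```python
class PendingAcks:
    """Sketch of the ram/disk pending-ack split: a delivered-but-unacked
    message keeps its full body in RAM only if the body was already
    there; otherwise only the MsgId reference is held, and the body is
    re-read from the store when a requeue needs it back."""

    def __init__(self, body_store):
        self.body_store = body_store   # msg_id -> body (the disk stores)
        self.ram_pending = {}          # seq_id -> full message body
        self.disk_pending = {}         # seq_id -> msg_id reference only

    def deliver(self, seq_id, msg_id, body_in_ram):
        if body_in_ram is not None:
            self.ram_pending[seq_id] = body_in_ram
        else:
            self.disk_pending[seq_id] = msg_id   # keep the cheap reference

    def requeue(self, seq_id):
        # nack with requeue=true: only now must a paged-out body be loaded.
        if seq_id in self.ram_pending:
            return self.ram_pending.pop(seq_id)
        msg_id = self.disk_pending.pop(seq_id)
        return self.body_store[msg_id]
```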
When basic.ack arrives (via rabbit_amqqueue_process):
ack(SeqIds, State) ->
{AllMsgIds, State1} = lists:foldl(
fun(SeqId, {Acc, S}) ->
case maps:take(SeqId, S#vq.ram_pending_ack) of
{Msg, RPA} ->
%% In RAM: ack directly
...remove from index...
{[mc:get_annotation(id, Msg) | Acc], S#vq{ram_pending_ack = RPA}};
error ->
%% On disk: look up in disk_pending_ack
...remove from disk store...
end
end, {[], State}, SeqIds),
{AllMsgIds, State1}.
Paging: Moving Between RAM and Disk #
Paging is triggered when the queue’s RAM usage exceeds limits. The needs_timeout/1 callback tells rabbit_amqqueue_process whether the VQ needs to be ticked (to perform background paging).
Paging moves messages from Q1 (in-memory, front of queue) to the delta structure (on-disk spine). The delta record is:
-record(delta, {
start_seq_id, %% SeqId of first on-disk message
count, %% number of messages on disk
transient, %% count of transient (non-persistent) messages on disk
end_seq_id %% SeqId after last on-disk message
}).
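The bookkeeping for merging a paged-out batch into delta can be sketched directly from the record's fields. This model assumes the batch is non-empty and contiguous at the tail of the queue, and omits the index/store writes the real code performs alongside:

```python
from dataclasses import dataclass

@dataclass
class Delta:
    """Mirror of the delta record: bounds and counts of the on-disk span."""
    start_seq_id: int   # SeqId of first on-disk message
    count: int          # number of messages on disk
    transient: int      # non-persistent messages on disk
    end_seq_id: int     # SeqId after the last on-disk message

def page_out(delta, seq_ids, transient_count):
    # First page-out establishes the start bound; later ones keep it.
    start = seq_ids[0] if delta.count == 0 else delta.start_seq_id
    return Delta(
        start_seq_id=start,
        count=delta.count + len(seq_ids),
        transient=delta.transient + transient_count,
        end_seq_id=seq_ids[-1] + 1,   # exclusive upper bound
    )
```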
On prefetch/delivery, messages are loaded from delta into Q3 (in-memory, loaded from disk, index-referenced) and eventually into Q4 (in-memory, ready for delivery).
Clean vs Dirty Shutdown Recovery #
On clean shutdown (broker gracefully stopped), VQ saves its state to a terms file:
%% Saved: next_seq_id, redeliver_seq_id, persistent message count, msg_store refs
On startup with a terms file (clean shutdown detected), the queue can skip full index replay and start quickly. On dirty recovery (crash), the queue must replay the entire index from disk, re-building the in-memory VQ state from segment files.
The rabbit_msg_store also distinguishes clean vs dirty shutdown: a clean shutdown marker file exists → trust saved index state. No marker → replay from scratch.
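The recovery decision reduces to a marker check. A minimal sketch, assuming an illustrative file name (`recovery.terms` here is not RabbitMQ's actual on-disk layout):

```python
import os

def recover(queue_dir):
    """Sketch of the clean/dirty split: a saved terms file marks a clean
    shutdown, so the queue can trust its saved counters and skip the
    index replay; no terms file means a crash, so every index segment
    must be scanned to rebuild the in-memory VQ state."""
    terms_path = os.path.join(queue_dir, "recovery.terms")
    if os.path.exists(terms_path):
        return "fast_start"    # trust saved next_seq_id, counts, store refs
    return "replay_index"      # scan segment files, rebuild state from disk
```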
The rabbit_amqqueue_process Owner #
rabbit_variable_queue is not a process — it is a data structure updated by rabbit_amqqueue_process. Every VQ operation is a synchronous function call inside the queue process. This means:
- All VQ state changes are single-threaded (one queue process, one VQ state).
- High publish rates create a bottleneck at the queue process — all publishers to one queue serialize through one Erlang process.
- This is the fundamental throughput limit of classic queues under high concurrency.
Quorum queues (next chapter) address this differently: Ra batches commands across multiple processes and applies them to the rabbit_fifo state machine.
Summary #
rabbit_variable_queue manages classic queue storage via a ram/disk split across four in-memory queues (Q1–Q4) and an on-disk delta region. The queue index v2 stores per-queue message metadata in segment files. Small message bodies go to rabbit_classic_queue_store_v2 (per-queue); large bodies go to the shared rabbit_msg_store (per-vhost, reference-counted). Pending acks are tracked by seq_id in ram_pending_ack (full message in RAM) and disk_pending_ack (body on disk, reference only). Clean shutdown saves a terms file for fast restart; dirty recovery replays the full index. All VQ operations are single-threaded within rabbit_amqqueue_process.