- My Development Notes/
- RabbitMQ Internals: A Substrate Decomposition/
- Delivery Substrate — rabbit_queue_consumers/
Delivery Substrate — rabbit_queue_consumers
Table of Contents
Delivery Substrate — rabbit_queue_consumers #
The delivery substrate sits between queue storage and channel processes. It answers the question: given a message ready to be delivered, which consumer should receive it, and is that consumer ready? rabbit_queue_consumers is the data structure (not a process) that owns this logic, embedded in the queue process state.
The Consumer Registry #
rabbit_queue_consumers maintains two ETS tables per queue process:
consumers: the set of registered consumers. Each entry is a#consumer{}:
-record(consumer, {
tag, %% consumer tag (binary)
ack_required, %% boolean: does this consumer require manual ack?
prefetch, %% per-consumer prefetch count (0 = unlimited)
args, %% consumer arguments (x-priority, x-cancel-on-ha-failover, etc.)
user %% authenticated user (for authz on delivery)
}).
cr(channel-register): per-channel delivery state, keyed by channel pid:
-record(cr, {
consumer_count,
ch_pid,
limiter, %% rabbit_limiter pid for this channel
unsent_count = 0, %% messages delivered but not yet acked
blocked = false, %% is this channel suspended due to prefetch?
...
}).
The deliver/5 Function #
rabbit_queue_consumers:deliver/5 is called by rabbit_amqqueue_process whenever a message is ready for delivery:
deliver(FetchFun, QName, State, SingleActiveConsumerIsOn, ActiveConsumer) ->
deliver(FetchFun, QName, false, State, SingleActiveConsumerIsOn, ActiveConsumer).
deliver(FetchFun, QName, ConsumersChanged,
State = #state{consumers = Consumers}, SingleActiveConsumerOn, _) ->
case priority_queue:out(Consumers) of
{empty, _} ->
{undelivered, ConsumersChanged, State};
{{value, QEntry}, Consumers1} ->
case deliver_to_consumer(FetchFun, QEntry, QName) of
{delivered, R} ->
{delivered, ConsumersChanged, R, State};
undelivered ->
%% Consumer blocked (prefetch full); try next
deliver(FetchFun, QName, true,
State#state{consumers = Consumers1},
SingleActiveConsumerOn, none)
end
end.
The Consumers is a priority queue (round-robin at same priority, higher x-priority argument wins). deliver_to_consumer checks if the selected consumer can accept a message (limiter check) and if so, calls FetchFun to pull the next message from the backing queue, then sends it to the channel.
Prefetch Enforcement via rabbit_limiter #
The per-channel rabbit_limiter process is the gatekeeper for delivery. deliver_to_consumer calls:
case rabbit_limiter:can_send(Limiter, AckRequired, CTag) of
{ok, Limiter1} -> send_message(...);
{suspend, Limiter1} -> undelivered %% consumer blocked: prefetch full
end.
rabbit_limiter:can_send/3 checks:
- Per-consumer prefetch:
unsent_count < prefetch_countfor this consumer tag. - Per-channel global prefetch: total unacked across all consumers on this channel.
- Credit flow: whether the channel process has granted credit.
When a consumer is suspended (prefetch full), it is removed from the consumers priority queue in the delivery state — blocked consumers are not considered for delivery until credit is replenished. When basic.ack arrives and rabbit_limiter grants new credit, the consumer is re-inserted into the priority queue.
Single-Active-Consumer #
When a queue is declared with x-single-active-consumer = true, only one consumer is active at a time. Extras are registered but not delivered to:
deliver(_FetchFun, _QName, false, State, true, none) ->
%% single-active-consumer mode, no active consumer yet
{undelivered, false, State};
deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true,
SingleActiveConsumer) ->
%% single-active-consumer mode: only deliver to the active one
case maps:find(consumer_key(SingleActiveConsumer), Consumers) of
error -> {undelivered, false, State};
{ok, QEntry} ->
case deliver_to_consumer(FetchFun, QEntry, QName) of
{delivered, R} -> {delivered, false, R, State};
undelivered -> {undelivered, true, State}
end
end.
When the active consumer cancels or its channel closes, rabbit_queue_consumers promotes the next consumer in registration order to be the active one and notifies it via a basic.cancel + basic.consume-ok sequence.
Credit-Based Flow Between Queue and Channel #
Erlang’s credit_flow module implements a credit system between queue processes and channel processes. A channel process grants credits to a queue process; the queue process consumes credits when sending messages. When credits run out, the queue process stops sending.
%% In rabbit_amqqueue_process, before delivering a message to rabbit_channel:
case rabbit_channel_common:flow_control_enabled(ChPid) of
true -> credit_flow:send(ChPid); %% consume one credit
false -> ok
end
When the channel’s credit runs out:
credit_flow:send/1blocks (or signals suspension).rabbit_amqqueue_processsets its internal flow state to paused.- No more deliveries are attempted until credits are replenished.
rabbit_channelperiodically sends credit grants back to queue processes.
This is the application-level backpressure chain: consumer slow to ack → channel accumulates unacked messages → channel stops granting credits to queue → queue pauses delivery → message backlog grows in queue storage (not in channel memory).
Without this credit flow, a slow consumer would cause the queue process to flood the channel process mailbox — a classic Erlang memory leak under overload.
Delivering to the Channel: The Frame Path #
deliver_to_consumer ultimately calls:
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, Msg)
This is a gen_server2 cast to the channel process. The channel process handles it:
handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State) ->
DeliveryTag = State#ch.delivery_tag + 1,
%% Add to unacked_message_q
State1 = record_sent(ConsumerTag, AckRequired, Msg, DeliveryTag, State),
%% Build basic.deliver method
Method = #'basic.deliver'{
consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag,
redelivered = mc:is_redelivered(Msg),
exchange = ExchangeName,
routing_key = RoutingKey
},
%% Send to writer
rabbit_writer:send_command_and_notify(WriterPid, QPid, Method, Content),
{noreply, State1}.
rabbit_writer:send_command_and_notify sends the frame to the client and notifies the queue process that delivery succeeded (for credit flow).
Consumer Cancellation and Cleanup #
When a consumer cancels (basic.cancel), the channel notifies the queue process. rabbit_queue_consumers:cancel/3 removes the consumer from the registry and the priority queue. Any messages delivered to this consumer but not yet acked remain in the channel’s unacked_message_q.
When the channel process terminates (consumer disconnect), rabbit_channel:terminate/2 requeues all pending acks (as described in Chapter 3). This call reaches rabbit_amqqueue_process:requeue/3, which calls rabbit_backing_queue:requeue/2 (for classic queues) — the VQ puts the messages back into the ready set with redeliver_seq_id updated so they will be delivered with redelivered=true.
For quorum queues, the requeue is a return command sent to Ra: rabbit_fifo_client:return/3 → Ra pipeline → rabbit_fifo:apply marks messages as returned → they re-enter the checkout cycle.
Consumer Priority #
rabbit_queue_consumers uses a priority queue (priority_queue.erl in rabbit_common) to order consumers. Priority is set via the x-priority consumer argument:
%% When registering a consumer:
Priority = rabbit_misc:table_lookup(Args, <<"x-priority">>),
priority_queue:in(ConsumerKey, Priority, Consumers)
Higher priority consumers are served first when multiple consumers are active. Within the same priority, consumers are served in round-robin order (FIFO priority queue).
Summary #
rabbit_queue_consumers is a data structure (not a process) embedded in the queue process that manages consumer registration, priority ordering, prefetch enforcement, and single-active-consumer logic. Delivery is a cooperative protocol between the queue process (calling deliver/5), rabbit_limiter (enforcing prefetch), credit_flow (rate-matching between queue and channel processes), and rabbit_channel (tracking unacked messages and serializing frames). Consumer disconnect requeues all pending acks via VQ’s backing queue interface (classic) or Ra return commands (quorum).