Skip to main content
  1. RabbitMQ Internals: A Substrate Decomposition/

Delivery Substrate — rabbit_queue_consumers

Delivery Substrate — rabbit_queue_consumers #

The delivery substrate sits between queue storage and channel processes. It answers the question: given a message ready to be delivered, which consumer should receive it, and is that consumer ready? rabbit_queue_consumers is the data structure (not a process) that owns this logic, embedded in the queue process state.

The Consumer Registry #

rabbit_queue_consumers maintains two ETS tables per queue process:

  • consumers: the set of registered consumers. Each entry is a #consumer{}:
-record(consumer, {
    tag,           %% consumer tag (binary)
    ack_required,  %% boolean: does this consumer require manual ack?
    prefetch,      %% per-consumer prefetch count (0 = unlimited)
    args,          %% consumer arguments (x-priority, x-cancel-on-ha-failover, etc.)
    user           %% authenticated user (for authz on delivery)
}).
  • cr (channel-register): per-channel delivery state, keyed by channel pid:
-record(cr, {
    consumer_count,
    ch_pid,
    limiter,            %% rabbit_limiter pid for this channel
    unsent_count = 0,   %% messages delivered but not yet acked
    blocked = false,    %% is this channel suspended due to prefetch?
    ...
}).

The deliver/5 Function #

rabbit_queue_consumers:deliver/5 is called by rabbit_amqqueue_process whenever a message is ready for delivery:

deliver(FetchFun, QName, State, SingleActiveConsumerIsOn, ActiveConsumer) ->
    deliver(FetchFun, QName, false, State, SingleActiveConsumerIsOn, ActiveConsumer).

deliver(FetchFun, QName, ConsumersChanged,
        State = #state{consumers = Consumers}, SingleActiveConsumerOn, _) ->
    case priority_queue:out(Consumers) of
        {empty, _} ->
            {undelivered, ConsumersChanged, State};
        {{value, QEntry}, Consumers1} ->
            case deliver_to_consumer(FetchFun, QEntry, QName) of
                {delivered, R} ->
                    {delivered, ConsumersChanged, R, State};
                undelivered ->
                    %% Consumer blocked (prefetch full); try next
                    deliver(FetchFun, QName, true,
                            State#state{consumers = Consumers1},
                            SingleActiveConsumerOn, none)
            end
    end.

The Consumers is a priority queue (round-robin at same priority, higher x-priority argument wins). deliver_to_consumer checks if the selected consumer can accept a message (limiter check) and if so, calls FetchFun to pull the next message from the backing queue, then sends it to the channel.

Prefetch Enforcement via rabbit_limiter #

The per-channel rabbit_limiter process is the gatekeeper for delivery. deliver_to_consumer calls:

case rabbit_limiter:can_send(Limiter, AckRequired, CTag) of
    {ok, Limiter1}     -> send_message(...);
    {suspend, Limiter1} -> undelivered  %% consumer blocked: prefetch full
end.

rabbit_limiter:can_send/3 checks:

  • Per-consumer prefetch: unsent_count < prefetch_count for this consumer tag.
  • Per-channel global prefetch: total unacked across all consumers on this channel.
  • Credit flow: whether the channel process has granted credit.

When a consumer is suspended (prefetch full), it is removed from the consumers priority queue in the delivery state — blocked consumers are not considered for delivery until credit is replenished. When basic.ack arrives and rabbit_limiter grants new credit, the consumer is re-inserted into the priority queue.

Single-Active-Consumer #

When a queue is declared with x-single-active-consumer = true, only one consumer is active at a time. Extras are registered but not delivered to:

deliver(_FetchFun, _QName, false, State, true, none) ->
    %% single-active-consumer mode, no active consumer yet
    {undelivered, false, State};
deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true,
        SingleActiveConsumer) ->
    %% single-active-consumer mode: only deliver to the active one
    case maps:find(consumer_key(SingleActiveConsumer), Consumers) of
        error -> {undelivered, false, State};
        {ok, QEntry} ->
            case deliver_to_consumer(FetchFun, QEntry, QName) of
                {delivered, R} -> {delivered, false, R, State};
                undelivered    -> {undelivered, true, State}
            end
    end.

When the active consumer cancels or its channel closes, rabbit_queue_consumers promotes the next consumer in registration order to be the active one and notifies it via a basic.cancel + basic.consume-ok sequence.

Credit-Based Flow Between Queue and Channel #

Erlang’s credit_flow module implements a credit system between queue processes and channel processes. A channel process grants credits to a queue process; the queue process consumes credits when sending messages. When credits run out, the queue process stops sending.

%% In rabbit_amqqueue_process, before delivering a message to rabbit_channel:
case rabbit_channel_common:flow_control_enabled(ChPid) of
    true  -> credit_flow:send(ChPid);  %% consume one credit
    false -> ok
end

When the channel’s credit runs out:

  1. credit_flow:send/1 blocks (or signals suspension).
  2. rabbit_amqqueue_process sets its internal flow state to paused.
  3. No more deliveries are attempted until credits are replenished.
  4. rabbit_channel periodically sends credit grants back to queue processes.

This is the application-level backpressure chain: consumer slow to ack → channel accumulates unacked messages → channel stops granting credits to queue → queue pauses delivery → message backlog grows in queue storage (not in channel memory).

Without this credit flow, a slow consumer would cause the queue process to flood the channel process mailbox — a classic Erlang memory leak under overload.

Delivering to the Channel: The Frame Path #

deliver_to_consumer ultimately calls:

rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, Msg)

This is a gen_server2 cast to the channel process. The channel process handles it:

handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State) ->
    DeliveryTag = State#ch.delivery_tag + 1,
    %% Add to unacked_message_q
    State1 = record_sent(ConsumerTag, AckRequired, Msg, DeliveryTag, State),
    %% Build basic.deliver method
    Method = #'basic.deliver'{
        consumer_tag = ConsumerTag,
        delivery_tag = DeliveryTag,
        redelivered  = mc:is_redelivered(Msg),
        exchange     = ExchangeName,
        routing_key  = RoutingKey
    },
    %% Send to writer
    rabbit_writer:send_command_and_notify(WriterPid, QPid, Method, Content),
    {noreply, State1}.

rabbit_writer:send_command_and_notify sends the frame to the client and notifies the queue process that delivery succeeded (for credit flow).

Consumer Cancellation and Cleanup #

When a consumer cancels (basic.cancel), the channel notifies the queue process. rabbit_queue_consumers:cancel/3 removes the consumer from the registry and the priority queue. Any messages delivered to this consumer but not yet acked remain in the channel’s unacked_message_q.

When the channel process terminates (consumer disconnect), rabbit_channel:terminate/2 requeues all pending acks (as described in Chapter 3). This call reaches rabbit_amqqueue_process:requeue/3, which calls rabbit_backing_queue:requeue/2 (for classic queues) — the VQ puts the messages back into the ready set with redeliver_seq_id updated so they will be delivered with redelivered=true.

For quorum queues, the requeue is a return command sent to Ra: rabbit_fifo_client:return/3 → Ra pipeline → rabbit_fifo:apply marks messages as returned → they re-enter the checkout cycle.

Consumer Priority #

rabbit_queue_consumers uses a priority queue (priority_queue.erl in rabbit_common) to order consumers. Priority is set via the x-priority consumer argument:

%% When registering a consumer:
Priority = rabbit_misc:table_lookup(Args, <<"x-priority">>),
priority_queue:in(ConsumerKey, Priority, Consumers)

Higher priority consumers are served first when multiple consumers are active. Within the same priority, consumers are served in round-robin order (FIFO priority queue).

Summary #

rabbit_queue_consumers is a data structure (not a process) embedded in the queue process that manages consumer registration, priority ordering, prefetch enforcement, and single-active-consumer logic. Delivery is a cooperative protocol between the queue process (calling deliver/5), rabbit_limiter (enforcing prefetch), credit_flow (rate-matching between queue and channel processes), and rabbit_channel (tracking unacked messages and serializing frames). Consumer disconnect requeues all pending acks via VQ’s backing queue interface (classic) or Ra return commands (quorum).