Channel Multiplexer Substrate — rabbit_channel

rabbit_channel is the gen_server that owns one AMQP channel. It is the busiest process in the broker per message: every publish, every ack, every consume call passes through it. It coordinates between the transport layer (receiving parsed methods from rabbit_reader) and the storage/delivery layer (routing to queue processes and receiving delivered messages from them).

gen_server2: The Priority Mailbox

rabbit_channel uses gen_server2 — RabbitMQ’s enhanced gen_server — rather than OTP’s standard gen_server. The key addition: gen_server2 supports a priority mailbox. The channel defines priority rules:

prioritise_cast(Msg, _Len, _State) ->
    case Msg of
        {confirm,            _, _} -> 5;
        {mandatory_received, _}    -> 5;
        {rejected,           _, _} -> 5;
        _                          -> 0
    end.

Confirms and rejections from queue processes are delivered before ordinary casts. This ensures that publisher confirm state (rabbit_confirms) is updated promptly, preventing confirm accumulation under load.
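The effect of these priorities can be illustrated with a small standalone model. This is only a sketch of the ordering, not gen_server2's actual implementation: the module name prio_mailbox_sketch and both functions are hypothetical, and the model assumes that messages of equal priority keep their arrival order.

```erlang
-module(prio_mailbox_sketch).
-export([priority/1, drain_order/1]).

%% Hypothetical stand-in for a prioritise_cast/3 callback:
%% map a cast message to a numeric priority.
priority({confirm, _, _})  -> 5;
priority({rejected, _, _}) -> 5;
priority(_)                -> 0.

%% Return the messages in the order a priority mailbox would hand them out:
%% highest priority first, arrival order preserved within one priority.
drain_order(Msgs) ->
    Indexed = lists:zip(lists:seq(1, length(Msgs)), Msgs),
    %% Sort key: negated priority first, then arrival index.
    Keyed = [{-priority(M), I, M} || {I, M} <- Indexed],
    [M || {_, _, M} <- lists:sort(Keyed)].
```

In this model, a confirm that arrives behind a backlog of ordinary casts is still handled first, which is the behaviour the priority rules aim for.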

Channel State Machine

The conf record’s state field tracks the channel lifecycle:

-record(conf, {
    state,          %% starting | running | flow | closing
    ...
}).

  • starting: channel.open received, waiting for internal initialization.
  • running: normal operation.
  • flow: credit flow engaged — incoming publishes are being throttled.
  • closing: channel.close sent or received, draining in-flight operations.

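The flow state reflects credit-based flow control (the credit_flow module): the channel is in flow when it has run out of credit with a downstream process such as a queue. On the publish path, rabbit_reader hands methods to the channel through rabbit_channel:do_flow/3, which spends one unit of the reader's credit for that channel. When that credit is exhausted, the reader is marked as blocked and stops reading from the socket until the channel grants more credit.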
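The credit accounting behind the flow state can be sketched in a few lines. This is a simplified model under stated assumptions, not the broker's credit_flow module: the module name credit_sketch, its functions, and the credit amounts used in any example are all hypothetical.

```erlang
-module(credit_sketch).
-export([new/1, send/1, grant/2, blocked/1]).

%% Sender-side view of one downstream peer: only the remaining credit.
new(InitialCredit) when InitialCredit > 0 ->
    #{credit => InitialCredit}.

%% Spend one unit of credit for each message sent downstream.
%% Crashes on purpose if called while blocked.
send(#{credit := C} = S) when C > 0 ->
    S#{credit := C - 1}.

%% The receiver hands credit back after it has processed a batch.
grant(#{credit := C} = S, More) when More > 0 ->
    S#{credit := C + More}.

%% A blocked sender must stop producing; for the reader process this
%% means it stops reading from the socket.
blocked(#{credit := C}) ->
    C =:= 0.
```

The point of the design is that the sender never needs to ask the receiver whether it is overloaded: running out of credit is the signal.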

Key State in the Channel Record

-record(ch, {
    ...
    delivery_tag,           %% monotonically increasing, per-channel
    unacked_message_q,      %% pending_ack queue: delivered but not acked
    consumer_mapping,       %% CTag → {Queue, AckMode, Prefetch, Args}
    confirm_enabled,        %% boolean: is confirm mode on?
    publish_seqno,          %% monotonically increasing publish sequence number
    unconfirmed,            %% rabbit_confirms state: seqno → {exchange, queues}
    confirmed,              %% list of seqnos confirmed by all queues
    rejected,               %% list of seqnos rejected by any queue
    limiter,                %% rabbit_limiter pid
    ...
}).

delivery_tag: incremented on each basic.deliver. Consumers echo it back in basic.ack. Delivery tags are scoped to the channel: a new channel starts its own sequence, so a tag is meaningless on any other channel.

unacked_message_q: a queue of #pending_ack{} records for messages delivered to consumers but not yet acked. When a consumer’s channel or connection closes, all entries here are requeued.

publish_seqno: incremented on each basic.publish when confirm mode is enabled. This is the sequence number returned to the producer in the delivery_tag field of basic.ack (or basic.nack for a rejected publish).

basic.publish: The Routing Path

When rabbit_channel receives basic.publish:

handle_method(#'basic.publish'{exchange    = ExchangeNameBin,
                               routing_key = RoutingKey,
                               mandatory   = Mandatory},
              Content, State) ->
    %% 1. Resolve exchange name to exchange record
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    Exchange = rabbit_exchange:lookup_or_die(ExchangeName),

    %% 2. Build message (mc = message container)
    Message = rabbit_basic:message(ExchangeName, RoutingKey, Content),

    %% 3. Route: exchange → list of queue names
    Queues = rabbit_exchange:route(Exchange, Message),

    %% 4. Deliver to each queue
    {noreply, deliver_to_queues({Message, Mandatory, Queues}, State)}.

rabbit_exchange:route/2 is an ordinary function call that runs inside the channel process, with no process hop. It invokes the exchange type’s route/2 callback (e.g., rabbit_exchange_type_topic:route/2), which does an ETS lookup. The result is a list of #resource{kind = queue, ...} names.
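To make the "function call plus ETS lookup" idea concrete, here is a minimal sketch of direct-exchange-style routing. It assumes a flat bag table of {RoutingKey, QueueName} bindings; the module route_sketch, the table layout, and the function names are hypothetical and far simpler than the broker's real binding tables (topic routing in particular uses a different structure).

```erlang
-module(route_sketch).
-export([new/0, bind/3, route/2]).

%% Hypothetical binding table: a bag of {RoutingKey, QueueName} tuples.
new() ->
    ets:new(bindings, [bag, public]).

%% Bind a queue to a routing key.
bind(Tab, RoutingKey, QName) ->
    true = ets:insert(Tab, {RoutingKey, QName}),
    ok.

%% Routing is a lookup performed in the caller's own process:
%% no message is sent to any other process.
route(Tab, RoutingKey) ->
    lists:usort([Q || {_Key, Q} <- ets:lookup(Tab, RoutingKey)]).
```

Because the lookup runs in the calling process, routing adds no mailbox hop to the publish path; only the subsequent delivery to queue processes does.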

deliver_to_queues then sends the message to each queue process:

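deliver_to_queues({Message, Mandatory, Queues}, State) ->
    %% If confirm mode: register the publish seqno against all destination queues
    State1 = maybe_record_confirms(Queues, State),
    %% Deliver to each queue process, threading the channel state through.
    %% The fun closes over Message so the (simplified) helper can see it.
    lists:foldl(fun(QName, StateAcc) ->
                    deliver_to_queue(QName, Message, Mandatory, StateAcc)
                end, State1, Queues).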

For classic queues: gen_server2:cast(QPid, {deliver, Message, ...}). For quorum queues: rabbit_fifo_client:enqueue(QName, Message, FifoClientState), which submits the message to Ra as a pipelined command.

Publisher Confirms: rabbit_confirms

rabbit_confirms is a module (not a process) managing the confirm state for a channel. Its state is embedded in the ch record’s unconfirmed field.

%% rabbit_confirms.erl data structure
-record(?MODULE, {
    smallest :: undefined | seq_no(),
    unconfirmed = #{} :: #{seq_no() => {exchange_name(), #{queue_name() => ok}}}
}).

Each published message with confirm_enabled is registered:

  • seq_no → {ExchangeName, #{QueueName => ok}}, with one map key per destination queue.

When a queue process confirms delivery ({confirm, SeqNos, QueueName} cast to channel):

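handle_cast({confirm, SeqNos, QueueName}, State = #ch{unconfirmed = UC}) ->
    {Confirmed, UC1} = rabbit_confirms:confirm(SeqNos, QueueName, UC),
    %% Confirmed = list of {SeqNo, ExchangeName} where all queues have confirmed
    %% WriterPid comes from the channel state (binding elided here).
    %% highest_contiguous_confirmed_seqno is a placeholder, not a real variable:
    %% with multiple = true the ack covers every seqno up to and including it.
    rabbit_writer:send_command(WriterPid, #'basic.ack'{
        delivery_tag = highest_contiguous_confirmed_seqno,
        multiple = true
    }),
    ...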

A seq_no is fully confirmed only when all destination queues have confirmed it. If a message routes to 3 queues (fanout), the channel waits for 3 queue confirmations before sending basic.ack to the producer. rabbit_confirms:confirm/3 removes the confirmed queue from the per-seqno queue set; when the set is empty, the seqno is ready to confirm.

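The smallest field tracks the lowest seqno that is still unconfirmed. Every seqno below it has been confirmed, so the channel can cover all of them with a single basic.ack(multiple=true) whose delivery_tag is the highest confirmed seqno below smallest. This lets one frame acknowledge a whole batch.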
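The bookkeeping described in this section can be sketched as a small standalone module. This is an illustration under assumptions, not the rabbit_confirms source: the module name confirms_sketch, the plain-map state, and the coalesce/2 helper are hypothetical, and smallest is recomputed on demand here rather than cached in a record field.

```erlang
-module(confirms_sketch).
-export([new/0, insert/3, confirm/3, smallest/1, coalesce/2]).

new() -> #{}.

%% Register a publish: SeqNo stays unconfirmed until every queue
%% in QNames has confirmed it.
insert(SeqNo, QNames, UC) when QNames =/= [] ->
    UC#{SeqNo => maps:from_list([{Q, ok} || Q <- QNames])}.

%% One queue confirms a batch of seqnos. Returns the seqnos that are
%% now confirmed by all of their queues, plus the updated map.
confirm(SeqNos, QName, UC0) ->
    lists:foldl(
      fun(SeqNo, {Done, UC}) ->
              case maps:find(SeqNo, UC) of
                  {ok, Qs0} ->
                      Qs = maps:remove(QName, Qs0),
                      case maps:size(Qs) of
                          0 -> {[SeqNo | Done], maps:remove(SeqNo, UC)};
                          _ -> {Done, UC#{SeqNo := Qs}}
                      end;
                  error ->
                      {Done, UC}
              end
      end, {[], UC0}, SeqNos).

%% Lowest seqno still waiting on at least one queue.
smallest(UC) when map_size(UC) =:= 0 -> undefined;
smallest(UC) -> lists:min(maps:keys(UC)).

%% Turn confirmed seqnos into {DeliveryTag, Multiple} acks: everything
%% below the smallest unconfirmed seqno collapses into one ack with
%% multiple = true; the rest are acked individually.
coalesce([], _UC) -> [];
coalesce(Confirmed, UC) ->
    Sorted = lists:usort(Confirmed),
    Cutoff = case smallest(UC) of
                 undefined -> lists:last(Sorted) + 1;
                 S -> S
             end,
    {Below, Rest} = lists:splitwith(fun(N) -> N < Cutoff end, Sorted),
    Bulk = case Below of
               [] -> [];
               _  -> [{lists:last(Below), true}]
           end,
    Bulk ++ [{N, false} || N <- Rest].
```

For example, if seqnos 1 and 3 are fully confirmed while 2 still waits on one queue, the sketch produces a bulk ack for 1 and an individual ack for 3, since a bulk ack for 3 would wrongly cover 2.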

Prefetch and Flow Control: rabbit_limiter

rabbit_limiter is a gen_server spawned as a sibling of rabbit_channel under rabbit_channel_sup. It owns the channel’s prefetch state and credit-flow tokens.

%% Setting prefetch in rabbit_channel:
rabbit_limiter:limit_prefetch(Limiter, PrefetchCount, 0)

%% Before delivering a message to a consumer:
case rabbit_limiter:can_send(Limiter, ConsumerAckRequired, CTag) of
    {continue, Limiter1} -> deliver_message_to_consumer(...);
    {suspend, _}         -> stop_delivering(...)
end.
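The prefetch check itself amounts to counting unacknowledged deliveries against a window. The following is a minimal sketch of that idea, not the rabbit_limiter implementation: the module prefetch_sketch, the map-based state, and the continue/suspend return tags are choices made for this illustration.

```erlang
-module(prefetch_sketch).
-export([new/1, can_send/2, ack/2]).

%% Limit = 0 means "no prefetch limit", as with basic.qos prefetch_count = 0.
new(Limit) when Limit >= 0 ->
    #{limit => Limit, unacked => 0}.

%% Deliveries that need no ack never count against the prefetch window.
can_send(S, false) ->
    {continue, S};
%% No limit configured: always deliver, but keep counting.
can_send(#{limit := 0, unacked := U} = S, true) ->
    {continue, S#{unacked := U + 1}};
%% Window still open: deliver and count the message as unacked.
can_send(#{limit := L, unacked := U} = S, true) when U < L ->
    {continue, S#{unacked := U + 1}};
%% Window full: the caller must stop delivering to this consumer.
can_send(S, true) ->
    {suspend, S}.

%% Acks reopen the window.
ack(#{unacked := U} = S, N) when N >= 0, N =< U ->
    S#{unacked := U - N}.
```

With a limit of 2, the third ack-requiring delivery is suspended until at least one earlier delivery has been acked.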

Separately from prefetch, credit-based flow control (the credit_flow module) operates between the channel and queue processes. When rabbit_channel sends messages to a queue process faster than the queue can process them, the channel's credit for that queue runs out. rabbit_channel then transitions to the flow state and stops granting credit to rabbit_reader, which in turn stops reading new publish frames from the socket.

This implements application-level backpressure: slow queues → channel flow state → reader stops reading → TCP flow control → producer blocks. It is the same chain as resource-level backpressure, but it is triggered by queue-level overload rather than by a broker-level memory or disk alarm.

basic.ack / basic.nack / basic.reject: Consumer Side

When a consumer acks:

handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State) ->
    %% Remove from unacked_message_q
    {Acked, State1} = collect_acks(DeliveryTag, Multiple, State),
    %% Notify each queue process that message is settled
    lists:foreach(fun({QPid, MsgIds}) ->
        rabbit_amqqueue:ack(QPid, MsgIds, self())
    end, Acked),
    {noreply, State1}.

collect_acks scans unacked_message_q and returns the matching #pending_ack{} records. Each record contains the queue pid and the queue-internal message ID. The channel then notifies each queue, which settles the message: it drops it from its pending-ack set and, if the message was persisted, from the message store.
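The scan can be sketched as follows. This is a simplified model rather than the channel's real collect_acks: the module acks_sketch is hypothetical, and each entry is a plain {DeliveryTag, QPid, MsgId} tuple standing in for a #pending_ack{} record, held in delivery order in a queue from the standard queue module.

```erlang
-module(acks_sketch).
-export([collect_acks/3]).

%% Unacked is a queue:queue() of {DeliveryTag, QPid, MsgId} tuples in
%% delivery order. Returns {Acked, Remaining}.
collect_acks(Tag, true, Unacked) ->
    %% multiple = true: take every entry with a tag up to and including Tag.
    {Acked, Rest} = lists:splitwith(fun({T, _, _}) -> T =< Tag end,
                                    queue:to_list(Unacked)),
    {Acked, queue:from_list(Rest)};
collect_acks(Tag, false, Unacked) ->
    %% multiple = false: take only the entry whose tag equals Tag.
    {Acked, Rest} = lists:partition(fun({T, _, _}) -> T =:= Tag end,
                                    queue:to_list(Unacked)),
    {Acked, queue:from_list(Rest)}.
```

Converting to a list on every call keeps the sketch short; a production implementation would avoid walking the whole structure for a single-tag ack.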

For basic.nack/basic.reject with requeue=true: rabbit_amqqueue:requeue/3 is called — the queue process inserts the message back into its ready set. For requeue=false: rabbit_amqqueue:reject/3 — the queue dead-letters or discards.

Channel Close and Unacked Cleanup

When a channel closes (gracefully or via crash), rabbit_channel:terminate/2 requeues all unacked messages:

terminate(_, State = #ch{unacked_message_q = UAMQ}) ->
    %% Requeue all pending acks
    ok = fold_per_queue(
        fun(QPid, MsgIds, ok) ->
            rabbit_amqqueue:requeue(QPid, MsgIds, self())
        end, ok, UAMQ),
    ...

This is the mechanism behind “messages redelivered when consumer disconnects”: the channel process, on termination, explicitly requeues every message it had delivered but not yet acked.
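The per-queue grouping that the requeue step relies on can be sketched like this. The module requeue_sketch and its functions are hypothetical stand-ins: entries are plain {DeliveryTag, QRef, MsgId} tuples rather than #pending_ack{} records, and the callback receives a queue reference instead of performing a real requeue.

```erlang
-module(requeue_sketch).
-export([group_by_queue/1, fold_per_queue/3]).

%% Group {DeliveryTag, QRef, MsgId} entries into #{QRef => [MsgId]},
%% keeping delivery order within each queue.
group_by_queue(Unacked) ->
    lists:foldr(
      fun({_Tag, QRef, MsgId}, Acc) ->
              maps:update_with(QRef,
                               fun(Ids) -> [MsgId | Ids] end,
                               [MsgId],
                               Acc)
      end, #{}, Unacked).

%% Call F(QRef, MsgIds, Acc) once per queue, so each queue receives a
%% single batched request instead of one request per message.
fold_per_queue(F, Acc0, Unacked) ->
    maps:fold(F, Acc0, group_by_queue(Unacked)).
```

Batching per queue means that a channel with thousands of unacked messages spread over a handful of queues sends only a handful of requeue requests on termination.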

Summary

rabbit_channel is the channel state machine: a gen_server2 with a priority mailbox that receives AMQP methods from rabbit_reader, coordinates routing (calling exchange type modules), tracks publisher confirms via rabbit_confirms, enforces prefetch via rabbit_limiter, maintains the unacked message queue, and requeues all outstanding messages on close. The priority mailbox ensures confirms are processed promptly. Credit-based flow control between channel and queue processes provides application-level backpressure that chains through to TCP-level flow control.