- My Development Notes/
- RabbitMQ Internals: A Substrate Decomposition/
- Channel Multiplexer Substrate — rabbit_channel/
Channel Multiplexer Substrate — rabbit_channel
Table of Contents
Channel Multiplexer Substrate — rabbit_channel #
rabbit_channel is the gen_server that owns one AMQP channel. It is the busiest process in the broker per message: every publish, every ack, every consume call passes through it. It coordinates between the transport layer (receiving parsed methods from rabbit_reader) and the storage/delivery layer (routing to queue processes and receiving delivered messages from them).
gen_server2: The Priority Mailbox #
rabbit_channel uses gen_server2 — RabbitMQ’s enhanced gen_server — rather than OTP’s standard gen_server. The key addition: gen_server2 supports a priority mailbox. The channel defines priority rules:
prioritise_cast(Msg, _Len, _State) ->
case Msg of
{confirm, _, _} -> 5;
{mandatory_received, _} -> 6;
{rejected, _, _} -> 5;
_ -> 0
end.
Confirms and rejections from queue processes are delivered before ordinary casts. This ensures that publisher confirm state (rabbit_confirms) is updated promptly, preventing confirm accumulation under load.
Channel State Machine #
The conf record’s state field tracks the channel lifecycle:
-record(conf, {
state, %% starting | running | flow | closing
...
}).
starting: channel.open received, waiting for internal initialization.running: normal operation.flow: credit flow engaged — incoming publishes are being throttled.closing: channel.close sent or received, draining in-flight operations.
The flow state is entered when rabbit_limiter signals that the channel is over credit. In flow state, rabbit_reader calls rabbit_channel:do_flow/3 which applies credit — if no credit is available, the call blocks the reader process, which stops reading from the socket.
Key State in the Channel Record #
-record(ch, {
...
delivery_tag, %% monotonically increasing, per-channel
unacked_message_q, %% pending_ack queue: delivered but not acked
consumer_mapping, %% CTag → {Queue, AckMode, Prefetch, Args}
confirm_enabled, %% boolean: is confirm mode on?
publish_seqno, %% monotonically increasing publish sequence number
unconfirmed, %% rabbit_confirms state: seqno → {exchange, queues}
confirmed, %% list of seqnos confirmed by all queues
rejected, %% list of seqnos rejected by any queue
limiter, %% rabbit_limiter pid
...
}).
delivery_tag: incremented on each basic.deliver. Consumers use this in basic.ack. Channel-local — resets to 0 when the channel closes.
unacked_message_q: a queue of #pending_ack{} records for messages delivered to consumers but not yet acked. When a consumer’s channel or connection closes, all entries here are requeued.
publish_seqno: incremented on each basic.publish when confirm mode is enabled. This is the sequence number sent to the producer in basic.ack.
basic.publish: The Routing Path #
When rabbit_channel receives basic.publish:
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State) ->
%% 1. Resolve exchange name to exchange record
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% 2. Build message (mc = message container)
Message = rabbit_basic:message(ExchangeName, RoutingKey, Content),
%% 3. Route: exchange → list of queue names
Queues = rabbit_exchange:route(Exchange, Message),
%% 4. Deliver to each queue
{noreply, deliver_to_queues({Message, Mandatory, Queues}, State)}.
rabbit_exchange:route/2 is a pure function call — no process hop. It invokes the exchange type’s route/2 callback (e.g., rabbit_exchange_type_topic:route/2) which does an ETS lookup. The result is a list of #resource{kind = queue, ...} names.
deliver_to_queues then sends the message to each queue process:
deliver_to_queues({Message, Mandatory, Queues}, State) ->
%% If confirm mode: register the publish seqno against all destination queues
State1 = maybe_record_confirms(Queues, State),
%% Deliver to each queue process
lists:foldl(fun deliver_to_queue/2, State1, Queues).
For classic queues: gen_server2:cast(QPid, {deliver, Message, ...}).
For quorum queues: rabbit_fifo_client:enqueue(QName, Message, FifoClientState) → Ra pipeline command.
Publisher Confirms: rabbit_confirms #
rabbit_confirms is a module (not a process) managing the confirm state for a channel. Its state is embedded in the ch record’s unconfirmed field.
%% rabbit_confirms.erl data structure
-record(?MODULE, {
smallest :: undefined | seq_no(),
unconfirmed = #{} :: #{seq_no() => {exchange_name(), #{queue_name() => ok}}}
}).
Each published message with confirm_enabled is registered:
seq_no→{ExchangeName, #{QueueName => ok}}for each destination queue.
When a queue process confirms delivery ({confirm, SeqNos, QueueName} cast to channel):
handle_cast({confirm, SeqNos, QueueName}, State = #ch{unconfirmed = UC}) ->
{Confirmed, UC1} = rabbit_confirms:confirm(SeqNos, QueueName, UC),
%% Confirmed = list of {SeqNo, ExchangeName} where all queues have confirmed
rabbit_writer:send_command(WriterPid, #'basic.ack'{
delivery_tag = smallest_confirmed_seqno,
multiple = true
}),
...
A seq_no is fully confirmed only when all destination queues have confirmed it. If a message routes to 3 queues (fanout), the channel waits for 3 queue confirmations before sending basic.ack to the producer. rabbit_confirms:confirm/3 removes the confirmed queue from the per-seqno queue set; when the set is empty, the seqno is ready to confirm.
The smallest field tracks the lowest unconfirmed seqno — basic.ack(multiple=true) is sent from smallest to the current confirmed batch, allowing bulk acks in one frame.
Prefetch and Flow Control: rabbit_limiter #
rabbit_limiter is a gen_server spawned as a sibling of rabbit_channel under rabbit_channel_sup. It owns the channel’s prefetch state and credit-flow tokens.
%% Setting prefetch in rabbit_channel:
rabbit_limiter:limit_prefetch(Limiter, PrefetchCount, 0)
%% Before delivering a message to a consumer:
case rabbit_limiter:can_send(Limiter, ConsumerAckRequired, CTag) of
{ok, Limiter1} -> deliver_message_to_consumer(...);
{suspended, _} -> stop_delivering(...)
end.
rabbit_limiter also implements credit-based flow (credit_flow) between channel and queue processes. When rabbit_channel sends too many messages to a queue process without the queue processing them (backlog building up), the queue’s credit runs out. rabbit_channel transitions to the flow state, which causes rabbit_reader to stop reading new publish frames from the socket.
This implements application-level backpressure: slow queues → channel flow state → reader stops reading → TCP flow control → producer blocks. The same chain as resource-level backpressure but triggered from queue-level overload rather than broker-level memory/disk alarm.
basic.ack / basic.nack / basic.reject: Consumer Side #
When a consumer acks:
handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State) ->
%% Remove from unacked_message_q
{Acked, State1} = collect_acks(DeliveryTag, Multiple, State),
%% Notify each queue process that message is settled
lists:foreach(fun({QPid, MsgIds}) ->
rabbit_amqqueue:ack(QPid, MsgIds, self())
end, Acked),
{noreply, State1}.
collect_acks scans unacked_message_q and returns the matching #pending_ack{} records. Each record contains the queue pid and the queue-internal message ID. The channel notifies each queue — the queue removes the message from its persistent store.
For basic.nack/basic.reject with requeue=true: rabbit_amqqueue:requeue/3 is called — the queue process inserts the message back into its ready set. For requeue=false: rabbit_amqqueue:reject/3 — the queue dead-letters or discards.
Channel Close and Unacked Cleanup #
When a channel closes (gracefully or via crash), rabbit_channel:terminate/2 requeues all unacked messages:
terminate(_, State = #ch{unacked_message_q = UAMQ}) ->
%% Requeue all pending acks
ok = fold_per_queue(
fun(QPid, MsgIds, ok) ->
rabbit_amqqueue:requeue(QPid, MsgIds, self())
end, ok, UAMQ),
...
This is the mechanism behind “messages redelivered when consumer disconnects”: the channel process, on termination, explicitly requeues every message it had delivered but not yet acked.
Summary #
rabbit_channel is the channel state machine: a gen_server2 with a priority mailbox that receives AMQP methods from rabbit_reader, coordinates routing (calling exchange type modules), tracks publisher confirms via rabbit_confirms, enforces prefetch via rabbit_limiter, maintains the unacked message queue, and requeues all outstanding messages on close. The priority mailbox ensures confirms are processed promptly. Credit-based flow control between channel and queue processes provides application-level backpressure that chains through to TCP-level flow control.