Transport Substrate — rabbit_reader and rabbit_writer #
The transport substrate is implemented across three modules: rabbit_reader (connection owner and frame parser), rabbit_writer (frame serializer and sender), and rabbit_heartbeat (keepalive timer). Together they form the TCP session layer — the boundary between raw bytes on the wire and typed AMQP methods dispatched to channel processes.
rabbit_reader is a Special Process #
rabbit_reader is not a gen_server or gen_statem. The module header says:
%% Reader processes are special processes (in the OTP sense).
A “special process” in OTP implements the system message protocol manually (system_continue/3, system_terminate/4, system_code_change/4) without using a behaviour. This gives rabbit_reader complete control over its receive loop — essential for efficient frame parsing, because a gen_server receive loop would add overhead and complicate buffer management.
Ranch: TCP Acceptor Pool #
RabbitMQ uses Ranch for TCP connection acceptance. Ranch maintains a pool of acceptor processes that call accept() on the listening socket. When a connection arrives, Ranch spawns a new process and passes the socket to it — this becomes the rabbit_reader process.
%% From rabbit_reader.erl
-record(v1, {
          parent,
          ranch_ref,   %% Ranch reference, used for socket handoff
          sock,        %% the TCP socket
          connection,  %% connection-level AMQP state
          callback,    %% current parser state (framing callback)
          ...
          throttle,    %% backpressure state
          heartbeater, %% pid of rabbit_heartbeat process
          ...
         }).
Ranch handles SO_REUSEPORT, acceptor pool sizing, and SSL handshake (via ranch_ssl). rabbit_reader receives control after the TCP (and optionally TLS) layer is established.
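Ranch itself is Erlang, but the acceptor-pool shape is easy to model. A toy Python sketch (illustrative names, not Ranch's API): several acceptors block in `accept()` on one shared listening socket, and whichever wins hands the connection to a per-connection handler, the way Ranch hands the socket to a fresh rabbit_reader.

```python
import socket
import threading

def serve_with_acceptor_pool(num_acceptors=2, conns_per_acceptor=1):
    """Toy acceptor pool: N threads share one listening socket."""
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    lsock.bind(("127.0.0.1", 0))
    lsock.listen(8)
    port = lsock.getsockname()[1]

    def handler(conn):
        # Stands in for the spawned per-connection process: it now
        # owns the socket. Here it just echoes one chunk and closes.
        data = conn.recv(1024)
        conn.sendall(data)
        conn.close()

    def acceptor():
        # Each acceptor blocks in accept() on the *shared* socket.
        for _ in range(conns_per_acceptor):
            conn, _addr = lsock.accept()
            threading.Thread(target=handler, args=(conn,)).start()

    threads = [threading.Thread(target=acceptor) for _ in range(num_acceptors)]
    for t in threads:
        t.start()
    return port, threads, lsock
```

The kernel load-balances incoming connections across whichever acceptors are parked in `accept()`, which is why a pool (rather than a single accept loop) helps under connection churn.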
The recvloop/mainloop Event Loop #
rabbit_reader runs two mutually recursive functions: recvloop and mainloop.
recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
    mainloop(Deb, Buf, BufLen, State);
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
    mainloop(Deb, Buf, BufLen, State);  %% ← backpressure: stop reading
recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) ->
    ok = rabbit_net:async_recv(Sock, RecvLen, infinity),
    mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true}).
recvloop issues an asynchronous receive request (rabbit_net:async_recv/3) on the socket and transitions to mainloop to wait. The recv_len field tells it exactly how many bytes to read next — this drives the progressive frame-parsing state machine: first read 7 bytes (the frame header), then read size + 1 bytes (the payload plus the one-byte frame-end sentinel).
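The two read sizes come straight from the AMQP 0-9-1 wire format: a frame is a 7-byte header (type:8, channel:16, size:32, big-endian), the payload, and a 0xCE end octet. A minimal Python sketch of the layout (the real encoder is rabbit_binary_generator, in Erlang):

```python
import struct

FRAME_END = 0xCE  # AMQP 0-9-1 frame-end sentinel

def build_frame(frame_type: int, channel: int, payload: bytes) -> bytes:
    """7-byte header (type:8, channel:16, size:32, big-endian),
    then the payload, then the 0xCE end octet."""
    return struct.pack(">BHI", frame_type, channel, len(payload)) \
        + payload + bytes([FRAME_END])

# The reader's two read sizes fall out of this layout:
# first recv_len = 7 (the header), then recv_len = size + 1
# (payload plus frame-end).
heartbeat = build_frame(8, 0, b"")  # type 8, channel 0, empty payload
assert len(heartbeat) == 7 + 0 + 1
```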
mainloop waits for either socket data or Erlang messages:
mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, ...}) ->
    receive
        {inet_async, Sock, _, {ok, Data}} ->
            %% New bytes from socket
            recvloop(Deb, [Data | Buf], BufLen + size(Data), State1);
        {inet_async, Sock, _, {error, closed}} ->
            ...;
        {conserve_resources, Source, Conserve} ->
            %% Backpressure signal from alarm system
            control_throttle(State1);
        {channel_exit, ChannelNum, Reason} ->
            ...;
        Other ->
            handle_other(Other, State)
    end.
The critical backpressure path: when connection_state = blocked, recvloop does not issue a new async_recv. The socket’s TCP receive buffer fills up. When the kernel buffer is full, TCP flow control activates — the sender’s TCP send buffer fills. When that is full, the producer’s write() call blocks. This is the chain from application-level block to TCP-level backpressure.
Frame Parsing: handle_input and callback #
Frame parsing is a state machine driven by the callback field in the v1 record. The callback is a function that processes the current chunk of bytes and returns the next callback (and possibly a parsed result).
The initial state after the protocol header is received: callback = {frame_header, <<>>}. The flow:
handle_input({frame_header, <<>>}, Data, State)
    → reads 7 bytes (type, channel, size)
    → sets recv_len = Size + 1 (payload + frame-end)
    → callback = {frame_payload, Type, Channel, Size, <<>>}

handle_input({frame_payload, Type, Channel, Size, Payload}, Data, State)
    → assembles payload bytes
    → validates frame-end = 0xCE
    → calls process_frame(Type, Channel, Payload, State)
    → callback = {frame_header, <<>>} (back to start)
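The callback state machine above can be modeled in a few lines of Python (a sketch of the idea, not the real parser): each state records what to read next, and completing a read yields the next state.

```python
import struct

FRAME_END = 0xCE

def parse_frames(data: bytes):
    """Toy model of the reader's progressive parser: alternate between a
    'frame_header' state (recv_len = 7) and a 'frame_payload' state
    (recv_len = size + 1, covering payload plus the frame-end octet)."""
    frames = []
    state, recv_len = ("frame_header",), 7
    pos = 0
    while pos + recv_len <= len(data):
        chunk, pos = data[pos:pos + recv_len], pos + recv_len
        if state[0] == "frame_header":
            ftype, channel, size = struct.unpack(">BHI", chunk)
            state, recv_len = ("frame_payload", ftype, channel, size), size + 1
        else:
            _, ftype, channel, size = state
            if chunk[-1] != FRAME_END:
                raise ValueError("bad frame-end octet")
            frames.append((ftype, channel, chunk[:-1]))  # process_frame would run here
            state, recv_len = ("frame_header",), 7
    return frames
```

The invariant is the same as in rabbit_reader: the parser never inspects partial reads — `recv_len` guarantees each step sees exactly one header or exactly one payload-plus-sentinel.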
process_frame dispatches on the frame type:
- Type 1 (METHOD): calls rabbit_command_assembler:process/4 to decode the method class + method ID + arguments using rabbit_framing_amqp_0_9_1 generated code.
- Type 2 (HEADER) + Type 3 (BODY): accumulated into a content record, then forwarded to the channel process once fully assembled.
- Type 8 (HEARTBEAT): forwarded to the rabbit_heartbeat process.
rabbit_framing_amqp_0_9_1.erl is generated code (by rabbitmq_codegen) — it contains the complete AMQP 0-9-1 method encoding/decoding table. Every class ID, method ID, and argument type is hardcoded as pattern-matched Erlang functions.
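What the generated module does with pattern-matched function clauses can be approximated with a lookup table. A Python sketch — the (class-id, method-id) pairs below are taken from the AMQP 0-9-1 spec, everything else is illustrative:

```python
import struct

# A few (class-id, method-id) pairs from the AMQP 0-9-1 spec; the generated
# Erlang module covers every method as a pattern-matched function clause.
METHODS = {
    (10, 30): "connection.tune",
    (10, 31): "connection.tune-ok",
    (60, 40): "basic.publish",
    (60, 80): "basic.ack",
}

def decode_method(payload: bytes):
    """A METHOD frame payload starts with class-id:16 and method-id:16
    (big-endian); the remaining bytes are the method's arguments."""
    class_id, method_id = struct.unpack(">HH", payload[:4])
    return METHODS[(class_id, method_id)], payload[4:]
```

Code generation wins here because the full table (dozens of classes, hundreds of methods and argument layouts) is mechanical — the Erlang compiler turns the clauses into an efficient dispatch.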
Dispatching to Channels #
Once a complete method frame is parsed, rabbit_reader forwards it to the appropriate rabbit_channel process:
%% Connection-level methods (channel 0) handled inline
handle_method(#'connection.tune_ok'{...}, _, State) -> ...;
handle_method(#'connection.open'{...}, _, State) -> ...;

%% Channel methods forwarded via message-passing
deliver_to_channel(Channel, Method, Content, State) ->
    case get_channel(Channel, State) of
        {ok, ChPid} ->
            rabbit_channel:do_flow(ChPid, Method, Content);
        ...
    end.
rabbit_channel:do_flow/3 is used for most channel methods (it applies credit flow). rabbit_channel:do/2 is used for methods that should not be subject to flow control (like basic.ack).
The throttle Record and Backpressure #
The throttle record tracks what is blocking the connection:
-record(throttle, {
          last_blocked_at,
          should_block,
          blocked_by,  %% set of blocking reasons
          connection_blocked_message_sent
         }).
blocked_by is a set that can contain:
- {resource, memory} — broker memory high-watermark exceeded
- {resource, disk} — broker disk free space low
- flow — internal credit flow signals from channel processes
The conserve_resources/3 callback is called by rabbit_alarm when the broker crosses a resource threshold. It sends a {conserve_resources, Source, Conserve} message to all reader processes:
conserve_resources(Pid, Source, {_, Conserve, _}) ->
    Pid ! {conserve_resources, Source, Conserve},
    ok.
When blocked_by becomes non-empty, control_throttle sets connection_state = blocked and sends connection.blocked to the client. When blocked_by becomes empty, it sends connection.unblocked and resumes recvloop.
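The blocked_by bookkeeping reduces to set arithmetic with notifications on the empty/non-empty edges. A toy Python model (illustrative names, not the Erlang record):

```python
class Throttle:
    """Toy model of the reader's throttle: a set of blocking reasons.
    Emits connection.blocked / connection.unblocked on edge transitions."""

    def __init__(self):
        self.blocked_by = set()
        self.sent = []  # stands in for frames written to the client

    def conserve_resources(self, source, conserve):
        was_blocked = bool(self.blocked_by)
        if conserve:
            self.blocked_by.add(("resource", source))
        else:
            self.blocked_by.discard(("resource", source))
        # Notify only when crossing the empty <-> non-empty boundary.
        if not was_blocked and self.blocked_by:
            self.sent.append("connection.blocked")
        elif was_blocked and not self.blocked_by:
            self.sent.append("connection.unblocked")

    @property
    def connection_state(self):
        return "blocked" if self.blocked_by else "running"
```

The edge-triggered notification matters: with two concurrent alarms (memory and disk), clearing one must not unblock the connection or spam the client with duplicate frames.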
rabbit_writer: The Sending Side #
rabbit_writer is a separate process from rabbit_reader. Separation allows rabbit_channel processes to send frames (deliveries, confirms) concurrently with rabbit_reader parsing incoming frames.
rabbit_writer receives messages from channel processes:
%% In rabbit_channel.erl, sending a frame to the client:
rabbit_writer:send_command(WriterPid, Method)
rabbit_writer:send_command_and_notify(WriterPid, QueuePid, Method, Content)
rabbit_writer serializes the method using rabbit_binary_generator:build_simple_method_frame/3 and writes to the socket. For content (header + body), it calls rabbit_binary_generator:build_content_frames/4 which splits the body across frames respecting frame_max.
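The splitting rule can be sketched as follows — assuming, as in rabbit_binary_generator, that frame_max bounds the whole frame, so each BODY frame carries at most frame_max - 8 payload bytes (7-byte header plus the end octet):

```python
import struct

FRAME_END = 0xCE

def build_body_frames(channel: int, body: bytes, frame_max: int):
    """Split a message body into BODY frames (type 3). frame_max caps the
    whole frame, so payload per frame is frame_max - 8
    (7-byte header + 1-byte frame-end)."""
    chunk = frame_max - 8
    frames = []
    for i in range(0, len(body), chunk):
        part = body[i:i + chunk]
        frames.append(struct.pack(">BHI", 3, channel, len(part))
                      + part + bytes([FRAME_END]))
    return frames
```

frame_max is negotiated in connection.tune, so the writer always knows the cap for this connection; an empty body simply produces no BODY frames.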
The writer uses gen_tcp:send/2 (synchronous write). If the TCP send buffer is full (due to a slow or blocked client), gen_tcp:send blocks the writer process — this is how slow clients create backpressure on channel processes that try to deliver messages.
rabbit_heartbeat: The Keepalive Timer #
rabbit_heartbeat runs lightweight timer processes that track activity in both directions — bytes received and bytes sent — against the negotiated heartbeat interval:
- If no bytes have been sent for heartbeat / 2 seconds: send a heartbeat frame via the writer.
- If no bytes have been received for heartbeat * 2 seconds: signal heartbeat_timeout to the reader, which logs it and closes the connection.
%% rabbit_reader handles heartbeat timeout:
handle_other({heartbeat_send_error, _} = ErrHeartbeat, State) ->
    %% The only portable way to detect disconnect on blocked
    %% connection is to wait for heartbeat send failure.
    ...close_connection...
Note the comment: a blocked connection (no recvloop running) cannot detect TCP disconnect via async_recv. Heartbeat send failure is the only reliable disconnect signal for blocked connections.
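The two checks can be modeled with plain timestamps (a toy sketch; the real module drives this with Erlang timers, and the function name here is illustrative):

```python
def heartbeat_actions(now, last_sent, last_recv, heartbeat):
    """Toy model of the two heartbeat checks, with `heartbeat` the
    negotiated interval in seconds. Returns the actions due at `now`."""
    actions = []
    if now - last_sent >= heartbeat / 2:
        actions.append("send_heartbeat")     # writer emits a type-8 frame
    if now - last_recv >= heartbeat * 2:
        actions.append("heartbeat_timeout")  # reader logs and closes
    return actions
```

Note the asymmetry: sending is proactive (keep the peer's receive timer fed), while the receive check is the detector — it only fires after two full intervals of silence, tolerating one lost or delayed heartbeat.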
SSL / TLS Integration #
rabbit_net is the abstraction layer over gen_tcp and ssl. All socket operations (recv, send, async_recv, setopts) go through rabbit_net, which dispatches to the appropriate module based on the socket type. Ranch handles TLS handshake before passing the socket to rabbit_reader; once the reader has the socket, TLS is transparent.
Summary #
rabbit_reader is a custom special process running a recvloop/mainloop event loop. Frame parsing is a progressive state machine: read the 7-byte header → read payload plus frame-end → validate the sentinel. Parsed method frames are dispatched to rabbit_channel processes via message passing. rabbit_writer is a separate process that handles outgoing serialization. Backpressure flows from the rabbit_alarm system → throttle record → connection_state = blocked → TCP flow control → producer blocks. rabbit_heartbeat detects dead connections, which matters most during blocked periods when the reader is not reading the socket.