
Transport Substrate — rabbit_reader and rabbit_writer #

The transport substrate is implemented across three modules: rabbit_reader (connection owner and frame parser), rabbit_writer (frame serializer and sender), and rabbit_heartbeat (keepalive timer). Together they form the TCP session layer — the boundary between raw bytes on the wire and typed AMQP methods dispatched to channel processes.

rabbit_reader is a Special Process #

rabbit_reader is not a gen_server or gen_statem. The module header says:

%% Reader processes are special processes (in the OTP sense).

A “special process” in OTP implements the system message protocol manually (system_continue/3, system_terminate/4, system_code_change/4) without using a behaviour. This gives rabbit_reader complete control over its receive loop — essential for efficient frame parsing, because a gen_server receive loop would add overhead and complicate buffer management.
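The shape of such a process can be sketched as follows. This is an illustrative skeleton, not RabbitMQ's actual code: the process starts via proc_lib, acknowledges its parent, and handles `{system, From, Request}` messages itself by delegating to `sys:handle_system_msg/6`, which calls back into the three exported functions.

```erlang
%% Illustrative skeleton of an OTP "special process": started via
%% proc_lib, it runs its own receive loop and implements the system
%% message protocol manually instead of using a behaviour.
-module(special_proc).
-export([start_link/0, init/1]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).

start_link() ->
    proc_lib:start_link(?MODULE, init, [self()]).

init(Parent) ->
    Deb = sys:debug_options([]),
    proc_lib:init_ack(Parent, {ok, self()}),
    loop(Parent, Deb, #{}).

loop(Parent, Deb, State) ->
    receive
        {system, From, Request} ->
            %% sys takes over and calls back into system_continue/3 etc.
            sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
        Msg ->
            loop(Parent, Deb, handle(Msg, State))
    end.

handle(_Msg, State) -> State.   %% application-specific work goes here

system_continue(Parent, Deb, State)          -> loop(Parent, Deb, State).
system_terminate(Reason, _Parent, _Deb, _St) -> exit(Reason).
system_code_change(State, _Mod, _Old, _Extra) -> {ok, State}.
```

Because the receive loop is hand-written, the process is free to interleave socket reads, buffer management, and system messages however it likes, which is exactly what rabbit_reader exploits.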

Ranch: TCP Acceptor Pool #

RabbitMQ uses Ranch for TCP connection acceptance. Ranch maintains a pool of acceptor processes that call accept() on the listening socket. When a connection arrives, Ranch spawns a new process and passes the socket to it — this becomes the rabbit_reader process.

%% From rabbit_reader.erl
-record(v1, {
    parent,
    ranch_ref,    %% Ranch reference, used for socket handoff
    sock,         %% the TCP socket
    connection,   %% connection-level AMQP state
    callback,     %% current parser state (framing callback)
    ...
    throttle,     %% backpressure state
    heartbeater,  %% pid of rabbit_heartbeat process
    ...
}).

Ranch handles SO_REUSEPORT, acceptor pool sizing, and SSL handshake (via ranch_ssl). rabbit_reader receives control after the TCP (and optionally TLS) layer is established.
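The handoff can be sketched with a minimal Ranch protocol module (assuming the Ranch 2.x API; `demo_protocol` and the echo loop are illustrative, not RabbitMQ code). Ranch spawns `start_link/3` per connection; `ranch:handshake/1` completes the accept (and the TLS handshake under ranch_ssl) and transfers socket ownership to the new process:

```erlang
%% Sketch of a Ranch 2.x protocol handler (illustrative names).
-module(demo_protocol).
-behaviour(ranch_protocol).
-export([start_link/3, init/3]).

start_link(Ref, Transport, Opts) ->
    Pid = proc_lib:spawn_link(?MODULE, init, [Ref, Transport, Opts]),
    {ok, Pid}.

init(Ref, Transport, _Opts) ->
    %% Socket ownership transfers to this process here.
    {ok, Socket} = ranch:handshake(Ref),
    loop(Socket, Transport).

loop(Socket, Transport) ->
    case Transport:recv(Socket, 0, 30000) of
        {ok, Data} -> ok = Transport:send(Socket, Data),  %% echo
                      loop(Socket, Transport);
        _          -> ok = Transport:close(Socket)
    end.

%% Started with, e.g.:
%%   ranch:start_listener(demo, ranch_tcp,
%%                        #{socket_opts => [{port, 5672}]},
%%                        demo_protocol, []).
```

In RabbitMQ the role of `demo_protocol` is played by the reader's entry point, and the loop is recvloop/mainloop rather than an echo.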

The recvloop/mainloop Event Loop #

rabbit_reader runs two mutually recursive functions: recvloop and mainloop.

recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
    mainloop(Deb, Buf, BufLen, State);
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
    mainloop(Deb, Buf, BufLen, State);  %% ← backpressure: stop reading
recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) ->
    ok = rabbit_net:async_recv(Sock, RecvLen, infinity),
    mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true}).

recvloop issues an async receive request (rabbit_net:async_recv/3) on the socket and transitions to mainloop to wait. The recv_len field tells it exactly how many bytes to read — this is the progressive frame parsing state machine: first read 7 bytes (the frame header), then read size + 1 bytes (the payload plus the trailing frame-end sentinel).

mainloop waits for either socket data or Erlang messages:

mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock, ...}) ->
    receive
        {inet_async, Sock, _, {ok, Data}} ->
            %% New bytes from socket
            recvloop(Deb, [Data | Buf], BufLen + size(Data), State1);
        {inet_async, Sock, _, {error, closed}} ->
            ...;
        {conserve_resources, Source, Conserve} ->
            %% Backpressure signal from alarm system
            control_throttle(State1);
        {channel_exit, ChannelNum, Reason} ->
            ...;
        Other ->
            handle_other(Other, State)
    end.

The critical backpressure path: when connection_state = blocked, recvloop does not issue a new async_recv. The socket’s TCP receive buffer fills up. When the kernel buffer is full, TCP flow control activates — the sender’s TCP send buffer fills. When that is full, the producer’s write() call blocks. This is the chain from application-level block to TCP-level backpressure.

Frame Parsing: handle_input and callback #

Frame parsing is a state machine driven by the callback field in the v1 record. The callback is a function that processes the current chunk of bytes and returns the next callback (and possibly a parsed result).

The initial state after the protocol header is received: callback = {frame_header, <<>>}. The flow:

handle_input({frame_header, <<>>}, Data, State)
  → reads 7 bytes (type, channel, size)
  → sets recv_len = Size + 1 (payload + frame-end)
  → callback = {frame_payload, Type, Channel, Size, <<>>}

handle_input({frame_payload, Type, Channel, Size, Payload}, Data, State)
  → assembles payload bytes
  → validates frame-end = 0xCE
  → calls process_frame(Type, Channel, Payload, State)
  → callback = {frame_header, <<>>}  (back to start)
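The sizes involved fall out of the AMQP 0-9-1 general frame layout. A sketch of the two parse steps (simplified from the real handle_input clauses) shows why recv_len becomes Size + 1:

```erlang
%% The 7-byte general frame header is <<Type:8, Channel:16, Size:32>>,
%% so after parsing it the reader asks the socket for exactly Size + 1
%% bytes: the payload plus the 16#CE frame-end octet.
parse_header(<<Type:8, Channel:16, Size:32>>) ->
    NextRecvLen = Size + 1,
    {Type, Channel, Size, NextRecvLen}.

parse_payload(Type, Channel, Size, Bin) ->
    %% A bad sentinel fails the match and crashes the connection,
    %% which is the desired behaviour for a framing violation.
    <<Payload:Size/binary, 16#CE:8>> = Bin,
    {frame, Type, Channel, Payload}.
```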

process_frame dispatches on the frame type:

  • Type 1 (METHOD): calls rabbit_command_assembler:process/4 to decode the method class + method ID + arguments using rabbit_framing_amqp_0_9_1 generated code.
  • Type 2 (HEADER) + Type 3 (BODY): accumulated into a content record, then forwarded to the channel process once fully assembled.
  • Type 8 (HEARTBEAT): valid only on channel 0; the frame needs no further handling, since its bytes advance the socket's recv_oct counter, which is what rabbit_heartbeat watches.

rabbit_framing_amqp_0_9_1.erl is generated code (by rabbitmq_codegen) — it contains the complete AMQP 0-9-1 method encoding/decoding table. Every class ID, method ID, and argument type is hardcoded as pattern-matched Erlang functions.
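The shape of that generated table can be approximated by hand (these clauses are an illustration, far smaller than the real module; the IDs shown are from the AMQP 0-9-1 specification):

```erlang
%% Illustrative shape of the generated decode table: class and method
%% IDs map to method names via exhaustive pattern-matched clauses.
lookup_method_name({10, 10}) -> 'connection.start';
lookup_method_name({20, 10}) -> 'channel.open';
lookup_method_name({60, 40}) -> 'basic.publish';
%% ... one clause per method in the protocol ...
lookup_method_name({ClassId, MethodId}) ->
    exit({unknown_method_id, ClassId, MethodId}).
```

Pattern matching on small integer tuples compiles to an efficient jump table, which is why code generation beats a runtime lookup structure here.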

Dispatching to Channels #

Once a complete method frame is parsed, rabbit_reader forwards it to the appropriate rabbit_channel process:

%% Connection-level methods (channel 0) handled inline
handle_method(#'connection.tune_ok'{...}, _, State) -> ...;
handle_method(#'connection.open'{...}, _, State) -> ...;

%% Channel methods forwarded via message-passing
deliver_to_channel(Channel, Method, Content, State) ->
    case get_channel(Channel, State) of
        {ok, ChPid} ->
            rabbit_channel:do_flow(ChPid, Method, Content);
        ...
    end.

rabbit_channel:do_flow/3 is used for most channel methods (it applies credit flow). rabbit_channel:do/2 is used for methods that should not be subject to flow control (like basic.ack).

The throttle Record and Backpressure #

The throttle record tracks what is blocking the connection:

-record(throttle, {
    last_blocked_at,
    should_block,
    blocked_by,                    %% set of blocking reasons
    connection_blocked_message_sent
}).

blocked_by is a set that can contain:

  • {resource, memory} — broker memory high-watermark exceeded
  • {resource, disk} — broker disk free space low
  • flow — internal credit flow signals from channel processes

The conserve_resources/3 callback is called by rabbit_alarm when the broker crosses a resource threshold. It sends a {conserve_resources, Source, Conserve} message to all reader processes:

conserve_resources(Pid, Source, {_, Conserve, _}) ->
    Pid ! {conserve_resources, Source, Conserve},
    ok.

When blocked_by becomes non-empty, control_throttle sets connection_state = blocked and sends connection.blocked to the client. When blocked_by becomes empty, it sends connection.unblocked and resumes recvloop.
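The decision can be sketched as follows (simplified from rabbit_reader; the two notification helpers are hypothetical stand-ins for the code that emits connection.blocked/unblocked):

```erlang
%% Sketch: the connection is blocked exactly while the set of blocking
%% reasons is non-empty; only the transitions trigger notifications.
control_throttle(State = #v1{connection_state = CS,
                             throttle = #throttle{blocked_by = Reasons}}) ->
    case {CS, sets:is_empty(Reasons)} of
        {running, false} ->
            send_blocked_notification(State),    %% hypothetical helper
            State#v1{connection_state = blocked};
        {blocked, true} ->
            send_unblocked_notification(State),  %% hypothetical helper
            State#v1{connection_state = running};
        _ ->
            State
    end.
```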

rabbit_writer: The Sending Side #

rabbit_writer is a separate process from rabbit_reader. This separation lets rabbit_channel processes send frames (deliveries, confirms) to the client concurrently with rabbit_reader parsing incoming frames.

rabbit_writer receives messages from channel processes:

%% In rabbit_channel.erl, sending a frame to the client:
rabbit_writer:send_command(WriterPid, Method)
rabbit_writer:send_command_and_notify(WriterPid, QueuePid, Method, Content)

rabbit_writer serializes the method using rabbit_binary_generator:build_simple_method_frame/3 and writes to the socket. For content (header + body), it calls rabbit_binary_generator:build_content_frames/4 which splits the body across frames respecting frame_max.
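The splitting arithmetic is worth making concrete. Each frame carries 8 bytes of overhead (7-byte header plus the frame-end octet), so a body frame holds at most frame_max − 8 payload bytes. A sketch of the chunking (not the real build_content_frames, which also emits the header frame):

```erlang
%% Sketch: chop a message body into chunks of at most FrameMax - 8
%% bytes, one chunk per body frame.
split_body(Body, FrameMax) when FrameMax > 8 ->
    split_body1(Body, FrameMax - 8).

split_body1(Body, MaxChunk) when byte_size(Body) =< MaxChunk ->
    [Body];
split_body1(Body, MaxChunk) ->
    <<Chunk:MaxChunk/binary, Rest/binary>> = Body,
    [Chunk | split_body1(Rest, MaxChunk)].
```

For example, with the default frame_max of 131072, a 1 MiB body becomes nine body frames: eight full 131064-byte chunks and one remainder.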

The writer uses gen_tcp:send/2 (synchronous write). If the TCP send buffer is full (due to a slow or blocked client), gen_tcp:send blocks the writer process — this is how slow clients create backpressure on channel processes that try to deliver messages.

rabbit_heartbeat: The Keepalive Timer #

rabbit_heartbeat is a lightweight gen_server that tracks two timestamps: last byte received and last byte sent. It fires a timer at the negotiated heartbeat interval:

  • If no bytes sent for heartbeat/2 seconds: send a heartbeat frame via the writer.
  • If no bytes received for heartbeat * 2 seconds: signal heartbeat_timeout to the reader, which logs it and closes the connection.

%% rabbit_reader handles heartbeat timeout:
handle_other({heartbeat_send_error, _} = ErrHeartbeat, State) ->
    %% The only portable way to detect disconnect on blocked
    %% connection is to wait for heartbeat send failure.
    ...close_connection...

Note the comment: a blocked connection (no recvloop running) cannot detect TCP disconnect via async_recv. Heartbeat send failure is the only reliable disconnect signal for blocked connections.
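One timer leg can be sketched as follows (simplified: the real rabbit_heartbeat reads counters through rabbit_net:getstat so that TLS sockets work too, and tolerates one quiet receive interval before timing out; this version assumes a plain TCP socket):

```erlang
%% Sketch of one heartbeat timer leg: every IntervalMs, compare the
%% socket's octet counter (recv_oct for the receive leg, send_oct for
%% the send leg) with the previous tick; act only if nothing moved.
heartbeater({Sock, IntervalMs, StatName, Handler}, LastCount) ->
    receive
    after IntervalMs ->
        {ok, [{StatName, Count}]} = inet:getstat(Sock, [StatName]),
        case Count of
            LastCount -> Handler();   %% no traffic since last tick
            _         -> ok           %% traffic seen; nothing to do
        end,
        heartbeater({Sock, IntervalMs, StatName, Handler}, Count)
    end.
```

Counting octets rather than intercepting heartbeat frames means any traffic at all (method frames, body frames, heartbeats) satisfies the liveness check, exactly as the AMQP specification intends.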

SSL / TLS Integration #

rabbit_net is the abstraction layer over gen_tcp and ssl. All socket operations (recv, send, async_recv, setopts) go through rabbit_net, which dispatches to the appropriate module based on the socket type. Ranch handles TLS handshake before passing the socket to rabbit_reader; once the reader has the socket, TLS is transparent.
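The dispatch trick is that a plain TCP socket is an Erlang port, while a TLS socket is ssl's opaque record, so a single guard picks the backend. A sketch of the pattern (illustrative, not rabbit_net's exact code):

```erlang
%% Sketch of rabbit_net-style dispatch: ports go to gen_tcp,
%% everything else is assumed to be an ssl socket.
send(Sock, Data) when is_port(Sock) ->
    gen_tcp:send(Sock, Data);
send(Sock, Data) ->
    ssl:send(Sock, Data).
```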

Summary #

rabbit_reader is a custom special process running a recvloop/mainloop event loop. Frame parsing is a progressive state machine: read 7-byte header → read payload → validate sentinel. Parsed method frames are dispatched to rabbit_channel processes via message-passing. rabbit_writer is a separate process handling outgoing serialization. Backpressure flows from the rabbit_alarm system → throttle record → connection_state = blocked → TCP flow control → producer blocks. rabbit_heartbeat detects dead connections especially during flow-controlled idle periods.