- My Development Notes/
- AMQP 0-9-1: The Complete Protocol/
- Prefetch and QoS — Flow Control at the Consumer/
Prefetch and QoS — Flow Control at the Consumer
Table of Contents
Prefetch and QoS — Flow Control at the Consumer #
Without flow control, a broker would send messages to a consumer as fast as the network allows. A consumer with a slow processing function would accumulate a backlog of messages in its local buffer — messages the broker considers “delivered” but the consumer has not yet processed. This is the prefetch problem: how many messages should the broker send before waiting for acknowledgments?
AMQP’s answer is basic.qos (Quality of Service), specifically the prefetch_count setting.
basic.qos: The Prefetch Limit #
basic.qos sets the maximum number of unacknowledged messages the broker will deliver to a consumer at once:
channel.basic_qos(
prefetch_count=10, # max 10 unacked messages in flight
prefetch_size=0, # max bytes (0 = no byte limit, rarely used)
global_=False # False = per-consumer; True = per-channel
)
After basic.qos(prefetch_count=10), the broker tracks how many messages are outstanding (delivered but not acked) for this consumer. When the outstanding count reaches 10, the broker stops sending new messages until the consumer acks some:
prefetch_count=3:
Broker delivers: msg1 (outstanding=1)
Broker delivers: msg2 (outstanding=2)
Broker delivers: msg3 (outstanding=3)
Broker STOPS — at prefetch limit
Consumer acks msg1 (outstanding=2)
Broker delivers: msg4 (outstanding=3)
Broker STOPS again
Consumer acks msg2, msg3 (outstanding=1)
Broker delivers: msg5, msg6 (outstanding=3)
...
The outstanding count is the “in-flight window” — the number of messages the broker has sent but not yet received acks for.
Choosing the Right prefetch_count #
prefetch_count=1 (the “fair dispatch” setting):
channel.basic_qos(prefetch_count=1)
The broker delivers one message, waits for the ack, delivers the next. This ensures that if two consumers have different processing speeds, the slower one does not receive more messages than it can handle in time. No consumer receives a second message until it acks the first.
Downside: extremely low throughput for fast consumers. Every message requires a round-trip before the next is delivered. For a 1ms processing time and 5ms round-trip latency, the consumer is idle 80% of the time.
prefetch_count=N (higher values):
Higher prefetch allows the broker to pipeline messages — while the consumer processes one message, the next N-1 are already in transit or buffered. This dramatically improves throughput for fast consumers.
The throughput curve:
prefetch=1: → limited by round-trip latency
prefetch=10: → much better: pipelining hides latency
prefetch=100: → better still, diminishing returns
prefetch=1000: → essentially no prefetch limit (may overload slow consumers)
Rule of thumb: set prefetch_count to 2–3× the number of messages the consumer can process per round-trip latency. For a consumer that processes 100 messages/second and a round-trip of 10ms, set prefetch_count = 100 * 0.01 * 2 = 2.
For most applications, prefetch_count=10 to prefetch_count=100 is the sweet spot. Monitor the unacked count in the broker management UI — if it consistently hovers at the prefetch limit, increase it. If it is always low, decrease it.
Per-Consumer vs Per-Channel Prefetch #
The global_ flag controls the scope of the prefetch limit:
global_ | Scope | Behavior |
|---|---|---|
False (default) | Per consumer | Each basic.consume subscription has its own prefetch window |
True | Per channel | All consumers on this channel share one prefetch window |
# Per-consumer: each consumer gets its own 10-message window
channel.basic_qos(prefetch_count=10, global_=False)
channel.basic_consume(queue='queue-a', on_message_callback=callback_a)
channel.basic_consume(queue='queue-b', on_message_callback=callback_b)
# callback_a can have 10 unacked, callback_b can have 10 unacked → 20 total
# Per-channel: both consumers share one 10-message window
channel.basic_qos(prefetch_count=10, global_=True)
channel.basic_consume(queue='queue-a', on_message_callback=callback_a)
channel.basic_consume(queue='queue-b', on_message_callback=callback_b)
# Total unacked across both consumers ≤ 10
When to use per-channel prefetch: when multiple queues are consumed on the same channel and you need a total memory budget across all of them. Rare in practice — most applications use one consumer per channel.
When to use per-consumer prefetch: the default. Each consumer is independently flow-controlled.
No prefetch_count (Unlimited) #
Not calling basic.qos, or calling basic.qos(prefetch_count=0), disables prefetch:
channel.basic_consume(queue='logs', on_message_callback=callback, auto_ack=True)
# No basic.qos call → unlimited prefetch
With unlimited prefetch, the broker pushes all available messages immediately. For consumers with auto_ack=True (at-most-once), this is fine — messages are acked on delivery. For consumers with manual ack and slow processing, unlimited prefetch creates an unbounded buffer on the consumer side.
Unlimited prefetch failure mode: if a queue has 1 million messages and one consumer connects without prefetch, the broker delivers all 1 million messages immediately. The consumer’s in-memory buffer fills up, possibly causing OOM. The messages are stuck unacknowledged — other consumers cannot receive them because the broker has “given” them to the first consumer. This is the prefetch starvation failure mode.
Prefetch and Fairness #
Without prefetch (or with high prefetch), message distribution across multiple consumers is unfair:
Queue: 1000 messages
Consumer A: fast (10ms/message)
Consumer B: slow (1000ms/message)
Without prefetch:
Round-robin delivers 500 to A and 500 to B
A finishes in 5 seconds, sits idle
B takes 500 seconds to finish
Total time: 500 seconds
With prefetch_count=1:
A finishes its message → gets next → finishes quickly
B slowly processes its messages
A handles ~90% of the messages
Total time: ~55 seconds (much faster)
prefetch_count=1 implements work stealing — faster consumers naturally get more work.
Prefetch and Acknowledgment Latency #
There is a connection between prefetch_count and acknowledgment latency:
Low prefetch_count (1-5): tight coupling between ack latency and throughput. A slow ack directly blocks message delivery.
High prefetch_count (100+): pipelining decouples ack latency from throughput. The broker continues delivering messages without waiting for each ack.
For latency-sensitive applications (e.g., RPC servers), high prefetch can introduce head-of-line blocking: a slow message blocks the channel’s processing even though other messages are queued behind it (if the channel is single-threaded).
Solution: one channel + one consumer per worker thread, not multiple consumers on one channel.
Prefetch in Async Frameworks #
In asyncio-based consumers (aio-pika), prefetch controls concurrency:
import asyncio
import aio_pika
async def main():
connection = await aio_pika.connect_robust("amqp://localhost/")
channel = await connection.channel()
await channel.set_qos(prefetch_count=10)
queue = await channel.declare_queue("tasks", durable=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process(): # auto-ack on context exit
await process(message.body)
asyncio.run(main())
With prefetch_count=10, the async consumer will have at most 10 concurrent processing coroutines. When all 10 are active, the event loop stops receiving new messages until one completes and acks.
This is natural concurrency control for async consumers — no explicit semaphore needed.
prefetch_size (Rarely Used) #
basic.qos also accepts prefetch_size: the maximum total byte size of unacknowledged messages. When the total body size of outstanding messages exceeds prefetch_size, the broker stops delivering.
channel.basic_qos(prefetch_size=1024*1024) # max 1 MB in flight
Why it is rarely used: prefetch_count is simpler to reason about (message count vs byte count). prefetch_size is also not well-supported in all brokers. For variable-size messages, prefetch_count combined with a reasonable estimate of average message size is easier to tune.
RabbitMQ’s implementation note: prefetch_size is accepted but the value is ignored in current RabbitMQ versions. Only prefetch_count is enforced.
Summary #
basic.qos(prefetch_count=N) is the consumer’s flow control mechanism. It limits the broker to N unacked messages per consumer (or per channel with global_=True). Low prefetch (1) ensures fairness across consumers with different processing speeds. High prefetch (50-200) maximizes throughput by pipelining deliveries. No prefetch risks consumer memory exhaustion and message starvation. For most applications, prefetch_count=10 to 100 balances throughput and fairness.