Messaging Patterns #
AMQP’s exchange/queue/binding model supports a small set of compositional patterns that cover the needs of nearly every messaging use case. Each pattern is a specific configuration of exchanges, queues, and bindings — the routing topology.
Pattern 1: Work Queue (Task Distribution) #
Problem: distribute tasks among multiple workers, ensuring each task is processed by exactly one worker.
Topology:
Producer → [default exchange] → [work-queue] → Worker 1
→ Worker 2
→ Worker 3
All workers consume from the same queue. The broker distributes messages round-robin by default.
# Producer
channel.queue_declare('tasks', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps(task).encode(),
    properties=pika.BasicProperties(delivery_mode=2)
)
# Worker (multiple instances)
channel.basic_qos(prefetch_count=1)
channel.queue_declare('tasks', durable=True)  # idempotent

def process_task(channel, method, properties, body):
    task = json.loads(body)
    execute(task)
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume('tasks', process_task)
channel.start_consuming()
Key settings:
- durable=True + delivery_mode=2: tasks survive broker restart.
- prefetch_count=1: fair dispatch — a slow worker gets only one task at a time; faster workers get more.
- Manual ack: if the worker crashes mid-task, the task is re-delivered.
Multiple workers as consumers on the same queue: each message is delivered to exactly one consumer. This is AMQP’s competing consumer model — the queue implements the distribution automatically.
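The competing-consumer distribution can be sketched without a broker. This is an illustration of the default round-robin behavior, not pika API; round_robin_dispatch is a hypothetical helper:

```python
from collections import deque

def round_robin_dispatch(messages, workers):
    """Assign each message to exactly one worker, cycling through workers in order."""
    assignments = {w: [] for w in workers}
    ring = deque(workers)
    for msg in messages:
        worker = ring[0]
        ring.rotate(-1)          # next worker gets the next message
        assignments[worker].append(msg)
    return assignments

tasks = [f"task-{i}" for i in range(6)]
result = round_robin_dispatch(tasks, ["w1", "w2", "w3"])
# Each worker gets two tasks; no task is delivered to two workers.
```

Note that real brokers dispatch to whichever consumer is ready (prefetch permitting), so with prefetch_count=1 the effective distribution tracks worker speed rather than a strict rotation.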
Pattern 2: Publish/Subscribe (Fan-Out) #
Problem: broadcast a message to multiple independent consumers, each receiving a full copy.
Topology:
Producer → [fanout exchange: 'notifications']
→ queue 'email-queue' → Email Service
→ queue 'sms-queue' → SMS Service
→ queue 'dashboard-queue' → Dashboard
Each service has its own queue. A message published to the fanout exchange is independently delivered to each bound queue.
# Setup (done once, or by each service on startup)
channel.exchange_declare('notifications', 'fanout', durable=True)
# Each service declares its own queue and binds it
channel.queue_declare('email-queue', durable=True)
channel.queue_bind('email-queue', 'notifications', routing_key='') # key ignored
# Producer
channel.basic_publish(
    exchange='notifications',
    routing_key='',  # ignored by fanout exchange
    body=json.dumps(event).encode(),
    properties=pika.BasicProperties(delivery_mode=2)
)
# Consumer (email service)
def on_notification(channel, method, properties, body):
    send_email(json.loads(body))
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume('email-queue', on_notification)
Key property: each service is decoupled from others. Adding a new service means declaring a new queue and binding it — the producer and existing consumers are unchanged.
Transient consumers: for consumers that only need messages while they are running (e.g., a live dashboard), use an exclusive, auto-delete queue:
result = channel.queue_declare(queue='', exclusive=True)
channel.queue_bind(result.method.queue, 'notifications', routing_key='')
When the consumer disconnects, the queue auto-deletes. It only receives messages published while it is running.
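The fanout semantics can be illustrated broker-free: every bound queue receives its own copy of each published message, and the routing key plays no part. The queue names mirror the example above; fanout_publish is a sketch, not pika API:

```python
def fanout_publish(bound_queues, message):
    """Deliver an independent copy of the message to every bound queue."""
    for q in bound_queues.values():
        q.append(message)  # routing key is ignored by a fanout exchange

queues = {"email-queue": [], "sms-queue": [], "dashboard-queue": []}
fanout_publish(queues, b'{"event": "signup"}')
# All three queues now hold a copy; consuming from one does not affect the others.
```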
Pattern 3: Selective Routing (Direct Exchange) #
Problem: route messages to specific queues based on event type.
Topology:
Producer → [direct exchange: 'events']
routing_key='order.created' → queue 'order-service'
routing_key='payment.charged' → queue 'payment-service'
routing_key='order.created' → queue 'audit-log' ← same key, two queues
channel.exchange_declare('events', 'direct', durable=True)
# Order service binds to its events
channel.queue_declare('order-service', durable=True)
channel.queue_bind('order-service', 'events', 'order.created')
channel.queue_bind('order-service', 'events', 'order.cancelled')
# Audit log binds to everything
channel.queue_declare('audit-log', durable=True)
channel.queue_bind('audit-log', 'events', 'order.created')
channel.queue_bind('audit-log', 'events', 'payment.charged')
channel.queue_bind('audit-log', 'events', 'order.cancelled')
# Producer
channel.basic_publish(
    exchange='events',
    routing_key='order.created',
    body=json.dumps(order_event).encode(),
    properties=pika.BasicProperties(delivery_mode=2)
)
When to use direct vs fanout: fanout for true broadcast (every consumer always gets every message); direct for selective delivery where each message type goes to a specific subset of consumers.
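A direct exchange is, in effect, an exact-match lookup from routing key to the set of bound queues. This broker-free sketch mirrors the bindings above; the dict-based routing table is an illustration, not pika API:

```python
# Routing table built from the queue_bind calls above.
bindings = {
    "order.created":   {"order-service", "audit-log"},
    "order.cancelled": {"order-service", "audit-log"},
    "payment.charged": {"payment-service", "audit-log"},
}

def direct_route(routing_key):
    """Return the queues a direct exchange would deliver to for this key."""
    # With no matching binding, the message is silently dropped
    # (unless the publish sets the 'mandatory' flag).
    return bindings.get(routing_key, set())
```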
Pattern 4: Topic Routing (Multi-Criteria) #
Problem: route messages based on multiple-word routing keys with pattern matching.
Topology:
Producer → [topic exchange: 'logs']
routing_key format: '<service>.<severity>.<region>'
binding '*.error.*' → queue 'error-monitor'
binding 'payment.#' → queue 'payment-team'
binding '#.us' → queue 'us-region-dashboard'
binding 'audit.*.*' → queue 'audit-trail'
channel.exchange_declare('logs', 'topic', durable=True)
channel.queue_declare('error-monitor', durable=True)
channel.queue_bind('error-monitor', 'logs', '*.error.*')
channel.queue_declare('payment-team', durable=True)
channel.queue_bind('payment-team', 'logs', 'payment.#')
# Producer
channel.basic_publish(
    exchange='logs',
    routing_key='payment.error.us',  # matches '*.error.*', 'payment.#', '#.us'
    body=log_entry.encode()
)
Routing key design for topics: use a consistent schema. <service>.<severity>.<region> or <domain>.<event>.<context> are common. The schema must be decided at system design time — it cannot be changed after deployment without updating all bindings.
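The matching rules (* is exactly one dot-separated word, # is zero or more words) can be sketched broker-free. This matcher is an illustration of the semantics, not pika API:

```python
def topic_match(pattern, routing_key):
    """AMQP topic matching: '*' = exactly one word, '#' = zero or more words."""
    return _match(pattern.split("."), routing_key.split("."))

def _match(pat, key):
    if not pat:
        return not key           # pattern exhausted: match only if key is too
    if pat[0] == "#":
        # '#' absorbs zero words, or one word and keeps absorbing.
        return _match(pat[1:], key) or (bool(key) and _match(pat, key[1:]))
    if not key:
        return False             # words left in pattern but key is exhausted
    return (pat[0] == "*" or pat[0] == key[0]) and _match(pat[1:], key[1:])
```

For example, 'payment.error.us' matches all three bindings shown above, while 'payment.error' fails '*.error.*' because the trailing '*' requires one more word.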
Pattern 5: RPC over AMQP #
Problem: implement synchronous request-response over an async messaging layer.
Topology:
RPC Client → [default exchange] → [rpc-server-queue] → RPC Server
                                                           ↓
RPC Client ← [reply-queue (exclusive)] ← [default exchange] ←
import time
import uuid

class RpcClient:
    def __init__(self, channel):
        # Exclusive reply queue — one per client connection
        result = channel.queue_declare(queue='', exclusive=True)
        self.reply_queue = result.method.queue
        self.channel = channel
        self.responses = {}
        channel.basic_consume(
            queue=self.reply_queue,
            on_message_callback=self._on_response,
            auto_ack=True
        )

    def _on_response(self, channel, method, properties, body):
        self.responses[properties.correlation_id] = body

    def call(self, request_body: bytes, timeout=5.0) -> bytes:
        corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc-server',
            body=request_body,
            properties=pika.BasicProperties(
                reply_to=self.reply_queue,
                correlation_id=corr_id,
            )
        )
        deadline = time.time() + timeout
        while corr_id not in self.responses:
            self.channel.connection.process_data_events(time_limit=0.1)
            if time.time() > deadline:
                raise TimeoutError("RPC timeout")
        return self.responses.pop(corr_id)
# RPC Server
def on_rpc_request(channel, method, properties, body):
    result = compute(body)
    channel.basic_publish(
        exchange='',
        routing_key=properties.reply_to,
        body=result,
        properties=pika.BasicProperties(
            correlation_id=properties.correlation_id
        )
    )
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.queue_declare('rpc-server', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('rpc-server', on_rpc_request)
channel.start_consuming()
Key elements:
- Exclusive reply queue: one per client (or per request for request-scoped queues).
- correlation_id: the client matches responses to requests.
- reply_to: the server publishes back to the client’s reply queue.
- Server acks after responding: ensures the request is not re-queued if the server crashes mid-processing.
Timeout handling: if the server does not respond within the timeout, the client should cancel the outstanding request. The RPC pattern does not have a built-in cancellation mechanism — this must be handled at the application level (e.g., the server checks if the reply-to queue still exists before responding, or uses expiration on the request message).
Per-request reply queues: for high-security or high-isolation requirements, create a new exclusive queue per RPC call. This adds latency (queue declare round-trip) but guarantees no response mixing.
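The correlation_id mechanism can be shown broker-free: it is the key that lets one shared reply queue serve many in-flight requests, with stale or unknown replies discarded. This is an illustration with hypothetical helpers, not pika API:

```python
import uuid

pending = {}  # correlation_id -> response body (None while outstanding)

def send_request(body):
    """Register an outstanding request and return its correlation id."""
    corr_id = str(uuid.uuid4())
    pending[corr_id] = None
    return corr_id

def on_reply(correlation_id, body):
    """Match a reply to its request; drop replies for unknown/expired ids."""
    if correlation_id in pending:
        pending[correlation_id] = body

req = send_request(b"ping")
on_reply("stale-id", b"late reply")   # ignored: no such outstanding request
on_reply(req, b"pong")                # matched to the pending request
```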
Pattern 6: Dead Letter Exchange (DLX) #
Problem: handle messages that cannot be processed — expired, rejected, or overflow.
Topology:
Producer → [exchange] → [work-queue] → Consumer
↓ (rejected / expired / overflow)
[DLX exchange] → [dead-letter-queue] → Error Handler
# DLX setup
channel.exchange_declare('dlx', 'topic', durable=True)
channel.queue_declare('dead-letters', durable=True)
channel.queue_bind('dead-letters', 'dlx', routing_key='#')
# Work queue with DLX configured
channel.queue_declare(
    'tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'tasks.failed',
        'x-message-ttl': 3600000,  # expire after 1 hour
    }
)
# Consumer that handles failures
def process(channel, method, properties, body):
    if is_unprocessable(body):
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        # → message routed to 'dlx' exchange → 'dead-letters' queue
    else:
        execute(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
DLX headers: the dead-lettered message arrives with x-death headers containing the original queue, rejection reason, and timestamp — useful for error analysis.
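A sketch of pulling those fields out for error analysis. The x-death header is a list of entries, most recent first; the field names below follow RabbitMQ's documented shape, but the sample values are invented for illustration:

```python
# Hand-written sample in the shape the broker attaches to dead-lettered messages.
x_death = [
    {"queue": "tasks", "reason": "rejected", "count": 3,
     "exchange": "work", "routing-keys": ["tasks"]},
]

def death_summary(x_death_header):
    """Extract the original queue, rejection reason, and dead-letter count."""
    entry = x_death_header[0]  # most recent dead-lettering event
    return entry["queue"], entry["reason"], entry["count"]
```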
Delayed retry with DLX: create a retry queue with a TTL and DLX pointing back to the original exchange:
# Retry queue: messages expire after 30 seconds and route back to main exchange
channel.queue_declare(
    'tasks-retry',
    durable=True,
    arguments={
        'x-message-ttl': 30000,           # 30 second delay
        'x-dead-letter-exchange': 'work', # after TTL, route back
        'x-dead-letter-routing-key': 'tasks',
    }
)
# When a task fails (transient error): ack the original, then republish a copy
# to the retry queue. (Nacking with requeue=False would also dead-letter the
# message to 'dlx', producing a duplicate alongside the retry copy.)
channel.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_publish(exchange='', routing_key='tasks-retry', body=body,
                      properties=properties)
# Message waits 30 seconds in tasks-retry, then auto-routes back to tasks queue
This implements delayed retry without a separate scheduling service. A single retry queue gives one fixed delay; exponential backoff requires a chain of retry queues with increasing TTLs.
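For exponential backoff, a common approach is one retry queue per delay tier, with the consumer choosing the tier from the message's failure count. A sketch, with hypothetical queue names and a 30-second base delay:

```python
BASE_MS = 30_000  # assumed base delay: 30 seconds
TIERS = 3         # assumed number of retry tiers

def retry_tiers(base_ms=BASE_MS, tiers=TIERS):
    """x-message-ttl for each retry queue: base, 2x base, 4x base, ..."""
    return {f"tasks-retry-{i}": base_ms * 2**i for i in range(tiers)}

def queue_for_attempt(attempt, tiers=TIERS):
    """Pick the retry queue for the Nth failure; cap at the slowest tier."""
    return f"tasks-retry-{min(attempt, tiers - 1)}"
```

Each tasks-retry-N queue would be declared like tasks-retry above, differing only in its x-message-ttl.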
Pattern 7: Priority Queue #
Problem: process high-priority messages before low-priority ones.
channel.queue_declare(
    'priority-tasks',
    durable=True,
    arguments={'x-max-priority': 10}
)

# High-priority publish
channel.basic_publish(
    exchange='',
    routing_key='priority-tasks',
    body=urgent_task,
    properties=pika.BasicProperties(priority=9)
)

# Normal publish
channel.basic_publish(
    exchange='',
    routing_key='priority-tasks',
    body=normal_task,
    properties=pika.BasicProperties(priority=1)
)
The broker delivers priority=9 messages before priority=1 messages whenever both are waiting in the queue, regardless of arrival order.
Practical limitation: priority only helps if the queue has a backlog. If the consumer processes messages as fast as they arrive (empty queue most of the time), priority has no effect — there is nothing to reorder.
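The drain order over a backlog can be illustrated broker-free with a heap: higher priority first, and FIFO within a priority level. An illustration of the ordering, not pika API:

```python
import heapq

def drain(backlog):
    """Return message bodies in delivery order: by priority desc, then arrival."""
    heap = []
    for seq, (priority, body) in enumerate(backlog):
        # Negate priority for a max-heap; seq breaks ties in arrival order.
        heapq.heappush(heap, (-priority, seq, body))
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]

order = drain([(1, "routine-1"), (9, "urgent"), (1, "routine-2"), (5, "medium")])
# → ["urgent", "medium", "routine-1", "routine-2"]
```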
Combining Patterns #
Real systems combine patterns. A common production topology:
External → [topic exchange: 'raw-events']
→ [queue: 'order-processor'] (topic: 'order.*')
→ [queue: 'payment-processor'] (topic: 'payment.*')
→ [queue: 'audit-log'] (topic: '#')
order-processor queue:
- durable=True
- x-dead-letter-exchange=dlx
- x-message-ttl=3600000
- consumed by 5 order-worker instances
- prefetch_count=10
- persistent messages (delivery-mode=2)
- publisher confirms on the producer side
This is: selective routing (topic exchange) + work queue (competing consumers on order-processor) + dead letter handling + backpressure (prefetch) + crash safety (durable + persistent + confirms).
Pattern Anti-Patterns #
One queue per message type, with producers publishing straight to named queues: this bypasses the exchange model. Use exchanges and bindings rather than embedding routing logic in queue names.
Polling with basic.get in a loop: CPU-wasteful and high-latency. Use basic.consume.
Sharing a channel across threads: causes interleaved frames, broker errors, and corrupt channel state.
RPC without timeout: if the server dies, the client waits forever. Always set a timeout on RPC calls.
Infinite requeue loop: nacking with requeue=True on always-failing messages burns CPU. Use x-delivery-limit (quorum queues) or DLX-based retry with limiting logic.
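One way to add that limiting logic on classic queues is to consult the x-death count before sending a failed message around the retry loop again. MAX_RETRIES and the header shape shown are assumptions for illustration:

```python
MAX_RETRIES = 5  # assumed application-chosen cap

def should_retry(headers):
    """True if the message has been dead-lettered fewer than MAX_RETRIES times."""
    deaths = (headers or {}).get("x-death", [])
    count = deaths[0]["count"] if deaths else 0  # 0 on first failure
    return count < MAX_RETRIES
```

When should_retry returns False, route the message to a parking-lot queue for manual inspection instead of the retry queue.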
Summary #
AMQP’s seven standard patterns are composable configurations of exchanges, queues, and bindings. Work queues distribute tasks with competing consumers; fan-out broadcasts to independent queues; direct/topic exchanges route by type/pattern; RPC uses reply-to and correlation-id for bidirectional messaging; DLX handles failures; priority queues reorder delivery. Production systems combine these patterns: topic exchange for routing, competing consumers for scaling, DLX for error handling, prefetch for backpressure, persistence + confirms for durability.