Skip to main content
  1. AMQP 0-9-1: The Complete Protocol/

Messaging Patterns

Messaging Patterns #

AMQP’s exchange/queue/binding model supports a small set of compositional patterns that cover the needs of nearly every messaging use case. Each pattern is a specific configuration of exchanges, queues, and bindings — the routing topology.

Pattern 1: Work Queue (Task Distribution) #

Problem: distribute tasks among multiple workers, ensuring each task is processed by exactly one worker.

Topology:

Producer → [default exchange] → [work-queue] → Worker 1
                                              → Worker 2
                                              → Worker 3

All workers consume from the same queue. The broker distributes messages round-robin by default.

# Producer
channel.queue_declare('tasks', durable=True)
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body=json.dumps(task).encode(),
    properties=pika.BasicProperties(delivery_mode=2)
)

# Worker (multiple instances)
channel.basic_qos(prefetch_count=1)
channel.queue_declare('tasks', durable=True)  # idempotent

def process_task(channel, method, properties, body):
    task = json.loads(body)
    execute(task)
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume('tasks', process_task)
channel.start_consuming()

Key settings:

  • durable=True + delivery_mode=2: tasks survive broker restart.
  • prefetch_count=1: fair dispatch — a slow worker gets only one task at a time; faster workers get more.
  • Manual ack: if the worker crashes mid-task, the task is re-delivered.

Multiple workers as consumers on the same queue: each message is delivered to exactly one consumer. This is AMQP’s competing consumer model — the queue implements the distribution automatically.

Pattern 2: Publish/Subscribe (Fan-Out) #

Problem: broadcast a message to multiple independent consumers, each receiving a full copy.

Topology:

Producer → [fanout exchange: 'notifications']
              → queue 'email-consumer'     → Email Service
              → queue 'sms-consumer'       → SMS Service
              → queue 'dashboard-consumer' → Dashboard

Each service has its own queue. A message published to the fanout exchange is independently delivered to each bound queue.

# Setup (done once, or by each service on startup)
channel.exchange_declare('notifications', 'fanout', durable=True)

# Each service declares its own queue and binds it
channel.queue_declare('email-queue', durable=True)
channel.queue_bind('email-queue', 'notifications', routing_key='')  # key ignored

# Producer
channel.basic_publish(
    exchange='notifications',
    routing_key='',  # ignored by fanout exchange
    body=json.dumps(event).encode(),
    properties=pika.BasicProperties(delivery_mode=2)
)

# Consumer (email service)
def on_notification(channel, method, properties, body):
    send_email(json.loads(body))
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume('email-queue', on_notification)

Key property: each service is decoupled from others. Adding a new service means declaring a new queue and binding it — the producer and existing consumers are unchanged.

Transient consumers: for consumers that only need messages while they are running (e.g., a live dashboard), use an exclusive, auto-delete queue:

result = channel.queue_declare(queue='', exclusive=True)
channel.queue_bind(result.method.queue, 'notifications', routing_key='')

When the consumer disconnects, the queue auto-deletes. It only receives messages published while it is running.

Pattern 3: Selective Routing (Direct Exchange) #

Problem: route messages to specific queues based on event type.

Topology:

Producer → [direct exchange: 'events']
  routing_key='order.created'  → queue 'order-service'
  routing_key='payment.charged' → queue 'payment-service'
  routing_key='order.created'  → queue 'audit-log'    ← same key, two queues
channel.exchange_declare('events', 'direct', durable=True)

# Order service binds to its events
channel.queue_declare('order-service', durable=True)
channel.queue_bind('order-service', 'events', 'order.created')
channel.queue_bind('order-service', 'events', 'order.cancelled')

# Audit log binds to everything
channel.queue_bind('audit-log', 'events', 'order.created')
channel.queue_bind('audit-log', 'events', 'payment.charged')
channel.queue_bind('audit-log', 'events', 'order.cancelled')

# Producer
channel.basic_publish(
    exchange='events',
    routing_key='order.created',
    body=json.dumps(order_event).encode(),
    properties=pika.BasicProperties(delivery_mode=2)
)

When to use direct vs fanout: fanout for true broadcast (every consumer always gets every message); direct for selective delivery where each message type goes to a specific subset of consumers.

Pattern 4: Topic Routing (Multi-Criteria) #

Problem: route messages based on multiple-word routing keys with pattern matching.

Topology:

Producer → [topic exchange: 'logs']
  routing_key format: '<severity>.<service>.<region>'

  binding '*.error.*'       → queue 'error-monitor'
  binding 'payment.#'       → queue 'payment-team'
  binding '#.us'            → queue 'us-region-dashboard'
  binding 'audit.*.*'       → queue 'audit-trail'
channel.exchange_declare('logs', 'topic', durable=True)

channel.queue_declare('error-monitor', durable=True)
channel.queue_bind('error-monitor', 'logs', '*.error.*')

channel.queue_declare('payment-team', durable=True)
channel.queue_bind('payment-team', 'logs', 'payment.#')

# Producer
channel.basic_publish(
    exchange='logs',
    routing_key='payment.error.us',  # matches '*.error.*', 'payment.#', '#.us'
    body=log_entry.encode()
)

Routing key design for topics: use a consistent schema. <service>.<severity>.<region> or <domain>.<event>.<context> are common. The schema must be decided at system design time — it cannot be changed after deployment without updating all bindings.

Pattern 5: RPC over AMQP #

Problem: implement synchronous request-response over an async messaging layer.

Topology:

RPC Client → [default exchange] → [rpc-server-queue] → RPC Server
                                                           ↓
RPC Client ← [default exchange] ← [reply-queue (exclusive)] ←
import uuid

class RpcClient:
    def __init__(self, channel):
        # Exclusive reply queue — one per client connection
        result = channel.queue_declare(queue='', exclusive=True)
        self.reply_queue = result.method.queue
        self.channel = channel
        self.responses = {}

        channel.basic_consume(
            queue=self.reply_queue,
            on_message_callback=self._on_response,
            auto_ack=True
        )

    def _on_response(self, channel, method, properties, body):
        self.responses[properties.correlation_id] = body

    def call(self, request_body: bytes, timeout=5.0) -> bytes:
        corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc-server',
            body=request_body,
            properties=pika.BasicProperties(
                reply_to=self.reply_queue,
                correlation_id=corr_id,
            )
        )
        deadline = time.time() + timeout
        while corr_id not in self.responses:
            self.channel.connection.process_data_events(time_limit=0.1)
            if time.time() > deadline:
                raise TimeoutError("RPC timeout")
        return self.responses.pop(corr_id)


# RPC Server
def on_rpc_request(channel, method, properties, body):
    result = compute(body)
    channel.basic_publish(
        exchange='',
        routing_key=properties.reply_to,
        body=result,
        properties=pika.BasicProperties(
            correlation_id=properties.correlation_id
        )
    )
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.queue_declare('rpc-server', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume('rpc-server', on_rpc_request)

Key elements:

  1. Exclusive reply queue: one per client (or per request for request-scoped queues).
  2. correlation_id: the client matches responses to requests.
  3. reply_to: the server publishes back to the client’s reply queue.
  4. Server acks after responding: ensures the request is not re-queued if the server crashes mid-processing.

Timeout handling: if the server does not respond within the timeout, the client should cancel the outstanding request. The RPC pattern does not have a built-in cancellation mechanism — this must be handled at the application level (e.g., the server checks if the reply-to queue still exists before responding, or uses expiration on the request message).

Per-request reply queues: for high-security or high-isolation requirements, create a new exclusive queue per RPC call. This adds latency (queue declare round-trip) but guarantees no response mixing.

Pattern 6: Dead Letter Exchange (DLX) #

Problem: handle messages that cannot be processed — expired, rejected, or overflow.

Topology:

Producer → [exchange] → [work-queue] → Consumer
                                ↓ (rejected / expired / overflow)
                        [DLX exchange] → [dead-letter-queue] → Error Handler
# DLX setup
channel.exchange_declare('dlx', 'topic', durable=True)
channel.queue_declare('dead-letters', durable=True)
channel.queue_bind('dead-letters', 'dlx', routing_key='#')

# Work queue with DLX configured
channel.queue_declare(
    'tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'tasks.failed',
        'x-message-ttl': 3600000,  # expire after 1 hour
    }
)

# Consumer that handles failures
def process(channel, method, properties, body):
    if is_unprocessable(body):
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        # → message routed to 'dlx' exchange → 'dead-letters' queue
    else:
        execute(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)

DLX headers: the dead-lettered message arrives with x-death headers containing the original queue, rejection reason, and timestamp — useful for error analysis.

Delayed retry with DLX: create a retry queue with a TTL and DLX pointing back to the original exchange:

# Retry queue: messages expire after 30 seconds and route back to main exchange
channel.queue_declare(
    'tasks-retry',
    durable=True,
    arguments={
        'x-message-ttl': 30000,              # 30 second delay
        'x-dead-letter-exchange': 'work',    # after TTL, route back
        'x-dead-letter-routing-key': 'tasks',
    }
)
# When a task fails (transient error):
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
channel.basic_publish(exchange='', routing_key='tasks-retry', body=body,
                      properties=properties)
# Message waits 30 seconds in tasks-retry, then auto-routes back to tasks queue

This implements exponential backoff retry without a separate scheduling service.

Pattern 7: Priority Queue #

Problem: process high-priority messages before low-priority ones.

channel.queue_declare(
    'priority-tasks',
    durable=True,
    arguments={'x-max-priority': 10}
)

# High-priority publish
channel.basic_publish(
    exchange='',
    routing_key='priority-tasks',
    body=urgent_task,
    properties=pika.BasicProperties(priority=9)
)

# Normal publish
channel.basic_publish(
    exchange='',
    routing_key='priority-tasks',
    body=normal_task,
    properties=pika.BasicProperties(priority=1)
)

The broker delivers priority=9 messages before priority=1 messages, regardless of arrival order. If both arrive simultaneously, high priority wins.

Practical limitation: priority only helps if the queue has a backlog. If the consumer processes messages as fast as they arrive (empty queue most of the time), priority has no effect — there is nothing to reorder.

Combining Patterns #

Real systems combine patterns. A common production topology:

External → [topic exchange: 'raw-events']
              → [queue: 'order-processor']  (topic: 'order.*')
              → [queue: 'payment-processor'] (topic: 'payment.*')
              → [queue: 'audit-log']         (topic: '#')

order-processor queue:
  - durable=True
  - x-dead-letter-exchange=dlx
  - x-message-ttl=3600000
  - consumed by 5 order-worker instances
  - prefetch_count=10
  - persistent messages (delivery-mode=2)
  - publisher confirms on the producer side

This is: selective routing (topic exchange) + work queue (competing consumers on order-processor) + dead letter handling + backpressure (prefetch) + crash safety (durable + persistent + confirms).

Pattern Anti-Patterns #

One queue per message type, one producer per queue: breaks the exchange model. Use exchanges and bindings instead of routing keys embedded in queue names.

Polling with basic.get in a loop: CPU-wasteful and high-latency. Use basic.consume.

Sharing a channel across threads: causes interleaved frames, broker errors, and corrupt channel state.

RPC without timeout: if the server dies, the client waits forever. Always set a timeout on RPC calls.

Infinite requeue loop: nacking with requeue=True on always-failing messages burns CPU. Use x-delivery-limit (quorum queues) or DLX-based retry with limiting logic.

Summary #

AMQP’s seven standard patterns are composable configurations of exchanges, queues, and bindings. Work queues distribute tasks with competing consumers; fan-out broadcasts to independent queues; direct/topic exchanges route by type/pattern; RPC uses reply-to and correlation-id for bidirectional messaging; DLX handles failures; priority queues reorder delivery. Production systems combine these patterns: topic exchange for routing, competing consumers for scaling, DLX for error handling, prefetch for backpressure, persistence + confirms for durability.