
Assignment at Application Level — Work Distribution and Routing #

Chapter 5 covered Assignment at the infrastructure level: consistent hashing assigns data partitions to storage nodes, Kafka assigns stream partitions to consumer group members. At the application level, Assignment is the same mechanism applied to work: which worker processes this job, which crawler instance fetches this URL domain, which database shard holds this user’s data.

The correctness requirements carry over:

Stability: the same work item maps to the same worker unless membership changes. Unstable assignment wastes warmed caches (a stateful worker loses the state it built up), duplicates work, or creates contention.

Exactly-one execution: for jobs with side effects (sending emails, charging payments), exactly one worker must execute the job — not zero, not two.

Load balance: work distributes approximately evenly so no worker is a bottleneck.

The tension between stability and exactly-one execution is the defining challenge of application-level Assignment. A stable assignment can fail to execute (the assigned worker is down). Reassigning to another worker risks double execution if the original worker is slow, not dead.

Task Queues #

A task queue is the most common application-level Assignment mechanism. Work items are enqueued; worker processes dequeue and execute them. The queue mediates assignment: it ensures each item is assigned to at most one worker, handles failures by re-enqueuing, and provides visibility into pending work.

Visibility Timeout Pattern #

The core mechanism for exactly-one semantics in a task queue is the visibility timeout: when a worker dequeues a task, the task becomes invisible to other workers for a configurable duration. If the worker completes and acknowledges the task within the timeout, the task is deleted. If the worker crashes or exceeds the timeout, the task becomes visible again and another worker can claim it.

# AWS SQS / Redis-based task queue model
import time

def process_tasks(queue):
    while True:
        # Dequeue with 30-second visibility timeout
        task = queue.dequeue(visibility_timeout=30)
        if not task:
            time.sleep(1)
            continue

        try:
            execute(task)
            queue.acknowledge(task.receipt_handle)  # Delete from queue
        except Exception:
            # Do not acknowledge → task becomes visible again after timeout
            pass

The visibility timeout tradeoff:

  • Too short: slow tasks re-enqueue before completion → double execution.
  • Too long: failed worker delays re-processing → latency spike.

Heartbeat extension: for long-running tasks, the worker extends the visibility timeout while processing:

import asyncio

async def extend_visibility_loop(queue, task, interval: int, extension: int):
    # Push the visibility timeout forward while the task is still being worked
    while True:
        await asyncio.sleep(interval)
        queue.extend_visibility(task.receipt_handle, extension)

async def process_with_heartbeat(queue, task):
    heartbeat_task = asyncio.create_task(
        extend_visibility_loop(queue, task, interval=10, extension=30)
    )
    try:
        await execute(task)
        queue.acknowledge(task.receipt_handle)
    finally:
        heartbeat_task.cancel()

The worker sends a heartbeat every 10 seconds to extend the timeout by 30 seconds. If the worker crashes, heartbeats stop and the timeout expires naturally.

At-Least-Once Execution + Idempotent Tasks #

The visibility timeout model provides at-least-once execution (a task may be executed more than once if the timeout expires before acknowledgment). For tasks with side effects, the task must be idempotent:

def send_order_confirmation_email(task):
    # Idempotent: check if already sent before sending
    if db.exists("SELECT 1 FROM sent_emails WHERE task_id = $1", task.id):
        return  # Already sent — skip
    send_email(task.order_id)
    db.insert("INSERT INTO sent_emails (task_id, sent_at) VALUES ($1, NOW())", task.id)

The sent_emails table is a deduplication log — Suppression in service of Assignment correctness.
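The check-then-insert above leaves a narrow window in which two workers holding the same task (say, after a visibility timeout expired mid-send) both pass the existence check. An insert-first variant closes it by making the dedup write itself the claim. A minimal sketch, using an in-memory SQLite table as a stand-in for the application database (PostgreSQL's equivalent of `INSERT OR IGNORE` is `ON CONFLICT DO NOTHING`):

```python
import sqlite3

# In-memory stand-in for the application database (illustrative only)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sent_emails (task_id TEXT PRIMARY KEY, sent_at TEXT)")

def try_claim_email(task_id: str) -> bool:
    # Insert-first dedup: the PRIMARY KEY makes the claim atomic, so there is
    # no window between "check" and "record" for a second worker to slip into
    cur = conn.execute(
        "INSERT OR IGNORE INTO sent_emails (task_id, sent_at) "
        "VALUES (?, datetime('now'))",
        (task_id,),
    )
    conn.commit()
    return cur.rowcount == 1  # True only for the first worker to claim

if try_claim_email("task-42"):
    pass  # send_email(...) runs here, at most once per task_id
```

The side effect now happens after the claim, so a crash between claim and send drops the email rather than duplicating it; which failure mode is acceptable depends on the task.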

Exactly-Once Execution via Database Claim #

For tasks that cannot tolerate double execution even briefly, use a database claim instead of visibility timeout:

-- Atomically claim the next available task
WITH claimed AS (
    UPDATE tasks
    SET status = 'processing',
        worker_id = $worker_id,
        claimed_at = NOW()
    WHERE id = (
        SELECT id FROM tasks
        WHERE status = 'pending'
          AND (next_retry_at IS NULL OR next_retry_at <= NOW())
        ORDER BY priority DESC, created_at ASC
        FOR UPDATE SKIP LOCKED
        LIMIT 1
    )
    RETURNING *
)
SELECT * FROM claimed;

FOR UPDATE SKIP LOCKED is PostgreSQL’s building block for task queues: it acquires a row lock on the selected row and skips any rows already locked by other workers. Multiple concurrent workers can therefore claim distinct tasks without blocking each other.

Failure recovery: a background process periodically scans for tasks stuck in processing with a claimed_at older than the expected max execution time and resets them to pending:

UPDATE tasks
SET status = 'pending', worker_id = NULL, claimed_at = NULL,
    retry_count = retry_count + 1
WHERE status = 'processing'
  AND claimed_at < NOW() - INTERVAL '5 minutes';

This model provides at-most-once execution per claim window, with at-least-once overall once the recovery sweep re-enqueues stuck tasks — exactly-once in practice as long as workers finish within the claim window.

Job Scheduler: Exactly-One Trigger #

A job scheduler (cron-style, triggering periodic jobs) has a stricter requirement than a task queue: each job must be triggered exactly once per schedule interval, by exactly one node. Multiple scheduler nodes running the same job causes duplicate side effects (duplicate reports, duplicate billing runs, duplicate notifications).

Leader Election for Scheduling #

The simplest approach: combine Selection (Chapter 3) with Assignment. Elect a single leader among scheduler nodes using etcd; only the leader triggers jobs.

election := concurrency.NewElection(session, "/scheduler/leader")

// Campaign blocks until this node becomes the leader
if err := election.Campaign(ctx, nodeID); err != nil {
    return err
}

// Only the leader reaches this loop; session expiry means leadership is lost
for {
    select {
    case <-session.Done():
        return errors.New("leadership lost")
    default:
        runDueJobs()
        time.Sleep(1 * time.Second)
    }
}

Failure mode: if the leader crashes between triggering a job and recording that it was triggered, the next leader will trigger the same job again on takeover.

Mitigation: record the job trigger in the same transaction as claiming leadership for that interval:

-- Claim this job for this interval atomically
INSERT INTO job_runs (job_id, interval_start, triggered_by, status)
VALUES ($job_id, $interval_start, $node_id, 'running')
ON CONFLICT (job_id, interval_start) DO NOTHING;
-- If 0 rows inserted: another node already triggered this interval

The (job_id, interval_start) unique constraint ensures exactly-one trigger per interval, regardless of how many nodes attempt to claim it. This is CAS-based Assignment — the same pattern as seat reservation but for job scheduling.

Distributed Cron Without a Leader #

For systems where a central leader is undesirable (single point of failure concerns), each scheduler node can attempt to claim each job interval independently:

def try_run_job(job_id: str, interval_start: datetime) -> bool:
    # Atomic claim: only one node succeeds
    rows = db.execute("""
        INSERT INTO job_runs (job_id, interval_start, node_id, status)
        VALUES ($1, $2, $3, 'running')
        ON CONFLICT (job_id, interval_start) DO NOTHING
    """, job_id, interval_start, node_id)
    return rows.rowcount == 1  # True if this node won the claim

if try_run_job(job_id, current_interval):
    execute_job(job_id)
    mark_job_complete(job_id, current_interval)

All nodes run the same logic. The database constraint ensures exactly one claim succeeds. This trades leader election overhead for database write overhead per job per interval — acceptable for low-frequency jobs, potentially problematic for high-frequency jobs (hundreds per second) where database contention becomes a bottleneck.

Web Crawler: Domain-Based Assignment #

A web crawler must fetch billions of URLs efficiently while respecting per-domain rate limits (politeness). Assigning URLs to crawler instances by domain provides both stability (a domain’s queue is always processed by the same instance) and politeness (one instance handles one domain’s rate limit).

Domain Sharding #

URLs are assigned to crawler instances by hashing the domain:

import hashlib

def assign_url_to_crawler(url: str, num_crawlers: int) -> int:
    domain = extract_domain(url)  # "news.ycombinator.com"
    # Stable hash: Python's built-in hash() is randomized per process,
    # which would break the stability property across crawler instances
    return int(hashlib.md5(domain.encode()).hexdigest(), 16) % num_crawlers

# All URLs from ycombinator.com → crawler instance 3
# All URLs from nytimes.com → crawler instance 7

This is application-level hash partitioning — the same principle as Cassandra’s token ring, but applied to URL assignment rather than data storage. The domain is the partition key; the crawler instance is the node.

Why domain-based (not URL-based) assignment:

  • Rate limiting: one instance knows all in-progress requests to a domain, preventing over-crawling.
  • DNS caching: one instance caches DNS resolution for the domain, reducing DNS queries.
  • robots.txt: one instance fetches and respects the domain’s robots.txt.

Rebalancing on instance failure: if crawler instance 3 fails, its domains must be redistributed. With modular hashing, adding or removing an instance rehashes nearly all domains — the same minimal-disruption problem as data sharding. Use consistent hashing with virtual nodes so that only the failed instance’s domains are reassigned:

class CrawlerRing:
    def __init__(self, num_vnodes=150):
        self.ring = ConsistentHashRing(vnodes=num_vnodes)

    def add_crawler(self, crawler_id: str):
        self.ring.add_node(crawler_id)

    def remove_crawler(self, crawler_id: str):
        self.ring.remove_node(crawler_id)
        # Only domains in the removed node's vnode ranges reassign

    def assign(self, url: str) -> str:
        domain = extract_domain(url)
        return self.ring.get_node(domain)
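`ConsistentHashRing` above is assumed rather than a standard library. A minimal sketch of such a ring with virtual nodes, using MD5 for a stable hash:

```python
import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, vnodes: int = 150):
        self.vnodes = vnodes
        self._ring = []  # sorted list of (hash, node_id) pairs

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node_id: str):
        # Each node owns `vnodes` points on the ring for smoother balance
        for i in range(self.vnodes):
            bisect.insort(self._ring, (self._hash(f"{node_id}#{i}"), node_id))

    def remove_node(self, node_id: str):
        # Deletes only this node's vnode points; other assignments are untouched
        self._ring = [(h, n) for h, n in self._ring if n != node_id]

    def get_node(self, key: str) -> str:
        # The first vnode clockwise from the key's hash owns the key
        idx = bisect.bisect_right(self._ring, (self._hash(key), "")) % len(self._ring)
        return self._ring[idx][1]
```

Because removing a node deletes only its own vnode points, every domain owned by a surviving node keeps its assignment — exactly the stability property domain sharding needs.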

URL Frontier Assignment #

The URL frontier is the queue of URLs to be crawled. In a distributed crawler, the frontier is partitioned:

URL arrives → extract domain → hash(domain) % N → frontier partition N
Crawler instance N → dequeues from its frontier partition → fetches URL → extracts new URLs → repeats

The frontier partition IS the assignment. There is no separate assignment step — the hash routing is the assignment mechanism.

Priority-based frontier: not all URLs are equally valuable. News articles degrade in value within hours; product pages are evergreen. A priority queue per domain (inside each crawler’s frontier partition) schedules high-value URLs first:

# Redis sorted set as priority frontier per domain
def enqueue_url(domain: str, url: str, priority: float):
    redis.zadd(f"frontier:{domain}", {url: priority})

def dequeue_url(domain: str) -> Optional[str]:
    # Get highest priority URL (highest score first)
    urls = redis.zpopmax(f"frontier:{domain}", count=1)
    return urls[0][0] if urls else None
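Because a domain is owned by exactly one crawler instance, the politeness check can be purely local — no coordination needed. A sketch, where the delay value and function names are illustrative:

```python
import time
from typing import Dict, Optional

CRAWL_DELAY = 1.0  # seconds between requests to the same domain (illustrative)
_last_fetch: Dict[str, float] = {}

def may_fetch(domain: str, now: Optional[float] = None) -> bool:
    # One instance owns the domain, so a process-local timestamp suffices
    now = time.monotonic() if now is None else now
    if now - _last_fetch.get(domain, float("-inf")) < CRAWL_DELAY:
        return False
    _last_fetch[domain] = now
    return True
```

A dequeue loop would call `may_fetch(domain)` before `dequeue_url(domain)` and move on to another owned domain when it returns False, keeping the crawler busy without violating any one domain’s rate limit.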

Shard-Based Database Routing #

Application-level Assignment also applies to database sharding: given a user ID (or tenant ID, or order ID), which database shard holds the data?

Range-Based vs Hash-Based Sharding #

Hash-based sharding: shard = hash(user_id) % N. Uniform distribution, but no range queries across shards.

import hashlib

def get_shard(user_id: str, num_shards: int) -> int:
    return int(hashlib.md5(user_id.encode()).hexdigest(), 16) % num_shards

Range-based sharding: shard = lookup_table[user_id_range]. Supports range queries within a shard, but requires a routing table and risks hotspots if ranges are not balanced.

# (inclusive lower bound, exclusive upper bound) → shard; last range is open-ended
ROUTING_TABLE = {
    (0, 1_000_000):            "shard_1",
    (1_000_000, 2_500_000):    "shard_2",
    (2_500_000, float("inf")): "shard_3",
}

Directory-based sharding: a lookup service maps entity ID to shard. Maximum flexibility (any entity can be moved to any shard) at the cost of a lookup service round-trip on every request.

def get_shard(user_id: str) -> str:
    shard = cache.get(f"shard:{user_id}")
    if not shard:
        shard = shard_directory.lookup(user_id)
        cache.set(f"shard:{user_id}", shard, ttl=3600)
    return shard

Resharding #

When a shard becomes too large (storage, throughput), it must be split. This is the application-level equivalent of Cassandra’s token ring rebalancing.

Online resharding with double-writes:

  1. Create the new shard configuration (shard A splits into A1 and A2).
  2. Start double-writing: all writes go to both old shard A and new shards A1/A2.
  3. Backfill: copy historical data from A to A1 and A2.
  4. Verify: confirm A1 and A2 have all data.
  5. Switch reads: route reads to A1/A2.
  6. Stop double-writes, decommission A.

The double-write period ensures no writes are lost during migration. The backfill ensures no historical data is missing. This is zero-downtime resharding at the application level.
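The double-write phase (steps 2–5) can be sketched with in-memory lists standing in for shards; the shard names and the hash-parity split rule here are illustrative:

```python
import hashlib

# Illustrative: shard_A is being split into shard_A1 and shard_A2
RESHARD_MAP = {"shard_A": ("shard_A1", "shard_A2")}
shards = {name: [] for name in ("shard_A", "shard_A1", "shard_A2")}

def target_split(user_id: str) -> str:
    # Hypothetical split rule: hash parity decides which new shard owns the row
    h = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    return "shard_A1" if h % 2 == 0 else "shard_A2"

def route_write(shard: str, user_id: str, record: dict):
    shards[shard].append((user_id, record))  # old shard: still the source of truth
    if shard in RESHARD_MAP:
        # Migration in progress: mirror the write to the new owner
        shards[target_split(user_id)].append((user_id, record))

for uid in ("u1", "u2", "u3", "u4"):
    route_write("shard_A", uid, {"name": uid})
```

Every write lands on the old shard and on exactly one of its replacements, so cutting reads over to A1/A2 (step 5) loses nothing written during the migration; the backfill covers everything written before it.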

Assignment Failure Modes #

| Failure | Mechanism affected | Symptom | Mitigation |
| --- | --- | --- | --- |
| Worker crash during task | Task queue | Task stuck in processing; not retried until timeout | Heartbeat + visibility timeout; recovery sweep |
| Double execution after timeout | Task queue | Duplicate side effects (double email) | Idempotent task execution |
| Two schedulers trigger same job | Job scheduler | Duplicate job run | DB-level unique constraint on (job_id, interval) |
| Crawler instance failure | Domain assignment | Domain queue orphaned | Consistent hashing reassignment |
| Database shard hotspot | Range sharding | One shard handles all traffic | Hash-based sharding or shard split |
| Shard routing table stale | Directory sharding | Requests routed to wrong shard | Short TTL on routing cache; invalidate on shard migration |