Distributed Task Queue (Celery / Sidekiq-class)
This note models a Celery/Sidekiq-class background job system where producers enqueue tasks, workers claim and execute them, and the system supports retries, scheduled retries, and worker crash recovery.
Step 1 - Normalize #
Assume the baseline prompt is:
- design a distributed task queue
- producers enqueue jobs
- workers pull and execute jobs
- jobs are acked on success
- failures may retry or move to dead-letter storage
- system scales across many nodes
Normalize into state-affecting paths.
| Requirement | Actor | Operation | State touched | Priority |
|---|---|---|---|---|
| Producer enqueues task | Client | append event | S1 create target TaskEnvelope | C1 |
| Worker reserves/claims next task | Client | state transition | S1 update target TaskExecutionState | C1 |
| Worker acknowledges successful task | Client | state transition | S1 update target TaskExecutionState | C1 |
| Worker marks task failed/retryable | Client | state transition | S1 update target TaskExecutionState | C1 |
| System requeues timed-out or retryable task | System | async process | S1 hidden write target TaskExecutionState | C1 |
| System promotes delayed/retry-scheduled task when due | System | async process | S1 hidden write target ScheduledTaskState | C1 |
| System routes queue/shard to current owner | System | read source | S1 read source target PartitionMap | C1 |
| System reassigns shard ownership after node failure | System | state transition | S1 update target PartitionOwnership | C1 |
| Client reads queue depth / worker stats | Client | read projection | S1 read projection target QueueStatsView | R2 |
Notes on normalization #
Important choices:
- enqueue is an `append event`: task submission is an immutable fact
- reserve/claim is a `state transition`: the task moves from ready to claimed/in-progress
- ack/fail are `state transition`s: the task lifecycle changes
- retries and delayed tasks are explicit internal paths
- routing/ownership are explicit because this is distributed infra
This system is closer to:
- claimable execution queue
than to:
- replayable event log
Step 2 - Critical Path Selection #
| Requirement | Priority class | Why |
|---|---|---|
| Enqueue task | C1 | task intent must not be lost or duplicated incorrectly |
| Reserve/claim next task | C1 | task execution correctness depends on valid exclusive claim |
| Ack successful task | C1 | completion semantics depend on current claim holder |
| Mark failed/retryable task | C1 | retry and terminal failure semantics depend on valid lifecycle transition |
| Requeue timed-out or retryable task | C1 | worker crash recovery and retries depend on this path |
| Promote delayed/retry-scheduled task | C1 | scheduled work must become runnable at the right time |
| Route queue/shard to owner | C1 | wrong routing can split claims and task order |
| Reassign shard ownership | C1 | failover must preserve task lifecycle correctness |
| Queue depth / worker stats | R2 | operational only |
Baseline critical paths #
Main C1 paths:
- `P1` enqueue task
- `P2` reserve/claim task
- `P3` ack task success
- `P4` fail/retry task
- `P5` requeue timed-out or retryable task
- `P6` promote delayed task
- `P7` route to shard owner
- `P8` reassign shard ownership
This design is driven by:
- durable task intent
- one active worker claim per task attempt
- guarded lifecycle transitions
- retry and timeout recovery
Step 3 - Primary State Extraction #
For a distributed task queue, the minimal primary state is the task itself, its execution lifecycle, delayed scheduling state, and routing/ownership state.
| Candidate object label | Candidate source | Candidate needed for C1/R1? | Candidate decomposition action | Class | Primary? | Owner | Evolution | Scope kind | Scope value |
|---|---|---|---|---|---|---|---|---|---|
| TaskEnvelope | direct noun | Yes | keep as candidate | event | Yes | service | append-only | instance | task_id |
| TaskExecutionState | lifecycle object | Yes | keep as candidate | process | Yes | service | state machine | instance | task_id |
| ScheduledTaskState | hidden write target | Yes | keep as candidate | process | Yes | service | state machine | instance | task_id |
| PartitionOwnership | hidden write target | Yes | keep as candidate | process | Yes | service | state machine | instance | shard_id |
| PartitionMap | hidden write target | Yes | keep as candidate | entity | Yes | service | overwrite | collection | queue shards |
| QueueStatsView | derived read model | No | reject as UI artifact | projection | No | derived | overwrite | collection | queue_id |
| ClaimAttempt | hidden write target | No | reject as implementation choice | event | No | derived | append-only | collection | task_id |
Important modeling choices #
TaskEnvelope #
Primary because:
- enqueue is immutable fact recording
- payload, queue, headers, retries, and metadata live here
TaskExecutionState #
This is the central task-queue object. It captures lifecycle like:
- `READY`
- `CLAIMED(worker_id, claim_token, expiry)`
- `SUCCEEDED`
- `FAILED`
- `RETRY_PENDING`
- `DEAD_LETTERED`
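The lifecycle can be written down as an explicit transition table. The sketch below is one reading of the transitions this note describes (claim, ack, fail, retry, timeout requeue); the names `TaskState`, `ALLOWED`, and `can_transition` are illustrative and not taken from Celery or Sidekiq.

```python
from enum import Enum


class TaskState(Enum):
    READY = "READY"
    CLAIMED = "CLAIMED"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    RETRY_PENDING = "RETRY_PENDING"
    DEAD_LETTERED = "DEAD_LETTERED"


# Allowed lifecycle edges as read from this note (an assumption, not a spec).
ALLOWED = {
    TaskState.READY: {TaskState.CLAIMED},
    TaskState.CLAIMED: {
        TaskState.SUCCEEDED,      # ack
        TaskState.FAILED,         # terminal failure
        TaskState.DEAD_LETTERED,  # terminal failure moved to dead-letter storage
        TaskState.RETRY_PENDING,  # retryable failure
        TaskState.READY,          # claim timed out and the task was requeued
    },
    TaskState.RETRY_PENDING: {TaskState.READY},
    TaskState.SUCCEEDED: set(),
    TaskState.FAILED: set(),
    TaskState.DEAD_LETTERED: set(),
}


def can_transition(src: TaskState, dst: TaskState) -> bool:
    """Return True if the lifecycle permits moving from src to dst."""
    return dst in ALLOWED[src]
```

Keeping the edges in one table makes every guarded transition a lookup, so terminal states cannot be left by accident.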
ScheduledTaskState #
Kept explicit because delayed jobs and scheduled retries are common in Celery/Sidekiq-class systems. It captures:
- `run_at`
- delayed / promoted lifecycle
PartitionOwnership #
Needed because:
- one owner should control claim transitions for a shard at a time
PartitionMap #
Needed because:
- producers and workers must route consistently to the right shard authority
Minimal strict primary set #
The strongest minimal set is:
- `TaskEnvelope`
- `TaskExecutionState`
- `ScheduledTaskState`
- `PartitionOwnership`
- `PartitionMap`
Step 4 - Hard Invariants #
For a Celery/Sidekiq-class task queue, the hard invariants are about durable task existence, exclusive active claim per task attempt, guarded completion/failure transitions, and safe retry promotion.
| Path | Tier | Type | Invariant statement |
|---|---|---|---|
| P1 enqueue task | HARD | uniqueness | Key enqueue request_id maps to at most one logical outcome enqueued task within queue scope. |
| P2 reserve/claim task | HARD | eligibility | Action claim_task is valid only if TaskExecutionState(task_id) is READY at decision time. |
| P2 reserve/claim task | HARD | uniqueness | Key task_id maps to at most one logical outcome active claim holder within claim-timeout scope. |
| P3 ack task success | HARD | eligibility | Action ack_task is valid only if TaskExecutionState(task_id) is CLAIMED and claim_token matches current holder at decision time. |
| P4 fail/retry task | HARD | eligibility | Action fail_task is valid only if TaskExecutionState(task_id) is CLAIMED and claim_token matches current holder at decision time. |
| P5 requeue timed-out or retryable task | HARD | eligibility | Action requeue_task is valid only if TaskExecutionState(task_id) is in a retryable or expired claimed state and current claim token/epoch still matches at decision time. |
| P6 promote delayed task | HARD | eligibility | Action promote_delayed_task is valid only if ScheduledTaskState(task_id) is pending and run_at <= now at decision time. |
| P7 route to shard owner | HARD | uniqueness | Key shard_id maps to at most one logical outcome current authoritative owner within shard_id. |
| P8 reassign shard ownership | HARD | eligibility | Action reassign_shard is valid only if current owner is failed or relinquished and candidate owner is eligible and sufficiently current on shard_id at decision time. |
What matters most #
1. One active claim per task at a time #
This is the central execution invariant.
2. Ack/fail must be fenced by current claim token #
A stale worker must not be able to ack or fail a task after timeout and reassignment.
3. Retry promotion must be guarded #
Only due delayed tasks or legitimately retryable tasks may become runnable again.
4. At-least-once execution is normal #
This system usually allows duplicate execution on worker crash or lost ack. Exactly-once is not the default invariant.
Step 5 - Execution Context #
For the strict baseline task queue:
| Field | Value | Why |
|---|---|---|
| Topology | single service distributed | one logical task-queue service spread across many nodes |
| Write coordination scope | per object scope | correctness is per task_id and per shard ownership scope |
| Read consistency target | strong only | claim/ack/fail paths must use authoritative task state |
| Holder model | client | workers temporarily hold claimed tasks |
| Compensation acceptable? | No | wrong claim/ack/fail decisions cannot be repaired safely afterward |
Derived implications #
- `holder_may_crash = true`: workers can crash while holding claimed tasks
- `cross_service_write = false`: baseline keeps task state, routing, and ownership within one logical service
- `bounded_staleness_allowed = false`: claim/ack/fail paths must use authoritative execution state
- `cross_service_atomicity_required = false`: no multi-service transaction in baseline
- `exclusive_claim_required = true`: one worker should hold a task attempt at a time
- `guarded_by_current_state = true`: claim, ack, fail, retry, and promotion all depend on current lifecycle state
What this implies #
This pushes us toward:
- one authoritative owner per shard
- explicit task execution state machine
- claim timeout modeled as lease-like worker ownership
- strong reads on the task-state hot path
Step 6 - Deterministic Mechanism Selection #
| Path | Write shape | Base mechanism | Required companions |
|---|---|---|---|
| P1 enqueue task | append-only event | append log | request id dedup |
| P2 reserve/claim task | exclusive claim | lease | claim token, claim timeout |
| P3 ack task success | guarded state transition | CAS on (state, version) or leader-applied guarded transition | claim token |
| P4 fail/retry task | guarded state transition | CAS on (state, version) or leader-applied guarded transition | claim token, retry policy |
| P5 requeue timed-out or retryable task | guarded state transition | leader-applied guarded transition | claim token, timeout scan |
| P6 promote delayed task | guarded state transition | leader-applied guarded transition | due-time index |
| P7 route to shard owner | exclusive claim | lease | fencing token, heartbeat |
| P8 reassign shard ownership | guarded state transition | CAS on (state, version) | fencing token, shard catch-up check |
Why these fit #
Enqueue #
Task submission is immutable fact recording, so append-only fits naturally.
Claim #
Task execution ownership is temporary and exclusive, which is effectively a lease.
Ack / fail #
These are not blind updates. They must verify the current claim token and state, so they are guarded transitions.
Retry / delayed promotion #
These depend on current lifecycle state plus time predicates, so guarded transitions fit.
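A guarded transition in the "CAS on (state, version)" sense can be sketched as follows. This is a single-threaded illustration only; `Record` and `compare_and_set` are hypothetical names, and a real store would need the check and the write to be atomic (a conditional write, or an apply serialized by the shard leader).

```python
from dataclasses import dataclass


@dataclass
class Record:
    state: str
    version: int = 0


def compare_and_set(rec: Record, expected_state: str, expected_version: int,
                    new_state: str) -> bool:
    """Apply the transition only if both state and version are unchanged."""
    if rec.state != expected_state or rec.version != expected_version:
        return False          # someone else moved the record first
    rec.state = new_state
    rec.version += 1          # every successful write bumps the version
    return True


rec = Record(state="READY")
first = compare_and_set(rec, "READY", 0, "CLAIMED")   # wins
second = compare_and_set(rec, "READY", 0, "CLAIMED")  # loses: state and version moved on
```

The second caller read the same `(state, version)` pair as the first, so its write is rejected rather than silently overwriting the claim.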
Canonical substrate implied #
The baseline now points to:
- sharded task queue service
- one owner per shard
- append-only task storage
- per-task execution lifecycle state
- lease-like task claim with token/timeout
- delayed-task time index for scheduled retries
Step 7 - Read Model / Source of Truth #
For a Celery/Sidekiq-class task queue, truth is mostly direct source state. Operational queue metrics are derived.
| Concept | Truth | Read path | Rebuild path |
|---|---|---|---|
| C1 task payload and metadata | TaskEnvelope | read source directly | authoritative task store |
| C2 current execution lifecycle | TaskExecutionState | read source directly | authoritative execution-state store or replay from committed log |
| C3 delayed scheduling lifecycle | ScheduledTaskState | read source directly | authoritative delayed-task store |
| C4 shard ownership | PartitionOwnership | read source directly | authoritative ownership store |
| C5 shard routing map | PartitionMap | read source directly | authoritative routing metadata |
| C6 queue depth / worker stats | derived from task and state data | materialized view | recompute from authoritative state |
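The rebuild path for the one derived concept (queue depth / stats) can be illustrated with a small recompute function. The input shape, `(queue_id, state)` rows, and the name `recompute_queue_stats` are assumptions made for this sketch.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def recompute_queue_stats(rows: Iterable[Tuple[str, str]]) -> Dict[str, Counter]:
    """Rebuild per-queue state counts from authoritative (queue_id, state) rows."""
    stats: Dict[str, Counter] = {}
    for queue_id, state in rows:
        stats.setdefault(queue_id, Counter())[state] += 1
    return stats


rows = [
    ("emails", "READY"),
    ("emails", "READY"),
    ("emails", "CLAIMED"),
    ("reports", "RETRY_PENDING"),
]
view = recompute_queue_stats(rows)
```

Because the view is a pure function of authoritative state, it can be dropped and rebuilt at any time without affecting claim or ack correctness.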
Important point #
For the core semantics:
- enqueue writes authoritative task state
- claim/ack/fail/retry operate on authoritative `TaskExecutionState`
- delayed promotions operate on authoritative due-time state
- queue depth and worker dashboards are projections
Step 8 - Failure Handling #
| Path | Retry | Competing writers | Crash after commit | Publish failure | Stale holder |
|---|---|---|---|---|---|
| P1 enqueue task | retry with request_id to avoid duplicate enqueue | competing enqueues coexist; dedup matters only for same logical request | committed enqueue survives owner crash if replicated past commit point | producer may retry safely with idempotency key | n/a |
| P2 reserve/claim task | retry claim safely; may return another task, or the same task later | only one active claim should win for a task at a time | if claim committed and worker crashes, task stays claimed until timeout or failover | n/a | stale worker fenced by claim token/epoch |
| P3 ack task success | retry ack with same claim token | stale/wrong claim token loses guarded transition | committed success survives crash if replicated past commit point | n/a | stale holder cannot ack after timeout and re-claim |
| P4 fail/retry task | retry fail with same claim token | stale token loses guarded transition | committed fail/retry survives crash if replicated past commit point | n/a | stale holder cannot fail after timeout and reassignment |
| P5 requeue timed-out or retryable task | timeout/retry scan is safe to re-run | only one requeue transition should win for current expired/retryable state | scanner crash delays recovery; next scan retries | n/a | old claim token becomes invalid after requeue |
| P6 promote delayed task | due-time scan is safe to re-run | only one promotion transition should win for current delayed state | scanner crash delays runnable promotion; next scan retries | n/a | n/a |
| P7 route to shard owner | retry after refreshing shard map | only one valid owner should exist | if owner changed, refreshed map points to new owner | n/a | stale owner rejected by fencing token |
| P8 reassign shard ownership | retry failover transition safely | only one reassignment wins current ownership state | promoted owner crash triggers later reassignment | n/a | old owner fenced and must not continue serving |
What matters most #
1. Claim-token fencing #
This is the key task-queue safety mechanism.
Bad case:
- worker A claims task
- claim times out
- worker B reclaims task
- worker A later acks success
Without fencing:
- A could incorrectly finish B’s active attempt
So ack/fail must be tied to the current claim token.
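The bad case above can be replayed against a tiny in-memory model. This is a sketch under simplifying assumptions: tokens come from a local counter, there is no persistence or concurrency, and the names `Execution`, `claim`, `expire`, and `ack` are illustrative.

```python
import itertools
from dataclasses import dataclass
from typing import Optional

_tokens = itertools.count(1)  # stand-in for a token source that never repeats


@dataclass
class Execution:
    state: str = "READY"
    claim_token: Optional[int] = None


def claim(ex: Execution) -> Optional[int]:
    """READY -> CLAIMED; returns a fresh token, or None if not claimable."""
    if ex.state != "READY":
        return None
    ex.state, ex.claim_token = "CLAIMED", next(_tokens)
    return ex.claim_token


def expire(ex: Execution) -> None:
    """Claim timeout: back to READY, which invalidates the old token."""
    if ex.state == "CLAIMED":
        ex.state, ex.claim_token = "READY", None


def ack(ex: Execution, token: int) -> bool:
    """CLAIMED -> SUCCEEDED only for the current claim token."""
    if ex.state != "CLAIMED" or ex.claim_token != token:
        return False
    ex.state, ex.claim_token = "SUCCEEDED", None
    return True


ex = Execution()
token_a = claim(ex)            # worker A claims
expire(ex)                     # A's claim times out
token_b = claim(ex)            # worker B reclaims
stale_ack = ack(ex, token_a)   # A's late ack is rejected
fresh_ack = ack(ex, token_b)   # B's ack succeeds
```

Worker A's late ack fails the token comparison, so only B's attempt can complete the task.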
2. At-least-once execution #
If a worker crashes after side effects but before ack:
- task may run again
So application handlers should be idempotent or dedup-aware.
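One way a handler might tolerate redelivery is to record processed task ids and skip repeats. The wrapper below is a minimal sketch: `make_idempotent` is a hypothetical helper, and the in-memory `seen` set stands in for a durable dedup store. In practice the side effect and the dedup record would need to commit together, otherwise a crash between them still allows a duplicate.

```python
from typing import Callable, List, Set


def make_idempotent(handler: Callable[[str, dict], None],
                    seen: Set[str]) -> Callable[[str, dict], bool]:
    """Wrap handler so that a repeated task_id is skipped."""
    def wrapped(task_id: str, payload: dict) -> bool:
        if task_id in seen:
            return False          # duplicate delivery: do nothing
        handler(task_id, payload)
        seen.add(task_id)
        return True
    return wrapped


sent: List[str] = []
send_email = make_idempotent(lambda tid, p: sent.append(p["to"]), set())
first = send_email("t-1", {"to": "a@example.com"})
again = send_email("t-1", {"to": "a@example.com"})  # redelivery after a lost ack
```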
3. Enqueue retry dedup #
If producer retries after response loss:
- dedup by `request_id` if the API promises producer idempotency
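Enqueue dedup can be sketched as an append-only list plus an index keyed by `(queue_id, request_id)`. The class name `EnqueueLog` and the use of random UUIDs for `task_id` are assumptions for illustration; the note does not prescribe an id scheme.

```python
import uuid
from typing import Dict, List, Optional, Tuple


class EnqueueLog:
    """Append-only task records plus a request_id dedup index (in-memory sketch)."""

    def __init__(self) -> None:
        self.tasks: List[dict] = []
        self.by_request: Dict[Tuple[str, str], str] = {}

    def enqueue(self, queue_id: str, payload: dict,
                request_id: Optional[str] = None) -> str:
        if request_id is not None:
            existing = self.by_request.get((queue_id, request_id))
            if existing is not None:
                return existing   # retry of an already-committed enqueue
        task_id = uuid.uuid4().hex
        self.tasks.append({"task_id": task_id, "queue_id": queue_id, "payload": payload})
        if request_id is not None:
            self.by_request[(queue_id, request_id)] = task_id
        return task_id


log = EnqueueLog()
first = log.enqueue("emails", {"to": "a@example.com"}, request_id="req-1")
retry = log.enqueue("emails", {"to": "a@example.com"}, request_id="req-1")
no_key = log.enqueue("emails", {"to": "a@example.com"})  # no request_id: always appends
```

A retry carrying the same `request_id` returns the original `task_id`; a call without one is treated as a new task.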
Step 9 - Scale Adjustments #
| Hotspot | Type | First response |
|---|---|---|
| hot queue / shard with many claims | contention hotspot | increase shard count and isolate hot queues |
| claim/ack pressure on same shard | write throughput hotspot | batch claim fetches and spread queue families across shards |
| delayed-task promotion scans | write throughput hotspot | index due times with a timing wheel or min-heap and scan incrementally |
| retry storms after dependency outage | contention hotspot | exponential backoff, jitter, retry caps, and dead-letter cutoffs |
| ownership churn during failures | contention hotspot | stabilize leases/elections and avoid aggressive reassignment |
| stats/dashboard reads | read hotspot | keep them as derived views only |
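The retry-storm row combines backoff, jitter, and a retry cap. A minimal sketch of such a policy follows; the function name and the default values for `base`, `cap`, and `max_attempts` are illustrative, not values recommended by this note.

```python
import random
from typing import Optional


def next_retry_delay(attempt: int, base: float = 1.0, cap: float = 300.0,
                     max_attempts: int = 5,
                     rng: Optional[random.Random] = None) -> Optional[float]:
    """Seconds to wait before retry number `attempt` (1-based).

    Returns None once the retry cap is exceeded, meaning the task should be
    dead-lettered. The delay is a uniform draw in
    [0, min(cap, base * 2**(attempt - 1))], i.e. exponential backoff with jitter.
    """
    if attempt > max_attempts:
        return None
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** (attempt - 1)))
    return rng.uniform(0.0, ceiling)
```

Randomizing the delay spreads retries out after a dependency outage instead of releasing them all at the same instant, and the `None` result gives the failure path a clear dead-letter cutoff.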
What scales well #
This queue scales by:
- sharding task queues
- giving each shard one authoritative owner
- keeping claim/ack/fail local to that owner
- using efficient due-time indexes for delayed jobs and retries
What fails first #
Usually:
- hot queue concentration on one shard
- inefficient delayed-job scans
- stale-claim fencing mistakes
- retry storms overwhelming ready queues
Canonical design conclusion #
The mechanical outcome is:
- primary state:
  - `TaskEnvelope`
  - `TaskExecutionState`
  - `ScheduledTaskState`
  - `PartitionOwnership`
  - `PartitionMap`
- critical invariants:
  - durable task enqueue
  - one active worker claim per task attempt
  - ack/fail valid only for current claim token
  - timed-out or retryable tasks become runnable again safely
  - delayed tasks promote when due
- mechanisms:
  - `append log`
  - `lease`
  - guarded ack/fail/requeue transitions
  - fenced shard ownership
- reads:
  - direct authoritative reads for task and execution state
  - projections only for queue depth/stats
Polished interview answer #
“I’d design the task queue as a sharded service with one leader per shard. Enqueue appends an immutable task record. Workers reserve tasks through an exclusive claim that issues a claim token and timeout, and ack/fail are guarded transitions that succeed only for the current claim token. If a worker crashes or times out, the task is safely requeued for at-least-once execution, and delayed tasks or scheduled retries are promoted through a due-time index. The main scaling levers are more shards, hot-queue isolation, efficient delayed-task scanning, and retry backoff control.”
Concrete Substrate #
I’ll choose a service-owned sharded task queue with shard leaders as the concrete baseline, because it matches the mechanics we derived:
- append-only enqueue
- lease-like task claim
- guarded ack/fail/requeue
- delayed-task due-time promotion
- one owner per shard
Concrete tech family:
- service in `Go` or `Java`
- local durable storage: `RocksDB` or log-segment files
- shard replication: `Raft` or leader-follower replication with commit index
- metadata/control: `etcd` or internal metadata quorum
Each shard leader stores:
- task records
- current `TaskExecutionState`
- ready queue / runnable index
- in-flight expiry index
- delayed-task due-time index
Operation Layer #
1. Enqueue task #
API
EnqueueTask(queue_id, payload, request_id?, run_at?)
Initiator
- producer/client
Entry point
- gateway or any task-queue node
Authoritative decider
- current leader for target shard
Precondition
- valid shard routing
- optional dedup request id if producer idempotency is required
Transition
- append `TaskEnvelope`
- initialize `TaskExecutionState = READY` if runnable now
- or create `ScheduledTaskState(run_at)` if delayed
Response
{task_id}
Failure cases
- stale routing -> retry with updated shard map
- response loss -> retry may duplicate unless `request_id` dedup is used
2. Reserve/claim task #
API
ReserveTask(queue_id, max_tasks, lease_timeout)
Initiator
- worker/client
Entry point
- gateway or any task-queue node
Authoritative decider
- current shard leader(s) serving that queue
Precondition
- task chosen must currently be `READY`
Transition
- selected task: `READY -> CLAIMED(worker_id, claim_token, expiry)`
- remove from ready index
- insert into claim-expiry index
Response
{tasks: [{task_id, payload, claim_token, lease_expiry}]}
Failure cases
- no ready task -> empty response / long poll timeout
- stale owner -> retry after redirect
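The reserve transition touches three structures at once: the ready index, the claim record, and the claim-expiry index. The sketch below shows one possible in-memory shape for a shard leader. `Shard`, the explicit `worker_id` and `now` parameters, and the counter-based token are assumptions; the API above does not list them.

```python
import heapq
import itertools
from collections import deque
from typing import Deque, Dict, List, Tuple


class Shard:
    def __init__(self) -> None:
        self.ready: Deque[str] = deque()                      # ready index (FIFO)
        self.claims: Dict[str, Tuple[str, int, float]] = {}   # task_id -> (worker, token, expiry)
        self.expiry_heap: List[Tuple[float, str, int]] = []   # (expiry, task_id, token)
        self._tokens = itertools.count(1)

    def reserve(self, worker_id: str, max_tasks: int,
                lease_timeout: float, now: float) -> List[dict]:
        """Claim up to max_tasks READY tasks and index their expiries."""
        out: List[dict] = []
        while self.ready and len(out) < max_tasks:
            task_id = self.ready.popleft()            # remove from ready index
            token = next(self._tokens)
            expiry = now + lease_timeout
            self.claims[task_id] = (worker_id, token, expiry)
            heapq.heappush(self.expiry_heap, (expiry, task_id, token))
            out.append({"task_id": task_id, "claim_token": token,
                        "lease_expiry": expiry})
        return out


shard = Shard()
shard.ready.extend(["t1", "t2", "t3"])
batch = shard.reserve("w1", max_tasks=2, lease_timeout=30.0, now=100.0)
rest = shard.reserve("w2", max_tasks=5, lease_timeout=30.0, now=100.0)
empty = shard.reserve("w3", max_tasks=5, lease_timeout=30.0, now=100.0)
```

An empty list corresponds to the "no ready task" case; a long-polling front end would wait and call again rather than treat it as an error.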
3. Ack task success #
API
AckTask(queue_id, task_id, claim_token)
Initiator
- worker/client
Entry point
- gateway or any task-queue node
Authoritative decider
- shard leader owning the task
Precondition
- current `TaskExecutionState` is `CLAIMED`
- `claim_token` matches current holder
Transition
CLAIMED -> SUCCEEDED
Response
- success, or a rejection that leaves task state unchanged
Failure cases
- stale claim token -> reject
- task already timed out and reassigned -> reject stale ack
4. Fail or retry task #
API
FailTask(queue_id, task_id, claim_token, error, retry_at?)
Initiator
- worker/client
Entry point
- gateway or any task-queue node
Authoritative decider
- shard leader owning the task
Precondition
- current `TaskExecutionState` is `CLAIMED`
- `claim_token` matches current holder
Transition
- if retryable:
  - `CLAIMED -> RETRY_PENDING`
  - update `ScheduledTaskState(retry_at)`
- else:
  - `CLAIMED -> FAILED` or `DEAD_LETTERED`
Response
- success
5. Requeue timed-out or retryable task #
API
- internal background process
Initiator
- system
Entry point
- shard leader
Authoritative decider
- shard leader
Precondition
- task is still `CLAIMED` and expired
- or task is `RETRY_PENDING` and due for requeue
Transition
- move task back to `READY`
- remove from expired/retry tracking
- insert into ready queue
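The timeout half of this path can be sketched as a scan over an expiry heap. The data shapes are assumptions for illustration: `expiry_heap` holds `(expiry, task_id, claim_token)` entries and `claims` maps each in-flight task to its current token.

```python
import heapq
from typing import Dict, List, Tuple


def requeue_expired(expiry_heap: List[Tuple[float, str, int]],
                    claims: Dict[str, int],
                    ready: List[str],
                    now: float) -> List[str]:
    """Move tasks whose current claim has expired back to the ready list."""
    requeued: List[str] = []
    while expiry_heap and expiry_heap[0][0] <= now:
        _, task_id, token = heapq.heappop(expiry_heap)
        # Skip stale heap entries: the task was acked, failed, or re-claimed
        # since this entry was written, so its token no longer matches.
        if claims.get(task_id) != token:
            continue
        del claims[task_id]       # the old token is invalid from here on
        ready.append(task_id)
        requeued.append(task_id)
    return requeued


heap = [(110.0, "t1", 1), (120.0, "t2", 2), (500.0, "t3", 3)]
heapq.heapify(heap)
claims = {"t1": 1, "t3": 3}       # t2 was already acked, so it has no claim
ready: List[str] = []
moved = requeue_expired(heap, claims, ready, now=200.0)
```

Checking the token before requeueing is what makes the scan safe to re-run: an entry for a task that has since been acked or re-claimed is simply discarded.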
6. Promote delayed task #
API
- internal background process
Initiator
- system
Entry point
- shard leader
Authoritative decider
- shard leader
Precondition
- delayed task is pending and `run_at <= now`
Transition
- `ScheduledTaskState` becomes promoted
- `TaskExecutionState -> READY`
- insert into ready queue
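Promotion can be sketched with a min-heap as the due-time index, guarded by the scheduled state so that a duplicate index entry cannot promote a task twice. The `"PENDING"` / `"PROMOTED"` strings and the function name are illustrative stand-ins for `ScheduledTaskState`.

```python
import heapq
from typing import Dict, List, Tuple


def promote_due(due_heap: List[Tuple[float, str]],
                scheduled: Dict[str, str],
                ready: List[str],
                now: float) -> List[str]:
    """Promote every pending task whose run_at is at or before now."""
    promoted: List[str] = []
    while due_heap and due_heap[0][0] <= now:
        _, task_id = heapq.heappop(due_heap)
        if scheduled.get(task_id) != "PENDING":
            continue              # already promoted: duplicate index entry
        scheduled[task_id] = "PROMOTED"
        ready.append(task_id)
        promoted.append(task_id)
    return promoted


due = [(10.0, "a"), (10.0, "a"), (50.0, "b")]   # "a" appears twice on purpose
heapq.heapify(due)
scheduled = {"a": "PENDING", "b": "PENDING"}
ready: List[str] = []
first = promote_due(due, scheduled, ready, now=20.0)
second = promote_due(due, scheduled, ready, now=20.0)
```

The scan only inspects the head of the heap, so its cost tracks the number of due tasks rather than the total number of delayed tasks.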
7. Reassign shard ownership #
API
- internal failover flow:
  - `AcquireShardLease(shard_id)`
  - or `Raft` leader election
Initiator
- system
Entry point
- follower/candidate node or coordination layer
Authoritative decider
- shard quorum or metadata lease store
Precondition
- current owner failed or relinquished
- candidate replica sufficiently current
Transition
- new owner/leader epoch established
- old owner fenced
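Epoch-based fencing for shard ownership can be sketched as a compare-and-set on an epoch counter plus a write-side check. This is an illustration only: `Ownership`, `reassign`, and `accept_write` are hypothetical names, and the "candidate replica sufficiently current" check is omitted.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Ownership:
    owner: Optional[str] = None
    epoch: int = 0


def reassign(own: Ownership, expected_epoch: int, new_owner: str) -> Optional[int]:
    """CAS on the ownership epoch; returns the new epoch, or None if it lost."""
    if own.epoch != expected_epoch:
        return None
    own.owner, own.epoch = new_owner, own.epoch + 1
    return own.epoch


def accept_write(own: Ownership, writer: str, writer_epoch: int) -> bool:
    """Fence: only the current owner at the current epoch may write."""
    return writer == own.owner and writer_epoch == own.epoch


own = Ownership()
e1 = reassign(own, 0, "node-a")     # node-a becomes owner at epoch 1
e2 = reassign(own, 1, "node-b")     # failover: node-b takes over at epoch 2
lost = reassign(own, 1, "node-c")   # candidate with a stale view loses
old_owner_ok = accept_write(own, "node-a", e1)
new_owner_ok = accept_write(own, "node-b", e2)
```

The old owner may still believe it holds the shard, but any write it attempts carries epoch 1 and is rejected once epoch 2 is established.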
Entry Point vs Decider vs Responder #
| Path | Entry point | Authoritative decider | Physical responder | Logical responder |
|---|---|---|---|---|
| EnqueueTask | gateway / any node | shard leader | leader or front node | task queue service |
| ReserveTask | gateway / any node | shard leader | leader or front node | task queue service |
| AckTask | gateway / any node | shard leader | leader or front node | task queue service |
| FailTask | gateway / any node | shard leader | leader or front node | task queue service |
| delayed/retry promotion | shard leader | shard leader | internal | task queue service |
| shard failover | follower / coordination layer | shard quorum / lease store | new leader / control plane | task queue service |
Concrete HLD #
Main components:
- gateway/router
  - resolves queue/shard
  - forwards client and worker requests
- shard leader
  - authoritative owner of task lifecycle for shard
  - manages ready, claimed, and delayed indexes
- shard followers
  - replicate shard mutations
- metadata/control service
  - tracks shard ownership and routing
- background timeout / delayed promotion worker
  - usually runs on shard leader
Short Interview Version #
I’d build the task queue as a sharded service with one leader per shard. Enqueue appends an immutable task record. Workers reserve tasks through a lease-like claim that issues a claim token and expiry, and ack or fail are guarded transitions that succeed only for the current claim token. If a worker crashes or times out, the task is safely requeued for at-least-once execution, and delayed jobs or scheduled retries are promoted from a due-time index when they become runnable. Replication happens per shard, and failover promotes only a sufficiently current replica.