Batch Processing Framework (MapReduce / Spark-class)
This note models a MapReduce/Spark-class batch processing framework where clients submit jobs as DAGs or stages, executors run partitioned tasks, shuffle outputs are materialized for downstream stages, and the scheduler coordinates retries and stage completion.
Step 1 - Normalize #
Assume the baseline prompt is:
- design a batch processing framework
- clients submit jobs composed of stages or DAG nodes
- worker executors run partitioned tasks
- shuffle/intermediate outputs feed downstream stages
- failed tasks can retry
- system scales across many workers
Normalize into state-affecting paths.
| Requirement | Actor | Operation | State touched | Priority |
|---|---|---|---|---|
| Client submits batch job / DAG | Client | append event | S1: create target `JobRequest` | C1 |
| Scheduler materializes runnable stage/task | System | append event | S1: create target `TaskRequest` | C1 |
| Executor claims task partition for execution | Client | state transition | S1: update target `TaskExecutionState` | C1 |
| Executor publishes shuffle or output partition | Client | append event | S1: create target `PartitionArtifact` | C1 |
| System commits task result metadata | System | state transition | S1: update target `TaskResult` | C1 |
| System marks stage complete and unblocks downstream stage | System | state transition | S1: update target `StageState` | C1 |
| System retries timed-out or failed task | System | async process | S1: hidden write target `TaskExecutionState` | C1 |
| Client or scheduler reads stage/task status | Client | read source | S1: read source target `StageState` | R1 |
| System routes job/stage shard to current owner | System | read source | S1: read source target `PartitionMap` | C1 |
| System reassigns shard ownership after node failure | System | state transition | S1: update target `PartitionOwnership` | C1 |
| Client reads cluster metrics / progress dashboard | Client | read projection | S1: read projection target `JobStatusView` | R2 |
Notes on normalization #
Important choices:
- job submission is `append event`: job intent is immutable fact recording
- runnable task creation is `append event`: task requests are materialized work items
- executor claim is `state transition`: task moves from ready to claimed/running
- publishing stage output is `append event`: partition artifacts are immutable outputs
- result commit and stage completion are `state transition`: current execution and stage lifecycle change
- retries and ownership are explicit because this is distributed infra
This system is a hybrid of:
- claimable execution queue
- DAG/stage orchestration
- immutable partition artifact publication
Step 2 - Critical Path Selection #
| Requirement | Priority class | Why |
|---|---|---|
| Submit job / materialize tasks | C1 | job intent and runnable work must not be lost |
| Claim task partition | C1 | duplicate uncontrolled execution wastes resources and breaks scheduler correctness |
| Publish partition output | C1 | downstream correctness depends on stable intermediate outputs |
| Commit task result | C1 | stage completion and downstream scheduling depend on correct result state |
| Mark stage complete / unblock next stage | C1 | DAG correctness depends on dependency satisfaction |
| Retry timed-out or failed task | C1 | worker crash recovery and eventual completion depend on safe requeue |
| Read stage/task status | R1 | core serving path for scheduler and client progress |
| Route to shard owner | C1 | wrong routing can split claims and stage/task state |
| Reassign shard ownership | C1 | failover must preserve task/stage lifecycle correctness |
| Metrics/dashboard | R2 | operational only |
Baseline critical paths #
Main C1 paths:
- `P1` submit job and create runnable tasks
- `P2` claim task
- `P3` publish partition output
- `P4` commit task result
- `P5` complete stage and unblock downstream stage
- `P6` retry timed-out or failed task
- `P7` route to shard owner
- `P8` reassign shard ownership

Main R1 path:
- `P9` read stage/task state
This design is driven by:
- durable job and task intent
- one active executor claim per task attempt
- immutable intermediate and final outputs
- guarded stage-completion transitions
Step 3 - Primary State Extraction #
For a batch framework, the minimal primary state is the job request, stage/task definitions, task lifecycle, immutable partition outputs, stage lifecycle, and routing/ownership state.
| Candidate object label | Candidate source | Candidate needed for C1/R1? | Candidate decomposition action | Class | Primary? | Owner | Evolution | Scope kind | Scope value |
|---|---|---|---|---|---|---|---|---|---|
| JobRequest | direct noun | Yes | keep as candidate | event | Yes | service | append-only | instance | job_id |
| StageState | lifecycle object | Yes | keep as candidate | process | Yes | service | state machine | instance | stage_id |
| TaskRequest | direct noun | Yes | keep as candidate | event | Yes | service | append-only | instance | task_id |
| TaskExecutionState | lifecycle object | Yes | keep as candidate | process | Yes | service | state machine | instance | task_id |
| PartitionArtifact | direct noun | Yes | keep as candidate | event | Yes | service | append-only | instance | artifact_id or partition_key |
| TaskResult | lifecycle object | Yes | keep as candidate | entity | Yes | service | overwrite | instance | task_id |
| PartitionOwnership | hidden write target | Yes | keep as candidate | process | Yes | service | state machine | instance | shard_id |
| PartitionMap | hidden write target | Yes | keep as candidate | entity | Yes | service | overwrite | collection | job/stage shards |
| JobStatusView | derived read model | No | reject as UI artifact | projection | No | derived | overwrite | collection | job_id |
Important modeling choices #
JobRequest #
Primary because:
- job submission is immutable request recording
- captures DAG/stage graph, code reference, inputs, and runtime config
StageState #
Primary because:
- stage lifecycle drives dependency ordering
- captures states like `BLOCKED`, `READY`, `RUNNING`, `COMPLETED`, `FAILED`
TaskRequest #
Primary because:
- scheduler materializes partition-level work as distinct tasks
TaskExecutionState #
Primary because:
- task lifecycle captures `READY`, `CLAIMED`, `RUNNING`, `SUCCEEDED`, `FAILED`, `RETRY_PENDING`
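The task lifecycle can be sketched as an explicit state machine. The note lists the states but not every edge, so the transition table below is an assumption for illustration; in particular, a simpler variant could requeue an expired `CLAIMED` task straight to `READY` and skip `RETRY_PENDING`.

```python
from enum import Enum


class TaskState(Enum):
    READY = "READY"
    CLAIMED = "CLAIMED"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    RETRY_PENDING = "RETRY_PENDING"


# Assumed edges: the states come from the note, the edge set is illustrative.
ALLOWED = {
    TaskState.READY: {TaskState.CLAIMED},
    TaskState.CLAIMED: {TaskState.RUNNING, TaskState.SUCCEEDED,
                        TaskState.FAILED, TaskState.RETRY_PENDING},
    TaskState.RUNNING: {TaskState.SUCCEEDED, TaskState.FAILED,
                        TaskState.RETRY_PENDING},
    TaskState.FAILED: {TaskState.RETRY_PENDING},
    TaskState.RETRY_PENDING: {TaskState.READY},
    TaskState.SUCCEEDED: set(),  # terminal: no outgoing edges
}


def can_transition(current: TaskState, target: TaskState) -> bool:
    """Return True if the assumed lifecycle permits current -> target."""
    return target in ALLOWED[current]
```

Keeping the edge set in one table makes every guarded transition a lookup against current state rather than scattered conditionals.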
PartitionArtifact #
Primary because:
- shuffle partitions and final outputs are immutable materialized data products for downstream consumption
TaskResult #
Primary because:
- links task execution to produced artifacts and completion metadata
Minimal strict primary set #
The strongest minimal set is:
- `JobRequest`
- `StageState`
- `TaskRequest`
- `TaskExecutionState`
- `PartitionArtifact`
- `TaskResult`
- `PartitionOwnership`
- `PartitionMap`
Step 4 - Hard Invariants #
For a MapReduce/Spark-class framework, the hard invariants are about durable job/task existence, one active executor claim per task attempt, immutable partition outputs, and correct stage completion.
| Path | Tier | Type | Invariant statement |
|---|---|---|---|
| P1 submit job / create runnable tasks | HARD | uniqueness | Key job request_id maps to at most one logical outcome recorded job request within job scope. |
| P2 claim task | HARD | eligibility | Action claim_task is valid only if TaskExecutionState(task_id) is READY at decision time. |
| P2 claim task | HARD | uniqueness | Key task_id maps to at most one logical outcome active executor claim holder within claim-timeout scope. |
| P3 publish partition output | HARD | uniqueness | Key artifact_id or (stage_id, partition_id, attempt_id) maps to at most one logical outcome immutable published partition output within artifact scope. |
| P4 commit task result | HARD | eligibility | Action commit_task_result is valid only if TaskExecutionState(task_id) is in a current successful terminalizing state and claim token matches current executor at decision time. |
| P4 commit task result | HARD | accounting | TaskResult(task_id) references only existing partition artifacts and metadata committed for that task execution. |
| P5 complete stage / unblock downstream stage | HARD | eligibility | Action complete_stage is valid only if all required task results for stage_id are committed and current stage state allows completion at decision time. |
| P6 retry timed-out or failed task | HARD | eligibility | Action requeue_task is valid only if TaskExecutionState(task_id) is in a retryable or expired claimed state and claim token/epoch still matches at decision time. |
| P7 route to shard owner | HARD | uniqueness | Key shard_id maps to at most one logical outcome current authoritative owner within shard_id. |
| P8 reassign shard ownership | HARD | eligibility | Action reassign_shard is valid only if current owner is failed or relinquished and candidate owner is eligible and sufficiently current on shard_id at decision time. |
| P9 read stage/task state | HARD | freshness | Read path read_stage_or_task_state reflects authoritative committed state within configured consistency bound. |
What matters most #
1. One active executor claim per task #
This is the execution-control invariant.
2. Partition outputs are immutable #
Downstream consumers need stable intermediate outputs once published.
3. Stage completion is guarded by all required task results #
A stage cannot complete early or unblock downstream work before dependencies are satisfied.
4. Duplicate task execution can still happen #
Under retries or speculative execution, multiple attempts may run, but only one committed result should become authoritative for the stage plan.
Step 5 - Execution Context #
For the strict baseline batch framework:
| Field | Value | Why |
|---|---|---|
| Topology | single service distributed | one logical batch-processing system spread across schedulers and executors |
| Write coordination scope | per object scope | correctness is per task_id, stage_id, artifact identity, and shard ownership scope |
| Read consistency target | strong only | claim/commit/stage-completion paths must use authoritative state |
| Holder model | client | executors temporarily hold claimed tasks |
| Compensation acceptable? | No | wrong stage completion or result linkage cannot be repaired safely afterward |
Derived implications #
- `holder_may_crash = true`: executors can crash while holding tasks
- `cross_service_write = false`: baseline keeps job/task state, artifact metadata, and ownership in one logical service
- `bounded_staleness_allowed = false`: claim/result/stage-completion paths need authoritative current state
- `cross_service_atomicity_required = false`: no multi-service transaction across unrelated services in baseline
- `exclusive_claim_required = true`: one executor should hold a task attempt at a time
- `guarded_by_current_state = true`: claim, commit, retry, and stage completion depend on current state
What this implies #
This pushes us toward:
- one authoritative owner per scheduling shard
- explicit task execution and stage state machines
- claim timeout modeled as lease-like executor ownership
- immutable shuffle/output artifacts with guarded result publication
Step 6 - Deterministic Mechanism Selection #
| Path | Write shape | Base mechanism | Required companions |
|---|---|---|---|
| P1 submit job / create runnable tasks | append-only event | append log | request id dedup |
| P2 claim task | exclusive claim | lease | claim token, claim timeout |
| P3 publish partition output | append-only event | immutable artifact publication | artifact identity verification |
| P4 commit task result | guarded state transition | CAS on (state, version) or leader-applied guarded transition | claim token, artifact references |
| P5 complete stage / unblock downstream stage | guarded state transition | CAS on (state, version) | task completion accounting |
| P6 retry timed-out or failed task | guarded state transition | leader-applied guarded transition | claim token, timeout scan, retry policy |
| P7 route to shard owner | exclusive claim | lease | fencing token, heartbeat |
| P8 reassign shard ownership | guarded state transition | CAS on (state, version) | fencing token, shard catch-up check |
Why these fit #
Job submission and task materialization #
These are immutable workload facts, so append-only fits.
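A minimal sketch of append-only submission with request-id dedup, assuming an in-memory log. The `JobLog` class, its fields, and the `job-N` id scheme are hypothetical names chosen for illustration; a real shard would persist and replicate the log.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class JobLog:
    """Append-only job log (illustrative; not an API from the note)."""
    entries: list = field(default_factory=list)        # (job_id, request) in append order
    by_request_id: dict = field(default_factory=dict)  # request_id -> job_id

    def submit(self, job_request: dict, request_id: Optional[str] = None) -> str:
        # A retried submission with the same request_id returns the original job_id.
        if request_id is not None and request_id in self.by_request_id:
            return self.by_request_id[request_id]
        job_id = f"job-{len(self.entries) + 1}"
        # Copy the request so later caller-side mutation cannot alter the record.
        self.entries.append((job_id, dict(job_request)))
        if request_id is not None:
            self.by_request_id[request_id] = job_id
        return job_id
```

Submissions without a `request_id` are never deduplicated in this sketch, which matches the note's point that dedup matters only for the same logical request.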
Task claim #
Executor ownership is temporary and exclusive, so lease semantics fit.
Partition output publication #
Shuffle partitions and final outputs are immutable publications, so append-only artifact publication fits.
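One way to sketch immutable publication is a put that is idempotent for identical content and rejects conflicting content under the same identity. The `ArtifactStore` class and the use of a SHA-256 digest for identity verification are assumptions for illustration, not something the note prescribes.

```python
import hashlib


class ArtifactStore:
    """In-memory stand-in for an immutable artifact store (hypothetical API)."""

    def __init__(self):
        self._blobs = {}  # artifact_id -> (sha256 hex digest, bytes)

    def put(self, artifact_id: str, data: bytes) -> str:
        digest = hashlib.sha256(data).hexdigest()
        existing = self._blobs.get(artifact_id)
        if existing is None:
            self._blobs[artifact_id] = (digest, bytes(data))
            return "created"
        if existing[0] == digest:
            return "already_exists"  # idempotent retry of the same publication
        # Same identity, different bytes: overwriting would break immutability.
        raise ValueError(f"artifact {artifact_id} already published with different content")

    def exists(self, artifact_id: str) -> bool:
        return artifact_id in self._blobs
```

If the artifact id includes the attempt (for example `stage/partition/attempt`), two attempts of one task never collide, and the result commit decides which attempt becomes authoritative.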
Task result commit #
Committing a task result is not a blind write. It must verify:
- current task state
- current claim token
- referenced artifacts exist
So it is a guarded transition.
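The three checks above can be sketched as a single guarded function. `TaskRecord`, the `published` set standing in for an artifact-store lookup, and the idempotent-retry branch are assumptions made for this example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskRecord:
    state: str = "READY"
    claim_token: Optional[str] = None
    result: Optional[dict] = None


def commit_task_result(task: TaskRecord, claim_token: str,
                       artifact_ids: list, published: set) -> bool:
    """Apply CLAIMED -> SUCCEEDED only if every guard holds."""
    result = {"artifacts": list(artifact_ids)}
    # Assumed convenience: a retry of an already-applied commit reports success.
    if (task.state == "SUCCEEDED" and task.claim_token == claim_token
            and task.result == result):
        return True
    if task.state != "CLAIMED":
        return False  # guard 1: current task state
    if task.claim_token != claim_token:
        return False  # guard 2: current claim token
    if not all(a in published for a in artifact_ids):
        return False  # guard 3: referenced artifacts exist
    task.state = "SUCCEEDED"
    task.result = result
    return True
```

In a real shard leader the checks and the write would need to be applied atomically, for example inside the leader's single-writer apply loop or as one compare-and-set.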
Stage completion #
Stage completion depends on the current completion count and dependency status, so it is a guarded transition.
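A minimal sketch of stage completion as a compare-and-set on `(state, version)` with completion accounting. The `Stage` type and function names are illustrative assumptions; a set of committed task ids is used so that duplicate result notifications do not inflate the count.

```python
from dataclasses import dataclass, field


@dataclass
class Stage:
    required_tasks: frozenset
    state: str = "RUNNING"
    version: int = 0
    committed: set = field(default_factory=set)


def record_result(stage: Stage, task_id: str) -> None:
    """Account for a committed task result; duplicates are harmless."""
    if task_id in stage.required_tasks:
        stage.committed.add(task_id)


def try_complete(stage: Stage, expected_version: int) -> bool:
    """CAS-style transition: RUNNING -> COMPLETED only for the expected version."""
    if stage.version != expected_version or stage.state != "RUNNING":
        return False
    if stage.committed != set(stage.required_tasks):
        return False  # some required task result is still missing
    stage.state = "COMPLETED"
    stage.version += 1  # any concurrent completer holding the old version now loses
    return True
```

Because the version is bumped on success, a second `try_complete` with the old version fails, which keeps downstream task materialization from being triggered twice by this transition.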
Canonical substrate implied #
The baseline now points to:
- sharded scheduler / task-queue service
- one owner per shard
- immutable partition artifact store
- per-task execution lifecycle state
- guarded task-result commit and guarded stage-completion transitions
Step 7 - Read Model / Source of Truth #
For a MapReduce/Spark-class framework, truth is mostly direct source state. Dashboards are derived.
| Concept | Truth | Read path | Rebuild path |
|---|---|---|---|
| C1 job submission | JobRequest | read source directly | authoritative job store |
| C2 stage lifecycle | StageState | read source directly | authoritative stage-state store |
| C3 runnable task record | TaskRequest | read source directly | authoritative task store |
| C4 current task lifecycle | TaskExecutionState | read source directly | authoritative execution-state store |
| C5 immutable partition output | PartitionArtifact | read source directly | authoritative artifact store |
| C6 task result metadata | TaskResult | read source directly | authoritative result store |
| C7 shard ownership | PartitionOwnership | read source directly | authoritative ownership store |
| C8 shard routing map | PartitionMap | read source directly | authoritative routing metadata |
| C9 job/stage metrics | derived from task and result state | materialized view | recompute from authoritative state |
Important point #
For the core semantics:
- scheduler decisions read authoritative `StageState`, `TaskExecutionState`, and `TaskResult`
- downstream tasks read authoritative `PartitionArtifact` references
- dashboards are projections
Step 8 - Failure Handling #
| Path | Retry | Competing writers | Crash after commit | Publish failure | Stale holder |
|---|---|---|---|---|---|
| P1 submit job / tasks | retry with request_id to avoid duplicate submission | competing jobs coexist; dedup matters only for same logical request | committed submission survives owner crash if replicated past commit point | client may retry safely with idempotency key | n/a |
| P2 claim task | retry claim safely; may return another task or later same task | only one active claim should win for a task at a time | if claim committed and executor crashes, task stays claimed until timeout or failover | n/a | stale executor fenced by claim token/epoch |
| P3 publish partition output | retry artifact publish with same identity | duplicate publication of same attempt collapses or is rejected by artifact identity | committed artifact survives storage node crash if durable | partial publish rejected until artifact is complete | n/a |
| P4 commit task result | retry with same claim token and result metadata | stale/wrong claim token loses guarded transition | committed result survives crash if replicated past commit point | if result publish fails after artifact upload, commit can be retried | stale holder cannot publish result after timeout and reassignment |
| P5 complete stage | retry safe with current stage version | only one completion transition should win current stage state | committed stage completion survives crash if replicated | downstream task materialization can be retried from stage state | n/a |
| P6 retry timed-out or failed task | timeout/retry scan retry safe | only one requeue transition should win for current expired/retryable state | scanner crash delays recovery; next scan retries | n/a | old claim token becomes invalid after requeue |
| P7 route to shard owner | retry after refreshing shard map | only one valid owner should exist | if owner changed, refreshed map points to new owner | n/a | stale owner rejected by fencing token |
| P8 reassign shard ownership | retry failover transition safely | only one reassignment wins current ownership state | promoted owner crash triggers later reassignment | n/a | old owner fenced and must not continue serving |
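The stale-holder column for P7 and P8 relies on a fencing token. One common way to sketch it, assumed here rather than taken from the note, is a monotonically increasing ownership epoch that every shard write must carry; `ShardStore` and its methods are hypothetical names.

```python
class ShardStore:
    """Shard-local state that rejects writes from stale owners (illustrative)."""

    def __init__(self):
        self.owner = None
        self.epoch = 0    # fencing token; increases on every reassignment
        self.writes = []

    def reassign(self, new_owner: str, expected_epoch: int) -> int:
        # CAS on the ownership epoch: only one reassignment wins per epoch.
        if expected_epoch != self.epoch:
            raise RuntimeError("ownership changed concurrently")
        self.owner = new_owner
        self.epoch += 1
        return self.epoch

    def write(self, owner: str, epoch: int, op: str) -> bool:
        # A write carrying an old epoch is fenced, even if the sender
        # still believes it is the leader.
        if owner != self.owner or epoch != self.epoch:
            return False
        self.writes.append(op)
        return True
```

The check lives in the store rather than in the old owner, since a partitioned or paused owner cannot be trusted to notice that it has been replaced.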
What matters most #
1. Claim-token fencing #
This is the scheduler/executor safety mechanism.
Bad case:
- executor A claims task
- task times out
- executor B reclaims task
- A later finishes and tries to publish result
Without fencing:
- A could incorrectly publish a stale result
So task-result commit must be tied to the current claim token.
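The bad case above can be replayed against a small lease model. This is a sketch under stated assumptions: `TaskLease` is a hypothetical single-task record, tokens are a simple counter, and time is passed in explicitly so the scenario is deterministic.

```python
import itertools


class TaskLease:
    """Single-task lease with a claim token (illustrative)."""

    def __init__(self):
        self.state = "READY"
        self.holder = None
        self.token = None
        self.expiry = None
        self._tokens = itertools.count(1)  # each claim gets a fresh token

    def claim(self, executor: str, now: float, timeout: float):
        if self.state != "READY":
            return None
        self.state, self.holder = "CLAIMED", executor
        self.token, self.expiry = next(self._tokens), now + timeout
        return self.token

    def requeue_if_expired(self, now: float) -> bool:
        if self.state == "CLAIMED" and now >= self.expiry:
            # Clearing the token invalidates the previous holder's claim.
            self.state, self.holder, self.token, self.expiry = "READY", None, None, None
            return True
        return False

    def commit(self, token: int) -> bool:
        if self.state == "CLAIMED" and token == self.token:
            self.state = "SUCCEEDED"
            return True
        return False


lease = TaskLease()
tok_a = lease.claim("A", now=0, timeout=10)   # executor A claims the task
lease.requeue_if_expired(now=11)              # the claim times out
tok_b = lease.claim("B", now=12, timeout=10)  # executor B reclaims it
a_committed = lease.commit(tok_a)             # A finishes late: rejected
b_committed = lease.commit(tok_b)             # B holds the current token: accepted
```

Executor A's work is wasted but harmless: its late commit fails the token check, so only B's result becomes authoritative.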
2. Artifact first, result commit second #
Intermediate/final output should be durably stored before committing TaskResult.
3. Stage completion must be monotonic #
Once a stage is marked complete for the authoritative attempt set, downstream unblocking should move forward only.
4. Duplicate task execution is tolerated #
Retries or speculative execution can run multiple attempts, but only one authoritative result should be committed for scheduling purposes.
Step 9 - Scale Adjustments #
| Hotspot | Type | First response |
|---|---|---|
| hot scheduler shard with many task claims | contention hotspot | increase shard count and isolate large jobs or hot stages |
| shuffle / partition output bandwidth | write throughput hotspot | separate artifact service and add locality-aware storage/caching |
| stage-completion fan-in | contention hotspot | hierarchical aggregation of task completions before final stage transition |
| retry storms after executor fleet issue | contention hotspot | exponential backoff, jitter, and executor health draining |
| timeout scans | write throughput hotspot | bucket expiries and scan incrementally |
| progress dashboard queries | read hotspot | keep them as materialized projections only |
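For the timeout-scan row, "bucket expiries and scan incrementally" could look like the sketch below. The bucket width, the `ExpiryBuckets` name, and the choice to release a bucket only after its whole time slice has passed are assumptions for illustration.

```python
from collections import defaultdict


class ExpiryBuckets:
    """Claim-expiry index bucketed by time slice (width is an assumed tuning knob)."""

    def __init__(self, width: float = 5.0):
        self.width = width
        self.buckets = defaultdict(set)  # bucket number -> task_ids expiring in that slice

    def add(self, task_id: str, expiry: float) -> None:
        self.buckets[int(expiry // self.width)].add(task_id)

    def pop_expired(self, now: float) -> list:
        """Remove and return tasks from buckets whose slice ended at or before `now`."""
        current = int(now // self.width)  # the slice containing `now` is not finished yet
        due = sorted(b for b in self.buckets if b < current)
        expired = []
        for b in due:
            expired.extend(sorted(self.buckets.pop(b)))
        return expired
```

Each scan touches only bucket keys and the due buckets, not every claimed task, at the cost of detecting an expiry up to one bucket width late. The returned ids are candidates only: the requeue transition still has to re-check that the task is `CLAIMED` under the same claim token.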
What scales well #
This system scales by:
- sharding scheduling and task coordination
- giving each shard one authoritative owner
- storing immutable shuffle/output artifacts in a separate scalable layer
- keeping stage-completion logic incremental
What fails first #
Usually:
- scheduler hot spots
- shuffle bandwidth bottlenecks
- stale-result fencing mistakes
- large stage fan-in causing completion bottlenecks
Canonical design conclusion #
The mechanical outcome is:
- primary state:
  - `JobRequest`
  - `StageState`
  - `TaskRequest`
  - `TaskExecutionState`
  - `PartitionArtifact`
  - `TaskResult`
  - `PartitionOwnership`
  - `PartitionMap`
- critical invariants:
  - durable job/task submission
  - one active executor claim per task attempt
  - immutable partition outputs
  - task result valid only for current claim token and existing artifacts
  - stage completion valid only when all required task results are committed
  - timed-out or retryable tasks become runnable again safely
- mechanisms:
  - append log
  - lease
  - immutable artifact publication
  - guarded result commit / stage-completion / retry transitions
  - fenced shard ownership
- reads:
  - direct authoritative reads for stage/task/artifact state
  - projections only for dashboards and progress summaries
Polished interview answer #
I’d design the batch framework as a sharded scheduler plus executor fleet. Job submission records immutable job and task requests. Executors reserve partitioned tasks through a lease-like claim with a claim token and expiry, publish shuffle or final output partitions into immutable artifact storage, and then commit task results through a guarded transition that succeeds only for the current claim token and only if referenced outputs already exist. Stage completion is another guarded transition driven by committed task results, and once a stage completes the scheduler materializes downstream tasks. If an executor crashes or times out, the task is safely requeued for at-least-once execution, while status views are derived from authoritative stage and task state.
Concrete Substrate #
I’ll choose a sharded scheduler with executor fleet plus separate immutable artifact storage as the concrete baseline, because it matches the mechanics we derived:
- append-only job/task submission
- lease-like task claim
- immutable partition artifact publication
- guarded task-result commit
- guarded stage-completion
- one owner per shard
Concrete tech family:
- scheduler service in `Go`, `Java`, or `Scala`
- local durable metadata storage:
  - `RocksDB` or replicated metadata DB
- shard replication:
  - `Raft` or leader-follower replication with commit index
- artifact storage:
  - object store, HDFS-class store, or distributed blob store
- metadata/control:
  - `etcd` or internal metadata quorum
Each shard leader stores:
- task records
- current `TaskExecutionState`
- current `StageState`
- ready-task queue
- claim-expiry index
- stage-completion accounting metadata
Artifact storage stores:
- immutable shuffle partitions
- immutable final outputs
- manifests and metadata by partition identity
Operation Layer #
1. Submit job #
API
SubmitJob(job_request, request_id?)
Initiator
- client
Entry point
- gateway or scheduler frontend
Authoritative decider
- scheduler shard owner(s)
Precondition
- request id optional for dedup
Transition
- append `JobRequest`
- create initial `StageState`
- materialize runnable `TaskRequest` records for source-ready stage tasks
Response
{job_id}
2. Claim task #
API
ReserveTask(worker_capabilities, max_tasks, lease_timeout)
Initiator
- executor/client
Entry point
- scheduler frontend or shard leader
Authoritative decider
- shard leader owning the task
Precondition
- task chosen must currently be `READY`
- executor capabilities satisfy task requirements
Transition
- selected task: `READY -> CLAIMED(executor_id, claim_token, expiry)`
- remove from ready queue
- insert into claim-expiry index
Response
{tasks: [{task_id, stage_id, partition_id, input_refs, claim_token, lease_expiry}]}
3. Publish partition output #
API
PutPartitionArtifact(artifact_id, bytes, metadata)
Initiator
- executor/client
Entry point
- artifact store
Authoritative decider
- artifact storage service
Precondition
- artifact identity and completeness validation succeed
Transition
- store immutable `PartitionArtifact`
Response
- success / already exists
4. Commit task result #
API
CommitTaskResult(task_id, claim_token, result_metadata)
Initiator
- executor/client
Entry point
- scheduler frontend or shard leader
Authoritative decider
- shard leader owning the task
Precondition
- current `TaskExecutionState` is `CLAIMED`
- `claim_token` matches current holder
- referenced artifacts exist in artifact store
Transition
- `CLAIMED -> SUCCEEDED`
- overwrite `TaskResult(task_id)`
- increment stage-completion accounting
Response
- success
5. Complete stage / unblock downstream stage #
API
- internal scheduler transition
Initiator
- system
Entry point
- shard leader
Authoritative decider
- shard leader
Precondition
- all required task results for current stage are committed
- current `StageState` allows completion
Transition
- `StageState: RUNNING -> COMPLETED`
- materialize downstream `TaskRequest`s for newly unblocked stages
6. Retry timed-out or failed task #
API
- internal background process
Initiator
- system
Entry point
- shard leader
Authoritative decider
- shard leader
Precondition
- task is still `CLAIMED` and expired
- or task is failed and retryable by policy
Transition
- move task back to `READY`
- clear current claim token
- reinsert into ready queue
Entry Point vs Decider vs Responder #
| Path | Entry point | Authoritative decider | Physical responder | Logical responder |
|---|---|---|---|---|
| SubmitJob | gateway / frontend | scheduler shard leader | leader or front node | batch framework |
| ReserveTask | frontend / shard leader | shard leader | leader or front node | batch framework |
| PutPartitionArtifact | artifact endpoint | artifact storage service | artifact node | batch framework |
| CommitTaskResult | frontend / shard leader | shard leader | leader or front node | batch framework |
| stage completion | shard leader | shard leader | internal | batch framework |
| retry / requeue | shard leader | shard leader | internal | batch framework |
| shard failover | follower / coordination layer | shard quorum / lease store | new leader / control plane | batch framework |
Concrete HLD #
Main components:
- job submission frontend
- receives job requests
- scheduler shard leaders
- authoritative owners for stage/task lifecycle
- manage ready, claimed, and retry indexes
- executor fleet
- pulls runnable tasks and executes partitions
- artifact store
- stores immutable shuffle partitions and final outputs
- metadata/control service
- tracks shard ownership and routing
- background timeout / retry workers
- usually run on shard leaders
Short Interview Version #
I’d build the batch framework as a sharded scheduler plus executor fleet with a separate immutable artifact store. Job submission records immutable job and task requests. Executors reserve partitioned tasks through a lease-like claim with a claim token and expiry, run the computation, publish shuffle or final output partitions to immutable artifact storage, and then commit task results through a guarded transition that succeeds only for the current claim token and only if referenced outputs already exist. Stage completion is another guarded transition driven by committed task results, and once a stage completes the scheduler materializes downstream tasks. If an executor crashes or times out, the task is safely requeued for at-least-once execution, while progress views are derived from authoritative stage and task state.