Batch Processing Framework (MapReduce / Spark-class) #

This note models a MapReduce/Spark-class batch processing framework where clients submit jobs as DAGs or stages, executors run partitioned tasks, shuffle outputs are materialized for downstream stages, and the scheduler coordinates retries and stage completion.


Step 1 - Normalize #

Assume the baseline prompt is:

  • design a batch processing framework
  • clients submit jobs composed of stages or DAG nodes
  • worker executors run partitioned tasks
  • shuffle/intermediate outputs feed downstream stages
  • failed tasks can retry
  • system scales across many workers

Normalize into state-affecting paths.

| Requirement | Actor | Operation | State touched | Priority |
| --- | --- | --- | --- | --- |
| Client submits batch job / DAG | Client | append event | S1, create target: JobRequest | C1 |
| Scheduler materializes runnable stage/task | System | append event | S1, create target: TaskRequest | C1 |
| Executor claims task partition for execution | Client | state transition | S1, update target: TaskExecutionState | C1 |
| Executor publishes shuffle or output partition | Client | append event | S1, create target: PartitionArtifact | C1 |
| System commits task result metadata | System | state transition | S1, update target: TaskResult | C1 |
| System marks stage complete and unblocks downstream stage | System | state transition | S1, update target: StageState | C1 |
| System retries timed-out or failed task | System | async process | S1, hidden write target: TaskExecutionState | C1 |
| Client or scheduler reads stage/task status | Client | read source | S1, read source target: StageState | R1 |
| System routes job/stage shard to current owner | System | read source | S1, read source target: PartitionMap | C1 |
| System reassigns shard ownership after node failure | System | state transition | S1, update target: PartitionOwnership | C1 |
| Client reads cluster metrics / progress dashboard | Client | read projection | S1, read projection target: JobStatusView | R2 |

Notes on normalization #

Important choices:

  • job submission is append event
    • job intent is immutable fact recording
  • runnable task creation is append event
    • task requests are materialized work items
  • executor claim is state transition
    • task moves from ready to claimed/running
  • publishing stage output is append event
    • partition artifacts are immutable outputs
  • result commit and stage completion are state transition
    • current execution and stage lifecycle change
  • retries and ownership are explicit because this is distributed infra

This system is a hybrid of:

  • claimable execution queue
  • DAG/stage orchestration
  • immutable partition artifact publication

Step 2 - Critical Path Selection #

| Requirement | Priority class | Why |
| --- | --- | --- |
| Submit job / materialize tasks | C1 | job intent and runnable work must not be lost |
| Claim task partition | C1 | duplicate uncontrolled execution wastes resources and breaks scheduler correctness |
| Publish partition output | C1 | downstream correctness depends on stable intermediate outputs |
| Commit task result | C1 | stage completion and downstream scheduling depend on correct result state |
| Mark stage complete / unblock next stage | C1 | DAG correctness depends on dependency satisfaction |
| Retry timed-out or failed task | C1 | worker crash recovery and eventual completion depend on safe requeue |
| Read stage/task status | R1 | core serving path for scheduler and client progress |
| Route to shard owner | C1 | wrong routing can split claims and stage/task state |
| Reassign shard ownership | C1 | failover must preserve task/stage lifecycle correctness |
| Metrics/dashboard | R2 | operational only |

Baseline critical paths #

Main C1 paths:

  • P1 submit job and create runnable tasks
  • P2 claim task
  • P3 publish partition output
  • P4 commit task result
  • P5 complete stage and unblock downstream stage
  • P6 retry timed-out or failed task
  • P7 route to shard owner
  • P8 reassign shard ownership

Main R1 path:

  • P9 read stage/task state

This design is driven by:

  • durable job and task intent
  • one active executor claim per task attempt
  • immutable intermediate and final outputs
  • guarded stage-completion transitions

Step 3 - Primary State Extraction #

For a batch framework, the minimal primary state is the job request, stage/task definitions, task lifecycle, immutable partition outputs, stage lifecycle, and routing/ownership state.

| Candidate object label | Candidate source | Needed for C1/R1? | Decomposition action | Class | Primary? | Owner | Evolution | Scope kind | Scope value |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| JobRequest | direct noun | Yes | keep as candidate | event | Yes | service | append-only | instance | job_id |
| StageState | lifecycle object | Yes | keep as candidate | process | Yes | service | state machine | instance | stage_id |
| TaskRequest | direct noun | Yes | keep as candidate | event | Yes | service | append-only | instance | task_id |
| TaskExecutionState | lifecycle object | Yes | keep as candidate | process | Yes | service | state machine | instance | task_id |
| PartitionArtifact | direct noun | Yes | keep as candidate | event | Yes | service | append-only | instance | artifact_id or partition_key |
| TaskResult | lifecycle object | Yes | keep as candidate | entity | Yes | service | overwrite | instance | task_id |
| PartitionOwnership | hidden write target | Yes | keep as candidate | process | Yes | service | state machine | instance | shard_id |
| PartitionMap | hidden write target | Yes | keep as candidate | entity | Yes | service | overwrite | collection | job/stage shards |
| JobStatusView | derived read model | No | reject as UI artifact | projection | No | derived | overwrite | collection | job_id |

Important modeling choices #

JobRequest #

Primary because:

  • job submission is immutable request recording
  • captures DAG/stage graph, code reference, inputs, and runtime config

StageState #

Primary because:

  • stage lifecycle drives dependency ordering
  • captures states like BLOCKED, READY, RUNNING, COMPLETED, FAILED

TaskRequest #

Primary because:

  • scheduler materializes partition-level work as distinct tasks

TaskExecutionState #

Primary because:

  • task lifecycle captures READY, CLAIMED, RUNNING, SUCCEEDED, FAILED, RETRY_PENDING
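The task lifecycle can be sketched as an explicit transition table. This is a minimal illustration: the state names come from the note, but the edge set in `ALLOWED` and the `transition` helper are assumptions, since the note lists the states without enumerating every legal edge.

```python
from enum import Enum


class TaskState(Enum):
    READY = "READY"
    CLAIMED = "CLAIMED"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    RETRY_PENDING = "RETRY_PENDING"


# Assumed edges (hypothetical): the note names the states, not the full edge set.
ALLOWED = {
    TaskState.READY: {TaskState.CLAIMED},
    # A claimed task may start, finish, fail, or be requeued after lease expiry.
    TaskState.CLAIMED: {TaskState.RUNNING, TaskState.SUCCEEDED,
                        TaskState.FAILED, TaskState.READY},
    TaskState.RUNNING: {TaskState.SUCCEEDED, TaskState.FAILED, TaskState.READY},
    TaskState.FAILED: {TaskState.RETRY_PENDING},
    TaskState.RETRY_PENDING: {TaskState.READY},
    TaskState.SUCCEEDED: set(),  # terminal
}


def transition(current: TaskState, target: TaskState) -> TaskState:
    """Return target if the edge is allowed, otherwise raise ValueError."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target
```

Keeping the edges in one table makes every guarded path (claim, commit, requeue) check the same source of legality.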

PartitionArtifact #

Primary because:

  • shuffle partitions and final outputs are immutable materialized data products for downstream consumption

TaskResult #

Primary because:

  • links task execution to produced artifacts and completion metadata

Minimal strict primary set #

The strongest minimal set is:

  • JobRequest
  • StageState
  • TaskRequest
  • TaskExecutionState
  • PartitionArtifact
  • TaskResult
  • PartitionOwnership
  • PartitionMap

Step 4 - Hard Invariants #

For a MapReduce/Spark-class framework, the hard invariants are about durable job/task existence, one active executor claim per task attempt, immutable partition outputs, and correct stage completion.

| Path | Tier | Type | Invariant statement |
| --- | --- | --- | --- |
| P1 submit job / create runnable tasks | HARD | uniqueness | Key request_id maps to at most one recorded job request within job scope. |
| P2 claim task | HARD | eligibility | Action claim_task is valid only if TaskExecutionState(task_id) is READY at decision time. |
| P2 claim task | HARD | uniqueness | Key task_id maps to at most one active executor claim holder within claim-timeout scope. |
| P3 publish partition output | HARD | uniqueness | Key artifact_id or (stage_id, partition_id, attempt_id) maps to at most one immutable published partition output within artifact scope. |
| P4 commit task result | HARD | eligibility | Action commit_task_result is valid only if TaskExecutionState(task_id) is in a state that may still transition to SUCCEEDED and the claim token matches the current executor at decision time. |
| P4 commit task result | HARD | accounting | TaskResult(task_id) references only existing partition artifacts and metadata committed for that task execution. |
| P5 complete stage / unblock downstream stage | HARD | eligibility | Action complete_stage is valid only if all required task results for stage_id are committed and the current stage state allows completion at decision time. |
| P6 retry timed-out or failed task | HARD | eligibility | Action requeue_task is valid only if TaskExecutionState(task_id) is in a retryable or expired claimed state and the claim token/epoch still matches at decision time. |
| P7 route to shard owner | HARD | uniqueness | Key shard_id maps to at most one current authoritative owner within shard_id scope. |
| P8 reassign shard ownership | HARD | eligibility | Action reassign_shard is valid only if the current owner has failed or relinquished ownership and the candidate owner is eligible and sufficiently current on shard_id at decision time. |
| P9 read stage/task state | HARD | freshness | Read path read_stage_or_task_state reflects authoritative committed state within the configured consistency bound. |

What matters most #

1. One active executor claim per task attempt #

This is the execution-control invariant: at any moment, at most one executor holds a valid claim on a given task attempt.

2. Partition outputs are immutable #

Downstream consumers need stable intermediate outputs once published.

3. Stage completion is guarded by all required task results #

A stage cannot complete early or unblock downstream work before dependencies are satisfied.

4. Duplicate task execution can still happen #

Under retries or speculative execution, multiple attempts may run, but only one committed result should become authoritative for the stage plan.


Step 5 - Execution Context #

For the strict baseline batch framework:

| Field | Value | Why |
| --- | --- | --- |
| Topology | single service distributed | one logical batch-processing system spread across schedulers and executors |
| Write coordination scope | per object scope | correctness is per task_id, stage_id, artifact identity, and shard ownership scope |
| Read consistency target | strong only | claim/commit/stage-completion paths must use authoritative state |
| Holder model | client | executors temporarily hold claimed tasks |
| Compensation acceptable? | No | wrong stage completion or result linkage cannot be repaired safely afterward |

Derived implications #

  • holder_may_crash = true

    • executors can crash while holding tasks
  • cross_service_write = false

    • baseline keeps job/task state, artifact metadata, and ownership in one logical service
  • bounded_staleness_allowed = false

    • claim/result/stage-completion paths need authoritative current state
  • cross_service_atomicity_required = false

    • no multi-service transaction across unrelated services in baseline
  • exclusive_claim_required = true

    • one executor should hold a task attempt at a time
  • guarded_by_current_state = true

    • claim, commit, retry, and stage completion depend on current state

What this implies #

This pushes us toward:

  • one authoritative owner per scheduling shard
  • explicit task execution and stage state machines
  • claim timeout modeled as lease-like executor ownership
  • immutable shuffle/output artifacts with guarded result publication

Step 6 - Deterministic Mechanism Selection #

| Path | Write shape | Base mechanism | Required companions |
| --- | --- | --- | --- |
| P1 submit job / create runnable tasks | append-only event | append log | request id dedup |
| P2 claim task | exclusive claim | lease | claim token, claim timeout |
| P3 publish partition output | append-only event | immutable artifact publication | artifact identity verification |
| P4 commit task result | guarded state transition | CAS on (state, version) or leader-applied guarded transition | claim token, artifact references |
| P5 complete stage / unblock downstream stage | guarded state transition | CAS on (state, version) | task completion accounting |
| P6 retry timed-out or failed task | guarded state transition | leader-applied guarded transition | claim token, timeout scan, retry policy |
| P7 route to shard owner | exclusive claim | lease | fencing token, heartbeat |
| P8 reassign shard ownership | guarded state transition | CAS on (state, version) | fencing token, shard catch-up check |

Why these fit #

Job submission and task materialization #

These are immutable workload facts, so append-only fits.

Task claim #

Executor ownership is temporary and exclusive, so lease semantics fit.

Partition output publication #

Shuffle partitions and final outputs are immutable publications, so append-only artifact publication fits.

Task result commit #

Committing a task result is not a blind write. It must verify:

  • current task state
  • current claim token
  • referenced artifacts exist

So it is a guarded transition.

Stage completion #

Stage completion depends on the current completion count and dependency status, so it is a guarded transition.
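A guarded stage-completion transition in the CAS on (state, version) style might look like the sketch below. `StageRecord` and `try_complete_stage` are hypothetical names, and the in-memory record stands in for whatever replicated store the shard owner actually uses.

```python
from dataclasses import dataclass


@dataclass
class StageRecord:
    state: str            # BLOCKED, READY, RUNNING, COMPLETED, FAILED
    version: int          # bumped on every accepted write to the record
    required_tasks: int
    committed_tasks: int  # completion accounting maintained by result commits


def try_complete_stage(stage: StageRecord, expected_version: int) -> bool:
    """Compare-and-set RUNNING -> COMPLETED; return False if any guard fails."""
    if stage.version != expected_version:
        return False      # another writer changed the stage since it was read
    if stage.state != "RUNNING":
        return False      # current stage state does not allow completion
    if stage.committed_tasks < stage.required_tasks:
        return False      # not every required task result is committed yet
    stage.state = "COMPLETED"
    stage.version += 1
    return True
```

Because the version is bumped on success, a second completion attempt carrying the old version loses, which keeps the transition single-winner.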

Canonical substrate implied #

The baseline now points to:

  • sharded scheduler / task-queue service
  • one owner per shard
  • immutable partition artifact store
  • per-task execution lifecycle state
  • guarded task-result commit and guarded stage-completion transitions

Step 7 - Read Model / Source of Truth #

For a MapReduce/Spark-class framework, truth is mostly direct source state. Dashboards are derived.

| Concept | Truth | Read path | Rebuild path |
| --- | --- | --- | --- |
| C1 job submission | JobRequest | read source directly | authoritative job store |
| C2 stage lifecycle | StageState | read source directly | authoritative stage-state store |
| C3 runnable task record | TaskRequest | read source directly | authoritative task store |
| C4 current task lifecycle | TaskExecutionState | read source directly | authoritative execution-state store |
| C5 immutable partition output | PartitionArtifact | read source directly | authoritative artifact store |
| C6 task result metadata | TaskResult | read source directly | authoritative result store |
| C7 shard ownership | PartitionOwnership | read source directly | authoritative ownership store |
| C8 shard routing map | PartitionMap | read source directly | authoritative routing metadata |
| C9 job/stage metrics | derived from task and result state | materialized view | recompute from authoritative state |

Important point #

For the core semantics:

  • scheduler decisions read authoritative StageState, TaskExecutionState, and TaskResult
  • downstream tasks read authoritative PartitionArtifact references
  • dashboards are projections
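To illustrate why the dashboard is a projection rather than primary state, the sketch below recomputes a status view purely from authoritative task states. The function name `build_job_status_view` and the shape of the returned dictionary are assumptions made for this example.

```python
from collections import Counter


def build_job_status_view(task_states: dict[str, str]) -> dict:
    """Recompute a JobStatusView-style summary from task_id -> state."""
    counts = Counter(task_states.values())
    total = len(task_states)
    done = counts.get("SUCCEEDED", 0)
    return {
        "total_tasks": total,
        "by_state": dict(counts),
        # Fraction of tasks with a committed successful result.
        "progress": done / total if total else 0.0,
    }
```

Since the view holds nothing that cannot be rebuilt, losing or lagging it never affects scheduling correctness.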

Step 8 - Failure Handling #

| Path | Retry | Competing writers | Crash after commit | Publish failure | Stale holder |
| --- | --- | --- | --- | --- | --- |
| P1 submit job / tasks | retry with request_id to avoid duplicate submission | competing jobs coexist; dedup matters only for same logical request | committed submission survives owner crash if replicated past commit point | client may retry safely with idempotency key | n/a |
| P2 claim task | retry claim safely; may return another task or later same task | only one active claim should win for a task at a time | if claim committed and executor crashes, task stays claimed until timeout or failover | n/a | stale executor fenced by claim token/epoch |
| P3 publish partition output | retry artifact publish with same identity | duplicate publication of same attempt collapses or is rejected by artifact identity | committed artifact survives storage node crash if durable | partial publish rejected until artifact is complete | n/a |
| P4 commit task result | retry with same claim token and result metadata | stale/wrong claim token loses guarded transition | committed result survives crash if replicated past commit point | if result publish fails after artifact upload, commit can be retried | stale holder cannot publish result after timeout and reassignment |
| P5 complete stage | retry safe with current stage version | only one completion transition should win current stage state | committed stage completion survives crash if replicated | downstream task materialization can be retried from stage state | n/a |
| P6 retry timed-out or failed task | timeout/retry scan retry safe | only one requeue transition should win for current expired/retryable state | scanner crash delays recovery; next scan retries | n/a | old claim token becomes invalid after requeue |
| P7 route to shard owner | retry after refreshing shard map | only one valid owner should exist | if owner changed, refreshed map points to new owner | n/a | stale owner rejected by fencing token |
| P8 reassign shard ownership | retry failover transition safely | only one reassignment wins current ownership state | promoted owner crash triggers later reassignment | n/a | old owner fenced and must not continue serving |
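The P7 row (retry after refreshing the shard map, stale owner rejected by fencing token) can be sketched as follows. `ControlPlane`, `Router`, and `StaleRoute` are hypothetical names, and the epoch counter plays the role of the fencing token; a real system would keep this in a replicated metadata store.

```python
class StaleRoute(Exception):
    """Raised when a request carries an outdated (owner, epoch) pair."""


class ControlPlane:
    def __init__(self):
        self.owners = {}  # shard_id -> (owner_name, epoch)

    def assign(self, shard_id, owner_name):
        _, epoch = self.owners.get(shard_id, (None, 0))
        self.owners[shard_id] = (owner_name, epoch + 1)  # epoch only grows
        return epoch + 1

    def lookup(self, shard_id):
        return self.owners[shard_id]


class Router:
    def __init__(self, control):
        self.control = control
        self.cache = {}   # possibly stale copy of the shard map

    def route(self, shard_id, send):
        """Call send(owner, epoch); refresh the cached map once on StaleRoute."""
        for _ in range(2):  # one attempt from cache, one after refresh
            if shard_id not in self.cache:
                self.cache[shard_id] = self.control.lookup(shard_id)
            owner, epoch = self.cache[shard_id]
            try:
                return send(owner, epoch)
            except StaleRoute:
                del self.cache[shard_id]
        raise StaleRoute(shard_id)
```

The receiving side is expected to raise `StaleRoute` whenever the epoch it is asked to serve under is not the current one, which is what stops a deposed owner from continuing to serve.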

What matters most #

1. Claim-token fencing #

This is the scheduler/executor safety mechanism.

Bad case:

  • executor A claims task
  • task times out
  • executor B reclaims task
  • A later finishes and tries to publish result

Without fencing:

  • A could incorrectly publish a stale result

So task-result commit must be tied to the current claim token.

2. Artifact first, result commit second #

Intermediate/final output should be durably stored before committing TaskResult.

3. Stage completion must be monotonic #

Once a stage is marked complete for the authoritative attempt set, downstream unblocking should move forward only.

4. Duplicate task execution is tolerated #

Retries or speculative execution can run multiple attempts, but only one authoritative result should be committed for scheduling purposes.


Step 9 - Scale Adjustments #

| Hotspot | Type | First response |
| --- | --- | --- |
| hot scheduler shard with many task claims | contention hotspot | increase shard count and isolate large jobs or hot stages |
| shuffle / partition output bandwidth | write throughput hotspot | separate artifact service and add locality-aware storage/caching |
| stage-completion fan-in | contention hotspot | hierarchical aggregation of task completions before final stage transition |
| retry storms after executor fleet issue | contention hotspot | exponential backoff, jitter, and executor health draining |
| timeout scans | write throughput hotspot | bucket expiries and scan incrementally |
| progress dashboard queries | read hotspot | keep them as materialized projections only |
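For the retry-storm row, exponential backoff with jitter could be sketched like this. The "full jitter" variant shown here (uniform between zero and the capped exponential ceiling) is one common choice, not something the note prescribes, and the `base` and `cap` defaults are illustrative assumptions.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0, rng=random) -> float:
    """Return a full-jitter delay in seconds for a zero-based retry attempt."""
    ceiling = min(cap, base * (2 ** attempt))  # exponential growth, capped
    return rng.uniform(0.0, ceiling)           # jitter spreads retries apart
```

Passing a seeded `random.Random` instance as `rng` makes the delays reproducible in tests, while the default module-level generator is fine in production-style use.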

What scales well #

This system scales by:

  • sharding scheduling and task coordination
  • giving each shard one authoritative owner
  • storing immutable shuffle/output artifacts in a separate scalable layer
  • keeping stage-completion logic incremental

What fails first #

Usually:

  • scheduler hot spots
  • shuffle bandwidth bottlenecks
  • stale-result fencing mistakes
  • large stage fan-in causing completion bottlenecks

Canonical design conclusion #

The mechanical outcome is:

  • primary state:
    • JobRequest
    • StageState
    • TaskRequest
    • TaskExecutionState
    • PartitionArtifact
    • TaskResult
    • PartitionOwnership
    • PartitionMap
  • critical invariants:
    • durable job/task submission
    • one active executor claim per task attempt
    • immutable partition outputs
    • task result valid only for current claim token and existing artifacts
    • stage completion valid only when all required task results are committed
    • timed-out or retryable tasks become runnable again safely
  • mechanisms:
    • append log
    • lease
    • immutable artifact publication
    • guarded result commit / stage-completion / retry transitions
    • fenced shard ownership
  • reads:
    • direct authoritative reads for stage/task/artifact state
    • projections only for dashboards and progress summaries

Polished interview answer #

I’d design the batch framework as a sharded scheduler plus executor fleet. Job submission records immutable job and task requests. Executors reserve partitioned tasks through a lease-like claim with a claim token and expiry, publish shuffle or final output partitions into immutable artifact storage, and then commit task results through a guarded transition that succeeds only for the current claim token and only if referenced outputs already exist. Stage completion is another guarded transition driven by committed task results, and once a stage completes the scheduler materializes downstream tasks. If an executor crashes or times out, the task is safely requeued for at-least-once execution, while status views are derived from authoritative stage and task state.


Concrete Substrate #

I’ll choose a sharded scheduler with executor fleet plus separate immutable artifact storage as the concrete baseline, because it matches the mechanics we derived:

  • append-only job/task submission
  • lease-like task claim
  • immutable partition artifact publication
  • guarded task-result commit
  • guarded stage-completion
  • one owner per shard

Concrete tech family:

  • scheduler service in Go, Java, or Scala
  • local durable metadata storage:
    • RocksDB or replicated metadata DB
  • shard replication:
    • Raft or leader-follower replication with commit index
  • artifact storage:
    • object store, HDFS-class store, or distributed blob store
  • metadata/control:
    • etcd or internal metadata quorum

Each shard leader stores:

  • task records
  • current TaskExecutionState
  • current StageState
  • ready-task queue
  • claim-expiry index
  • stage-completion accounting metadata

Artifact storage stores:

  • immutable shuffle partitions
  • immutable final outputs
  • manifests and metadata by partition identity

Operation Layer #

1. Submit job #

API

  • SubmitJob(job_request, request_id?)

Initiator

  • client

Entry point

  • gateway or scheduler frontend

Authoritative decider

  • scheduler shard owner(s)

Precondition

  • request_id is optional; when present, a repeated request_id returns the original job_id rather than creating a second job

Transition

  • append JobRequest
  • create initial StageState
  • materialize runnable TaskRequest records for source-ready stage tasks

Response

  • {job_id}
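A minimal in-memory sketch of this submission path is shown below. `JobStore`, the `stages` layout with `parents` and `partitions` keys, and the `job-N` id format are all assumptions for illustration; durability and replication are out of scope here.

```python
import itertools


class JobStore:
    def __init__(self):
        self.jobs = {}           # job_id -> job_request (append-only)
        self.by_request_id = {}  # request_id -> job_id, used for dedup
        self.stage_state = {}    # (job_id, stage) -> state
        self.tasks = []          # materialized TaskRequest records
        self._ids = itertools.count(1)

    def submit_job(self, job_request, request_id=None):
        # Idempotent retry: the same request_id maps to the same job.
        if request_id is not None and request_id in self.by_request_id:
            return self.by_request_id[request_id]
        job_id = f"job-{next(self._ids)}"
        self.jobs[job_id] = job_request
        if request_id is not None:
            self.by_request_id[request_id] = job_id
        for stage, spec in job_request["stages"].items():
            ready = not spec["parents"]  # source stages have no parents
            self.stage_state[(job_id, stage)] = "READY" if ready else "BLOCKED"
            if ready:
                for partition in range(spec["partitions"]):
                    self.tasks.append((job_id, stage, partition))
        return job_id
```

Only source-ready stages get tasks at submission time; tasks for blocked stages are materialized later, when their parent stages complete.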

2. Claim task #

API

  • ReserveTask(worker_capabilities, max_tasks, lease_timeout)

Initiator

  • executor/client

Entry point

  • scheduler frontend or shard leader

Authoritative decider

  • shard leader owning the task

Precondition

  • task chosen must currently be READY
  • executor capabilities satisfy task requirements

Transition

  • selected task:
    • READY -> CLAIMED(executor_id, claim_token, expiry)
  • remove from ready queue
  • insert into claim-expiry index

Response

  • {tasks: [{task_id, stage_id, partition_id, input_refs, claim_token, lease_expiry}]}
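The claim transition above can be sketched on a single shard leader as follows. `ShardLeader`, the injected `clock`, the extra `executor_id` argument, and the `requires` capability set are assumptions added for the example; the response is trimmed to the claim-related fields.

```python
import heapq
import uuid
from collections import deque


class ShardLeader:
    def __init__(self, clock):
        self.clock = clock        # callable returning the current time in seconds
        self.tasks = {}           # task_id -> task record
        self.ready = deque()      # ready-task queue
        self.expiry_index = []    # heap of (expiry, task_id, claim_token)

    def add_task(self, task_id, requires=()):
        self.tasks[task_id] = {"state": "READY", "requires": set(requires),
                               "executor_id": None, "claim_token": None, "expiry": None}
        self.ready.append(task_id)

    def reserve_task(self, executor_id, worker_capabilities, max_tasks, lease_timeout):
        capabilities = set(worker_capabilities)
        claimed, skipped = [], []
        while self.ready and len(claimed) < max_tasks:
            task_id = self.ready.popleft()
            task = self.tasks[task_id]
            if not task["requires"] <= capabilities:
                skipped.append(task_id)   # leave it for a capable executor
                continue
            token = uuid.uuid4().hex
            expiry = self.clock() + lease_timeout
            # READY -> CLAIMED(executor_id, claim_token, expiry)
            task.update(state="CLAIMED", executor_id=executor_id,
                        claim_token=token, expiry=expiry)
            heapq.heappush(self.expiry_index, (expiry, task_id, token))
            claimed.append({"task_id": task_id, "claim_token": token,
                            "lease_expiry": expiry})
        self.ready.extendleft(reversed(skipped))  # restore original order
        return {"tasks": claimed}
```

Because only the shard leader mutates the queue and task records, the READY check and the claim write happen as one step, which is what gives at most one active claim per task.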

3. Publish partition output #

API

  • PutPartitionArtifact(artifact_id, bytes, metadata)

Initiator

  • executor/client

Entry point

  • artifact store

Authoritative decider

  • artifact storage service

Precondition

  • artifact identity and completeness validation succeed

Transition

  • store immutable PartitionArtifact

Response

  • success / already exists
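One way to sketch immutable publication is a put-if-absent store that validates completeness before accepting bytes. `ArtifactStore` and the convention that `metadata["sha256"]` carries the expected digest are assumptions; the note only requires that identity and completeness validation succeed.

```python
import hashlib


class ArtifactStore:
    def __init__(self):
        self._blobs = {}  # artifact_id -> (bytes, metadata)

    def put_partition_artifact(self, artifact_id, data, metadata):
        # Completeness check: the upload must match the declared digest.
        digest = hashlib.sha256(data).hexdigest()
        if metadata.get("sha256") != digest:
            raise ValueError("incomplete or corrupted upload")
        existing = self._blobs.get(artifact_id)
        if existing is not None:
            # Immutability: a republish must be byte-identical.
            if existing[0] != data:
                raise ValueError("artifact is immutable; content differs")
            return "already exists"
        self._blobs[artifact_id] = (bytes(data), dict(metadata))
        return "success"

    def exists(self, artifact_id):
        return artifact_id in self._blobs
```

Returning "already exists" for an identical republish makes executor retries safe, while a conflicting republish under the same identity is rejected outright.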

4. Commit task result #

API

  • CommitTaskResult(task_id, claim_token, result_metadata)

Initiator

  • executor/client

Entry point

  • scheduler frontend or shard leader

Authoritative decider

  • shard leader owning the task

Precondition

  • current TaskExecutionState is CLAIMED
  • claim_token matches current holder
  • referenced artifacts exist in artifact store

Transition

  • CLAIMED -> SUCCEEDED
  • overwrite TaskResult(task_id)
  • increment stage-completion accounting

Response

  • success
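The three preconditions and the transition can be sketched as one guarded function. Task records as plain dictionaries, the `artifact_exists` callback, the `artifact_ids` key inside `result_metadata`, and the `CommitRejected` exception are assumptions for this example.

```python
class CommitRejected(Exception):
    """Raised when a guard on the commit transition fails."""


def commit_task_result(task, results, artifact_exists, claim_token, result_metadata):
    """Guarded CLAIMED -> SUCCEEDED transition for one task record (a dict)."""
    task_id = task["task_id"]
    # Idempotent retry: the same holder re-sending an already committed result.
    if task["state"] == "SUCCEEDED" and results.get(task_id, {}).get("claim_token") == claim_token:
        return
    if task["state"] != "CLAIMED":
        raise CommitRejected("task is not CLAIMED")
    if task["claim_token"] != claim_token:
        raise CommitRejected("stale claim token")  # fences a timed-out executor
    missing = [a for a in result_metadata["artifact_ids"] if not artifact_exists(a)]
    if missing:
        raise CommitRejected(f"missing artifacts: {missing}")
    task["state"] = "SUCCEEDED"
    results[task_id] = dict(result_metadata, claim_token=claim_token)
```

In the stale-holder case, where executor A timed out and executor B now holds the claim, A's late commit carries the old token and is rejected, so only B's result becomes authoritative.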

5. Complete stage / unblock downstream stage #

API

  • internal scheduler transition

Initiator

  • system

Entry point

  • shard leader

Authoritative decider

  • shard leader

Precondition

  • all required task results for current stage are committed
  • current StageState allows completion

Transition

  • StageState: RUNNING -> COMPLETED
  • materialize downstream TaskRequests for newly unblocked stages
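Downstream unblocking can be sketched as a walk over the stage graph after the completing transition. The `parents` and `partitions` dictionaries and the `(stage, partition)` task tuples are hypothetical shapes, and the caller is assumed to have already verified that all task results for the stage are committed.

```python
def complete_stage(stage_id, stage_state, parents, partitions):
    """Mark stage_id COMPLETED and return TaskRequests for newly unblocked stages."""
    if stage_state[stage_id] != "RUNNING":
        raise ValueError(f"{stage_id} is {stage_state[stage_id]}, not RUNNING")
    stage_state[stage_id] = "COMPLETED"
    new_tasks = []
    for child, deps in parents.items():
        # A blocked stage becomes runnable only when every parent is complete.
        if stage_state[child] == "BLOCKED" and all(
            stage_state[dep] == "COMPLETED" for dep in deps
        ):
            stage_state[child] = "READY"
            new_tasks += [(child, p) for p in range(partitions[child])]
    return new_tasks
```

With a join stage that depends on two map stages, completing the first map stage returns no tasks, and completing the second returns one task per join partition.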

6. Retry timed-out or failed task #

API

  • internal background process

Initiator

  • system

Entry point

  • shard leader

Authoritative decider

  • shard leader

Precondition

  • task is still CLAIMED and expired
  • or task is failed and retryable by policy

Transition

  • move task back to READY
  • clear current claim token
  • reinsert into ready queue
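The timeout branch of this requeue can be sketched as a scan over a claim-expiry heap. The heap entry layout `(expiry, task_id, claim_token)` and the dictionary task records are assumptions; the failed-and-retryable branch and retry policy are omitted.

```python
import heapq


def requeue_expired(tasks, expiry_index, ready_queue, now):
    """Pop expired claims from the heap and move still-claimed tasks to READY."""
    requeued = []
    while expiry_index and expiry_index[0][0] <= now:
        _, task_id, claim_token = heapq.heappop(expiry_index)
        task = tasks[task_id]
        # Skip stale heap entries: the task was already committed, or it was
        # reclaimed under a newer token after an earlier requeue.
        if task["state"] != "CLAIMED" or task["claim_token"] != claim_token:
            continue
        task["state"] = "READY"
        task["claim_token"] = None   # the old token can no longer commit
        ready_queue.append(task_id)
        requeued.append(task_id)
    return requeued
```

Only entries at the front of the heap are touched, so each scan costs roughly the number of expired claims rather than the number of claimed tasks.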

Entry Point vs Decider vs Responder #

| Path | Entry point | Authoritative decider | Physical responder | Logical responder |
| --- | --- | --- | --- | --- |
| SubmitJob | gateway / frontend | scheduler shard leader | leader or front node | batch framework |
| ReserveTask | frontend / shard leader | shard leader | leader or front node | batch framework |
| PutPartitionArtifact | artifact endpoint | artifact storage service | artifact node | batch framework |
| CommitTaskResult | frontend / shard leader | shard leader | leader or front node | batch framework |
| stage completion | shard leader | shard leader | internal | batch framework |
| retry / requeue | shard leader | shard leader | internal | batch framework |
| shard failover | follower / coordination layer | shard quorum / lease store | new leader / control plane | batch framework |

Concrete HLD #

Main components:

  • job submission frontend
    • receives job requests
  • scheduler shard leaders
    • authoritative owners for stage/task lifecycle
    • manage ready, claimed, and retry indexes
  • executor fleet
    • pulls runnable tasks and executes partitions
  • artifact store
    • stores immutable shuffle partitions and final outputs
  • metadata/control service
    • tracks shard ownership and routing
  • background timeout / retry workers
    • usually run on shard leaders

Short Interview Version #

I’d build the batch framework as a sharded scheduler plus executor fleet with a separate immutable artifact store. Job submission records immutable job and task requests. Executors reserve partitioned tasks through a lease-like claim with a claim token and expiry, run the computation, publish shuffle or final output partitions to immutable artifact storage, and then commit task results through a guarded transition that succeeds only for the current claim token and only if referenced outputs already exist. Stage completion is another guarded transition driven by committed task results, and once a stage completes the scheduler materializes downstream tasks. If an executor crashes or times out, the task is safely requeued for at-least-once execution, while progress views are derived from authoritative stage and task state.