Stream Processing Framework (Flink / Spark Streaming-class) #

This note models a Flink/Spark-Streaming-class stream processing framework where clients submit continuous jobs, operators consume partitioned input streams, maintain checkpointed state, emit output streams, and recover from failures through replay plus checkpoint restoration.


Step 1 - Normalize #

Assume the baseline prompt is:

  • design a stream processing framework
  • clients submit continuous jobs / operator graphs
  • workers consume ordered partitions from input streams
  • operators maintain state
  • outputs are continuously emitted
  • failures recover using checkpoints and replay
  • system scales across many workers

Normalize into state-affecting paths.

| Requirement | Actor | Operation | State touched | Priority |
|---|---|---|---|---|
| Client submits streaming job / operator graph | Client | append event | S1; create target: StreamingJobRequest | C1 |
| Scheduler assigns operator partition to worker | System | state transition | S1; update target: OperatorAssignmentState | C1 |
| Worker consumes input and advances operator state | Client | state transition | S1; update target: OperatorState | C1 |
| Worker emits output records or sink writes | Client | append event | S1; create target: OutputRecord | C1 |
| Worker records input progress / offsets | Client | overwrite state | S1; update target: OperatorOffsetState | C1 |
| System triggers and commits checkpoint | System | state transition | S1; update target: CheckpointState | C1 |
| System restores operator state after failure | System | state transition | S1; update target: OperatorAssignmentState | C1 |
| System routes job/operator shard to current owner | System | read source | S1; read source target: PartitionMap | C1 |
| System reassigns ownership after node failure | System | state transition | S1; update target: PartitionOwnership | C1 |
| Client reads lag / throughput / job status | Client | read projection | S1; read projection target: StreamingJobStatusView | R2 |

Notes on normalization #

Important choices:

  • job submission is an append event
    • the submitted job intent is recorded as an immutable fact
  • assignment is a state transition
    • operator partitions move between workers
  • state advancement is a state transition
    • current keyed/operator state changes over time
  • output emission is an append event
    • outputs/sink writes are immutable downstream facts
  • progress tracking is overwrite state
    • only the current offset/progress matters, so each update replaces the previous value
  • checkpoint and restore are modeled explicitly because recovery is core to stream systems

This system is a hybrid of:

  • partitioned stream consumption
  • mutable operator state
  • checkpointed recovery

Step 2 - Critical Path Selection #

| Requirement | Priority class | Why |
|---|---|---|
| Submit streaming job | C1 | job intent and graph must not be lost |
| Assign operator partition to worker | C1 | split or stale assignment breaks processing correctness |
| Advance operator state | C1 | current processing semantics depend on correct state evolution |
| Emit output / sink writes | C1 | downstream correctness depends on emitted results |
| Record input progress / offsets | C1 | replay and exactly-once/at-least-once behavior depend on correct progress tracking |
| Trigger and commit checkpoint | C1 | recovery correctness depends on stable snapshots |
| Restore state after failure | C1 | failover correctness depends on valid restore point and reassignment |
| Route to current owner | C1 | wrong routing can split assignment and state updates |
| Reassign ownership after node failure | C1 | failover must preserve processing lifecycle correctness |
| Lag / throughput dashboards | R2 | operational only |

Baseline critical paths #

Main C1 paths:

  • P1 submit job
  • P2 assign operator partition
  • P3 advance operator state
  • P4 emit output
  • P5 record progress / offsets
  • P6 checkpoint commit
  • P7 restore after failure
  • P8 route to owner
  • P9 reassign ownership

This design is driven by:

  • one active worker assignment per operator partition
  • ordered partition consumption
  • checkpointed operator state and offset progress
  • guarded recovery after failure

Step 3 - Primary State Extraction #

For a stream framework, the minimal primary state is the submitted job, operator assignment and state, offset/progress state, checkpoint state, output records, and routing/ownership state.

| Candidate object label | Candidate source | Candidate needed for C1/R1? | Candidate decomposition action | Class | Primary? | Owner | Evolution | Scope kind | Scope value |
|---|---|---|---|---|---|---|---|---|---|
| StreamingJobRequest | direct noun | Yes | keep as candidate | event | Yes | service | append-only | instance | job_id |
| OperatorAssignmentState | lifecycle object | Yes | keep as candidate | process | Yes | service | state machine | instance | job_id + operator_partition |
| OperatorState | lifecycle object | Yes | keep as candidate | process | Yes | service | overwrite | instance | state_scope |
| OperatorOffsetState | direct noun | Yes | keep as candidate | entity | Yes | service | overwrite | instance | operator_partition |
| CheckpointState | lifecycle object | Yes | keep as candidate | process | Yes | service | state machine | instance | checkpoint_id |
| OutputRecord | direct noun | Yes | keep as candidate | event | Yes | service | append-only | instance | output_record_id |
| PartitionOwnership | hidden write target | Yes | keep as candidate | process | Yes | service | state machine | instance | shard_id |
| PartitionMap | hidden write target | Yes | keep as candidate | entity | Yes | service | overwrite | collection | job/operator shards |
| StreamingJobStatusView | derived read model | No | reject as UI artifact | projection | No | derived | overwrite | collection | job_id |

Important modeling choices #

StreamingJobRequest #

Primary because:

  • job submission records an immutable request
  • captures operator graph, inputs, sinks, and runtime config

OperatorAssignmentState #

Primary because:

  • one worker should own a given operator partition at a time
  • captures ASSIGNED, RUNNING, FAILED, REBALANCING

OperatorState #

Primary because:

  • keyed/window/operator state is central correctness state
  • it evolves continuously and is later snapshotted

OperatorOffsetState #

Primary because:

  • current consumed offsets/progress determine replay boundary

CheckpointState #

Primary because:

  • checkpoint lifecycle governs consistent recovery points

OutputRecord #

Primary because:

  • emitted outputs are immutable downstream facts

Minimal strict primary set #

The strongest minimal set is:

  • StreamingJobRequest
  • OperatorAssignmentState
  • OperatorState
  • OperatorOffsetState
  • CheckpointState
  • OutputRecord
  • PartitionOwnership
  • PartitionMap
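
As a rough illustration of how the evolution classes from the table could map onto types, here is a minimal sketch. All field names beyond the identifiers used in this note (for example `operator_graph`, `payload`, `worker_id`) are assumptions made for the example. Append-only objects are modeled as frozen dataclasses, while overwrite and state-machine objects stay mutable.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Tuple


class AssignmentStatus(Enum):
    # Lifecycle values named in the OperatorAssignmentState section.
    ASSIGNED = "ASSIGNED"
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    REBALANCING = "REBALANCING"


@dataclass(frozen=True)
class StreamingJobRequest:
    """Append-only event, scoped by job_id (fields are illustrative)."""
    job_id: str
    request_id: str
    operator_graph: Tuple[str, ...]


@dataclass(frozen=True)
class OutputRecord:
    """Append-only event, scoped by output_record_id."""
    output_record_id: str
    payload: str


@dataclass
class OperatorAssignmentState:
    """State machine, scoped by job_id + operator_partition."""
    job_id: str
    operator_partition: int
    worker_id: str
    assignment_token: int
    status: AssignmentStatus = AssignmentStatus.ASSIGNED


@dataclass
class OperatorOffsetState:
    """Overwrite state, scoped by operator_partition."""
    operator_partition: int
    offset: int = 0


@dataclass
class CheckpointState:
    """State machine, scoped by checkpoint_id."""
    checkpoint_id: int
    status: str = "TRIGGERED"
```

Freezing the event types makes accidental mutation fail loudly, which mirrors the append-only evolution rule; the mutable types rely on the guards discussed in the later steps instead.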

Step 4 - Hard Invariants #

For a Flink/Spark-Streaming-class framework, the hard invariants are about one active worker per operator partition, ordered state/progress advancement, consistent checkpoints, and valid recovery.

| Path | Tier | Type | Invariant statement |
|---|---|---|---|
| P1 submit job | HARD | uniqueness | A job request_id maps to at most one recorded streaming job request within job scope. |
| P2 assign operator partition | HARD | uniqueness | The key job_id + operator_partition maps to at most one current active worker assignment within assignment scope. |
| P3 advance operator state | HARD | eligibility | advance_operator_state is valid only if the worker holds the current assignment token for operator_partition and input progress is in valid order at decision time. |
| P4 emit output | HARD | accounting | Output emitted for operator_partition corresponds to records processed under the current committed operator state transition. |
| P5 record input progress | HARD | ordering | Offset/progress updates for operator_partition are ordered by monotonic processed position within input partition scope. |
| P6 commit checkpoint | HARD | eligibility | commit_checkpoint is valid only if all required operator snapshots and corresponding progress markers for the checkpoint barrier are present and the current checkpoint state allows commit. |
| P7 restore after failure | HARD | eligibility | restore_operator_partition is valid only if the selected checkpoint is authoritative for the job and the new assignment token/epoch is current at decision time. |
| P8 route to owner | HARD | uniqueness | A shard_id maps to at most one current authoritative owner. |
| P9 reassign ownership | HARD | eligibility | reassign_shard is valid only if the current owner has failed or relinquished ownership and the candidate owner is eligible and sufficiently current on shard_id at decision time. |

What matters most #

1. One active assignment per operator partition #

This prevents split processing and conflicting state writes.

2. State and progress must move forward together #

Operator state and input offsets define the logical processing frontier.

3. Checkpoints must represent a consistent cut #

Recovery depends on restoring operator state and offsets from the same committed checkpoint.

4. Recovery must be fenced #

A stale worker must not continue emitting output after the partition has been reassigned.


Step 5 - Execution Context #

For the strict baseline stream framework:

| Field | Value | Why |
|---|---|---|
| Topology | single service distributed | one logical stream-processing system spread across schedulers and workers |
| Write coordination scope | per object scope | correctness is per operator partition, state scope, checkpoint, and shard ownership scope |
| Read consistency target | strong only | assignment, checkpoint, and recovery paths must use authoritative state |
| Holder model | client | workers temporarily hold active operator-partition assignments |
| Compensation acceptable? | No | wrong checkpoint or split assignment cannot be safely repaired afterward |

Derived implications #

  • holder_may_crash = true
    • workers can crash while holding operator partitions
  • cross_service_write = false
    • baseline keeps assignment, checkpoint, and state metadata within one logical service
  • bounded_staleness_allowed = false
    • assignment/checkpoint/recovery paths need authoritative current state
  • cross_service_atomicity_required = false
    • no multi-service transaction across unrelated services in baseline
  • exclusive_claim_required = true
    • one worker should own an operator partition at a time
  • guarded_by_current_state = true
    • state advance, checkpoint, and recovery depend on current assignment and progress state

What this implies #

This pushes us toward:

  • one authoritative owner per operator shard
  • explicit assignment, state, and checkpoint state machines
  • lease-like worker ownership for operator partitions
  • checkpointed state and offset metadata for recovery

Step 6 - Deterministic Mechanism Selection #

| Path | Write shape | Base mechanism | Required companions |
|---|---|---|---|
| P1 submit job | append-only event | append log | request id dedup |
| P2 assign operator partition | exclusive claim | lease | assignment token, heartbeat |
| P3 advance operator state | guarded state transition | single writer per operator partition | assignment token, ordered input progress |
| P4 emit output | append-only event | append log or sink append | sink semantics / idempotency key |
| P5 record progress / offsets | overwrite current value | CAS on version or monotonic overwrite | processed offset/version |
| P6 commit checkpoint | guarded state transition | barrier-synchronized checkpoint commit | checkpoint epoch, snapshot references |
| P7 restore after failure | guarded state transition | CAS on (assignment, checkpoint version) | assignment token, checkpoint epoch |
| P8 route to owner | exclusive claim | lease | fencing token, heartbeat |
| P9 reassign ownership | guarded state transition | CAS on (state, version) | fencing token, shard catch-up check |

Why these fit #

Assignment #

Operator partition ownership is temporary and exclusive, so lease semantics fit.
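
A minimal sketch of such a lease, assuming a hypothetical coordinator-side `AssignmentTable` with an explicit `now` argument instead of a real clock so the behavior is easy to follow. The class and method names are illustrative, not part of any particular framework.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Lease:
    worker_id: str
    token: int         # assignment token; grows with every new grant
    expires_at: float  # lease is live while expires_at > now


class AssignmentTable:
    """Hypothetical lease table: at most one live lease per operator partition."""

    def __init__(self, ttl: float) -> None:
        self.ttl = ttl
        self.leases: Dict[str, Lease] = {}
        self.next_token = 1

    def acquire(self, partition: str, worker_id: str, now: float) -> Optional[Lease]:
        current = self.leases.get(partition)
        if current is not None and current.expires_at > now:
            return None  # another worker still holds a live lease
        lease = Lease(worker_id, self.next_token, now + self.ttl)
        self.next_token += 1
        self.leases[partition] = lease
        return lease

    def heartbeat(self, partition: str, token: int, now: float) -> bool:
        current = self.leases.get(partition)
        # Only the holder of the current, unexpired token may extend the lease.
        if current is None or current.token != token or current.expires_at <= now:
            return False
        current.expires_at = now + self.ttl
        return True
```

A worker that misses heartbeats long enough loses the lease, and the next grant carries a larger token, which is what later fencing checks compare against.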

State advancement #

For a given operator partition, state must be advanced by one current worker in input order, so single-writer per partition fits.
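
The guard can be sketched as follows, assuming a toy keyed counter as the operator state. `OperatorPartition`, `StaleAssignment`, and `OutOfOrderInput` are hypothetical names for this example.

```python
from typing import Dict


class StaleAssignment(Exception):
    """Raised when the caller does not hold the current assignment token."""


class OutOfOrderInput(Exception):
    """Raised when the record is not the next expected input position."""


class OperatorPartition:
    """Toy operator partition: counts records per key, one offset at a time."""

    def __init__(self, current_token: int) -> None:
        self.current_token = current_token
        self.next_offset = 0
        self.counts: Dict[str, int] = {}

    def advance(self, token: int, offset: int, key: str) -> None:
        if token != self.current_token:
            raise StaleAssignment(f"token {token} is not current")
        if offset != self.next_offset:
            raise OutOfOrderInput(f"expected {self.next_offset}, got {offset}")
        # State and progress move forward together in one step.
        self.counts[key] = self.counts.get(key, 0) + 1
        self.next_offset = offset + 1
```

Checking the offset before mutating state means a rejected record leaves both the counter and the progress marker untouched.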

Output #

Output records or sink writes are immutable downstream facts, so append-only fits. Exactly-once sink behavior may require sink-side idempotency or transaction support.
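
One way to picture sink-side idempotency is a deterministic output id that a replay regenerates unchanged. The sketch below assumes a hypothetical in-memory `IdempotentSink` and an id scheme built from the operator partition, the input offset, and the position of the output within that input record.

```python
from typing import List, Set, Tuple


def output_id(operator_partition: str, input_offset: int, index: int) -> str:
    # Deterministic: replaying the same input record yields the same id.
    return f"{operator_partition}:{input_offset}:{index}"


class IdempotentSink:
    """Hypothetical append-only sink that ignores ids it has already stored."""

    def __init__(self) -> None:
        self.records: List[Tuple[str, str]] = []
        self.seen: Set[str] = set()

    def append(self, output_record_id: str, payload: str) -> bool:
        if output_record_id in self.seen:
            return False  # duplicate from a retry or replay; nothing is written
        self.seen.add(output_record_id)
        self.records.append((output_record_id, payload))
        return True
```

With this shape, an at-least-once delivery path still produces each logical output once in the sink, provided the operator itself is deterministic on replay.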

Offsets #

Current progress is current-value state, so overwrite with monotonic/version discipline fits.
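
A minimal sketch of that discipline, assuming a hypothetical in-memory `OffsetStore` where the caller passes the offset it last observed (compare-and-set) and the store additionally refuses to move backwards:

```python
from typing import Dict


class OffsetStore:
    """Hypothetical progress store: CAS on the expected offset, forward-only."""

    def __init__(self) -> None:
        self.offsets: Dict[str, int] = {}

    def advance(self, partition: str, expected: int, new: int) -> bool:
        current = self.offsets.get(partition, 0)
        if current != expected:
            return False  # caller's view is stale; someone else advanced
        if new <= current:
            return False  # progress must be strictly monotonic
        self.offsets[partition] = new
        return True
```

A stale writer that still believes the offset is 0 after another writer moved it to 5 gets `False` back and has to re-read before trying again.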

Checkpoint commit #

Checkpoint completion is a guarded transition that only succeeds when all required operator snapshots for the barrier are present.
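
The guard can be sketched as a small state machine. `Checkpoint`, `acknowledge`, and `try_commit` are hypothetical names; snapshot references are plain strings here.

```python
from typing import Dict, Iterable, Tuple


class Checkpoint:
    """Hypothetical checkpoint record: TRIGGERED -> COMPLETED once all acks arrive."""

    def __init__(self, checkpoint_id: int, required_partitions: Iterable[str]) -> None:
        self.checkpoint_id = checkpoint_id
        self.required = frozenset(required_partitions)
        self.acks: Dict[str, Tuple[str, int]] = {}  # partition -> (snapshot_ref, offset)
        self.status = "TRIGGERED"

    def acknowledge(self, partition: str, snapshot_ref: str, offset: int) -> None:
        if self.status != "TRIGGERED":
            raise ValueError("checkpoint is no longer accepting acknowledgements")
        if partition not in self.required:
            raise ValueError(f"unexpected partition {partition}")
        # Snapshot reference and progress marker are recorded together.
        self.acks[partition] = (snapshot_ref, offset)

    def try_commit(self) -> bool:
        if self.status != "TRIGGERED":
            return False  # already completed: a second commit does not win
        if set(self.acks) != self.required:
            return False  # partial checkpoint stays uncommitted
        self.status = "COMPLETED"
        return True
```

Because a partial checkpoint never leaves `TRIGGERED`, recovery can simply ignore it and fall back to the last `COMPLETED` one.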

Canonical substrate implied #

The baseline now points to:

  • sharded job manager / scheduler
  • one active worker per operator partition
  • mutable operator state plus monotonic offset state
  • checkpoint coordinator and snapshot store
  • append-only output emission

Step 7 - Read Model / Source of Truth #

For a Flink/Spark-Streaming-class framework, truth is mostly direct source state. Dashboards are derived.

| Concept | Truth | Read path | Rebuild path |
|---|---|---|---|
| C1 job submission | StreamingJobRequest | read source directly | authoritative job store |
| C2 operator assignment | OperatorAssignmentState | read source directly | authoritative assignment store |
| C3 current operator state | OperatorState | read source directly | authoritative state store or latest committed checkpoint plus replay |
| C4 current input progress | OperatorOffsetState | read source directly | authoritative progress store or latest committed checkpoint plus replay |
| C5 checkpoint lifecycle | CheckpointState | read source directly | authoritative checkpoint coordinator store |
| C6 output records / sink effects | OutputRecord or sink record | read source directly | authoritative output/sink store |
| C7 shard ownership | PartitionOwnership | read source directly | authoritative ownership store |
| C8 shard routing map | PartitionMap | read source directly | authoritative routing metadata |
| C9 lag / throughput / dashboards | derived from assignment, progress, and outputs | materialized view | recompute from authoritative state |

Important point #

For the core semantics:

  • recovery reads authoritative CheckpointState, OperatorState, and OperatorOffsetState
  • assignment uses authoritative ownership state
  • dashboards are projections

Step 8 - Failure Handling #

| Path | Retry | Competing writers | Crash after commit | Publish failure | Stale holder |
|---|---|---|---|---|---|
| P1 submit job | retry with request_id to avoid duplicate submission | competing jobs coexist; dedup matters only for same logical request | committed submission survives owner crash if replicated past commit point | client may retry safely with idempotency key | n/a |
| P2 assign operator partition | retry assignment safely | only one active assignment should win for an operator partition at a time | if assignment committed and worker crashes, partition stays assigned until timeout or failover | n/a | stale worker fenced by assignment token/epoch |
| P3 advance operator state | worker retries local processing on restart from checkpoint/replay | only current assigned worker may commit authoritative state advance | committed state survives crash if checkpointed or durable enough for replay boundary | sink write may need idempotency/transaction discipline | stale worker cannot advance after reassignment |
| P4 emit output | sink retry depends on sink semantics | duplicate output possible unless sink supports idempotency or exactly-once transactions | committed output survives sink crash if durable | failed sink publish may require replay from checkpoint | stale worker fenced by assignment token/checkpoint epoch |
| P5 record progress | retry with monotonic version/offset | stale progress write fails the monotonic check | committed progress survives crash if persisted or checkpointed | n/a | stale worker cannot advance after reassignment |
| P6 checkpoint commit | retry checkpoint barrier/commit safely | only one checkpoint commit should win current checkpoint epoch | committed checkpoint survives coordinator crash if persisted | partial checkpoint stays uncommitted and is ignored | n/a |
| P7 restore after failure | restore can retry from authoritative latest completed checkpoint | only one reassignment/restore should win for current operator partition | restored worker crash triggers later recovery | n/a | old worker fenced by higher assignment epoch |
| P8 route to owner | retry after refreshing shard map | only one valid owner should exist | if owner changed, refreshed map points to new owner | n/a | stale owner rejected by fencing token |
| P9 reassign ownership | retry failover transition safely | only one reassignment wins current ownership state | promoted owner crash triggers later reassignment | n/a | old owner fenced and must not continue serving |

What matters most #

1. Assignment-token fencing #

This is the worker safety mechanism.

Bad case:

  • worker A owns operator partition
  • failover reassigns partition to worker B
  • A later continues writing output and offsets

Without fencing:

  • split processing corrupts state and sink semantics

So state/progress/output commits must be tied to the current assignment token.
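
The bad case above can be replayed against a store that checks tokens. This is a minimal sketch, assuming a hypothetical `FencedStore` that the coordinator informs of each new assignment token via `fence` before the new worker starts.

```python
from typing import List, Tuple


class FencedStore:
    """Hypothetical commit target that rejects writers holding an older token."""

    def __init__(self) -> None:
        self.highest_token = 0
        self.committed: List[Tuple[str, int]] = []  # (worker_id, offset)

    def fence(self, token: int) -> None:
        # Called on reassignment so the old holder is rejected immediately,
        # even before the new worker has written anything.
        self.highest_token = max(self.highest_token, token)

    def commit(self, token: int, worker_id: str, offset: int) -> bool:
        if token < self.highest_token:
            return False  # stale holder: commit refused
        self.highest_token = token
        self.committed.append((worker_id, offset))
        return True


store = FencedStore()
store.fence(1)
store.commit(1, "worker-a", 10)               # A owns the partition
store.fence(2)                                # failover reassigns to B
a_late = store.commit(1, "worker-a", 11)      # A keeps writing: rejected
b_first = store.commit(2, "worker-b", 11)     # B proceeds normally
```

The important property is that rejection happens at the place where the write lands, so it does not depend on worker A noticing that it has been replaced.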

2. Checkpoint and progress must align #

Recovery is only correct if restored state and input offsets come from the same completed checkpoint.
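
A small worked example makes the alignment requirement concrete. It assumes a toy operator that counts records per key and a hypothetical `CompletedCheckpoint` holding both the state and the next input offset.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class CompletedCheckpoint:
    checkpoint_id: int
    state: Dict[str, int]  # per-key counts at the time of the snapshot
    offset: int            # next input offset to read after restore


def restore_and_replay(cp: CompletedCheckpoint,
                       input_log: List[str]) -> Tuple[Dict[str, int], int]:
    """Restore state and offset from one checkpoint, then replay the rest."""
    state = dict(cp.state)  # copy so the checkpoint itself is not mutated
    offset = cp.offset
    for key in input_log[offset:]:
        state[key] = state.get(key, 0) + 1
        offset += 1
    return state, offset
```

If the state came from a snapshot taken after two records but the offset pointed at the second record, the second record would be counted twice after restore, which is why both values must come from the same completed checkpoint.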

3. Output semantics depend on sink design #

At-least-once is easier; exactly-once requires idempotent or transactional sinks tied to checkpoint completion.
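
A transactional sink tied to checkpoint completion can be sketched as staged writes that only become visible when the coordinator reports the checkpoint as completed. `TransactionalSink` and its callback names are hypothetical and do not correspond to a specific framework API.

```python
from typing import Dict, List


class TransactionalSink:
    """Hypothetical sink: outputs are staged per checkpoint, published on completion."""

    def __init__(self) -> None:
        self.visible: List[str] = []
        self.pending: Dict[int, List[str]] = {}  # checkpoint_id -> staged records

    def write(self, checkpoint_id: int, record: str) -> None:
        self.pending.setdefault(checkpoint_id, []).append(record)

    def on_checkpoint_completed(self, checkpoint_id: int) -> None:
        # pop() makes a repeated completion notification a no-op.
        self.visible.extend(self.pending.pop(checkpoint_id, []))

    def on_checkpoint_aborted(self, checkpoint_id: int) -> None:
        # Discard staged output; replay from the previous checkpoint regenerates it.
        self.pending.pop(checkpoint_id, None)
```

The trade-off is latency: downstream readers see results only after the checkpoint that covers them completes, so shorter checkpoint intervals reduce output delay at the cost of more checkpoint overhead.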


Step 9 - Scale Adjustments #

| Hotspot | Type | First response |
|---|---|---|
| hot input partition or operator key space | contention hotspot | increase partition count and rebalance keyed state |
| checkpoint coordination overhead | contention hotspot | incremental checkpoints and hierarchical checkpoint aggregation |
| large state snapshots | write throughput hotspot | incremental snapshotting and separate scalable state backend |
| sink bandwidth bottlenecks | write throughput hotspot | buffered async sinks and sink partitioning |
| failover/rebalance churn | contention hotspot | stabilize leases and reduce unnecessary reassignments |
| lag/metrics queries | read hotspot | keep them as materialized projections only |

What scales well #

This system scales by:

  • partitioning input streams and operator instances
  • giving each operator partition one active worker
  • separating state backend and checkpoint storage from hot processing
  • using incremental checkpoints and scalable sinks
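
To illustrate why incremental checkpoints help with large state, here is a deliberately simplified sketch that snapshots only the keys changed since the previous checkpoint. `IncrementalState` and `restore_from_deltas` are hypothetical names, key deletions are not handled, and real engines typically track changes at the storage-file level rather than per key.

```python
from typing import Dict, List, Set


class IncrementalState:
    """Toy keyed state that remembers which keys changed since the last snapshot."""

    def __init__(self) -> None:
        self.data: Dict[str, int] = {}
        self.dirty: Set[str] = set()

    def put(self, key: str, value: int) -> None:
        self.data[key] = value
        self.dirty.add(key)

    def snapshot_delta(self) -> Dict[str, int]:
        # Only changed keys are written, so snapshot size tracks the churn,
        # not the total state size.
        delta = {key: self.data[key] for key in self.dirty}
        self.dirty.clear()
        return delta


def restore_from_deltas(deltas: List[Dict[str, int]]) -> Dict[str, int]:
    """Rebuild full state by applying deltas oldest-first."""
    state: Dict[str, int] = {}
    for delta in deltas:
        state.update(delta)
    return state
```

The cost moves to restore time, which now has to apply a chain of deltas; periodic full snapshots or compaction keep that chain short.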

What fails first #

Usually:

  • hot keyed partitions
  • large checkpoint overhead
  • sink bottlenecks
  • stale-worker fencing mistakes

Canonical design conclusion #

The mechanical outcome is:

  • primary state:
    • StreamingJobRequest
    • OperatorAssignmentState
    • OperatorState
    • OperatorOffsetState
    • CheckpointState
    • OutputRecord
    • PartitionOwnership
    • PartitionMap
  • critical invariants:
    • one active worker assignment per operator partition
    • ordered state and progress advancement by current owner
    • checkpoint commit valid only for a consistent barrier-aligned snapshot
    • recovery valid only from authoritative committed checkpoint plus current assignment token
    • output commits fenced by current assignment/checkpoint semantics
  • mechanisms:
    • append log
    • lease
    • single writer per operator partition
    • guarded checkpoint / restore transitions
    • fenced shard ownership
  • reads:
    • direct authoritative reads for assignment, progress, and checkpoint state
    • projections only for lag and throughput dashboards

Polished interview answer #

I’d design the stream framework as a sharded job manager plus worker fleet. Each operator partition has one active worker at a time, enforced through a lease-like assignment token. That worker consumes ordered input partitions, advances operator state, records monotonic input progress, and emits outputs continuously. Recovery is built around checkpoints: the coordinator triggers barrier-aligned snapshots, commits a checkpoint only when all required operator snapshots and progress markers are present, and on failure reassigns the operator partition to a new worker which restores state and offsets from the latest committed checkpoint before replaying input. The main scaling levers are more input partitions, scalable state backends, incremental checkpoints, and well-partitioned sinks.


Concrete Substrate #

I’ll choose a sharded job manager with worker fleet plus separate checkpoint/state backend as the concrete baseline, because it matches the mechanics we derived:

  • append-only job submission
  • lease-like operator assignment
  • single-writer operator state advancement
  • guarded checkpoint commit
  • guarded restore after failure

Concrete tech family:

  • job manager / coordinator in Java, Scala, or Go
  • metadata storage:
    • replicated metadata DB or RocksDB-backed service state
  • shard replication:
    • Raft or leader-follower replication for coordinator metadata
  • state backend:
    • local RocksDB state plus remote checkpoint store
  • checkpoint store:
    • object storage or distributed blob store
  • input streams:
    • Kafka-class partitioned log or equivalent

Each coordinator shard stores:

  • job records
  • operator assignments
  • current progress metadata
  • checkpoint lifecycle state

Workers store:

  • active in-memory or local-disk operator state
  • temporary checkpoint snapshots before remote commit

Remote checkpoint store stores:

  • completed state snapshots
  • checkpoint manifests
  • barrier/epoch metadata

Operation Layer #

1. Submit streaming job #

API

  • SubmitStreamingJob(job_request, request_id?)

Initiator

  • client

Entry point

  • gateway or job-manager frontend

Authoritative decider

  • coordinator shard owner(s)

Precondition

  • request id optional for dedup

Transition

  • append StreamingJobRequest
  • create initial OperatorAssignmentState
  • assign source operator partitions to workers

Response

  • {job_id}
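
The submission path with optional dedup can be sketched as below. `JobStore` is a hypothetical in-memory stand-in for the coordinator's replicated job log, and the `job-N` id format is an assumption for the example.

```python
import itertools
from typing import Dict, List, Optional, Tuple


class JobStore:
    """Hypothetical append-only job log with request_id deduplication."""

    def __init__(self) -> None:
        self.log: List[Tuple[str, Optional[str], dict]] = []
        self.by_request_id: Dict[str, str] = {}
        self._ids = itertools.count(1)

    def submit(self, job_request: dict, request_id: Optional[str] = None) -> str:
        # A retried request with the same request_id returns the original job_id.
        if request_id is not None and request_id in self.by_request_id:
            return self.by_request_id[request_id]
        job_id = f"job-{next(self._ids)}"
        self.log.append((job_id, request_id, job_request))
        if request_id is not None:
            self.by_request_id[request_id] = job_id
        return job_id
```

Without a `request_id`, every call is treated as a new job, which matches the precondition that the id is optional and only matters for retries of the same logical request.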

2. Assign operator partition #

API

  • internal scheduling / rebalance flow

Initiator

  • system

Entry point

  • job manager

Authoritative decider

  • coordinator shard leader

Precondition

  • operator partition unassigned or failed/rebalancing

Transition

  • set OperatorAssignmentState(operator_partition) = ASSIGNED(worker_id, assignment_token, expiry)

3. Advance operator state #

API

  • internal worker processing loop

Initiator

  • worker

Entry point

  • worker local runtime plus state backend

Authoritative decider

  • the worker holding the current assignment token issued by the job manager

Precondition

  • worker holds current assignment token
  • input offsets are next valid positions

Transition

  • apply records to OperatorState
  • emit outputs
  • advance OperatorOffsetState

4. Trigger and commit checkpoint #

API

  • internal checkpoint coordinator flow

Initiator

  • system

Entry point

  • job manager

Authoritative decider

  • checkpoint coordinator

Precondition

  • current checkpoint epoch active
  • all required operators have acknowledged barrier and persisted snapshot references

Transition

  • CheckpointState: TRIGGERED -> COMPLETED
  • checkpoint becomes authoritative restore point

5. Restore after failure #

API

  • internal failover / restore flow

Initiator

  • system

Entry point

  • job manager

Authoritative decider

  • coordinator shard leader

Precondition

  • worker failed or partition reassigned
  • latest completed checkpoint exists

Transition

  • assign new worker with new assignment token
  • restore OperatorState and OperatorOffsetState from latest committed checkpoint
  • resume input consumption from checkpointed progress

Entry Point vs Decider vs Responder #

| Path | Entry point | Authoritative decider | Physical responder | Logical responder |
|---|---|---|---|---|
| SubmitStreamingJob | gateway / frontend | coordinator shard leader | leader or front node | stream framework |
| operator assignment | job manager | coordinator shard leader | control-plane node | stream framework |
| state advance / output | worker runtime | current assignment token plus coordinator authority | worker / sink | stream framework |
| checkpoint commit | job manager | checkpoint coordinator | control-plane node | stream framework |
| restore after failure | job manager | coordinator shard leader | control-plane node | stream framework |
| shard failover | follower / coordination layer | shard quorum / lease store | new leader / control plane | stream framework |

Concrete HLD #

Main components:

  • job submission frontend
    • receives streaming job requests
  • job manager / coordinator shard leaders
    • authoritative owners for assignment, progress, and checkpoint lifecycle
  • worker fleet
    • consumes partitions, advances operator state, emits outputs
  • state backend
    • stores local or remote operator state snapshots
  • checkpoint store
    • stores completed checkpoint manifests and snapshots
  • input log system
    • provides ordered partitioned input replay
  • sinks / output streams
    • receive emitted records or side effects
