Stream Processing Framework (Flink / Spark Streaming-class) #
This note models a Flink/Spark-Streaming-class stream processing framework where clients submit continuous jobs, operators consume partitioned input streams, maintain checkpointed state, emit output streams, and recover from failures through replay plus checkpoint restoration.
Step 1 - Normalize #
Assume the baseline prompt is:
- design a stream processing framework
- clients submit continuous jobs / operator graphs
- workers consume ordered partitions from input streams
- operators maintain state
- outputs are continuously emitted
- failures recover using checkpoints and replay
- system scales across many workers
Normalize into state-affecting paths.
| Requirement | Actor | Operation | State touched | Priority |
|---|---|---|---|---|
| Client submits streaming job / operator graph | Client | append event | S1 create target `StreamingJobRequest` | C1 |
| Scheduler assigns operator partition to worker | System | state transition | S1 update target `OperatorAssignmentState` | C1 |
| Worker consumes input and advances operator state | Client | state transition | S1 update target `OperatorState` | C1 |
| Worker emits output records or sink writes | Client | append event | S1 create target `OutputRecord` | C1 |
| Worker records input progress / offsets | Client | overwrite state | S1 update target `OperatorOffsetState` | C1 |
| System triggers and commits checkpoint | System | state transition | S1 update target `CheckpointState` | C1 |
| System restores operator state after failure | System | state transition | S1 update target `OperatorAssignmentState` | C1 |
| System routes job/operator shard to current owner | System | read source | S1 read source target `PartitionMap` | C1 |
| System reassigns ownership after node failure | System | state transition | S1 update target `PartitionOwnership` | C1 |
| Client reads lag / throughput / job status | Client | read projection | S1 read projection target `StreamingJobStatusView` | R2 |
Notes on normalization #
Important choices:
- job submission is `append event`: streaming job intent is immutable fact recording
- assignment is `state transition`: operator partitions move between workers
- state advancement is `state transition`: current keyed/operator state changes over time
- output emission is `append event`: outputs/sink writes are immutable downstream facts
- progress tracking is `overwrite state`: current offset/progress is the main truth
- checkpoint and restore are explicit because recovery is core to stream systems
This system is a hybrid of:
- partitioned stream consumption
- mutable operator state
- checkpointed recovery
Step 2 - Critical Path Selection #
| Requirement | Priority class | Why |
|---|---|---|
| Submit streaming job | C1 | job intent and graph must not be lost |
| Assign operator partition to worker | C1 | split or stale assignment breaks processing correctness |
| Advance operator state | C1 | current processing semantics depend on correct state evolution |
| Emit output / sink writes | C1 | downstream correctness depends on emitted results |
| Record input progress / offsets | C1 | replay and exactly-once/at-least-once behavior depend on correct progress tracking |
| Trigger and commit checkpoint | C1 | recovery correctness depends on stable snapshots |
| Restore state after failure | C1 | failover correctness depends on valid restore point and reassignment |
| Route to current owner | C1 | wrong routing can split assignment and state updates |
| Reassign ownership after node failure | C1 | failover must preserve processing lifecycle correctness |
| Lag / throughput dashboards | R2 | operational only |
Baseline critical paths #
Main C1 paths:
- P1 submit job
- P2 assign operator partition
- P3 advance operator state
- P4 emit output
- P5 record progress / offsets
- P6 checkpoint commit
- P7 restore after failure
- P8 route to owner
- P9 reassign ownership
This design is driven by:
- one active worker assignment per operator partition
- ordered partition consumption
- checkpointed operator state and offset progress
- guarded recovery after failure
Step 3 - Primary State Extraction #
For a stream framework, the minimal primary state is the submitted job, operator assignment and state, offset/progress state, checkpoint state, output records, and routing/ownership state.
| Candidate object label | Candidate source | Candidate needed for C1/R1? | Candidate decomposition action | Class | Primary? | Owner | Evolution | Scope kind | Scope value |
|---|---|---|---|---|---|---|---|---|---|
| StreamingJobRequest | direct noun | Yes | keep as candidate | event | Yes | service | append-only | instance | job_id |
| OperatorAssignmentState | lifecycle object | Yes | keep as candidate | process | Yes | service | state machine | instance | job_id + operator_partition |
| OperatorState | lifecycle object | Yes | keep as candidate | process | Yes | service | overwrite | instance | state_scope |
| OperatorOffsetState | direct noun | Yes | keep as candidate | entity | Yes | service | overwrite | instance | operator_partition |
| CheckpointState | lifecycle object | Yes | keep as candidate | process | Yes | service | state machine | instance | checkpoint_id |
| OutputRecord | direct noun | Yes | keep as candidate | event | Yes | service | append-only | instance | output_record_id |
| PartitionOwnership | hidden write target | Yes | keep as candidate | process | Yes | service | state machine | instance | shard_id |
| PartitionMap | hidden write target | Yes | keep as candidate | entity | Yes | service | overwrite | collection | job/operator shards |
| StreamingJobStatusView | derived read model | No | reject as UI artifact | projection | No | derived | overwrite | collection | job_id |
Important modeling choices #
StreamingJobRequest #
Primary because:
- job submission is immutable request recording
- captures operator graph, inputs, sinks, and runtime config
OperatorAssignmentState #
Primary because:
- one worker should own a given operator partition at a time
- captures `ASSIGNED`, `RUNNING`, `FAILED`, `REBALANCING`
OperatorState #
Primary because:
- keyed/window/operator state is central correctness state
- it evolves continuously and is later snapshotted
OperatorOffsetState #
Primary because:
- current consumed offsets/progress determine replay boundary
CheckpointState #
Primary because:
- checkpoint lifecycle governs consistent recovery points
OutputRecord #
Primary because:
- emitted outputs are immutable downstream facts
Minimal strict primary set #
The strongest minimal set is:
- `StreamingJobRequest`
- `OperatorAssignmentState`
- `OperatorState`
- `OperatorOffsetState`
- `CheckpointState`
- `OutputRecord`
- `PartitionOwnership`
- `PartitionMap`
Step 4 - Hard Invariants #
For a Flink/Spark-Streaming-class framework, the hard invariants are about one active worker per operator partition, ordered state/progress advancement, consistent checkpoints, and valid recovery.
| Path | Tier | Type | Invariant statement |
|---|---|---|---|
| P1 submit job | HARD | uniqueness | Key job `request_id` maps to at most one logical outcome (recorded streaming job request) within job scope. |
| P2 assign operator partition | HARD | uniqueness | Key `job_id + operator_partition` maps to at most one logical outcome (current active worker assignment) within assignment scope. |
| P3 advance operator state | HARD | eligibility | Action `advance_operator_state` is valid only if worker holds current assignment token for `operator_partition` and input progress is in valid order at decision time. |
| P4 emit output | HARD | accounting | Output emitted for `operator_partition` corresponds to records processed under the current committed operator state transition. |
| P5 record input progress | HARD | ordering | Offset/progress updates for `operator_partition` are ordered by monotonic processed position within input partition scope. |
| P6 commit checkpoint | HARD | eligibility | Action `commit_checkpoint` is valid only if all required operator snapshots and corresponding progress markers for the checkpoint barrier are present and current checkpoint state allows commit. |
| P7 restore after failure | HARD | eligibility | Action `restore_operator_partition` is valid only if selected checkpoint is authoritative for the job and new assignment token/epoch is current at decision time. |
| P8 route to owner | HARD | uniqueness | Key `shard_id` maps to at most one logical outcome (current authoritative owner) within `shard_id`. |
| P9 reassign ownership | HARD | eligibility | Action `reassign_shard` is valid only if current owner is failed or relinquished and candidate owner is eligible and sufficiently current on `shard_id` at decision time. |
What matters most #
1. One active assignment per operator partition #
This prevents split processing and conflicting state writes.
2. State and progress must move forward together #
Operator state and input offsets define the logical processing frontier.
3. Checkpoints must represent a consistent cut #
Recovery depends on restoring operator state and offsets from the same committed checkpoint.
4. Recovery must be fenced #
A stale worker must not continue emitting output after the partition has been reassigned.
Step 5 - Execution Context #
For the strict baseline stream framework:
| Field | Value | Why |
|---|---|---|
| Topology | single service distributed | one logical stream-processing system spread across schedulers and workers |
| Write coordination scope | per object scope | correctness is per operator partition, state scope, checkpoint, and shard ownership scope |
| Read consistency target | strong only | assignment, checkpoint, and recovery paths must use authoritative state |
| Holder model | client | workers temporarily hold active operator-partition assignments |
| Compensation acceptable? | No | wrong checkpoint or split assignment cannot be safely repaired afterward |
Derived implications #
- `holder_may_crash = true`: workers can crash while holding operator partitions
- `cross_service_write = false`: baseline keeps assignment, checkpoint, and state metadata within one logical service
- `bounded_staleness_allowed = false`: assignment/checkpoint/recovery paths need authoritative current state
- `cross_service_atomicity_required = false`: no multi-service transaction across unrelated services in baseline
- `exclusive_claim_required = true`: one worker should own an operator partition at a time
- `guarded_by_current_state = true`: state advance, checkpoint, and recovery depend on current assignment and progress state
What this implies #
This pushes us toward:
- one authoritative owner per operator shard
- explicit assignment, state, and checkpoint state machines
- lease-like worker ownership for operator partitions
- checkpointed state and offset metadata for recovery
Step 6 - Deterministic Mechanism Selection #
| Path | Write shape | Base mechanism | Required companions |
|---|---|---|---|
| P1 submit job | append-only event | append log | request id dedup |
| P2 assign operator partition | exclusive claim | lease | assignment token, heartbeat |
| P3 advance operator state | guarded state transition | single writer per operator partition | assignment token, ordered input progress |
| P4 emit output | append-only event | append log or sink append | sink semantics / idempotency key |
| P5 record progress / offsets | overwrite current value | CAS on version or monotonic overwrite | processed offset/version |
| P6 commit checkpoint | guarded state transition | barrier-synchronized checkpoint commit | checkpoint epoch, snapshot references |
| P7 restore after failure | guarded state transition | CAS on (assignment, checkpoint version) | assignment token, checkpoint epoch |
| P8 route to owner | exclusive claim | lease | fencing token, heartbeat |
| P9 reassign ownership | guarded state transition | CAS on (state, version) | fencing token, shard catch-up check |
Why these fit #
Assignment #
Operator partition ownership is temporary and exclusive, so lease semantics fit.
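A minimal sketch of lease-style assignment, assuming a single in-memory coordinator and a caller-supplied clock. `LeaseTable`, `acquire`, and `heartbeat` are hypothetical names for illustration, not an API of Flink or Spark:

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Lease:
    worker_id: str
    token: int          # assignment token; grows with every new grant
    expires_at: float   # time after which the lease may be handed to someone else


class LeaseTable:
    """Hypothetical in-memory lease table keyed by operator partition."""

    def __init__(self, ttl: float) -> None:
        self.ttl = ttl
        self.leases: Dict[str, Lease] = {}
        self.next_token = 1

    def acquire(self, partition: str, worker_id: str, now: float) -> Optional[Lease]:
        current = self.leases.get(partition)
        if current is not None and current.expires_at > now:
            return None  # an unexpired lease exists, so the claim stays exclusive
        lease = Lease(worker_id, self.next_token, now + self.ttl)
        self.next_token += 1
        self.leases[partition] = lease
        return lease

    def heartbeat(self, partition: str, token: int, now: float) -> bool:
        current = self.leases.get(partition)
        if current is None or current.token != token or current.expires_at <= now:
            return False  # lease lost or expired; the worker must stop processing
        current.expires_at = now + self.ttl
        return True
```

A worker that gets `False` from `heartbeat` should treat its assignment as gone. In a real deployment the table would live in replicated coordinator metadata rather than process memory.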
State advancement #
For a given operator partition, state must be advanced by one current worker in input order, so single-writer per partition fits.
Output #
Output records or sink writes are immutable downstream facts, so append-only fits. Exactly-once sink behavior may require sink-side idempotency or transaction support.
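One way to picture sink-side idempotency is a sink that deduplicates on a key derived from the input position. The sketch below assumes one output per input record and uses `(partition, input_offset)` as the idempotency key; both the class and the key choice are illustrative assumptions:

```python
from typing import Dict, Tuple


class IdempotentSink:
    """Hypothetical sink that deduplicates appends on an idempotency key."""

    def __init__(self) -> None:
        self.records: Dict[Tuple[str, int], str] = {}

    def append(self, partition: str, input_offset: int, value: str) -> bool:
        key = (partition, input_offset)  # assumed key: source partition + input offset
        if key in self.records:
            return False  # replayed write after recovery; already applied
        self.records[key] = value
        return True
```

With this shape, replaying input after a restore re-issues the same keys, so duplicates are dropped at the sink instead of being visible downstream.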
Offsets #
Current progress is current-value state, so overwrite with monotonic/version discipline fits.
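As a rough illustration of "CAS on version or monotonic overwrite", the following sketch combines both checks in one register. `OffsetRegister` is a hypothetical name, and a real store would perform the compare-and-set atomically:

```python
class OffsetRegister:
    """Hypothetical progress record for one operator partition."""

    def __init__(self) -> None:
        self.offset = -1   # -1 means nothing processed yet
        self.version = 0

    def compare_and_set(self, expected_version: int, new_offset: int) -> bool:
        if expected_version != self.version:
            return False  # another writer updated the record first
        if new_offset <= self.offset:
            return False  # progress must move strictly forward
        self.offset = new_offset
        self.version += 1
        return True
```

A writer that loses the version check has to re-read the record before retrying, which is what keeps a stale progress write from overwriting newer progress.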
Checkpoint commit #
Checkpoint completion is a guarded transition that only succeeds when all required operator snapshots for the barrier are present.
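The guard can be sketched as a small state machine that only moves from `TRIGGERED` to `COMPLETED` once every required operator partition has acknowledged with a snapshot reference. The class and method names are assumptions made for this example:

```python
from typing import Dict, Set


class Checkpoint:
    """Hypothetical checkpoint record tracked by the coordinator."""

    def __init__(self, epoch: int, required: Set[str]) -> None:
        self.epoch = epoch
        self.required = set(required)          # operator partitions that must ack
        self.snapshots: Dict[str, str] = {}    # operator partition -> snapshot reference
        self.state = "TRIGGERED"

    def ack(self, partition: str, snapshot_ref: str) -> None:
        # Acks for unknown partitions or finished checkpoints are ignored.
        if self.state == "TRIGGERED" and partition in self.required:
            self.snapshots[partition] = snapshot_ref

    def try_commit(self) -> bool:
        if self.state != "TRIGGERED":
            return False  # already completed; commit is not repeatable
        if set(self.snapshots) != self.required:
            return False  # at least one operator snapshot is still missing
        self.state = "COMPLETED"
        return True
```

A checkpoint that never collects all acks simply stays `TRIGGERED` and is never used as a restore point.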
Canonical substrate implied #
The baseline now points to:
- sharded job manager / scheduler
- one active worker per operator partition
- mutable operator state plus monotonic offset state
- checkpoint coordinator and snapshot store
- append-only output emission
Step 7 - Read Model / Source of Truth #
For a Flink/Spark-Streaming-class framework, truth is mostly direct source state. Dashboards are derived.
| Concept | Truth | Read path | Rebuild path |
|---|---|---|---|
| C1 job submission | `StreamingJobRequest` | read source directly | authoritative job store |
| C2 operator assignment | `OperatorAssignmentState` | read source directly | authoritative assignment store |
| C3 current operator state | `OperatorState` | read source directly | authoritative state store or latest committed checkpoint plus replay |
| C4 current input progress | `OperatorOffsetState` | read source directly | authoritative progress store or latest committed checkpoint plus replay |
| C5 checkpoint lifecycle | `CheckpointState` | read source directly | authoritative checkpoint coordinator store |
| C6 output records / sink effects | `OutputRecord` or sink record | read source directly | authoritative output/sink store |
| C7 shard ownership | `PartitionOwnership` | read source directly | authoritative ownership store |
| C8 shard routing map | `PartitionMap` | read source directly | authoritative routing metadata |
| C9 lag / throughput / dashboards | derived from assignment, progress, and outputs | materialized view | recompute from authoritative state |
Important point #
For the core semantics:
- recovery reads authoritative `CheckpointState`, `OperatorState`, and `OperatorOffsetState`
- assignment uses authoritative ownership state
- dashboards are projections
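To make the projection idea concrete, lag can be recomputed at any time from authoritative progress. This sketch assumes lag is defined as the input log's end offset minus the processed offset per partition, with both expressed as "next position" counts; `compute_lag` is a hypothetical helper:

```python
from typing import Dict


def compute_lag(end_offsets: Dict[str, int], processed: Dict[str, int]) -> Dict[str, int]:
    """Per-partition lag: records in the input log that are not yet processed."""
    lag = {}
    for partition, end in end_offsets.items():
        done = processed.get(partition, 0)  # no recorded progress: count from 0
        lag[partition] = max(end - done, 0)
    return lag
```

Because the result is derived, a stale or lost dashboard value is harmless: it can be rebuilt from the progress store without touching any C1 path.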
Step 8 - Failure Handling #
| Path | Retry | Competing writers | Crash after commit | Publish failure | Stale holder |
|---|---|---|---|---|---|
| P1 submit job | retry with `request_id` to avoid duplicate submission | competing jobs coexist; dedup matters only for same logical request | committed submission survives owner crash if replicated past commit point | client may retry safely with idempotency key | n/a |
| P2 assign operator partition | retry assignment safely | only one active assignment should win for an operator partition at a time | if assignment committed and worker crashes, partition stays assigned until timeout or failover | n/a | stale worker fenced by assignment token/epoch |
| P3 advance operator state | worker retries local processing on restart from checkpoint/replay | only current assigned worker may commit authoritative state advance | committed state survives crash if checkpointed or durable enough for replay boundary | sink write may need idempotency/transaction discipline | stale worker cannot advance after reassignment |
| P4 emit output | sink retry depends on sink semantics | duplicate output possible unless sink supports idempotency or exactly-once transactions | committed output survives sink crash if durable | failed sink publish may require replay from checkpoint | stale worker fenced by assignment token/checkpoint epoch |
| P5 record progress | retry with monotonic version/offset | stale progress write loses monotonic check | committed progress survives crash if persisted or checkpointed | n/a | stale worker cannot advance after reassignment |
| P6 checkpoint commit | retry checkpoint barrier/commit safely | only one checkpoint commit should win current checkpoint epoch | committed checkpoint survives coordinator crash if persisted | partial checkpoint stays uncommitted and is ignored | n/a |
| P7 restore after failure | restore can retry from authoritative latest completed checkpoint | only one reassignment/restore should win for current operator partition | restored worker crash triggers later recovery | n/a | old worker fenced by higher assignment epoch |
| P8 route to owner | retry after refreshing shard map | only one valid owner should exist | if owner changed, refreshed map points to new owner | n/a | stale owner rejected by fencing token |
| P9 reassign ownership | retry failover transition safely | only one reassignment wins current ownership state | promoted owner crash triggers later reassignment | n/a | old owner fenced and must not continue serving |
What matters most #
1. Assignment-token fencing #
This is the worker safety mechanism.
Bad case:
- worker A owns operator partition
- failover reassigns partition to worker B
- A later continues writing output and offsets
Without fencing:
- split processing corrupts state and sink semantics
So state/progress/output commits must be tied to the current assignment token.
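The bad case above can be reproduced in a few lines. In this sketch the store checks the assignment token on every commit; `FencedStore` and `StaleTokenError` are hypothetical names, and the token is assumed to be bumped by the coordinator on each reassignment:

```python
from typing import List


class StaleTokenError(Exception):
    """Raised when a commit carries an assignment token that is no longer current."""


class FencedStore:
    """Hypothetical store holding progress and output for one operator partition."""

    def __init__(self) -> None:
        self.current_token = 0
        self.offset = 0
        self.outputs: List[str] = []

    def reassign(self) -> int:
        # Called on (re)assignment; every new owner gets a strictly larger token.
        self.current_token += 1
        return self.current_token

    def commit(self, token: int, new_offset: int, output: str) -> None:
        if token != self.current_token:
            raise StaleTokenError(f"token {token} is not the current assignment token")
        self.offset = new_offset
        self.outputs.append(output)
```

If worker A holds token 1 and the partition is reassigned to B with token 2, any later `commit` from A raises `StaleTokenError`, so A cannot corrupt progress or output even if it never notices the failover.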
2. Checkpoint and progress must align #
Recovery is only correct if restored state and input offsets come from the same completed checkpoint.
3. Output semantics depend on sink design #
At-least-once is easier; exactly-once requires idempotent or transactional sinks tied to checkpoint completion.
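A simplified picture of a transactional sink tied to checkpoint completion: writes are staged under the checkpoint epoch they belong to and become visible only when that epoch completes. This is a sketch under assumed names (`TransactionalSink`, `on_checkpoint_completed`), not the API of any particular framework:

```python
from typing import Dict, List


class TransactionalSink:
    """Hypothetical sink: stage writes per checkpoint epoch, publish on completion."""

    def __init__(self) -> None:
        self.staged: Dict[int, List[str]] = {}
        self.visible: List[str] = []

    def write(self, epoch: int, record: str) -> None:
        self.staged.setdefault(epoch, []).append(record)

    def on_checkpoint_completed(self, epoch: int) -> None:
        # pop() makes a repeated notification for the same epoch a no-op.
        self.visible.extend(self.staged.pop(epoch, []))

    def abort_uncommitted(self) -> None:
        # On recovery, staged writes from incomplete epochs are dropped;
        # replay from the restored checkpoint will produce them again.
        self.staged.clear()
```

The trade-off is latency: downstream readers see output only at checkpoint boundaries, which is the usual cost of this style of exactly-once delivery.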
Step 9 - Scale Adjustments #
| Hotspot | Type | First response |
|---|---|---|
| hot input partition or operator key space | contention hotspot | increase partition count and rebalance keyed state |
| checkpoint coordination overhead | contention hotspot | incremental checkpoints and hierarchical checkpoint aggregation |
| large state snapshots | write throughput hotspot | incremental snapshotting and separate scalable state backend |
| sink bandwidth bottlenecks | write throughput hotspot | buffered async sinks and sink partitioning |
| failover/rebalance churn | contention hotspot | stabilize leases and reduce unnecessary reassignments |
| lag/metrics queries | read hotspot | keep them as materialized projections only |
What scales well #
This system scales by:
- partitioning input streams and operator instances
- giving each operator partition one active worker
- separating state backend and checkpoint storage from hot processing
- using incremental checkpoints and scalable sinks
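The idea behind incremental checkpoints can be sketched at key level: persist only what changed since the previous snapshot, then rebuild state by applying deltas to a base. Production state backends typically work on files or blocks rather than individual keys, so treat `delta` and `apply_delta` as illustrative helpers only:

```python
from typing import Dict, Optional


def delta(previous: Dict[str, int], current: Dict[str, int]) -> Dict[str, Optional[int]]:
    """Changes needed to turn `previous` into `current`; None marks a deleted key."""
    changes: Dict[str, Optional[int]] = {
        k: v for k, v in current.items() if previous.get(k) != v
    }
    for k in previous:
        if k not in current:
            changes[k] = None
    return changes


def apply_delta(base: Dict[str, int], changes: Dict[str, Optional[int]]) -> Dict[str, int]:
    """Rebuild state from a base snapshot plus one delta, without mutating the base."""
    restored = dict(base)
    for k, v in changes.items():
        if v is None:
            restored.pop(k, None)
        else:
            restored[k] = v
    return restored
```

When only a small fraction of keys change between checkpoints, the delta is much smaller than a full snapshot, which is what relieves the snapshot write hotspot.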
What fails first #
Usually:
- hot keyed partitions
- large checkpoint overhead
- sink bottlenecks
- stale-worker fencing mistakes
Canonical design conclusion #
The mechanical outcome is:
- primary state:
  - `StreamingJobRequest`
  - `OperatorAssignmentState`
  - `OperatorState`
  - `OperatorOffsetState`
  - `CheckpointState`
  - `OutputRecord`
  - `PartitionOwnership`
  - `PartitionMap`
- critical invariants:
  - one active worker assignment per operator partition
  - ordered state and progress advancement by current owner
  - checkpoint commit valid only for a consistent barrier-aligned snapshot
  - recovery valid only from authoritative committed checkpoint plus current assignment token
  - output commits fenced by current assignment/checkpoint semantics
- mechanisms:
  - append log
  - lease
  - single writer per operator partition
  - guarded checkpoint / restore transitions
  - fenced shard ownership
- reads:
  - direct authoritative reads for assignment, progress, and checkpoint state
  - projections only for lag and throughput dashboards
Polished interview answer #
I’d design the stream framework as a sharded job manager plus worker fleet. Each operator partition has one active worker at a time, enforced through a lease-like assignment token. That worker consumes ordered input partitions, advances operator state, records monotonic input progress, and emits outputs continuously. Recovery is built around checkpoints: the coordinator triggers barrier-aligned snapshots, commits a checkpoint only when all required operator snapshots and progress markers are present, and on failure reassigns the operator partition to a new worker which restores state and offsets from the latest committed checkpoint before replaying input. The main scaling levers are more input partitions, scalable state backends, incremental checkpoints, and well-partitioned sinks.
Concrete Substrate #
I’ll choose a sharded job manager with worker fleet plus separate checkpoint/state backend as the concrete baseline, because it matches the mechanics we derived:
- append-only job submission
- lease-like operator assignment
- single-writer operator state advancement
- guarded checkpoint commit
- guarded restore after failure
Concrete tech family:
- job manager / coordinator in `Java`, `Scala`, or `Go`
- metadata storage:
  - replicated metadata DB or `RocksDB`-backed service state
- shard replication:
  - `Raft` or leader-follower replication for coordinator metadata
- state backend:
  - local RocksDB state plus remote checkpoint store
- checkpoint store:
  - object storage or distributed blob store
- input streams:
  - Kafka-class partitioned log or equivalent
Each coordinator shard stores:
- job records
- operator assignments
- current progress metadata
- checkpoint lifecycle state
Workers store:
- active in-memory or local-disk operator state
- temporary checkpoint snapshots before remote commit
Remote checkpoint store stores:
- completed state snapshots
- checkpoint manifests
- barrier/epoch metadata
Operation Layer #
1. Submit streaming job #
API
SubmitStreamingJob(job_request, request_id?)
Initiator
- client
Entry point
- gateway or job-manager frontend
Authoritative decider
- coordinator shard owner(s)
Precondition
- request id optional for dedup
Transition
- append `StreamingJobRequest`
- create initial `OperatorAssignmentState`
- assign source operator partitions to workers
Response
{job_id}
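A minimal sketch of the submission path with request-id dedup, assuming an in-memory append-only log. `JobStore` and the generated `job_id` format are assumptions for illustration; only the `SubmitStreamingJob(job_request, request_id?)` shape comes from the API above:

```python
import uuid
from typing import Dict, List, Optional


class JobStore:
    """Hypothetical coordinator-side store for StreamingJobRequest records."""

    def __init__(self) -> None:
        self.log: List[dict] = []                 # append-only job requests
        self.by_request_id: Dict[str, str] = {}   # request_id -> job_id

    def submit_streaming_job(self, job_request: dict,
                             request_id: Optional[str] = None) -> dict:
        if request_id is not None and request_id in self.by_request_id:
            # Retried submission: return the original outcome, append nothing.
            return {"job_id": self.by_request_id[request_id]}
        job_id = uuid.uuid4().hex
        self.log.append({"job_id": job_id, "request": job_request})
        if request_id is not None:
            self.by_request_id[request_id] = job_id
        return {"job_id": job_id}
```

Without a `request_id` every call creates a new job, so clients that may retry should always supply one.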
2. Assign operator partition #
API
- internal scheduling / rebalance flow
Initiator
- system
Entry point
- job manager
Authoritative decider
- coordinator shard leader
Precondition
- operator partition unassigned or failed/rebalancing
Transition
- set `OperatorAssignmentState(operator_partition) = ASSIGNED(worker_id, assignment_token, expiry)`
3. Advance operator state #
API
- internal worker processing loop
Initiator
- worker
Entry point
- worker local runtime plus state backend
Authoritative decider
- current assignment owner under job manager token
Precondition
- worker holds current assignment token
- input offsets are next valid positions
Transition
- apply records to `OperatorState`
- emit outputs
- advance `OperatorOffsetState`
4. Trigger and commit checkpoint #
API
- internal checkpoint coordinator flow
Initiator
- system
Entry point
- job manager
Authoritative decider
- checkpoint coordinator
Precondition
- current checkpoint epoch active
- all required operators have acknowledged barrier and persisted snapshot references
Transition
- `CheckpointState: TRIGGERED -> COMPLETED`
- checkpoint becomes authoritative restore point
5. Restore after failure #
API
- internal failover / restore flow
Initiator
- system
Entry point
- job manager
Authoritative decider
- coordinator shard leader
Precondition
- worker failed or partition reassigned
- latest completed checkpoint exists
Transition
- assign new worker with new assignment token
- restore `OperatorState` and `OperatorOffsetState` from latest committed checkpoint
- resume input consumption from checkpointed progress
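The restore transition can be illustrated with a toy operator. This sketch assumes the operator keeps a per-key running sum, the input log is an in-memory list, and the checkpoint stores state together with the next offset to read; `CompletedCheckpoint` and `restore_and_replay` are hypothetical names:

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class CompletedCheckpoint:
    state: Dict[str, int]   # operator state captured at the barrier
    offset: int             # next input position to read after restore


def restore_and_replay(checkpoint: CompletedCheckpoint,
                       input_log: List[Tuple[str, int]]) -> Tuple[Dict[str, int], int]:
    """Restore state and offset from the same checkpoint, then replay newer input."""
    state = dict(checkpoint.state)   # copy so the checkpoint itself stays untouched
    offset = checkpoint.offset
    for key, amount in input_log[offset:]:
        state[key] = state.get(key, 0) + amount
        offset += 1
    return state, offset
```

Taking state and offset from the same checkpoint is what makes the replay correct: records before `offset` are already reflected in `state`, and records from `offset` onward are applied exactly once to the restored state.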
Entry Point vs Decider vs Responder #
| Path | Entry point | Authoritative decider | Physical responder | Logical responder |
|---|---|---|---|---|
| `SubmitStreamingJob` | gateway / frontend | coordinator shard leader | leader or front node | stream framework |
| operator assignment | job manager | coordinator shard leader | control-plane node | stream framework |
| state advance / output | worker runtime | current assignment token plus coordinator authority | worker / sink | stream framework |
| checkpoint commit | job manager | checkpoint coordinator | control-plane node | stream framework |
| restore after failure | job manager | coordinator shard leader | control-plane node | stream framework |
| shard failover | follower / coordination layer | shard quorum / lease store | new leader / control plane | stream framework |
Concrete HLD #
Main components:
- job submission frontend
  - receives streaming job requests
- job manager / coordinator shard leaders
  - authoritative owners for assignment, progress, and checkpoint lifecycle
- worker fleet
  - consumes partitions, advances operator state, emits outputs
- state backend
  - stores local or remote operator state snapshots
- checkpoint store
  - stores completed checkpoint manifests and snapshots
- input log system
  - provides ordered partitioned input replay
- sinks / output streams
  - receive emitted records or side effects
Short Interview Version #
I’d build the stream framework as a sharded job manager plus worker fleet. Each operator partition has one active worker at a time, enforced through a lease-like assignment token. That worker consumes ordered input partitions, advances operator state, records monotonic progress, and emits outputs continuously. Recovery is built around checkpoints: the coordinator triggers barrier-aligned snapshots, commits a checkpoint only when all required operator snapshots and progress markers are present, and on failure reassigns the operator partition to a new worker which restores state and offsets from the latest committed checkpoint before replaying input. The main scaling levers are more input partitions, scalable state backends, incremental checkpoints, and well-partitioned sinks.