Integration & The Connect Ecosystem
Integration as Engineering Surface, Not Glue Code
Integration logic is the dark matter of data architectures — invisible in system diagrams, massive in aggregate, and holding everything together. Every team eventually writes small services to consume from a topic, transform records, and write to a database. These services accumulate boilerplate: graceful shutdown, backpressure, retry logic, metric instrumentation. Over time, the operational concerns overshadow the actual business logic.
A declarative integration runtime (Kafka Connect, Redpanda Connect) is the answer to this accumulation. It is not a stream processing engine — it does not do windowed aggregations or stateful joins. It is a configuration-driven, stateless integration runtime designed to move data from A to B with transformations, resilient backpressure, and minimal operator overhead. It treats data movement as a commodity problem solved by configuration, reserving custom code only for complex domain logic.
The Pipeline Abstraction: Input → Processing → Output
The core mental model is a directed graph of three stages: Input, Processing, and Output.
input:
  kafka:
    addresses: [ "broker-0:9092" ]
    topics: [ "logs_raw" ]
    consumer_group: "connect_group_1"

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()
        root.host = meta("kafka_topic")
    - catch:
        - log:
            level: ERROR
            message: 'Processing failed: ${! error() }'

output:
  aws_s3:
    bucket: "company-logs"
    path: '${! meta("kafka_topic") }/${! timestamp_unix() }.json'
    batching:
      count: 100
      period: 5s
This configuration is not a script — it is a declaration of intent. The runtime handles the execution loop, threading, and error propagation.
Inputs and the transaction model. An input is not just a data source — it is an acknowledgment controller. A message is not considered “processed” until it has successfully passed through the pipeline and been acknowledged by the output. When the Kafka input consumes a message, it creates an in-memory transaction. This transaction travels through the pipeline. Only when the output successfully writes does the input commit the offset. If the process crashes mid-pipeline, the offset is never committed, and the broker redelivers the message on restart. This synchronous acknowledgment chain guarantees at-least-once delivery by construction — no additional logic required.
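The acknowledgment chain is visible in configuration. As a sketch, the kafka input's checkpoint_limit field bounds how many consumed-but-unacknowledged messages may be in flight before consumption pauses (topic and group names here are illustrative):

```yaml
input:
  kafka:
    addresses: [ "broker-0:9092" ]
    topics: [ "orders" ]
    consumer_group: "etl_orders"
    # At most 1024 messages may be in flight (consumed but not yet
    # acknowledged by the output) before the input stops reading.
    # Offsets are committed only up to the highest contiguous ack.
    checkpoint_limit: 1024
```

A smaller limit tightens the redelivery window after a crash at the cost of throughput; the default trades the other way.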
Stateless processing and Bloblang. The pipeline section defines a list of processors that are strictly stateless: each processor receives a message, transforms it, and passes it to the next step. It has no access to a local state store. This limitation is intentional — it ensures every instance is ephemeral and fungible. The primary transformation language is Bloblang, a purpose-built mapping language designed for safe, side-effect-free JSON manipulation:
# Remap fields, generate IDs, and enrich metadata
root.id = this.user_id
root.contact.email = this.email.lowercase()
root.metadata.source = meta("kafka_topic")
root.ingest_ts = now()
# Conditional logic
root.status = if this.age >= 18 { "adult" } else { "minor" }
# Drop sensitive fields
root.password = deleted()
Bloblang executes within the pipeline threads and is sandboxed to prevent side effects. Unlike general-purpose scripting, it cannot make network calls or access external state — enrichment that requires a database lookup uses a dedicated cache or http processor with explicit timeout and retry semantics.
Operational Boundaries: What Stateless Means in Practice
No local state across restarts. Unlike Kafka Streams, which may maintain a local RocksDB for joins, a stateless connect runtime buffers in-flight data only. Any instance can be killed and replaced with a fresh one without recovery — which makes it ideal for Kubernetes ReplicaSets and auto-scaling groups.
Concurrency and ordering. Parallelism is controlled via pipeline.threads:
- threads: 1 — messages are processed sequentially. Ordering is strictly preserved from input to output. This is mandatory for CDC events where an UPDATE must follow its INSERT.
- threads > 1 — messages are processed in parallel. The input reads sequentially, but the processing stage may reorder messages. For high-throughput stateless workloads (log ingestion, metric forwarding), this provides near-linear scaling. For order-dependent workloads, use threads: 1, or partition the work by key so that messages with the same key are processed by the same thread.
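Raising the thread count is a one-line change, paid for in ordering guarantees (the mapping below is a placeholder):

```yaml
pipeline:
  threads: 4   # four processing threads; message order is no longer preserved
  processors:
    - mapping: |
        root = this
        root.normalized = this.message.lowercase()
```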
Backpressure is automatic. If the output (e.g., S3) becomes slow or returns errors, the output component stops acknowledging messages. This pressure propagates backward: the pipeline stops accepting new messages, and eventually the input stops consuming from the broker. Unlike ad-hoc scripts that require manual “sleep and retry” loops, the runtime pauses consumption naturally, preventing memory exhaustion.
A Taxonomy of Transforms
Mixing transform types indiscriminately leads to pipelines that are hard to debug. Four categories, with distinct operational profiles:
1. Pure transforms — deterministic Input → Output mappings. Renaming JSON keys, converting timestamps, masking PII, calculating derived fields. Safe to run with high parallelism. Never depend on external state.
2. Lossy transforms — reduce data volume or cardinality. Filtering logs by level, sampling traffic, deduplicating within a time window. The critical discipline: be explicit about what is dropped. Use metrics to track how many messages are filtered. A deduplication processor requires an internal cache — size it correctly to avoid OOM kills.
3. Observability transforms — sit alongside the data flow to emit signals without altering the payload. Logging when a required field is missing, incrementing a Prometheus counter for every payment event. Avoid excessive logging in the hot path, as string serialization can dominate CPU time.
4. Enrichment transforms (the danger zone) — fetch data from external systems (e.g., querying a database to hydrate a user ID). Network latency dominates processing time. Always use aggressive caching, strict timeouts, and parallel branch processors to perform lookups alongside the main flow if possible.
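Two of these categories benefit from a sketch: a windowed deduplication (category 2) backed by an explicitly bounded cache, and a branch-based enrichment (category 4) with a strict timeout and a cache in front of the lookup. The event_id field, the user service endpoint, and the resource labels are all illustrative assumptions:

```yaml
pipeline:
  processors:
    # Lossy: drop events already seen within the TTL window.
    - dedupe:
        cache: dedupe_cache
        key: '${! this.event_id }'

    # Enrichment: hydrate user_id via a hypothetical user service.
    # The branch isolates the lookup from the main payload; the cached
    # processor avoids re-querying for hot keys.
    - branch:
        request_map: 'root = this.user_id'
        processors:
          - cached:
              cache: user_cache
              key: '${! content() }'
              processors:
                - http:
                    url: 'http://user-service/v1/users/${! content() }'
                    verb: GET
                    timeout: 500ms
                    retries: 2
        result_map: 'root.user = this'

cache_resources:
  # TTLs bound memory: an unbounded dedupe cache is an OOM kill waiting to happen.
  - label: dedupe_cache
    memory:
      default_ttl: 5m
  - label: user_cache
    memory:
      default_ttl: 10m
```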
Production Ingestion Patterns
The Change Data Capture (CDC) blueprint. Ingesting from PostgreSQL, MySQL, or SQL Server via a CDC connector (e.g., Debezium) transforms mutable tables into immutable event logs. The most critical decision: map the database Primary Key to the Kafka message key. This ensures all changes to a specific row land in the same partition, preserving causal ordering. If the key is omitted or randomized, a consumer may process a DELETE before the UPDATE it was supposed to follow.
Tombstone handling requires explicit attention. When a row is deleted, the CDC connector emits a record with a null payload. Any transformation step that attempts to parse a null payload — for example, to mask PII fields — will crash the pipeline. Guard against null at the entry of every processor, and route tombstones directly to the output without transformation.
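One way to implement that guard is a switch processor that passes empty payloads through untouched and applies transformations only to real records (the after.email masking is an illustrative stand-in for whatever PII logic the pipeline needs):

```yaml
pipeline:
  processors:
    - switch:
        # Tombstone: empty payload. Forward as-is so downstream
        # compaction semantics are preserved.
        - check: content().length() == 0
          processors: []
        # Normal record: safe to parse and mask.
        - processors:
            - mapping: |
                root = this
                root.after.email = "REDACTED"
```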
Service and webhook ingestion. When the source cannot embed a native Kafka client (Stripe webhooks, legacy services), the connect runtime acts as a Stream Gateway: it accepts synchronous HTTP requests and converts them into asynchronous events on a topic.
The key trade-off is between latency and durability. In synchronous mode, the gateway waits for the broker’s acknowledgment before returning 200 OK to the client — backpressure from the cluster propagates directly to the caller. In asynchronous (“fire-and-forget”) mode, the gateway accepts requests immediately — but if it crashes before flushing its buffer, data is lost. For external webhooks from third parties, synchronous mode is preferred: it forces the sender to retry on failure using their own reliability mechanisms.
Partitioning in service ingestion is frequently overlooked. HTTP requests have no native concept of partition key. Without intervention, the gateway round-robins messages, destroying locality. Extract a specific field (e.g., customer_id from the JSON body) and use it as the message key.
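A sketch of the gateway with explicit keying: the http_server input accepts POSTs, and the Kafka output derives the message key from a field in the body (the path and the customer_id field are assumptions):

```yaml
input:
  http_server:
    path: /ingest
    allowed_verbs: [ POST ]

output:
  kafka:
    addresses: [ "broker-0:9092" ]
    topic: "webhook_events"
    # Key by customer so all events for one customer land in one partition.
    key: '${! this.customer_id }'
```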
Durable egress to object storage. Moving data from a Kafka topic to S3, GCS, or Azure Blob Storage is the canonical pattern for data lake population. The common mistake is treating object storage as a real-time consumer, creating one file per message. This “small file problem” destroys query engine performance — Trino and Spark spend more time listing file metadata than reading data.
Batching is a correctness primitive, not just a performance optimization:
output:
  aws_s3:
    bucket: "data-lake-prod"
    # Partitioned layout, plus a deterministic file name built from source
    # partition and offset so retried batch writes overwrite, not duplicate.
    path: '${! meta("kafka_topic") }/year=${! now().ts_format("2006") }/month=${! now().ts_format("01") }/${! meta("kafka_partition") }-${! meta("kafka_offset") }.json'
    batching:
      count: 10000
      period: "5m"
      byte_size: 10485760 # 10 MB
    credentials:
      from_ec2_role: true
When batching is enabled, the Kafka offset is not committed until the entire batch is successfully written. If one record in a batch causes a write failure, the whole batch is replayed. This reinforces the need for idempotent sinks — use deterministic file naming incorporating partition ID and starting offset, so retried writes overwrite the same file rather than creating duplicates.
Contract-Aware Routing and Schema Registry Integration
As pipelines mature, the structure of data flowing through them inevitably changes. Without a governance strategy, these changes cause “poison pill” failures where consumers crash on unexpected formats. Contract-aware routing makes routing decisions based on schema metadata rather than payload content.
The connect runtime integrates with the Schema Registry (covered in Chapter 5) to deserialize only the envelope of a message — specifically the schema ID found in the wire format header. This avoids the CPU overhead of full deserialization while allowing the pipeline to validate that the schema ID is known and active.
This enables Quarantine Lanes: if a message arrives with an unrecognized schema ID, route it to a separate topic for manual inspection rather than halting the entire stream.
pipeline:
  processors:
    - schema_registry_decode:
        url: "http://schema-registry:8081"

output:
  switch:
    cases:
      - check: this.schema_subject == "payments-v1"
        output:
          resource: high_priority_topic
      - check: this.schema_subject.contains("debug")
        output:
          resource: discard
      - output:
          resource: default_datalake
This approach allows data contracts to evolve without requiring lock-step upgrades of all consumers. The integration layer absorbs the complexity of version negotiation.
Operating as a Fleet: Failure Semantics and DLQs
The three failure categories from Chapter 3 apply equally to pipeline outputs.
Transient failures (rate limits, temporary connection resets) — retry with exponential backoff. A naive immediate-retry loop creates a thundering herd that overwhelms a recovering downstream service. Distinguish retriable from non-retriable status codes:
output:
  http_client:
    url: "https://api.downstream.svc/v1/events"
    verb: POST
    retries: 3
    retry_period: 1s
    max_retry_backoff: 30s
    drop_on: [ 400, 404 ] # Do not retry client errors
    max_in_flight: 10
Permanent failures (schema validation errors, 400 Bad Request from malformed JSON) — these messages will never succeed regardless of retries. Retrying wastes resources and blocks the pipeline.
Ambiguous failures (timeout after send, before acknowledgment) — the write may have succeeded or been discarded. For non-idempotent sinks (Elasticsearch, MongoDB, HTTP APIs), use idempotence keys derived from the source message key or offset, so an ambiguous retry results in a safe update rather than a duplicate record.
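As a sketch of the idempotence-key pattern, assuming an Elasticsearch sink: deriving the document ID from source coordinates turns an ambiguous retry into an overwrite of the same document rather than a duplicate (URLs and index name are illustrative):

```yaml
output:
  elasticsearch:
    urls: [ "http://elasticsearch:9200" ]
    index: "events"
    action: "index"
    # Deterministic ID from source coordinates: a retried write
    # replaces the same document instead of creating a new one.
    id: '${! meta("kafka_topic") }-${! meta("kafka_partition") }-${! meta("kafka_offset") }'
```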
The Dead-Letter Queue (DLQ) as engineered subsystem. When retries are exhausted, sequestering the failing message and continuing is often preferable to halting the pipeline. A DLQ is not a garbage dump — it must contain three things: the original payload, the error message describing why it failed, and metadata regarding its provenance (source topic, partition, offset). Without this context, a message in the DLQ is effectively lost.
output:
  fallback:
    - sql_insert:
        driver: "postgres"
        dsn: "postgres://user:pass@db:5432/analytics"
        table: "events"
        columns: [ "id", "timestamp", "data" ]
        args_mapping: |
          root = [ this.id, this.ts, this.payload ]
    - kafka_franz:
        seed_brokers: [ "broker-0:9092" ]
        topic: "dead_letter_queue"
        key: '${! this.id }'
        processors:
          - mapping: |
              meta error_msg = meta("fallback_error")
              meta original_topic = meta("kafka_topic")
              meta failed_at = now()
              root = this
The fallback mechanism acts as a circuit breaker for the specific message. The primary output attempts the write first. If that fails after internal retries, it falls back to enriching the message with error context and writing it to the DLQ topic. Crucially, the fallback “succeeds” — so the offset is committed in the source topic, preventing head-of-line blocking. A separate repair pipeline can consume the DLQ, correct the records, and replay them without blocking the main ingestion path.