Aggregation — Distributed Counting and Rate Limiting #
Aggregation is the coordination mechanism that combines values from multiple sources into a single result. Unlike Selection (one winner) or Agreement (all commit), Aggregation does not require a unique outcome — it requires a correct combination.
Aggregation appears at two levels:
Infrastructure aggregation: a distributed query system combines partial results from multiple shards into a final result. OpenSearch’s scatter-gather (Chapter 1 of the search-index series) is aggregation infrastructure.
Application aggregation: user actions distributed across many nodes must be combined to produce a count, a rate, or a leaderboard. A rate limiter must aggregate request counts from all nodes to determine whether a user has exceeded their quota.
The challenge is the same at both levels: partial results must be combinable without requiring a central coordinator to hold all values simultaneously.
The Aggregation Properties #
For a merge function to work in a distributed setting:
- Commutativity: merge(A, B) = merge(B, A). The order in which partial results arrive does not matter.
- Associativity: merge(merge(A, B), C) = merge(A, merge(B, C)). Partial results can be combined in any grouping.
- Idempotency (for some structures): merge(A, A) = A. Merging a duplicate partial result produces no change.
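These are the properties that define a CRDT merge (Chapter 10): aggregation structures whose merge satisfies all three, such as HyperLogLog, are convergent replicated data types. Structures merged by summation are commutative and associative but not idempotent, so each partial result must be merged exactly once.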
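As a rough illustration, the sketch below checks the three properties for two merge functions: element-wise sum (the kind used for counters) and element-wise max (the kind used for registers). The function names are invented for this example.

```python
def merge_sum(a, b):
    """Element-wise sum: commutative and associative, but not idempotent."""
    return [x + y for x, y in zip(a, b)]

def merge_max(a, b):
    """Element-wise max: commutative, associative, and idempotent."""
    return [max(x, y) for x, y in zip(a, b)]

def is_commutative(merge, a, b):
    return merge(a, b) == merge(b, a)

def is_associative(merge, a, b, c):
    return merge(merge(a, b), c) == merge(a, merge(b, c))

def is_idempotent(merge, a):
    return merge(a, a) == a

A, B, C = [1, 0, 3], [2, 5, 0], [0, 1, 1]
for merge in (merge_sum, merge_max):
    print(merge.__name__,
          is_commutative(merge, A, B),
          is_associative(merge, A, B, C),
          is_idempotent(merge, A))
# merge_sum True True False
# merge_max True True True
```

A sum-based merge therefore needs exactly-once delivery of partial results, while a max-based merge tolerates duplicates.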
Count-Min Sketch #
The Count-Min Sketch (Cormode & Muthukrishnan, 2003) estimates the frequency of any element in a data stream using sub-linear space.
Structure #
A Count-Min Sketch is a 2D array of counters with dimensions d × w:
- d rows, each with a distinct hash function
- w columns (buckets per row)
Add(item):
    For each row i:
        j = hash_i(item) % w
        sketch[i][j] += 1

Estimate(item):
    For each row i:
        j = hash_i(item) % w
        count_estimates[i] = sketch[i][j]
    Return min(count_estimates)
The minimum across all rows gives the frequency estimate. Taking the minimum reduces the effect of hash collisions (items that share a bucket with the target item inflate the count in that bucket, but rarely all d buckets simultaneously).
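The pseudocode above could be written in Python roughly as follows. This is a minimal sketch, not a tuned implementation: salting one BLAKE2b hash with the row index stands in for d independent hash functions, which is an assumption of this example.

```python
import hashlib

class CountMinSketch:
    """d rows of w counters; row i uses a hash salted with i."""

    def __init__(self, w: int, d: int):
        self.w = w
        self.d = d
        self.table = [[0] * w for _ in range(d)]

    def _bucket(self, row: int, item: str) -> int:
        # Assumption: a per-row salt approximates d independent hash functions.
        digest = hashlib.blake2b(
            item.encode("utf-8"),
            digest_size=8,
            salt=row.to_bytes(8, "little"),
        ).digest()
        return int.from_bytes(digest, "little") % self.w

    def add(self, item: str, count: int = 1) -> None:
        for row in range(self.d):
            self.table[row][self._bucket(row, item)] += count

    def estimate(self, item: str) -> int:
        # The minimum across rows is the least collision-inflated counter.
        return min(self.table[row][self._bucket(row, item)]
                   for row in range(self.d))

cms = CountMinSketch(w=2719, d=5)
for word in ["a", "b", "a", "c", "a"]:
    cms.add(word)
print(cms.estimate("a"))  # at least 3: the estimate is never below the true count
```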
Error Bound #
The Count-Min Sketch gives a one-sided guarantee: the estimate is never below the true count, and with probability at least 1 - δ it exceeds the true count by no more than ε × total_items.
The sketch dimensions required:
- w (columns) = ceil(e / ε) ≈ ceil(2.718 / ε)
- d (rows) = ceil(ln(1/δ))
For ε = 0.1% error, δ = 1% failure probability: w = ceil(2718.28) = 2719 columns, d = ceil(4.61) = 5 rows. Total counters: 2719 × 5 = 13,595. Each counter is a 4-byte integer. Total memory: ~53 KB to track frequencies over a stream of billions of events.
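The sizing formulas above can be checked with a few lines of Python. The helper name `cms_dimensions` is made up for this example; note that rounding e/ε = 2718.28 up yields 2719 columns.

```python
import math

def cms_dimensions(epsilon: float, delta: float):
    """Return (w, d) for a target error epsilon and failure probability delta."""
    w = math.ceil(math.e / epsilon)
    d = math.ceil(math.log(1 / delta))
    return w, d

w, d = cms_dimensions(epsilon=0.001, delta=0.01)
memory_bytes = w * d * 4  # 4-byte counters
print(w, d, memory_bytes)  # 2719 5 54380
```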
Application: Top-K Heavy Hitters #
A common use case is finding the top-K most frequent items (top trending hashtags, top requested URLs, top API callers):
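import heapq

class TopKTracker:
    def __init__(self, k=100, w=2719, d=5):
        self.sketch = CountMinSketch(w=w, d=d)
        self.heap = []  # Min-heap of (count, item); may contain stale entries
        self.k = k
        self.seen = {}  # Tracked item -> latest estimated count

    def add(self, item: str):
        self.sketch.add(item)
        estimated_count = self.sketch.estimate(item)
        if item in self.seen or len(self.seen) < self.k:
            # Already tracked, or still room: record the newer estimate
            self.seen[item] = estimated_count
            heapq.heappush(self.heap, (estimated_count, item))
            self._compact()
            return
        # Drop stale heap entries (evicted items or superseded counts)
        while self.seen.get(self.heap[0][1]) != self.heap[0][0]:
            heapq.heappop(self.heap)
        if estimated_count > self.heap[0][0]:
            # Item is a new heavy hitter; evict the current minimum
            _, evicted = heapq.heapreplace(self.heap, (estimated_count, item))
            del self.seen[evicted]
            self.seen[item] = estimated_count

    def _compact(self):
        # Rebuild the heap once stale entries outnumber live ones
        if len(self.heap) > 2 * self.k:
            self.heap = [(count, item) for item, count in self.seen.items()]
            heapq.heapify(self.heap)

    def top_k(self) -> list:
        return sorted(((count, item) for item, count in self.seen.items()), reverse=True)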
Distributed top-K: each node maintains a local Count-Min Sketch. To combine:
merged_sketch[i][j] = sketch_node1[i][j] + sketch_node2[i][j] + ... + sketch_nodeN[i][j]
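Addition is element-wise, so commutativity and associativity hold. Provided every node uses the same dimensions and the same hash functions, the merged sketch has the same error properties as a single sketch that had seen all items from all nodes. The sum is not idempotent, though: merging the same node's sketch twice double-counts it.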
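A minimal sketch of that merge, treating each node's sketch as a plain list of rows. The function name `merge_sketches` and the tiny 2 × 3 tables are illustrative only.

```python
def merge_sketches(*tables):
    """Element-wise sum of count tables that share dimensions and hash functions."""
    d, w = len(tables[0]), len(tables[0][0])
    for table in tables:
        if len(table) != d or any(len(row) != w for row in table):
            raise ValueError("sketches must have identical dimensions")
    return [[sum(table[i][j] for table in tables) for j in range(w)]
            for i in range(d)]

node1 = [[1, 0, 2], [0, 3, 0]]
node2 = [[0, 4, 1], [2, 0, 1]]
node3 = [[5, 0, 0], [0, 0, 5]]

merged = merge_sketches(node1, node2, node3)
print(merged)  # [[6, 4, 3], [2, 3, 6]]
```

The dimension check catches only one kind of mismatch. Sketches built with different hash functions have the same shape but cannot be merged meaningfully.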
Application: Rate Limiting per User #
A rate limiter needs to count how many requests user X has made in the last 60 seconds. With millions of users, storing an exact counter per user is expensive. A Count-Min Sketch approximates the count with bounded error:
# Key: (user_id, current_minute). This is a fixed one-minute window, not a true sliding window.
key = f"{user_id}:{int(time.time() // 60)}"
sketch.add(key)
count = sketch.estimate(key)
if count > rate_limit_per_minute:
    return 429  # Too Many Requests
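The approximation only overestimates: counters are only ever incremented, so every row's bucket holds at least the true count, and taking the minimum picks the least inflated one. Some users may be incorrectly rate-limited at the boundary. This is acceptable for rate limiting, where a small false positive rate is a known tradeoff. Keys from past minutes stay in the sketch, so it must be reset or rotated periodically to keep collision noise from growing.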
HyperLogLog #
HyperLogLog (Flajolet et al., 2007) estimates the cardinality (number of distinct elements) of a set using m small registers of O(log log N) bits each, which is dramatically less than storing all elements.
Intuition #
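For a uniformly random hash, a value starts with at least k zero bits with probability 2^(-k). Seeing a hash with k leading zeros therefore suggests that roughly 2^k distinct elements have been hashed: if the maximum number of leading zeros across all elements is k, then N ≈ 2^k. More leading zeros → more distinct elements.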
HyperLogLog uses multiple registers (sub-streams) to reduce variance:
- Hash each element.
- Use the first b bits of the hash to select a register (m = 2^b registers).
- For the remaining bits, count the position of the first 1-bit (number of leading zeros + 1).
- Update register R[index] with the max of current value and new position.
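Estimate:
    Z = (Σ 2^(-R[i]))^(-1)      (sum over all m registers)
    estimate = α_m × m² × Z     (m times the harmonic mean of the 2^R[i] values, bias-corrected)
    where α_m is a bias correction constant (α_m ≈ 0.7213 / (1 + 1.079/m) for m ≥ 128)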
Memory: with m = 2048 registers and max 5-bit counters: 2048 × 5 bits = 1.28 KB. This estimates cardinality to within 1.04/√2048 ≈ 2.3% standard error.
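The steps and the estimate formula above can be sketched in Python as follows. This is an illustrative version under stated assumptions: it uses the first 64 bits of SHA-1 as the hash, stores registers as plain Python integers rather than packed 5-bit counters, and includes only the small-range (linear counting) correction from the original paper.

```python
import hashlib
import math

class HyperLogLog:
    def __init__(self, b: int = 11):
        self.b = b
        self.m = 1 << b                               # number of registers
        self.registers = [0] * self.m
        self.alpha = 0.7213 / (1 + 1.079 / self.m)    # valid for m >= 128

    def add(self, item: str) -> None:
        digest = hashlib.sha1(item.encode("utf-8")).digest()
        h = int.from_bytes(digest[:8], "big")         # 64-bit hash value
        index = h >> (64 - self.b)                    # first b bits pick the register
        rest = h & ((1 << (64 - self.b)) - 1)         # remaining 64 - b bits
        # Position of the first 1-bit in the remaining bits (leading zeros + 1)
        rank = (64 - self.b) - rest.bit_length() + 1
        self.registers[index] = max(self.registers[index], rank)

    def estimate(self) -> float:
        z = 1.0 / sum(2.0 ** -r for r in self.registers)
        raw = self.alpha * self.m * self.m * z
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:
            # Small-range correction: fall back to linear counting
            return self.m * math.log(self.m / zeros)
        return raw

hll = HyperLogLog(b=11)
for i in range(100_000):
    hll.add(f"user-{i % 20_000}")   # 20,000 distinct values, each seen 5 times
print(round(hll.estimate()))        # close to 20,000 (about 2.3% standard error)
```

Repeated elements hash to the same register and rank, so they never change the estimate.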
# Redis HyperLogLog (built-in)
redis.pfadd("daily_unique_users:2026-03-27", user_id)
count = redis.pfcount("daily_unique_users:2026-03-27")
# Returns approximate distinct user count (~0.81% standard error; Redis uses 16,384 registers)
Merging HyperLogLog Sketches #
HyperLogLog’s key property for distributed aggregation: two HLL sketches can be merged by taking the element-wise maximum of their registers:
merged[i] = max(sketch_node1[i], sketch_node2[i])
This is commutative, associative, and idempotent — a CRDT merge operation. The merged sketch estimates the cardinality of the union of the two sets.
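One practical consequence is that a retransmitted or reordered sketch does no harm. The sketch below folds register arrays from several nodes and shows that delivering one of them twice leaves the result unchanged. The four-register arrays and function names are illustrative only.

```python
from functools import reduce

def merge_registers(a, b):
    """Element-wise max of two register arrays of equal length."""
    if len(a) != len(b):
        raise ValueError("sketches must use the same number of registers")
    return [max(x, y) for x, y in zip(a, b)]

def merge_all(sketches):
    return reduce(merge_registers, sketches)

node1 = [3, 0, 5, 1]
node2 = [1, 2, 5, 4]
node3 = [0, 6, 2, 1]

once = merge_all([node1, node2, node3])
with_retry = merge_all([node2, node1, node3, node2])  # node2 delivered twice, out of order

print(once)                # [3, 6, 5, 4]
print(once == with_retry)  # True
```

As with Count-Min Sketches, the merge is only meaningful when all nodes use the same register count and the same hash function.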
# Redis: merge multiple HLL sketches into one
redis.pfmerge("total_unique_users", "node1:unique_users", "node2:unique_users", "node3:unique_users")
count = redis.pfcount("total_unique_users")
Use case: unique visitor counting across CDN nodes
Each CDN node maintains an HLL sketch of visitor IDs (IP + user agent hash). Every hour, all node sketches are merged into a central sketch. The cardinality of the merged sketch is the number of unique visitors across all CDN nodes — with no network coordination during the visit itself.
Rate Limiting Algorithms #
Rate limiting is Aggregation applied to access control: count requests from an actor within a window, and deny access when the count exceeds a threshold. The challenge is doing this accurately and efficiently at scale.
Token Bucket #
The token bucket algorithm models a bucket that fills at a constant rate (one token per 1/rate seconds) up to a maximum capacity (burst capacity). Each request consumes one token. Requests are rejected when the bucket is empty.
import time

class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate                      # tokens per second
        self.capacity = capacity              # max burst size
        self.tokens = float(capacity)         # start full
        self.last_refill = time.monotonic()   # monotonic clock: immune to wall-clock jumps

    def allow(self) -> bool:
        now = time.monotonic()
        elapsed = now - self.last_refill
        # Refill in proportion to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
Properties:
- Allows bursting up to capacity tokens.
- Smooths out traffic over time (long-term rate ≤ configured rate).
- Unused tokens accumulate up to capacity, allowing bursts.
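To see the burst and refill behavior without waiting on a real clock, the sketch below repeats the token bucket with an injectable clock. The `FakeClock` helper and the `clock` parameter are additions for this example, not part of the class above.

```python
class FakeClock:
    """A manually advanced clock, so the example is deterministic."""
    def __init__(self):
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds

class TokenBucket:
    def __init__(self, rate: float, capacity: int, clock):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full
        self.clock = clock
        self.last_refill = clock()

    def allow(self) -> bool:
        now = self.clock()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

clock = FakeClock()
bucket = TokenBucket(rate=2.0, capacity=5, clock=clock)

burst = [bucket.allow() for _ in range(6)]
print(burst)   # [True, True, True, True, True, False]

clock.advance(1.0)  # one second at 2 tokens/second refills 2 tokens
after = [bucket.allow() for _ in range(3)]
print(after)   # [True, True, False]
```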
Sliding Window #
The sliding window algorithm counts requests within a rolling time window:
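import time
import uuid

# `redis` is assumed to be a connected redis-py client instance, not the module
def allow_sliding_window(user_id: str, window_seconds: int, limit: int) -> bool:
    key = f"rate:{user_id}"
    now = time.time()
    window_start = now - window_seconds
    # Remove entries older than the window
    redis.zremrangebyscore(key, 0, window_start)
    # Count entries in current window
    count = redis.zcard(key)
    if count >= limit:
        return False  # Rate limited
    # Add current request (unique member so same-timestamp requests do not collide)
    redis.zadd(key, {str(uuid.uuid4()): now})
    redis.expire(key, window_seconds)
    # Note: these calls are not atomic, so concurrent requests can both pass
    # the check. Run them inside a Lua script if that matters.
    return True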
A Redis sorted set stores request timestamps as scores. Entries older than the window are removed on each call. The count of remaining entries is the request count in the current window.
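The same algorithm can be sketched in memory for a single process, which makes the bookkeeping easier to follow. Here a `deque` plays the role of the sorted set, and the caller passes `now` explicitly (assumed to be non-decreasing). The class name is invented for this example.

```python
from collections import deque

class SlidingWindowLog:
    def __init__(self, window_seconds: float, limit: int):
        self.window = window_seconds
        self.limit = limit
        self.timestamps = deque()  # request times, oldest first

    def allow(self, now: float) -> bool:
        # Drop timestamps that have fallen out of the window
        while self.timestamps and self.timestamps[0] <= now - self.window:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.limit:
            return False
        self.timestamps.append(now)
        return True

log = SlidingWindowLog(window_seconds=10, limit=3)
results = [log.allow(t) for t in (0.0, 1.0, 2.0, 3.0, 10.5, 10.6)]
print(results)  # [True, True, True, False, True, False]
```

At t = 10.5 the request from t = 0 has left the window, so one slot frees up. At t = 10.6 the window still holds three requests.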
Approximate sliding window with two buckets: maintain counts for the current and previous fixed windows. Estimate the count in the rolling window by interpolating:
import time

def count_in_window(current_count, previous_count, window_seconds) -> float:
    now = time.time()
    seconds_into_current_window = now % window_seconds
    # Fraction of the rolling window that still overlaps the previous fixed window
    previous_window_weight = 1 - (seconds_into_current_window / window_seconds)
    return current_count + previous_count * previous_window_weight
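This approximation assumes requests in the previous window were evenly spread. It is exact when they were and off by at most previous_count in the worst case, which is acceptable for most rate limiting use cases. It uses O(1) memory per user instead of O(requests_per_window).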
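A self-contained sketch of a limiter built on this interpolation might look like the following. It keeps two counters per user and takes `now` as a parameter so the behavior is reproducible. The class and attribute names are hypothetical.

```python
class SlidingWindowCounter:
    def __init__(self, window_seconds: int, limit: int):
        self.window = window_seconds
        self.limit = limit
        self.current_index = None   # index of the current fixed window
        self.current_count = 0
        self.previous_count = 0

    def _roll(self, now: float) -> None:
        index = int(now // self.window)
        if self.current_index is None:
            self.current_index = index
        elif index == self.current_index + 1:
            # Moved into the next window: current becomes previous
            self.previous_count, self.current_count = self.current_count, 0
            self.current_index = index
        elif index > self.current_index + 1:
            # Skipped at least one whole window: nothing carries over
            self.previous_count = self.current_count = 0
            self.current_index = index

    def allow(self, now: float) -> bool:
        self._roll(now)
        elapsed_fraction = (now % self.window) / self.window
        estimate = self.current_count + self.previous_count * (1 - elapsed_fraction)
        if estimate >= self.limit:
            return False
        self.current_count += 1
        return True

limiter = SlidingWindowCounter(window_seconds=60, limit=10)
first = [limiter.allow(30.0) for _ in range(11)]
second = [limiter.allow(75.0) for _ in range(5)]
print(sum(first))   # 10
print(second)       # [True, True, True, False, False]
```

At t = 75 the previous window still carries a weight of 0.75, so its 10 requests count as 7.5 and only three more requests fit under the limit.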
Distributed Rate Limiting #
A single-node rate limiter is straightforward. A distributed rate limiter — where requests from the same user arrive at different nodes — must coordinate counts.
The coordination options:
Centralized counter (Redis): all nodes update a shared counter in Redis. Accurate but adds a Redis round-trip to every request.
def allow_centralized(user_id: str, limit: int, window: int) -> bool:
    key = f"rate:{user_id}:{int(time.time() // window)}"
    count = redis.incr(key)
    redis.expire(key, window * 2)  # Keep for two windows
    return count <= limit
Local + periodic sync: each node maintains a local counter and syncs it to a central store, reading back the global total. The sync can run on a timer (for example every 100 ms) or, as in the code that follows, whenever the node exhausts a local budget.
class DistributedRateLimiter:
    def __init__(self, global_limit: int, local_limit_fraction: float = 0.1):
        # Allocate 10% of global limit to local budget
        # to reduce coordination frequency
        self.global_limit = global_limit
        self.local_budget = global_limit * local_limit_fraction
        self.local_counts = {}  # user_id -> requests allowed locally since last sync
        self.blocked = set()    # users whose global total has exceeded the limit
        # Window reset is omitted here: expire the Redis key and clear both
        # structures at each window boundary.

    def allow(self, user_id: str) -> bool:
        if user_id in self.blocked:
            return False
        count = self.local_counts.get(user_id, 0)
        if count < self.local_budget:
            self.local_counts[user_id] = count + 1
            return True
        # Local budget exhausted; sync with global counter
        return self._sync_and_check(user_id)

    def _sync_and_check(self, user_id: str) -> bool:
        # Atomic: add local count (plus this request) to global, read back global total
        pending = self.local_counts.get(user_id, 0) + 1
        total = redis.incrby(f"global_rate:{user_id}", pending)
        self.local_counts[user_id] = 0
        if total > self.global_limit:
            self.blocked.add(user_id)
            return False
        return True
This approach trades accuracy for lower latency — the system may allow slightly more than the configured limit during the sync interval.
Token bucket with Redis + Lua for atomic operations:
-- rate_limit.lua: atomic token bucket update
local key = KEYS[1]
local rate = tonumber(ARGV[1])       -- tokens per second
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])        -- current time in seconds (float)
local requested = tonumber(ARGV[4])

local bucket = redis.call("HMGET", key, "tokens", "last_refill")
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now

-- Refill tokens
local elapsed = now - last_refill
tokens = math.min(capacity, tokens + elapsed * rate)

-- Check and deduct
local allowed = 0
if tokens >= requested then
    tokens = tokens - requested
    allowed = 1
end

-- HSET accepts multiple field/value pairs since Redis 4.0 (HMSET is deprecated)
redis.call("HSET", key, "tokens", tokens, "last_refill", now)
redis.call("EXPIRE", key, math.ceil(capacity / rate) * 2)
return allowed
The Lua script executes atomically on the Redis server — no race condition between reading and updating the token count.
Aggregation in Analytics Pipelines #
Beyond rate limiting, Aggregation applies to any analytics computation across distributed data:
Ad click counting: each edge server records a count of clicks. Every minute, all edge servers push their counts to a central aggregator. The aggregator sums counts per ad. The accuracy requirement is loose (approximate counts are acceptable); the throughput requirement is high (billions of clicks per day).
Leaderboard / Top-K: each shard of a gaming system tracks scores for its users. Every second, shards push their local Top-K lists to an aggregator. The aggregator merges the Top-K lists (keeping only the global top-K). The merge is O(K × N_shards), not O(total_users).
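A minimal sketch of that merge step, using `heapq.nlargest` over the concatenated shard lists. The shard contents and the function name are made up for illustration.

```python
import heapq

def merge_top_k(shard_lists, k):
    """Merge per-shard [(score, user)] lists into the global top-k."""
    all_entries = (entry for shard in shard_lists for entry in shard)
    return heapq.nlargest(k, all_entries)

shard_a = [(980, "ana"), (870, "bo"), (400, "cy")]
shard_b = [(990, "dee"), (450, "eli"), (300, "fay")]
shard_c = [(900, "gus"), (880, "hal"), (100, "ivy")]

print(merge_top_k([shard_a, shard_b, shard_c], k=3))
# [(990, 'dee'), (980, 'ana'), (900, 'gus')]
```

This is exact only because each user's score lives on exactly one shard. If the same key accumulated counts on several shards, an item could miss every local Top-K list and still belong in the global one.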
Real-time metrics (P99 latency): each service instance records latency samples into a TDigest (Chapter 6 of search-index series: percentile approximation). TDigests are mergeable: combine the centroids of both digests, then re-compress them to the configured size. The merged digest estimates the P99 across all service instances.
Aggregation Summary #
| Data structure | Use case | Memory | Accuracy | Mergeable |
|---|---|---|---|---|
| Exact counter | Request counts per user (small scale) | O(users) | Exact | Yes (sum) |
| Count-Min Sketch | Frequency estimation, Top-K at scale | O(ε⁻¹ × log δ⁻¹) | ε × N over-count | Yes (element-wise sum) |
| HyperLogLog | Distinct count (cardinality) | O(m × log log N) for m registers | ~1.04/√m standard error | Yes (element-wise max) |
| Token bucket | Rate limiting with burst | O(1) per user | Exact | No (stateful) |
| Sliding window (sorted set) | Accurate rate limiting | O(requests) | Exact | Complex |
| TDigest | Percentile estimation | O(compression) | ~1% at P99 | Yes (merge centroids) |