CS 451: Data-Intensive Distributed Computing
Jimmy Lin
Estimated study time: 2 hr 35 min
Table of contents
Sources and References
Primary textbooks — Jimmy Lin and Chris Dyer, Data-Intensive Text Processing with MapReduce, Morgan & Claypool, 2010 (Chapters 1–6, freely available); Tom White, Hadoop: The Definitive Guide, 4th ed., O’Reilly, 2015. Supplementary texts — Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee, Learning Spark, 2nd ed., O’Reilly, 2020; Marc Peter Deisenroth, A. Aldo Faisal, and Cheng Soon Ong, Mathematics for Machine Learning, Cambridge University Press, 2020. Online resources — Apache Spark documentation (spark.apache.org); Apache Kafka documentation (kafka.apache.org); Google Bigtable paper (Chang et al., OSDI 2006); Google MapReduce paper (Dean & Ghemawat, OSDI 2004); Apache HBase documentation; Flink documentation (flink.apache.org).
Chapter 1: The Big Data Problem
The Data Deluge
The premise of this course is simple: the amount of data produced by the modern world has vastly outpaced the ability of a single machine to process it. Social media platforms generate terabytes of logs per hour. Genomic sequencing produces petabytes of raw reads per year. IoT sensors in industrial plants emit continuous streams from millions of devices. The Large Hadron Collider generates roughly 15 petabytes of data annually. Scientific simulations in climate modeling and astrophysics routinely produce outputs too large for any one disk.
The fundamental problem is not merely storage — hard drives have grown cheaper — but processing. A single machine with 32 cores and 256 GB of RAM can scan perhaps 10 GB/s from local NVMe storage. At that rate, scanning 1 petabyte takes over 27 hours. And that assumes the data fits on local disks, the algorithm is purely sequential, and there is no memory pressure.
Scaling Up vs. Scaling Out
Two architectural responses exist to this problem. Scaling up (vertical scaling) means purchasing a larger machine: more cores, more RAM, faster storage. This works until it doesn’t — the largest commodity servers top out around 6 TB of RAM and 128 cores, and specialized hardware (SGI UV, for instance) becomes astronomically expensive. More importantly, a single machine remains a single point of failure.
Scaling out (horizontal scaling) means distributing the work across many commodity machines connected by a network. A cluster of 1,000 nodes, each with 10 cores and 64 GB RAM, provides 10,000 cores and 64 TB of memory at a fraction of the cost of an equivalently powerful single machine. The catch is programming complexity: the developer must now reason about coordination, communication, and failure modes that simply do not exist on a single machine.
The Distributed Systems Challenge
Distributed systems introduce a set of failure modes that fundamentally change how software must be written. The “fallacies of distributed computing” (attributed to Peter Deutsch and colleagues at Sun Microsystems) enumerate common false assumptions:
| Fallacy | Reality |
|---|---|
| The network is reliable | Packets are dropped, cables are cut |
| Latency is zero | Cross-datacenter RTT can be 100+ ms |
| Bandwidth is infinite | 10 Gbps shared across hundreds of nodes |
| The network is secure | Active and passive attackers exist |
| Topology doesn’t change | Nodes are added, removed, and moved |
| There is one administrator | Large clusters have complex operational teams |
| Transport cost is zero | Serialisation and network I/O dominate |
| The network is homogeneous | Mixed hardware generations in real clusters |
Beyond the network, distributed systems must cope with partial failures — a scenario with no analogue on a single machine. When you call a remote procedure and receive no response, you cannot tell whether the message was lost, the remote node crashed, the response was lost, or the remote node is running slowly. This ambiguity forces developers to design for idempotency and re-execution.
Fault tolerance is achieved through several complementary mechanisms: redundancy (store multiple copies of data so no single disk failure causes data loss); replication (replicate computation state across nodes so any node can take over); heartbeats (nodes send periodic liveness signals, and the absence of a heartbeat within a timeout triggers a failure declaration); and re-execution (when a task fails, simply re-run it on another node — this is only possible if tasks are deterministic and side-effect-free, which MapReduce enforces by design).
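The heartbeat mechanism can be sketched in a few lines of Python. This is a toy failure detector, not any particular system's implementation; the 0.5 s timeout and node names are illustrative.

```python
# Toy heartbeat-based failure detector. The timeout value and node names
# are illustrative, not taken from any real system's defaults.
TIMEOUT = 0.5  # seconds of silence before a node is declared failed

class FailureDetector:
    def __init__(self):
        self.last_seen = {}  # node -> timestamp of most recent heartbeat

    def heartbeat(self, node, now):
        self.last_seen[node] = now

    def failed_nodes(self, now):
        # A node is declared failed if its last heartbeat is too old.
        return [n for n, t in self.last_seen.items() if now - t > TIMEOUT]

fd = FailureDetector()
fd.heartbeat("node-1", now=0.0)
fd.heartbeat("node-2", now=0.0)
fd.heartbeat("node-1", now=0.6)   # node-2 has gone silent

dead = fd.failed_nodes(now=1.0)
```

Note the fundamental ambiguity discussed above: "failed" here really means "no heartbeat received in time", which could equally be a crashed node, a network partition, or a slow node.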
The Google Stack
The modern era of large-scale data processing was defined by three papers from Google, published between 2003 and 2006:
- GFS (Ghemawat, Gobioff, Leung — SOSP 2003): The Google File System. A distributed file system optimised for large sequential reads and append-only writes, running on commodity hardware with automatic replication and failure recovery.
- MapReduce (Dean and Ghemawat — OSDI 2004): A programming model and execution engine for parallel data processing, hiding all distributed systems complexity behind a two-function interface.
- Bigtable (Chang et al. — OSDI 2006): A distributed storage system for structured data, providing random reads and writes at scale on top of GFS.
These three papers inspired the open-source Hadoop ecosystem: HDFS (GFS clone), Hadoop MapReduce (MapReduce clone), and HBase (Bigtable clone).
HDFS Architecture
HDFS (Hadoop Distributed File System) is the storage layer that underpins the Hadoop ecosystem. Its design is deliberately simple and optimised for the MapReduce access pattern: write once, read many, in large sequential scans.
Files are divided into blocks of 128 MB by default. Each block is replicated to three DataNodes using a rack-aware placement strategy: one replica on the writer’s node (local rack), and two on different nodes of a second rack. This means the cluster can tolerate the failure of any single node or any single rack without data loss.
The 128 MB default also amortises the overhead of the NameNode lookup. Each task launch requires a NameNode query to find block locations. If blocks were 1 KB, a 1 GB file would require 1 million NameNode lookups per MapReduce job. At 128 MB, the same file requires only 8 lookups.
The downside of large blocks: a 10 MB file occupies a single block (the block consumes only 10 MB on disk — blocks are not padded to the full 128 MB), and a MapReduce job over that file assigns all work to one map task — no parallelism. HDFS and MapReduce are not well-suited for large numbers of small files. HDFS Federation (multiple NameNodes) and HBase (which organises many small records into large HFiles) partially address this at the cost of additional architectural complexity.
The NameNode is the metadata master. It holds the entire file system namespace — the mapping from file paths to block IDs, and from block IDs to the DataNodes that hold each replica — entirely in memory for fast access. This is both a strength (O(1) metadata lookups) and a weakness (the NameNode is a single point of failure and a scalability bottleneck; HDFS Federation addresses this by sharding the namespace across multiple NameNodes).
Reads proceed as follows: the client contacts the NameNode to learn which DataNodes hold the needed blocks, then reads directly from the closest DataNode (locality is inferred from network topology). Writes use a pipeline: the client sends data to DataNode 1, which forwards to DataNode 2, which forwards to DataNode 3; acknowledgements flow back along the pipeline. HDFS does not support random updates — files are write-once (or append-only). This constraint, which would be unacceptable in a general-purpose file system, is exactly what allows HDFS to achieve high throughput: there are no update locks, no cache invalidation, no complex consistency protocols.
Chapter 2: MapReduce Algorithm Design
The Programming Model
MapReduce reduces the problem of parallel data processing to two user-supplied functions:
\[ \text{map}(k_1, v_1) \rightarrow \text{list}(k_2, v_2) \]
\[ \text{reduce}(k_2, \text{list}(v_2)) \rightarrow \text{list}(k_3, v_3) \]
The framework handles everything in between: reading data from HDFS, launching map tasks across the cluster, partitioning and sorting intermediate key-value pairs by key (the shuffle), grouping all values for a given key, launching reduce tasks, and writing output back to HDFS. The developer only writes the map and reduce functions.
The canonical example is word count. The mapper tokenises each line and emits (word, 1) for every token. The framework groups all (word, 1) pairs by word. The reducer sums the list of ones to produce the total count. Despite its simplicity, word count captures the essential structure of a vast class of distributed aggregation problems. As a worked example, consider a corpus split across three shards:
- Shard 1: “the cat sat on the mat the cat”
- Shard 2: “the dog chased the cat”
- Shard 3: “the mat is on the floor”
Map phase (with combiner): Each mapper tokenises its shard and applies the combiner locally before emitting:
- Shard 1 emits: (“the”, 3), (“cat”, 2), (“sat”, 1), (“on”, 1), (“mat”, 1)
- Shard 2 emits: (“the”, 2), (“dog”, 1), (“chased”, 1), (“cat”, 1)
- Shard 3 emits: (“the”, 2), (“mat”, 1), (“is”, 1), (“on”, 1), (“floor”, 1)
Partitioner: Intermediate key-value pairs are assigned to reducers by hash(word) % R, where \(R\) is the number of reducers. Suppose we have 2 reducers and the hash function sends “the”, “cat”, “mat”, “on” to Reducer 0 and “sat”, “dog”, “chased”, “is”, “floor” to Reducer 1.
Reduce phase:
- Reducer 0 receives: (“the”, [3, 2, 2]), (“cat”, [2, 1]), (“mat”, [1, 1]), (“on”, [1, 1]). Outputs: (“the”, 7), (“cat”, 3), (“mat”, 2), (“on”, 2).
- Reducer 1 receives: (“sat”, [1]), (“dog”, [1]), (“chased”, [1]), (“is”, [1]), (“floor”, [1]). Outputs: each with count 1.
Focus on “the”: it appeared 3 times in shard 1, 2 times in shard 2, and 2 times in shard 3. The combiner on each shard pre-aggregates these, so only 3 pairs traverse the network to Reducer 0 rather than 7 individual (“the”, 1) pairs. The reducer sums \(3 + 2 + 2 = 7\). This is the final, correct count.
Without any combiner, the 7 individual (“the”, 1) pairs from across all shards would all need to cross the network. With a combiner, only 3 pairs cross — and for a word appearing 47,000 times in a shard, the combiner reduces 47,000 network messages to 1.
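The shard walkthrough above can be replayed in plain Python. This is a simulation of the framework, not Hadoop code; Python's hash(word) % R may route words to different reducers than the hypothetical assignment in the text, but the final counts are identical.

```python
from collections import Counter, defaultdict

# The three shards from the walkthrough above.
shards = [
    "the cat sat on the mat the cat",
    "the dog chased the cat",
    "the mat is on the floor",
]
R = 2  # number of reducers

# Map + combine: each "mapper" tokenises its shard and pre-aggregates
# locally, emitting (word, partial_count) pairs.
mapped = [Counter(shard.split()) for shard in shards]

# Shuffle: route each pair to reducer hash(word) % R, so all partial
# counts for a word land at the same reducer.
reducer_input = [defaultdict(list) for _ in range(R)]
for partial in mapped:
    for word, count in partial.items():
        reducer_input[hash(word) % R][word].append(count)

# Reduce: sum the partial counts.
counts = {}
for bucket in reducer_input:
    for word, partials in bucket.items():
        counts[word] = sum(partials)
```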
The execution timeline is: (1) split input into chunks and assign one map task per chunk; (2) mappers run in parallel, each writing intermediate output to local disk; (3) the shuffle phase sorts and partitions intermediate records by key, then transfers them across the network to reducers; (4) reducers run in parallel, each processing all values for a contiguous range of keys; (5) output is written to HDFS.
Combiners
The shuffle is the most expensive phase of MapReduce because it involves network I/O, which is orders of magnitude slower than local disk I/O. The combiner is a mini-reducer that runs on each mapper’s local output before the shuffle, reducing the number of key-value pairs that must traverse the network.
For word count, the combiner is identical to the reducer: it sums the partial counts from one mapper before they leave the node. If a mapper processes a 128 MB chunk containing the word “the” 50,000 times, without a combiner it emits 50,000 ("the", 1) pairs. With a combiner, it emits one ("the", 50000) pair — a 50,000× reduction in network traffic for that key.
The critical constraint: the combiner must be commutative and associative, because it may be applied zero, one, or multiple times — the framework makes no guarantee. This rules out operations like average (you cannot average partial averages without knowing the counts), but the workaround is to emit (sum, count) pairs and compute the final average in the reducer.
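The (sum, count) workaround can be sketched as follows. The partial aggregates are illustrative numbers, not taken from the text; the point is that the accumulator is associative and commutative even though "average" itself is not.

```python
# Averaging with a combiner: averaging partial averages is wrong, so each
# mapper emits an associative, commutative (sum, count) accumulator and
# the reducer divides once at the end. The numbers are illustrative.
def combine(a, b):
    # Merge two partial (sum, count) accumulators.
    return (a[0] + b[0], a[1] + b[1])

partials = [(300, 3), (50, 1), (250, 2)]   # (sum, count) from three mappers

acc = (0, 0)
for p in partials:
    acc = combine(acc, p)

mean = acc[0] / acc[1]                     # 600 / 6 = 100.0

# Naive "average of averages" gives the wrong answer:
naive = sum(s / c for s, c in partials) / len(partials)   # (100+50+125)/3
```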
Suppose a mapper’s chunk contains the word “the” 1,000 times. Without a combiner, it emits 1,000 records ("the", 1) that must be serialized, written to local disk, and then transferred across the network to the reducer. With a combiner, the mapper emits exactly one record ("the", 1000). The network traffic for this single word is reduced by a factor of 1,000. More precisely: without a combiner, the total number of intermediate records is \(O(\text{total tokens in corpus})\). With a combiner, it is \(O(\text{distinct words per shard} \times \text{number of shards})\). For natural-language text, the vocabulary is bounded by roughly 1 million distinct words regardless of corpus size, while the total token count grows linearly with corpus size. For a corpus of 1 billion words distributed across 1,000 shards, the common words collectively generate \(\sim 10^9\) intermediate records without a combiner, but only (number of common words) \(\times\) 1,000 shards with one — \(\sim 10^6\) records for a thousand hot words, a 1,000× reduction in shuffle I/O for those keys.
The combiner is not free: it requires a pass over the mapper’s local output. But because this pass is over local disk (or even in-memory buffered data), it is orders of magnitude cheaper than network I/O.
Partitioners
The partitioner determines which reducer receives each intermediate key. The default is a hash partitioner: hash(key) mod numReducers. This distributes keys uniformly in expectation but can produce data skew if the key distribution is non-uniform (e.g., if 90% of your data has the same key, one reducer gets 90% of the work).
Custom partitioners solve two classes of problems. First, load balancing: if you know the key distribution in advance (or sample it), you can partition to equalise reducer work. Second, total order: HDFS output files are sorted within each reducer’s partition, but not across partitions. To produce a globally sorted output, use TotalOrderPartitioner, which samples the key space and assigns contiguous key ranges to reducers.
Design Pattern: Local Aggregation (In-Mapper Combining)
The combiner helps, but it operates on the serialised output of the mapper and requires a separate pass over that output. In-mapper combining goes further: the mapper maintains an in-memory associative array, accumulating partial aggregates across all key-value pairs it processes, and only emits at the end of the map task (or periodically to bound memory usage).
For word count, the mapper holds a HashMap<String, Integer> and increments counts in memory. At the end of the task, it emits one pair per unique word seen. This is faster than the combiner because it avoids serialisation of intermediate output entirely.
The trade-off is memory: the in-memory array can grow without bound if the key space is large. For word count over natural language text, the vocabulary is bounded (perhaps 1 million unique words), so the array stays manageable. For problems with unbounded key spaces, in-mapper combining must emit periodically.
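A minimal sketch of in-mapper combining with a periodic flush. The MAX_KEYS threshold is an illustrative memory bound, not a Hadoop parameter; real implementations would bound memory in bytes.

```python
# In-mapper combining with bounded memory: accumulate counts in a dict
# and flush (emit) whenever the dict exceeds a size limit.
MAX_KEYS = 4  # illustrative threshold, not a real Hadoop setting

def mapper(tokens, max_keys=MAX_KEYS):
    buffer = {}
    for tok in tokens:
        buffer[tok] = buffer.get(tok, 0) + 1
        if len(buffer) >= max_keys:
            yield from buffer.items()  # periodic flush bounds memory
            buffer.clear()
    yield from buffer.items()          # final flush at end of the task

emitted = list(mapper("a b c d a a e".split()))

# Downstream, the reducer (or combiner) still sums partial counts:
totals = {}
for word, count in emitted:
    totals[word] = totals.get(word, 0) + count
```

Because of the flush, a word can appear more than once in the emitted stream (here "a" is emitted twice), which is fine: the reducer sums partial counts exactly as it would for ordinary combiner output.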
The default partitioner assigns keys by hash(key) % R. For word count, this is appropriate — keys are strings and hashing distributes them roughly uniformly. But two common situations require custom partitioners. Total sort: Hadoop’s output is sorted within each reducer’s partition (reducer 0 outputs keys A–F, reducer 1 outputs G–M, etc.), but not across partitions. To produce a globally sorted output file, use a range partitioner. The approach: sample the key space (read a random 1% of the input with a small MapReduce job or Spark reservoir sampling), determine the percentile boundaries (with three reducers, the keys at the 33rd and 67th percentiles become the partition boundaries), then use these boundaries as the partition function. This is exactly TotalOrderPartitioner in Hadoop.
Load balancing for skewed data: if 80% of records share the same key (e.g., a web log dominated by a single popular URL), the default hash partitioner puts all of them on one reducer. A custom partitioner can detect high-frequency keys via sampling and spread their records across multiple reducers — at the cost of requiring a final merge step to combine the partial results for those keys. This “sharding” of hot keys is a general technique in distributed systems, analogous to the salting technique for Spark joins.
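The sampling-based range partitioner behind total order can be sketched as follows — a toy model of the TotalOrderPartitioner idea, with made-up integer keys and a 1% sample.

```python
import bisect
import random

# Sketch of a sampled range partitioner: sample the keys, pick R-1
# quantile boundaries, then route each key to the reducer whose
# contiguous range contains it.
def make_range_partitioner(sample, num_reducers):
    sample = sorted(sample)
    # Boundary i is the key at the i-th (1/R)-quantile of the sample.
    boundaries = [sample[len(sample) * i // num_reducers]
                  for i in range(1, num_reducers)]

    def partition(key):
        return bisect.bisect_right(boundaries, key)

    return partition

random.seed(0)
keys = [random.randint(0, 999) for _ in range(10_000)]
partition = make_range_partitioner(random.sample(keys, 100), num_reducers=4)

parts = [[] for _ in range(4)]
for k in keys:
    parts[partition(k)].append(k)

# Because partitions hold disjoint, ordered key ranges, concatenating
# the sorted partitions yields a globally sorted output.
merged = [k for p in parts for k in sorted(p)]
```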
Design Pattern: Pairs and Stripes for Co-occurrence
Computing word co-occurrence matrices (how often do words appear together in a window?) illustrates a fundamental design choice between two MapReduce patterns.
Pairs approach: for each word \(w_i\) in a window, emit ((w_i, w_j), 1) for every co-occurring word \(w_j\). The reducer counts how many times each pair occurs. To compute the marginal \(P(w_i)\), emit a special marker ((w_i, *), 1) for each occurrence of \(w_i\). Problem: the number of intermediate pairs is \(O(n \cdot k)\) where \(k\) is window size, and each pair traverses the network separately.
Stripes approach: for each word \(w_i\), emit (w_i, {w_j: 1, w_k: 1, ...}) — a single key with a hash map as value. The reducer merges hash maps by summing counts. Much less network traffic (one record per anchor word per mapper instead of one per co-occurring pair), but each value is a large object that may stress the serialisation layer.
The choice depends on the vocabulary size and cluster characteristics. For small vocabularies, stripes dominates. For large vocabularies where the per-word hash map would exceed memory, pairs becomes more practical.
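Both patterns can be compared on a toy corpus. This sketch assumes, for brevity, that the co-occurrence window is the whole (short) document; the two representations must agree on every count.

```python
from collections import Counter, defaultdict

# Toy corpus; the co-occurrence window here is the entire document.
docs = [["a", "b", "c"], ["a", "b", "a"]]

# Pairs: one intermediate record per co-occurring (w_i, w_j) pair.
pairs = Counter()
for doc in docs:
    for i, wi in enumerate(doc):
        for j, wj in enumerate(doc):
            if i != j:
                pairs[(wi, wj)] += 1

# Stripes: one record per anchor word per document, carrying a whole
# map {w_j: count}; the reducer merges stripes by element-wise sum.
stripes = defaultdict(Counter)
for doc in docs:
    for i, wi in enumerate(doc):
        stripe = Counter(wj for j, wj in enumerate(doc) if j != i)
        stripes[wi] += stripe   # reducer-side merge

# Both representations encode the same co-occurrence matrix.
```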
Design Pattern: Order Inversion
Computing relative frequencies \(P(w_j | w_i) = \text{count}(w_i, w_j) / \text{count}(w_i)\) requires knowing the marginal \(\text{count}(w_i)\) before processing individual pair counts. Order inversion exploits MapReduce’s sorting to ensure the marginal arrives at the reducer before the joint counts.
The trick: emit ((w_i, *), 1) for the marginal with a special sentinel key * that sorts lexicographically before all other values of \(w_j\). The reducer sees the marginal first, stores it, then processes subsequent pair counts to compute relative frequencies. This converts what would require two MapReduce passes into one.
The general recipe for order inversion:
- Identify a value \(V_{\text{pre}}\) that must be computed before the main computation (e.g., the marginal count).
- Design a composite key \((k, \text{sentinel})\) where the sentinel sorts before all other secondary keys.
- The reducer automatically receives \(V_{\text{pre}}\) first (due to sorting), stores it in a local variable, then processes all subsequent values using it.
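The recipe can be simulated in plain Python. The sorted() call stands in for the framework's shuffle sort, and the tiny emitted list is fabricated for illustration; the key property is that "*" sorts before any letter, so the marginal arrives first within each group.

```python
# Simulating order inversion: the sentinel "*" sorts before any word, so
# within each w_i group the reducer sees the marginal count first.
# The emitted pairs below are fabricated for illustration.
emitted = [
    (("dog", "*"), 3),        # marginal: count(dog) = 3
    (("dog", "barks"), 2),
    (("dog", "runs"), 1),
    (("cat", "*"), 2),        # marginal: count(cat) = 2
    (("cat", "sleeps"), 2),
]

rel_freq = {}
marginal = None
for (wi, wj), count in sorted(emitted):   # sorted() plays the shuffle sort
    if wj == "*":
        marginal = count                  # arrives first: store it
    else:
        rel_freq[(wi, wj)] = count / marginal   # P(w_j | w_i)
```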
This pattern applies broadly: computing TF-IDF requires knowing \(\text{df}(t)\) (the document frequency) before computing per-document TF-IDF scores. In a single-pass approach, emit \((t, \text{sentinel})\) with the document frequency and \((t, \text{docId})\) with the per-document term frequency; the reducer computes TF-IDF on-the-fly. Similarly, for computing per-user session statistics where the session end event must be processed before per-event statistics, the session-end event can be given a sentinel key that sorts before all other events for that user.
The limitation of order inversion is that it only controls ordering within a group (values for the same key). It cannot order across groups or implement arbitrary multi-pass dependencies. When the dependency graph is more complex (e.g., step C depends on both step A and step B, neither of which is a simple sort-first computation), multiple MapReduce passes or Spark’s DAG model is required.
Design Pattern: Value-to-Key Conversion (Secondary Sort)
MapReduce sorts records by key before delivering them to reducers. By pushing part of the value into the key, you can impose a sort order on the values within a group.
For example, to find the maximum temperature per weather station, the natural approach emits (station_id, temperature) and the reducer computes the max. But if you need to emit records in temperature-sorted order within each station, encode (station_id, temperature) as the composite key and define a custom partitioner that partitions only on station_id (so all records for a station go to the same reducer) and a custom comparator that sorts by the full composite key. The reducer then receives records already sorted by temperature.
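A plain-Python simulation of secondary sort. Station IDs and temperatures are made up; hashing on station_id alone plays the custom partitioner, and sorted() over the full (station, temperature) composite key plays the custom comparator.

```python
# Secondary sort sketch: push the temperature into the key, partition on
# station_id only, sort by the full composite key. Data is illustrative.
records = [("stn1", 31), ("stn2", 12), ("stn1", 7),
           ("stn1", 22), ("stn2", 30)]

R = 2
partitions = [[] for _ in range(R)]
for station, temp in records:
    # Partition on station_id only, so a station's records stay together.
    partitions[hash(station) % R].append((station, temp))

groups = {}
for part in partitions:
    # The framework sorts by the full composite key (station, temp),
    # so each group's values arrive already temperature-sorted.
    for station, temp in sorted(part):
        groups.setdefault(station, []).append(temp)
```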
Limitations of MapReduce
MapReduce’s rigid two-stage structure is both its strength and its weakness. It is easy to reason about, easy to make fault-tolerant (re-run any failed task), and easy to schedule. But many algorithms are inherently iterative: PageRank, gradient descent, BFS, k-means clustering. Each MapReduce iteration reads from HDFS and writes back to HDFS — a full round-trip through slow distributed storage. An algorithm requiring 50 iterations incurs 50 HDFS reads and 50 HDFS writes. Spark was built to fix this.
A second limitation is the rigidity of the two-stage model for expressing multi-step pipelines. A SQL-style query joining three tables, filtering, and aggregating naturally maps to a pipeline of five or six operators; in MapReduce, each join requires a separate MapReduce job, so a three-table join requires at least two jobs with intermediate HDFS materialisation. Hive and Pig addressed this by compiling high-level query languages to chains of MapReduce jobs, but the materialisation overhead remained. Spark’s DAG execution model eliminates this by allowing arbitrary chains of transformations to execute in a single pass without materialising intermediate results to distributed storage.
A third limitation is the absence of a query optimizer. MapReduce provides no cost model, no statistics, no plan enumeration. The developer must manually choose partition counts, join strategies, and combiner usage — decisions that a relational optimizer makes automatically. This makes MapReduce programs brittle: a program tuned for 100 GB of input may perform poorly on 10 TB of input because the manually chosen partition count is no longer appropriate. SparkSQL’s Catalyst optimizer and AQE (Adaptive Query Execution) represent the convergence point: a system with the flexibility of MapReduce’s arbitrary code execution and the cost-awareness of a relational query optimizer.
Chapter 3: From MapReduce to Spark
Resilient Distributed Datasets
The core abstraction in Spark is the Resilient Distributed Dataset (RDD). An RDD is an immutable, partitioned collection of records distributed across a cluster. Unlike MapReduce’s fixed two-stage model, RDDs support a rich set of transformations that compose into arbitrary DAGs (directed acyclic graphs) of computation.
The “resilient” in RDD refers to its fault tolerance mechanism. Instead of replicating data (expensive), Spark records the lineage of each RDD — the sequence of transformations that produced it from the original data source. If a partition is lost due to node failure, Spark can recompute it by replaying the lineage from the last checkpoint or the original data. This is efficient because lineage graphs are typically shallow (a few transformations deep), and recomputation is embarrassingly parallel.
Consider the lineage chain raw = sc.textFile("hdfs://logs") → parsed = raw.map(parseLine) → filtered = parsed.filter(isValid) → result = filtered.reduceByKey(add). Spark stores this entire derivation as a DAG of RDD objects. If the executor holding partition 3 of filtered crashes, Spark does not need to re-read all partitions — it re-reads only the HDFS blocks corresponding to partition 3 of raw, re-applies parseLine and isValid on that partition, and reconstructs the lost data. The remaining 999 partitions are unaffected.
This “recomputation fault tolerance” contrasts with HDFS-style replication fault tolerance:
- Replication stores 3 copies of every block. A node failure causes no recomputation — the replica on another node is used immediately. But replication costs 3× storage and 3× write bandwidth. For a 10 TB dataset, that is 30 TB of storage.
- Recomputation stores each partition once. A node failure triggers re-execution of the lineage for the lost partitions. If the lineage is long (many transformations) or the source data must be re-read from disk, recomputation can be slow.
The trade-off: recomputation is better when compute is cheap relative to storage (i.e., the transformations are fast CPU-bound operations), the lineage is short, and the underlying source data (HDFS) is itself replicated (so re-reading the source is always possible). Replication is better when the computations are expensive (e.g., training a neural network on each batch) or when the lineage is very long (Spark checkpointing materializes an RDD to HDFS, effectively cutting the lineage at that point and trading compute for storage — useful for iterative algorithms running hundreds of iterations).
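Lineage-based recovery can be sketched with a toy RDD class — a model of the idea, not Spark's implementation. Each derived "RDD" remembers only its parent and its transformation, so any lost partition can be rebuilt from source without storing replicas.

```python
# Minimal lineage sketch: each "RDD" stores its parent and transformation,
# so a lost partition can be recomputed from source without replication.
class RDD:
    def __init__(self, partitions=None, parent=None, fn=None):
        self._partitions = partitions      # source data, or None if derived
        self.parent, self.fn = parent, fn  # lineage pointers

    def map(self, fn):
        return RDD(parent=self, fn=fn)     # adds a node to the lineage DAG

    def compute(self, i):
        # Recompute partition i by replaying the lineage from the source.
        if self._partitions is not None:
            return self._partitions[i]
        return [self.fn(x) for x in self.parent.compute(i)]

source = RDD(partitions=[[1, 2], [3, 4], [5, 6]])
squared = source.map(lambda x: x * x)

# "Losing" partition 1 costs only the recomputation of that partition;
# the other partitions are never touched.
recovered = squared.compute(1)
```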
RDDs are lazily evaluated: calling a transformation (e.g., map, filter) does not trigger computation. It merely adds a node to the lineage DAG. Computation only begins when an action is called (e.g., count, collect, saveAsTextFile). This allows Spark’s optimizer to fuse transformations, eliminating intermediate materialisation.
Transformations and Actions
Transformations build the DAG; actions execute it.
| Category | Operation | Description |
|---|---|---|
| Transformation | map(f) | Apply f to each element |
| Transformation | flatMap(f) | Apply f, flatten resulting iterables |
| Transformation | filter(p) | Retain elements satisfying predicate p |
| Transformation | reduceByKey(f) | Aggregate values per key using f |
| Transformation | groupByKey() | Group values per key into a list |
| Transformation | sortByKey() | Sort by key |
| Transformation | join(other) | Inner join two pair RDDs by key |
| Transformation | cogroup(other) | Full outer group join |
| Transformation | union(other) | Set union |
| Action | collect() | Return all elements to driver |
| Action | count() | Return number of elements |
| Action | reduce(f) | Aggregate using f, return single value |
| Action | take(n) | Return first n elements |
| Action | saveAsTextFile(path) | Write to HDFS |
| Action | foreach(f) | Apply f to each element (side effects) |
Spark Execution Model
When an action is called, Spark submits the lineage DAG to the DAG scheduler. The DAG scheduler analyses the graph and partitions it into stages at shuffle boundaries. Within a stage, all transformations can be pipelined — a record flows from one transformation to the next without materialisation. Across stage boundaries, a full shuffle (analogous to MapReduce’s shuffle/sort) must occur.
Narrow dependencies occur when each output partition depends on at most one input partition (e.g., map, filter). These can be pipelined within a stage. Wide dependencies occur when an output partition depends on multiple input partitions (e.g., groupByKey, join). These require a shuffle and create stage boundaries.
The task scheduler takes each stage and launches tasks on executors. An executor is a JVM process running on a worker node, holding a fixed number of CPU cores and a fixed memory budget. Tasks within a stage run in parallel across partitions.
text = sc.textFile("hdfs://data/") # RDD 1: raw lines
words = text.flatMap(lambda line: line.split()) # RDD 2: words (narrow)
pairs = words.map(lambda w: (w, 1)) # RDD 3: (word, 1) pairs (narrow)
counts = pairs.reduceByKey(lambda a, b: a+b) # RDD 4: (word, count) — SHUFFLE
sorted_counts = counts.sortByKey() # RDD 5: sorted — SHUFFLE
sorted_counts.saveAsTextFile("hdfs://output/")
The DAG scheduler identifies two stage boundaries (the two shuffles):
Stage 1: textFile → flatMap → map → map-side combine for reduceByKey. All transformations are pipelined — each task reads one HDFS block, tokenises it, and writes sorted (word, partial-count) pairs to local disk for the shuffle. This stage has as many tasks as there are HDFS input blocks (e.g., 100 tasks for a 12.8 GB file with 128 MB blocks).
Stage 2: reduceByKey reduce-side (reads shuffle data from Stage 1’s local disk, sums counts) → map-side for sortByKey. Output is again written to local disk. Number of tasks = the numPartitions argument to reduceByKey, defaulting to the parent RDD’s partition count or spark.default.parallelism (spark.sql.shuffle.partitions, default 200, is the analogous setting for the DataFrame/SQL API).
Stage 3: sortByKey reduce-side (globally sorts by sampling key distribution, then range-partitions and sorts each partition) → saveAsTextFile. Each task writes one output file to HDFS.
Each stage can only begin after the previous stage is fully complete (all tasks finished, shuffle data available). Within a stage, tasks run in parallel across the cluster — if Stage 1 has 100 tasks and the cluster has 50 cores, 50 tasks run simultaneously, and the remaining 50 run as cores free up.
Persistence and Caching
The killer feature of Spark for iterative algorithms is caching. Calling rdd.cache() (equivalent to rdd.persist(StorageLevel.MEMORY_ONLY)) instructs Spark to materialise the RDD in executor memory after it is first computed, rather than recomputing it from lineage on each subsequent use.
For PageRank running 30 iterations, the adjacency list is read once from HDFS and cached. Each iteration reuses the cached RDD rather than re-reading from disk. This alone can yield 10–100× speedups over equivalent MapReduce implementations.
Storage levels offer trade-offs:
| Level | Description |
|---|---|
| MEMORY_ONLY | Store as deserialized Java objects; fastest access |
| MEMORY_AND_DISK | Spill to disk if memory is insufficient |
| MEMORY_ONLY_SER | Store serialized (smaller, slower to access) |
| DISK_ONLY | Store only on disk |
| OFF_HEAP | Store in off-heap memory (avoids GC pressure) |
Pair RDDs and Key-Value Operations
reduceByKey is almost always preferable to groupByKey for aggregation tasks. groupByKey collects all values for a key into a single in-memory list before passing it to the user function — if a key has millions of values, this can cause OOM errors. reduceByKey applies a binary associative function incrementally and can perform local aggregation on each partition before the shuffle, exactly like a combiner.
aggregateByKey(zeroValue)(seqOp, combOp) generalises reduceByKey: seqOp merges a new value into an accumulator, combOp merges two accumulators. This supports cases where the accumulator type differs from the value type (e.g., computing a per-key mean requires accumulating a (sum, count) pair).
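The seqOp/combOp structure can be simulated in plain Python for a per-key mean. The keys, values, and two-partition layout are illustrative; the point is that seqOp folds raw values into (sum, count) accumulators per partition, and combOp merges accumulators across partitions.

```python
# aggregateByKey semantics in plain Python: seqOp folds a value into an
# accumulator, combOp merges two accumulators. Here the accumulator is a
# (sum, count) pair used to compute a per-key mean. Data is illustrative.
def seq_op(acc, value):
    return (acc[0] + value, acc[1] + 1)

def comb_op(a, b):
    return (a[0] + b[0], a[1] + b[1])

zero = (0, 0)
partitions = [                       # (key, value) pairs, two partitions
    [("a", 2), ("b", 10), ("a", 4)],
    [("a", 6), ("b", 30)],
]

# Per-partition fold with seqOp (local aggregation, like a combiner)...
partials = []
for part in partitions:
    acc = {}
    for k, v in part:
        acc[k] = seq_op(acc.get(k, zero), v)
    partials.append(acc)

# ...then merge partition accumulators with combOp (the shuffle side).
merged = {}
for acc in partials:
    for k, a in acc.items():
        merged[k] = comb_op(merged.get(k, zero), a)

means = {k: s / c for k, (s, c) in merged.items()}
```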
Accumulators and Broadcast Variables
Accumulators are distributed counters or sums. Workers can only add to an accumulator; only the driver can read the total. They are useful for counting events during a transformation (e.g., counting malformed records) without returning data to the driver via a full reduce action.
Broadcast variables distribute a read-only value to all executors exactly once, rather than serialising it into each task closure. This is critical for map-side joins: if a lookup table is small enough to fit in memory (say, 100 MB), broadcast it to all workers, then each mapper does a local hash-table lookup instead of a shuffle.
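A plain-Python sketch of a map-side (broadcast) join. The table contents are invented for illustration: the small dict plays the broadcast variable, and each "mapper" probes it locally instead of shuffling.

```python
# Map-side (broadcast) join sketch: ship the small table to every worker
# once, then each mapper probes a local hash table; no shuffle needed.
# Table contents are illustrative.
small_table = {"u1": "Alice", "u2": "Bob"}       # "broadcast" to workers

log_partitions = [                               # large table, partitioned
    [("u1", "login"), ("u2", "click")],
    [("u1", "logout"), ("u3", "click")],
]

def map_side_join(partition, lookup):
    # Each mapper joins its own partition against the broadcast lookup.
    return [(uid, lookup.get(uid), event) for uid, event in partition]

joined = [row for part in log_partitions
          for row in map_side_join(part, small_table)]
```

Unmatched keys (here "u3") surface as None, so this sketch behaves like a left outer join; dropping those rows instead would give an inner join.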
Spark vs. MapReduce
| Dimension | MapReduce | Spark |
|---|---|---|
| Intermediate storage | HDFS (disk) | Memory (with disk fallback) |
| Computation graph | Fixed map + reduce | General DAG |
| Iterative algorithms | Poor (HDFS round-trips) | Excellent (cache RDDs) |
| Interactive analysis | None | Spark REPL / notebooks |
| Fault tolerance | Task re-execution | Lineage recomputation |
| Language support | Java primarily | Scala, Python, R, Java |
Chapter 4: Analyzing Text
Inverted Indexes
An inverted index maps each term to the list of documents (and positions) in which it appears — the core data structure of every search engine. Building one at scale is a canonical MapReduce application.
The mapper processes each document, tokenises the text, and emits (term, docId) for every token. The reducer receives all (term, docId) pairs grouped by term and assembles the posting list: a sorted list of document IDs, typically augmented with term frequency \(\text{tf}(t, d)\) (how many times term \(t\) appears in document \(d\)).
For web-scale corpora, posting lists can contain billions of entries and must be compressed. Delta encoding stores gaps between successive document IDs rather than absolute IDs (since IDs are sorted, gaps are small positive integers). Variable-byte encoding encodes small integers in fewer bytes than a fixed 4-byte int. Together, these techniques can reduce posting list size by 10–20×.
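Delta plus variable-byte encoding can be sketched as follows. This uses one common convention (7 data bits per byte, with the high bit set on the final byte of each integer); real systems vary in the exact bit layout.

```python
# Posting-list compression sketch: delta-encode sorted doc IDs, then
# variable-byte encode each gap (7 data bits per byte; the high bit
# marks the final byte of an integer).
def vbyte_encode(n):
    out = []
    while True:
        out.append(n & 0x7F)
        n >>= 7
        if n == 0:
            break
    out.reverse()
    out[-1] |= 0x80  # stop bit on the last byte of this integer
    return bytes(out)

def compress(doc_ids):
    prev, buf = 0, bytearray()
    for d in doc_ids:
        buf += vbyte_encode(d - prev)  # store the gap, not the absolute ID
        prev = d
    return bytes(buf)

def decompress(data):
    ids, n, prev = [], 0, 0
    for b in data:
        n = (n << 7) | (b & 0x7F)
        if b & 0x80:                   # stop bit: integer complete
            prev += n                  # undo the delta encoding
            ids.append(prev)
            n = 0
    return ids

postings = [3, 7, 1000, 1001, 50000]
packed = compress(postings)
```

Here five doc IDs that would occupy 20 bytes as fixed 4-byte integers pack into 8 bytes, because the sorted IDs produce small gaps that fit in one or two bytes each.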
TF-IDF Weighting
Raw term frequency is a poor measure of relevance — words like “the” and “is” appear frequently in all documents and carry no discriminative information. TF-IDF (term frequency–inverse document frequency) discounts ubiquitous terms:
\[ \text{tf-idf}(t, d) = \text{tf}(t, d) \cdot \log \frac{N}{\text{df}(t)} \]where \(N\) is the total number of documents and \(\text{df}(t)\) is the number of documents containing term \(t\). Words that appear in many documents get a low IDF weight; rare, specific terms get high weight. Computing IDF requires knowing \(\text{df}(t)\), which is the output of one MapReduce pass (count documents per term). A second pass then computes TF-IDF scores.
Worked example: consider a corpus of \(N = 10^6\) documents and a document \(d\) in which “database” has normalised \(\text{tf} = 0.05\). The term appears in \(\text{df} = 10{,}000\) of the 1 million documents, so its IDF is:
\[ \text{idf}(\text{"database"}) = \log\!\left(\frac{10^6}{10{,}000}\right) = \log(100) \approx 4.61 \]TF-IDF score: \(0.05 \times 4.61 = 0.23\).
For the stopword “the”, with \(\text{df} = 950{,}000\) and \(\text{tf} = 0.12\) in \(d\):
\[ \text{idf}(\text{"the"}) = \log\!\left(\frac{10^6}{950{,}000}\right) = \log(1.053) \approx 0.051 \]TF-IDF score: \(0.12 \times 0.051 = 0.006\).
Despite “the” having a higher raw TF (0.12 vs. 0.05), its TF-IDF score is 38× lower than “database”. This is the desired behaviour: TF-IDF naturally suppresses stopwords without requiring an explicit stopword list, because their IDF approaches zero as they appear in nearly every document.
For the rare technical term “ARIES”, with \(\text{df} = 50\) and \(\text{tf} = 0.03\):
\[ \text{idf}(\text{"ARIES"}) = \log\!\left(\frac{10^6}{50}\right) = \log(20{,}000) \approx 9.9 \]TF-IDF score: \(0.03 \times 9.9 = 0.30\), the highest of the three terms, appropriately flagging this document as highly relevant for queries about ARIES.
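The three computations above can be checked with a few lines of Python (natural log, as in the worked examples):

```python
import math

N = 1_000_000  # corpus size from the worked examples

def tf_idf(tf, df):
    return tf * math.log(N / df)

for term, tf, df in [("database", 0.05, 10_000),
                     ("the", 0.12, 950_000),
                     ("ARIES", 0.03, 50)]:
    print(f"{term:>8}: idf = {math.log(N / df):.3f}, tf-idf = {tf_idf(tf, df):.3f}")
# database: idf = 4.605, tf-idf = 0.230
#      the: idf = 0.051, tf-idf = 0.006
#    ARIES: idf = 9.903, tf-idf = 0.297
```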
Documents are represented as sparse vectors in the term space: most entries are zero (a document uses only a tiny fraction of the vocabulary). Cosine similarity between document vectors is the standard relevance metric.
Precision and recall trade off: a system that retrieves every document in the corpus achieves 100% recall but very low precision. A system that retrieves only 1 document (if it is relevant) achieves 100% precision but low recall (e.g., 1/10 = 10% when the corpus contains 10 relevant documents). \(F_1\) is their harmonic mean, penalising extreme imbalance.
NDCG (Normalized Discounted Cumulative Gain) at \(k=5\). Suppose the 5 retrieved results are labeled by relevance grade: [highly relevant, highly relevant, relevant, not relevant, highly relevant] with gains [3, 3, 1, 0, 3]:
\[ \text{DCG@5} = 3 + \frac{3}{\log_2 3} + \frac{1}{\log_2 4} + \frac{0}{\log_2 5} + \frac{3}{\log_2 6} = 3 + 1.89 + 0.5 + 0 + 1.16 = 6.55 \]The ideal DCG (IDCG) is computed assuming the best possible ordering — all highly-relevant documents first: [3, 3, 3, 1, 0]:
\[ \text{IDCG@5} = 3 + \frac{3}{\log_2 3} + \frac{3}{\log_2 4} + \frac{1}{\log_2 5} + \frac{0}{\log_2 6} = 3 + 1.89 + 1.5 + 0.43 + 0 = 6.82 \]\[ \text{NDCG@5} = \frac{\text{DCG@5}}{\text{IDCG@5}} = \frac{6.55}{6.82} \approx 0.96 \]An NDCG of 0.96 indicates the system’s ranking is nearly optimal for this query. NDCG is preferred over raw precision/recall in modern IR evaluation because it rewards systems that rank highly-relevant documents higher in the result list, not just retrieving a larger quantity of relevant documents.
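The DCG/NDCG arithmetic above is easy to script (linear gains, with the rank-1 result undiscounted, matching the example's formulation):

```python
import math

def dcg(gains):
    # gain at rank 1 plus gains at ranks 2..k discounted by log2(rank + 1)
    return gains[0] + sum(g / math.log2(r + 1)
                          for r, g in enumerate(gains[1:], start=2))

retrieved = [3, 3, 1, 0, 3]
ideal = sorted(retrieved, reverse=True)       # [3, 3, 3, 1, 0]
print(round(dcg(retrieved), 2))               # 6.55
print(round(dcg(ideal), 2))                   # 6.82
print(round(dcg(retrieved) / dcg(ideal), 2))  # 0.96
```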
Language Models
A language model assigns a probability to a sequence of words. The unigram model estimates:
\[ P(w \mid d) = \frac{\text{tf}(w, d)}{|d|} \]This is simply the relative frequency of word \(w\) in document \(d\). For information retrieval, a document is ranked by the probability it would generate the query words under this model.
The problem is zero probability: if a query word does not appear in a document, the model assigns probability zero, causing the document to be ranked last even if it is highly relevant. Smoothing mixes the document model with a background (collection) model:
Jelinek-Mercer smoothing:
\[ P_\lambda(w \mid d) = (1 - \lambda) P(w \mid d) + \lambda P(w \mid C) \]Dirichlet smoothing:
\[ P_\mu(w \mid d) = \frac{\text{tf}(w, d) + \mu \cdot P(w \mid C)}{|d| + \mu} \]Documents are ranked by KL-divergence between the query model and the document model, or equivalently by log-likelihood of the query under the smoothed document model.
Jelinek-Mercer smoothing with parameter \(\lambda\) interpolates between the document model and the collection (background) model:
\[ P_\lambda(w \mid d) = (1-\lambda)\,\frac{\text{tf}(w,d)}{|d|} + \lambda\,\frac{\text{cf}(w)}{|C|} \]where \(\text{cf}(w)\) is the collection frequency of \(w\) and \(|C|\) is the total number of tokens in the corpus. For \(\lambda = 0.1\), 90% of the document’s probability mass comes from the document itself; for \(\lambda = 0.5\), it is a half-and-half mixture. The optimal \(\lambda\) is typically tuned on a held-out validation set — title queries (short, precise) benefit from smaller \(\lambda\); verbose queries benefit from larger \(\lambda\) because they contain many common words that should be down-weighted against the background model.
Dirichlet smoothing has the additional property that it adapts the degree of smoothing to document length: short documents get more smoothing (borrowing more from the background model) than long documents. This is intuitively correct — a short document that does not mention a query term is more likely to be about that term than a long document that does not mention it.
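Both smoothing formulas are one-liners. The sketch below (with illustrative counts) confirms that an unseen query word gets non-zero probability, and that Dirichlet smoothing smooths short documents more heavily:

```python
def jelinek_mercer(tf, doc_len, cf, coll_len, lam=0.1):
    # fixed-weight interpolation with the collection model
    return (1 - lam) * tf / doc_len + lam * cf / coll_len

def dirichlet(tf, doc_len, cf, coll_len, mu=2000):
    # pseudo-count interpolation: the collection model's weight grows
    # as the document shrinks relative to mu
    return (tf + mu * cf / coll_len) / (doc_len + mu)

cf, coll_len = 50_000, 10**8          # collection frequency and corpus size (illustrative)
print(jelinek_mercer(0, 100, cf, coll_len))   # unseen word: still > 0
print(dirichlet(0, 10, cf, coll_len))         # short doc: closer to background model
print(dirichlet(0, 1000, cf, coll_len))       # long doc: less smoothing
```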
Naive Bayes Text Classification
Naive Bayes classifies documents by applying Bayes’ theorem with the strong independence assumption that words occur independently given the class:
\[ P(c \mid d) \propto P(c) \prod_{w \in d} P(w \mid c) \]Training consists of estimating \(P(c)\) (class prior) and \(P(w \mid c)\) (per-class word distribution) from labelled data — pure counting, trivially parallelisable in MapReduce. Prediction computes the log of the above expression to avoid floating-point underflow:
\[ \log P(c \mid d) = \log P(c) + \sum_{w \in d} \log P(w \mid c) \]Laplace smoothing (add-one smoothing) prevents zero probability for unseen words: \(P(w \mid c) = (\text{count}(w, c) + 1) / (|V| + \sum_{w'} \text{count}(w', c))\).
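Training really is pure counting. A minimal in-memory sketch with Laplace smoothing (toy documents and labels are illustrative):

```python
import math
from collections import Counter, defaultdict

def train(docs):
    """docs: list of (tokens, label). Returns class log-priors and
    Laplace-smoothed per-class log-likelihoods over the shared vocabulary."""
    class_counts = Counter(label for _, label in docs)
    word_counts = defaultdict(Counter)
    vocab = set()
    for tokens, label in docs:
        word_counts[label].update(tokens)
        vocab.update(tokens)
    V = len(vocab)
    log_prior = {c: math.log(n / len(docs)) for c, n in class_counts.items()}
    log_lik = {c: {w: math.log((word_counts[c][w] + 1) /
                               (sum(word_counts[c].values()) + V))
                   for w in vocab}
               for c in class_counts}
    return log_prior, log_lik, vocab

def predict(tokens, log_prior, log_lik, vocab):
    # sum of logs avoids floating-point underflow on long documents
    scores = {c: log_prior[c] + sum(log_lik[c][w] for w in tokens if w in vocab)
              for c in log_prior}
    return max(scores, key=scores.get)

docs = [("spark rdd shuffle".split(), "systems"),
        ("tfidf ranking query".split(), "ir"),
        ("rdd lineage fault".split(), "systems")]
model = train(docs)
print(predict("rdd shuffle".split(), *model))  # → systems
```

In a MapReduce setting, the mappers emit per-shard word counts and the reducers sum them; only the counting is distributed.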
Latent Dirichlet Allocation
Topic models treat documents as mixtures of latent topics and topics as distributions over words. LDA (Blei et al., 2003) is the standard topic model. Training via collapsed Gibbs sampling iterates: for each word token in the corpus, remove its current topic assignment, then resample a new topic based on how frequently the word appears in each topic and how frequently the document uses each topic.
At scale, this sampling loop over billions of word tokens is parallelised in Spark by partitioning the corpus across workers, each maintaining local topic-word counts that are periodically synchronised. The SparkML LDA estimator implements both online variational Bayes (faster) and Expectation-Maximisation variants.
In a distributed implementation, the document–topic counts \(\theta_{dk}\) (how often document \(d\) uses topic \(k\)) are local to the partition holding document \(d\) — no synchronisation needed. But the topic–word counts \(\phi_{kw}\) (how often word \(w\) is assigned to topic \(k\)) are global: every update to a word token’s topic assignment changes \(\phi\) for all workers. Exact synchronisation after every token update would require \(O(N \times \text{num\_topics})\) network messages for a corpus of \(N\) tokens — prohibitive.
The practical solution is approximate synchronisation: each worker maintains a local copy of \(\phi\) and batches its updates. Every \(B\) token updates, the worker pushes its local deltas to a parameter server (or an all-reduce collective) and pulls the global \(\phi\) back. With batch size \(B = 1000\), the communication cost is \(O(N/1000 \times K \times V)\) where \(K\) is the number of topics and \(V\) is the vocabulary size — a 1000× reduction over exact synchronisation. The trade-off is that some token assignments are made using a slightly stale \(\phi\), introducing noise into the sampling process. In practice, the sampler still converges because the staleness is small relative to the signal in \(\phi\).
Chapter 5: Analyzing Graphs
Graph Representations and Challenges
Graphs are pervasive in data-intensive applications: the web graph (pages and hyperlinks), social graphs (users and friendships), knowledge graphs (entities and relations), and citation networks. A graph with \(n\) nodes and \(m\) edges can be stored in HDFS as an adjacency list: each line contains a node ID followed by the IDs of its neighbours. For sparse graphs (most real-world graphs), this is far more compact than an adjacency matrix.
Processing graphs in MapReduce is fundamentally awkward because graph algorithms are typically iterative (propagate information along edges until convergence) and inherently non-local (a single step may require accessing the neighbours’ neighbours’ neighbours). Each MapReduce iteration must load the entire graph from HDFS, limiting iteration throughput.
Parallel BFS in MapReduce
Breadth-first search computes the shortest-hop distance from a source node \(s\) to all other nodes. In MapReduce, the standard approach maintains a distance array and iterates:
- Represent each node as a record (node_id, current_distance, adjacency_list).
- Mapper: if current_distance < ∞, emit (neighbour, current_distance + 1) for each neighbour; also emit the node itself to preserve the adjacency list.
- Reducer: for each node, take the minimum distance seen across all emitted records.
- Repeat until no distances change (detected via a counter accumulator).
The inefficiency is that every node is processed every iteration, even nodes far from the current frontier. Spark’s caching helps (the adjacency list is cached), but the fundamental issue of scanning the full graph per iteration remains. Pregel (Chapter 10) addresses this with message-passing that only activates nodes on the frontier.
Web graphs have a larger effective diameter (10–30) than social graphs, reflecting their more hierarchical structure, but BFS still converges quickly relative to the graph’s size. Random geometric graphs (e.g., sensor networks where nodes connect only to geographically nearby sensors) have much larger diameter (\(O(\sqrt{n})\) for a 2D grid layout) and would require hundreds of iterations.
The counter accumulator trick for convergence detection works as follows: a Hadoop counter is a global atomic long maintained by the framework across all tasks. The mapper increments a counter NUM_UPDATES whenever it updates a node’s distance. After each iteration, the driver checks whether NUM_UPDATES > 0. If zero, no distances changed in this iteration, so the algorithm has converged. This avoids the need for the driver to explicitly check the entire distance array — the check is \(O(1)\) regardless of graph size.
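The whole loop, including the NUM_UPDATES convergence check, can be simulated in a few lines of Python (a single-process stand-in for the map/reduce rounds):

```python
INF = float("inf")

def bfs_iteration(graph, dist):
    """One map/reduce round of parallel BFS; returns (new_dist, num_updates)."""
    # Map: every reachable node proposes distance + 1 to each neighbour.
    proposals = {}
    for node, d in dist.items():
        if d < INF:
            for nbr in graph[node]:
                proposals[nbr] = min(proposals.get(nbr, INF), d + 1)
    # Reduce: keep the minimum of the old distance and all proposals,
    # counting how many nodes actually improved (the NUM_UPDATES counter).
    num_updates = 0
    new_dist = dict(dist)
    for node, d in proposals.items():
        if d < new_dist[node]:
            new_dist[node] = d
            num_updates += 1
    return new_dist, num_updates

graph = {0: [1, 2], 1: [3], 2: [3], 3: []}
dist = {0: 0, 1: INF, 2: INF, 3: INF}
while True:
    dist, updates = bfs_iteration(graph, dist)
    if updates == 0:      # driver-side O(1) convergence check
        break
print(dist)  # → {0: 0, 1: 1, 2: 1, 3: 2}
```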
PageRank
PageRank models a random surfer who follows hyperlinks at random with probability \(d\) and teleports to a random page with probability \(1 - d\):
\[ P(v) = \frac{1 - d}{N} + d \sum_{u \rightarrow v} \frac{P(u)}{\text{out}(u)} \]This is solved by power iteration: initialise \(P(v) = 1/N\) for all \(v\), then repeatedly apply the above formula until the \(L_1\) norm of the change vector falls below a threshold (typically 10–30 iterations suffice).
The MapReduce implementation: the mapper emits (neighbour, P(u) / out(u)) for each outgoing edge, distributing the source’s rank to all its neighbours. It also emits the adjacency list itself to reconstruct the graph. The reducer sums incoming contributions and applies the damping factor.
- Node 0 → [1, 2] (out-degree 2)
- Node 1 → [3] (out-degree 1)
- Node 2 → [3] (out-degree 1)
- Node 3 → [0] (out-degree 1)
Each record in HDFS is of the form (nodeId, [current_rank, outlinks]). After initialization, all ranks are \(1/4 = 0.25\).
Map phase: For each node, emit rank contributions to each out-neighbour, and emit the node’s adjacency structure (tagged with a sentinel) to reconstruct the graph for the next iteration:
- Node 0 (rank 0.25, out-degree 2): emit (1, 0.125), (2, 0.125), and (0, [0, [1,2]])
- Node 1 (rank 0.25, out-degree 1): emit (3, 0.25), and (1, [0, [3]])
- Node 2 (rank 0.25, out-degree 1): emit (3, 0.25), and (2, [0, [3]])
- Node 3 (rank 0.25, out-degree 1): emit (0, 0.25), and (3, [0, [0]])
Shuffle: Group by node ID. Reducer 0 receives contributions from Node 3 (0.25) plus the adjacency sentinel from Node 0. Reducer 3 receives contributions from Node 1 (0.25) and Node 2 (0.25).
Reduce phase: Apply the PageRank formula \(\text{new\_rank} = (1-d)/N + d \times \sum \text{contributions}\):
- Node 0: \(0.15/4 + 0.85 \times 0.25 = 0.0375 + 0.2125 = 0.25\)
- Node 1: \(0.0375 + 0.85 \times 0.125 = 0.0375 + 0.10625 \approx 0.144\)
- Node 2: \(0.0375 + 0.85 \times 0.125 \approx 0.144\)
- Node 3: \(0.0375 + 0.85 \times (0.25 + 0.25) = 0.0375 + 0.425 = 0.4625\)
After iteration 1, Node 3 (which receives contributions from two nodes) has accumulated more rank. Iterations continue until the \(L_1\) norm of rank changes falls below a convergence threshold (typically \(10^{-6}\)). In practice, 10–30 iterations suffice for the ranks to converge on most real graphs.
The key structural constraint is that the mapper must emit the adjacency list alongside the rank contributions — otherwise the graph structure would be lost between iterations, because HDFS stores each iteration’s output as a flat file with no memory of the previous iteration’s structure.
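The iteration above can be reproduced with a single-process sketch of the map and reduce steps (dangling nodes are not handled here):

```python
def pagerank_iteration(graph, ranks, d=0.85):
    N = len(graph)
    contribs = {v: 0.0 for v in graph}
    # Map: each node distributes its rank evenly across its out-links.
    for node, outlinks in graph.items():
        for nbr in outlinks:
            contribs[nbr] += ranks[node] / len(outlinks)
    # Reduce: sum contributions and apply the damping factor.
    return {v: (1 - d) / N + d * c for v, c in contribs.items()}

graph = {0: [1, 2], 1: [3], 2: [3], 3: [0]}     # the 4-node example above
ranks = {v: 0.25 for v in graph}                # uniform initialisation
ranks = pagerank_iteration(graph, ranks)
print({v: round(r, 4) for v, r in sorted(ranks.items())})
# → {0: 0.25, 1: 0.1438, 2: 0.1438, 3: 0.4625}
```

The printed values match the hand-computed iteration 1 above; looping the function converges the ranks.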
Dangling nodes (pages with no outgoing links) accumulate rank and never distribute it, causing rank to “leak” from the system. The fix is to add a teleportation term that distributes dangling node mass uniformly across all pages.
In Spark, PageRank benefits enormously from caching the adjacency list RDD. The RDD containing links is read from HDFS once and cached, while the ranks RDD is updated each iteration:
val links = sc.textFile(…).map(…).cache()
var ranks = links.mapValues(_ => 1.0)
for (_ <- 1 to numIter) {
val contribs = links.join(ranks).flatMap { case (_, (urls, rank)) =>
urls.map(url => (url, rank / urls.size))
}
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
Graph Partitioning
For distributed graph processing, how nodes and edges are partitioned across machines fundamentally affects communication cost. Edge-cut partitioning places vertices on partitions and cuts edges that cross partitions; vertex-cut partitioning places edges on partitions and replicates vertices. For power-law graphs (most real social and web graphs), vertex-cut partitioning typically yields fewer replicas because high-degree vertices would otherwise cause massive edge cuts.
Under edge-cut partitioning, a high-degree hub node with degree \(d\) that is placed on machine \(m\) causes all \(d\) of its edges to be stored locally on \(m\), but most of its neighbours are on other machines. Every superstep in a BSP algorithm, machine \(m\) must send messages to potentially \(d\) other machines — if \(d = 10^6\) (e.g., a Twitter celebrity), machine \(m\) becomes a communication bottleneck.
Under vertex-cut partitioning, edges are distributed evenly across machines. Each machine holds a roughly equal number of edges. Hub vertices are replicated: a “mirror” copy exists on each machine that holds some of the hub’s edges. Computation on the hub’s outgoing edges is spread across all machines that hold those edges. The cost is that the hub’s vertex state (e.g., its PageRank value) must be replicated and synchronized across all machines that hold a mirror — but for PageRank, this is one scalar per hub per superstep.
GraphX implements vertex-cut partitioning by default, distributing edges with a 2D-partitioning scheme that bounds the maximum replication factor to \(O(\sqrt{k})\) for a vertex of degree \(k\).
Chapter 6: Data Mining and Machine Learning at Scale
Gradient Descent
Most supervised machine learning reduces to minimising a loss function \(\mathcal{L}(\theta)\) over model parameters \(\theta\). Gradient descent updates:
\[ \theta \leftarrow \theta - \eta \nabla_\theta \mathcal{L}(\theta) \]where \(\eta\) is the learning rate. The loss decomposes across training examples:
\[ \mathcal{L}(\theta) = \sum_{i=1}^{n} \ell(\theta; x_i, y_i) \]Batch gradient descent sums gradients over the entire dataset before updating — parallelisable in MapReduce (each mapper computes gradients on its shard; the reducer sums them) but slow to converge because it takes one update per full pass over the data.
Stochastic gradient descent (SGD) updates after every example — fast convergence but noisy. Mini-batch GD processes small batches (32–256 examples) per update, balancing convergence speed and gradient noise.
MapReduce Gradient Computation
The MapReduce pattern for gradient computation is straightforward:
- The driver broadcasts the current parameter vector \(\theta\) to all workers.
- Each mapper computes the gradient contribution from its local data shard.
- The reducer sums gradients across all shards.
- The driver applies the update and repeats.
Communication cost per iteration is \(O(|\theta|)\) — the parameter vector must traverse the network each iteration. For deep learning models with billions of parameters, this becomes the bottleneck.
Logistic Regression
Logistic regression models binary classification with the sigmoid function \(\sigma(z) = 1 / (1 + e^{-z})\):
\[ P(y = 1 \mid x; \theta) = \sigma(\theta^\top x) \]The cross-entropy loss is:
\[ \mathcal{L}(\theta) = -\sum_i \left[ y_i \log \sigma(\theta^\top x_i) + (1 - y_i) \log (1 - \sigma(\theta^\top x_i)) \right] \]The gradient is:
\[ \nabla_\theta \mathcal{L} = \sum_i \left( \sigma(\theta^\top x_i) - y_i \right) x_i \]This sum is embarrassingly parallel. In practice, distributed logistic regression uses L-BFGS (limited-memory Broyden-Fletcher-Goldfarb-Shanno), a quasi-Newton method that approximates the inverse Hessian and converges in far fewer iterations than SGD, making the communication overhead worthwhile.
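A single-process sketch of the map/reduce gradient pattern for logistic regression, using plain batch gradient descent rather than L-BFGS (toy 1-D data with a bias feature; shard boundaries are arbitrary):

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def shard_gradient(theta, shard):
    # Mapper: contribution sum of (sigma(theta·x) - y) * x over the local shard.
    grad = [0.0] * len(theta)
    for x, y in shard:
        err = sigmoid(sum(t * xi for t, xi in zip(theta, x))) - y
        for j, xi in enumerate(x):
            grad[j] += err * xi
    return grad

def distributed_step(theta, shards, eta=0.1):
    # Reducer: element-wise sum of per-shard gradients; driver applies the update.
    total = [sum(parts) for parts in zip(*(shard_gradient(theta, s) for s in shards))]
    return [t - eta * g for t, g in zip(theta, total)]

# Two shards of a tiny separable dataset; each x = (bias, feature).
shards = [[((1.0, 2.0), 1), ((1.0, -1.0), 0)],
          [((1.0, 3.0), 1), ((1.0, -2.0), 0)]]
theta = [0.0, 0.0]
for _ in range(100):
    theta = distributed_step(theta, shards)
print(round(sigmoid(theta[0] + theta[1] * 2.5), 3))  # confident positive prediction
```

One `distributed_step` corresponds to one MapReduce job: a full pass over the data for a single parameter update, which is exactly why batch GD converges slowly in this setting.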
Parameter Server Architecture
For very large models, the driver bottleneck is addressed by the parameter server (Li et al., 2014). The parameter space is sharded across a set of dedicated server nodes. Worker nodes pull the parameters they need, compute gradients on local data, and push gradients back. Servers apply updates asynchronously.
Asynchronous SGD introduces stale gradients: a worker may compute a gradient using parameter values that are several updates old by the time the gradient reaches the server. Surprisingly, asynchronous SGD still converges for many convex and non-convex problems — the staleness adds noise but does not prevent convergence, and the throughput gains from eliminating synchronisation barriers typically outweigh the convergence penalty.
The AllReduce collective (used by Horovod, distributed PyTorch, and TensorFlow’s tf.distribute) eliminates the server bottleneck by having workers communicate directly with each other using a ring or tree topology. In ring-AllReduce, each of the \(W\) workers holds \(1/W\) of the gradient vector. In two phases (reduce-scatter + all-gather), each worker accumulates the sum of all workers’ partial gradients for its slice and then distributes the summed slice back to all workers. The total data transferred per worker is \(2(W-1)/W \times P\) values — independent of \(W\) in the asymptotic limit. This gives AllReduce near-ideal scaling: doubling the number of workers doubles gradient throughput, with only a small constant-factor increase in communication per iteration.
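Ring-AllReduce can be simulated in-process to check both the result and the communication count. A sketch under the stated assumption that the gradient length \(P\) is divisible by \(W\) (toy gradient values):

```python
def ring_allreduce(grads):
    """Simulate reduce-scatter + all-gather over W workers on a ring.
    Returns each worker's final (fully summed) vector and values sent per worker."""
    W, P = len(grads), len(grads[0])
    chunk = P // W
    slices = [[g[c * chunk:(c + 1) * chunk] for c in range(W)] for g in grads]
    sent = [0] * W
    # Reduce-scatter: at step s, worker w forwards chunk (w - s) mod W to its
    # right neighbour, which adds it to its local copy of that chunk.
    for s in range(W - 1):
        for w in range(W):
            c = (w - s) % W
            dst = (w + 1) % W
            slices[dst][c] = [a + b for a, b in zip(slices[dst][c], slices[w][c])]
            sent[w] += chunk
    # All-gather: chunk c is now complete at worker (c + W - 1) mod W;
    # circulate each completed chunk W - 1 hops around the ring.
    for c in range(W):
        owner = (c + W - 1) % W
        for s in range(W - 1):
            src = (owner + s) % W
            slices[(src + 1) % W][c] = slices[src][c]
            sent[src] += chunk
    full = [[v for c in range(W) for v in slices[w][c]] for w in range(W)]
    return full, sent

grads = [[1, 2, 3, 4, 5, 6],
         [10, 20, 30, 40, 50, 60],
         [100, 200, 300, 400, 500, 600]]
full, sent = ring_allreduce(grads)
print(full[0])  # [111, 222, 333, 444, 555, 666]
print(sent)     # [8, 8, 8]: each worker sends 2 * (W-1)/W * P = 8 values
```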
AllReduce is synchronous (all workers must participate in each round), making it unsuitable for highly heterogeneous clusters where stragglers slow everyone down. The parameter server’s asynchrony is its key advantage in heterogeneous settings.
Spark MLlib
Spark’s MLlib provides a Pipelines API modelled after scikit-learn:
- Transformer: applies a transformation to a DataFrame (e.g., Tokenizer, StandardScaler, PCA).
- Estimator: learns from data and produces a Transformer (e.g., LogisticRegression, KMeans).
- Pipeline: chains Transformers and Estimators into a single workflow; calling fit() on a Pipeline trains all Estimators in sequence.
- PipelineModel: the fitted Pipeline, ready for transform().
Key algorithms available: logistic regression, linear SVM, random forests, gradient-boosted trees, k-means, LDA, collaborative filtering (ALS), PCA, Word2Vec, TF-IDF feature extraction, and the hashing trick (feature hashing for dimensionality reduction of sparse features).
Distributed K-Means
K-means clustering iterates between assigning each point to its nearest centroid and recomputing centroids as the mean of assigned points. In MapReduce/Spark:
- Initialisation: k-means++ selects initial centroids by sampling with probability proportional to squared distance from already-selected centroids, improving convergence.
- Assignment step (map): each mapper computes the nearest centroid for each of its points, emitting (centroid_id, point).
- Update step (reduce): each reducer receives all points assigned to one centroid and computes the new centroid as their mean.
- Convergence: repeat until centroid movement falls below a threshold or a maximum iteration count is reached.
The broadcast variable for centroids (at most \(k \times d\) floats) is sent to all workers each iteration, avoiding a shuffle for the assignment step.
Plain random initialisation is sensitive to unlucky seeds: centroids that start close together can converge to a poor local optimum. The k-means++ initialization algorithm addresses this by selecting initial centroids in a way that spreads them apart: the first centroid is chosen uniformly at random; each subsequent centroid is chosen with probability proportional to its squared distance from the nearest already-chosen centroid. This “seeding” strategy guarantees that the expected WCSS after initialization is within an \(O(\log k)\) factor of the optimal solution, which in practice reduces the number of restarts needed to find a good clustering.
In a distributed setting, k-means++ initialization requires a sequential pass through the data (each centroid must be selected before the next probability distribution is computed). A practical alternative is k-means|| (k-means parallel), which overshoots the number of required centroids by a factor of \(O(\log k)\) in each of only \(O(\log \psi)\) rounds (where \(\psi\) is the initial WCSS), dramatically reducing the number of sequential passes while retaining the \(O(\log k)\) approximation guarantee.
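A 1-D sketch of k-means++ seeding followed by Lloyd iterations, with the map-side assignment and reduce-side update collapsed into one function (toy points form two obvious clusters):

```python
import random

def kmeans_pp_init(points, k, rng):
    """k-means++ seeding: each new centroid is sampled with probability
    proportional to squared distance from the nearest centroid so far."""
    centroids = [rng.choice(points)]
    for _ in range(k - 1):
        d2 = [min((p - c) ** 2 for c in centroids) for p in points]
        r = rng.uniform(0, sum(d2))
        acc = 0.0
        for p, w in zip(points, d2):
            acc += w
            if acc >= r:
                centroids.append(p)
                break
    return centroids

def lloyd_iteration(points, centroids):
    # Assignment (map) then update (reduce): new centroid = mean of assigned points.
    clusters = {i: [] for i in range(len(centroids))}
    for p in points:
        nearest = min(range(len(centroids)), key=lambda i: (p - centroids[i]) ** 2)
        clusters[nearest].append(p)
    return [sum(ps) / len(ps) if ps else centroids[i] for i, ps in clusters.items()]

rng = random.Random(0)
points = [0.0, 0.1, 0.2, 10.0, 10.1, 10.2]   # two well-separated 1-D clusters
cents = kmeans_pp_init(points, 2, rng)
for _ in range(5):
    cents = lloyd_iteration(points, cents)
print(sorted(round(c, 2) for c in cents))  # → [0.1, 10.1]
```

The seeding step is the sequential bottleneck that k-means|| parallelises.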
Chapter 7: Analyzing Relational Data
Relational Algebra in MapReduce
Relational operations map naturally to MapReduce. Selection (\(\sigma\)) is a map-side filter: each mapper discards records not matching the predicate, with no shuffle required. Projection (\(\pi\)) is a map-side field extraction. The expensive operations are joins (\(\bowtie\)) and group-by aggregations (\(\gamma\)).
Reduce-Side Join
The most general join strategy: tag each record with its source relation before the shuffle, then join in the reducer.
- Mapper for relation \(R\): emit (join_key, ("R", record)).
- Mapper for relation \(S\): emit (join_key, ("S", record)).
- Reducer: receives all records for a key, partitioned by source tag. Compute the Cartesian product of records from \(R\) with records from \(S\).
This works for any join size but requires both relations to be shuffled in full, giving network cost \(O(|R| + |S|)\).
Map-Side Join (Broadcast Join)
If one relation is small enough to fit in memory (the “dimension table” in a star schema), broadcast it to all mappers as a hash table. Each mapper then performs a local hash lookup for each record in the large relation, with no shuffle at all.
This is the most efficient join strategy when applicable, reducing network cost to \(O(\text{small relation})\) for the broadcast plus zero shuffle. Spark implements this automatically as a broadcast hash join when one side is smaller than spark.sql.autoBroadcastJoinThreshold (default 10 MB).
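The same pattern in miniature: build a hash table from the small relation once, then stream the large relation through it (table and column names are illustrative):

```python
def broadcast_hash_join(large, small, key_large, key_small):
    """Map-side join: hash the small (broadcast) relation once, then probe it
    for each record of the large relation. No shuffle of the large side."""
    lookup = {}
    for row in small:
        lookup.setdefault(row[key_small], []).append(row)
    for row in large:
        for match in lookup.get(row[key_large], []):
            yield {**row, **match}

orders = [{"order_id": 1, "user_id": "u1", "amount": 30},
          {"order_id": 2, "user_id": "u2", "amount": 15},
          {"order_id": 3, "user_id": "u1", "amount": 99}]
users = [{"user_id": "u1", "city": "Waterloo"},
         {"user_id": "u2", "city": "Toronto"}]
for row in broadcast_hash_join(orders, users, "user_id", "user_id"):
    print(row["order_id"], row["city"])
# 1 Waterloo
# 2 Toronto
# 3 Waterloo
```

In Spark, `lookup` corresponds to the broadcast variable; each task probes its local copy.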
Repartition Join
When neither relation fits in memory, a repartition join shuffles both relations on the join key into the same set of partitions; each pair of co-located partitions is then joined locally, with no further network traffic. If the records are additionally sorted by join key within each partition, the local join reduces to a linear merge of two sorted streams.
SparkSQL’s default join strategy for large tables (when broadcast join is not applicable) is the sort-merge join: after the repartition shuffle, each partition is sorted within each task, and the two sorted streams are merged. The cost is:
\[ \text{Shuffle cost} + \text{Sort cost} + \text{Merge cost} = O(|R| + |S|) + O\!\left(\frac{|R| + |S|}{P} \log \frac{|R| + |S|}{P}\right) \times P + O(|R| + |S|) \]where \(P\) is the number of partitions. The sort cost per partition is \(O\!\left(\frac{|R| + |S|}{P} \log \frac{|R| + |S|}{P}\right)\), so the total sort cost across \(P\) partitions is \(O\!\left((|R| + |S|) \log \frac{|R| + |S|}{P}\right)\). Choosing \(P\) large enough that each partition fits in memory eliminates disk spills in the sort phase, keeping the sort entirely in RAM.
When MapReduce was benchmarked against parallel column-store databases (circa 2009), the performance gap came from three sources: (1) lack of indexing — MapReduce has no query optimizer, no statistics, and no indexes; every query scans all input data from HDFS; (2) disk I/O overhead — MapReduce materialises intermediate results to HDFS between every stage, while column stores keep data in column-compressed format and pipeline between operators; (3) serialisation cost — MapReduce serialises and deserialises every key-value pair through a general-purpose Java serialiser on both the map-output side and the reduce-input side.
So why did MapReduce succeed and not be immediately replaced by parallel databases? Its genuine advantages were: (a) schema-on-read — no schema declaration is needed; raw logs, JSON, and binary blobs are processed directly without ETL; (b) fault tolerance for long-running jobs — a MapReduce job running for 8 hours automatically recovers from individual task failures without restarting from scratch; most parallel DBMSs of 2009 would abort the entire query on any node failure; (c) programmability — arbitrary Python, Java, or C++ code runs inside mappers and reducers, including calls to external APIs, model inference, and complex parsing logic that cannot be expressed in SQL.
The two paradigms have since converged: Hive and SparkSQL add SQL optimisation atop MapReduce/Spark, while cloud analytical databases (Redshift, BigQuery, Snowflake) add fault-tolerant, scalable batch execution atop columnar storage. The boundary between “distributed database” and “big data platform” is now largely a matter of historical branding rather than technical distinction.
Spark SQL and DataFrames
Spark SQL provides a structured API over RDDs. A DataFrame is an RDD of Row objects with a named schema, analogous to a relational table. The same physical execution engine backs both the DataFrame API and SQL queries.
from pyspark.sql.functions import count
df = spark.read.parquet("users.parquet")
df.filter(df.age > 25).groupBy("city").agg(count("*")).show()
The Catalyst optimizer applies a sequence of rule-based transformations (constant folding, predicate pushdown, projection pruning) and cost-based optimisations (join reordering based on table statistics) to the logical plan before generating the physical plan. The Tungsten execution engine generates JVM bytecode at runtime (whole-stage code generation), eliminates Java object overhead with unsafe memory management, and optimises for CPU cache locality.
Adaptive Query Execution
Spark 3.0 introduced Adaptive Query Execution (AQE), which re-optimises the query plan at runtime using statistics collected from completed stages. Key features: coalescing small shuffle partitions (avoids the overhead of thousands of tiny tasks), dynamically switching join strategies (if a table turns out to be smaller than expected, switch from sort-merge join to broadcast hash join), and skew join optimisation (split skewed partitions).
Static cost-based optimisation relies on statistics that may be missing (no ANALYZE TABLE equivalent has been run), stale (the data was updated since statistics were last computed), or inaccurate (the true data distribution has correlations that the optimizer's independence assumption misses). AQE’s key insight: once Stage 1 is complete, Spark knows exactly how many bytes and rows were produced per shuffle partition. If Stage 1 was supposed to produce 200 shuffle partitions of 50 MB each (as estimated), but actually produced 198 partitions of 1 KB each and 2 partitions of 5 GB each, AQE can:
- Coalesce the 198 small partitions into a few larger partitions, reducing the number of Stage 2 tasks from 200 to (say) 5 for those partitions — eliminating the overhead of scheduling 198 nearly-empty tasks.
- Split the 2 giant partitions into sub-partitions, each assigned to a separate task, preventing those two tasks from being stragglers.
- Re-plan the join: if the shuffle output of one table is much smaller than estimated, AQE can switch from a sort-merge join (which was chosen based on the estimated size) to a broadcast hash join — reading that table’s shuffle data into memory on each executor rather than performing another shuffle.
This runtime re-optimization is only possible because Spark materialises shuffle data to disk between stages, creating a natural inspection point. Flink’s record-at-a-time model cannot perform this kind of inter-stage re-optimization because there are no discrete stage boundaries to inspect.
Delta Lake and the Lakehouse
Traditional data lakes (HDFS/S3 with Parquet files) lack ACID transactions: concurrent writes can corrupt files, schema changes are fragile, and “time travel” (querying historical snapshots) is impossible. Delta Lake (Databricks, open-sourced 2019) adds a transaction log (_delta_log/) to an object storage directory. The log records every operation as a JSON entry; Parquet files are never modified in place.
This enables: ACID transactions (optimistic concurrency control via log entry conflicts), schema enforcement and evolution, time travel (df.read.format("delta").option("versionAsOf", 5)), and Z-ordering (space-filling curve multi-column sort for data skipping). Delta Lake represents the “Lakehouse” architecture: the scalability and cost of a data lake with the reliability and governance of a data warehouse.
Delta Lake’s transaction log is a directory _delta_log/ containing numbered JSON files, one per committed transaction. Each JSON file records the set of Parquet files added and removed by that transaction. A table’s current state is computed by reading all JSON log entries up to the latest version and aggregating the adds and removes.
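State reconstruction is a fold over the log. A sketch with a hypothetical three-version log (the JSON shape is simplified relative to the real Delta protocol, and the file names are illustrative):

```python
def table_state(log_entries):
    """Replay log entries (in version order) into the set of live Parquet files."""
    live = set()
    for entry in log_entries:
        for action in entry:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live

# hypothetical contents of 000000.json .. 000002.json
log = [
    [{"add": {"path": "part-0000.parquet"}}, {"add": {"path": "part-0001.parquet"}}],
    [{"remove": {"path": "part-0000.parquet"}}, {"add": {"path": "part-0002.parquet"}}],
    [{"add": {"path": "part-0003.parquet"}}],
]
print(sorted(table_state(log)))      # latest version
print(sorted(table_state(log[:1])))  # time travel: state as of version 0
```

Truncating the replay at an earlier version is exactly what `versionAsOf` time travel does.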
ACID transactions are implemented via optimistic concurrency control: when two writers try to commit simultaneously (versions \(n\) and \(n+1\)), only one can win the atomic PUT of the version \(n+1\) log file (S3 guarantees atomic single-object PUT). The loser reads the winner’s commit, checks for conflicts (did both modify the same partition?), and either retries without conflict or raises a concurrency exception.
Snapshot isolation for readers: a reader specifies the version number to read (or implicitly uses the latest). The reader assembles the table state from the log up to that version and reads only the corresponding Parquet files, never seeing partially-applied writes.
Vacuum: because Parquet files are never deleted immediately (deletion is recorded as a “remove” entry in the log, but the file remains on disk), old versions accumulate. The VACUUM command deletes Parquet files that are no longer referenced by any log entry within the retention window (default: 7 days). After VACUUM, time travel to versions older than the retention window is no longer possible.
Chapter 8: Real-Time Analytics — Streaming Systems
Motivation
Batch processing (MapReduce, Spark) is inherently retrospective: you process data that has already accumulated. Many applications require processing data as it arrives, with latency measured in milliseconds to seconds rather than hours. Financial fraud detection must flag a suspicious transaction before it settles. Real-time bidding in digital advertising must decide within 100 ms whether to bid on an ad impression. IoT platforms must alert operators seconds after a sensor exceeds a threshold.
Apache Kafka
Kafka is a distributed log optimised for high-throughput, durable, ordered message delivery. Its core abstraction is the topic, a named, ordered sequence of records. Topics are partitioned — each partition is an ordered, immutable sequence of records stored on one or more brokers. Producers append records to partitions; consumers read records sequentially from a given offset.
The key insight of Kafka’s design is that the log is durable: records are retained for a configurable period (days to weeks), not deleted upon consumption. This allows consumers to replay history, multiple consumer groups to independently consume the same topic, and the stream to serve as a source of truth for downstream reprocessing.
Consumer groups enable parallel consumption: each partition is consumed by exactly one consumer within a group, so a group with \(n\) members can process up to \(n\) partitions in parallel. Adding consumers beyond the partition count yields no additional parallelism.
- C1 consumes P0, P1
- C2 consumes P2, P3
- C3 consumes P4, P5
Each consumer reads exclusively from its assigned partitions — there is no coordination required during normal operation. C1 commits offset 1,000 for P0 and 2,500 for P1; C2 commits offset 3,100 for P2 and 800 for P3, and so on. These offsets are stored in a special Kafka topic called __consumer_offsets.
If C2 crashes, Kafka’s group coordinator detects the failure (C2 fails to send a heartbeat within session.timeout.ms). A rebalance is triggered: C1 and C3 take over C2’s partitions. C3 now consumes P2, P3, P4, P5, resuming from the committed offsets (3,100 for P2, 800 for P3). No messages are lost and none are duplicated (assuming C2’s last batch was fully committed before the crash).
If a fourth consumer C4 joins the group, another rebalance occurs. With 6 partitions and 4 consumers, a typical assignment is C1 → {P0}, C2 → {P1, P2}, C3 → {P3, P4}, C4 → {P5}. Adding a seventh consumer would leave one consumer idle — a strong incentive to set the number of partitions equal to the maximum expected consumer parallelism when creating the topic.
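Assignment and rebalancing can be sketched in a few lines of Python. The `assign` function below approximates a range-style assignor; Kafka’s actual behaviour depends on the configured partition.assignment.strategy, so the exact assignments here are illustrative:

```python
def assign(partitions, consumers):
    """Range-style assignment: partitions split as evenly as possible,
    in order, across the sorted consumer list (a simplified sketch of
    Kafka's range assignor)."""
    consumers = sorted(consumers)
    n, k = len(partitions), len(consumers)
    out, start = {}, 0
    for i, c in enumerate(consumers):
        count = n // k + (1 if i < n % k else 0)
        out[c] = partitions[start:start + count]
        start += count
    return out

partitions = ["P0", "P1", "P2", "P3", "P4", "P5"]
committed = {"P2": 3100, "P3": 800}   # offsets stored in __consumer_offsets

a = assign(partitions, ["C1", "C2", "C3"])   # 2 partitions per consumer
# C2 crashes -> rebalance among the survivors
b = assign(partitions, ["C1", "C3"])         # 3 partitions per consumer
# The new owner of each partition resumes from the committed offset,
# so nothing is lost or duplicated (assuming C2 committed before crashing).
resume = {p: committed[p] for p in b["C3"] if p in committed}
```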
On a modern HDD, sequential write throughput is approximately 100–200 MB/s, while random writes (seeking to a random location before each write) achieve only 0.1–1 MB/s — a gap of 100–1000×. On an SSD, sequential writes reach ~500 MB/s vs. random writes of ~10–50 MB/s (still a 10–50× gap, because even SSDs must erase and rewrite flash pages at the granularity of large erase blocks).
By restricting producers to sequential appends and consumers to sequential reads, Kafka achieves throughput that approaches raw disk bandwidth. A single Kafka broker with a modern SSD can sustain ~1 million messages per second (at 1 KB per message). By contrast, a traditional message queue that uses a B-tree or hash table to store message state performs random I/O on every enqueue and dequeue, limiting throughput to roughly 10,000–100,000 messages per second on the same hardware.
A second optimization is that Kafka relies on the OS page cache for reads: when a consumer reads messages that were recently produced, those messages are often still in the OS page cache from when the producer wrote them — the data never touches disk at all. This means Kafka can deliver messages from producer to consumer entirely in memory at close to network bandwidth, while still guaranteeing durability for older messages via the on-disk log.
Stream Processing Fundamentals
Two time domains are critical in stream processing:
- Event time: the time at which an event actually occurred (embedded in the event’s data).
- Processing time: the time at which the event is processed by the stream processor.
These differ because of network delays, retransmissions, and out-of-order delivery. A sensor reading from 10:00:00 may arrive at the stream processor at 10:00:05. For most analytics applications (e.g., “how many clicks occurred in the 9am–10am window?”), event time is correct; processing time is an approximation that can be significantly off.
Watermarks are the mechanism for reasoning about event-time progress despite out-of-order data. A watermark at time \(t\) is a declaration that all events with event time \(\leq t\) have been observed. Any event arriving with event time \(< t\) after the watermark is a late event and is either dropped or handled via a special late-data policy.
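A minimal watermark tracker can be sketched as follows (illustrative Python, similar in spirit to a bounded-out-of-orderness watermark strategy; the 5-second allowed delay is an assumed parameter):

```python
class WatermarkTracker:
    """Watermark = max event time seen so far, minus an assumed bound
    on out-of-orderness. Events behind the watermark are late."""
    def __init__(self, max_out_of_orderness):
        self.max_delay = max_out_of_orderness
        self.max_event_time = float("-inf")

    def watermark(self):
        return self.max_event_time - self.max_delay

    def observe(self, event_time):
        """Returns (is_late, current_watermark) for this event."""
        late = event_time < self.watermark()
        self.max_event_time = max(self.max_event_time, event_time)
        return late, self.watermark()

wm = WatermarkTracker(max_out_of_orderness=5)
results = [wm.observe(t) for t in [10, 12, 11, 20, 13, 14]]
# Event time 13 arrives after event 20 advanced the watermark to 15,
# so it is flagged late (as is 14).
```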
Windowing divides the infinite stream into finite chunks for aggregation:
- Tumbling windows: fixed-size, non-overlapping (e.g., one window per hour).
- Sliding windows: fixed-size, overlapping by a hop (e.g., 1-hour windows updated every 10 minutes).
- Session windows: variable-size, defined by gaps in activity (e.g., a user session ends after 30 minutes of inactivity).
State representation: Maintain a circular buffer of 6 buckets, each accumulating 10 seconds of events (since 60 s / 10 s = 6 buckets). Each bucket is a hash map from word to count.
At each 10-second mark:
- Emit: sum the word counts across all 6 active buckets. This is the sliding window result for the past 60 seconds.
- Rotate: discard the oldest bucket (it now falls outside the 60-second window) and open a fresh empty bucket for the next 10 seconds of incoming events.
- Fill: route all incoming events for the next 10 seconds into the new (youngest) bucket.
Concrete trace (showing bucket contents at each slide boundary):
- At t=60s: buckets cover [0,10), [10,20), [20,30), [30,40), [40,50), [50,60). Emit sum. Drop [0,10) bucket, open [60,70) bucket.
- At t=70s: buckets cover [10,20), [20,30), [30,40), [40,50), [50,60), [60,70). Emit sum. Drop [10,20) bucket, open [70,80) bucket.
Memory and compute cost: at 100,000 events/second, each 10-second bucket receives \(100{,}000 \times 10 = 1{,}000{,}000\) events, so it holds at most 1 million word-count entries (also bounded by the 1-million-word vocabulary; in practice far fewer distinct words appear in any 10-second window). Emitting the result requires summing 6 hash maps — \(O(\text{distinct words per window})\) — which is far cheaper than re-scanning the raw event log for the past 60 seconds. This bucket-based design is the core of Flink’s built-in sliding window operator and Spark Structured Streaming’s sliding window aggregate.
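The bucket rotation described above can be sketched in a few lines (illustrative Python, not Flink’s or Spark’s actual operator; window and slide durations are parameters):

```python
from collections import Counter, deque

class SlidingWordCount:
    """A 60 s sliding window with a 10 s slide, kept as 6 rotating buckets."""
    def __init__(self, window=60, slide=10):
        self.buckets = deque(Counter() for _ in range(window // slide))

    def add(self, word):
        self.buckets[-1][word] += 1        # fill the youngest bucket

    def slide(self):
        """Emit the window total, then rotate: drop oldest, open fresh."""
        total = Counter()
        for b in self.buckets:
            total += b
        self.buckets.popleft()             # falls outside the window
        self.buckets.append(Counter())     # fresh bucket for new events
        return total

w = SlidingWordCount()
for word in ["spark", "kafka", "spark"]:
    w.add(word)
first = w.slide()        # all three events are inside the first window
for _ in range(5):       # five more slides: the events' bucket stays in range
    w.slide()
gone = w.slide()         # seventh slide: that bucket has rotated out
```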
Spark Structured Streaming
Spark Structured Streaming extends the DataFrame API to streaming data. The same SQL and DataFrame operations work on both batch and streaming DataFrames. Internally, Spark treats the stream as an unbounded table to which rows are continuously appended, and executes micro-batches (by default, as fast as possible) against it.
Three output modes govern what is written to the sink per micro-batch:
- Append mode: only new rows are written (suitable for stateless queries or windows with watermarks).
- Complete mode: the entire result table is rewritten each batch (suitable for aggregations that may update historical rows).
- Update mode: only rows that changed since the last batch are written.
Stateful aggregations (e.g., per-session counts) use a state store (backed by RocksDB or in-memory hash maps) to maintain per-key state across micro-batches.
Apache Flink
Flink is a true stream processor: it processes events one at a time rather than in micro-batches. This gives Flink per-event latency in the low milliseconds (compared to Spark Streaming’s latency of 100 ms–1 s), at the cost of somewhat higher per-event processing overhead.
Flink’s native streaming processes each event individually as it arrives, maintaining per-key state (in RocksDB or in-memory hash maps) that is updated on every event. There is no artificial batching delay — the latency from event arrival to result emission is bounded by the processing time of a single event (typically 1–50 ms including network overhead). The throughput penalty is that Flink must maintain more complex state management infrastructure (per-record state access, watermark tracking, checkpoint coordination) that has higher overhead per event than Spark’s batched execution.
The practical choice: use Spark Structured Streaming when the application can tolerate 500 ms–2 s latency and benefits from SQL/DataFrame expressiveness; use Flink when sub-100 ms latency is required (financial fraud detection, real-time bidding) or when stateful processing at very high cardinality (millions of active keys) requires RocksDB-backed state that exceeds JVM heap capacity.
Flink’s fault tolerance relies on distributed snapshots (inspired by the Chandy-Lamport algorithm). Periodically, a barrier is injected into each input stream. When an operator has received the barrier on all inputs, it snapshots its state to durable storage. Recovery replays the input from the last barrier. Combined with Kafka’s durable log, this provides exactly-once semantics end-to-end: no event is processed more than once, and none is lost, even after failures.
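A toy model of barrier-triggered snapshots (Python; real Flink aligns barriers across many network channels and writes snapshots asynchronously — this sketch only shows the “snapshot when the barrier has arrived on all inputs” rule):

```python
class SummingOperator:
    """A stateful operator with two input channels that snapshots its
    state once a checkpoint barrier has arrived on every input."""
    def __init__(self, num_inputs):
        self.state = 0
        self.num_inputs = num_inputs
        self.pending_barriers = set()
        self.snapshots = {}               # checkpoint id -> state copy

    def on_event(self, value):
        self.state += value

    def on_barrier(self, checkpoint_id, input_channel):
        self.pending_barriers.add(input_channel)
        if len(self.pending_barriers) == self.num_inputs:
            self.snapshots[checkpoint_id] = self.state   # durable snapshot
            self.pending_barriers.clear()

    def recover(self, checkpoint_id):
        """Roll back to the snapshot; events after the barrier are
        replayed from the durable input log (e.g., Kafka)."""
        self.state = self.snapshots[checkpoint_id]

op = SummingOperator(num_inputs=2)
for v in [1, 2, 3]:
    op.on_event(v)
op.on_barrier(1, "in-a")
op.on_barrier(1, "in-b")      # barrier seen on all inputs -> snapshot
op.on_event(10)               # post-checkpoint event
op.recover(1)                 # crash: state rolls back; event 10 will replay
```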
Flink’s RocksDB state backend stores keyed state in an embedded RocksDB instance on each task manager, enabling state larger than memory (important for long-running streaming applications with millions of keys).
Exactly-Once Semantics
The exactly-once guarantee is the gold standard for streaming correctness but the most expensive to achieve:
- At-most-once: messages may be lost; no duplicates. Lowest overhead.
- At-least-once: no messages are lost; duplicates possible. Medium overhead.
- Exactly-once: no messages lost, no duplicates. Highest overhead.
Kafka achieves exactly-once producer delivery via idempotent producers (sequence numbers prevent duplicate writes on retry) and transactions (atomic writes across partitions). Flink’s checkpointing mechanism, combined with Kafka transactions for output, provides end-to-end exactly-once.
Kafka’s idempotent producer solves the duplicate-on-retry problem by assigning each message a monotonically increasing sequence number. The broker remembers the last sequence number it accepted from each producer. If a message arrives with a duplicate sequence number (indicating a retry), the broker acknowledges it but does not write it again — the message is deduplicated at the broker level. This works because Kafka brokers have a persistent log: the broker can check the last accepted sequence number even after a restart.
The harder problem is ensuring that the consumer’s offset commit and the consumer’s output write are atomic. If the consumer processes a message, writes the result to an external database, and then crashes before committing its Kafka offset, the message will be reprocessed on restart and the result may be written twice to the database. Flink solves this by using a two-phase commit protocol with Kafka: the Flink checkpoint barrier acts as the “prepare” phase, and only when the checkpoint is complete does Flink commit the Kafka consumer offsets and commit any pending Kafka producer transactions — atomically, in a single transaction. If Flink crashes between the checkpoint and the commit, the pending transaction is aborted and the checkpoint is replayed.
Achieving exactly-once with arbitrary external sinks (non-Kafka databases, REST APIs) is not generally possible without application-level idempotency — either the sink must support idempotent writes (e.g., upsert semantics) or the application must design its writes to be naturally idempotent (e.g., setting a value rather than incrementing it).
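The idempotency point can be made concrete with a toy comparison (Python; both sink classes are hypothetical): under at-least-once delivery, a set-by-key sink absorbs replays harmlessly, while an incrementing sink double-counts them.

```python
class SetSink:
    """Upsert semantics: writing the same (key, value) twice is harmless."""
    def __init__(self):
        self.rows = {}

    def write(self, key, value):
        self.rows[key] = value          # replay overwrites with same value

class IncrementSink:
    """Increment semantics: a replayed event is applied twice."""
    def __init__(self):
        self.rows = {}

    def write(self, key, delta):
        self.rows[key] = self.rows.get(key, 0) + delta

events = [("clicks:page1", 5)]
safe, unsafe = SetSink(), IncrementSink()
for sink in (safe, unsafe):
    for k, v in events:
        sink.write(k, v)
        sink.write(k, v)                # simulate a redelivered event
```

After the replay, the upsert sink still holds the correct value while the incrementing sink has double-counted — which is exactly why at-least-once plus idempotent writes is often described as “effectively exactly-once.”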
Chapter 9: Mutable State — Key-Value Stores and HBase
Limitations of HDFS for Mutable State
HDFS is optimised for sequential, batch access to immutable data. It cannot support the access patterns of many real-world applications: random reads of individual records (e.g., fetch user profile by user ID), low-latency point queries (e.g., check if a URL has been crawled), or frequent small updates (e.g., increment a counter). These patterns require a different storage abstraction.
Google Bigtable
Bigtable (Chang et al., OSDI 2006) is described as a “sparse, distributed, persistent, multi-dimensional sorted map”:
\[ (\text{row key},\ \text{column family},\ \text{column qualifier},\ \text{timestamp}) \rightarrow \text{value} \]The row key is an arbitrary byte string; rows are sorted lexicographically. All data for a row key is co-located, enabling efficient range scans. Column families group related columns for separate physical storage and compaction settings; they must be declared at schema creation time. Column qualifiers are arbitrary byte strings within a family; they need not be declared. Timestamps allow multiple versions of each cell.
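The data model can be illustrated with a toy in-memory version of the map (Python; real Bigtable stores this structure in tablets and SSTables, and the row keys below echo the web-crawl example discussed later):

```python
class TinyBigtable:
    """A sparse, sorted, multi-dimensional map:
    (row, family, qualifier, timestamp) -> value."""
    def __init__(self):
        self.cells = {}    # row -> {(family, qualifier) -> {ts: value}}

    def put(self, row, fam, qual, ts, value):
        self.cells.setdefault(row, {}).setdefault((fam, qual), {})[ts] = value

    def get_latest(self, row, fam, qual):
        versions = self.cells[row][(fam, qual)]
        return versions[max(versions)]          # highest timestamp wins

    def scan(self, start_row, end_row):
        """Range scan over the lexicographically sorted row key space."""
        return [r for r in sorted(self.cells) if start_row <= r < end_row]

t = TinyBigtable()
t.put("com.google.www/search", "contents", "html", 2, "<html>v2</html>")
t.put("com.google.www/search", "contents", "html", 1, "<html>v1</html>")
t.put("com.google.mail/", "contents", "html", 1, "<html>mail</html>")
latest = t.get_latest("com.google.www/search", "contents", "html")
google_pages = t.scan("com.google.", "com.google.~")   # same-domain scan
```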
Data is stored in tablets (ranges of the row key space), each served by one tablet server. Writes go to an in-memory memtable and a write-ahead log (WAL) for durability. When the memtable fills, it is flushed to an immutable SSTable on GFS. Reads merge the memtable and all SSTables. Periodic compaction merges SSTables, reclaiming space and reducing read amplification.
Apache HBase
HBase is the open-source Bigtable clone that runs on HDFS. The architecture mirrors Bigtable: Region servers serve data for a contiguous range of row keys; each region has a MemStore (in-memory write buffer), a WAL (HLog, stored on HDFS for durability), and a set of HFiles (immutable SSTables on HDFS). A central HMaster manages region assignment but is not in the data path.
LSM Trees
The data structure underlying both Bigtable and HBase is the Log-Structured Merge Tree (LSM tree). Writes are directed to an in-memory buffer (L0), which is fast. When L0 is full, it is flushed to an on-disk sorted structure (L1 SSTable). Periodically, SSTables at level \(i\) are merged and pushed to level \(i+1\) in a compaction process.
Read amplification: a point query for \(k\) must check L0 (in-memory, fast), then each SSTable in L1 (each requiring a Bloom filter check and potentially a disk read), then potentially the relevant SSTable in L2 (if an older version is there). In the worst case, a read touches \(O(\text{number of levels} \times \text{SSTables per level})\) disk files — this is read amplification.
Compaction reduces read amplification by merging SSTables: when L1 accumulates enough SSTables, the LSM tree runs a compaction job that reads all L1 SSTables and the corresponding key range in L2, merges them (applying any deletes and keeping only the latest version of each key), and writes new, larger SSTables at L2. After compaction, key \(k\) exists in exactly one SSTable at L2 — a read now touches L0 plus exactly one SSTable, reducing read amplification.
The trade-off is write amplification: each byte of data may be written once to L0, once to L1, once to L2, and potentially again if compaction merges L2 into L3. In RocksDB’s default configuration with 7 levels, the write amplification is approximately 10–30×: a 1 MB write to the database may cause 10–30 MB of actual disk writes across all compaction levels. This is why SSDs wear out faster under LSM workloads than under B-tree workloads — the higher write amplification translates to more flash erase cycles per logical write.
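A minimal LSM sketch (Python) shows the memtable, the flush to immutable sorted runs, the multi-run read path, and a compaction that collapses the runs; the tiny memtable limit is illustrative:

```python
class TinyLSM:
    def __init__(self, memtable_limit=2):
        self.memtable = {}
        self.runs = []                 # newest run first
        self.limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.limit:
            # Flush: memtable becomes an immutable sorted run on "disk".
            self.runs.insert(0, dict(sorted(self.memtable.items())))
            self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for run in self.runs:          # read amplification: check each run
            if key in run:
                return run[key]
        return None

    def compact(self):
        """Merge all runs, keeping only the newest version of each key."""
        merged = {}
        for run in reversed(self.runs):  # oldest first; newer overwrites
            merged.update(run)
        self.runs = [dict(sorted(merged.items()))]

db = TinyLSM()
for k, v in [("a", 1), ("b", 2), ("a", 3), ("c", 4)]:
    db.put(k, v)
before = len(db.runs)     # multiple runs to check on every read
db.compact()
after = len(db.runs)      # one run: read amplification reduced
value = db.get("a")       # the newest version of "a" survives compaction
```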
This design separates the write path (sequential, in-memory) from the read path (may require checking multiple levels). The trade-off:
| Operation | LSM Tree | B-Tree (traditional RDBMS) |
|---|---|---|
| Write throughput | Very high (sequential log) | Moderate (random updates) |
| Read latency | Higher (merge multiple levels) | Lower (single tree traversal) |
| Space amplification | Higher (duplicate keys across levels) | Lower |
| Write amplification | Higher (data rewritten at each level) | Lower |
RocksDB (Facebook’s LSM implementation) is the state backend for many modern systems including Flink, Kafka Streams, and TiKV.
Row Key Design
The row key is the only primary index in HBase. It determines:
- Co-location: rows with adjacent keys are co-located in the same region, enabling efficient range scans.
- Load distribution: if all writes go to the same lexicographic range (e.g., using a timestamp prefix means all recent writes go to the last region), one region server becomes a hotspot.
Good row key design avoids hotspots. For time-series data, salting prepends a random prefix to distribute writes across regions. Hash bucketing replaces the natural key with hash(key) + key, distributing rows uniformly at the cost of losing range-scan capability.
Consider the row key userId + timestamp — this co-locates all events for a user and enables efficient time-range scans per user. However, if user IDs are monotonically increasing (e.g., assigned in registration order), new users always write to the last region. The most recent region server handles 100% of writes — a write hotspot.
Salting fix: prepend a one-byte hash of the user ID: hash(userId)[0:1] + userId + timestamp. With 256 possible salt values (2^8), writes are distributed uniformly across up to 256 regions. The salt is deterministic (derived from userId), so reads for a specific user still go to exactly one region (the same salt prefix is computed at read time).
Read cost: a scan of all events for user 42 is still a single-region range scan, because the salt prefix for user 42 is fixed (say, 0xA7). The scan key is 0xA7 + userId=42 + timestamp=MIN to 0xA7 + userId=42 + timestamp=MAX.
Multi-user scan cost: a scan of all events for all users in a time range cannot exploit row-key ordering — it requires a full table scan. For applications that frequently need cross-user time-range queries, an alternative key design (e.g., reversed timestamp prefix with hash suffix) might be better, but then per-user time-range scans become more expensive. Row key design in HBase is a zero-sum trade-off between different access patterns; unlike a relational DBMS, there is no query optimizer to help — the developer must make this choice explicitly at schema design time.
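The salting scheme can be sketched as follows (Python; the one-byte salt, the md5 choice, and the tuple key layout are illustrative):

```python
import hashlib

def salted_key(user_id, timestamp, buckets=256):
    """Deterministic one-byte salt derived from the user id, prepended
    to (userId, timestamp). Tuples compare lexicographically, standing
    in for HBase's byte-string row key ordering."""
    salt = hashlib.md5(str(user_id).encode()).digest()[0] % buckets
    return (salt, user_id, timestamp)

def scan_range_for_user(user_id):
    """Per-user time-range scan: the salt is recomputed at read time,
    so the scan still touches a single contiguous key range."""
    lo = salted_key(user_id, 0)
    hi = salted_key(user_id, 2**63 - 1)
    return lo, hi

k1 = salted_key(42, 1000)
k2 = salted_key(42, 2000)
lo, hi = scan_range_for_user(42)
same_prefix = k1[0] == k2[0] == lo[0]    # all keys for user 42 share a salt
in_range = lo <= k1 < k2 <= hi           # and fall in one scan range
```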
The Google Bigtable Design for Web Search
Bigtable’s data model is elegantly suited to web search infrastructure. Consider the web crawl table used by Google: the row key is the reversed hostname plus path (e.g., com.google.www/search). Reversing the hostname groups all pages from the same domain together lexicographically, enabling efficient range scans over all pages from google.com. This is a direct example of the row key design principle: choose the key to co-locate data that is frequently accessed together.
Column families separate different types of data: a contents: family stores the raw HTML of each page (one cell per crawl timestamp, allowing the history of a page’s content to be retrieved); an anchor: family stores all anchor texts pointing to this page (each column qualifier is the URL of the referring page, so anchor:cnn.com/ stores the anchor text that CNN’s homepage uses when linking to this page); a language: family stores the detected language. This schema supports the full range of operations needed for web search: fetching the latest content for indexing, computing PageRank using link structure, and language-specific ranking — all from a single table.
CAP Theorem
The CAP theorem (Brewer, 2000; Gilbert and Lynch, 2002) states that a distributed system can provide at most two of three guarantees simultaneously:
| Property | Meaning |
|---|---|
| Consistency (C) | Every read sees the most recent write |
| Availability (A) | Every request receives a (non-error) response |
| Partition tolerance (P) | The system continues operating despite network partitions |
Since network partitions are unavoidable in real systems, the practical choice is between CP (consistent, but may reject requests during a partition) and AP (available, but may return stale data during a partition). HBase chooses CP: each region is served by exactly one region server, so if that server fails or is partitioned away, its regions are unavailable until they are reassigned — rather than risk serving inconsistent reads or accepting conflicting writes. Cassandra, by contrast, is AP: it allows writes even when some replicas are unreachable, reconciling conflicts later.
Chapter 10: Graph Analytics Redux and System Design
Pregel: Vertex-Centric Graph Processing
MapReduce’s graph algorithms suffer from the same inefficiency as any other iterative algorithm: full graph scans per iteration, with adjacency lists serialised to and from HDFS. Pregel (Malewicz et al., SIGMOD 2010) introduces the vertex-centric (also called “think like a vertex”) programming model.
Computation proceeds in supersteps, each comprising:
- Each active vertex receives messages sent in the previous superstep.
- Each active vertex runs its compute() function: reads incoming messages, updates its value, sends messages along outgoing edges, and optionally votes to halt.
- All messages are delivered; halted vertices are reactivated if they receive a message.
Computation terminates when all vertices have voted to halt and there are no pending messages. This is the Bulk Synchronous Parallel (BSP) model: all vertices compute in parallel within a superstep, and communication occurs only at superstep boundaries.
PageRank in Pregel: each vertex stores its current rank and a list of neighbours. In each superstep, it sends rank / degree to each neighbour and updates its own rank by summing received contributions. After a fixed number of supersteps (or convergence check via aggregator), computation halts.
The advantage over MapReduce is that only active vertices (those on the current frontier, or those that received messages) need to run compute(). For BFS, only the frontier is active; the rest of the graph is idle. For PageRank, all vertices are active every superstep — the benefit is in-memory message passing rather than disk I/O.
Worked example: single-source shortest paths on a small directed graph with the following edges:
- 0 → 1 (weight 4)
- 0 → 2 (weight 2)
- 1 → 3 (weight 5)
- 2 → 1 (weight 1)
- 2 → 3 (weight 8)
- 3 → 4 (weight 2)
Source node: 0. Initial distances: node 0 = 0, all others = ∞.
Superstep 1: Only node 0 is active (distance 0). Node 0 sends messages: (node 1, distance 4) and (node 2, distance 2). All other nodes are halted.
After superstep 1, each node takes the minimum of its current distance and all received messages:
- Node 1: min(∞, 4) = 4 → activated
- Node 2: min(∞, 2) = 2 → activated
- Node 0 votes to halt.
Superstep 2: Nodes 1 and 2 are active.
- Node 1 (dist 4) sends: (node 3, 4+5=9).
- Node 2 (dist 2) sends: (node 1, 2+1=3), (node 3, 2+8=10).
After superstep 2:
- Node 1: min(4, 3) = 3 → updated and re-activated
- Node 3: min(∞, 9, 10) = 9 → activated
Superstep 3: Nodes 1 and 3 are active.
- Node 1 (dist 3) sends: (node 3, 3+5=8).
- Node 3 (dist 9) sends: (node 4, 9+2=11).
After superstep 3:
- Node 3: min(9, 8) = 8 → updated
- Node 4: min(∞, 11) = 11 → activated
Superstep 4: Nodes 3 and 4 active. Node 3 (dist 8) sends (node 4, 8+2=10). Node 4 updates to min(11, 10) = 10.
Superstep 5: Node 4 is active but has no outgoing edges; it votes to halt. No pending messages remain → computation terminates.
Final distances from node 0: {0:0, 1:3, 2:2, 3:8, 4:10}. The BSP model guarantees that at the start of superstep \(r+1\), every node has received all messages sent in superstep \(r\) — this synchrony is what makes the correctness argument straightforward.
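The trace above can be reproduced with a small BSP-style simulation (Python; the inbox/outbox dictionaries stand in for Pregel’s message delivery, and a vertex is “active” exactly when it has messages to process):

```python
# Edge list from the worked example: vertex -> [(neighbour, weight)].
edges = {0: [(1, 4), (2, 2)], 1: [(3, 5)], 2: [(1, 1), (3, 8)],
         3: [(4, 2)], 4: []}
dist = {v: float("inf") for v in edges}
dist[0] = 0
inbox = {0: [0]}                       # kick off the source vertex

supersteps = 0
while inbox:                           # terminate when no pending messages
    supersteps += 1
    outbox = {}
    for v, msgs in inbox.items():      # compute() for each active vertex
        best = min(msgs)
        if best <= dist[v]:            # improved (or initial) distance
            dist[v] = best
            for u, w in edges[v]:      # propagate dist + edge weight
                outbox.setdefault(u, []).append(best + w)
    # Superstep boundary: all messages delivered before the next round.
    # Only messages that improve a distance reactivate a vertex.
    inbox = {u: m for u, m in outbox.items() if min(m) < dist[u]}
```

Running this yields the final distances stated above, {0: 0, 1: 3, 2: 2, 3: 8, 4: 10}, after five supersteps.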
Asynchronous systems (PowerGraph, GraphLab) allow vertices to compute and send messages without waiting for a global barrier. A vertex may receive an updated message from a neighbour while that neighbour is still computing — effectively using stale information from earlier rounds mixed with fresh information from the current one. For PageRank, asynchronous updates propagate information faster through the graph: a rank update to a high-degree hub node is immediately available to all its neighbours on their next compute step, rather than waiting for the next superstep. In practice, asynchronous PageRank converges in 2–5× fewer effective iterations than synchronous PageRank.
The cost of asynchrony is complexity: reasoning about correctness requires careful analysis of whether the update function is a contraction mapping (i.e., whether iterating it asynchronously still converges to the same fixed point as the synchronous version). For monotone algorithms like BFS and shortest paths, asynchronous updates are provably safe. For algorithms with non-monotone updates, asynchrony can cause oscillation or divergence.
Pregel’s fault tolerance is checkpoint-based: periodically, all workers write their vertex states to GFS, and after a failure the computation rolls back to the last checkpoint. The trade-off is checkpoint overhead — a cost proportional to the total graph state size. For a graph with \(n = 10^9\) vertices each with 8 bytes of state, checkpointing writes 8 GB per checkpoint. With \(k = 5\) supersteps between checkpoints and a superstep duration of 30 seconds, checkpoints happen every 2.5 minutes, imposing an additional 8 GB / (500 MB/s) = 16 seconds of I/O overhead per checkpoint interval — about 11% overhead.
In practice, Pregel systems (and GraphX) use two optimisations: (1) delta checkpointing — only write vertex states that changed since the last checkpoint, not the full state; (2) lineage-based recovery (as in GraphX) — store the sequence of aggregated message sets for each superstep, allowing replay from the last successful superstep rather than the last full checkpoint. The latter is analogous to Spark’s RDD lineage and is efficient when message sets are compact relative to the full vertex state.
GraphX
GraphX is Spark’s graph processing library, providing a property graph API on top of RDDs. A property graph stores arbitrary attributes on vertices and edges:
- VertexRDD: RDD[(VertexId, VD)] — vertex IDs with vertex data.
- EdgeRDD: RDD[Edge[ED]] — edges with source ID, destination ID, and edge data.
- Triplet view: RDD[EdgeTriplet[VD, ED]] — each edge augmented with source and destination vertex attributes.
The core messaging primitive is aggregateMessages:
graph.aggregateMessages[Msg](
sendMsg = ctx => ctx.sendToDst(ctx.srcAttr),
mergeMsg = (a, b) => a + b
)
This replaces Pregel’s compute() and combiner in a single call. sendMsg runs on each triplet; mergeMsg combines messages arriving at the same destination.
GraphX provides built-in algorithms: PageRank (both static iterations and dynamic convergence), connected components, strongly connected components, triangle counting, shortest paths, and label propagation for community detection.
GraphX is built for batch analytics, not low-latency point traversals. Dedicated graph databases (Neo4j, Amazon Neptune, TigerGraph) are optimised for these transactional patterns. They use native graph storage models — vertices and edges stored with direct pointers to adjacency lists — enabling traversals that jump from vertex to vertex via pointer dereference (cost: \(O(\text{degree})\) per hop) rather than via a join (cost: \(O(\log n)\) in a B-tree or \(O(1)\) in a hash table per hop, but with much higher constant factors due to indirection through the storage layer).
The benchmark distinction: GraphX can run PageRank over a 1 billion node graph in minutes; Neo4j can answer “find all paths between user A and user B of length ≤ 4” in milliseconds for a million-node graph. The access pattern determines the tool — batch analytics belongs to GraphX/Pregel; real-time traversal belongs to dedicated graph databases.
System Design: Choosing the Right Tool
A key learning from this course is that no single system is optimal for all workloads. Choosing between systems requires understanding the access patterns, latency requirements, and consistency needs of the application:
| Workload | Recommended Stack |
|---|---|
| Batch ETL, large-scale aggregation | HDFS + Spark (DataFrame/SQL) |
| Ad-hoc analytics on structured data | Spark SQL / Presto / Trino |
| Iterative ML training | Spark MLlib / Horovod on Spark |
| Graph algorithms | Spark GraphX |
| Real-time stream processing (low latency) | Kafka + Flink |
| Real-time stream processing (ease of use) | Kafka + Spark Structured Streaming |
| Random-access mutable data (high throughput) | HBase / Cassandra |
| OLTP (strong consistency, low latency) | PostgreSQL / MySQL |
Lambda and Kappa Architectures
The Lambda architecture (Nathan Marz, 2011) addresses the tension between accuracy and latency by running two parallel pipelines:
- Batch layer: reprocesses the entire historical dataset periodically to produce accurate, complete views. Slow but correct.
- Speed layer: processes only the most recent data (since the last batch run) with low latency. Fast but approximate or incomplete.
- Serving layer: merges results from batch and speed layers to answer queries.
The Lambda architecture guarantees that the batch layer will eventually correct any errors introduced by the speed layer. However, maintaining two separate codebases (one batch, one streaming) for the same business logic is operationally expensive.
The Kappa architecture (Jay Kreps, 2014) simplifies this by eliminating the batch layer: everything is a stream. Historical reprocessing is done by replaying Kafka from offset 0 with a new version of the streaming application. This works when the stream processor (e.g., Flink) is fast enough to reprocess history within an acceptable window and when the streaming API is expressive enough to handle all queries.
The Kappa architecture is most appropriate when: (1) the stream processor can replay historical data at 10–100× real-time speed (Flink can typically do this); (2) Kafka retains the full event history (which requires sufficient storage — at 1 TB/day of events, retaining 1 year of history requires ~365 TB of Kafka storage); and (3) all business logic is expressible in the streaming API. When historical datasets are petabyte-scale or when certain queries require full-table scans that are more naturally expressed in SQL on a column store, the Lambda architecture’s batch layer remains valuable. The “Lakehouse” architecture (Delta Lake + Spark) can be seen as a modernized Lambda variant where both batch and streaming are expressed in the same SparkSQL API, with the Delta transaction log providing ACID semantics across both.
The Reduce-Side Join in Detail
The reduce-side join is the most general join strategy in MapReduce, but its generality comes at a cost: both full relations are shuffled across the network, incurring \(O(|R| + |S|)\) bytes of network traffic. For relations of 1 TB each, this means 2 TB of data traverses the cluster network — at 10 Gbps network bandwidth, that is over 26 minutes of network transfer time, dominating the total job time.
Broadcast hash join (map-side join): if one relation fits in memory on each worker (typically \(\leq\) 100 MB, configurable), broadcast it to all workers and perform a local hash lookup for each record in the large relation. Network cost: \(O(|S_\text{small}| \times W)\) for the broadcast, plus zero shuffle. This sends \(W\) copies of the small relation, but avoids shuffling, sorting, and merging the large relation entirely — in practice it is 5–20× faster than a reduce-side join for small dimension tables.
Repartition join: if both relations are too large for broadcasting but are already partitioned on the join key, perform a local join on co-located partition pairs. If the data is not already partitioned, a repartition shuffle is needed — same cost as a reduce-side join but without the tagging overhead.
Reduce-side join: when neither relation fits in memory. Tag records with their source, shuffle by join key, and join in the reducer. This always works but is the most expensive.
Semi-join: if the filter selectivity of one relation is very high, project it to just the join key (a much smaller dataset), broadcast the projected keys, filter the large relation on each worker (eliminating most rows), and then perform the full join on the filtered result — saving a large fraction of the shuffle traffic.
Spark’s Catalyst optimizer selects between broadcast hash join and sort-merge join (equivalent to repartition join) automatically based on estimated table sizes and the spark.sql.autoBroadcastJoinThreshold configuration. For custom RDD operations (not DataFrames), the developer must choose the strategy manually.
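A semi-join can be sketched end-to-end in a few lines (Python; the relations, key names, and data are made up for illustration):

```python
# R is the large relation; S is small and highly selective on the join key.
R = [("u1", "click"), ("u2", "view"), ("u3", "click"), ("u4", "view")]
S = [("u2", "premium"), ("u4", "premium")]

# Step 1: project S down to its join keys (tiny compared to full S rows).
s_keys = {k for k, _ in S}

# Step 2: "broadcast" the key set and filter R on each worker; only
# matching rows would ever enter the shuffle.
r_filtered = [(k, v) for k, v in R if k in s_keys]

# Step 3: perform the full join on the filtered result.
s_map = dict(S)
joined = [(k, v, s_map[k]) for k, v in r_filtered]
```

Here the filter eliminates half of R before any shuffle; with realistic selectivities (a dimension table matching 1% of a fact table), the shuffle savings are proportionally larger.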
Performance Tuning
Achieving good performance in Spark requires attention to several dimensions:
Serialisation: Spark’s default Java serialisation is slow and produces large objects. Kryo serialisation is typically 10× faster and 5× smaller. Configure with spark.serializer = org.apache.spark.serializer.KryoSerializer.
Kryo is faster than Java serialisation because it writes only the object’s data fields (no class metadata per object), uses schema registration (the class-to-ID mapping is declared at startup rather than embedded in each serialised byte stream), and avoids Java’s reflection overhead. For a typical (String, Int) pair representing a word-count intermediate result, Java serialisation produces approximately 120 bytes (class descriptor + object header + field data); Kryo produces approximately 12 bytes (2-byte type ID + length-prefixed string + 4-byte int). Across 10 billion intermediate pairs in a word count job over a 1 TB corpus, this is the difference between 1.2 TB and 120 GB of shuffle data — a 10× reduction in network and disk I/O from a single configuration change.
Partitioning: too few partitions underutilises the cluster; too many creates excessive task scheduling overhead and small shuffle files. A rule of thumb is 2–4 partitions per CPU core. Data skew (one key with far more records than others) causes stragglers — detect with the Spark UI’s stage task distribution histogram, then salt the skewed key.
Memory management: executor memory is divided between execution (shuffle, sort, aggregation) and storage (cached RDDs). The unified memory manager (Spark 1.6+) allows these regions to borrow from each other dynamically. Off-heap storage (Tungsten unsafe memory) avoids Java GC pauses for cached DataFrames.
Shuffle optimisation: the sort-based shuffle (default) spills to disk when the in-memory buffer fills. Increase executor memory or spark.memory.fraction (the legacy spark.shuffle.memoryFraction in pre-1.6 Spark) to reduce spills. Use reduceByKey over groupByKey to reduce shuffle data size. Tune spark.sql.shuffle.partitions: the default of 200 is often too many for small datasets and too few for large ones.
The map-side write is \(O(M \log M)\) where \(M\) is the mapper output size (sort is the bottleneck). The network transfer is \(O(\text{total shuffle data})\) and is often the dominant cost because network throughput (10 Gbps) is lower than local disk throughput (500 MB/s for SSD × number of mapper nodes). The reduce-side merge is \(O(M \log R)\) where \(R\) is the number of reducers (external merge sort if data spills to disk).
Reducing shuffle size: the most impactful optimisation is reducing the amount of data that crosses the network. Techniques include: (a) apply reduceByKey (which performs local pre-aggregation like a combiner) instead of groupByKey followed by a reduce; (b) use column pruning — if only two of ten columns participate in the join key and the downstream operation, project to those columns before the shuffle; (c) filter before the shuffle — apply all selective predicates before the shuffle so that only matching rows are transferred.
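Technique (a) can be seen in miniature with plain Python stand-ins (the two partitions and the Counter-based combiner are illustrative, not Spark code):

```python
from collections import Counter

# Toy model of why reduceByKey shuffles less than groupByKey:
# the map side pre-aggregates each partition before the shuffle.
partitions = [["a", "b", "a", "a"], ["b", "a", "b", "b"]]

# groupByKey-style: every (word, 1) pair crosses the shuffle.
group_shuffled = sum(len(p) for p in partitions)      # 8 records shuffled

# reduceByKey-style: one (word, partial_count) per key per partition.
map_side = [Counter(p) for p in partitions]
reduce_shuffled = sum(len(c) for c in map_side)       # 4 records shuffled

# Both strategies produce the same final counts.
final = Counter()
for c in map_side:
    final.update(c)
```

With many repeated keys per partition, the pre-aggregated record count approaches (number of distinct keys × number of partitions) instead of the raw record count.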
Shuffle service: in cluster deployments, Spark can use an external shuffle service (a persistent daemon on each node) so that shuffle files remain accessible even after executor processes are killed. This enables dynamic resource allocation — Spark can release executors that finish their map tasks early (returning cores to the cluster for other jobs) while shuffle files remain readable by later reducers. Without the external shuffle service, killing an executor also destroys its shuffle files, preventing dynamic allocation.
Detecting skew: the Spark UI’s Stages tab shows the task duration distribution. If the maximum task duration is much larger than the median, skew is likely. The Input Data column shows how much data each task processed — a task with 10 GB vs. 10 MB median is a clear signal.
Fixing skew with salting: for a join between a large table L and a small table R on key K, if K is skewed, append a random integer in \([0, n)\) to K in L (“salt” the key) and replicate each row of R \(n\) times with each of the \(n\) salted key variants. The join is then performed on the salted key, distributing the skewed key’s work across \(n\) tasks. After the join, drop the salt suffix. The cost is replicating R by a factor of \(n\) — acceptable when R is small and \(n\) is modest (e.g., \(n = 10\)).
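A plain-Python sketch of the salting recipe (the hot/cold tables and n = 4 are made up; a real job would express the same steps as DataFrame operations):

```python
import random

n = 4                                                   # number of salt variants
L = [("hot", i) for i in range(1000)] + [("cold", 0)]   # skewed large table
R = [("hot", "H"), ("cold", "C")]                       # small table

# Salt the large table's key with a random suffix in [0, n).
L_salted = [((k, random.randrange(n)), v) for k, v in L]
# Replicate each small-table row once per salt variant.
R_salted = [((k, s), v) for k, v in R for s in range(n)]

# Hash join on the salted key, then drop the salt suffix.
r_index = {k: v for k, v in R_salted}
joined = [(k[0], lv, r_index[k]) for k, lv in L_salted]
```

The 1,000 "hot" rows are now spread across up to n salted keys (hence n tasks), at the cost of replicating R by a factor of n.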
AQE skew join optimization: Spark 3.0’s Adaptive Query Execution automates this for sort-merge joins. If AQE detects that a shuffle partition is much larger than average, it splits that partition into multiple sub-tasks, each processing a smaller slice of the skewed key’s data. The non-skewed side is broadcast to each sub-task. This requires no manual salting — AQE applies it automatically after observing the actual shuffle sizes.
Garbage collection: long GC pauses stall tasks and can trigger speculative execution. Use G1GC (-XX:+UseG1GC) for executors with large heaps. Monitor GC time in the Spark UI’s executor tab.
Chapter 11: Hadoop Ecosystem Deep Dive
HDFS Internals: NameNode, DataNodes, and Replication
The brief description of HDFS in Chapter 1 glossed over several engineering choices that profoundly affect correctness and performance. This chapter examines the architecture at the level required to diagnose failures and make informed deployment decisions.
The NameNode never stores block data itself. DataNodes store blocks as plain files in their local file systems, identified by block ID. On startup, each DataNode performs a block report — enumerating all block IDs present on its local disks — and sends this report to the NameNode. The NameNode cross-references block reports against its namespace to verify that every block has the required replication factor. This bootstrap protocol means that the NameNode’s in-memory block-to-DataNode mapping is rebuilt entirely from DataNode reports on every restart, rather than persisted to disk directly.
- The client requests a new block from the NameNode. The NameNode allocates a block ID and selects three DataNodes according to the rack-awareness policy: the first replica on the client's rack (or a random rack if the client is off-cluster), the second on a different rack, and the third on the same rack as the second but a different DataNode.
- The client establishes a TCP pipeline: Client → DN1 → DN2 → DN3.
- The client streams block data to DN1 in 64 KB packets. DN1 forwards each packet to DN2; DN2 forwards to DN3.
- Acknowledgements flow back along the pipeline: DN3 → DN2 → DN1 → Client. The client considers the write durable only when it has received a pipeline acknowledgement from all three DataNodes.
- When the block is complete, the client notifies the NameNode, which updates its namespace to record the block-to-DataNode mapping.
The rack-awareness policy ensures that a single rack failure (power or top-of-rack switch outage) cannot destroy all replicas. With one replica on rack A and two on rack B, a rack A failure leaves two intact replicas on rack B. A rack B failure leaves one intact replica on rack A — below the replication factor, triggering re-replication from rack A to another available DataNode.
Worked example (client on rack R1, replication factor 3):
- Replica 1: R1-DN2 (same rack as client, reduces cross-rack traffic during write)
- Replica 2: R2-DN1 (a different rack — ensures rack-fault tolerance)
- Replica 3: R2-DN3 (same rack as replica 2, different node — saves cross-rack bandwidth vs. a third rack, while still tolerating R1's failure)
This placement means the write pipeline sends data across exactly one inter-rack link (Client → R1-DN2 → R2-DN1, and R2-DN1 → R2-DN3 uses the intra-rack switch). Cross-rack bandwidth is the scarcer resource in most data centers (bisection bandwidth is lower than intra-rack bandwidth), so minimising inter-rack traffic during writes is the primary design goal of rack-awareness.
YARN: Yet Another Resource Negotiator
MapReduce 1.0 conflated resource management with job scheduling: the JobTracker managed both cluster resources and the execution of every MapReduce job. This made it impossible to run non-MapReduce workloads (Spark, Flink, MPI) on the same cluster. YARN decouples these responsibilities.
- ResourceManager (RM): the global cluster manager. Tracks available resources (CPU and memory) on each node. Accepts resource requests from applications and allocates containers (resource bundles of a specified CPU+memory slice on a specific node).
- NodeManager (NM): a per-node daemon. Launches and monitors containers on its node, reporting resource usage back to the RM.
- ApplicationMaster (AM): a per-application daemon, itself running in a container allocated by the RM. The AM implements the application-specific scheduling logic — requesting containers, distributing work to them, and monitoring their progress.
The ApplicationMaster model is the key insight: each application (a Spark job, a MapReduce job, a Flink session) brings its own scheduling logic, running in a user-space container rather than privileged RM code. A Spark AM requests containers based on Spark’s task scheduler’s needs; a MapReduce AM requests containers based on map and reduce task counts. The RM only needs to know about CPU and memory — it is agnostic to what the containers actually compute.
Worked example: launching a Spark application on YARN.
- The RM allocates one container for the Spark ApplicationMaster (typically 1 core, 1 GB).
- The Spark AM launches and registers with the RM. It requests 10 containers of (4 core, 8 GB).
- The RM's scheduler (CapacityScheduler or FairScheduler) checks available resources per node and allocates containers as they become available — e.g., Container C1 on node N3, Container C2 on node N7, etc.
- The AM receives container allocations and launches Spark executor JVMs in each container via the NodeManager on that node.
- As tasks complete and executors become idle, the AM can request additional containers (dynamic allocation) or release unused containers back to the RM.
This negotiation protocol adds roughly 2–5 seconds of startup latency to every Spark job (RM → AM round-trip, AM registration, container request/grant cycles). For long-running jobs this is negligible; for interactive queries expected to complete in under a second, it is unacceptable — hence Spark’s long-running executor model where executors persist across queries within the same SparkSession.
Hive: HiveQL to MapReduce DAG
Hive provides a SQL-like interface (HiveQL) over HDFS-stored data, compiling queries to chains of MapReduce jobs (or Tez DAGs in newer versions). Its design philosophy is “schema on read”: the schema is applied when the data is read, not when it is written. This allows raw log files, CSV files, and JSON to be queried directly by declaring an external table definition.
A HiveQL query such as:
SELECT country, COUNT(*) AS cnt
FROM page_views
WHERE date = '2025-01-01'
GROUP BY country
ORDER BY cnt DESC
LIMIT 10;
is compiled to three MapReduce jobs: (1) a map-only job that applies the WHERE predicate and projects to (country, 1) pairs; (2) a MapReduce job that shuffles by country and counts (the GROUP BY aggregation); (3) a second MapReduce job that globally sorts by count descending and returns the top 10. Each job writes its output to a temporary HDFS directory. The Hive metastore tracks table schemas and partition metadata, enabling partition pruning — if page_views is partitioned by date, job 1 reads only the date=2025-01-01 partition rather than scanning the full table.
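The three-job pipeline can be traced in plain Python (the page_views rows are hypothetical, and each MapReduce job is stood in by ordinary collection operations):

```python
from collections import Counter

page_views = [("2025-01-01", "CA"), ("2025-01-01", "US"), ("2024-12-31", "US"),
              ("2025-01-01", "US"), ("2025-01-01", "DE"), ("2025-01-01", "CA")]

# Job 1 (map-only): apply the WHERE predicate, project to (country, 1).
job1 = [(country, 1) for date, country in page_views if date == "2025-01-01"]

# Job 2 (shuffle by country + count): the GROUP BY aggregation.
job2 = Counter()
for country, one in job1:
    job2[country] += one

# Job 3 (global sort by count descending, LIMIT 10).
job3 = sorted(job2.items(), key=lambda kv: -kv[1])[:10]
```

With partition pruning, job 1 would read only the date=2025-01-01 partition rather than filtering every row as this sketch does.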
The deeper insight is that the Hive compilation pipeline — parsing SQL, building a logical plan, applying rule-based optimisations (predicate pushdown, column pruning), and generating a physical MapReduce DAG — is structurally identical to a relational query compiler. Hive’s metastore (which stores table statistics and partition metadata) is structurally identical to a database catalog. Spark’s Catalyst optimizer and SparkSQL are the direct descendants of these ideas, replacing MapReduce execution with in-memory DAG execution and adding cost-based optimisation.
ORC and Parquet: Columnar Storage Formats
Row-oriented formats (CSV, JSON, Avro, SequenceFile) store each record contiguously: all fields of row 1, then all fields of row 2, etc. Analytical queries typically access only a few columns out of many (e.g., SELECT SUM(revenue) FROM sales WHERE year = 2024 accesses 2 of perhaps 50 columns). With row-oriented storage, the entire row must be read from disk even though 48 of 50 columns are discarded — pure I/O waste.
- Column projection: only the columns needed by a query are read from disk. For a 50-column table and a 2-column query, I/O is reduced by 25×.
- Predicate pushdown: each row group stores min/max statistics per column. If the predicate year = 2024 can be evaluated using these statistics, entire row groups can be skipped without reading their data.
- Compression efficiency: values in the same column are drawn from the same domain (e.g., all integers, all country codes), so they compress far better than interleaved row data. Run-length encoding, dictionary encoding, and delta encoding achieve 5–20× compression ratios on columnar data vs. 2–3× on row-oriented data.
- Vectorised execution: columnar data is naturally amenable to SIMD CPU instructions — an AVX-512 instruction can compare 16 integers simultaneously, enabling 16 predicate evaluations per CPU cycle.
Worked example: a 10 TB, 50-column table, queried for 3 columns with a selective predicate on the date column.
Row-oriented (CSV/Avro): The query must read the entire table from disk, decompress, and then discard 47 of 50 columns. Effective I/O: 10 TB. Even granting the same 5× compression, the compressed file is 2 TB — still fully read.
Columnar (Parquet/ORC): Only 3 columns are read. Each column occupies \(10 \text{ TB} / 50 = 200 \text{ GB}\) uncompressed, compressed to 40 GB at 5×. Total I/O: \(3 \times 40 \text{ GB} = 120 \text{ GB}\). After predicate pushdown on the date column (skip row groups outside 2024), suppose 80% of row groups are eliminated: actual I/O drops to \(0.2 \times 120 \text{ GB} = 24 \text{ GB}\).
The 10 TB row-oriented scan vs. 24 GB columnar scan is an 83× I/O reduction — this is the empirical basis for the “10–100× better for analytics workloads” claim. In wall-clock time, assuming 500 MB/s HDFS read throughput: row-oriented scan takes \(10{,}000 \text{ GB} / 0.5 \text{ GB/s} \approx 5.6\) hours; columnar scan takes \(24 \text{ GB} / 0.5 \text{ GB/s} \approx 48\) seconds.
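The arithmetic above can be checked mechanically (all sizes and throughputs are the assumed figures from the text, not measurements):

```python
# Columnar vs. row-oriented I/O estimate (sizes in GB, throughput in GB/s).
table_gb, n_cols, cols_read = 10_000, 50, 3
compression = 5
row_group_survival = 0.2          # 80% of row groups pruned by predicate pushdown

row_oriented_io = table_gb                             # full uncompressed scan
per_col_compressed = table_gb / n_cols / compression   # 40 GB per column
columnar_io = cols_read * per_col_compressed           # 120 GB
after_pruning = row_group_survival * columnar_io       # 24 GB

throughput = 0.5                                       # 500 MB/s HDFS read
row_hours = row_oriented_io / throughput / 3600        # ≈ 5.6 hours
col_seconds = after_pruning / throughput               # ≈ 48 seconds
```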
Parquet and ORC are the two dominant columnar formats in the Hadoop ecosystem. Parquet (originally developed by Twitter and Cloudera) is the default format for Spark and is supported by virtually every tool in the ecosystem (Hive, Presto, Athena, BigQuery). ORC (Optimized Row Columnar, originally from Hive) achieves slightly better compression and read performance for Hive workloads due to tighter integration with the ORC reader’s vectorised execution engine. For Spark workloads, Parquet is generally preferred.
Chapter 12: Spark Internals
DAG Scheduler and Task Scheduler Separation
Spark’s execution engine is cleanly divided into two layers, each with distinct responsibilities.
- DAG scheduler: translates the RDD lineage graph into a DAG of stages (cut at wide-dependency boundaries) and submits each stage for execution once all of its parent stages have completed.
- Task scheduler: launches one task per partition of a stage on available executors, respecting data-locality preferences, and retries a failed task up to spark.task.maxFailures times before failing the stage.
The separation is important: the DAG scheduler reasons about logical dependencies between stages (which stage must complete before another can begin); the task scheduler reasons about physical placement and execution of individual tasks within a stage. This separation of concerns allows each component to be independently optimised.
RDD Dependency Types
- Narrow dependency: each output partition depends on at most one input partition. Examples: map, filter, flatMap, mapPartitions. Narrow dependencies allow pipelining within a stage and enable efficient partition-level fault recovery (only the lost output partition needs to be recomputed, not the whole stage).
- Wide dependency (shuffle dependency): each output partition may depend on all input partitions. Examples: groupByKey, reduceByKey, join (without co-partitioning), sortByKey. Wide dependencies create stage boundaries and require materialising the shuffle output to disk before the next stage can begin.
The distinction is not merely academic. After a node failure, Spark can recover a lost narrow-dependency partition by recomputing only the upstream partition on that partition’s lineage path. For a wide dependency, the lost output partition may have been produced from records across all input partitions — recovering it requires recomputing contributions from potentially every input partition, which may require re-running the entire upstream stage.
Speculative Execution of Straggler Tasks
In large clusters, some tasks invariably run slower than others — due to network congestion, disk I/O contention, or hardware degradation. A stage cannot complete until all its tasks finish, so a single straggler can delay the entire job.
Spark’s mitigation is speculative execution, enabled with spark.speculation = true. The task scheduler maintains a running median of task completion times within each stage. A task is declared a straggler if it has been running longer than spark.speculation.multiplier × median (default: 1.5×) AND at least spark.speculation.quantile (default: 75%) of the tasks in that stage have already finished. The scheduler launches a speculative copy of the straggler on a different executor. Whichever copy finishes first — the original or the speculative — provides the result; the other is killed.
The speculative copy uses the same input partition as the original. Because Spark partitions are immutable and deterministic, the result is guaranteed to be identical regardless of which copy finishes first. Speculative execution is most effective when straggler tasks are caused by external interference (slow disk, network congestion) rather than by the task having more input data than average (data skew) — in the latter case, the speculative copy will also be slow, providing no benefit.
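The trigger condition can be sketched as a small function (a simplified model with hypothetical task timings, not Spark’s actual scheduler code):

```python
from statistics import median

def find_stragglers(finished, running, multiplier=1.5, quantile=0.75):
    """Return ids of running tasks eligible for a speculative copy."""
    total = len(finished) + len(running)
    # Quantile gate: enough of the stage must already be done.
    if len(finished) < quantile * total:
        return []
    # Duration gate: running longer than multiplier x median of finished tasks.
    threshold = multiplier * median(finished.values())
    return [tid for tid, elapsed in running.items() if elapsed > threshold]

finished = {f"t{i}": 10.0 for i in range(9)}   # 9 tasks finished in ~10 s each
running = {"t9": 40.0}                          # one task still running at 40 s
```

Here 9 of 10 tasks (90% ≥ 75%) are done and t9 has run 40 s > 1.5 × 10 s, so t9 qualifies for a speculative copy.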
Spark Memory Management: Unified Memory Pool and Tungsten
Before Spark 1.6, memory was statically divided between execution (shuffle, sort, aggregation) and storage (cached RDDs), with a fixed boundary. If a job needed more execution memory than the allocation, tasks would spill to disk even if the storage region was empty and vice versa.
The unified memory pool (Spark 1.6+) replaces the fixed boundary with a soft one: execution and storage draw from a single pool, execution can evict cached blocks when it needs space, and storage can use idle execution memory. A second problem is Java object overhead: a java.lang.String object representing the 3-character string "foo" occupies approximately 56 bytes — 18× the raw data size. Tungsten (introduced in Spark 1.4) bypasses the Java object model by storing data in raw binary format in sun.misc.Unsafe-managed off-heap memory (or on-heap, but without GC pressure). Tungsten records are laid out compactly without object headers, enabling:
- Cache efficiency: sequential memory layout maximises CPU cache hit rates.
- Zero-copy operations: sort, shuffle, and aggregate on binary data without deserialising to Java objects.
- Reduced GC pressure: off-heap data is invisible to the Java garbage collector, eliminating GC pauses proportional to heap size.
Catalyst Optimizer Pipeline
SparkSQL’s Catalyst optimizer transforms a SQL string into an executable physical plan through four sequential phases.
- Parsing: the SQL string is parsed into an unresolved logical plan — an abstract syntax tree where column names and table references are unresolved tokens. The parser uses ANTLR grammar rules; this phase is purely syntactic.
- Analysis: the Analyzer resolves column names and table references against the catalog (metastore), infers data types, and validates that all referenced tables and columns exist. The output is a resolved logical plan.
- Logical optimization: the Optimizer applies a library of rule-based transformations to the resolved logical plan. Key rules include:
  - Predicate pushdown: move WHERE filters as close to the data source as possible, reducing the rows flowing through later operators.
  - Column pruning: eliminate columns not referenced by any downstream operator.
  - Constant folding: evaluate constant expressions at compile time (e.g., 1 + 1 → 2).
  - Join reordering (cost-based): if table statistics are available, reorder joins to process smaller tables first.
- Physical planning: the Planner translates the optimised logical plan into one or more physical plans (concrete execution strategies). For a join, multiple physical strategies are considered (broadcast hash join, sort-merge join, shuffle hash join) and the one with the lowest estimated cost is selected. The Planner also inserts shuffle operators at wide-dependency boundaries.
- Code generation (Whole-Stage CodeGen): the physical plan is compiled to JVM bytecode at runtime. Instead of calling a virtual dispatch chain for each operator on each row, the entire pipeline of operators within a stage is fused into a single tight loop — eliminating virtual function call overhead and enabling JIT-friendly code. This is the mechanism behind Tungsten's claim of near-C++ execution speed for some workloads.
Worked example: consider the query SELECT name FROM employees WHERE dept = 'eng' AND salary > 100000 over a Parquet table with columns (id, name, dept, salary, hire_date).
After parsing: Project(name) ← Filter(dept='eng' AND salary>100000) ← Scan(employees).
After analysis: types resolved — dept is StringType, salary is LongType, name is StringType.
After logical optimisation: column pruning removes id and hire_date from the Scan (they are not referenced). Predicate pushdown pushes both filters into the Parquet reader, which uses row-group statistics to skip entire row groups whose dept min/max range excludes 'eng' or whose max(salary) ≤ 100000.
Physical plan: WholeStageCodegen(Project(name) ← Filter(dept='eng' AND salary>100000) ← ParquetScan(cols=[name,dept,salary], pushFilters=[dept='eng', salary>100000])). The Parquet scan reads only 3 of 5 columns, skips non-matching row groups, and the filter+project are compiled into a single tight loop.
Chapter 13: Stream Processing Theory
Chandy-Lamport Distributed Snapshots
Exactly-once semantics in Flink rests on distributed snapshots. A naïve approach — pausing all computation, taking a snapshot, resuming — creates an unacceptable latency spike. The Chandy-Lamport algorithm (1985) takes a consistent global snapshot without stopping computation.
Flink’s adaptation (asynchronous barrier snapshotting) proceeds as follows:
- The checkpoint coordinator injects a special barrier message into each input Kafka partition, after all records up to the checkpoint's logical time.
- When operator O receives a barrier on input channel C:
- O blocks channel C (stops reading from it).
- If barriers have been received on all input channels of O: O snapshots its local state to durable storage (RocksDB checkpoint to S3), then forwards the barrier on all its output channels and unblocks all input channels.
- If only some inputs have delivered the barrier: O keeps the barriered channels blocked (buffering, in memory or on disk, any records that arrive on them) and continues processing records from the channels whose barriers have not yet arrived.
- When all operators have recorded their state and forwarded the barrier downstream, the checkpoint is complete. The checkpoint coordinator records the checkpoint as complete in the job manager.
- On failure recovery: the job replays input from the Kafka offset corresponding to the last complete checkpoint. Operator states are restored from their checkpointed snapshots. Because barriers guarantee that the checkpoint captures a globally consistent state (all in-flight messages at checkpoint time are either included in the snapshot or will be replayed from Kafka), no event is processed twice and none is lost.
The key correctness property: the Chandy-Lamport snapshot captures a consistent cut — a global state where for every message recorded as received by some process, the corresponding send event is also recorded. This is exactly the condition needed for the snapshot to be a valid point to restart from.
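A toy single-operator run illustrates barrier alignment (the two channels and integer records are made up; this is a sketch of the mechanism, not Flink code):

```python
from collections import deque

def run(ch1, ch2):
    """One operator summing ints from two channels; "B" marks the barrier."""
    channels = {"c1": deque(ch1), "c2": deque(ch2)}
    barriered = set()               # channels blocked awaiting alignment
    state, snapshot = 0, None
    while any(channels.values()):
        for name, ch in channels.items():
            if name in barriered or not ch:
                continue            # blocked (or exhausted) channel: skip
            rec = ch.popleft()
            if rec == "B":
                barriered.add(name) # barrier seen: block this channel
            else:
                state += rec        # pre-barrier record: process normally
        if barriered == set(channels) and snapshot is None:
            snapshot = state        # all barriers aligned: snapshot local state
            barriered.clear()       # unblock and resume
    return state, snapshot

state, snap = run([1, 2, "B", 3], [10, "B", 20, 30])
```

The snapshot captures exactly the pre-barrier records (1 + 2 + 10 = 13); the post-barrier records (3, 20, 30) are processed only after the snapshot, ending at state 66.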
| Dimension | Flink | Spark Structured Streaming |
|---|---|---|
| Processing model | Record-at-a-time (true streaming) | Micro-batch (batch engine on each trigger) |
| Latency | 1–50 ms | 100 ms – 2 s |
| Throughput | Lower per-record (state access overhead) | Higher (amortised batch execution) |
| Exactly-once | Chandy-Lamport barriers + Kafka transactions | WAL + offset management in micro-batch |
| State backend | RocksDB (arbitrary state size) | In-memory or RocksDB (pluggable) |
| SQL/DataFrame API | Flink SQL (ANSI-SQL-based, broad SQL:2016 coverage) | SparkSQL (mature ANSI SQL support) |
| Watermarking | Native event-time watermarks | Watermark API (eventTime column) |
| Replay/reprocessing | Replay Kafka from offset | Replay Kafka from offset |
| Ecosystem maturity | Strong in EU/China (Deutsche Telekom, Alibaba) | Strong in US cloud ecosystem (Databricks) |
The practical guideline: choose Flink when sub-100 ms latency is a hard requirement (financial fraud, real-time bidding, alerting). Choose Spark Structured Streaming when the application can tolerate 500 ms+ latency and benefits from tight integration with the Spark batch ecosystem (the same DataFrames, MLlib models, and Delta Lake tables).
Window Types: Formal Definitions and Worked Examples
- Tumbling window of size \(w\): non-overlapping consecutive intervals \([0, w), [w, 2w), [2w, 3w), \ldots\). Each event belongs to exactly one window.
- Sliding window of size \(w\) and slide \(s\) (where \(s \leq w\)): overlapping intervals \([0, w), [s, s+w), [2s, 2s+w), \ldots\). Each event belongs to \(\lceil w/s \rceil\) windows. When \(s = w\) the sliding window degenerates to a tumbling window.
- Session window with gap \(g\): variable-length windows that group events separated by less than \(g\) time into the same session. A new session begins when an event arrives more than \(g\) after the previous event.
Worked example: consider the event stream (time t in seconds, value after the colon): t=1:3, t=4:7, t=9:2, t=15:5, t=16:1, t=32:4, t=35:8
Tumbling windows (w=10s): Window [0,10): events at t=1 (3) and t=4 (7) and t=9 (2) → sum=12. Window [10,20): events at t=15 (5) and t=16 (1) → sum=6. Window [20,30): no events → sum=0. Window [30,40): events at t=32 (4) and t=35 (8) → sum=12.
Sliding windows (w=10s, s=5s): Window [0,10): t=1,4,9 → 12. Window [5,15): t=9 → 2. Window [10,20): t=15,16 → 6. Window [15,25): t=15,16 → 6. Window [20,30): none → 0. Window [25,35): t=32 → 4. Window [30,40): t=32,35 → 12. Note that the event at t=15 falls in windows [10,20) and [15,25) but not [5,15), since window endpoints are exclusive; it belongs to \(\lceil 10/5 \rceil = 2\) windows.
Session windows (gap g=10s): Events at t=1,4,9 are within 10s of each other → Session 1 = [1,9], sum=12. Event at t=15 is 6s after t=9 < gap → extends Session 1. Event at t=16 is 1s after t=15 → still Session 1. Session 1 = [1,16], sum=18. Event at t=32 is 16s after t=16 > gap → new Session 2. Event at t=35 is 3s after t=32 < gap → Session 2 = [32,35], sum=12.
Session windows are the most natural model for user behaviour analytics: a web browsing session groups all page views where no page was viewed for more than 30 minutes. Session windows cannot be computed with a fixed-size buffer — their length is data-dependent.
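The three definitions can be written as window-assignment functions and replayed against the example stream (plain Python, event time only, no watermarks):

```python
from collections import defaultdict

events = [(1, 3), (4, 7), (9, 2), (15, 5), (16, 1), (32, 4), (35, 8)]

def tumbling_window(ts, w):
    start = (ts // w) * w               # the unique window containing ts
    return [(start, start + w)]

def sliding_windows(ts, w, s):
    # All windows [k*s, k*s + w) containing ts (endpoints exclusive).
    return [(k, k + w) for k in range(0, ts + 1, s) if k <= ts < k + w]

def session_sums(events, gap):
    sessions = []                       # each entry: [start, end, sum]
    for ts, v in sorted(events):
        if sessions and ts - sessions[-1][1] <= gap:
            sessions[-1][1] = ts        # within gap: extend current session
            sessions[-1][2] += v
        else:
            sessions.append([ts, ts, v])
    return sessions

tumb, slide = defaultdict(int), defaultdict(int)
for ts, v in events:
    for win in tumbling_window(ts, 10):
        tumb[win] += v
    for win in sliding_windows(ts, 10, 5):
        slide[win] += v
sess = session_sums(events, 10)
```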
Chapter 14: Large-Scale Graph Processing
PowerGraph: GraphLab and the Gather-Apply-Scatter Model
Pregel’s vertex-centric model places all computation in the compute() function called on each vertex. PowerGraph (Gonzalez et al., OSDI 2012) decomposes vertex computation into three explicit phases to better handle power-law graphs.
- Gather: accumulate information from neighbouring vertices and incident edges. Each edge contributes a value \(g_e\), and the gather phase produces a sum \(\Sigma = \bigoplus_{e \in \text{edges}(v)} g_e\) using an associative, commutative operator \(\oplus\).
- Apply: update the vertex's value using the gathered sum: \(v_{\text{new}} \leftarrow \text{apply}(v, \Sigma)\).
- Scatter: activate neighbouring vertices and/or update adjacent edge values based on the updated vertex value.
The GAS decomposition is designed for vertex-cut partitioning. When a high-degree vertex \(v\) is replicated across \(k\) machines (one “mirror” per machine that holds some of \(v\)’s edges), the Gather phase runs locally on each machine over its subset of \(v\)’s edges, producing a partial gather sum. These partial sums are sent to the “master” copy of \(v\), which combines them and runs Apply. The updated value is then sent back to all mirrors, which run Scatter locally over their edge subsets. This communication pattern is \(O(k)\) messages — proportional to the number of machines holding the vertex’s edges, not the total degree — breaking the bottleneck that Pregel faces for high-degree vertices.
Worked example: a vertex \(v\) with out-degree 1,000,000 whose edges are spread across 100 machines (10,000 edges per machine).
Pregel (vertex-centric): The master of \(v\) sends one message along each of 1,000,000 outgoing edges in the Scatter phase — all 1,000,000 messages originate from the master, saturating its network link.
GAS (PowerGraph): In Scatter, each of the 100 machine mirrors reads the updated rank of \(v\) (one value broadcast from the master) and sends messages along its local 10,000 edges. Network load is distributed across 100 machines, each sending 10,000 messages — no single machine is saturated. The broadcast of the updated rank costs 100 messages (master → 100 mirrors), compared to 1,000,000 messages in Pregel. For a vertex of degree \(d\) replicated across \(k\) machines, GAS costs \(O(d/k + k)\) messages vs. Pregel’s \(O(d)\) — a factor of \(d/k\) reduction for the scatter phase.
Pregel SSSP on a 6-Node Graph: All Supersteps
Graph (directed, weighted edges):
- 0 → 1 (w=6), 0 → 2 (w=2)
- 1 → 3 (w=1)
- 2 → 1 (w=3), 2 → 4 (w=5)
- 3 → 4 (w=1), 3 → 5 (w=4)
- 4 → 5 (w=2)
Source: node 0. Initial distances: d[0]=0, d[1]=d[2]=d[3]=d[4]=d[5]=∞.
Superstep 1 — active: {0}. Node 0 sends: (1, 0+6=6), (2, 0+2=2). Node 0 halts.
Updates: d[1]=min(∞,6)=6, d[2]=min(∞,2)=2. Active after SS1: {1,2}.
Superstep 2 — active: {1,2}. Node 1 (d=6) sends: (3, 6+1=7). Node 2 (d=2) sends: (1, 2+3=5), (4, 2+5=7). Both halt.
Updates: d[1]=min(6,5)=5 → re-activated; d[3]=min(∞,7)=7; d[4]=min(∞,7)=7. Active after SS2: {1,3,4}.
Superstep 3 — active: {1,3,4}. Node 1 (d=5) sends: (3, 5+1=6). Node 3 (d=7) sends: (4, 7+1=8), (5, 7+4=11). Node 4 (d=7) sends: (5, 7+2=9). All halt.
Updates: d[3]=min(7,6)=6 → re-activated; d[5]=min(∞,11,9)=9 → activated; d[4]=min(7,8)=7 (no change, stays halted). Active after SS3: {3,5}.
Superstep 4 — active: {3,5}. Node 3 (d=6) sends: (4, 6+1=7), (5, 6+4=10). Node 5 (d=9) has no outgoing edges; halts.
Updates: d[4]=min(7,7)=7 (no change); d[5]=min(9,10)=9 (no change). Both node 4 and node 5 receive no improvement. Active after SS4: none.
Superstep 5 — no active vertices, no pending messages → computation terminates.
Final shortest distances from 0: d[0]=0, d[1]=5, d[2]=2, d[3]=6, d[4]=7, d[5]=9.
Verification: path 0→2→1→3→5 has cost 2+3+1+4=10 > 9; path 0→2→4→5 has cost 2+5+2=9 ✓; path 0→2→1→3→4→5 has cost 2+3+1+1+2=9 ✓ (tied). The BSP model correctly identifies the shortest distances in 5 supersteps for this 6-node graph.
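The superstep trace above can be replayed mechanically (a plain-Python BSP loop; the loop exits on the empty superstep, so it counts the 4 message-sending supersteps):

```python
import math

# Same graph and source as the worked example: edges[v] = [(neighbour, weight)].
edges = {0: [(1, 6), (2, 2)], 1: [(3, 1)], 2: [(1, 3), (4, 5)],
         3: [(4, 1), (5, 4)], 4: [(5, 2)], 5: []}
dist = {v: math.inf for v in edges}
dist[0] = 0
active = {0}
supersteps = 0
while active:
    supersteps += 1
    # Each active vertex sends tentative distances along its out-edges, then halts.
    inbox = {}
    for v in active:
        for u, w in edges[v]:
            inbox.setdefault(u, []).append(dist[v] + w)
    # A vertex re-activates only when an incoming message improves its distance.
    active = set()
    for u, msgs in inbox.items():
        if min(msgs) < dist[u]:
            dist[u] = min(msgs)
            active.add(u)
```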
Chapter 15: Distributed Machine Learning
Synchronous vs. Asynchronous SGD
The parameter server architecture introduced in Chapter 6 supports two update protocols with fundamentally different convergence and throughput characteristics.
In synchronous SGD, every worker computes a gradient on the current parameters, a barrier waits for all workers, and the averaged gradient produces exactly one update per round; throughput is gated by the slowest worker, but every gradient is computed against fresh parameters. In asynchronous SGD, each worker pushes its gradient and pulls updated parameters without waiting for the others, so a gradient may be applied against parameters that have already advanced by several updates (its staleness).
The practical implication: in a homogeneous cluster (all workers at similar speed), staleness is small (1–2 iterations) and asynchronous SGD nearly matches synchronous SGD’s convergence while eliminating the synchronization barrier overhead. In a heterogeneous cluster (some workers 3–5× slower), staleness can reach dozens of iterations, causing gradient noise that slows convergence — asynchronous SGD’s advantage over synchronous SGD diminishes, and the stragglers that slow synchronous SGD also corrupt asynchronous gradients.
AllReduce: Ring Topology and Bandwidth Optimality
Ring-AllReduce arranges the \(W\) workers in a logical ring and aggregates a gradient vector of \(P\) values in two phases:
- Reduce-scatter phase (\(W-1\) steps): divide each gradient vector into \(W\) chunks of size \(P/W\). In each of \(W-1\) steps, each worker sends its current chunk to the next worker in the ring and receives a chunk from the previous worker. After receiving, the worker adds the received chunk to its own copy. After \(W-1\) steps, each worker holds the complete sum for one chunk (its "owned" chunk).
- All-gather phase (\(W-1\) steps): each worker sends its owned (fully summed) chunk around the ring. After \(W-1\) steps, every worker has received the fully summed values for all \(W\) chunks — i.e., the complete gradient sum.
Bandwidth analysis: Each worker sends and receives \(2(W-1)/W \times P\) values total across both phases. As \(W \to \infty\), this approaches \(2P\) values — independent of \(W\). This is bandwidth-optimal: no AllReduce algorithm can use less bandwidth per worker, because each worker must at minimum send and receive the full \(P\)-dimensional sum. Ring-AllReduce achieves this optimum by distributing the aggregation work evenly across all workers and all network links.
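The two phases can be simulated directly (W = 4 workers, one value per chunk to keep the indexing visible; the toy gradients data[w][c] = w + c are made up):

```python
W = 4
data = [[float(w + c) for c in range(W)] for w in range(W)]  # data[worker][chunk]
buf = [row[:] for row in data]                               # working copies

# Reduce-scatter: in step t, worker w sends chunk (w - t) mod W to worker w+1,
# which adds it in. Sends are computed first so each step is simultaneous.
for t in range(W - 1):
    sends = [(w, (w - t) % W, buf[w][(w - t) % W]) for w in range(W)]
    for w, c, val in sends:
        buf[(w + 1) % W][c] += val

# Worker w now owns the full sum of chunk (w + 1) mod W.
# All-gather: circulate each completed chunk; receivers overwrite.
for t in range(W - 1):
    sends = [(w, (w + 1 - t) % W, buf[w][(w + 1 - t) % W]) for w in range(W)]
    for w, c, val in sends:
        buf[(w + 1) % W][c] = val

expected = [sum(data[w][c] for w in range(W)) for c in range(W)]
```

After 2(W−1) steps every worker holds the full chunk-wise sum, having sent 2(W−1) chunks of size P/W: the 2(W−1)/W × P bandwidth figure above.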
Federated Learning
The standard federated averaging algorithm (FedAvg, McMahan et al. 2017) proceeds as:
- Server broadcasts the current global model \(\theta_t\) to a selected subset \(S_t\) of \(K\) clients.
- Each client \(k \in S_t\) trains locally for \(E\) epochs on its local dataset \(\mathcal{D}_k\), producing an updated model \(\theta_t^k\).
- Clients send their updated models back to the server.
- The server aggregates by weighted averaging: \[ \theta_{t+1} = \sum_{k \in S_t} \frac{|\mathcal{D}_k|}{\sum_{j \in S_t} |\mathcal{D}_j|} \cdot \theta_t^k \]
- Repeat from step 1.
The aggregation formula weights each client’s update by the size of its local dataset, giving larger datasets proportionally more influence. This is equivalent to minimising the weighted sum of per-client losses: \(\mathcal{L}(\theta) = \sum_k \frac{|\mathcal{D}_k|}{|\mathcal{D}|} \mathcal{L}_k(\theta)\).
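One round of the algorithm above can be sketched as follows (illustrative names throughout; the E local epochs are abstracted into a caller-supplied `local_update` function, since the local optimizer is orthogonal to the aggregation rule):

```python
import random

def fedavg_round(global_model, clients, K, local_update):
    """One round of federated averaging (FedAvg sketch).

    global_model : parameter vector theta_t (list of floats)
    clients      : list of local datasets, one per client
    K            : number of clients sampled this round
    local_update : function(theta, dataset) -> theta_k, standing in for
                   E epochs of local training
    """
    selected = random.sample(range(len(clients)), K)   # step 1: choose S_t
    total = sum(len(clients[k]) for k in selected)     # sum_j |D_j|
    new_model = [0.0] * len(global_model)
    for k in selected:
        theta_k = local_update(list(global_model), clients[k])  # steps 2-3
        weight = len(clients[k]) / total                        # |D_k| / sum |D_j|
        new_model = [acc + weight * p
                     for acc, p in zip(new_model, theta_k)]     # step 4
    return new_model                                            # theta_{t+1}
```

Note that the weights \(|\mathcal{D}_k| / \sum_j |\mathcal{D}_j|\) sum to 1 over the selected clients, so the aggregate is a proper convex combination of the client models.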
Data parallelism: the full model is replicated on each worker; the training data is partitioned. Each worker computes gradients on its data shard using the full model, and gradients are aggregated (via AllReduce or parameter server) to update the shared model. This is the dominant strategy for most deep learning — it scales to any number of workers as long as the model fits on each worker’s GPU (typically up to ~10 billion parameters on a single A100 GPU with 80 GB HBM).
Model parallelism: the model is partitioned across workers — different layers or attention heads reside on different GPUs. For a transformer with 175 billion parameters (GPT-3), a single GPU cannot hold the full model; model parallelism is required. Each forward and backward pass requires data to flow through layers on different GPUs via point-to-point communication. Pipeline parallelism (a variant) splits layers into stages and pipelines micro-batches through stages, keeping all GPUs busy simultaneously.
The practical strategy for very large models (LLMs, large vision transformers) is 3D parallelism: data parallelism across replicas of the full pipeline, pipeline parallelism to split consecutive layers into stages across GPUs, and tensor parallelism (splitting individual matrix multiplications across GPUs within a layer). Megatron-LM (NVIDIA) implements all three.
Chapter 16: Consistency and Coordination
Zookeeper: Distributed Coordination Service
Many distributed systems require coordination primitives: leader election (which node is the primary?), distributed locks (only one worker should process a partition), and configuration management (all nodes see the same config). Zookeeper (Hunt et al., ATC 2010) provides these as a general-purpose coordination service.
- Ephemeral znodes: automatically deleted when the client session that created them expires. Used for representing live nodes (if a client crashes, its session expires and its ephemeral znodes are deleted — signalling failure to observers).
- Sequential znodes: the server appends a monotonically increasing counter to the znode name on creation (e.g., /election/candidate-0000000042). Used in leader election recipes to establish a total order among candidates.
Watches: a client can set a watch on a znode to receive a one-time notification when the znode changes (data change, creation, deletion, or children change). Watches enable reactive coordination without polling.
The standard leader-election recipe builds on these primitives:
- Each candidate creates an ephemeral sequential znode under /election/, e.g., /election/candidate-0000000001.
- Each candidate reads all children of /election/ and determines its position in the sorted order.
- The candidate with the lowest sequence number declares itself leader.
- All other candidates set a watch on the znode with the next lower sequence number (not on the leader directly — this avoids a "herd effect" where all candidates receive notifications when the leader changes).
- When the watched znode is deleted (its owner crashed), the watcher checks whether it now has the lowest sequence number. If yes, it becomes leader; if no, it sets a watch on the new next-lower znode.
This recipe provides leader election with \(O(1)\) watches per candidate (each watches exactly one znode). If instead every candidate watched the leader znode directly, each leader change would fire \(O(n)\) notifications simultaneously — the herd effect. The ephemeral znode mechanism automatically handles leader crashes without an explicit timeout protocol.
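The sequence-number logic of this recipe can be exercised with an in-memory simulation (a sketch only; `Election`, `join`, and `crash` are invented names, not a Zookeeper API, and real watches are event callbacks rather than polled lookups):

```python
# In-memory simulation of the Zookeeper leader-election recipe.
# Znodes are modeled as candidate -> sequence-number entries; deleting an
# entry models session expiry of an ephemeral znode (i.e., a crash).

class Election:
    def __init__(self):
        self.counter = 0
        self.znodes = {}                 # candidate -> sequence number

    def join(self, candidate):
        """Create an ephemeral sequential znode for this candidate."""
        self.znodes[candidate] = self.counter
        self.counter += 1

    def leader(self):
        """The candidate holding the lowest sequence number is leader."""
        return min(self.znodes, key=self.znodes.get)

    def watch_target(self, candidate):
        """Znode this candidate watches: the next-lower sequence number.

        Watching only the next-lower znode (not the leader) avoids the
        herd effect described above. Returns None for the current leader.
        """
        my_seq = self.znodes[candidate]
        lower = {c: s for c, s in self.znodes.items() if s < my_seq}
        return max(lower, key=lower.get) if lower else None

    def crash(self, candidate):
        """Session expiry deletes the ephemeral znode."""
        del self.znodes[candidate]
```

When the first candidate crashes, the second in sequence order becomes leader without any other candidate being notified.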
Consensus: Paxos and Raft
Consensus is the problem of getting a set of nodes to agree on a single value, despite failures. Formally: a consensus algorithm must satisfy safety (all nodes that decide, decide the same value) and liveness (all non-faulty nodes eventually decide).
Paxos (Lamport's classic algorithm) decides a single value in two phases:
Phase 1 (Prepare):
- The proposer chooses a ballot number \(n\) (globally unique, monotonically increasing) and sends Prepare(n) to all acceptors.
- Acceptor: if \(n\) exceeds any previously promised ballot, respond with Promise(n, \(v_{\text{accepted}}\), \(b_{\text{accepted}}\)), where \(v_{\text{accepted}}\) is the last accepted value (if any) and \(b_{\text{accepted}}\) is its ballot. Also promise to reject any future Prepare with ballot < \(n\).
Phase 2 (Accept):
- If the proposer receives a Promise from a quorum: choose the value \(v\) with the highest \(b_{\text{accepted}}\) among all promises (if none, the proposer may use its own value). Send Accept(n, v) to all acceptors.
- Acceptor: if ballot \(n\) ≥ the promised ballot, accept by recording \((n, v)\) and replying Accepted(n, v) to learners.
- Learner: once a quorum of acceptors have sent Accepted for the same \((n, v)\), the value \(v\) is decided.
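The two phases can be exercised with a small in-process simulation. This is a sketch for intuition only (class and function names are invented): a real deployment adds stable storage for the acceptor state, retries with higher ballots, and disjoint ballot spaces per proposer.

```python
# Single-decree Paxos, simulated in-process. Partial connectivity is
# modeled by restricting which acceptors a proposer can reach.

class Acceptor:
    def __init__(self):
        self.promised = -1           # highest ballot promised so far
        self.accepted = None         # (ballot, value) or None

    def prepare(self, n):
        """Phase 1: promise ballot n if it beats every prior promise."""
        if n > self.promised:
            self.promised = n
            return ("promise", self.accepted)
        return ("nack", None)

    def accept(self, n, v):
        """Phase 2: accept (n, v) unless a higher ballot was promised."""
        if n >= self.promised:
            self.promised = n
            self.accepted = (n, v)
            return "accepted"
        return "nack"

def propose(acceptors, reachable, n, my_value):
    """Run one proposer round with ballot n against reachable acceptors."""
    quorum = len(acceptors) // 2 + 1
    replies = [acceptors[i].prepare(n) for i in reachable]
    granted = [p for tag, p in replies if tag == "promise"]
    if len(granted) < quorum:
        return None                              # phase 1 failed
    prior = [p for p in granted if p is not None]
    # Adopt the value with the highest accepted ballot, else use our own.
    value = max(prior)[1] if prior else my_value
    acks = [acceptors[i].accept(n, value) for i in reachable]
    return value if acks.count("accepted") >= quorum else None
```

The key safety behaviour: a second proposer that reaches even one acceptor from the first quorum learns the earlier accepted value and is forced to re-propose it rather than its own.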
Raft (Ongaro and Ousterhout, ATC 2014) repackages Paxos’s ideas into a more understandable form. Raft elects a leader explicitly; the leader receives all client writes, appends them to its log, and replicates to followers. A log entry is committed once a quorum of followers acknowledge it. On leader failure, a new leader election uses ballot numbers (Raft calls them “terms”) to prevent stale leaders from overwriting committed entries.
In normal operation, the leader appends each client write to its log and sends AppendEntries RPCs to all followers. A follower accepts the entry if its log is consistent (the leader's previous log index and term match the follower's). Once a majority of servers have written the entry to their logs, the leader commits it, applies it to the state machine, and responds to the client. If the leader crashes before committing, the new leader's election process guarantees (via the "election restriction" — a candidate must have a log at least as up-to-date as any majority member's) that the new leader's log contains all committed entries.
Raft and Paxos are used as building blocks in etcd (Kubernetes' config store), Google Chubby (Zookeeper's ancestor), CockroachDB (multi-row transactions), and TiKV (distributed key-value store). Zookeeper uses its own ZAB (Zookeeper Atomic Broadcast) protocol, which is semantically equivalent to Paxos for state machine replication.
CRDTs: Conflict-Free Replicated Data Types
The CAP theorem (Chapter 9) forces AP systems to accept stale reads and potential write conflicts. CRDTs (Shapiro et al., 2011) are data types designed to be replicated without coordination, with a merge function that is guaranteed to converge to the same state regardless of the order or number of times replicas are merged.
The simplest example is the G-Counter (grow-only counter): each of \(n\) nodes maintains a vector \(V\) of per-node increment counts, and the counter's value is \(\sum_i V[i]\). Increment at node \(i\): \(V[i] \leftarrow V[i] + 1\). Local operation, no coordination needed.
Merge of two replicas \(V\) and \(V'\): \(V_{\text{merged}}[i] = \max(V[i], V'[i])\) for each \(i\).
Commutativity: \(\text{merge}(V, V') = \text{merge}(V', V)\) (max is commutative). Associativity: \(\text{merge}(A, \text{merge}(B, C)) = \text{merge}(\text{merge}(A, B), C)\) (max is associative). Idempotency: \(\text{merge}(V, V) = V\). These three properties guarantee that replicas converge to the same value regardless of merge order, message reordering, or duplicate messages.
N0 increments twice: \(V^{N0} = [2, 0, 0]\). N1 increments once: \(V^{N1} = [0, 1, 0]\). N2 increments three times: \(V^{N2} = [0, 0, 3]\). All increments happen concurrently (no coordination).
N0 and N1 merge: \(V^{N0 \cup N1} = [\max(2,0), \max(0,1), \max(0,0)] = [2, 1, 0]\). Counter value = 3.
N0∪N1 merges with N2: \(V^{\text{final}} = [\max(2,0), \max(1,0), \max(0,3)] = [2, 1, 3]\). Counter value = 6. ✓
If instead N1 merged with N2 first: \(V^{N1 \cup N2} = [0,1,3]\), then merged with N0: \([2,1,3]\) → same result. Commutativity and associativity of merge guarantee order-independence.
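The worked example is easy to verify in code. A minimal sketch of the grow-only counter (`GCounter` is an illustrative name):

```python
# G-Counter CRDT: per-node increment vector, element-wise max on merge.

class GCounter:
    def __init__(self, n_nodes):
        self.v = [0] * n_nodes

    def increment(self, node):
        self.v[node] += 1             # purely local, coordination-free

    def merge(self, other):
        """Element-wise max: commutative, associative, idempotent."""
        out = GCounter(len(self.v))
        out.v = [max(a, b) for a, b in zip(self.v, other.v)]
        return out

    def value(self):
        return sum(self.v)            # counter value = sum of per-node counts
```

Merging in either order yields \([2, 1, 3]\) and value 6, and merging a replica with itself is a no-op, confirming the three algebraic properties above.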
Each element added to the set is tagged with a unique token (a UUID). The OR-Set state is a set of (element, token) pairs. Removing element \(e\) removes all pairs with element \(e\) that the removing replica has observed.
Add \(e\) at replica \(r\): generate fresh token \(\tau\); add pair \((e, \tau)\) to \(r\)’s state.
Remove \(e\) at replica \(r\): remove all pairs \((e, \cdot)\) from \(r\)’s current state. (Pairs added concurrently on other replicas, unseen by \(r\), are unaffected.)
Merge of two replicas \(A\) and \(B\): \(A \cup B\) (set union of token-tagged pairs). An element \(e\) is in the merged set iff at least one \((e, \tau)\) pair survived (was added and not subsequently removed on its replica).
Commutativity and associativity: set union is commutative and associative. An element added on one replica and concurrently removed on another will be present after merge (the add’s token survives since the remove only removed tokens visible to the removing replica). This “add-wins” semantics is appropriate for shopping carts, collaborative document edits, and distributed tags.
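A minimal state-based sketch of the OR-Set as described above (illustrative names; production CRDT libraries additionally track causal context so that removes remain effective when old, pre-remove states are merged back in later):

```python
import uuid

# OR-Set (observed-remove set): every add carries a fresh unique token;
# remove deletes only the (element, token) pairs this replica has observed,
# so a concurrent add on another replica survives the merge ("add-wins").

class ORSet:
    def __init__(self):
        self.pairs = set()                       # {(element, token)}

    def add(self, e):
        self.pairs.add((e, uuid.uuid4().hex))    # fresh token per add

    def remove(self, e):
        # Only pairs visible to this replica are removed.
        self.pairs = {(x, t) for x, t in self.pairs if x != e}

    def merge(self, other):
        out = ORSet()
        out.pairs = self.pairs | other.pairs     # set union of tagged pairs
        return out

    def contents(self):
        return {x for x, _ in self.pairs}        # surviving elements
```

In the concurrent add/remove scenario, the remove on one replica cannot see the token generated by the other replica's add, so the element is present after merge.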
Linearizability vs. Serializability
These two consistency properties are frequently confused but describe fundamentally different guarantees.
Linearizability: every operation appears to take effect atomically at a single instant between its invocation and its response, consistent with real-time order. It is a property of a single register, key-value pair, or distributed lock — it says nothing about multi-object atomicity. Serializability, by contrast, is a property of transactions: groups of operations over multiple objects appear to execute in some serial order, but that order need not match real time.
Strict serializability (also called strong serializability) combines both: the serialization order must respect real-time order. It is the strongest commonly-used consistency model and is equivalent to linearizability extended to transactions.
This scenario is a classic split-brain in an AP system (Cassandra, DynamoDB with eventual consistency). The remedy is to use stronger consistency (quorum reads and writes in Cassandra, or strong consistency mode in DynamoDB), accepting higher latency and reduced availability during network partitions.
The relationship between consistency models forms a hierarchy: strict serializability > linearizability (single-object) ≈ serializability (multi-object) > snapshot isolation > read committed > eventual consistency. Distributed systems typically choose a point on this hierarchy based on their latency and availability requirements — no single point is universally optimal.