CS 448: Database Systems Implementation

David Toman

Estimated study time: 2 hr 16 min

Sources and References

Primary textbook — Ramakrishnan and Gehrke, Database Management Systems, 3rd ed., McGraw-Hill, 2003 (chapters 3–23 form the core reading). Supplementary texts — Hector Garcia-Molina, Jeffrey D. Ullman, and Jennifer Widom, Database Systems: The Complete Book, 2nd ed., Pearson, 2009; C. Mohan, Donald Haderle, Bruce Lindsay, Hamid Pirahesh, and Peter Schwarz, “ARIES: A Transaction Recovery Method Supporting Fine-Granularity Locking and Partial Rollbacks Using Write-Ahead Logging,” ACM TODS 17(1), 1992. Online resources — PostgreSQL 16 documentation (postgresql.org/docs); CMU 15-445/645 Database Systems lecture notes (Andy Pavlo, 2023); MIT 6.830 Database Systems open courseware; the ARIES paper (full text freely available via ACM DL).


Chapter 1: The Relational Model and DBMS Architecture

Understanding how a database management system is built — not just used — requires starting from the mathematical foundations that make relational databases correct, then working upward through the architectural layers that make them fast.

The Relational Model

A relation is a set of tuples, each conforming to a schema. Formally, a relation \( R \) over attributes \( A_1, A_2, \ldots, A_n \) with domains \( D_1, \ldots, D_n \) is a subset of \( D_1 \times D_2 \times \cdots \times D_n \). Because it is a set, there are no duplicate tuples and no inherent ordering.

A primary key is a minimal set of attributes whose values uniquely identify every tuple. A foreign key in relation \( R \) referencing relation \( S \) asserts that every value of the foreign-key attributes in \( R \) must appear as a primary-key value in \( S \); this is referential integrity.

Relational Algebra

SQL is declarative — it says what you want. The query optimizer translates SQL into relational algebra, an operational language specifying how to compute the answer. The core operators are:

Operator          | Symbol                          | Description
Selection         | \( \sigma_\theta(R) \)          | Keep tuples satisfying predicate \( \theta \)
Projection        | \( \pi_{A_1,\ldots,A_k}(R) \)   | Keep only listed attributes (eliminates duplicates in set semantics)
Cartesian product | \( R \times S \)                | All pairs of tuples from \( R \) and \( S \)
Natural join      | \( R \bowtie S \)               | Cartesian product filtered by equality on shared attribute names
Union             | \( R \cup S \)                  | All tuples in either relation (schemas must match)
Difference        | \( R - S \)                     | Tuples in \( R \) not in \( S \)
Renaming          | \( \rho_{B/A}(R) \)             | Rename attribute \( A \) to \( B \)

Extended operators include grouping \( \gamma \), sorting \( \tau \), and explicit duplicate elimination \( \delta \). These are not expressible in pure relational algebra but are needed in practice.

Mapping SQL to Relational Algebra

A SELECT-FROM-WHERE query translates as: form the Cartesian product of the FROM tables, apply the WHERE predicate as \( \sigma \), then project via \( \pi \) onto the SELECT list. Aggregation with GROUP BY maps to \( \gamma_{G; f(A)} \), where \( G \) is the grouping-attribute list and \( f(A) \) is an aggregate function. The query optimizer manipulates this algebraic tree to find an equivalent, cheaper plan.

Example — SQL to relational algebra, step by step. Consider the query:

Find the names of students enrolled in CS 448 with GPA above 3.5.

SELECT s.name
FROM Students s, Enrolled e
WHERE s.sid = e.sid
  AND e.cid = 'CS448'
  AND s.gpa > 3.5;

A naïve mechanical translation applies the Cartesian product first, then the full WHERE predicate, then projects:

\[ \pi_{\text{name}}\!\left(\sigma_{\text{gpa} > 3.5 \;\land\; \text{cid} = \text{'CS448'} \;\land\; s.\text{sid} = e.\text{sid}}(\text{Students} \times \text{Enrolled})\right) \]

The optimizer immediately rewrites this by pushing selections down before the join, drastically reducing intermediate result sizes. The predicate \(\text{gpa} > 3.5\) references only Students, so it is applied there first; the predicate \(\text{cid} = \text{'CS448'}\) references only Enrolled, so it is applied there first. The join predicate \(s.\text{sid} = e.\text{sid}\) is then used as the natural join condition:

\[ \pi_{\text{name}}\!\Big(\sigma_{\text{gpa} > 3.5}(\text{Students}) \;\bowtie_{s.\text{sid} = e.\text{sid}}\; \sigma_{\text{cid} = \text{'CS448'}}(\text{Enrolled})\Big) \]

If Students has 10,000 tuples and Enrolled has 100,000 tuples, the naïve plan forms a \(10^9\)-tuple Cartesian product. The optimized plan first reduces Students to perhaps 200 high-GPA rows and Enrolled to perhaps 150 CS 448 rows, then joins those 200 and 150 rows — a reduction of more than six orders of magnitude in work before the join even begins.
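The equivalence of the naïve and pushed-down plans can be checked directly by modeling relations as collections of tuples in Python. This is an illustrative sketch with made-up sample data — the helper names select, project, and join are not from any DBMS:

```python
# Minimal relational-algebra operators over relations modeled as lists of dicts.
def select(rel, pred):            # sigma: keep tuples satisfying the predicate
    return [t for t in rel if pred(t)]

def project(rel, attrs):          # pi (set semantics: deduplicate)
    seen = {tuple(t[a] for a in attrs) for t in rel}
    return [dict(zip(attrs, v)) for v in seen]

def join(r, s, pred):             # theta-join via filtered Cartesian product
    return [{**tr, **ts} for tr in r for ts in s if pred(tr, ts)]

# Hypothetical sample data (Enrolled's key is named esid to avoid a dict clash).
students = [{"sid": 1, "name": "Alice", "gpa": 3.7},
            {"sid": 2, "name": "Bob",   "gpa": 3.2},
            {"sid": 3, "name": "Carol", "gpa": 3.9}]
enrolled = [{"esid": 1, "cid": "CS448"},
            {"esid": 2, "cid": "CS448"},
            {"esid": 3, "cid": "CS341"}]

# Naive plan: Cartesian product, then the full WHERE predicate, then project.
naive = project(
    select(join(students, enrolled, lambda r, s: True),
           lambda t: t["gpa"] > 3.5 and t["cid"] == "CS448"
                     and t["sid"] == t["esid"]),
    ["name"])

# Optimized plan: push both single-table selections below the join.
optimized = project(
    join(select(students, lambda t: t["gpa"] > 3.5),
         select(enrolled, lambda t: t["cid"] == "CS448"),
         lambda r, s: r["sid"] == s["esid"]),
    ["name"])

assert naive == optimized == [{"name": "Alice"}]
```

Both plans return the same answer; they differ only in the size of the intermediate results each one materializes.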

Set semantics vs. bag (multiset) semantics. Pure relational algebra operates on sets: duplicate tuples are automatically eliminated, and no ordering is defined. SQL, by contrast, uses bag semantics by default. A SELECT without DISTINCT does not eliminate duplicates; UNION ALL preserves duplicates whereas UNION eliminates them; and aggregate functions like AVG count each occurrence of a value.

This distinction has practical consequences for query optimization. Eliminating duplicates requires either a sort-merge pass (cost \(O(N \log N)\)) or a hash deduplication pass (cost \(O(N)\) average). If the optimizer assumes set semantics when the query engine uses bag semantics, it may incorrectly prune plans. More subtly, some algebraic rewritings valid under set semantics are invalid under bag semantics — for instance, \(\pi_A(\sigma_\theta(R)) \cup \pi_A(\sigma_{\lnot\theta}(R)) = \pi_A(R)\) holds under set semantics, but under bag semantics the set union on the left eliminates duplicates that \(\pi_A(R)\) retains. Modern optimizers track whether each subplan preserves or eliminates duplicates to apply only sound rewrites.
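The failure of that identity under bags can be demonstrated in a few lines — a sketch with an arbitrary toy relation and predicate:

```python
from collections import Counter

R = [("x", 1), ("x", 2), ("y", 3)]        # a bag; attribute A is the first field
theta = lambda t: t[1] >= 2               # arbitrary predicate

sigma = lambda rel, p: [t for t in rel if p(t)]
pi_A  = lambda rel: [t[0] for t in rel]   # bag projection: duplicates survive

lhs = set(pi_A(sigma(R, theta))) | set(pi_A(sigma(R, lambda t: not theta(t))))

# Under set semantics the rewrite is an identity:
assert lhs == set(pi_A(R))                # both sides are {"x", "y"}

# Under bag semantics it is not: pi_A(R) contains "x" twice,
# but the set union on the left collapsed the duplicate.
assert Counter(pi_A(R))["x"] == 2
assert Counter(lhs)["x"] == 1
```

The same relation gives "x" multiplicity 2 on the right-hand side and multiplicity 1 on the left — exactly the unsoundness a bag-aware optimizer must guard against.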

Physical and logical data independence — concrete examples.

Physical independence: suppose the Students table starts as an unindexed heap file. A DBA later adds a B+-tree index on Students.gpa to accelerate range queries. Applications that issue SELECT * FROM Students WHERE gpa > 3.5 continue to work without modification — the optimizer silently switches from a heap scan to an index scan, returning the same results faster. The application sees no change at all.

Logical independence: suppose the Enrolled table is split into two tables, EnrolledFall and EnrolledSpring, for administrative reasons. A view is created:

CREATE VIEW Enrolled AS
  SELECT * FROM EnrolledFall
  UNION ALL
  SELECT * FROM EnrolledSpring;

Existing queries and applications that reference the name Enrolled continue to work identically; the DBMS expands the view definition transparently at query time. Logical independence is harder to achieve in practice because not all schema changes can be mediated through views — for example, splitting an attribute (breaking a phone number into country code and number) cannot easily be hidden.

DBMS Architecture

A DBMS is a layered system. Each layer has a clean interface so that internals can change without affecting layers above:

Disk/Storage Manager — reads and writes fixed-size pages to block devices.
Buffer Pool Manager — caches pages in RAM; decides which pages to evict.
Access Methods — heap files and B+-tree indexes built on top of the buffer pool.
Query Executor — the iterator tree that evaluates a physical query plan.
Query Optimizer — selects the cheapest plan from the algebraic search space.
SQL Parser / Rewriter — parses SQL, expands views, checks semantics.
Client Interface — JDBC/ODBC, wire protocol (e.g., PostgreSQL's libpq).

Physical data independence means applications do not break when storage structures change (e.g., adding an index). Logical data independence means applications do not break when the schema changes (e.g., adding a column). Layering enforces both.

PostgreSQL exposes its catalog through system tables: pg_class (relations and indexes), pg_attribute (columns), pg_index (index metadata). The optimizer reads these to know which indexes exist and to obtain statistics stored in pg_statistic.


Chapter 2: Storage, Buffer Management, and File Organization

Before any query can run, data must be read from disk. The cost model for query optimization is almost entirely determined by the number of page I/Os performed, so understanding the storage layer is essential.

The Disk I/O Model

Modern storage is organized into fixed-size pages (typically 4–16 KB). Accessing a page on a hard disk requires a seek (moving the read head) plus rotational latency — combined on the order of 5–10 ms. A sequential read of the next page costs only a fraction of that because no seek is needed. SSDs reduce random I/O latency to ~0.1 ms, but the sequential-vs-random gap persists. Main memory access is \(\approx 100\) ns. The storage hierarchy:

\[ \text{L1/L2/L3 cache} \ll \text{RAM} \ll \text{SSD} \ll \text{HDD} \]

The query optimizer therefore tries to maximize sequential I/O and minimize random I/O.

Heap Files

A heap file is an unordered collection of pages. It supports insert (always goes to a page with free space), delete (mark a slot as empty), and full scan. Each page has:

  • A page header (page LSN for recovery, free-space pointer, slot count)
  • A slot directory at the end of the page mapping slot numbers to record offsets
  • Records packed from the beginning of the usable area

Fixed-length records allow direct offset calculation: record \( i \) starts at \( \text{header\_size} + i \times \text{record\_size} \). Variable-length records store a null bitmap followed by an array of (offset, length) pairs pointing into the record data area. The record ID (RID) is the pair (pageId, slotNo) and is stable across reorganizations within a page.
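The fixed-length offset arithmetic can be sketched with Python's struct module. The header layout, field sizes, and record format below are illustrative choices, not any real DBMS's page format:

```python
import struct

# A toy fixed-length-record page: header, then records packed back-to-back.
PAGE_SIZE   = 8192
HEADER_FMT  = "<QII"                        # page LSN, free-space offset, slot count
HEADER_SIZE = struct.calcsize(HEADER_FMT)   # 16 bytes
RECORD_FMT  = "<I20s"                       # sid (uint32) + fixed 20-byte name
RECORD_SIZE = struct.calcsize(RECORD_FMT)   # 24 bytes

def record_offset(i):
    """Fixed-length records allow direct offset arithmetic."""
    return HEADER_SIZE + i * RECORD_SIZE

def write_record(page, i, sid, name):
    struct.pack_into(RECORD_FMT, page, record_offset(i), sid, name.encode())

def read_record(page, i):
    sid, raw = struct.unpack_from(RECORD_FMT, page, record_offset(i))
    return sid, raw.rstrip(b"\0").decode()

page = bytearray(PAGE_SIZE)
write_record(page, 0, 1, "Alice")
write_record(page, 3, 7, "Grace")           # slot 3 lives at offset 16 + 3*24 = 88
assert read_record(page, 3) == (7, "Grace")
```

Variable-length records give up this direct arithmetic, which is exactly why they need the slot directory's (offset, length) indirection.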

Buffer Pool

The buffer pool is a region of RAM divided into frames. A page table maps each loaded pageId to its frame index, along with a pin count (how many threads are currently using it) and a dirty bit (whether it has been modified).

When a thread needs a page it calls pin(pageId). If the page is in the pool, the pin count is incremented and the frame pointer is returned. If not, an eviction is needed: choose an unpinned frame via a replacement policy, write it to disk if dirty, then load the requested page.

Replacement policies:

Policy | Idea                                                | When Useful
LRU    | Evict least recently used frame                     | General workloads
MRU    | Evict most recently used frame                      | Sequential scans (avoids sequential flooding on repeated scans)
Clock  | Approximate LRU with a reference bit; sweep pointer | Production databases

Example — Buffer pool trace under LRU vs. MRU. Suppose the buffer pool has 3 frames and we issue the following page reference sequence: \[ 1,\ 2,\ 3,\ 4,\ 1,\ 2,\ 5,\ 1,\ 2,\ 3,\ 4 \]

LRU policy (evict the page that was least recently used):

Step | Request | Frames after              | Hit/Miss
1    | 1       | {1}                       | miss
2    | 2       | {1, 2}                    | miss
3    | 3       | {1, 2, 3}                 | miss
4    | 4       | {4, 2, 3} — evict 1 (LRU) | miss
5    | 1       | {4, 1, 3} — evict 2       | miss
6    | 2       | {4, 1, 2} — evict 3       | miss
7    | 5       | {5, 1, 2} — evict 4       | miss
8    | 1       | {5, 1, 2}                 | hit
9    | 2       | {5, 1, 2}                 | hit
10   | 3       | {3, 1, 2} — evict 5       | miss
11   | 4       | {3, 4, 2} — evict 1       | miss

Total LRU misses: 9 (hits at steps 8, 9).

MRU policy (evict the page that was most recently used — appropriate for repeated sequential scans):

Step | Request | Frames after              | Hit/Miss
1    | 1       | {1}                       | miss
2    | 2       | {1, 2}                    | miss
3    | 3       | {1, 2, 3}                 | miss
4    | 4       | {1, 2, 4} — evict 3 (MRU) | miss
5    | 1       | {1, 2, 4}                 | hit
6    | 2       | {1, 2, 4}                 | hit
7    | 5       | {1, 4, 5} — evict 2 (MRU) | miss
8    | 1       | {1, 4, 5}                 | hit
9    | 2       | {2, 4, 5} — evict 1 (MRU) | miss
10   | 3       | {3, 4, 5} — evict 2 (MRU) | miss
11   | 4       | {3, 4, 5}                 | hit

Total MRU misses: 7 (hits at steps 5, 6, 8, and 11).

For this particular access sequence, MRU outperforms LRU because pages 1 and 2 are repeatedly reused — MRU keeps them in the pool by evicting the freshly loaded pages that are not immediately needed again. The practical lesson is that no single policy dominates: LRU is best for temporal locality workloads, while MRU is best for large sequential scans that cycle through a dataset larger than the buffer pool.
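Both totals can be reproduced with a small simulator. This sketch tracks only recency — pin counts, dirty bits, and the actual page contents are omitted, and `simulate` is an illustrative helper, not a real buffer-manager API:

```python
def simulate(refs, nframes, policy):
    """Replay a page reference string; return the number of misses."""
    frames, last_use, misses = [], {}, 0
    for t, page in enumerate(refs):
        if page in frames:
            last_use[page] = t              # hit: just refresh recency
            continue
        misses += 1
        if len(frames) == nframes:          # pool full: pick a victim
            pick = min if policy == "LRU" else max
            victim = pick(frames, key=lambda q: last_use[q])
            frames.remove(victim)
        frames.append(page)
        last_use[page] = t
    return misses

refs = [1, 2, 3, 4, 1, 2, 5, 1, 2, 3, 4]
assert simulate(refs, 3, "LRU") == 9
assert simulate(refs, 3, "MRU") == 7
```

Swapping min for max is the entire difference between the two policies, which makes this a convenient harness for experimenting with other reference strings.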

Databases cannot rely on the OS page cache because the OS does not understand transaction semantics — for example, it might write a dirty page before the corresponding log record, violating write-ahead logging. Therefore, DBMS implementations bypass the OS cache (O_DIRECT on Linux) and manage their own buffer pool.

Row vs. Column Storage

NSM (N-ary Storage Model), also called row-oriented storage, stores each tuple contiguously. This is optimal for OLTP: inserting or updating a full row touches a single page.

DSM (Decomposition Storage Model), or column-oriented storage, stores each attribute in a separate file or column group. For OLAP queries that read only a few attributes over millions of rows, DSM reads far less data. Examples: MonetDB, Vertica, Apache Parquet. The trade-off is that reconstructing a full row requires joining column files on row position.

DSM vs. NSM: a concrete I/O comparison. Consider a table Employees with 10 million rows and 20 columns — name, address, phone, department, salary, and 15 other attributes. Each row is 200 bytes; the entire table occupies roughly 2 GB.

OLAP query: SELECT AVG(salary) FROM Employees WHERE dept = 'Engineering'. Only two columns are needed: dept and salary. In NSM, the query must read all 2 GB of the table regardless, because each page interleaves all 20 columns. In DSM, only the dept column (perhaps 80 MB) and the salary column (perhaps 80 MB) are read — about 160 MB total, a 12× reduction in I/O. On a system with 500 MB/s sequential read throughput, NSM takes ~4 seconds and DSM takes ~0.3 seconds.

OLTP query: UPDATE Employees SET salary = 100000 WHERE eid = 123. In NSM, this is a single random read of one row's page plus one write — perhaps 2 I/Os total. In DSM, the DBMS must update the salary column file for row 123 (one I/O) and potentially update a secondary index — and if the format must touch the row's entry in every column file (as happens when rows are versioned or rewritten as a unit), each of the 20 column files may require a separate I/O. For a 20-column table, a single-row update can cost 20× more I/Os in DSM than in NSM.

This asymmetry explains why OLTP databases universally use NSM (PostgreSQL, MySQL, Oracle), while OLAP and analytical databases use DSM (Vertica, ClickHouse, DuckDB, Apache Parquet on Spark).


Chapter 3: Indexing — B+-Trees and Hash Indexes

An index is a data structure that maps search key values to the RIDs of matching tuples. Without an index, answering a point query requires scanning all \( N \) pages of the heap file — \( O(N) \) I/Os. With a B+-tree index, the cost drops to \( O(\log_d N) \) I/Os, and with hashing to \( O(1) \) expected. The trade-off is write overhead and space.

B+-Tree Structure

A B+-tree of order \( d \) satisfies:

  • Every non-root internal node has between \( d \) and \( 2d \) keys (and one more pointer than keys).
  • Every leaf node has between \( d \) and \( 2d \) key-pointer pairs.
  • All data entries (key + RID list) are stored in leaf nodes; internal nodes hold only routing keys.
  • Leaf nodes are linked in a doubly-linked list sorted by key, enabling efficient range scans.

The height of a B+-tree storing \( N \) entries is \( h \approx \lceil \log_d N \rceil \). For \( d = 100 \) and \( N = 10^6 \), \( h \approx 3 \) — meaning a point lookup costs 3–4 I/Os regardless of table size.

Why is \(d\) so large in practice? Consider a typical DBMS page of 8 KB, with 8-byte integer keys and 8-byte pointers. An internal node stores \(2d\) keys and \(2d + 1\) pointers, so: \[ 2d \times 8 + (2d + 1) \times 8 \leq 8192 \implies 32d + 8 \leq 8192 \implies d \leq 255 \]

Taking \(d \approx 200\) as a practical estimate (accounting for headers and alignment), each internal node fans out to \(2d + 1 = 401\) children. One internal level (the root alone) points to 401 leaves; two levels cover \(401^2 \approx 160{,}000\) leaves; three levels cover \(401^3 \approx 64\) million leaves. Since each leaf holds up to \(2d = 400\) data entries, a 3-level tree can index \(400 \times 64 \times 10^6 \approx 25\) billion entries. This is why, for any realistic database, the B+-tree height is essentially constant at 3–4 levels: the lookup cost is bounded regardless of whether the table has 10,000 rows or 10 billion rows.
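These fanout and capacity figures follow directly from the page-size arithmetic; a quick sketch to check them (the helper names are illustrative):

```python
def btree_order(page_size, key_bytes, ptr_bytes):
    """Largest order d such that 2d keys and 2d+1 pointers fit in one page."""
    return (page_size - ptr_bytes) // (2 * (key_bytes + ptr_bytes))

def btree_capacity(d, internal_levels):
    """Max data entries indexed by a tree with that many internal levels."""
    fanout = 2 * d + 1                          # children per full internal node
    leaves = fanout ** internal_levels
    return leaves * 2 * d                       # each leaf holds up to 2d entries

assert btree_order(8192, 8, 8) == 255           # matches the inequality above
assert btree_capacity(200, 3) == 401**3 * 400   # ~25.8 billion entries
assert btree_capacity(200, 3) > 25 * 10**9
```

Plugging in other page or key sizes (say, 16 KB pages or 16-byte keys) shows why the height stays at 3–4 levels across virtually all realistic configurations.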

Insertion traverses to the leaf. If the leaf overflows (more than \( 2d \) entries), it is split: the middle key is pushed up to the parent, which may itself split, propagating toward the root. If the root splits, a new root is created, increasing tree height by one.

Example — B+-tree insertion trace with \(d = 2\). With order \(d = 2\), each node holds at most \(2d = 4\) entries and at least \(d = 2\) entries (except the root). Insert keys 1, 2, 3, 4, 5 into an initially empty tree.

After inserting 1, 2, 3, 4: all four keys fit in a single leaf node; the tree has height 1 and the root is also the sole leaf: [1 | 2 | 3 | 4].

Insert 5: the leaf now has 5 entries, exceeding the maximum of 4. It must split. The leaf is divided at the median (between keys 2 and 3): the left leaf retains [1 | 2] and the right leaf holds [3 | 4 | 5]. The smallest key of the right leaf, which is 3, is copied (pushed) up to the parent.

Since there was no parent (the leaf was the root), a new root is created:

         [3]
        /    \
  [1|2]      [3|4|5]

The tree now has height 2. The root is an internal node with one key (3) and two child pointers. Any search key \(k < 3\) follows the left pointer; any \(k \geq 3\) follows the right pointer. Leaf nodes remain linked in a doubly-linked list, so a range scan from 2 to 4 traverses the root (1 I/O), visits the left leaf (1 I/O), then follows the right-sibling pointer to the right leaf (1 I/O).

If we now insert key 6 and then 7, the right leaf [3|4|5|6] is full; inserting 7 splits it into [3|4] and [5|6|7], and key 5 is copied up to the root: [3|5]. The tree remains at height 2 but the root now has two keys. The tree grows to height 3 only when the root itself overflows — at \(d=2\) this happens after only a few more insertions, but at \(d = 200\) the root does not split until on the order of \((2d)^2 \approx 160{,}000\) entries are in the tree.
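The insertion trace above can be replayed by a minimal B+-tree. This sketch implements only insert with leaf and internal splits (sibling links, RIDs, and deletion are omitted) and uses \(d = 2\) as in the example:

```python
import bisect

MAX_KEYS = 4                         # 2d with d = 2, as in the example

class Node:
    def __init__(self, leaf=True):
        self.leaf = leaf
        self.keys = []
        self.children = []           # used by internal nodes only

def insert(root, key):
    """Insert key; return the (possibly new) root."""
    split = _insert(root, key)
    if split:                        # root overflowed: grow the tree one level
        sep, right = split
        new_root = Node(leaf=False)
        new_root.keys = [sep]
        new_root.children = [root, right]
        return new_root
    return root

def _insert(node, key):
    if node.leaf:
        bisect.insort(node.keys, key)
        if len(node.keys) > MAX_KEYS:            # leaf split: copy middle key up
            mid = len(node.keys) // 2
            right = Node(leaf=True)
            right.keys, node.keys = node.keys[mid:], node.keys[:mid]
            return right.keys[0], right
        return None
    i = bisect.bisect_right(node.keys, key)      # route to the correct child
    split = _insert(node.children[i], key)
    if split:
        sep, right = split
        node.keys.insert(i, sep)
        node.children.insert(i + 1, right)
        if len(node.keys) > MAX_KEYS:            # internal split: push middle key up
            mid = len(node.keys) // 2
            up = node.keys[mid]
            right = Node(leaf=False)
            right.keys = node.keys[mid + 1:]
            right.children = node.children[mid + 1:]
            node.keys = node.keys[:mid]
            node.children = node.children[:mid + 1]
            return up, right
    return None

root = Node()
for k in range(1, 8):                # insert 1..7 as in the trace
    root = insert(root, k)
assert root.keys == [3, 5]
assert [leaf.keys for leaf in root.children] == [[1, 2], [3, 4], [5, 6, 7]]
```

Note the asymmetry the chapter describes: a leaf split copies its separator up (the key stays in the leaf), while an internal split pushes the separator up (the key moves to the parent).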

Deletion traverses to the leaf and removes the entry. If the node underflows (fewer than \( d \) entries), it either borrows from a sibling (redistribution) or merges with a sibling, pulling a key down from the parent. Merges can propagate upward.

PostgreSQL's native B-tree access method supports multi-column, partial (covering only a subset of rows), and expression indexes (e.g., LOWER(email)); its separate GiST (Generalized Search Tree) framework generalizes tree-structured indexing to other data types and predicates.

Clustered vs. Unclustered Indexes

A clustered index keeps the heap file sorted in the same order as the index. A range scan using a clustered index reads pages sequentially. An unclustered index does not control heap order; each matching tuple may be on a different page, so a range scan of \( k \) matches can cost up to \( k \) random I/Os — potentially worse than a full scan for large \( k \).

A covering index stores all columns needed by a query, avoiding heap lookups entirely (an index-only scan).

When to prefer a full table scan over an unclustered index. Suppose a table has \(N = 10{,}000\) pages with 100 rows per page (so \(|R| = 1{,}000{,}000\) total rows). A query retrieves \(k\) matching rows.

Clustered index scan: matching rows are physically adjacent on disk, so the scan reads approximately \(k / 100\) contiguous pages — for \(k = 500\), that is 5 I/Os.

Unclustered index scan: matching rows may be scattered across up to \(k\) different pages (one random I/O per row in the worst case). For \(k = 500\), that is potentially 500 random I/Os — 100× more expensive than the clustered scan.

Full table scan: always costs \(N = 10{,}000\) I/Os, but they are entirely sequential.

The crossover point: when \(k\) is large enough that the unclustered index costs more than a full scan, the optimizer prefers the full scan. Because a random I/O costs roughly 10× a sequential one, this happens when \(k \gtrsim N / 10 = 1{,}000\) rows for the numbers above — i.e., once the predicate matches more than about 0.1% of the rows, the unclustered index becomes counterproductive. PostgreSQL’s planner estimates this crossover automatically using the correlation statistic stored in pg_statistic — a value near 1 or −1 means the index order matches the heap order (clustered-like), while a value near 0 means the index is effectively unclustered.

Static Hashing

Static hashing partitions records into \( B \) buckets using hash function \( h(k) \bmod B \). Each bucket is a chain of pages. A point lookup costs \( O(1 + \text{overflow pages}) \). Static hashing cannot answer range queries and is expensive to resize (must rehash all records).

The fundamental limitation of static hashing becomes apparent over time. As records are inserted, buckets grow beyond a single page and require overflow chains. Once the overflow chains exceed one or two pages, the average lookup cost exceeds two I/Os — at which point a B+-tree is often competitive for point queries while also supporting range queries. This is why static hashing is rarely used as a primary index structure in production databases, though it survives as an in-memory hash table for hash join probe phases and for in-memory databases where overflow chains are irrelevant.

Extendible Hashing

Extendible hashing eliminates the cost of full rehashing. A directory of \( 2^g \) pointers (where \( g \) is the global depth) maps the \( g \)-bit hash prefix to a bucket. Each bucket has a local depth \( \ell \leq g \).

When a bucket overflows: split it into two, increment local depth. If local depth would exceed global depth, double the directory (copy each entry; only the overflowed bucket needs a new page). Doubling the directory is cheap because only pointers are copied. On average, only one bucket page is needed per split.

Example — extendible hashing directory doubling. Start with global depth \(g = 2\). The directory has \(2^2 = 4\) entries, indexed by the 2-bit hash prefix:
Directory (g=2):
  00 → Bucket A  (local depth ℓ=2, holds keys hashing to 00)
  01 → Bucket B  (ℓ=2)
  10 → Bucket C  (ℓ=2)
  11 → Bucket D  (ℓ=2)

Suppose Bucket A currently holds 4 keys (the maximum), all hashing to prefix 00. A new key with hash prefix 00 arrives, causing Bucket A to overflow.

Split: Bucket A is split. The distinguishing bit is now the 3rd bit. Keys with 3-bit prefix 000 stay in Bucket A; keys with 3-bit prefix 100 go to the new Bucket A’. Both new buckets have local depth \(\ell = 3\).

Directory must double because \(\ell = 3 > g = 2\). The directory doubles from 4 to 8 entries (\(g\) becomes 3). Each old 2-bit entry is copied into two new 3-bit entries:

Directory (g=3):
  000 → Bucket A   (ℓ=3, keys hashing to 000)
  001 → Bucket B   (ℓ=2, pointer copied from old 01)
  010 → Bucket C   (ℓ=2, pointer copied from old 10)
  011 → Bucket D   (ℓ=2, pointer copied from old 11)
  100 → Bucket A'  (ℓ=3, new bucket for keys hashing to 100)
  101 → Bucket B   (ℓ=2, same pointer as 001)
  110 → Bucket C   (ℓ=2, same pointer as 010)
  111 → Bucket D   (ℓ=2, same pointer as 011)

Notice that Buckets B, C, and D each have two directory entries pointing to them (because their local depth 2 is less than the new global depth 3). No data was moved in Buckets B, C, or D — only the pointers in the directory were duplicated. Only the entries in Bucket A were redistributed between A and A’. This is what makes extendible hashing efficient: doubling the directory costs \(O(2^g)\) pointer copies but only \(O(\text{bucket size})\) data moves.
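The split-and-double mechanics can be sketched compactly. This variant indexes the directory by the low-order \(g\) bits of the hash (a common implementation choice; the chapter's example uses the high-order prefix, which is symmetric), and recursive re-splits are omitted for brevity:

```python
BUCKET_CAP = 4                       # max entries per bucket page

class Bucket:
    def __init__(self, depth):
        self.depth = depth           # local depth
        self.items = []

class ExtendibleHash:
    def __init__(self, g=2):
        self.g = g                   # global depth
        self.dir = [Bucket(g) for _ in range(2 ** g)]

    def insert(self, h):
        b = self.dir[h & ((1 << self.g) - 1)]
        if len(b.items) < BUCKET_CAP:
            b.items.append(h)
            return
        if b.depth == self.g:        # local depth would exceed global: double
            self.g += 1
            self.dir = self.dir + self.dir       # copy pointers only
        b.depth += 1
        new = Bucket(b.depth)
        bit = 1 << (b.depth - 1)     # the newly distinguishing bit
        old_items, b.items = b.items + [h], []
        for x in old_items:          # redistribute only this bucket's entries
            (new if x & bit else b).items.append(x)
        for i, slot in enumerate(self.dir):      # repoint half of b's slots
            if slot is b and (i & bit):
                self.dir[i] = new

ht = ExtendibleHash(g=2)
for h in (0b000, 0b100, 0b1000, 0b1100):   # four keys, all with low bits 00
    ht.insert(h)
ht.insert(0b10000)                          # overflow: directory doubles
assert ht.g == 3 and len(ht.dir) == 8
assert ht.dir[0] is not ht.dir[4]           # the split bucket got a new image
assert ht.dir[1] is ht.dir[5]               # other pointers merely duplicated
```

The final assertions mirror the example above: only the overflowed bucket's entries move; every other bucket just gains a second directory pointer.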

Linear Hashing

Linear hashing avoids the directory entirely. Splits occur in a fixed round-robin order controlled by a split pointer \( p \). When overall load factor exceeds a threshold, bucket \( p \) is split using a secondary hash function, and \( p \) advances. A query uses \( h(k) \bmod 2^r \); if the result is less than \( p \), use \( h(k) \bmod 2^{r+1} \). This gives amortized \( O(1) \) operations.

The elegance of linear hashing lies in the fact that directory doubling — the expensive operation in extendible hashing — never occurs. Instead, the number of buckets grows smoothly, one bucket at a time. However, a query may require checking a small number of overflow pages if the split pointer has not yet reached the bucket that should have been split to accommodate a particular key. Linear hashing is therefore preferred in environments with very large numbers of buckets (where directory doubling in extendible hashing would be expensive) or in environments where uniform growth is more important than instantaneous point-query optimality.
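The two-hash-function address computation is short enough to state directly — a sketch of the rule given in the text, with illustrative sample values:

```python
def linear_hash_bucket(h, r, p):
    """Bucket address for linear hashing: r is the current round's bit
    count, p is the split pointer."""
    b = h % (2 ** r)
    if b < p:                        # this bucket was already split this round,
        b = h % (2 ** (r + 1))       # so use the next-round hash function
    return b

# Round with r = 2 (base buckets 0..3), split pointer p = 1:
assert linear_hash_bucket(13, 2, 1) == 1   # 13 mod 4 = 1, bucket not yet split
assert linear_hash_bucket(8, 2, 1) == 0    # 8 mod 4 = 0 < p: rehash; 8 mod 8 = 0
assert linear_hash_bucket(12, 2, 1) == 4   # 12 mod 4 = 0 < p; 12 mod 8 = 4 (the image)
```

Keys 8 and 12 both hashed to bucket 0 before the split; afterward the second-level hash separates them between bucket 0 and its split image, bucket 4.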


Chapter 4: Query Evaluation — Iterators and Join Algorithms

A query plan is a tree of relational operators. The key design question is: how do operators communicate, and how much memory does each require? The iterator model answers the first question; the choice of join algorithm answers the second.

The Iterator (Volcano) Model

Every physical operator implements three methods:

open()   -- initialize state, recursively open children
next()   -- produce the next output tuple (returns EOF when done)
close()  -- release resources, recursively close children

The client repeatedly calls next() on the root of the plan tree. Each call pulls one tuple up through the tree. This pull-based pipelining means that most operators never materialize their entire output to disk — data flows tuple-by-tuple through the pipeline. The main exception is operators that need to see all input before producing any output (blocking operators): sort, hash join build phase, aggregation.

Example — iterator model for a simple query. Consider the query SELECT s.name FROM Students s WHERE s.gpa > 3.5. The physical plan is: Selection (gpa > 3.5) on top of a heap scan of Students, with a Projection (name) at the root.

Execution proceeds as follows. The client calls root.next() (the Projection operator). Projection calls select.next(). Selection calls heapscan.next(), which reads the first page of Students and returns the first tuple (sid=1, name=‘Alice’, gpa=3.7). Selection evaluates gpa > 3.5: true. Selection returns the tuple to Projection. Projection extracts the name field and returns (‘Alice’,) to the client.

The client calls root.next() again. Projection calls select.next(). Selection calls heapscan.next(), which returns the next tuple (sid=2, name=‘Bob’, gpa=3.2). Selection evaluates gpa > 3.5: false. Selection calls heapscan.next() again without returning to Projection. And so on.

The key observation: at no point is the entire Students table or the entire selection result materialized in memory. Memory usage is bounded by the size of one page in the heap scan buffer, regardless of how large Students is. This is the power of the iterator model for pipeline-friendly operators.
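Python generators map naturally onto this model: creating the generator plays the role of open(), each iteration step is a next() call, and exhaustion corresponds to close(). A sketch of the plan above, with made-up sample data:

```python
def heap_scan(table):                 # leaf operator: yields one tuple at a time
    for row in table:
        yield row

def selection(child, pred):           # keeps pulling from child until pred passes
    for row in child:
        if pred(row):
            yield row

def projection(child, attrs):
    for row in child:
        yield tuple(row[a] for a in attrs)

students = [{"sid": 1, "name": "Alice", "gpa": 3.7},
            {"sid": 2, "name": "Bob",   "gpa": 3.2},
            {"sid": 3, "name": "Carol", "gpa": 3.9}]

# Build the plan tree; nothing executes until the client pulls.
plan = projection(selection(heap_scan(students), lambda r: r["gpa"] > 3.5),
                  ["name"])
assert next(plan) == ("Alice",)       # one tuple pulled through the whole tree
assert list(plan) == [("Carol",)]
```

Bob's tuple is read by the scan and discarded inside selection without ever reaching the root — exactly the "call next() again without returning" behavior described above.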

Selection and Projection

Selection applies a predicate to each tuple from its child and calls next() again if the predicate fails. Cost equals the child’s output cost — no additional I/O.

Projection emits only the requested attributes. Duplicate elimination requires either sorting (to bring duplicates adjacent) or hashing (to detect duplicates). Both approaches add \( O(N) \) I/O for large inputs.

Nested Loop Join

Let \( R \) have \( P_R \) pages and \( S \) have \( P_S \) pages.

Simple NLJ: for each tuple of \( R \) (outer), scan all of \( S \) (inner). Cost: \( P_R + |R| \cdot P_S \) — one full scan of \( S \) per outer tuple. Prohibitively expensive.

Block NLJ: load a chunk of \( B-2 \) pages of \( R \) into memory, reserving one buffer frame for scanning \( S \) and one for output (\( B \) frames in total). Scan \( S \) once per chunk. Cost:

\[ P_R + \left\lceil \frac{P_R}{B-2} \right\rceil \cdot P_S \]

With \( B = 102 \) and \( P_R = 1000 \), this is \( 1000 + 10 \cdot P_S \) instead of \( 1000 + |R| \cdot P_S \).

Index NLJ: if \( S \) has an index on the join attribute, for each outer tuple probe the index to find matching inner tuples. Cost: \( P_R + |R| \cdot (\text{index traversal cost}) \). Extremely efficient when \( R \) is small and the index is clustered.

Sort-Merge Join (SMJ)

SMJ first sorts both inputs on the join attribute, then merges them in a single scan. Because matching tuples are adjacent after sorting, each page of each input is read at most once during the merge (assuming no large groups of equal keys).

Cost (simple case): \( 2(P_R + P_S) \) to sort both inputs (two-pass, see Chapter 5) plus \( (P_R + P_S) \) for the merge pass. Total: \( 3(P_R + P_S) \).

SMJ handles range join conditions (e.g., \( R.a \leq S.b \)) and produces output sorted on the join key, which can be exploited by a parent ORDER BY or merge join.

Hash Join

Hash join exploits the fact that only tuples with the same hash value can join.

Partition phase: apply hash function \( h_1 \) to both \( R \) and \( S \) to partition each into \( B-1 \) buckets written to disk. Corresponding buckets of \( R \) and \( S \) can only contain matching tuples.

Probe phase: for each of the \( B-1 \) bucket pairs, load the \( R \)-bucket into a hash table using a second hash function \( h_2 \), then probe with each tuple from the \( S \)-bucket. Tuples that hash to the same \( h_2 \) value are join candidates; compare on the actual attribute.

Cost (assuming each partition fits in memory for the probe phase):

\[ 3(P_R + P_S) \]

The requirement is that each partition fits in \( B \) pages, i.e., \(\frac{P_R}{B-1} \leq B\), or \( P_R \leq B^2 \).

Hybrid hash join keeps one partition entirely in memory during the build phase, saving one write and read for that partition.
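The partition and probe phases can be sketched in memory, with Python lists standing in for the on-disk bucket files and a dict as the \( h_2 \) hash table. The data and helper names are illustrative:

```python
from collections import defaultdict

def hash_join(R, S, r_key, s_key, n_parts=4):
    """Grace-style hash join sketch: partition both inputs with h1, then
    build and probe each partition pair (a dict plays the h2 hash table)."""
    h1 = lambda v: hash(v) % n_parts
    r_parts, s_parts = defaultdict(list), defaultdict(list)
    for t in R:                        # partition phase ("written to disk")
        r_parts[h1(t[r_key])].append(t)
    for t in S:
        s_parts[h1(t[s_key])].append(t)
    out = []
    for p in range(n_parts):           # probe phase, one partition pair at a time
        table = defaultdict(list)      # in-memory hash table over the R-bucket
        for t in r_parts[p]:
            table[t[r_key]].append(t)
        for t in s_parts[p]:           # probe with each S tuple
            for match in table.get(t[s_key], []):
                out.append({**match, **t})
    return out

students = [{"sid": 1, "name": "Alice"}, {"sid": 2, "name": "Bob"}]
enrolled = [{"esid": 1, "cid": "CS448"}, {"esid": 1, "cid": "CS341"},
            {"esid": 2, "cid": "CS448"}]
rows = hash_join(students, enrolled, "sid", "esid")
assert len(rows) == 3                  # Alice twice, Bob once
```

Because only tuples in the same \( h_1 \) partition can ever join, each partition pair is processed independently — the property that bounds the memory needed in the probe phase.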

Join Algorithm Comparison

Algorithm  | I/O Cost                                      | Memory Req.            | Supports Range Join? | Notes
Simple NLJ | \( P_R + |R| \cdot P_S \)                     | 3 pages                | Yes                  | Only for tiny tables
Block NLJ  | \( P_R + \lceil P_R/(B-2) \rceil \cdot P_S \) | \( B \) pages          | Yes                  | Good for skewed sizes
Index NLJ  | \( P_R + |R| \cdot C_{\text{idx}} \)          | Varies                 | With clustered idx   | Best with small outer
Sort-Merge | \( 3(P_R + P_S) \)                            | \( \sqrt{P_R + P_S} \) | Yes                  | Output is sorted
Hash Join  | \( 3(P_R + P_S) \)                            | \( \sqrt{P_R} \)       | Equality only        | Best for large joins

Chapter 5: External Sorting and Other Operators

Sorting is foundational to query processing: it enables sort-merge join, sort-based aggregation, duplicate elimination, ORDER BY, and certain index builds. When the input does not fit in memory, external sort-merge is required.

Two-Pass External Sort-Merge

Assume \( B \) buffer pages are available. The input has \( N \) pages.

Pass 0 (run generation): repeatedly read \( B \) pages into memory, sort them, and write the sorted run to disk. This produces \( \lceil N/B \rceil \) runs, each of length \( B \) pages.

Pass 1 (merge): merge up to \( B-1 \) runs simultaneously using a priority queue (one frame per run, one output frame). A two-pass sort (one pass-0 plus one pass-1) handles inputs up to \( B \cdot (B-1) \approx B^2 \) pages.

In general, the number of merge passes needed is \( \lceil \log_{B-1}(\lceil N/B \rceil) \rceil \), and the total I/O cost is:

\[ 2N \times \left(1 + \left\lceil \log_{B-1}\left\lceil \frac{N}{B} \right\rceil \right\rceil\right) \]

(factor of 2 for each pass because every page is read and written once).

Example — external merge sort with \(B = 3\) buffer pages and \(N = 10\) data pages.

Pass 0 (run generation): with 3 buffer pages, we load 3 pages at a time, sort them in memory, and write out a sorted run. We perform \(\lceil 10/3 \rceil = 4\) iterations, producing runs of sizes 3, 3, 3, 1 pages (the last run has only 1 page remaining). Total I/O for Pass 0: read 10 pages + write 10 pages = 20 I/Os.

Pass 1 (merge): with \(B = 3\) buffers we can merge \(B - 1 = 2\) runs at a time (one input buffer per run, one output buffer). We have 4 runs, so Pass 1 requires \(\lceil 4/2 \rceil = 2\) merge groups:

  • Merge run 1 (3 pages) and run 2 (3 pages) → sorted output of 6 pages.
  • Merge run 3 (3 pages) and run 4 (1 page) → sorted output of 4 pages.

Total I/O for Pass 1: read 10 pages + write 10 pages = 20 I/Os.

Pass 2 (final merge): now we have 2 runs (6 pages and 4 pages). Merge them with 2 input buffers and 1 output buffer — exactly fits in \(B = 3\). Total I/O for Pass 2: read 10 pages + write 10 pages = 20 I/Os.

Total I/O: \(3 \times 20 = 60\) I/Os for \(N = 10\) pages. Using the general formula: the number of merge passes is \(\lceil \log_{B-1} \lceil N/B \rceil \rceil = \lceil \log_2 4 \rceil = 2\), and total cost is \(2N \times (1 + 2) = 2 \times 10 \times 3 = 60\). ✓

Scaling up: with \(B = 1001\) buffer pages (a realistic buffer pool size), Pass 0 produces \(\lceil N/1001 \rceil\) runs. If the input is 1 million pages (\(N = 10^6\)), Pass 0 produces about 1000 runs of 1001 pages each. Pass 1 can merge all 1000 runs simultaneously in a single pass (1000 input buffers + 1 output buffer = 1001 buffers). So the entire sort requires only 2 passes — Pass 0 plus one merge pass — at a total cost of \(2 \times 2 \times 10^6 = 4\) million I/Os. This is why even terabyte-scale data can be sorted in two or three passes with a sufficiently large buffer pool.

Replacement Sort

Instead of sorting each \( B \)-page chunk in isolation, replacement sort (also known as replacement selection) maintains a priority queue over \( B \) pages' worth of records. Output the record with the minimum key; when a new record is read, if its key is \( \geq \) the last key output, add it to the current run; otherwise hold it for the next run. This produces runs of expected length \( 2B \) pages (for random input), halving the number of initial runs and reducing merge passes.

The factor of 2 for random input is a classic result from the theory of sorting: with a priority queue of \( B \) records fed from a random stream, the expected run length is \( 2B \) because at any point in time, roughly half the records in the heap are eligible for the current run (their key exceeds the last output) and half are deferred to the next run. For real-world data that is partially sorted (e.g., time-series data with nearly-increasing timestamps), runs can be far longer — sometimes covering the entire input if the data arrives in sorted order. This makes replacement sort particularly effective for sorting nearly-sorted data, where it degenerates to a single pass with no merge needed.
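The run-generation loop can be sketched with Python's heapq. This is an illustrative in-memory model, assuming a capacity counted in records rather than pages; heap entries are (run number, key) pairs so deferred records sort after the current run:

```python
import heapq

def replacement_selection(records, capacity):
    """Generate sorted runs via replacement selection."""
    it = iter(records)
    heap = []                                   # entries: (run_id, key)
    for _ in range(capacity):
        try:
            heap.append((0, next(it)))
        except StopIteration:
            break
    heapq.heapify(heap)
    runs, current, run_id = [], [], 0
    while heap:
        rid, key = heapq.heappop(heap)
        if rid != run_id:                       # current run exhausted
            runs.append(current)
            current, run_id = [], rid
        current.append(key)
        try:
            nxt = next(it)
        except StopIteration:
            continue
        # A new key >= the last output can still join the current run;
        # otherwise it is deferred to the next run.
        heapq.heappush(heap, (run_id if nxt >= key else run_id + 1, nxt))
    if current:
        runs.append(current)
    return runs
```

On already-sorted input the function produces a single run, matching the "degenerates to a single pass" observation above.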

Sorting vs. hashing for aggregation: when to prefer each. Both sort-based and hash-based aggregation cost roughly \(3N\) I/Os for a table of \(N\) pages, but they have different strengths and failure modes.

Sort-based aggregation is predictable: the worst-case cost is bounded by the sort cost (\(2N(1 + \lceil \log_{B-1} \lceil N/B \rceil \rceil)\) I/Os), which is always well-defined. If the sort produces output that is useful for subsequent operators (e.g., the GROUP BY attribute is also the ORDER BY attribute), the sort cost is amortised across both. Sort-based aggregation also handles arbitrarily large groups — a group with 10 million members is simply a long run in the sorted stream.

Hash-based aggregation stores one hash-table entry per group (e.g., a running sum), not one per input tuple, so it is very compact and can often complete in a single pass of \(N\) I/Os when the partial aggregates for all distinct keys fit in memory. The failure mode is when there are so many distinct keys that a hash partition no longer fits in memory — recursive partitioning adds extra I/O passes, potentially exceeding the sort-based cost. Most modern optimizers choose between the two based on estimated group cardinality and available memory.

Aggregation and Duplicate Elimination

Sort-based aggregation: sort on the GROUP BY attributes; then scan once, maintaining running aggregates and emitting a result whenever the group key changes. Cost dominated by the sort.

Hash-based aggregation: partition on the GROUP BY attributes using \( h_1 \) (same as hash join build phase). For each partition, build an in-memory hash table and aggregate. Requires that each partition fits in memory. Cost: \( 3N \) I/Os (similar to hash join).

DISTINCT is duplicate elimination — identical to aggregation with no aggregate function. The same sort-based or hash-based approaches apply.

Hash join cost vs. sort-merge join cost. Both hash join and sort-merge join achieve a cost of \(3(P_R + P_S)\) I/Os under ideal conditions, but "ideal" means different things for each.

For hash join, the requirement is that each partition of the smaller table \(R\) fits in the available buffer pages after hashing — roughly \(P_R / (B-1) \leq B - 2\), i.e., \(P_R \lesssim B^2\). When this holds, the cost is exactly \(3(P_R + P_S)\): \(2(P_R + P_S)\) for the partition pass (every page is read and written) plus \((P_R + P_S)\) for the probe pass (each partition pair is read back once and processed). If a partition overflows (the table is too large relative to memory), a recursive partitioning step is needed, adding another \(2(P_R + P_S)\) per level.

For sort-merge join, the requirement is that the sort of each input can be completed in two passes (i.e., \(P_R, P_S \leq B^2\)). When both inputs happen to arrive already sorted on the join key (e.g., from a prior sort or a clustered index scan), the merge step alone costs \((P_R + P_S)\) — considerably cheaper.

Practical guidance: prefer hash join for large equality joins on unsorted inputs; prefer sort-merge join when (a) the output must be sorted on the join key (saving a subsequent sort), (b) the join condition is a range predicate (\(R.a \leq S.b\)) rather than equality (hash join cannot handle range joins), or (c) one or both inputs are already sorted. Both algorithms degrade gracefully under memory pressure, but sort-merge degrades more predictably (extra merge passes), while hash join can degrade catastrophically if partitions are severely skewed.
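The partition-then-probe structure of hash join can be sketched in Python; hash() stands in for the partitioning hash function, and in-memory lists stand in for spilled partition files:

```python
def grace_hash_join(r, s, r_key, s_key, num_partitions=4):
    """Grace hash join sketch: partition both inputs on the join key,
    then build/probe each partition pair in memory."""
    def partition(rows, key):
        parts = [[] for _ in range(num_partitions)]
        for row in rows:
            parts[hash(key(row)) % num_partitions].append(row)
        return parts

    r_parts, s_parts = partition(r, r_key), partition(s, s_key)
    out = []
    for rp, sp in zip(r_parts, s_parts):
        table = {}
        for row in rp:                          # build on the R partition
            table.setdefault(r_key(row), []).append(row)
        for row in sp:                          # probe with the S partition
            for match in table.get(s_key(row), []):
                out.append((match, row))
    return out
```

Matching tuples always land in the same partition index on both sides, which is why only corresponding partition pairs need to be joined.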

Set Operations

Union: sort both inputs, merge (output all tuples, skip duplicates). Or hash-partition both, then process bucket pairs.

Intersection and Difference: sort-merge or hash-based, same cost structure as sort-merge join with an equality predicate. \( \text{Cost} \approx 3(P_R + P_S) \).

Outer Joins via Sort-Merge

Left outer join: perform sort-merge join; for each outer tuple with no match, emit it padded with NULLs. Full outer join: both unmatched sides get NULL-padded output. Sort-merge naturally handles this since unmatched tuples are detected during the merge scan.

Hash join can also handle outer joins with a minor modification: during the probe phase, track which build-side tuples were matched. After the probe is complete, any unmatched build-side tuples (for right outer joins) are emitted with NULL-padded probe-side attributes. This requires one additional bit per build-side tuple in the hash table, adding negligible memory overhead. Left outer hash join does not need this tracking — any probe-side tuple that finds no match in the hash table is immediately emitted with NULLs, during the probe pass itself.
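The matched-bit bookkeeping just described can be sketched as follows, assuming the build side is the right input (so unmatched build tuples supply the right-outer rows):

```python
def hash_right_outer_join(probe, build, probe_key, build_key):
    """Right outer hash join sketch: track which build tuples matched;
    emit unmatched ones NULL-padded after the probe completes."""
    table = {}
    for row in build:
        table.setdefault(build_key(row), []).append([row, False])
    out = []
    for row in probe:
        for entry in table.get(probe_key(row), []):
            out.append((row, entry[0]))
            entry[1] = True                     # one matched bit per build tuple
    for entries in table.values():
        for row, matched in entries:
            if not matched:
                out.append((None, row))         # NULL-padded probe side
    return out
```

A left outer hash join needs no such tracking: a probe tuple that misses the hash table is emitted with NULLs immediately, as noted above.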


Chapter 6: Query Optimization

Query optimization is the process of choosing, from the large space of equivalent execution plans, the one with lowest estimated cost. It is one of the most complex components of a DBMS and the primary reason why declarative SQL outperforms hand-written procedural code in practice.

Relational Algebra Equivalences

The optimizer rewrites the algebraic tree using equivalences before enumerating plans:

  • Cascade of selections: \( \sigma_{\theta_1 \land \theta_2}(R) \equiv \sigma_{\theta_1}(\sigma_{\theta_2}(R)) \)
  • Commutativity of join: \( R \bowtie S \equiv S \bowtie R \)
  • Associativity of join: \( (R \bowtie S) \bowtie T \equiv R \bowtie (S \bowtie T) \)
  • Pushing selections down: apply \( \sigma \) as early as possible to reduce intermediate result sizes
  • Pushing projections down: project away unneeded attributes early to reduce tuple width

Join commutativity and associativity mean that for \( n \) relations there are \( \frac{(2(n-1))!}{(n-1)!} \) distinct join orderings — for \( n = 10 \) this exceeds 17 billion. Enumeration must be pruned.

The System R (Selinger) Optimizer

The System R optimizer (Selinger et al., 1979) uses dynamic programming over left-deep join trees (trees where every right child is a base relation, never an intermediate result). Left-deep trees enable pipelining: the inner relation is always a base-table scan or index scan, not a materialized intermediate.

DP subproblem: for each subset \( S \) of relations, find the cheapest plan for joining them. Build up from single relations to all \( n \) relations. The key insight: the best plan for \( S \) depends only on cost and interesting orders — sort orders on \( S \)’s output that might benefit a parent operator (e.g., for ORDER BY or a downstream merge join). Plans producing the same result with the same interesting order but higher cost are pruned.

Example — System R dynamic programming for a 3-table join \(R \bowtie S \bowtie T\).

Suppose the catalog reports \(|R| = 1000\) tuples, \(|S| = 5000\) tuples, \(|T| = 2000\) tuples; join attributes \(A\) (between \(R\) and \(S\)) and \(B\) (between \(S\) and \(T\)) each have 500 distinct values in both relations.

Step 1 — single-table plans. For each relation, the cheapest access path is a full scan (assuming no relevant index). Record the scan cost in I/Os and the output cardinality.

Step 2 — two-table join plans. Enumerate all pairs and estimate sizes using the formula \(|R \bowtie_A S| \approx |R| \times |S| / \max(V(R,A), V(S,A))\):

\[ |R \bowtie S| \approx \frac{1000 \times 5000}{500} = 10{,}000 \]

\[ |S \bowtie T| \approx \frac{5000 \times 2000}{500} = 20{,}000 \]

\[ |R \bowtie T| = |R| \times |T| = 2{,}000{,}000 \quad \text{(no shared attribute — Cartesian product)} \]

The planner records the cheapest algorithm for each pair (hash join at \(3(P_R + P_S)\) I/Os, or block NLJ if one side is small).

Step 3 — three-table join plans. Using memoized 2-table costs, compute all 3-table left-deep orderings:

  • \((R \bowtie S) \bowtie T\): intermediate result has 10,000 tuples. Join with \(T\): \(10{,}000 \times 2{,}000 / 500 = 40{,}000\) tuples output.
  • \((S \bowtie T) \bowtie R\): intermediate has 20,000 tuples. Join with \(R\): \(20{,}000 \times 1{,}000 / 500 = 40{,}000\) tuples output.
  • \((R \bowtie T) \bowtie S\): intermediate has 2,000,000 tuples (Cartesian product). Join with \(S\): enormous. Pruned by DP — its cost dominates all alternatives.

The optimizer selects the plan with the lowest total I/O cost. Here, \((R \bowtie S) \bowtie T\) and \((S \bowtie T) \bowtie R\) produce the same output size; the tiebreaker is the total I/O cost of the two join steps, which depends on the number of pages of each intermediate result.

The critical insight from this example: the plan \((R \bowtie T) \bowtie S\) that produces the Cartesian product intermediate is pruned at Step 2 because its cost in the subproblem \(\{R, T\}\) already dominates both \(\{R, S\}\) and \(\{S, T\}\). Dynamic programming ensures such plans are never extended, achieving the optimal \(O(2^n)\) complexity rather than \(O(n!)\).
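The three-table walk-through can be reproduced with a compact DP sketch. The cost model here is deliberately simplified (a plan's cost is the sum of its estimated join output sizes, standing in for I/O cost), and the function name and edges encoding are ours:

```python
from itertools import combinations

def selinger_dp(card, edges):
    """Left-deep Selinger-style DP. card: {table: tuple count};
    edges: {frozenset({t1, t2}): V, distinct values on the shared join key}."""
    rels = list(card)
    # best[subset] = (estimated output size, total cost, join order)
    best = {frozenset([r]): (card[r], 0, (r,)) for r in rels}
    for k in range(2, len(rels) + 1):
        for combo in combinations(rels, k):
            s = frozenset(combo)
            for r in combo:                     # left-deep: base relation r joins last
                rest = s - {r}
                size, cost, order = best[rest]
                # selectivity 1/V if r shares a join key with the subtree,
                # else a Cartesian product
                vs = [edges[frozenset({r, t})] for t in rest
                      if frozenset({r, t}) in edges]
                new_size = size * card[r] / max(vs) if vs else size * card[r]
                new_cost = cost + new_size
                if s not in best or new_cost < best[s][1]:
                    best[s] = (new_size, new_cost, order + (r,))
    return best[frozenset(rels)]
```

With the catalog numbers from the example, the DP picks the order ending in \(T\) — i.e., \((R \bowtie S) \bowtie T\), whose 10,000-tuple intermediate beats the 20,000-tuple intermediate of \((S \bowtie T) \bowtie R\) under this cost model, while the Cartesian-product plan is never extended.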

The exponential plan space and how optimizers tame it. For \(n\) tables, the number of distinct join orderings is \(n!\). Even for \(n = 10\), \(10! = 3{,}628{,}800\) orderings. System R's key restriction to left-deep trees reduces this to \((2^n - 1)\) DP subproblems: for \(n = 10\), that is \(2^{10} - 1 = 1023\) — a reduction of more than three orders of magnitude. Left-deep trees also have the practical advantage that the inner side of every join is a base-table scan, enabling full pipelining with no intermediate materialization.

However, for \(n > 12\), even \(2^{12} = 4096\) subproblems can be expensive when each subproblem involves multiple candidate physical plans. PostgreSQL therefore switches to a genetic algorithm (GEQO) for queries joining 12 or more relations by default (configurable via geqo_threshold). GEQO encodes join orders as “chromosomes”, applies crossover and mutation operators, and evaluates the fitness (estimated cost) of each candidate plan — finding a near-optimal plan after a bounded number of evaluations (proportional to population size times generations) rather than exhaustive enumeration. The resulting plan is not guaranteed optimal but is usually within a small constant factor of the DP optimum.

Cost Estimation

Cost estimation requires predicting:

  1. Cardinality of each intermediate result
  2. Selectivity of each predicate

For a predicate \( A = v \), selectivity is estimated as \( 1/V(R, A) \) where \( V(R, A) \) is the number of distinct values of \( A \) in \( R \) (stored in the catalog). For a range predicate \( A \leq v \), selectivity is \( (v - \min_A)/(\max_A - \min_A) \).

Histograms (equi-width or equi-depth) store the value distribution of an attribute and yield more accurate estimates than assuming uniform distribution. PostgreSQL stores histograms in pg_statistic and updates them via ANALYZE.

A fundamental limitation is the independence assumption: the selectivity of a conjunction \( \theta_1 \land \theta_2 \) is estimated as \( \text{sel}(\theta_1) \times \text{sel}(\theta_2) \). When attributes are correlated (e.g., city and zip code), this dramatically underestimates the true selectivity.
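The estimation rules, with illustrative numbers (500 distinct cities and 40,000 distinct zip codes are assumptions) showing how the independence assumption underestimates correlated predicates:

```python
def eq_selectivity(v_distinct):
    """sel(A = v) ≈ 1 / V(R, A)."""
    return 1.0 / v_distinct

def range_selectivity(v, lo, hi):
    """sel(A <= v) ≈ (v - min) / (max - min), uniform-distribution assumption."""
    return (v - lo) / (hi - lo)

def conj_selectivity(*sels):
    """Independence assumption: sel(θ1 ∧ θ2) = sel(θ1) × sel(θ2)."""
    p = 1.0
    for s in sels:
        p *= s
    return p

# If zip code determines city, the true selectivity of
# (city = 'Boston' AND zip = '02115') is just sel(zip);
# independence predicts a value 500x smaller.
est = conj_selectivity(eq_selectivity(500), eq_selectivity(40000))
true = eq_selectivity(40000)
```

A 500x cardinality underestimate at a leaf can cascade into a badly wrong join order higher in the plan.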

Modern systems address correlated attributes via multi-dimensional histograms or machine learning-based cardinality estimators. PostgreSQL 10 introduced extended statistics (CREATE STATISTICS), which allows the DBA to declare that two or more columns are correlated; PostgreSQL then gathers multivariate statistics — functional dependencies, n-distinct counts, and (since PostgreSQL 12) multivariate most-common-value lists — for conjunctive selectivity estimation. Even with these improvements, cardinality estimation remains the hardest unsolved problem in query optimization: the study by Leis et al. (“How Good Are Query Optimizers, Really?”, VLDB 2015) showed that cardinality estimates for multi-join queries can be off by a factor of \(10^3\) or more, leading to suboptimal join orders.

Access Path Selection

For each base relation, the optimizer chooses between:

  • Full heap scan: reads all \( N \) pages sequentially; cost \( = N \).
  • Clustered index scan: if a predicate matches the leading index column; cost \( \approx \text{sel} \times N \) pages.
  • Unclustered index scan: cost can be up to \( \text{sel} \times |R| \) random I/Os; beneficial only for very selective predicates.
  • Index-only scan: if all referenced attributes are in the index; no heap I/O needed.

The access path decision interacts subtly with multi-column predicates. If a table has a B+-tree index on \((A, B)\), the index is useful for predicates on \(A\) alone, or on \((A, B)\) together, but not for predicates on \(B\) alone — because B is not the leading key. PostgreSQL’s planner handles this with a bitmap index scan: it evaluates multiple indexes on different attributes, constructs a bitmap of matching page IDs from each, ANDs (or ORs) the bitmaps, and then fetches only the pages identified by the result. This allows a conjunction of two selective predicates on different indexed attributes to be answered without a full scan, even though neither index alone fully evaluates the conjunction.

Example — bitmap index scan in PostgreSQL. Table Orders with indexes on customer_id and status. Query: SELECT * FROM Orders WHERE customer_id = 42 AND status = 'pending'.

Suppose the predicate customer_id = 42 matches 200 pages (out of 10,000 total) and status = 'pending' matches 500 pages. The planner runs:

  1. Scan the customer_id index, build a bitmap of matching page IDs: 200 bits set.
  2. Scan the status index, build another bitmap: 500 bits set.
  3. AND the two bitmaps: pages that satisfy both predicates — perhaps 10 pages.
  4. Fetch only those 10 pages from the heap (sorted by page ID to convert random I/Os to near-sequential I/Os).

Total cost: two index scans (cheap, log-height traversals) plus 10 heap page fetches. Without the bitmap scan, the planner would either use one index (and recheck the other predicate in memory) or do a full table scan. The bitmap approach exploits both indexes simultaneously.
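The bitmap mechanics can be sketched with Python integers as bitmaps (one bit per heap page); the page IDs below are illustrative:

```python
def make_bitmap(page_ids):
    """Build a bitmap (one bit per heap page) from an index scan's matches."""
    bm = 0
    for p in page_ids:
        bm |= 1 << p
    return bm

def bitmap_pages(bm):
    """Decode a bitmap back into an ascending list of page IDs."""
    pages, p = [], 0
    while bm:
        if bm & 1:
            pages.append(p)
        bm >>= 1
        p += 1
    return pages

# AND the two per-index bitmaps; only pages satisfying both predicates
# survive, and they come out in ascending page order (near-sequential I/O).
hits = bitmap_pages(make_bitmap([1, 5, 7, 9]) & make_bitmap([2, 5, 9, 11]))
```

The decode step naturally yields pages in ascending order, which is exactly the sorted-fetch property step 4 above relies on.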

Plan Enumeration Frameworks

The Cascades/Volcano framework (used in SQL Server, DB2) uses a memo structure and top-down search with memoization. Transformation rules rewrite logical operators; implementation rules map logical to physical operators. Rules fire lazily and prune via cost bounds.

PostgreSQL uses the Selinger-style bottom-up DP for queries joining fewer than 12 relations, then switches to a genetic algorithm (GEQO) for larger joins, because the search space is too large for exhaustive DP.

Query Reformulation

View expansion: inline the view definition into the query before optimization.

Subquery flattening: convert correlated subqueries (EXISTS, IN) to semi-joins or joins. For example:

  SELECT * FROM R WHERE R.a IN (SELECT S.a FROM S)

becomes a semi-join \( R \ltimes_{R.a = S.a} S \), which can be executed as a hash join that emits at most one copy of each \( R \)-tuple.

Magic sets: for recursive queries (e.g., transitive closure), magic-sets rewriting pushes bindings from the query into the recursive computation to avoid computing the full closure.


Chapter 7: Transactions and Concurrency Control

Concurrency is essential for throughput, but concurrent execution of transactions can corrupt data. The theory of concurrency control answers the question: which interleaving of operations is safe?

ACID Properties

Atomicity: a transaction either commits fully or is rolled back entirely — no partial effects are visible.
Consistency: a transaction takes the database from one consistent state to another; integrity constraints are preserved.
Isolation: the effect of concurrent transactions is as if they executed serially.
Durability: once a transaction commits, its effects survive any subsequent failure.

The four ACID properties are enforced by different DBMS subsystems. Atomicity is enforced by the recovery manager via undo logging: if a transaction aborts (voluntarily or due to deadlock detection), the recovery manager rolls back all its writes using the before images in the log. Consistency is enforced partly by the DBMS (integrity constraints, foreign key checks at commit time) and partly by the application (business logic invariants). Isolation is enforced by the concurrency control manager (2PL, MVCC, or OCC). Durability is enforced by the recovery manager via redo logging: after a crash, committed writes are replayed from the log.

BASE and eventual consistency as alternatives to ACID. Distributed NoSQL systems (Cassandra, DynamoDB, CouchDB) often relax ACID in favor of higher availability and partition tolerance. The alternative is described by the BASE acronym (Basically Available, Soft state, Eventually consistent), coined by Eric Brewer in contrast to ACID.

Basically available: the system always responds to requests, even if some nodes are unreachable — it may return stale or partial data rather than an error. Soft state: the system’s state may change over time even without new input, as eventual consistency propagates updates. Eventually consistent: given enough time without new updates, all replicas will converge to the same value.

Cassandra achieves eventual consistency through anti-entropy: background processes on each node compare their data with replicas using Merkle tree digests, and any discrepancies are repaired by copying the newer version. Write conflicts are resolved by last-write-wins (using a timestamp embedded in each write) — a simple rule that is incorrect when clocks are skewed, but acceptable for many applications where the window of conflict is small.

The choice between ACID and BASE is driven by the application’s consistency requirements. Financial transactions (bank transfers, double-entry bookkeeping) require ACID — any inconsistency is unacceptable and immediately visible to users. Social media “like” counts can tolerate eventual consistency — a count that is off by 1 for a few seconds is invisible to users and inconsequential. The art is correctly identifying which parts of an application require strong consistency and which can safely relax it.

Serializability

A schedule is an interleaving of operations from multiple transactions. A schedule is serial if no two transactions interleave. A schedule is conflict-serializable if it can be transformed into a serial schedule by swapping non-conflicting operations. Two operations conflict if they access the same data item and at least one is a write.

The conflict graph (precedence graph) has a node per transaction and an edge \( T_i \to T_j \) if an operation of \( T_i \) conflicts with a later operation of \( T_j \). A schedule is conflict-serializable if and only if its conflict graph is acyclic.
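The precedence-graph test can be implemented directly. A schedule is given as (transaction, operation, item) triples; the sketch builds the conflict edges and runs a DFS cycle check:

```python
def conflict_serializable(schedule):
    """schedule: list of (txn, op, item) with op in {'R', 'W'}.
    True iff the precedence graph is acyclic."""
    edges = set()
    for i, (ti, oi, xi) in enumerate(schedule):
        for tj, oj, xj in schedule[i + 1:]:
            # conflict: different txns, same item, at least one write
            if ti != tj and xi == xj and 'W' in (oi, oj):
                edges.add((ti, tj))
    graph = {}
    for a, b in edges:
        graph.setdefault(a, []).append(b)
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {}
    def has_cycle(n):
        color[n] = GRAY
        for m in graph.get(n, []):
            c = color.get(m, WHITE)
            if c == GRAY or (c == WHITE and has_cycle(m)):
                return True
        color[n] = BLACK
        return False
    nodes = {t for t, _, _ in schedule}
    return not any(color.get(n, WHITE) == WHITE and has_cycle(n) for n in nodes)
```

A serial-equivalent interleaving passes; a schedule with conflicting operations in both directions between two transactions fails.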

Isolation Anomalies and Levels

Isolation Level   | Dirty Read | Non-Repeatable Read | Phantom Read
READ UNCOMMITTED  | Possible   | Possible            | Possible
READ COMMITTED    | Prevented  | Possible            | Possible
REPEATABLE READ   | Prevented  | Prevented           | Possible
SERIALIZABLE      | Prevented  | Prevented           | Prevented

A dirty read occurs when a transaction reads a value written by an uncommitted transaction. A non-repeatable read occurs when a transaction re-reads a value and finds it changed (by a committed transaction). A phantom read occurs when a transaction re-executes a range query and finds new tuples inserted by another committed transaction.

Example — isolation anomalies in action.

Dirty read (prevented at READ COMMITTED and above):

Time | T1                                  | T2
t1   | WRITE X = 100                       |
t2   |                                     | READ X → sees 100
t3   | ABORT (X reverts to original value) |
t4   |                                     | T2 has used a value that never committed

T2 read a value that was subsequently undone. Any decision T2 made based on X = 100 is now based on phantom data.

Non-repeatable read (prevented at REPEATABLE READ and above):

Time | T1          | T2
t1   | READ X → 50 |
t2   |             | WRITE X = 75, COMMIT
t3   | READ X → 75 |

T1 reads X twice within the same transaction and gets different values. A bank transaction checking “is the balance sufficient for both steps?” can be tricked this way.

Phantom read (prevented only at SERIALIZABLE):

Time | T1                                                        | T2
t1   | SELECT * FROM Students WHERE gpa > 3.5 → {Alice, Bob}     |
t2   |                                                           | INSERT INTO Students VALUES ('Carol', 3.9); COMMIT
t3   | SELECT * FROM Students WHERE gpa > 3.5 → {Alice, Bob, Carol} |

T1’s two identical range queries return different result sets. Locking individual tuples does not prevent this because Carol did not exist when T1 first acquired its locks. Preventing phantoms requires either predicate locking (locking the entire range \(\text{gpa} > 3.5\)) or Serializable Snapshot Isolation (detecting anti-dependency cycles at commit time, as PostgreSQL does).

Two-Phase Locking (2PL)

2PL is the dominant protocol for conflict serializability. A transaction has two phases:

  1. Growing phase: may acquire locks, may not release any.
  2. Shrinking phase: may release locks, may not acquire any.

Theorem: any schedule generated by 2PL is conflict-serializable.

Lock modes and compatibility:

Requested \ Held | S            | X
S                | Compatible   | Incompatible
X                | Incompatible | Incompatible
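The compatibility matrix as a lookup table (a minimal sketch; real lock managers also handle intention modes and lock queues):

```python
# S/X compatibility: only shared-shared is compatible.
COMPAT = {("S", "S"): True, ("S", "X"): False,
          ("X", "S"): False, ("X", "X"): False}

def can_grant(requested, held_modes):
    """Grant a lock only if the requested mode is compatible with
    every mode currently held on the item by other transactions."""
    return all(COMPAT[(requested, h)] for h in held_modes)
```

A request against an unlocked item (empty held list) is always granted.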

Multi-granularity locking (MGL) allows locking at the level of database, table, page, or tuple. Before acquiring an S or X lock on a node, the transaction must hold an intention lock (IS or IX) on all ancestors. The SIX lock (S + intention exclusive) is useful for transactions that scan a table but update some tuples.

Strict 2PL releases all locks only at commit or abort. This prevents cascading aborts (where a transaction must be rolled back because it read a value written by an aborted transaction) and is the standard used in practice.

The distinction between plain 2PL and strict 2PL matters in failure scenarios. Under plain 2PL, T1 may release its X-lock on A immediately after finishing its writes, while still holding other locks. T2 can then read A (now with T1’s value) before T1 commits. If T1 subsequently aborts, T2 has read a value that never committed — a dirty read — and T2 must also be aborted to maintain consistency. This cascading abort can ripple through many transactions. Strict 2PL prevents this by ensuring no transaction reads another’s uncommitted writes: all locks are held until commit or abort, so the only values readable are those of committed transactions. The price is reduced concurrency — transactions block on locked data longer — but the operational simplicity of never experiencing cascading aborts makes strict 2PL universally preferred in production systems.

Deadlock

Deadlock occurs when two or more transactions each hold a lock the other needs. Detection: maintain a wait-for graph; a cycle indicates deadlock. The DBMS selects a victim (usually the youngest or cheapest to restart) and aborts it.

Prevention: assign timestamps at transaction start. Under Wait-Die, an older transaction waits; a younger one aborts (dies). Under Wound-Wait, an older transaction forces the younger to abort (wounds it); a younger transaction waits.
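The two prevention policies reduce to simple timestamp comparisons (the string return values are illustrative labels, not part of any real API):

```python
def wait_die(requester_ts, holder_ts):
    """Wait-Die: an older requester (smaller timestamp) waits;
    a younger requester aborts ('dies')."""
    return "wait" if requester_ts < holder_ts else "abort"

def wound_wait(requester_ts, holder_ts):
    """Wound-Wait: an older requester forces the holder to abort ('wounds' it);
    a younger requester waits."""
    return "abort holder" if requester_ts < holder_ts else "wait"
```

In both schemes the older transaction is never aborted, which guarantees every transaction eventually becomes the oldest and completes (no starvation).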

Example — 2PL deadlock detection via the wait-for graph.
Time | T1                               | T2
t1   | X-lock A — granted               |
t2   |                                  | X-lock B — granted
t3   | X-lock B — waiting (T2 holds B)  |
t4   |                                  | X-lock A — waiting (T1 holds A)

Neither transaction can proceed. The wait-for graph has an edge \(T1 \to T2\) (T1 waits for T2 to release B) and an edge \(T2 \to T1\) (T2 waits for T1 to release A). The graph contains the cycle \(T1 \to T2 \to T1\), which the DBMS’s deadlock detector discovers by running cycle detection (DFS or similar) on the wait-for graph — periodically in some systems, or, as in PostgreSQL, once a lock wait exceeds deadlock_timeout (1 s by default).

Upon detecting the cycle, the DBMS selects a victim to abort — typically the transaction that is cheapest to restart, e.g., the one holding fewer locks or the one that started most recently. Suppose T2 is chosen: T2 is rolled back, its lock on B is released, and T1 can now acquire B and proceed. T2 is restarted by the application.

Prevention vs. detection trade-off: deadlock detection adds periodic graph traversal overhead (\(O(n + e)\) per cycle) but allows transactions to proceed optimistically. Deadlock prevention via Wait-Die or Wound-Wait eliminates cycles entirely at the cost of aborting transactions that might never actually have deadlocked — potentially reducing throughput under high contention.

Multi-Version Concurrency Control (MVCC)

MVCC maintains multiple versions of each data item, tagged with transaction IDs. Readers access a consistent snapshot without acquiring read locks, so they never block writers and writers never block readers.

In PostgreSQL, each tuple has xmin (the ID of the transaction that created it) and xmax (the ID of the transaction that deleted or updated it, or 0 if current). A transaction with snapshot \( T \) sees a tuple if \( \text{xmin} \leq T \) and \( (\text{xmax} = 0 \) or \( \text{xmax} > T) \). Old versions accumulate; the VACUUM process reclaims space occupied by versions no longer visible to any active transaction.
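A direct transcription of the visibility rule just stated (simplified: real PostgreSQL visibility also consults transaction commit status and hint bits, omitted here):

```python
def visible(xmin, xmax, snapshot_txid):
    """A tuple version is visible to snapshot T if it was created at or
    before T and is either live (xmax = 0) or deleted only after T."""
    return xmin <= snapshot_txid and (xmax == 0 or xmax > snapshot_txid)
```

A version created after the snapshot, or deleted before it, is simply skipped during the scan — no read lock is ever taken.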

MVCC snapshot semantics and the "snapshot" isolation level. When a transaction begins at PostgreSQL's REPEATABLE READ or SERIALIZABLE level, it takes a snapshot of the set of currently committed transactions. This snapshot \(S\) defines exactly which tuple versions are visible to that transaction: version \(v\) of tuple \(t\) is visible if the transaction that created \(v\) (recorded in \(\text{xmin}\)) committed before snapshot \(S\) was taken, and the transaction that deleted \(v\) (recorded in \(\text{xmax}\)) either hasn't committed yet or committed after \(S\).

The key property is that the snapshot is fixed for the duration of the transaction. Even if another transaction commits and modifies tuples after our snapshot is taken, our transaction sees the pre-modification versions. This gives the illusion of running on a static database, eliminating non-repeatable reads at no locking cost.

However, MVCC alone does not prevent all anomalies. Write skew is a subtle problem: T1 reads value X and writes Y; T2 reads value Y and writes X; neither conflicts directly with the other in the locking sense, but together they violate a constraint (e.g., “at least one of X or Y must be positive”). PostgreSQL’s Serializable Snapshot Isolation (SSI), introduced in PostgreSQL 9.1, detects such anti-dependencies at commit time and aborts one of the conflicting transactions — achieving true serializability without sacrificing MVCC’s read performance.

Optimistic Concurrency Control (OCC)

OCC assumes conflicts are rare. Each transaction proceeds in three phases:

  1. Read phase: execute, keeping a read set and write set in private workspace.
  2. Validation phase: check that the read set has not been modified by a concurrent committed transaction (certification).
  3. Write phase: if validation succeeds, apply writes; otherwise restart.

OCC avoids locking overhead and is effective for read-heavy, low-contention workloads.
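The certification step reduces to a set computation; a minimal sketch of backward validation against transactions that committed during the validating transaction's read phase:

```python
def validate(read_set, concurrent_committed_write_sets):
    """Backward OCC validation: the committing transaction passes iff no
    concurrently committed transaction wrote an item this one read."""
    return all(not (read_set & ws) for ws in concurrent_committed_write_sets)
```

If validation fails, the transaction discards its private workspace and restarts — the "optimistic" bet is that this happens rarely.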


Chapter 8: Crash Recovery and ARIES

Crash recovery ensures that committed transactions survive failures and aborted transactions leave no trace — i.e., it enforces Atomicity and Durability. The ARIES algorithm (Mohan et al., 1992) is the most influential recovery algorithm and underlies recovery in IBM DB2, Microsoft SQL Server, and PostgreSQL.

Failure Model

  • Transaction abort: voluntary or due to deadlock; must undo writes.
  • System crash: power failure or OS crash; RAM is lost, disk is intact. Must redo committed writes and undo uncommitted writes.
  • Media failure: disk is destroyed; requires archived backup plus log replay.

Write-Ahead Logging (WAL)

The fundamental rule of crash recovery: before a dirty page is written to disk, the log record(s) describing that change must be written to disk first (the redo rule). Additionally: before a transaction commits, all its log records must be written to disk (the commit rule). This is write-ahead logging.

Each log record contains:

  • LSN (Log Sequence Number): a monotonically increasing identifier assigned to each record.
  • TransId: the transaction that made the change.
  • PageId: the affected page.
  • PrevLSN: the LSN of the previous log record of this transaction (forms a per-transaction chain).
  • Type: UPDATE, COMMIT, ABORT, CLR, BEGIN_CHECKPOINT, END_CHECKPOINT.
  • Before image and after image (for UPDATE records).

Each data page stores the pageLSN: the LSN of the most recent log record that modified this page. During redo, if a page’s pageLSN \( \geq \) the LSN of a log record, the update has already been applied and is skipped.

Example — WAL and ARIES recovery in action.

Scenario before crash:

LSN | Type   | TransId | PageId | Description
1   | UPDATE | T1      | P      | T1 writes page P (before: 10, after: 20)
2   | UPDATE | T2      | P      | T2 writes page P (before: 20, after: 30)
3   | COMMIT | T1      | —      | T1 commits

The system crashes after writing LSN 3 to the log (log is on disk) but before flushing page P to disk. After the crash, page P on disk still has its value from before any of these writes (value 10). The in-memory state (including the dirty buffer for P with value 30) is lost.

Recovery — Analysis phase: Starting from the last checkpoint, scan the log forward through LSN 1, 2, 3. Build the Active Transaction Table (ATT) and Dirty Page Table (DPT):

  • ATT: T2 is active (no COMMIT or ABORT record seen for T2).
  • DPT: page P has recLSN = 1 (first log record that dirtied P).
  • T1 is committed (LSN 3 is its COMMIT) — T1 is a winner.

Recovery — Redo phase: Redo start point = min(recLSN in DPT) = LSN 1. Replay every UPDATE from LSN 1 forward:

  • LSN 1: apply T1’s update to P (set P = 20). Set pageLSN of P to 1.
  • LSN 2: apply T2’s update to P (set P = 30). Set pageLSN of P to 2.
  • LSN 3 is a COMMIT record — no data to redo.

After redo, the database state exactly matches the pre-crash state: page P has value 30.

Recovery — Undo phase: T2 is in the ATT with status “in-progress” — it is a loser. Undo T2’s writes in reverse LSN order. T2’s last write was LSN 2 (set P = 30). Undo it: set P back to 20 (the before image). Write a Compensation Log Record (CLR) at LSN 4 documenting this undo action, with undoNextLSN = prevLSN of LSN 2 = null (T2 had no earlier log records). Write an ABORT record for T2 at LSN 5.

Final state: Page P = 20 (T1’s committed write is preserved; T2’s uncommitted write is undone). T1’s durability is satisfied; T2’s atomicity is satisfied.

Idempotency: if the system crashes again during redo (say, between applying LSN 1 and LSN 2), a second recovery run simply reads LSN 1 again, checks pageLSN of P (which equals 1 from the first redo attempt that was written back), skips LSN 1 (pageLSN \(\geq\) LSN, so already applied), and proceeds to apply LSN 2 — producing the same result as if the second crash had never occurred.
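The worked example above can be replayed as a minimal sketch in plain Python. The log records and the `redo`/`undo_losers` helpers are simplified illustrations, not a real implementation; the point is the pageLSN check that makes redo idempotent.

```python
# Minimal sketch of ARIES repeating-history redo and loser undo,
# replaying the worked example above (record layout is our own).

log = [
    {"lsn": 1, "type": "UPDATE", "tx": "T1", "page": "P", "before": 10, "after": 20},
    {"lsn": 2, "type": "UPDATE", "tx": "T2", "page": "P", "before": 20, "after": 30},
    {"lsn": 3, "type": "COMMIT", "tx": "T1"},
]

# On-disk state at crash: page P was never flushed, so it still holds 10.
pages = {"P": {"value": 10, "pageLSN": 0}}

def redo(log, pages):
    """Repeat history: apply every UPDATE unless pageLSN shows it is already applied."""
    for rec in log:
        if rec["type"] != "UPDATE":
            continue
        page = pages[rec["page"]]
        if page["pageLSN"] >= rec["lsn"]:   # already on the page: skip (idempotency)
            continue
        page["value"] = rec["after"]
        page["pageLSN"] = rec["lsn"]

def undo_losers(log, pages, losers):
    """Roll back loser transactions in reverse LSN order using before images."""
    for rec in reversed(log):
        if rec["type"] == "UPDATE" and rec["tx"] in losers:
            pages[rec["page"]]["value"] = rec["before"]
            # A real system would also write a CLR here, with undoNextLSN = prevLSN.

redo(log, pages)
assert pages["P"]["value"] == 30   # exact pre-crash state reconstructed
redo(log, pages)                   # a second redo run is a no-op (idempotent)
assert pages["P"]["value"] == 30
undo_losers(log, pages, losers={"T2"})
print(pages["P"]["value"])         # -> 20: T1's commit preserved, T2 undone
```

Running the sketch ends with P = 20, matching the final state described above.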

ARIES: Three Phases

ARIES recovery proceeds in three phases after a crash:

Analysis Phase

Starting from the most recent checkpoint record, scan the log forward to the end. Maintain:

  • Active Transaction Table (ATT): transactions that were in progress at the crash, with their last LSN and status.
  • Dirty Page Table (DPT): pages that were dirty at crash, with their recLSN (the LSN of the first log record that dirtied the page — the earliest point from which redo is needed).

The redo start point is \( \min(\text{recLSN}) \) over all entries in the DPT.

Redo Phase

Starting from the redo start point, replay every update log record encountered, whether the transaction committed or not. The principle is repeating history: reconstruct the exact pre-crash database state. For each UPDATE record with LSN \( l \):

  • If the page is not in the DPT, skip (it was flushed to disk before the crash and is up to date).
  • If the page is in the DPT with \( \text{recLSN} > l \), skip.
  • Otherwise, fetch the page. If \( \text{pageLSN} \geq l \), skip. Otherwise apply the update and set \( \text{pageLSN} = l \).

Undo Phase

All transactions in the ATT with status “in-progress” at crash end are losers and must be rolled back. Process their log records in reverse order of LSN (using the prevLSN chains). For each update to undo, write a Compensation Log Record (CLR) — a redo-only record that records the undo action and sets an undoNextLSN field pointing to the next record to undo for that transaction. CLRs are never undone themselves, making recovery idempotent: if the system crashes again during recovery, redo will replay the CLRs and undo will resume from the correct point.

Checkpointing

A fuzzy checkpoint allows normal processing to continue while checkpointing. The DBMS writes a BEGIN_CHECKPOINT record, then writes an END_CHECKPOINT record containing the current ATT and DPT. No pages need to be flushed. After the END_CHECKPOINT is stable on disk, the log can be truncated up to the minimum of \( \min(\text{recLSN in DPT}) \) and the first LSN of the oldest active transaction (whose records are still needed for undo). The analysis phase at recovery starts from the most recent BEGIN_CHECKPOINT.

The purpose of checkpointing is to bound the length of log replay on restart. Without checkpointing, recovery must replay the entire log from the beginning of time — potentially hours or days of log records. With periodic checkpointing (e.g., every 5 minutes), recovery at most replays 5 minutes of log. The trade-off is the I/O cost of writing the ATT and DPT to disk at checkpoint time, and the CPU overhead of tracking dirty pages and active transactions. In practice, most production systems checkpoint every 1–5 minutes, balancing recovery time against ongoing overhead.

Write-ahead logging and the force/steal policy. WAL interacts with the buffer pool through two independent policy choices:

Force vs. no-force: a “force” policy requires that all dirty pages modified by a committing transaction be written to disk at commit time. This guarantees durability without a redo log — after commit, all data is on disk. The cost is one random I/O per dirty page per commit, which is prohibitive for OLTP. The “no-force” policy (used by ARIES) allows committing without forcing pages — durability is instead provided by the redo log. Only the log record for COMMIT needs to be forced to disk (a sequential write), which is far cheaper.

Steal vs. no-steal: a “steal” policy allows the buffer pool to evict a dirty page modified by an uncommitted transaction when memory pressure demands it. This is necessary in practice — the buffer pool is finite and must be able to reclaim frames even from active transactions. But it means that a dirty page may reach disk before its transaction commits: if the transaction later aborts, the on-disk page has a wrong value. The undo log (before images) is what allows ARIES to fix this. A “no-steal” policy prevents this problem but requires pinning all pages modified by active transactions — untenable for long transactions that touch many pages.

The combination used by ARIES — no-force + steal — is the most permissive and the most efficient. It requires both redo and undo logging, which is exactly what the full WAL provides.

Recovery Correctness

ARIES guarantees correctness by the combination of two principles:

  1. Repeating history in the redo phase ensures that even uncommitted writes are re-applied, reconstructing the exact pre-crash state before beginning undo. This avoids the errors of “undo-only” and “redo-only” approaches.
  2. CLR idempotency ensures that if a crash occurs during recovery itself, a second recovery run produces the same result without re-undoing already-undone work.

The “repeating history” principle may seem counterintuitive: why redo the writes of uncommitted (loser) transactions before undoing them? The reason is that undo operations are not reversible in general — you cannot simply “skip” a loser transaction’s writes during redo without knowing the exact sequence of writes by other transactions that may have depended on them. By first replaying the entire log (including loser writes), ARIES reconstructs the exact database state that existed at the time of the crash. Only then does it undo the losers, which is now safe because the database is in a known, deterministic state.

Media failure recovery and the role of archive logging. System crash recovery (covered above) handles the case where RAM is lost but disk is intact. Media failure — disk corruption or disk loss — is a different beast: the data pages themselves are gone. Recovery from media failure requires:
  1. Restore from the most recent full backup (e.g., a weekly snapshot of all data pages).
  2. Replay the archived write-ahead log from the backup’s checkpoint LSN to the present.

This is called REDO-only recovery from backup, because there are no loser transactions to undo — the backup represents a consistent state (all transactions committed up to the backup’s checkpoint were fully materialised). Log archiving must therefore store the WAL indefinitely, or at least as far back as the oldest backup being retained.

PostgreSQL implements this as Point-in-Time Recovery (PITR): a base backup (taken with pg_basebackup) plus a continuous archive of WAL segments allows recovery to any point in time since the base backup. Setting restore_command and recovery_target_time in postgresql.conf enables DBAs to say “restore the database to its state at 14:32:00 yesterday”, which is invaluable for recovering from accidental bulk deletes or data corruption.


Chapter 9: Parallel and Distributed Databases

As data volumes exceed what a single machine can store or process, parallelism becomes essential. Parallel databases achieve high throughput by distributing work across multiple processors or nodes.

Parallel DBMS Architectures

| Architecture   | Description                        | Scalability              | Example             |
|----------------|------------------------------------|--------------------------|---------------------|
| Shared Memory  | All CPUs share RAM and disks       | Poor (\(\leq\) 32 cores) | Uniprocessor DBMS   |
| Shared Disk    | CPUs have private RAM, shared disk | Moderate                 | Oracle RAC          |
| Shared Nothing | Each node has private RAM and disk | Excellent                | Teradata, Greenplum |

Shared-nothing (MPP) scales best because there is no single bottleneck resource. All inter-node communication is via explicit message passing. The challenge is distributing data and workload evenly.

Shared-nothing bottlenecks: skew and stragglers. In a perfectly balanced shared-nothing system, adding \(k\) nodes gives exactly \(k\)× throughput. In practice, this ideal is rarely achieved because of three forms of imbalance.

Data skew: if the data is not uniformly distributed across nodes (e.g., on an 11-node cluster, one node holds 30% of the data and the other ten hold 7% each), queries that require all nodes to complete before returning an answer are limited by the slowest node (a barrier effect akin to Amdahl’s law). The slowest node processes 30% of the work while the others finish with 7% each, so the effective parallelism is \(1/0.30 \approx 3.3\)×, not 11×.
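The arithmetic behind this limit is easy to check directly. A small sketch, assuming a barrier-synchronized query over the skewed distribution just described (one node with 30% of the work, ten with 7% each):

```python
# Effective speedup of a barrier-synchronized parallel query under data skew:
# every node must finish before the answer returns, so elapsed time is set
# by the most-loaded node.

fractions = [0.30] + [0.07] * 10          # 11 nodes; fraction of total work each
assert abs(sum(fractions) - 1.0) < 1e-9   # the fractions cover all the work

serial_time = 1.0                          # normalize single-node time to 1
parallel_time = max(fractions) * serial_time   # slowest node dominates

speedup = serial_time / parallel_time
print(round(speedup, 2))                   # ~3.33, far below the ideal 11
```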

Query skew: some queries touch only a small fraction of the data (highly selective predicates). A query with selectivity 1% on a 100-node cluster may only need data from 1 node — it gains no parallelism benefit from the cluster. Conversely, a full-table scan benefits maximally from parallelism.

Stragglers: in cloud environments, nodes share physical hardware with other tenants. A single slow node due to I/O contention, garbage collection, or noisy neighbours can stall an entire parallel query. Speculative execution (launching a second copy of a straggling task on a different node, using whichever finishes first) is the standard mitigation — implemented in both Hadoop and Spark. The cost is additional cluster resource usage; the benefit is dramatically reduced tail latency (P99 query times).

Data Partitioning

Data is partitioned across nodes so that each node holds a fraction:

  • Round-robin: distribute tuples cyclically across nodes. Perfectly balanced, but no partition pruning — all nodes must participate in every query.
  • Hash partitioning: assign tuple to node \( h(\text{key}) \bmod N \). Enables partition pruning for equality predicates; all tuples with the same key land on the same node (needed for parallel hash join without repartitioning).
  • Range partitioning: assign ranges of key values to nodes. Enables range predicate pruning and range scans on one or a few nodes. Vulnerable to data skew if key distribution is non-uniform.
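As a quick illustration of the hash scheme, here is a sketch in Python. The helper `hash_node` is our own (md5 stands in for whatever partitioning hash a real system uses); the key property shown is that all tuples with the same key land on the same node.

```python
import hashlib

N = 4  # number of nodes

def hash_node(key, n=N):
    """Assign a tuple to node h(key) mod n, using a stable hash of the key."""
    digest = hashlib.md5(str(key).encode()).digest()
    return int.from_bytes(digest[:8], "big") % n

tuples = [("alice", 10), ("bob", 20), ("alice", 30), ("carol", 40)]
partitions = {i: [] for i in range(N)}
for t in tuples:
    partitions[hash_node(t[0])].append(t)

# Equality predicate on the key prunes to exactly one partition, and tuples
# sharing a join key are co-located (no repartitioning needed for hash join).
target = hash_node("alice")
assert all(hash_node(k) == target for k, _ in tuples if k == "alice")
print({i: len(p) for i, p in partitions.items()})
```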


Parallel Query Execution

A parallel query plan introduces exchange operators between tree nodes:

  • Gather: collect tuples from multiple workers into one stream.
  • Repartition: redistribute tuples across nodes/threads using hash or range partitioning on a specified attribute (needed to co-locate matching tuples for a parallel hash join).
  • Broadcast: send all tuples from one partition to all other partitions (for small dimension tables in a star join).

Intra-query parallelism runs a single query faster by splitting work among multiple workers. A parallel hash join partitions both inputs across \( k \) workers; each worker performs a local hash join on its shard. Speedup is ideally \( k \); in practice, skew and synchronization overhead reduce it.

MapReduce

MapReduce (Hadoop) is a batch-processing paradigm for large-scale data:

  1. Map: each input record emits zero or more (key, value) pairs.
  2. Shuffle: the runtime groups all emitted pairs by key, distributing them to reducers.
  3. Reduce: for each key, process the list of values and emit aggregate results.

MapReduce provides automatic fault tolerance (restart failed tasks) and is easy to program. However, it lacks a query optimizer, requires materializing intermediate results between stages, and does not support fine-grained concurrency control. Modern systems (Spark, Flink) replace HDFS materialization with in-memory pipelining.
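The three steps can be sketched in-process as a toy word count (a real MapReduce runtime distributes the map tasks and the shuffle across machines; the function names here are ours):

```python
from collections import defaultdict

def map_fn(record):
    """Map: each input line emits (word, 1) pairs."""
    for word in record.split():
        yield (word, 1)

def reduce_fn(key, values):
    """Reduce: aggregate the list of values for one key."""
    return (key, sum(values))

def mapreduce(records):
    # Shuffle: group all emitted pairs by key (the runtime's job in Hadoop).
    groups = defaultdict(list)
    for record in records:
        for k, v in map_fn(record):
            groups[k].append(v)
    return dict(reduce_fn(k, vs) for k, vs in groups.items())

result = mapreduce(["the quick fox", "the lazy dog", "the fox"])
print(result)   # {'the': 3, 'quick': 1, 'fox': 2, 'lazy': 1, 'dog': 1}
```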

Distributed Commit: 2PC

When a transaction spans multiple nodes, all must agree to commit or abort. Two-Phase Commit (2PC):

  1. The coordinator sends PREPARE to all participants.
  2. Each participant writes a prepare log record, acquires all needed locks, and votes YES or NO.
  3. If all votes are YES, the coordinator writes a COMMIT log record and sends COMMIT to all. If any vote is NO, the coordinator sends ABORT.
  4. Participants apply the decision and release locks.

The weakness of 2PC is blocking: if the coordinator crashes after sending PREPARE but before sending the decision, participants are stuck holding locks until the coordinator recovers. Three-Phase Commit (3PC) adds a pre-commit phase to eliminate this blocking in the absence of network partitions, but is rarely used in practice due to added latency.
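The coordinator's decision logic can be sketched without any networking; votes are supplied directly, and in a real system each step below is a message plus a forced log write on the node concerned:

```python
def two_phase_commit(votes, log):
    """Coordinator side of 2PC. `votes` maps participant -> 'YES' or 'NO'."""
    # Phase 1 has happened: PREPARE was sent, each participant forced a
    # prepare record, acquired its locks, and voted.
    if all(v == "YES" for v in votes.values()):
        log.append("COMMIT")     # coordinator forces its commit record first
        decision = "COMMIT"
    else:
        log.append("ABORT")
        decision = "ABORT"
    # Phase 2: the decision goes to every participant, which applies it
    # and releases its locks.
    return {p: decision for p in votes}

log = []
print(two_phase_commit({"node1": "YES", "node2": "YES"}, log))  # all COMMIT
print(two_phase_commit({"node1": "YES", "node2": "NO"}, log))   # all ABORT
```

The blocking weakness lives in the gap between the two phases: a participant that has voted YES cannot decide on its own and must hold its locks until it hears the outcome.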

2PC and the CAP theorem. In CAP terms, 2PC favours consistency over availability: all nodes agree on the outcome, but during a coordinator failure the system blocks, refusing to make progress rather than risk an inconsistent outcome. In a network partition where participants cannot reach the coordinator, they must wait indefinitely, holding locks and resources.

Modern distributed databases address this with two different approaches. Paxos-based commit (used in Google Spanner, CockroachDB) replaces the single coordinator with a Paxos group: the commit record is committed to a replicated log, so coordinator failure simply triggers a leader election within the Paxos group rather than a blocking wait. The system remains available as long as a majority of the Paxos group nodes are reachable.

Saga patterns (used in microservices architectures) avoid 2PC entirely by decomposing a long-running transaction into a sequence of local transactions, each with a compensating transaction for rollback. If step 3 of a 5-step saga fails, steps 1 and 2 are undone by executing their compensating transactions. Sagas provide eventual consistency rather than ACID isolation — concurrent sagas may see each other’s partial states — but they avoid the distributed lock-holding and blocking problems of 2PC.

Distributed Query Processing

A distributed query must decide whether to ship data (move data from remote node to local for processing) or ship query (send the query fragment to the data). Shipping data is expensive over wide-area networks. Semi-join reduction reduces the data shipped: before joining \( R \) on node 1 with \( S \) on node 2, ship the join-key projection of \( S \) to node 1, compute the semi-join to filter \( R \), then ship only the filtered \( R \) tuples to node 2. If \( S \) is highly selective, this saves significant bandwidth.
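A sketch of the bandwidth saving, counting shipped tuples instead of bytes (the relation contents are made up for illustration; joins are on the first attribute):

```python
# Semi-join reduction: instead of shipping all of R from node 1 to node 2,
# first ship S's join-key projection to node 1 and filter R with it.

R = [(k, "r-payload") for k in range(1000)]         # lives on node 1
S = [(k, "s-payload") for k in range(0, 1000, 50)]  # lives on node 2: 20 keys

# Step 1: node 2 ships only the join-key projection of S (20 small values).
s_keys = {k for k, _ in S}

# Step 2: node 1 computes the semi-join R ⋉ S locally, filtering R.
r_reduced = [t for t in R if t[0] in s_keys]

# Step 3: only the surviving R tuples travel to node 2 for the full join.
shipped_with_semijoin = len(s_keys) + len(r_reduced)   # 20 keys + 20 tuples
shipped_naive = len(R)                                  # all 1000 R tuples
print(shipped_with_semijoin, shipped_naive)             # 40 vs 1000
```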

Push-based vs. pull-based distributed execution. Traditional distributed query processing uses a pull model: the query coordinator sends subplan fragments to each node, and each node pulls data from its children, processes it, and returns results to the coordinator. This is essentially the Volcano iterator model extended across network boundaries.

Modern cloud-native systems (Snowflake, Presto, BigQuery) use a push-based model: the coordinator pre-computes the full execution plan, assigns operator pipelines to nodes, and then pushes data through the pipeline proactively — nodes do not wait to be pulled. Push-based execution eliminates the round-trip latency of pull-based coordination (one network round trip per next() call in the pull model) and enables better pipelining across node boundaries.

Snowflake in particular separates the execution layer (ephemeral virtual warehouses that hold no state) from the storage layer (S3 object storage). Each query creates a fresh set of compute workers, assigns each a partition of the input data, and these workers communicate via a high-bandwidth inter-node network using a push protocol. Because compute workers are stateless, scaling up (adding workers) is instantaneous — there is no data rebalancing, since the data stays in S3 and workers are simply assigned different partitions to process.


Chapter 10: Decision Support and Advanced Topics

OLTP workloads consist of many short read-write transactions; OLAP (decision support) workloads consist of complex analytical queries scanning large fractions of the database. Optimizing for both simultaneously is difficult, and modern systems often specialize.

Star Schemas and Decision Support

A star schema consists of a central fact table (e.g., sales transactions) surrounded by dimension tables (e.g., product, customer, date, store). The fact table is typically very large (billions of rows); dimension tables are smaller. A snowflake schema normalizes dimension tables further (e.g., date → month → quarter → year), at the cost of more joins.

Queries on star schemas typically filter on dimension attributes and aggregate fact measures. Bitmap indexes are effective on dimension foreign keys because the bit-AND of multiple bitmap indexes efficiently evaluates multi-dimensional filter conditions without reading the fact table.

Example — star schema query and bitmap index evaluation. Fact table Sales has 1 billion rows with foreign keys to dimensions Product (10,000 products), Customer (1 million customers), and Date (3,650 days = 10 years).

Query: Total sales revenue for electronics products sold in Canada in Q4 2024.

SELECT SUM(s.revenue)
FROM Sales s
JOIN Product p ON s.prod_id = p.prod_id
JOIN Customer c ON s.cust_id = c.cust_id
JOIN Date d ON s.date_id = d.date_id
WHERE p.category = 'Electronics'
  AND c.country = 'Canada'
  AND d.quarter = 'Q4' AND d.year = 2024;

With bitmap indexes on each dimension foreign key in Sales:

  1. Evaluate p.category = ‘Electronics’ against the Product dimension: 500 matching product IDs. Build a bitmap \(B_1\) over 1 billion Sales rows where the prod_id matches one of those 500 IDs. \(B_1\) has about \(1{,}000{,}000{,}000 \times (500/10{,}000) = 50{,}000{,}000\) bits set.

  2. Evaluate c.country = ‘Canada’: about 5% of customers. Bitmap \(B_2\) has ~50 million bits set.

  3. Evaluate the date predicate: 90 days in Q4 2024 out of 3,650 days. Bitmap \(B_3\) has about \(1{,}000{,}000{,}000 \times (90/3650) \approx 24{,}700{,}000\) bits set.

  4. AND the three bitmaps: \(B_1 \land B_2 \land B_3\) identifies matching fact rows. Under independence, the expected count is \(10^9 \times 0.05 \times 0.05 \times 0.0247 \approx 61{,}750\) rows.

  5. Fetch only those 61,750 rows from the fact table and sum revenue.

Without bitmap indexes, the query would require joining all 1 billion fact rows against all three dimension tables — scanning all 8 GB of the fact table. With bitmap indexes, only 61,750 rows are fetched, a 16,000× reduction in fact-table I/O.
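The arithmetic of step 4, plus the bit-AND itself, can be checked in a few lines. The selectivities are the ones from the example (with the date fraction rounded to 0.0247 as in the text); the toy 8-row bitmaps are our own, using Python integers as bitsets:

```python
# Step 4's arithmetic: expected matching fact rows under independence.
N = 1_000_000_000
sel = {"product": 500 / 10_000, "customer": 0.05, "date": 0.0247}
expected_rows = N * sel["product"] * sel["customer"] * sel["date"]
print(round(expected_rows))          # 61750

# The AND itself, on a toy 8-row fact table (Python ints as bitsets):
b1 = 0b1011_0110    # rows matching the product predicate
b2 = 0b1101_0101    # rows matching the customer predicate
b3 = 0b1001_1100    # rows matching the date predicate
survivors = b1 & b2 & b3             # one bitwise AND evaluates all three filters
print(bin(survivors), bin(survivors).count("1"))
```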

Materialized Views

A materialized view is a precomputed query result stored as a physical table. For example, a daily sales aggregation query can be answered in milliseconds from a materialized view rather than minutes from the raw fact table.

View maintenance keeps the materialized view current as the base data changes. Recomputation is simple but expensive. Incremental maintenance applies the delta of changes (insertions and deletions) to the view using the delta rule. For a view \( V = R \bowtie S \):

\[ \Delta V_{\text{insert}} = \Delta R \bowtie S \cup R \bowtie \Delta S \cup \Delta R \bowtie \Delta S \]

The lattice of views generalizes this: multiple materialized views can be organized by the coarseness of their aggregation, and finer-grained views can be answered from coarser ones by further aggregation.
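The delta rule can be exercised on toy relations; sets of tuples joined on their first attribute stand in for real tables, and the check at the end confirms that incremental maintenance agrees with full recomputation:

```python
def join(r, s):
    """Equi-join on the first attribute, returning merged tuples."""
    return {(k, a, b) for (k, a) in r for (k2, b) in s if k == k2}

R  = {(1, "r1"), (2, "r2")}
S  = {(1, "s1"), (3, "s3")}
dR = {(3, "r3")}              # newly inserted R tuples
dS = {(2, "s2")}              # newly inserted S tuples

V = join(R, S)                # materialized view before the inserts

# Delta rule: ΔV = (ΔR ⋈ S) ∪ (R ⋈ ΔS) ∪ (ΔR ⋈ ΔS)
dV = join(dR, S) | join(R, dS) | join(dR, dS)

# Applying the delta must agree with recomputing the view from scratch.
assert V | dV == join(R | dR, S | dS)
print(sorted(V | dV))
```

The third term \(\Delta R \bowtie \Delta S\) is what makes the rule correct when an insertion into \(R\) joins with a simultaneous insertion into \(S\); dropping it silently loses those result tuples.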

Materialized views and query rewriting. An advanced optimizer feature is **automatic view matching**: even if a query does not explicitly reference a materialized view, the optimizer can detect that the query's algebraic expression is a subset of a materialized view's expression, and rewrite the query to use the view instead of the base tables.

For example, suppose a materialized view MonthlySales stores SUM(revenue) grouped by (product_id, year, month). A query asking for annual revenue by product issues a GROUP BY over (product_id, year). A smart optimizer recognizes that summing the materialized monthly totals over all months in a year gives the annual total — equivalent to the query, but reading the much smaller materialized view instead of the billion-row fact table.

PostgreSQL does not perform automatic view matching (queries must explicitly reference materialized views). Oracle, SQL Server, and Redshift do support it via their “intelligent query rewriting” / “materialized view recommendation” features. The difficulty is correctness: the optimizer must prove, using algebraic equivalence rules, that the view-based plan returns the same result as the original query for all possible data — a problem that is decidable but computationally expensive for complex views.

Column Stores and Compression

Column stores (e.g., Vertica, ClickHouse, DuckDB) store each column in a separate file, sorted by a chosen sort key. This enables:

  • Projection pruning: read only the columns referenced by the query.
  • Vectorized execution: process a batch of values from a single column using SIMD CPU instructions.
  • Aggressive compression: values in a column are of the same type and often highly regular.

The interaction between compression and vectorized execution is particularly powerful. Consider a region column in a sales table with 10 million rows and only 5 distinct values (North, South, East, West, Central). Dictionary encoding maps each distinct value to a 3-bit integer code (since \(\lceil \log_2 5 \rceil = 3\)), compressing the column by roughly 10× compared to storing each string directly. A predicate WHERE region = ‘North’ is evaluated once against the dictionary (cost: 1 comparison) to obtain the matching 3-bit code; the compressed column is then scanned with bitwise comparisons against that code — 64 packed values occupy only 192 bits, so a single wide SIMD instruction can compare dozens of values per cycle. The query runs without ever decompressing the column, achieving both space savings and faster execution than the uncompressed case.
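A sketch of a dictionary-encoded scan (no actual SIMD here; the point is that the predicate is evaluated once against the dictionary and the scan then compares small integer codes, never strings):

```python
# Dictionary-encode a low-cardinality column, then evaluate a predicate
# directly on the codes without decompressing back to strings.

values = ["North", "South", "North", "East", "West", "North", "Central"]

dictionary = {v: code for code, v in enumerate(sorted(set(values)))}
encoded = [dictionary[v] for v in values]   # each code fits in 3 bits

# WHERE region = 'North': one dictionary lookup, then integer comparisons.
target = dictionary["North"]
matches = [i for i, code in enumerate(encoded) if code == target]
print(matches)    # row ids where region = 'North'
```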

Common compression schemes:

| Scheme                    | When Useful                                               |
|---------------------------|-----------------------------------------------------------|
| Run-Length Encoding (RLE) | Long runs of repeated values (e.g., sorted region column) |
| Delta Encoding            | Slowly increasing values (e.g., timestamps, prices)       |
| Dictionary Encoding       | Low-cardinality string columns (e.g., status codes)       |
| Bit-packing               | Integer columns with small range                          |

Queries can often operate directly on compressed data without decompressing first.

In-Memory Databases

When the entire working set fits in RAM, the buffer manager becomes unnecessary overhead. Systems like VoltDB, H-Store, and HyPer eliminate it entirely. The DBMS manages memory directly and logs to disk only for durability. Because uncommitted changes never reach disk (there is no steal), undo logging is not needed for crash recovery; durability is provided by redo logging (often command logging) plus periodic snapshots, and transaction isolation is handled by locks or MVCC.

HyPer uses MVCC with hardware transactional memory and morsel-driven parallelism: the relation is divided into fixed-size morsels (chunks of \(\approx\) 10,000 tuples); worker threads dynamically steal morsels from a global work queue, achieving adaptive load balancing.

The logging bottleneck in in-memory databases. Even when all data is in memory, durability requires writing log records to persistent storage. For high-throughput OLTP (e.g., 100,000 transactions per second), the log becomes the performance bottleneck. Each transaction requires at least one sequential disk write (the commit log record) before it can return success to the client.

Three approaches mitigate the logging bottleneck:

Group commit: instead of flushing the log for each transaction individually, buffer log records in memory for a short interval (e.g., 1 ms) and flush all buffered records in a single disk write. If 500 transactions commit in that 1 ms, the log flush amortises across all 500 transactions — one fsync instead of 500. The latency of each transaction increases by up to 1 ms, but throughput increases proportionally.
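The batching can be sketched with a toy log object (the class and its methods are our own illustration; a real implementation flushes on a timer or when the buffer fills):

```python
class GroupCommitLog:
    """Toy log that buffers commit records and makes them durable in one batch."""
    def __init__(self):
        self.buffer = []
        self.fsync_count = 0

    def commit(self, tx_id):
        self.buffer.append(tx_id)      # record buffered; not yet durable

    def flush(self):
        """One fsync covers every buffered commit record."""
        if not self.buffer:
            return []
        self.fsync_count += 1          # the single amortised disk write
        done, self.buffer = self.buffer, []
        return done                    # these transactions may now acknowledge

log = GroupCommitLog()
for tx in range(500):                  # 500 commits arrive within the ~1 ms window
    log.commit(tx)
durable = log.flush()                  # a single fsync instead of 500
print(len(durable), log.fsync_count)   # 500 transactions, 1 fsync
```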

Write-ahead log shipping to a replica (used by VoltDB): instead of writing to local disk, the primary sends log records to a standby replica over the network, which acknowledges receipt. The primary considers the transaction durable once the replica acknowledges. If the primary crashes, the replica takes over. Network round-trip latency (e.g., 100 μs intra-datacenter) is much lower than disk seek latency (5 ms), making this approach both faster and more fault-tolerant.

Non-volatile memory (NVM/Optane DCPMM): NVM provides byte-addressable persistence with latency of 100–300 ns — far lower than SSD (100 μs) or HDD (5 ms). Writing the commit log record to NVM rather than disk eliminates the logging bottleneck for most workloads. NVM-aware DBMS designs (e.g., NOVA, WineFS) bypass the file system and write directly to NVM using CPU store instructions, achieving commit latency of a few microseconds.

NewSQL systems (CockroachDB, TiDB, Google Spanner) provide full ACID guarantees with horizontal scalability. Spanner uses TrueTime (GPS + atomic clock timestamps) to implement external consistency across globally distributed nodes.

Cloud-native databases (Amazon Aurora, Snowflake, Azure Synapse) separate storage from compute: the storage layer is replicated object storage (S3 or equivalent); compute nodes are stateless and can be elastically scaled. Aurora uses a redo-log-only protocol — only log records are shipped from the compute layer to the storage layer, reducing write amplification.

Vectorized execution (pioneered in MonetDB/X100 and adopted in DuckDB) processes data in batches of 1,024–8,192 values per operator call rather than one tuple at a time. This dramatically reduces interpretation overhead and enables SIMD parallelism within each batch.

The iterator model's overhead and why vectorization matters. The Volcano iterator model (Chapter 4) is elegant but carries per-tuple overhead. For each output tuple, the root calls `next()` on its child, which calls `next()` on its child, cascading down the tree. Each call involves a virtual function dispatch (a pointer dereference and indirect jump), instruction cache pressure from switching between operator code, and the overhead of a C function call stack frame. For a selection operator processing 10 million tuples, this amounts to 10 million virtual function calls — at 5–10 ns each, that is 50–100 ms of pure interpretation overhead, before any actual predicate evaluation.

Vectorized execution replaces one-tuple-at-a-time with one-batch-at-a-time. Each next() call returns a batch of 1,024 tuples (represented as column arrays). The selection operator evaluates the predicate over 1,024 values using a tight loop that the CPU can vectorize (SIMD: process 4–16 comparisons per clock cycle). Virtual function dispatch overhead is amortised over 1,024 tuples instead of paid per tuple. For the same 10 million tuples, vectorized execution reduces the number of next() calls from 10 million to ~9,800.
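The call-count arithmetic above is easy to verify; the per-call cost below is the midpoint of the 5–10 ns estimate and is purely illustrative:

```python
import math

tuples = 10_000_000
batch = 1_024

calls_volcano = tuples                        # one next() call per tuple
calls_vectorized = math.ceil(tuples / batch)  # one next() call per batch

per_call_ns = 7.5                             # illustrative dispatch cost
overhead_volcano_ms = calls_volcano * per_call_ns / 1e6
overhead_vector_ms = calls_vectorized * per_call_ns / 1e6

print(calls_vectorized)                                  # 9766 (~9,800)
print(round(overhead_volcano_ms), round(overhead_vector_ms, 3))
```

The interpretation overhead drops by the batch factor, roughly three orders of magnitude, before counting any SIMD gains inside the batch loop.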

DuckDB’s benchmarks show that vectorized execution achieves 10–100× speedup over Volcano-model execution on analytical workloads, primarily because OLAP queries typically process large amounts of data with simple predicates — exactly the regime where tight inner loops and SIMD dominate. OLTP workloads (few tuples per query) see less benefit because the per-batch overhead is comparable to the per-tuple cost when batches are nearly empty.

Spanner and TrueTime: external consistency at global scale. Google Spanner (Corbett et al., OSDI 2012) is the first system to provide ACID transactions with external consistency across globally distributed nodes. The key challenge is that clocks on different machines are not perfectly synchronized — they drift apart by up to tens of milliseconds. If T1 commits at Google's US data center and T2 starts 1 millisecond later at a European data center, T2 should observe T1's writes. Without perfect clock synchronization, this is impossible to guarantee using timestamp-based MVCC.

Spanner’s solution is TrueTime: a globally distributed clock infrastructure built on GPS receivers and atomic clocks at every data center. TrueTime exposes the API TT.now(), which returns an interval \([t_{\text{earliest}}, t_{\text{latest}}]\) guaranteed to contain the true current time. The uncertainty (typically 1–7 ms) is bounded by the clock skew between GPS/atomic clock references.

At commit time, Spanner assigns a commit timestamp \(s\) within the TrueTime interval and then waits until TT.now().earliest > s before releasing the commit acknowledgment (“commit wait”). This wait ensures that every other node’s clock has advanced past \(s\) before any subsequent transaction can read the committed data, guaranteeing that the commit timestamp faithfully reflects real-world ordering. The cost is a 1–7 ms latency penalty per commit — acceptable for cross-continental transactions where network RTT already dominates.
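The commit-wait rule can be sketched with an ordinary monotonic clock standing in for TrueTime. The uncertainty bound, the `tt_now` helper, and the `commit` function are our own illustration of the protocol, not Spanner's API:

```python
import time

EPSILON = 0.004  # assumed clock uncertainty (~4 ms, within the 1-7 ms range)

def tt_now():
    """TrueTime-style interval (earliest, latest) guaranteed to bracket true time."""
    t = time.monotonic()
    return (t - EPSILON, t + EPSILON)

def commit(write_fn):
    """Assign a commit timestamp inside the current interval, then commit-wait."""
    s = tt_now()[1]                  # s is at most true time + EPSILON
    write_fn(s)                      # the write becomes visible at timestamp s
    while tt_now()[0] <= s:          # wait until every clock must have passed s
        time.sleep(0.0005)
    return s                         # only now acknowledge the commit

committed = []
s = commit(committed.append)
assert tt_now()[0] > s               # any later transaction reads after s
print(len(committed))
```

The wait of roughly \(2\epsilon\) per commit is exactly the 1–7 ms penalty described above: cheap insurance that timestamp order matches real-time order everywhere.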


Chapter 11: Query Optimization — Deeper Dive

The cost estimation discussion in Chapter 6 left open several questions: how exactly are join cardinalities estimated when values are non-uniformly distributed, how do the three principal join algorithms compare across a concrete I/O model, and what happens when the optimizer’s estimates are wrong at execution time?

Selectivity Estimation for Joins

The key quantity that drives every join-order decision is the join selectivity: the fraction of the Cartesian product \(R \times S\) that survives the join predicate. For an equi-join on attribute \(A\),

\[ \text{sel}(R \bowtie_A S) = \frac{1}{\max(V(R,A),\ V(S,A))} \]

where \(V(R,A)\) is the number of distinct values of \(A\) in \(R\). This formula assumes uniformity and independence — neither of which holds in practice.

Equi-depth (equi-height) histogram. An equi-depth histogram on attribute \(A\) of relation \(R\) partitions the value domain into \(k\) buckets such that each bucket contains the same number of tuples — approximately \(|R|/k\) tuples per bucket. The histogram stores, for each bucket, the lower and upper boundary values and the bucket count (always \(|R|/k\)) plus the number of distinct values in the bucket. Equi-depth histograms give more accurate selectivity estimates than equi-width histograms for skewed distributions, because dense regions of the domain automatically receive more buckets.

For a point predicate \(A = v\), the optimizer locates the bucket containing \(v\) and estimates the selectivity as \(1 / V_{\text{bucket}}\), where \(V_{\text{bucket}}\) is the number of distinct values in that bucket. For a join predicate \(R.A = S.A\), matching histogram buckets are compared: bucket \(i\) of \(R\) overlaps bucket \(j\) of \(S\) only on the overlapping value range, and the contribution to join cardinality is estimated as \((\text{count}_i \times \text{count}_j) / \max(V_i, V_j)\) for each overlapping pair, then summed.

Example — histogram-based join selectivity. Relation \(R\) has 10,000 tuples; attribute \(A\) has an equi-depth histogram with \(k = 4\) buckets (2,500 tuples each): \[ [1\text{–}10]:\ V=5 \quad [11\text{–}50]:\ V=8 \quad [51\text{–}200]:\ V=20 \quad [201\text{–}1000]:\ V=30 \]

Relation \(S\) has 8,000 tuples; attribute \(A\) has 4 buckets (2,000 tuples each):

\[ [1\text{–}25]:\ V=6 \quad [26\text{–}100]:\ V=15 \quad [101\text{–}500]:\ V=40 \quad [501\text{–}1000]:\ V=20 \]

The naive formula \(|R \bowtie S| = |R| \times |S| / \max(V(R,A), V(S,A)) = 10000 \times 8000 / \max(63, 81) \approx 987{,}654\) treats the entire domain uniformly. The histogram approach breaks this into overlapping bucket pairs:

  • \(R[1\text{–}10]\) overlaps \(S[1\text{–}25]\) on \([1\text{–}10]\): assuming the \(S\) bucket’s 2,000 tuples are spread uniformly over its range, the overlap holds about \(10/25 \times 2000 = 800\) \(S\) tuples, giving a contribution of \(\approx 2500 \times 800 / \max(5,6) \approx 333{,}333\). The remaining overlapping bucket pairs are handled the same way and their contributions summed.

In practice, histogram matching yields estimates that are within a factor of 2–3× of the true cardinality in most cases, versus factors of 10–100× for the uniform-distribution formula on skewed data.
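The two estimates from the example can be reproduced directly (the overlap scaling follows the uniform-within-bucket assumption stated above):

```python
# Naive uniform estimate vs the first histogram bucket-pair contribution
# from the worked example above.

R_size, S_size = 10_000, 8_000
V_R = 5 + 8 + 20 + 30      # distinct A-values in R, summed over its buckets
V_S = 6 + 15 + 40 + 20     # distinct A-values in S, summed over its buckets

naive = R_size * S_size / max(V_R, V_S)
print(round(naive))         # 987654

# Bucket pair R[1-10] (2500 tuples, V=5) vs S[1-25] (2000 tuples, V=6),
# overlapping on [1-10]: scale the S bucket to the overlap fraction first.
s_tuples_in_overlap = (10 / 25) * 2000          # ~800 S tuples in [1-10]
contribution = 2500 * s_tuples_in_overlap / max(5, 6)
print(round(contribution))  # 333333
```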

Sampling provides an alternative: draw a random sample of 1–5% of each relation, execute the join on the sample, and extrapolate. For a 1% sample of both \(R\) and \(S\), the sample join has \(0.01 \times 0.01 = 0.0001\) of the true join size in expectation; multiply by \(10^4\) to estimate the full join cardinality. Sampling is unbiased but introduces variance — small samples of skewed data can produce estimates off by an order of magnitude. Database systems like Redshift use sketches (e.g., HyperLogLog for distinct-value estimation, Count-Min Sketch for frequency estimation) as a compact, deterministic alternative to random sampling.
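The sample-and-extrapolate idea can be sketched as follows, on synthetic uniform data (the relations, key domain, and sampling rates here are illustrative assumptions, not drawn from any benchmark):

```python
# Sketch of sample-based join cardinality estimation: join 1% Bernoulli samples
# of R and S, then scale the sample join size back up by 1 / (p_r * p_s).
import random
from collections import Counter

random.seed(42)
R = [random.randrange(100) for _ in range(10_000)]  # join key uniform in [0, 99]
S = [random.randrange(100) for _ in range(8_000)]

def sample_join_estimate(R, S, p_r=0.01, p_s=0.01):
    r_sample = [x for x in R if random.random() < p_r]
    s_counts = Counter(x for x in S if random.random() < p_s)
    sample_join = sum(s_counts[x] for x in r_sample)  # hash join on the samples
    return sample_join / (p_r * p_s)                  # extrapolate to full size

# exact cardinality for comparison: sum over keys of count_R(k) * count_S(k)
cr, cs = Counter(R), Counter(S)
true_size = sum(cr[k] * cs[k] for k in cr)  # about 10,000 * 8,000 / 100 = 800,000
est = sample_join_estimate(R, S)
```

The estimator is unbiased, but with only ~80 expected sample matches its variance is visible run to run, illustrating the caveat above about small samples.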

I/O Cost Formulas for Join Algorithms

Let \(B(R)\) denote the number of pages of relation \(R\), \(B(S)\) the pages of \(S\), and \(B\) the number of available buffer pages. The three principal join algorithms compare as follows.

Join algorithm I/O cost summary. \[ \text{Cost}_{\text{BNLJ}} = B(R) + \left\lceil \frac{B(R)}{B-2} \right\rceil \cdot B(S) \]

The outer relation \(R\) is read once (\(B(R)\) I/Os). For each chunk of \(B-2\) pages from \(R\), the inner relation \(S\) is scanned in full (\(B(S)\) I/Os). Assign the smaller relation as outer to minimize the number of \(S\)-scans.

\[ \text{Cost}_{\text{SMJ}} = \underbrace{2B(R)(1+\lceil \log_{B-1}\lceil B(R)/B\rceil \rceil)}_{\text{sort }R} + \underbrace{2B(S)(1+\lceil \log_{B-1}\lceil B(S)/B\rceil \rceil)}_{\text{sort }S} + B(R) + B(S) \]

When both inputs can be sorted in two passes (i.e., roughly \(B(R), B(S) \leq B^2\)) and the join is performed during each sort's final merge pass, this simplifies to \(3(B(R)+B(S))\): one read and one write per page for run generation, plus one final read-only merge pass that produces join output directly.

\[ \text{Cost}_{\text{HJ}} = 3\bigl(B(R) + B(S)\bigr) \]

provided each partition of \(R\) fits in \(B\) pages, i.e., \(B(R) \leq B(B-1)\). Otherwise, recursive partitioning adds \(2(B(R)+B(S))\) per additional level.

Example — choosing join algorithm with concrete numbers. Suppose \(B(R) = 500\) pages, \(B(S) = 5000\) pages, and \(B = 50\) buffer pages.

BNLJ (R as outer): \(500 + \lceil 500/48 \rceil \times 5000 = 500 + 11 \times 5000 = 55{,}500\) I/Os.

BNLJ (S as outer): \(5000 + \lceil 5000/48 \rceil \times 500 = 5000 + 105 \times 500 = 57{,}500\) I/Os. Using \(R\) as outer is slightly cheaper.

SMJ: \(R\) needs only a two-pass sort since \(B(R) = 500 \leq B^2 = 2500\), but \(B(S) = 5000 > 2500\), so \(S\) needs a three-pass sort. Counting each pass as read+write except the final read-only merge pass (where the join is performed): \(3 \times 500 + 5 \times 5000 = 1500 + 25000 = 26{,}500\) I/Os.

HJ: check memory constraint — \(B(R)/(B-1) = 500/49 \approx 11\) pages per partition. With \(B = 50\), each partition fits in 50 pages. \(3(500 + 5000) = 16{,}500\) I/Os. Hash join wins.

At \(B = 10\) pages, each of the \(B-1 = 9\) partitions of \(R\) holds about \(500/9 \approx 56\) pages, far more than the 10 buffer pages available, so recursive partitioning is needed and hash join may lose to SMJ. The crossover is at \(B \approx \sqrt{B(R)} = \sqrt{500} \approx 22\) pages.
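The cost formulas above translate directly into a small calculator; a sketch (the pass-counting convention for SMJ is the one stated above: read+write per pass except a final read-only merge):

```python
# I/O cost calculator for the three join algorithms, reproducing the worked
# BNLJ and hash-join numbers for B(R)=500, B(S)=5000, B=50 buffer pages.
from math import ceil, log

def cost_bnlj(b_r, b_s, b):
    """Block nested-loop join, R as outer: read R once; scan S once per
    (b-2)-page chunk of R."""
    return b_r + ceil(b_r / (b - 2)) * b_s

def sort_passes(pages, b):
    """Passes of external merge sort: one run-generation pass plus
    ceil(log_{b-1}(initial runs)) merge passes."""
    runs = ceil(pages / b)
    return 1 + (0 if runs <= 1 else ceil(log(runs, b - 1)))

def cost_smj(b_r, b_s, b):
    """Sort-merge join with the join folded into each sort's final merge pass:
    every pass costs read+write (2 I/Os per page) except the final read-only pass."""
    return (2 * sort_passes(b_r, b) - 1) * b_r + (2 * sort_passes(b_s, b) - 1) * b_s

def cost_hj(b_r, b_s, b):
    """Grace hash join: partition both inputs once, then probe (3 I/Os per page),
    assuming each partition of the build side fits in memory."""
    assert b_r <= b * (b - 1), "needs recursive partitioning"
    return 3 * (b_r + b_s)

print(cost_bnlj(500, 5000, 50))   # 55500
print(cost_bnlj(5000, 500, 50))   # 57500
print(cost_hj(500, 5000, 50))     # 16500
print(cost_smj(500, 5000, 50))
```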

Adaptive Query Processing

Even the best cardinality estimates can be wrong by orders of magnitude for multi-join queries with correlated predicates. Adaptive query processing reacts to estimation errors at runtime.

The eddies framework (Avnur and Hellerstein, 2000) routes tuples through operator pipelines dynamically, steering tuples toward operators that are currently most selective and least congested. The routing policy is updated by observing the output rate of each operator.

A simpler production technique is re-optimization: execute the plan up to the first operator with high uncertainty (e.g., a join), materialize the intermediate result, measure its actual cardinality, and if the measured cardinality differs from the estimate by more than a threshold (e.g., 10×), re-invoke the optimizer with the corrected statistic to plan the remainder of the query. SQL Server's adaptive joins, which defer the choice between hash and nested-loop join until the build side's actual cardinality is observed, are a production step in this direction, as is its memory-grant feedback across executions of the same query.

Adaptive triggers in practice. A re-optimization trigger fires when the ratio of estimated to actual intermediate result size exceeds a factor of \(\delta\) (typically 10 to 100). The cost of re-optimization is one optimizer invocation (milliseconds) plus one materialization pass over the intermediate result. For a long-running OLAP query where the remaining plan cost is measured in minutes, paying a few milliseconds for re-optimization is almost always worthwhile. For OLTP queries that complete in microseconds, re-optimization overhead dominates. Systems therefore apply re-optimization only to queries above a minimum estimated cost threshold.
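The trigger rule described above amounts to a few lines of logic; a sketch with illustrative parameter names and defaults (not any system's actual API):

```python
# Sketch of a re-optimization trigger: fire when the cardinality estimate is
# off by more than a factor of delta in either direction, but only for queries
# whose remaining estimated cost is large enough to amortize the optimizer call.

def should_reoptimize(est_card, actual_card, est_remaining_cost,
                      delta=10.0, min_cost=1_000.0):
    if est_remaining_cost < min_cost:       # OLTP-scale query: overhead dominates
        return False
    ratio = max(est_card, 1) / max(actual_card, 1)
    return ratio > delta or ratio < 1 / delta

# estimate off by 50x on an expensive query: re-optimize
assert should_reoptimize(100, 5_000, est_remaining_cost=1e6)
# estimate within 2x: keep the current plan
assert not should_reoptimize(100, 150, est_remaining_cost=1e6)
# cheap query: never re-optimize, regardless of error
assert not should_reoptimize(100, 5_000, est_remaining_cost=10)
```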

Chapter 12: Concurrency Control — Full Protocols

The brief MVCC and OCC introductions in Chapter 7 merit deeper treatment. This chapter gives the complete protocol for each, then compares their throughput profiles.

MVCC: Complete Protocol

In a full MVCC system, every write operation creates a new version of the modified tuple rather than overwriting the old one. The version chain is stored either in a separate delta store (as in SQL Server) or inline in the heap using the slot mechanism (as in PostgreSQL).

MVCC read and write rules (PostgreSQL snapshot isolation).

Transaction start: assign transaction ID \(T\). Record the snapshot \(S_T\): the set of all transaction IDs that are committed at this moment.

Read of tuple \(t\): a version \(v\) with creator \(c = \text{xmin}(v)\) and deleter \(d = \text{xmax}(v)\) is visible to \(T\) iff

\[ c \in S_T \quad \text{and} \quad (d = 0 \ \text{or}\ d \notin S_T) \]

That is, the version was created by a committed transaction in \(T\)’s snapshot and has not been deleted by any transaction in the snapshot. No locks are acquired on reads.

Write of tuple \(t\): create a new version \(v'\) with \(\text{xmin}(v') = T\), \(\text{xmax}(v') = 0\). Mark the old current version’s \(\text{xmax}\) with \(T\). Acquire a write lock on \(t\) to prevent concurrent writes (first-writer-wins for write-write conflicts).

Commit: mark \(T\) as committed. All versions with \(\text{xmin} = T\) become visible to subsequent transactions.

Abort: mark all versions with \(\text{xmin} = T\) as dead. No other transaction will make them visible. No undo log is needed — the old version remains in place.

Garbage collection (VACUUM): a version \(v\) is dead if \(\text{xmax}(v)\) is committed and no active transaction’s snapshot includes \(\text{xmax}(v)\) in its visibility window. Dead versions are reclaimed.

The most important property of MVCC is that readers never block writers and writers never block readers. This is the key advantage over 2PL for read-heavy OLTP workloads: a long-running read transaction (e.g., a reporting query) does not hold any locks, so concurrent write transactions proceed without blocking. The cost is version storage: every write creates a new version, and VACUUM must periodically reclaim old versions. On write-heavy tables with many updates, the version chain can grow long, requiring sequential chain traversals for reads — the tuple bloat problem.
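The visibility rule above can be sketched as a toy check over a version chain. The field names mirror PostgreSQL's tuple header (xmin/xmax), but this is a simplified model: the snapshot is represented directly as the set of committed transaction IDs, as defined above.

```python
# Toy snapshot-visibility check: a version is visible to a reader iff its
# creator is in the reader's snapshot and its deleter is absent (0) or not
# in the snapshot.
from dataclasses import dataclass

@dataclass
class Version:
    value: object
    xmin: int        # creating transaction ID
    xmax: int = 0    # deleting/superseding transaction ID (0 = none)

def visible(v, snapshot):
    """snapshot = set of transaction IDs committed when the reader started."""
    return v.xmin in snapshot and (v.xmax == 0 or v.xmax not in snapshot)

# version chain for one tuple: T100 created it, T200 later updated it
chain = [Version("old", xmin=100, xmax=200), Version("new", xmin=200)]

snap_before = {100}        # reader's snapshot taken before T200 committed
snap_after = {100, 200}    # reader's snapshot taken after T200 committed
print([v.value for v in chain if visible(v, snap_before)])  # ['old']
print([v.value for v in chain if visible(v, snap_after)])   # ['new']
```

Note that the reader holding `snap_before` still sees the old version even after T200 commits: that is exactly the no-blocking property described above.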

Serializable Snapshot Isolation (SSI)

Snapshot isolation prevents dirty reads and non-repeatable reads but allows write skew: two transactions each read a set of tuples and then write to different tuples in a way that jointly violates a constraint, even though neither write conflicts directly with the other’s read.

Write skew example. Doctors Alice and Bob are on call. The constraint is "at least one doctor must be on call." Both read the on-call table and see two doctors. Alice writes "Alice not on call"; Bob writes "Bob not on call." Under snapshot isolation both commits succeed, violating the constraint — but there was no read-write conflict in the locking sense.

Serializable Snapshot Isolation (SSI), introduced by Cahill et al. (2008) and implemented in PostgreSQL since version 9.1, detects write skew by tracking rw-anti-dependencies. A rw-anti-dependency from \(T_1\) to \(T_2\) exists when \(T_1\) reads a version that \(T_2\) subsequently writes (without \(T_1\) seeing \(T_2\)'s write, because \(T_2\) commits after \(T_1\)'s snapshot was taken). SSI tracks these edges at runtime; because every serialization-anomaly cycle under snapshot isolation must contain two consecutive rw-anti-dependency edges \(T_1 \xrightarrow{rw} T_2 \xrightarrow{rw} T_3\) (the "dangerous structure," with \(T_3 = T_1\) in the simplest case), SSI conservatively aborts one of the transactions involved whenever it detects this structure.

2PL vs. OCC vs. MVCC: throughput under contention.

Low contention (read-heavy workload, few conflicts): MVCC wins decisively. Readers never block; locks are acquired only for writes. OCC also performs well — validation rarely fails. 2PL pays lock acquisition overhead even when no conflict would have occurred.

Medium contention (mixed read-write): MVCC and OCC are comparable. OCC’s validation phase can abort transactions that 2PL would have serialized via blocking — aborted transactions waste the work done in their read phase. MVCC avoids this by using write locks only, keeping readers entirely lock-free.

High contention (write-heavy, many conflicts): 2PL typically wins. Under OCC, a high conflict rate means many transactions fail validation and must restart — in the worst case (many writers on a hot item), the effective throughput collapses because most work is wasted in aborted transactions. MVCC similarly suffers under high write contention because the first-writer-wins rule serializes writers. 2PL at least allows waiting transactions to eventually proceed rather than restarting from scratch.

The conclusion: MVCC (with SSI for serializability) is the dominant choice for general-purpose OLTP databases (PostgreSQL, MySQL InnoDB, Oracle), while OCC is preferred for in-memory databases with short transactions and low contention (H-Store, HyPer, FoundationDB). 2PL survives in legacy systems and in scenarios requiring strict two-phase behavior for correctness proofs.


Chapter 13: Recovery — ARIES in Full Detail

Chapter 8 gave a worked example of ARIES recovery. This chapter provides a complete LSN-level trace to make each phase concrete.

Worked ARIES Log Sequence Example

Complete ARIES trace with six log records. Assume the system checkpointed at LSN 10 with an empty ATT and DPT (a clean state). The following transactions then execute:
LSN | Type   | TransId | PageId | PrevLSN | Before | After
----|--------|---------|--------|---------|--------|------
11  | UPDATE | T1      | P3     | —       | 50     | 70
12  | UPDATE | T2      | P5     | —       | 80     | 90
13  | UPDATE | T1      | P3     | 11      | 70     | 100
14  | COMMIT | T1      | —      | 13      | —      | —
15  | UPDATE | T2      | P7     | 12      | 40     | 60

The system crashes after LSN 15 is written to the log. Pages P3, P5, and P7 were not flushed to disk before the crash (they are dirty in the buffer pool, now lost).

Analysis phase (scan from LSN 10 forward):

  • LSN 11: T1 starts; add T1 to ATT (lastLSN=11, status=in-progress). P3 not in DPT → add P3 with recLSN=11.
  • LSN 12: T2 starts; add T2 to ATT (lastLSN=12). P5 not in DPT → add P5 with recLSN=12.
  • LSN 13: T1 updates P3; update ATT[T1].lastLSN=13. P3 already in DPT (recLSN stays 11).
  • LSN 14: T1 commits; update ATT[T1].status=committed.
  • LSN 15: T2 updates P7; update ATT[T2].lastLSN=15. Add P7 to DPT with recLSN=15.

End of Analysis: ATT = {T1: committed, lastLSN=14; T2: in-progress, lastLSN=15}. DPT = {P3: recLSN=11, P5: recLSN=12, P7: recLSN=15}. Redo start = min(recLSN) = 11. Winners = {T1}. Losers = {T2}.

Redo phase (scan from LSN 11 forward, apply updates):

  • LSN 11: UPDATE T1 P3 (after=70). Fetch P3 from disk (old value 50). pageLSN(P3)=0 < 11 → apply: P3 ← 70, pageLSN(P3) ← 11.
  • LSN 12: UPDATE T2 P5 (after=90). Fetch P5 (value 80). pageLSN(P5)=0 < 12 → apply: P5 ← 90, pageLSN(P5) ← 12.
  • LSN 13: UPDATE T1 P3 (after=100). pageLSN(P3)=11 < 13 → apply: P3 ← 100, pageLSN(P3) ← 13.
  • LSN 14: COMMIT T1 — no data to redo.
  • LSN 15: UPDATE T2 P7 (after=60). Fetch P7 (value 40). pageLSN(P7)=0 < 15 → apply: P7 ← 60, pageLSN(P7) ← 15.

After redo: P3=100, P5=90, P7=60 — exactly the pre-crash in-memory state.

Undo phase (roll back T2, the loser):

T2’s lastLSN=15. Undo LSN 15: T2 wrote P7 (before=40, after=60). Undo: P7 ← 40. Write CLR at LSN 16: (CLR, T2, P7, undoNextLSN=12, before=60, after=40). pageLSN(P7) ← 16.

Next to undo for T2: the CLR's undoNextLSN is 12 (LSN 15's prevLSN). Undo LSN 12: T2 wrote P5 (before=80, after=90). Undo: P5 ← 80. Write CLR at LSN 17: (CLR, T2, P5, undoNextLSN=null, before=90, after=80). With nothing left to undo, write an END record at LSN 18 for T2, completing the rollback.

Final state: P3=100 (T1’s committed writes preserved), P5=80 (T2’s write undone), P7=40 (T2’s write undone). T1’s durability satisfied; T2’s atomicity satisfied.

If the system crashes again during redo at LSN 13, the second recovery run re-reads LSN 11, sees pageLSN(P3)=11 ≥ 11, and skips it; it re-reads LSN 13, sees pageLSN(P3)=11 < 13, and applies it — redo is idempotent. CLRs written during the first undo phase are redo-only records and are replayed during redo, so partial undo work is never undone a second time.
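The redo and undo passes of the trace above can be replayed mechanically; a minimal sketch (the log tuples and on-disk before-images mirror the worked example, and ATT/DPT bookkeeping, checkpointing, and CLR persistence are omitted):

```python
# Minimal replay of the six-record ARIES trace: redo from LSN 11, then undo
# the loser T2 by walking its prevLSN chain backwards, restoring before-images.

log = [  # (lsn, type, xact, page, prev_lsn, before, after)
    (11, "UPDATE", "T1", "P3", None, 50, 70),
    (12, "UPDATE", "T2", "P5", None, 80, 90),
    (13, "UPDATE", "T1", "P3", 11, 70, 100),
    (14, "COMMIT", "T1", None, 13, None, None),
    (15, "UPDATE", "T2", "P7", 12, 40, 60),
]
disk = {"P3": (50, 0), "P5": (80, 0), "P7": (40, 0)}  # page -> (value, pageLSN)

# Redo: reapply every update whose LSN exceeds the page's pageLSN.
for lsn, typ, xact, page, prev, before, after in log:
    if typ == "UPDATE" and disk[page][1] < lsn:
        disk[page] = (after, lsn)

# Undo loser T2: follow the prevLSN chain, each compensation gets a fresh LSN.
rec = {r[0]: r for r in log}
next_lsn = 16
undo = max(r[0] for r in log if r[2] == "T2")   # T2's lastLSN = 15
while undo is not None:
    lsn, typ, xact, page, prev, before, after = rec[undo]
    disk[page] = (before, next_lsn)             # apply the CLR's restore
    next_lsn += 1
    undo = prev                                 # undoNextLSN of the CLR

print({p: v for p, (v, _) in disk.items()})     # {'P3': 100, 'P5': 80, 'P7': 40}
```

The final page values match the state derived above: T1's committed writes survive, T2's writes are rolled back.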


Chapter 14: Columnar Storage and Modern OLAP

The DSM discussion in Chapter 2 introduced the idea of column-oriented storage. This chapter develops the compression techniques and execution model that make modern OLAP systems an order of magnitude faster than row-store alternatives.

Compression Schemes in Detail

Column data exhibits patterns that general-purpose compressors (gzip, LZ4) cannot exploit at query time because they require decompression before processing. Column-specific schemes maintain the data in a form that the query engine can operate on directly.

Dictionary encoding. Replace each distinct string value with a small integer code. Store the dictionary (code → string) separately. For a column with \(D\) distinct values, codes require \(\lceil \log_2 D \rceil\) bits each. Queries that compare against a constant first look up the constant in the dictionary (O(log D) time), obtain its code, then scan the compressed column comparing integer codes — no string comparison in the inner loop.
Run-length encoding (RLE). Replace consecutive runs of identical values with a pair (value, run-length). A column sorted on region with 1 million rows and 5 distinct regions compresses from 1 million entries to exactly 5 (value, run-length) pairs, one per region; even an unsorted column compresses well when equal values cluster. Aggregation queries (COUNT, SUM) on RLE-compressed columns can be answered directly from the (value, count) pairs without expanding the runs.
Delta encoding. Store the difference between consecutive values rather than the absolute value. For a monotonically increasing timestamp column with values \(t_1, t_2, \ldots\), store \(t_1, t_2 - t_1, t_3 - t_2, \ldots\). If timestamps increase by roughly 1 second, the deltas are small integers (1–5 bits each) rather than 64-bit epoch values, yielding a 10–30× compression ratio. Bit-packing then packs these small deltas into 64-bit words for SIMD processing.
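The three encodings above are each a few lines on toy data; a sketch (production systems additionally bit-pack the codes and deltas, which is omitted here):

```python
# Small sketches of dictionary, run-length, and delta encoding.

def dictionary_encode(column):
    """Map each distinct value to a small integer code; predicate evaluation
    then compares integer codes instead of strings."""
    dictionary = {v: i for i, v in enumerate(sorted(set(column)))}
    return [dictionary[v] for v in column], dictionary

def rle_encode(column):
    """Collapse consecutive runs of equal values into (value, run_length) pairs."""
    runs = []
    for v in column:
        if runs and runs[-1][0] == v:
            runs[-1][1] += 1
        else:
            runs.append([v, 1])
    return [tuple(r) for r in runs]

def delta_encode(values):
    """Store the first value, then successive differences (small for
    near-monotonic data such as timestamps)."""
    return [values[0]] + [b - a for a, b in zip(values, values[1:])]

codes, d = dictionary_encode(["Eng", "Sales", "Eng", "Eng"])
print(codes)                                     # [0, 1, 0, 0]
print(rle_encode(["E", "E", "E", "W", "W"]))     # [('E', 3), ('W', 2)]
print(delta_encode([1000, 1001, 1003, 1004]))    # [1000, 1, 2, 1]
```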

Late vs. Early Materialization

In a column store executing a query like SELECT name, salary FROM Employees WHERE dept = 'Eng', there are two strategies for combining the results of the column scans.

Early materialization: immediately after evaluating the predicate on the dept column (producing a set of row positions), fetch the name and salary values for those positions and assemble full tuples. All subsequent operators work on complete tuples — the familiar row-store model. Simple but wastes effort if additional predicates further filter the result.

Late materialization: propagate only the position vector (a list of matching row indices) through the operator pipeline. Fetch actual column values only at the last possible moment, just before the final projection. Each operator in the pipeline works on position vectors and compressed column data. For a query with three predicates, late materialization avoids fetching name and salary for rows that are filtered out by the second or third predicate.

Example — late materialization I/O savings. Table with 10 million rows; three predicates on columns C1, C2, C3 with selectivities 10%, 5%, and 2% respectively (applied in order). Each column is 8 bytes wide.

Early materialization: apply predicate on C1 → 1 million matches. Fetch C2 for all 1 million rows (8 MB) to evaluate second predicate → 50,000 matches. Fetch C3 for 50,000 rows (0.4 MB) → 10,000 final matches. Total column reads beyond C1: 8.4 MB.

Late materialization: apply predicate on C1 → position vector of 1 million positions (stored as 4-byte integers: 4 MB). Apply predicate on C2 at matching positions → 50,000 positions (0.2 MB). Apply predicate on C3 → 10,000 positions (0.04 MB). Fetch all other projected columns only for the final 10,000 rows. Savings: the position vector representation is compact, and columns C2 and C3 need only be accessed at the 1 million and 50,000 matching positions rather than for full scans.

Vertica, DuckDB, and ClickHouse all use late materialization by default.
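The position-vector pipeline can be sketched directly. This toy uses in-memory Python lists and modular columns as stand-ins for the C1/C2/C3 scenario (the selectivities here are 10% and 5%, chosen so the intersection is easy to verify); a real engine would pass compressed position representations between vectorized operators.

```python
# Sketch of late materialization: predicates pass position vectors down the
# pipeline, and value columns are gathered only for surviving positions.

def filter_positions(column, positions, predicate):
    """Evaluate the predicate on `column` only at the given positions."""
    return [p for p in positions if predicate(column[p])]

N = 1_000
c1 = [i % 10 for i in range(N)]        # `== 0` matches 10% of rows
c2 = [i % 20 for i in range(N)]        # `== 0` matches 5% of rows
name = [f"row{i}" for i in range(N)]   # projected column, fetched last

pos = filter_positions(c1, range(N), lambda v: v == 0)   # 100 positions
pos = filter_positions(c2, pos, lambda v: v == 0)        # narrowed to 50
result = [name[p] for p in pos]        # materialize projection at the end
print(len(result))                     # 50
```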

Vectorized Execution on Compressed Columns

Vectorized execution processes batches of 1,024–8,192 values. Combined with dictionary encoding, the inner loop of a selection operator compares 64-bit words (each holding multiple packed codes) against a bitmask derived from the dictionary lookup. On a modern CPU with 256-bit AVX2 registers, 32 one-byte codes can be compared in a single instruction — achieving effective throughput of \(32 \times \text{clock frequency}\) comparisons per second.

Dremel Nested Data Model

Google’s Dremel (Melnik et al., 2010) extends columnar storage to nested and repeated fields — the data model underlying Protocol Buffers, Parquet, and ORC. A nested record like {name: "Alice", address: {city: "Waterloo", zip: "N2L"}} cannot be stored in a flat column without duplication or loss of structure.

Dremel encodes each leaf field in its own column, accompanied by two metadata arrays: a repetition level (how many repeated ancestor fields restarted since the previous value) and a definition level (how many nullable ancestor fields are defined rather than null). Together, these allow full reconstruction of the original nested record from the column representation, without any additional per-record length or offset information.


Chapter 15: Distributed Databases — CAP, Consistency, and Consensus

Chapter 9 treated parallel databases within a data center. This chapter addresses the harder problem of databases spanning multiple data centers or operating over unreliable networks.

CAP Theorem

CAP Theorem (Brewer 2000; formalized by Gilbert and Lynch 2002). In an asynchronous distributed system, it is impossible for a data store to simultaneously guarantee all three of the following properties:

Consistency (C): every read returns the most recently written value (linearizability).

Availability (A): every request to a non-failed node receives a response (not an error or timeout).

Partition tolerance (P): the system continues to operate despite an arbitrary number of messages being dropped or delayed by the network between nodes.

Proof sketch of impossibility. Suppose a network partition separates two nodes \(N_1\) and \(N_2\). A client writes value \(v\) to \(N_1\). Another client reads from \(N_2\). If the system is available, \(N_2\) must return a response — but it cannot have received \(v\) (the partition blocks all messages). If it returns the old value, consistency is violated. If it returns an error, availability is violated. There is no third option.

The theorem implies that a distributed system can be CP (consistent and partition-tolerant, but may become unavailable during partitions — e.g., HBase, Zookeeper) or AP (available and partition-tolerant, but may return stale data — e.g., Cassandra, DynamoDB) but not both C and A under all network conditions.

Real-world systems do not face binary choices. PACELC (Abadi, 2012) extends CAP: even when there is no partition (\(P\)), a system must trade off latency (\(L\)) against consistency (\(C\)). Strong consistency requires synchronous replication — waiting for all replicas to acknowledge a write — which adds latency. Eventual consistency allows asynchronous replication — responding immediately, propagating updates later — which reduces latency but allows stale reads.

Two-Phase Commit: Coordinator Failure Scenarios

The brief 2PC description in Chapter 9 omitted the critical failure cases. Consider what happens when the coordinator fails at each point in the protocol.

2PC coordinator failure scenarios.

Failure before PREPARE is sent: participants have no record of the transaction. Each participant’s timeout fires and it unilaterally aborts. Correct — the transaction had not started committing.

Failure after PREPARE, before COMMIT/ABORT decision: participants have written prepare records and are waiting. They hold all locks. They cannot unilaterally abort (the coordinator might have decided to commit just before crashing) or commit (the decision has not been made). The participants are blocked indefinitely until the coordinator recovers. This is the fundamental weakness of 2PC — it is a blocking protocol.

Failure after COMMIT sent to some participants but not all: some participants commit (releasing locks), others are still waiting. Upon recovery, the coordinator re-sends COMMIT to all — participants that already committed ignore the duplicate (idempotent); participants that were waiting proceed to commit. Correct.

Mitigation — cooperative termination: blocked participants can contact each other. If any participant received COMMIT, all can commit. If any participant voted NO, all can abort. But if all participants voted YES and none has received the decision, they remain blocked — only the coordinator’s recovery resolves the outcome.

Spanner’s External Consistency via TrueTime Commit Wait

The Spanner TrueTime mechanism (introduced briefly in Chapter 10) is a solution to the following problem: in a geographically distributed database, how do you assign commit timestamps that respect real-world time ordering, even across data centers with skewed clocks?

Spanner commit wait protocol.
  1. The transaction leader (a Paxos leader for the affected shard) calls \(\text{TT.now}()\) to obtain interval \([t_{\text{early}}, t_{\text{late}}]\).
  2. Assign commit timestamp \(s = t_{\text{late}}\) (the upper bound — the latest possible current time).
  3. Execute Paxos commit (replicate the commit record to a majority of replicas).
  4. Commit wait: block until \(\text{TT.now}().\text{earliest} > s\). This ensures that the absolute true time has definitely advanced past \(s\) on every node in the system.
  5. Release locks and acknowledge commit to the client.

Guarantee: if transaction \(T_1\) commits before \(T_2\) starts (in real time), then \(s_{T_1} < s_{T_2}\). Readers using timestamp \(s_{T_2}\) will always observe \(T_1\)’s writes because \(s_{T_1} < s_{T_2}\).
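The commit-wait step can be illustrated with a simulated clock. Everything here is an assumption for illustration: `tt_now` fakes TrueTime as an interval of width \(2\varepsilon\) around the local monotonic clock, and the names do not correspond to Spanner's actual API.

```python
# Toy commit wait: pick s = TT.now().latest, then block until
# TT.now().earliest > s, guaranteeing true time has passed s.
import time

EPS = 0.005  # assumed clock-uncertainty bound, in seconds (illustrative)

def tt_now():
    t = time.monotonic()
    return (t - EPS, t + EPS)   # (earliest, latest) bounds on true time

def commit_wait():
    _, s = tt_now()             # step 2: commit timestamp = interval upper bound
    while tt_now()[0] <= s:     # step 4: wait out the uncertainty (~2 * EPS)
        time.sleep(EPS / 10)
    return s

s = commit_wait()
assert tt_now()[0] > s          # s is now definitely in the past on every node
```

The wait lasts roughly \(2\varepsilon\), which is why Spanner invests heavily (GPS and atomic clocks) in keeping \(\varepsilon\) small.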

Paxos vs. Raft: A Comparison

Both Paxos and Raft are consensus protocols that allow a set of nodes to agree on a value (or a log of values) even if some nodes crash, as long as a majority remain available.

Paxos (Lamport 1989) is the theoretical foundation: a two-phase protocol (Prepare/Promise then Accept/Accepted) that is provably correct but famously difficult to implement fully — multi-Paxos, leader election, log compaction, and membership changes all require extensions not present in the original paper.

Raft (Ongaro and Ousterhout 2014) is explicitly designed for understandability. It decomposes consensus into three subproblems with clear separation: leader election, log replication, and safety. A Raft cluster elects a single leader that receives all writes, replicates them to followers, and commits once a majority acknowledge. Leader election uses randomized timeouts to avoid split votes. Raft is used in etcd, CockroachDB, TiKV, and InfluxDB.

The practical difference: Raft’s clear leader-based model makes it easier to implement correctly, at the cost of a single-leader bottleneck for write throughput. Multi-Paxos can support multi-leader configurations (though this complicates conflict resolution), potentially achieving higher write throughput at the cost of implementation complexity.


Chapter 16: Query Languages Beyond SQL

SQL is the dominant query language for relational data, but it is not expressive for all problems. Recursive queries, graph traversal, and certain deductive reasoning tasks are more naturally expressed in Datalog, a logic-based query language.

Datalog: Rules, Recursion, and Stratified Negation

Datalog syntax. A Datalog program consists of facts (base relations) and rules of the form: \[ \text{Head}(X_1, \ldots, X_k) \leftarrow \text{Body}_1(Y_1, \ldots), \ldots, \text{Body}_n(Z_1, \ldots) \]

The head is derived whenever all body goals are satisfied. Variables are implicitly universally quantified; constants appear directly. Rules are applied repeatedly until a fixpoint is reached (no new facts can be derived).

Transitive closure in Datalog. Given a base relation \(\text{Edge}(X, Y)\) representing directed graph edges, the transitive closure \(\text{Reach}(X, Y)\) (meaning "there is a path from X to Y") is expressed by two rules: \[ \text{Reach}(X, Y) \leftarrow \text{Edge}(X, Y) \]\[ \text{Reach}(X, Y) \leftarrow \text{Edge}(X, Z),\ \text{Reach}(Z, Y) \]

The first rule says that a direct edge implies reachability. The second rule says that if there is a path from \(Z\) to \(Y\) and an edge from \(X\) to \(Z\), then \(X\) can reach \(Y\). Applying these rules iteratively starting from the base facts reaches a fixpoint — the complete transitive closure.

Worked computation: Suppose \(\text{Edge} = \{(A,B), (B,C), (C,D)\}\).

Iteration 0: \(\text{Reach} = \{(A,B), (B,C), (C,D)\}\) (base case).

Iteration 1: apply recursive rule. \((A,B) \in \text{Edge}\) and \((B,C) \in \text{Reach}\) → derive \((A,C)\). \((A,B)\) and \((B,D) \notin \text{Reach}\) yet; \((B,C)\) and \((C,D)\) → derive \((B,D)\). New: \(\text{Reach} = \{(A,B),(B,C),(C,D),(A,C),(B,D)\}\).

Iteration 2: \((A,B)\) and \((B,D) \in \text{Reach}\) → derive \((A,D)\). New: \(\text{Reach} = \{(A,B),(B,C),(C,D),(A,C),(B,D),(A,D)\}\).

Iteration 3: no new facts derivable. Fixpoint reached.
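The fixpoint iteration above is exactly naive bottom-up Datalog evaluation; a sketch (semi-naive evaluation, which only joins against newly derived facts, is the standard optimization and is omitted here):

```python
# Naive bottom-up evaluation of the two Reach rules:
#   Reach(X,Y) <- Edge(X,Y)
#   Reach(X,Y) <- Edge(X,Z), Reach(Z,Y)

def transitive_closure(edges):
    reach = set(edges)                        # rule 1 seeds the fixpoint
    while True:
        new = {(x, y) for (x, z) in edges     # rule 2: join Edge with Reach
                      for (z2, y) in reach if z == z2}
        if new <= reach:                      # fixpoint: nothing new derived
            return reach
        reach |= new

edges = {("A", "B"), ("B", "C"), ("C", "D")}
print(sorted(transitive_closure(edges)))
# [('A', 'B'), ('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D'), ('C', 'D')]
```

Running this reproduces the worked computation: \((A,C)\) and \((B,D)\) appear in the first round, \((A,D)\) in the second, and the third round derives nothing new.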

The equivalent SQL requires a recursive CTE (WITH RECURSIVE), which evaluates identically to Datalog’s fixpoint iteration. Datalog makes the semantics explicit.

Stratified negation allows Datalog rules to use negation as long as no recursive rule negates itself (a cycle through negation). Rules are stratified into layers: stratum 0 contains base facts and rules with no negation; stratum 1 contains rules that may use negated stratum-0 predicates; and so on. Queries with stratified negation are evaluated bottom-up, one stratum at a time, and are well-defined (no semantic ambiguity). Queries with unstratified negation (negation within a cycle) are not supported in standard Datalog.

Relational Division and Its SQL Encoding

Relational division \(R \div S\) answers queries of the form “find all \(X\) values in \(R\) that appear paired with every \(Y\) value in \(S\).” This is the “for all” quantifier, which SQL does not have directly — SQL uses double negation instead.

Relational division in SQL. Find students enrolled in every course in the set \(S = \{\text{'CS448'}, \text{'CS456'}\}\).

Relations: \(\text{Enrolled}(\text{sid}, \text{cid})\), \(\text{RequiredCourses}(\text{cid})\) containing CS448 and CS456.

The double-negation encoding:

SELECT DISTINCT sid FROM Enrolled e1
WHERE NOT EXISTS (
    SELECT cid FROM RequiredCourses r
    WHERE NOT EXISTS (
        SELECT * FROM Enrolled e2
        WHERE e2.sid = e1.sid AND e2.cid = r.cid
    )
);

Reading this inside-out: the innermost subquery checks whether student e1.sid is enrolled in course r.cid. The middle NOT EXISTS asks “is there a required course that e1.sid is NOT enrolled in?” The outer NOT EXISTS asks “is there no required course that e1.sid is not enrolled in?” — equivalently, is e1.sid enrolled in all required courses. The double negation encodes the universal quantifier.


Chapter 17: Advanced Indexing

GiST: Generalized Search Tree Framework

The B+-tree is optimal for one-dimensional ordered data. Many applications require indexes over multi-dimensional points, geometric shapes, full-text tokens, or other structured types. GiST (Generalized Search Tree, Hellerstein et al. 1995) provides a single tree framework parameterized by a key type and a set of predicate methods (consistent, union, penalty, picksplit).

GiST node invariant. Every entry \((p, \text{ptr})\) in a GiST internal node satisfies: predicate \(p\) holds for every object in the subtree rooted at \(\text{ptr}\). For a B+-tree, \(p\) is a range predicate. For an R-tree, \(p\) is a bounding box. For a full-text index, \(p\) is a set of contained terms.

R-trees (Guttman 1984) instantiate GiST for spatial data. Each internal node stores a minimum bounding rectangle (MBR) that encloses all objects in its subtree. A spatial query (e.g., “find all restaurants within 5 km of point \(q\)”) traverses the tree top-down, pruning subtrees whose MBR does not intersect the query region. PostgreSQL implements R-trees via GiST with the PostGIS extension.

Full-Text Inverted Indexes and BM25

A full-text inverted index maps each term \(t\) to its posting list: the sorted list of document IDs (and optionally positions within each document) that contain \(t\). Queries that specify multiple terms intersect posting lists using a merge algorithm identical to sort-merge join.

TF-IDF scores a document \(d\) for query term \(t\) as:

\[ \text{tf-idf}(t,d) = \text{tf}(t,d) \times \log\frac{N}{\text{df}(t)} \]

where \(\text{tf}(t,d)\) is the term frequency (occurrences of \(t\) in \(d\)), \(N\) is the total number of documents, and \(\text{df}(t)\) is the document frequency (number of documents containing \(t\)). Rare terms (low \(\text{df}\)) receive high IDF weight.

BM25 (Robertson and Zaragoza 2009) improves on TF-IDF by accounting for document length normalization and term frequency saturation:

\[ \text{BM25}(t,d) = \text{IDF}(t) \cdot \frac{\text{tf}(t,d) \cdot (k_1+1)}{\text{tf}(t,d) + k_1\bigl(1 - b + b \cdot |d|/\text{avgdl}\bigr)} \]

where \(k_1 \approx 1.2\) controls term frequency saturation, \(b \approx 0.75\) controls length normalization, \(|d|\) is the document length, and \(\text{avgdl}\) is the average document length across the corpus. BM25 is the default ranking function in Elasticsearch (via Lucene); PostgreSQL's built-in full-text ranking (ts_rank) uses a simpler frequency-based scheme.
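The formula transcribes directly into code. The sketch below uses the smoothed IDF variant \(\ln\bigl(1 + (N - \text{df} + 0.5)/(\text{df} + 0.5)\bigr)\) (the form used by Lucene) rather than the raw \(\log(N/\text{df})\) above; the corpus numbers are illustrative.

```python
# BM25 per-term score with the standard parameter defaults k1 = 1.2, b = 0.75.
import math

def bm25(tf, df, N, doc_len, avgdl, k1=1.2, b=0.75):
    idf = math.log((N - df + 0.5) / (df + 0.5) + 1)   # smoothed IDF
    norm = 1 - b + b * doc_len / avgdl                # document-length normalization
    return idf * (tf * (k1 + 1)) / (tf + k1 * norm)

# a rare term outscores a common one at equal term frequency
rare = bm25(tf=3, df=10, N=10_000, doc_len=100, avgdl=120)
common = bm25(tf=3, df=5_000, N=10_000, doc_len=100, avgdl=120)
assert rare > common

# term-frequency saturation: doubling tf far less than doubles the score
assert bm25(6, 10, 10_000, 100, 120) < 2 * bm25(3, 10, 10_000, 100, 120)
```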

LSM-Trees: LevelDB and RocksDB

Conventional B+-trees perform random writes — a single key insertion may require a random disk seek to the leaf page. For write-intensive workloads (e.g., time-series databases, log ingestion), this random I/O is the bottleneck. The Log-Structured Merge-Tree (LSM-tree) (O’Neil et al. 1996) trades random writes for sequential writes at the cost of additional read I/O and periodic compaction.

LSM-tree write and compaction.

Write: all writes go to an in-memory sorted structure (the MemTable, typically a skip list or red-black tree). When the MemTable reaches a size threshold (e.g., 64 MB), it is flushed to disk as an immutable SSTable (Sorted String Table) file at Level 0 (L0). The flush is a single sequential write — no random I/O.

Compaction: L0 accumulates SSTables. When L0 has too many files, a background compaction merges L0 files into L1, then L1 into L2, and so on. Each compaction at level \(i\) reads all files in the level-\(i\) range being compacted, merges them with the overlapping range of level \(i+1\), and writes the result as new level-\(i+1\) SSTables. At each level, the total size grows by a factor of 10 (a typical amplification factor): L0 ≈ 256 MB, L1 ≈ 2.56 GB, L2 ≈ 25.6 GB, etc.

Read: check the MemTable first (\(O(\log N)\) for a skip list). If not found, probe L0 SSTables in recency order (each may require a binary search or Bloom filter check). Then probe L1, L2, etc. In the worst case, a point read checks one SSTable per level — \(O(\log_{\text{amp}} \text{total size})\) SSTables.

Write amplification: each byte written by the application may be written to disk multiple times during compaction. For an amplification factor of 10 per level and \(L\) levels, write amplification is up to \(10 \times L\). RocksDB's default configuration achieves write amplification of approximately 10–30×. This is acceptable compared to B+-trees under random small writes, which can suffer effective write amplification of 100× or more because each small row update rewrites an entire page (a 100-byte update to a 16 KB leaf is a 160× amplification) with a random seek per page.

LSM-tree Bloom filter optimization. Each SSTable is accompanied by a Bloom filter: a compact probabilistic data structure that answers "is key \(k\) definitely not in this SSTable?" with no false negatives and a tunable false-positive rate. With 10 bits per key and \(k = 7\) hash functions, the false-positive rate is approximately 1%. For a point read, the system probes the Bloom filter of each candidate SSTable before performing a binary search in the SSTable data; the filters fit entirely in L2/L3 cache, reducing a potentially disk-bound operation to a cache-resident check for every SSTable the key is not in.
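A minimal Bloom filter matching the numbers above (10 bits per key, \(k = 7\)) can be sketched as follows. This is a didactic implementation: the hash functions are derived from salted SHA-256, whereas production engines use fast non-cryptographic hashes (e.g., xxHash) and block-based filter layouts.

```python
import hashlib

class BloomFilter:
    def __init__(self, n_keys, bits_per_key=10, k=7):
        self.m = max(1, n_keys * bits_per_key)   # total filter bits
        self.k = k                               # hash functions per key
        self.bits = bytearray((self.m + 7) // 8)

    def _positions(self, key):
        # Derive k independent bit positions from salted SHA-256 digests.
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.m

    def add(self, key):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key):
        """False means the key is definitely absent (no false negatives)."""
        return all(self.bits[p // 8] & (1 << (p % 8))
                   for p in self._positions(key))

bf = BloomFilter(n_keys=1000)
for i in range(1000):
    bf.add(f"key{i}")

# No false negatives: every inserted key passes the filter.
assert all(bf.might_contain(f"key{i}") for i in range(1000))

# Empirical false-positive rate on keys that were never inserted.
false_hits = sum(bf.might_contain(f"other{i}") for i in range(10000))
print(f"false-positive rate ~ {false_hits / 10000:.3%}")  # on the order of 1%
```

The theoretical rate for these parameters is \((1 - e^{-kn/m})^k = (1 - e^{-0.7})^7 \approx 0.8\%\), consistent with the roughly 1% figure quoted above.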

Concrete numbers: a RocksDB instance with 5 levels and 100 candidate SSTables per level would require up to 500 SSTable binary searches per point read without Bloom filters. With Bloom filters (1% false-positive rate), the expected number of falsely probed SSTables is \(100 \times 0.01 = 1\) per level; for a key present at level 5, the expected number of SSTable data reads is therefore about \(4 \times 1 + 1 = 5\). The Bloom filters reduce read amplification by roughly 100× in this scenario.

LSM-tree vs. B+-tree trade-off summary.
| Property | B+-tree | LSM-tree |
| --- | --- | --- |
| Write I/O pattern | Random (leaf page update) | Sequential (MemTable flush + compaction) |
| Write amplification | Low (1–5×) | High (10–30×) |
| Read amplification | Low (3–5 I/Os for point read) | Medium (Bloom filters reduce to 1–2 disk reads) |
| Space amplification | Low (10–30% overhead) | Medium (space for multiple levels + compaction tombstones) |
| Range scan performance | Excellent (sequential leaf reads) | Good (SSTables within a level are non-overlapping; scan merges them) |
| Write throughput | Limited by random I/O | Very high (sequential writes) |

LSM-trees dominate in write-heavy, append-oriented workloads: Cassandra, HBase, LevelDB, RocksDB, InfluxDB, and TiKV all use LSM-tree storage. B+-trees dominate in read-heavy OLTP: PostgreSQL, MySQL InnoDB, and SQLite use B+-trees. The choice reflects the fundamental tension between write and read optimization in storage-engine design.
