João Pereira
Tags: systems, distributed, architecture

Notes on Designing Data-Intensive Applications

Designing Data-Intensive Applications, by Martin Kleppmann

Chapter 1 - Trade-Offs in Data Systems Architecture

  • Data-intensive applications are bottlenecked by data — amount, complexity, or speed of change — rather than by raw compute
  • Most data-intensive applications are composed of standard building blocks: databases, caches, search indexes, stream processors, batch processors, message queues
  • The boundary between these components is increasingly blurry: Redis can be used as a message queue; Kafka has database-like durability; some databases have built-in full-text search
  • When you combine several tools to serve a request, you create a new, special-purpose data system out of smaller general-purpose components
    • Your application code becomes responsible for keeping those components consistent with each other
    • The performance guarantees you can offer your users are now the composite of all your subsystems' guarantees
  • Every architectural decision involves trade-offs between: reliability, scalability, maintainability, latency, throughput, consistency, cost, and operational complexity
  • There is no single correct architecture for a data system — the right design depends on the workload, team, scale, and failure tolerance required
  • The same problem (e.g., caching) can be solved by many different tools with different characteristics; understanding the trade-offs is the core skill
  • Many systems claim to be "schema-less," but this just moves the schema to the application code — data always has structure; the question is where it's enforced
  • The costs of a design decision compound over time: a wrong choice made early becomes expensive to change as data accumulates and services grow around it

Chapter 2 - Defining Nonfunctional Requirements

  • Three concerns matter the most in software systems: reliability, scalability, and maintainability
  • Reliability: the system continues to work correctly even when things go wrong
    • Fault: a component deviates from its spec; failure: the whole system stops providing service
    • Fault-tolerant systems prevent faults from escalating into failures
    • Counterintuitively, it can make sense to deliberately trigger faults (chaos engineering) to ensure fault-tolerance mechanisms are exercised regularly
    • Hardware faults are independent and random — add redundancy (RAID, dual-power, hot-swap CPUs)
    • Software errors are correlated — a bug can take down many nodes simultaneously; no simple solution, requires careful design, testing, process isolation, and monitoring
    • Human errors are the #1 cause of outages — minimize error opportunities through good abstractions, sandboxes, easy rollback, and monitoring
  • Scalability: the system's ability to handle growth — in data volume, traffic, or complexity
    • Not a binary property; the right question is "if the system grows in this way, how do we cope?"
    • Load parameters describe current load; examples: requests per second, database read/write ratio, concurrent users, cache hit rate
    • Twitter example: home timeline (read-heavy, ~300k reads/sec) vs. posting a tweet (write-heavy but fan-out amplifies it to followers)
      • Approach 1: query all followees' tweets at read time — simple but slow at scale
      • Approach 2: pre-compute each user's home timeline cache on write — fast reads but expensive fan-out for users with millions of followers
      • Twitter uses a hybrid: pre-compute for most users, query at read time for celebrities
    • Performance can be described as throughput (batch) or response time (online); measure with percentiles, not averages
      • p50 = median; p95 = 95th percentile; p99, p999 = tail latencies
      • Tail latency amplification: a request that fans out to 100 backends is as slow as the slowest one
    • Head-of-line blocking: a slow request monopolizes a resource and causes fast requests behind it to appear slow
    • Measure response time on the client side — only the client sees the full picture including network and queuing delays
    • Scaling up (vertical) vs. scaling out (horizontal): most large systems use a mix; purely horizontal stateless services are easiest; stateful systems add complexity
    • Elastic scaling (auto-scaling) vs. manual: manual is simpler and predictable; elastic handles unpredictable load but adds operational complexity
  • Maintainability: making life better for the engineering and operations teams who work on the system over time
    • Operability: make it easy to keep running — good monitoring, automation, documentation, clear behavior under failure, easy config management
    • Simplicity: reduce accidental complexity — complexity not inherent to the problem but from implementation choices; good abstractions hide it
    • Evolvability (extensibility, plasticity): make it easy to change — requirements change constantly; systems that can be adapted without full rewrites are worth more over time
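
The percentile and tail-latency points above can be sketched in Python (the exponential latency distribution is just an illustrative assumption):

```python
import random

def percentile(samples, p):
    """Nearest-rank percentile: sort the samples, index into the list."""
    ranked = sorted(samples)
    k = min(len(ranked) - 1, int(len(ranked) * p / 100))
    return ranked[k]

# Simulated response times in ms: exponential, so there is a slow tail
random.seed(1)
times = [random.expovariate(1 / 20) for _ in range(10_000)]
p50, p95, p99 = (percentile(times, p) for p in (50, 95, 99))

# Tail latency amplification: a request fanning out to 100 backends is
# slower than a single backend's p99 whenever at least one backend is,
# which happens with probability 1 - 0.99**100 -- about 63% of requests
amplified = 1 - 0.99 ** 100
```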

Chapter 3 - Data Models and Query Languages

  • Data models are the most important part of software development — they affect not only how software is written but how we think about the problem
  • Every layer presents a data model to the layer above: hardware (bytes) → OS (files) → database (rows/documents) → application (objects)
  • The three main data models: relational, document, and graph

Relational Model

  • Data is organized into tables (relations) of rows and columns; relationships are represented by foreign keys
  • Schema-on-write: structure is defined at write time; migrations are required to change the schema
  • Excellent support for many-to-one and many-to-many relationships via joins
  • Normalizing data reduces duplication and makes updates consistent (update one place, not many)
  • The object-relational mismatch: application code works with objects; databases work with tables — ORMs reduce but don't eliminate this friction

Document Model

  • Data is stored as self-contained documents (JSON, XML); no schema enforced by the database
  • Schema-on-read: structure is implied and validated by application code at read time, not at write
  • Better locality: retrieving a document fetches all related data in one query (avoids joins for typical access patterns)
  • Good for one-to-many tree-structured data (e.g., a résumé: one person, many jobs, many schools)
  • Poor support for many-to-many relationships; joins must be done in application code
  • A document is usually retrieved and written in its entirety; partial updates require rewriting the whole thing
  • Denormalization is common in document databases — the same data may be duplicated in multiple documents, complicating updates

Graph Model

  • Designed for highly interconnected data where many-to-many relationships are the norm
  • A graph has vertices (nodes) and edges (relationships); both can have properties
  • Property graph model (Neo4j, JanusGraph): vertices have a unique ID, a set of outgoing/incoming edges, and key-value properties; edges have a label, direction, and properties
  • Triple-store model (Datomic, AllegroGraph): stores data as (subject, predicate, object) triples — e.g., (Lucy, age, 33) or (Lucy, marriedTo, Alain)
  • Graph models are extremely flexible for evolving data: adding new types of relationships or new properties requires no schema changes
  • "For highly interconnected data, the document model is awkward, the relational model is acceptable, and graph models are the most natural"

Query Languages

  • Declarative languages (SQL, Cypher, SPARQL, Datalog) specify what you want, not how to get it; the query optimizer decides execution
    • Declarative queries lend themselves to parallelism; you don't prescribe order of operations
  • Imperative queries (MongoDB's original API, Gremlin for graphs) specify exact operations step-by-step
  • CSS and XSL are declarative — a useful reminder that the concept extends beyond databases
  • MapReduce sits between declarative and imperative: the map and reduce functions are pure functions, but you write explicit code
  • Cypher: pattern-matching query language for Neo4j property graphs
  • SPARQL: query language for RDF triple-stores; predates Cypher
  • Datalog: a subset of Prolog; defines rules that can be reused across queries; used in Datomic

Convergence

  • Relational databases now support JSON columns (PostgreSQL, MySQL, SQL Server) — blurring the document/relational line
  • Document databases increasingly support join-like operations (RethinkDB, Couchbase)
  • The practical differences between models are narrowing; pick based on your data access patterns, not dogma

Chapter 4 - Storage and Retrieval

  • Databases do two things: store data and retrieve it; these are more complex than they appear
  • The data structures a database uses internally determine its performance characteristics for your workload
  • Two main families of storage engines: log-structured and page-oriented (B-trees); they have fundamentally different trade-offs

Hash Indexes (Log-Structured)

  • Simplest index: an in-memory hash map mapping each key to the byte offset in an append-only log file
  • All writes are sequential appends — fast on both spinning disks and SSDs; old values are not overwritten
  • Compaction: periodically merge and compact log segments, keeping only the most recent value for each key
  • Tombstones: mark deleted keys; removed during compaction
  • Limitations: the entire key set must fit in RAM; range queries are inefficient (no ordering)
  • Used by Bitcask (Riak's storage engine)
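
A minimal sketch of the Bitcask-style idea — an append-only log file plus an in-memory dict of byte offsets (the comma-separated record format is an assumption for illustration):

```python
import os, tempfile

class HashIndexStore:
    def __init__(self, path):
        self.path = path
        self.index = {}           # key -> byte offset of the latest record
        open(path, "wb").close()  # start with an empty log file

    def set(self, key, value):
        with open(self.path, "ab") as f:
            offset = f.tell()
            f.write(f"{key},{value}\n".encode())
        self.index[key] = offset  # the old record stays behind as garbage

    def get(self, key):
        with open(self.path, "rb") as f:
            f.seek(self.index[key])
            k, _, v = f.readline().decode().rstrip("\n").partition(",")
            return v

path = os.path.join(tempfile.mkdtemp(), "log")
db = HashIndexStore(path)
db.set("a", "1")
db.set("a", "2")   # append-only: the first record is not overwritten
```

Compaction would rewrite the log keeping only the offsets still referenced by the index.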

SSTables and LSM-Trees

  • SSTable (Sorted String Table): like a log segment but keys are sorted — makes merging segments efficient (merge-sort style)
  • Building an SSTable: writes go into an in-memory balanced tree (memtable — a red-black or AVL tree); when full, flush to disk as an SSTable
  • Reads: check memtable → most recent SSTable → next most recent, etc.
  • Crash recovery: maintain a separate unsorted append-only log just for recovery; discard it after each successful memtable flush
  • Bloom filters avoid expensive reads for keys that don't exist
  • Compaction strategies: size-tiered (merge smaller SSTables into larger ones) or leveled (split key range across levels, level L+1 is ~10x larger than level L)
  • LSM-tree (Log-Structured Merge-Tree): the general pattern of keeping a cascade of SSTables that are merged and compacted in the background
  • Used by: LevelDB, RocksDB, Cassandra, HBase, ScyllaDB, InfluxDB, Lucene (for full-text indexes)
  • Write amplification: each byte written to the database may be written multiple times to disk due to compaction
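
The memtable/SSTable read path and compaction can be sketched like this (plain dicts stand in for the on-disk sorted files; `flush_at` is an arbitrary illustrative threshold):

```python
memtable = {}   # stand-in for the in-memory balanced tree
segments = []   # flushed SSTables, newest first

def put(key, value, flush_at=3):
    memtable[key] = value
    if len(memtable) >= flush_at:  # memtable full: flush as an SSTable
        segments.insert(0, dict(sorted(memtable.items())))
        memtable.clear()

def get(key):
    if key in memtable:
        return memtable[key]
    for seg in segments:           # newest segment wins
        if key in seg:
            return seg[key]
    return None

def compact():
    """Merge all segments; the first (newest) occurrence of a key wins."""
    merged = {}
    for seg in segments:
        for k, v in seg.items():
            merged.setdefault(k, v)  # keep only the most recent value
    segments[:] = [dict(sorted(merged.items()))]
```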

B-Trees

  • The most widely used index structure; found in almost all relational databases
  • Data organized into fixed-size pages (typically 4 KB), forming a tree; the root page is accessed first for every lookup
  • Each page contains keys and references to child pages; leaf pages contain values (or references to values)
  • Branching factor: typically 500; a four-level tree of 4 KB pages with a branching factor of 500 can store up to 256 TB
  • Updates: find the leaf page, modify the value, write the page back to disk — in-place modification
  • Insertions that overflow a page cause the page to split into two half-full pages; the parent is updated
  • Write-ahead log (WAL / redo log): every modification is recorded to a sequential append-only log before being applied; used for crash recovery
  • B-tree depth: O(log n); most databases need only 3–4 levels deep
  • Optimizations: copy-on-write (instead of WAL, write a new page version; used by LMDB); sibling page pointers for sequential scans; abbreviated keys in interior nodes (B+ trees)
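
The capacity claim above is easy to check with back-of-the-envelope arithmetic:

```python
# Figures from the notes: 4 KB pages, branching factor 500, 4 levels.
page_size = 4 * 1024   # bytes per page
branching = 500        # child references per page
levels = 4

# Four levels of branching address branching**levels bottom-level pages:
# 500**4 pages * 4 KB = 2.56e14 bytes, i.e. ~256 TB
capacity_bytes = branching ** levels * page_size
```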

LSM-Trees vs B-Trees

  • LSM-trees are generally faster for writes (sequential writes); B-trees are generally faster for reads
  • LSM-trees use sequential disk I/O and compress better — smaller files on disk
  • B-trees have higher write amplification (write once to WAL, once to page); LSM-trees also have write amplification but often less
  • LSM-tree compaction can interfere with ongoing read/write performance — high percentile latency can spike; this is an operational concern
  • A key may appear in multiple SSTable segments (temporarily); each key exists in exactly one B-tree location — better for transaction isolation
  • B-trees are better suited for strong transactional semantics (locking at exact key location)

Secondary Indexes, Clustered Indexes, Covering Indexes

  • Secondary index: an index on a non-primary key; critical for query performance
  • Clustered index: the actual row data is stored within the index (MySQL InnoDB primary key is always clustered)
  • Heap file: index stores a reference to the actual row stored separately; updates that don't change key can be done in-place in the heap
  • Covering index: stores extra columns alongside the index key — some queries can be served entirely from the index without looking up the actual row
  • Multi-column / concatenated index: appends columns together; only efficient for queries matching the leftmost prefix of the indexed columns
  • Multi-dimensional indexes: needed for geospatial data (e.g., querying by latitude and longitude simultaneously); R-trees used by PostGIS; space-filling curves (like Z-order curves) map multi-dimensional coordinates to a 1D number
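
The leftmost-prefix rule for concatenated indexes can be illustrated with a sorted list of (last_name, first_name) tuples (the names are made up):

```python
from bisect import bisect_left, bisect_right

# A concatenated index on (last_name, first_name): entries are sorted
# by the combined key, so an equality match on the leftmost column
# becomes a contiguous range scan; first_name alone gets no such help.
index = sorted([("Smith", "Alice"), ("Jones", "Bob"),
                ("Smith", "Carol"), ("Adams", "Dan")])

def lookup_last(last):
    lo = bisect_left(index, (last, ""))
    hi = bisect_right(index, (last, "\uffff"))  # past any first name
    return index[lo:hi]
```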

In-Memory Databases

  • Disk is used for durability (WAL/snapshots), but performance is achieved by keeping everything in RAM
  • Not faster just because they avoid reading from disk — the OS often caches hot data anyway; the advantage is no overhead of encoding in-memory data structures for disk
  • Examples: VoltDB, MemSQL, Oracle TimesTen (relational); RAMCloud, Redis, Memcached (key-value)
  • Redis and Couchbase offer weak durability by writing to disk asynchronously
  • Anti-caching: when memory runs low, evict the least-recently-used data to disk and reload it on the next access — allows in-memory databases to work with datasets larger than memory
  • Future: non-volatile memory (NVM/persistent memory) may blur the in-memory/on-disk distinction

OLTP vs OLAP

  • OLTP (Online Transaction Processing): low latency, small number of rows per query, frequent reads/writes by many users
  • OLAP (Online Analytical Processing): high throughput, aggregate queries over large numbers of rows, used by analysts/data scientists
  • Data warehouses: a read-only copy of OLTP data, loaded via ETL (Extract-Transform-Load); kept separate to avoid impacting OLTP performance
  • Star schema: a central fact table (each row is one event — a sale, a click) surrounded by dimension tables (who, what, where, when, how, why)
  • Snowflake schema: dimension tables are further normalized into sub-dimensions; star schemas are usually preferred for simplicity
                OLTP                          OLAP
  Read          Small set of rows by key      Aggregate over many rows
  Write         Random-access, low latency    Bulk import / event stream
  Scale         GBs–TBs                       TBs–PBs
  Bottleneck    Disk seek                     Disk bandwidth

Column-Oriented Storage

  • Row-oriented storage: all columns for a row are stored adjacent — efficient for retrieving entire rows
  • Column-oriented storage: all values for a column are stored together — efficient for analytical queries that read a few columns across many rows
  • A query that reads 5 columns from a 100-column table only needs to read 5% of the data
  • Column compression: columns often have low cardinality (few distinct values) — encode with bitmaps; one bitmap per distinct value, one bit per row
  • Run-length encoding: further compress sparse bitmaps; "5 zeros, then 3 ones, then 20 zeros"
  • Vectorized processing: CPU-level optimizations (SIMD instructions) operate on compressed column data directly
  • Sort order: choose a sort key based on common queries; rows in all column files are reordered in the same order; you can maintain multiple sorted copies (like C-Store / Vertica)
  • Cassandra and HBase use "column families" — a group of columns stored together — but rows within each column family are row-oriented; this is not the same as column-oriented storage
  • Writing to column-oriented storage is hard: inserting a row requires updating every column file; use an LSM-tree in-memory buffer and flush to column-sorted files
  • Materialized aggregates and data cubes: pre-compute aggregate values (e.g., sum of sales by product and by date) and cache them in a materialized view; avoids rescanning raw data every query; inflexible because not all query shapes benefit
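
The bitmap-plus-run-length-encoding idea can be sketched as follows (the column values are arbitrary example IDs):

```python
def bitmaps(column):
    """One bitmap per distinct value, one bit per row."""
    return {v: [1 if x == v else 0 for x in column]
            for v in sorted(set(column))}

def run_length_encode(bits):
    """Alternating run lengths, starting with zeros: [0,0,0,1,1] -> [3,2]."""
    runs, current, count = [], 0, 0
    for b in bits:
        if b == current:
            count += 1
        else:
            runs.append(count)
            current, count = b, 1
    runs.append(count)
    return runs

col = [69, 69, 74, 31, 31, 31, 69]
bm = bitmaps(col)
# bm[31] == [0, 0, 0, 1, 1, 1, 0]; run-length encoded: [3, 3, 1]
```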

Chapter 5 - Encoding and Evolution

  • Evolvability: systems need to be able to change over time — new features require new fields, old fields get removed, types change
  • Backward compatibility: newer code can read data written by older code (usually easy — you know the old format)
  • Forward compatibility: older code can read data written by newer code (harder — old code must gracefully ignore unknown new fields)
  • Rolling upgrades (staged rollout): deploy new version to a few nodes at a time; means old and new code run simultaneously — both backward and forward compatibility are needed
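
A toy illustration of both compatibility directions using JSON (the field names are hypothetical, not from the book):

```python
import json

KNOWN_FIELDS_V1 = {"user_id", "name"}

def decode_v1(payload):
    """Old code reading data written by new code (forward compatibility):
    silently drop fields v1 doesn't know about."""
    data = json.loads(payload)
    return {k: v for k, v in data.items() if k in KNOWN_FIELDS_V1}

def decode_v2(payload):
    """New code reading data written by old code (backward compatibility):
    the new "email" field must have a default."""
    data = json.loads(payload)
    data.setdefault("email", None)
    return data

new_record = json.dumps({"user_id": 1, "name": "Lucy", "email": "l@x"})
old_record = json.dumps({"user_id": 2, "name": "Alain"})
```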

Encoding Formats

  • Programs represent data in two forms: in-memory (objects, structs, lists — optimized for CPU access) and byte-sequence (for storage/network — self-contained)
  • Encoding (serialization) = in-memory → bytes; decoding (deserialization/parsing) = bytes → in-memory
  • Language-specific formats (Java Serializable, Python pickle): only usable in one language, poor versioning support, security issues — avoid for long-term storage or cross-service communication
  • JSON, XML, CSV: widely supported but have quirks
    • No distinction between integers and floating-point numbers in JSON
    • XML and CSV can't reliably distinguish strings from numbers
    • No binary data support in JSON (must base64-encode; increases size by 33%)
    • Optional/inconsistent schema support; no enforcement at write time
  • Binary JSON variants (BSON, MessagePack): smaller than JSON but sacrifice human readability; not radically better
  • Thrift (Facebook) and Protocol Buffers (Google): binary encoding using a schema
    • Fields encoded as field tag (integer) + type + value — no field names in the binary, just numbers
    • Forward compatibility: old code ignores unknown field tags
    • Backward compatibility: new fields must be optional or have a default value; a field's tag number can never be changed
    • Thrift has two formats: BinaryProtocol and CompactProtocol (uses variable-length integers, more space-efficient)
  • Avro (Hadoop): schema defined in JSON or IDL; no field tags in the binary
    • Writer's schema and reader's schema can differ; the Avro library resolves them by matching field names
    • Must include writer's schema somewhere: at file start (for files), schema version number (for databases), negotiate schema on connection (for network)
    • Avro is friendlier to dynamically generated schemas (e.g., generated from a relational schema)
    • More compact than Thrift/Protobuf in some cases

Dataflow Patterns

  • Data flows between processes via: databases, service calls (REST/RPC), and asynchronous message passing
  • Through databases: writer encodes, reader decodes; both backward and forward compatibility needed because data outlives code; schema migration must handle old data on disk
  • Through services (REST and RPC):
    • REST: uses HTTP verbs (GET, POST, PUT, DELETE); resources identified by URLs; uses JSON or XML; leverages HTTP features (caching, authentication, content negotiation)
    • SOAP: XML-based; independent of HTTP; described by WSDL; complex but well-tooled
    • RPC (Remote Procedure Call): makes network calls look like local function calls (location transparency) — a fundamentally flawed abstraction
      • Network calls are unpredictable: they may time out, be lost, or execute twice (retries)
      • Local calls return a result or throw an exception; network calls can just… not return
      • Local calls have consistent types; network calls cross language boundaries requiring encoding
    • Modern RPC frameworks (gRPC, Thrift, Finagle) acknowledge these differences and don't pretend to be local; gRPC uses Protocol Buffers and supports streaming
    • For backward/forward compatibility in RPC: if the server is updated before clients, responses need forward compatibility (old client reads new response) and requests need backward compatibility (new server reads old request)
  • Through message brokers (asynchronous):
    • Producer sends to a named queue/topic; broker delivers to one or more consumers
    • Benefits: buffer if consumer is down, automatic redelivery, decouple sender from receiver, fan-out (one message to multiple consumers)
    • Examples: RabbitMQ, ActiveMQ, Apache Kafka, Amazon SQS
    • Message brokers don't enforce schemas — the responsibility is entirely on producers and consumers
    • Distributed actor frameworks (Akka, Orleans, Erlang OTP): the actor model extended across network nodes; each actor has a mailbox of messages; location transparency for actors is more reasonable than for RPC because failures are already a first-class concept

Chapter 6 - Replication

  • Replication: keeping a copy of the same data on multiple nodes connected by a network
  • Reasons to replicate: reduce latency (put data closer to users), increase availability (if one node fails, others continue), increase read throughput (scale reads across replicas)
  • All difficulty in replication comes from handling changes to replicated data

Single-Leader (Leader-Follower) Replication

  • One node is designated the leader (primary/master); all writes go to the leader
  • The leader writes changes to its replication log; followers apply changes in the same order
  • Reads can be served from any replica; follower reads may be stale
  • Synchronous replication: leader waits for follower confirmation before acknowledging write — guarantees up-to-date follower, but one unresponsive follower blocks all writes
  • Semi-synchronous: one follower is synchronous, others are asynchronous — ensures at least one follower is always up to date
  • Asynchronous replication: leader doesn't wait — lower latency, but unreplicated writes are lost if leader fails before replicating
  • Fully asynchronous replication is common despite the durability trade-off; used by many high-throughput systems

Setting Up New Followers

  • Take a snapshot of the leader's database; copy to the new follower; request all changes since the snapshot (using replication log position); catch up

Handling Node Outages

  • Follower failure: follower can catch up from its log once it reconnects
  • Leader failure (failover): detect failure (typically via timeout) → elect a new leader (usually the most up-to-date replica) → reconfigure all clients and other followers to use the new leader
  • Failover problems:
    • Async replication: new leader may not have the latest writes — they are discarded, causing data loss
    • Discarded writes can cause inconsistency with external systems (e.g., auto-increment IDs used by external services)
    • Split brain: two nodes both believe they are the leader — can corrupt data; usually resolved by STONITH (Shoot The Other Node In The Head)
    • What timeout to use for detecting leader failure? Too short = false positives; too long = slow recovery

Replication Log Implementations

  • Statement-based: log the SQL statement; nondeterministic functions (NOW(), RAND(), auto-increment) cause divergence; fragile, generally not used for MySQL since version 5.1
  • Write-ahead log (WAL) shipping: send the same low-level log the storage engine writes; tightly coupled to the storage engine format — prevents zero-downtime upgrades when leader and follower run different storage engine versions; used by PostgreSQL and Oracle
  • Logical (row-based) replication: a separate log format at row granularity (which rows were inserted/updated/deleted); decoupled from storage engine internals; MySQL binlog; easier for change data capture (CDC) to downstream systems
  • Trigger-based replication: application-level, using database triggers to log changes to a separate table; flexible but higher overhead and more failure-prone; used by Bucardo and Slony

Replication Lag Problems

  • Eventual consistency: asynchronous replicas fall behind the leader; the inconsistency is temporary but real
  • Read-your-own-writes consistency: a user should see writes they just submitted (even if replicas haven't caught up)
    • Solutions: read the user's own data from the leader; or track the timestamp of the last write and refuse to read from a replica that hasn't caught up to that point; or use sticky sessions (route the user to the same replica)
    • Cross-device complexity: user writes on mobile, reads on desktop — must coordinate the "last write" timestamp across devices
  • Monotonic reads: user should not see older data after already seeing newer data (e.g., two reads to different replicas at different lag levels)
    • Solution: route each user's reads to the same replica consistently (e.g., hash the user ID to pick a replica)
  • Consistent prefix reads: if writes happen in a causal sequence (A then B), reads should not see B without having seen A
    • Problem occurs in partitioned databases where different partitions lag independently
    • Solution: write causally-related data to the same partition; this is not always efficient
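
The monotonic-reads fix — hashing the user ID to pick a replica — can be sketched like this (replica names are placeholders):

```python
import hashlib

REPLICAS = ["replica-0", "replica-1", "replica-2"]  # hypothetical names

def replica_for(user_id):
    """Deterministically map a user to one replica, so their reads
    never bounce between replicas at different lag levels."""
    digest = hashlib.sha256(user_id.encode()).digest()
    return REPLICAS[int.from_bytes(digest[:8], "big") % len(REPLICAS)]
```

If that replica fails, the user must be rerouted, and monotonicity can be briefly violated.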

Multi-Leader Replication

  • Multiple nodes accept writes; each leader also acts as a follower to the other leaders
  • Use cases: multi-datacenter (one leader per datacenter), offline clients (CouchDB — each device is a leader), collaborative editing (Google Docs — each user's session is a leader)
  • Performance benefit: writes are processed locally in each datacenter; inter-datacenter replication is asynchronous
  • Considered dangerous in many situations; generally avoided unless the use case clearly demands it
  • The same data can be concurrently modified in two different datacenters → write conflicts
  • Conflict handling:
    • Avoidance: route all writes for a given record through the same datacenter — eliminates conflicts but defeats the purpose during outages
    • Last-write-wins (LWW): attach a timestamp to each write; the write with the highest timestamp wins — causes data loss; prone to clock skew issues
    • Higher replica ID wins: also causes data loss
    • Merge/concatenate: combine both values (works for some data types like sets)
    • Record conflicts: store all conflicting versions and let the user resolve (CouchDB)
    • CRDTs (Conflict-free Replicated Data Types): data structures that automatically merge in a mathematically correct way (counters, sets, maps) — Riak uses these
    • Operational Transformation: algorithm used by collaborative editors (Google Docs) to transform conflicting edits
  • Replication topologies:
    • Circular: each node forwards writes from one neighbor to the next; one broken node interrupts the chain
    • Star: one central node forwards all writes; the central node is a single point of failure
    • All-to-all: every leader replicates to every other leader; most fault-tolerant; causal ordering can still be violated due to network delays — use version vectors to order correctly
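
One concrete CRDT is the grow-only counter (G-counter); a minimal sketch:

```python
class GCounter:
    """Each replica increments only its own slot; merging takes the
    element-wise max, so concurrent increments are never lost."""
    def __init__(self, replica_id, n_replicas):
        self.id = replica_id
        self.counts = [0] * n_replicas

    def increment(self):
        self.counts[self.id] += 1

    def merge(self, other):
        self.counts = [max(x, y) for x, y in zip(self.counts, other.counts)]

    def value(self):
        return sum(self.counts)

a, b = GCounter(0, 2), GCounter(1, 2)
a.increment(); a.increment()   # two increments on replica A
b.increment()                  # one concurrent increment on replica B
a.merge(b); b.merge(a)         # both replicas converge to 3
```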

Leaderless Replication

  • Any replica can accept writes; no leader; the client sends writes to multiple replicas in parallel (or a coordinator forwards them on the client's behalf, without enforcing any write ordering)
  • Examples: Amazon Dynamo (internal), Riak, Cassandra, Voldemort
  • Stale reads: if a node was offline during a write, it may return outdated data; the client reads from multiple nodes and takes the most recent value (using version numbers)
  • Read repair: when a client reads from multiple replicas and sees a stale value, it writes the newer value back to the stale node
  • Anti-entropy: background process that scans replicas and copies missing data between them; no ordering guarantee; may have significant delay
  • Quorum reads/writes: with n replicas, require w write confirmations and r read confirmations where w + r > n
    • Common: n = 3, w = 2, r = 2 — can tolerate one failed node for both reads and writes
    • w = n, r = 1: good for read-heavy workloads; only one node needs to respond for reads
    • w + r > n guarantees at least one node in the read set has the latest write (pigeon-hole principle)
  • Quorum edge cases where stale reads can still occur:
    • Sloppy quorums: write to non-home nodes during an outage; hinted handoff sends data to the home node later — durability but no immediate consistency
    • Concurrent writes where LWW loses a write
    • A write that partially succeeds (some w nodes confirm, others don't)
    • Read/write race on a node
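
The quorum overlap guarantee can be checked exhaustively for small n:

```python
from itertools import combinations

def quorums_overlap(n, w, r):
    """True if every possible w-subset of n replicas intersects every
    possible r-subset -- i.e. any read set must contain at least one
    replica that acknowledged the write (pigeonhole when w + r > n)."""
    nodes = range(n)
    return all(set(ws) & set(rs)
               for ws in combinations(nodes, w)
               for rs in combinations(nodes, r))

assert quorums_overlap(3, 2, 2)       # the common n=3, w=2, r=2 setup
assert not quorums_overlap(3, 1, 2)   # w + r <= n: stale reads possible
```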

Concurrent Writes and Version Vectors

  • "Two operations are concurrent if neither happens before the other" — physical time doesn't matter, only causal ordering
  • Last-write-wins (LWW): assign timestamps to writes; always keep the highest timestamp; popular (Cassandra uses it exclusively) but causes data loss for truly concurrent writes
  • Happens-before relationships: operation A → B if B knows about A; otherwise they are concurrent
  • Version numbers: a single replica maintains a version number per key, incremented on each write; clients always read the version before writing; on write, include the version number you last read
    • Server can then determine which values to overwrite and which to merge
  • Siblings (Riak): if writes are concurrent, keep both values — "siblings"; the client is responsible for merging; tombstones needed for deletions
  • Version vectors: in a multi-replica system, each replica tracks a version number per replica it has seen; the vector of all these numbers is the version vector; transferred to clients on read, returned on write — enables detection of concurrent writes across replicas
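
Version-vector comparison can be sketched as follows (assuming fixed-length vectors with one slot per replica):

```python
def compare(a, b):
    """A happened-before B if every component of A is <= the matching
    component of B (and they differ); if neither dominates, concurrent."""
    a_le_b = all(x <= y for x, y in zip(a, b))
    b_le_a = all(y <= x for x, y in zip(a, b))
    if a_le_b and not b_le_a:
        return "a happens-before b"
    if b_le_a and not a_le_b:
        return "b happens-before a"
    return "equal" if a == b else "concurrent"
```

Concurrent writes are the case where siblings must be kept or merged.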

Chapter 7 - Sharding

  • Partitioning (called sharding in MongoDB/Elasticsearch/SolrCloud, regions in HBase, tablets in Bigtable, vnodes in Cassandra/Riak, vBuckets in Couchbase): splitting a large dataset across multiple nodes
  • Main reason: scalability — data and query load too large for a single machine
  • Partitioning is usually combined with replication: each partition is replicated across multiple nodes for fault tolerance
  • Skewed partition: uneven data distribution; hot spot: one partition receives disproportionate load

Partitioning Strategies

  • Partitioning by key range:
    • Assign a continuous range of keys (like encyclopedia volumes: A–D, E–H) to each partition
    • Boundaries can be manually set or auto-adjusted; within each partition, keys are sorted
    • Enables efficient range queries within a partition
    • Risk of hot spots: e.g., sensor data keyed by timestamp routes all today's writes to one partition; fix by prefixing the key with the sensor ID
  • Partitioning by hash of key:
    • Hash function (e.g., MD5) distributes keys uniformly; each partition owns a range of hash values
    • Eliminates hot spots from skewed keys (usually), but destroys sort order — range queries must touch all partitions
    • Cassandra uses a compound primary key: first part is hashed for partitioning, remaining parts form an SSTable sort key within the partition — allows range queries on the sorted columns within a single partition
  • Relieving hot spots for celebrity/viral data: even with hash partitioning, a single key can be hot (e.g., a celebrity's user ID on a write-heavy social network); add a random suffix to the key and split writes across multiple keys — but reads then must read from all these keys and combine
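
Hash partitioning and the random-suffix trick for hot keys, sketched (MD5 as mentioned above; the fan-out of 10 is an arbitrary choice):

```python
import hashlib, random

N_PARTITIONS = 8

def partition_for(key):
    """Each partition owns a slice of the hash space."""
    h = hashlib.md5(key.encode()).digest()
    return int.from_bytes(h[:4], "big") % N_PARTITIONS

def write_key(hot_key, fanout=10):
    """Spread writes to a hot key across `fanout` sub-keys at random."""
    return f"{hot_key}#{random.randrange(fanout)}"

def read_keys(hot_key, fanout=10):
    """The cost: a reader must fetch all sub-keys and combine them."""
    return [f"{hot_key}#{i}" for i in range(fanout)]
```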

Secondary Indexes

  • Partitioning by document (local indexes):
    • Each partition maintains its own secondary index for documents it holds
    • Reading by secondary index requires scatter-gather: query all partitions, merge results
    • Tail latency amplification: scatter-gather sends queries in parallel, so response time = slowest partition
    • Used by MongoDB, Riak, Cassandra, Elasticsearch, SolrCloud, VoltDB
  • Partitioning by term (global index):
    • A single global index covering all partitions; the index itself is partitioned (by term or hash of term)
    • Reads are faster: query only the partition(s) of the index that contain the relevant terms
    • Writes are more complex: updating a document may require updating the global index in several different partitions
    • Global secondary indexes are usually updated asynchronously — slight staleness is acceptable
    • Used by DynamoDB (global secondary indexes are eventually consistent)
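The two indexing approaches can be contrasted in a toy sketch (the data and partition layout are invented for illustration): a document-partitioned index forces scatter-gather reads, while a term-partitioned (global) index lets a read go straight to the relevant term entry.

```python
from collections import defaultdict

# Hypothetical store: each partition keeps its own local secondary
# index (color -> doc ids) covering only the documents it holds.
partitions = [
    {"docs": {1: "red", 3: "red"},  "index": {"red": [1, 3]}},
    {"docs": {2: "blue", 4: "red"}, "index": {"blue": [2], "red": [4]}},
]

def query_local_index(color):
    # Scatter-gather: the query must be sent to ALL partitions,
    # and the results merged — latency is set by the slowest one
    results = []
    for p in partitions:
        results.extend(p["index"].get(color, []))
    return sorted(results)

def build_global_index():
    # Term-partitioned index: one index over all partitions (itself
    # partitionable by term); a read touches only the relevant entry,
    # but a document write may update several index partitions
    global_index = defaultdict(list)
    for p in partitions:
        for doc_id, color in p["docs"].items():
            global_index[color].append(doc_id)
    return {term: sorted(ids) for term, ids in global_index.items()}
```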

Rebalancing Partitions

  • Why: nodes are added or removed, node failures
  • Bad approach — hash mod N: almost all keys must move when N changes
  • Fixed number of partitions: create many more partitions than nodes (e.g., 1000 partitions for 10 nodes); add a new node by taking a few partitions from each existing node; only partitions (not keys) move
    • Used by Riak, Elasticsearch, Couchbase, Voldemort
    • Choosing the right number of partitions upfront is tricky — too few limits scaling, too many adds overhead
  • Dynamic partitioning: partitions split when they exceed a configured size threshold; split partitions can be moved to other nodes; undersized partitions merge
    • Used by HBase, MongoDB, RethinkDB
    • One concern: empty database starts with one partition, all writes go there until the first split
    • Pre-splitting: configure initial partitions upfront if key range is known
  • Proportional to nodes (Cassandra): fixed number of partitions per node; adding a node creates new partitions by splitting existing ones; partition size grows with data but shrinks when nodes are added
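The difference between hash-mod-N and fixed partitions can be made concrete with a small experiment (the assignment scheme below is illustrative, not any specific database's algorithm): going from 10 to 11 nodes, mod-N reassigns almost every key, while fixed partitions move only whole partitions.

```python
import hashlib

def stable_hash(key: str) -> int:
    # A stable hash (unlike Python's randomized hash()) for partitioning
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

keys = [f"user{i}" for i in range(1000)]

# Bad approach: node = hash(key) mod N. Going from 10 to 11 nodes
# changes the result for roughly 10/11 of all keys.
moved_mod_n = sum(stable_hash(k) % 10 != stable_hash(k) % 11 for k in keys)

# Fixed number of partitions: a key's partition never changes; only the
# partition->node assignment does, so rebalancing moves whole partitions.
N_PARTITIONS = 1000
before = {p: p % 10 for p in range(N_PARTITIONS)}  # round-robin over 10 nodes

def add_node(assignment: dict, new_node: int) -> dict:
    # The new node steals its fair share of partitions; every other
    # partition stays where it is (illustrative rebalancing rule)
    target = len(assignment) // (new_node + 1)
    new_assignment = dict(assignment)
    for p in sorted(assignment)[:target]:
        new_assignment[p] = new_node
    return new_assignment

after = add_node(before, new_node=10)
partition_of = {k: stable_hash(k) % N_PARTITIONS for k in keys}
moved_fixed = sum(before[partition_of[k]] != after[partition_of[k]] for k in keys)
```

With these numbers, `moved_mod_n` lands around 90% of keys, while `moved_fixed` is only the keys living in the ~9% of partitions that were reassigned.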

Request Routing (Service Discovery)

  • Three approaches:

    1. Any node: client contacts any node; if the node doesn't own the partition, it forwards the request
    2. Routing tier: a dedicated load balancer or routing tier knows the partition-to-node mapping; all requests go through it
    3. Client-aware: client has the partition map and connects directly to the right node
  • ZooKeeper: many distributed databases (HBase, SolrCloud, Kafka, Helix for LinkedIn) use ZooKeeper to track partition-to-node mappings; routing tiers subscribe to ZooKeeper for updates

  • Cassandra uses a gossip protocol: each node knows the cluster state; clients contact any node, which forwards the request appropriately

  • MongoDB has its own config server + mongos router tier

Chapter 8 - Transactions

  • Transactions: a way for the application to group several reads and writes into a logical unit — either all of them succeed (commit) or all fail (abort/rollback)
  • The application can ignore certain classes of errors and concurrency issues because the database handles them
  • Not every application needs transactions; some trade them away for performance or availability
  • ACID: Atomicity, Consistency, Isolation, Durability — the properties that transactions provide (though the definitions are fuzzy and inconsistently applied)

ACID Properties

  • Atomicity: all writes in a transaction either happen or are aborted as a whole; not about concurrency — about being able to safely retry after a fault midway through
  • Consistency: invariants about the data (e.g., accounting records must balance) are always true; this is actually an application-level property — the database can only enforce specific constraints (uniqueness, foreign keys)
  • Isolation: concurrent transactions appear to execute serially — each transaction pretends it's the only one running; serializability is the strongest form
  • Durability: data committed to a transaction is not lost, even after a crash; typically involves write-ahead logs or replication; "perfect durability does not exist" — correlated failures (datacenter fire) can destroy all copies

Isolation Levels

  • Read uncommitted: transactions can read writes that haven't been committed yet — dirty reads; rarely useful
  • Read committed: no dirty reads (only see committed data) and no dirty writes (only overwrite committed data); the default in Oracle, SQL Server, PostgreSQL
    • Row-level read locks would prevent dirty reads, but this degrades read performance; instead, databases keep both old and new values and give readers the old committed value
    • Does not prevent read skew (nonrepeatable read): reading the same row twice in a transaction can yield different values if another transaction commits between the two reads
  • Snapshot isolation (repeatable read): each transaction sees a consistent snapshot of the database as of the start of the transaction
    • Implemented via Multi-Version Concurrency Control (MVCC): each row has a creation transaction ID and an optional deletion transaction ID; a transaction ignores rows created after it started and rows deleted before it started
    • "Readers never block writers, and writers never block readers"
    • Essential for long-running queries (backups, analytics) that must not see inconsistent data mid-query
    • Called "serializable" in Oracle, "repeatable read" in MySQL/PostgreSQL — the terminology is inconsistent and confusing; the SQL standard definitions are also flawed
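The MVCC visibility rule above can be sketched in a few lines. This is a simplified model (real implementations also track which transactions were still in progress at snapshot time; the row data here is invented):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class RowVersion:
    value: str
    created_by: int                    # txid that inserted this version
    deleted_by: Optional[int] = None   # txid that deleted/overwrote it

def visible(row: RowVersion, snapshot_txid: int) -> bool:
    # Simplified visibility rule: a version is visible if created by a
    # transaction at or before our snapshot and not deleted by one
    created_before = row.created_by <= snapshot_txid
    deleted_before = row.deleted_by is not None and row.deleted_by <= snapshot_txid
    return created_before and not deleted_before

# Txn 13 updates a row: the old version is marked deleted_by=13 and a
# new version is created — nothing is overwritten in place
versions = [
    RowVersion("balance=500", created_by=5, deleted_by=13),
    RowVersion("balance=400", created_by=13),
]

def read(snapshot_txid: int) -> str:
    return next(v.value for v in versions if visible(v, snapshot_txid))
```

A transaction whose snapshot predates txn 13 keeps seeing the old balance, which is why long-running reads stay consistent without blocking writers.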

Write Anomalies

  • Dirty writes: overwriting a write that hasn't committed yet; almost all databases prevent this with row-level locks
  • Lost updates: two transactions read-modify-write the same value; one's update is silently overwritten
    • Prevention: atomic operations (UPDATE counter SET val = val + 1), explicit locks (SELECT FOR UPDATE), automatic lost-update detection (PostgreSQL's repeatable read detects the lost update and aborts the offending transaction so the application can retry), compare-and-set
  • Write skew: each of two concurrent transactions reads a set of objects, and each writes based on what it read — but the writes to different objects violate an invariant
    • Example: two doctors both see that two other doctors are on call; each removes themselves, leaving zero doctors on call
    • Phantom: the effect of a write in one transaction changes the search results of a query in another transaction
    • Prevention requires serializability or materializing conflicts (creating explicit lock rows for things that don't yet exist)
  • Serializable isolation is the strongest guarantee — it ensures that the outcome is equivalent to some serial execution
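A minimal sketch of the compare-and-set approach to lost updates (a toy in-memory register, not any database's API): the write succeeds only if the value is still what the transaction read, so a concurrent update is detected rather than silently overwritten.

```python
class Register:
    """Toy single-object store offering compare-and-set (CAS)."""
    def __init__(self, value):
        self.value = value

    def compare_and_set(self, expected, new) -> bool:
        # Succeeds only if nobody changed the value since we read it
        if self.value == expected:
            self.value = new
            return True
        return False

def increment_with_retry(reg: Register, max_retries: int = 10) -> bool:
    # Read-modify-write, retried until the CAS succeeds
    for _ in range(max_retries):
        seen = reg.value
        if reg.compare_and_set(seen, seen + 1):
            return True
    return False

counter = Register(0)
seen = counter.value
counter.value = 5  # another client's write sneaks in between read and write
assert not counter.compare_and_set(seen, seen + 1)  # lost update avoided
```

Note that CAS handles lost updates on a single object but does nothing for write skew, which spans multiple objects.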

Implementing Serializability

  • Actual serial execution (single-threaded): execute one transaction at a time, in order; surprisingly viable if transactions are short and the active dataset fits in RAM
    • VoltDB, Redis, Datomic use this approach
    • Stored procedures: instead of interactive multi-round-trip transactions, submit the entire transaction as a stored procedure — eliminates network latency within a transaction
    • Limitation: throughput is limited to a single CPU core; partitioning lets different CPU cores handle different partitions, but cross-partition transactions are expensive
  • Two-phase locking (2PL): historically the standard approach for about 30 years
    • Shared lock for reads (multiple readers OK); exclusive lock for writes (blocks all others)
    • Phase 1: acquire locks; Phase 2: release locks at commit/abort — locks are held for the full duration
    • Deadlocks: transaction A holds lock that B needs and vice versa — the database detects and aborts one of them
    • Predicate locks: lock all rows matching a WHERE condition, even rows that don't yet exist — prevents phantom reads
    • Index-range locks: coarser-grained version of predicate locks using an index entry — more efficient in practice
    • Performance: significantly lower throughput than weaker isolation; high latency; deadlocks add overhead
  • Serializable Snapshot Isolation (SSI): introduced in 2008; optimistic concurrency control
    • Allow transactions to proceed under snapshot isolation; track which reads may have been outdated by concurrent writes; at commit time, abort transactions that executed based on stale data
    • Better performance than 2PL under low contention; under high contention the abort (and retry) rate rises and performance suffers, so transactions should be kept short
    • Compared to serial execution: allows multiple CPU cores; doesn't require all data in RAM
    • Used in FoundationDB and PostgreSQL's serializable mode

Chapter 9 - The Trouble with Distributed Systems

  • Writing software that runs on a single machine is fundamentally different from writing software for a distributed system
  • The fundamental difference: in a single process, a fault typically causes a total failure (crash); in a distributed system, some components fail while others continue — partial failures are nondeterministic and unpredictable
  • The goal: build reliable systems from unreliable components (like TCP being reliable on top of unreliable IP)

Unreliable Networks

  • Distributed systems use asynchronous packet networks — no guarantee on delay, ordering, or delivery
  • Possible outcomes of sending a message: the request was lost, the request is queued, the remote node failed, the remote node is slow, the response was lost, the response is queued
  • You cannot tell the difference from the sender's perspective — a timeout does not tell you whether the request was received or not
  • Detecting node failures: TCP connection refused (quick); timeout (slow); OS notification; network switch monitoring (if you have access); external monitoring services
  • Timeout choice: too short → false positives, premature failover, cascading load; too long → slow recovery; some systems use adaptive timeouts based on observed round-trip times
  • Network partitions: some links between nodes are interrupted while the nodes themselves are fine — the system appears split
  • Unbounded delays: network queues can grow without bound; routers, OS network buffers, VMM hypervisors, TCP congestion control — all add variable delay
  • "In a system with thousands of nodes, something is always broken"
  • Human error is the primary cause of network outages, not hardware failure

Unreliable Clocks

  • Each node has its own clock (quartz oscillator); clocks drift over time; NTP synchronizes them but imperfectly
  • Two kinds of clocks:
    • Time-of-day clock: wall clock time (milliseconds since epoch); can jump backward (NTP adjustment, leap seconds); NTP accuracy is ~35ms over the internet; not safe to use for ordering events or measuring elapsed time
    • Monotonic clock: guaranteed to move forward; suitable for measuring elapsed time on a single node; absolute value is meaningless; cannot be compared across nodes
  • Clock skew issues:
    • Last-Write-Wins using timestamps: if clocks are skewed, a "later" write can have an "earlier" timestamp and get silently discarded
    • Google assumes 6ms drift between NTP syncs (every 30 seconds) or 17 seconds if synced once a day
    • Leap seconds: a minute has 59 or 61 seconds; systems have crashed and lost data due to leap second handling
    • Virtual machine clocks: may jump during live migration or when the VM is paused
  • Logical clocks are safer for ordering events — they measure relative event ordering rather than physical time
  • Google Spanner TrueTime API: returns the current time as an interval [earliest, latest] with a confidence bound; Spanner waits for the interval to pass before committing a transaction to guarantee causality — requires GPS receivers and atomic clocks in every datacenter
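The two-clock distinction maps directly onto Python's standard library: `time.time()` is the time-of-day clock and `time.monotonic()` is the monotonic clock. A small sketch of the safe and unsafe uses:

```python
import time

# Time-of-day clock: roughly comparable across machines, but it can
# jump backward (NTP step, leap second) — unsafe for measuring durations
t_wall_1 = time.time()

# Monotonic clock: guaranteed to move forward; its absolute value is
# meaningless and cannot be compared across nodes
t_mono_1 = time.monotonic()
time.sleep(0.01)
elapsed = time.monotonic() - t_mono_1  # safe elapsed-time measurement

# By contrast, time.time() - t_wall_1 could come out negative if an
# NTP adjustment happened between the two readings.
```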

Process Pauses

  • GC pauses: a "stop-the-world" GC can pause a process for minutes
  • A process may be preempted by the OS scheduler at any time and not run for an arbitrary duration
  • A node can be declared dead by others while it is paused, then resume and act as if it still holds locks/leases it no longer holds
  • "A node in a distributed system must assume that its execution can be paused for a significant length of time at any point, even in the middle of a function"
  • There is no safe way for a process to verify that it hasn't been paused mid-operation

Fencing Tokens

  • Distributed locks and leases have expiry times — a paused process may resume after its lock has expired and another node has taken the lock
  • Solution: fencing tokens — a monotonically increasing number issued by the lock server with each lock grant; the storage server rejects any write with a token older than the last one it's seen
  • This requires the storage server to actively check tokens, not just the lock holder to behave correctly
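A toy sketch of the fencing-token mechanism (the class names are invented; a real system would use, e.g., ZooKeeper's zxid as the token): the lock service hands out increasing tokens, and the storage server enforces them, so a paused "zombie" client cannot corrupt data after its lease expires.

```python
class LockService:
    """Hands out leases with monotonically increasing fencing tokens."""
    def __init__(self):
        self.token = 0

    def acquire(self) -> int:
        self.token += 1
        return self.token

class StorageServer:
    """Rejects writes carrying a token older than the newest one seen."""
    def __init__(self):
        self.highest_token_seen = 0
        self.data = None

    def write(self, token: int, value) -> bool:
        if token < self.highest_token_seen:
            return False  # stale lease holder: write rejected
        self.highest_token_seen = token
        self.data = value
        return True

locks = LockService()
storage = StorageServer()

t1 = locks.acquire()  # client 1 gets token 1, then pauses (GC, etc.)
t2 = locks.acquire()  # its lease expires; client 2 gets token 2
```

The crucial point from the notes: the check happens on the storage server, so correctness does not depend on the paused client noticing that its lease expired.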

Byzantine Faults

  • Crash-stop / crash-recovery fault: a node fails silently or restarts; it doesn't lie
  • Byzantine fault: a node sends incorrect or contradictory messages — may be due to a bug or malicious behavior
  • Byzantine-fault-tolerant algorithms exist but are complex; most distributed database systems assume non-Byzantine faults (nodes are honest but may crash or be slow)
  • Relevant for: blockchains (untrusted peers), aerospace (cosmic ray bit flips), multi-organization systems
  • Input validation and sanitization: a weaker defense against Byzantine-like behavior from external actors

System Models

  • Timing assumptions: synchronous (bounded delays — unrealistic), partially synchronous (usually bounded, occasionally not — most realistic), asynchronous (no timing assumptions — very restrictive)
  • Node failures: crash-stop (fail permanently), crash-recovery (fail and restart), byzantine (arbitrary behavior)
  • The partially synchronous + crash-recovery model best matches real-world distributed systems
  • Safety vs. liveness:
    • Safety: "nothing bad happens" — a safety violation is permanent; can be checked at any point in time
    • Liveness: "something good eventually happens" — may not hold at a given instant but will eventually
    • Distributed algorithms must guarantee safety always and liveness only when things go well enough (e.g., network eventually recovers)

Chapter 10 - Consistency and Consensus

Linearizability

  • Also called: atomic consistency, strong consistency, immediate consistency, external consistency
  • Informal definition: make a replicated system appear as if there were only one copy of the data and all operations are atomic
  • Any read must return the most recent write — once a client reads a new value, no subsequent read from any client should return the old value
  • Linearizability is a recency guarantee; serializability is a transaction isolation guarantee — they are different concepts that can be confused
    • A system can be serializable but not linearizable (e.g., serializable snapshot isolation: transactions read from a consistent snapshot, which is not guaranteed to reflect the latest committed writes)
    • A system can be linearizable but not serializable (single-object compare-and-swap)
    • "Strict serializability" = serializable + linearizable
  • Replication and linearizability:
    • Single-leader: linearizable if reads go to the leader or synchronously replicated followers
    • Multi-leader: not linearizable (concurrent writes to different leaders)
    • Leaderless: not linearizable despite quorums — concurrent operations and network delays can still yield stale reads
    • Consensus algorithms (ZooKeeper, etcd): linearizable
  • Use cases: distributed locks and leader election (require exactly one winner), uniqueness constraints (only one user can claim a username), cross-channel ordering (write file to storage, then notify processing queue — must be linearizable to guarantee the processor finds the file)
  • Cost: "Linearizability is slow, and this is true all the time, not just during a network fault" — the CAP theorem says you can't have both linearizability and availability during a network partition

CAP Theorem

  • Consistency (linearizability), Availability, Partition tolerance — pick two; but this is a misleading framing
  • In practice: partition tolerance is not optional (networks do partition); the real choice is between linearizability and availability during a partition
  • CAP theorem does not say much about latency; "PACELC" is a more nuanced framing (latency vs consistency even without partitions)
  • "CAP is sometimes presented as Consistency, Availability, Partition tolerance: pick 2 out of 3. Unfortunately this way of phrasing it is misleading"

Causal Consistency

  • Causality imposes a partial ordering on events (A happened before B if B depends on A)
  • Causal consistency: all operations that are causally related are seen in the same order by all nodes; concurrent operations may be seen in any order
  • "Causal consistency is the strongest possible consistency model that does not slow down due to network delays and remains available in the face of network failures" — it's a sweet spot
  • Causal consistency is weaker than linearizability but stronger than eventual consistency

Lamport Timestamps

  • (counter, nodeID) pairs; nodes increment counter before each operation; max rule on receiving a message: counter = max(local, received) + 1
  • Provide a total ordering consistent with causality: if A happened before B, A's timestamp is less than B's
  • Insufficient alone for uniqueness constraints: to enforce that only one user registers a given username, you need to know that you've seen all requests from all nodes — Lamport timestamps don't tell you when you have complete information
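A minimal Lamport clock implementation following the rules above (the node IDs and message flow are illustrative):

```python
class LamportClock:
    def __init__(self, node_id: str):
        self.counter = 0
        self.node_id = node_id

    def tick(self) -> tuple:
        # Increment before every local operation or message send
        self.counter += 1
        return (self.counter, self.node_id)

    def receive(self, msg_counter: int) -> tuple:
        # Max rule: jump ahead of the highest timestamp seen so far
        self.counter = max(self.counter, msg_counter) + 1
        return (self.counter, self.node_id)

a, b = LamportClock("A"), LamportClock("B")
t1 = a.tick()          # A does an operation: (1, "A")
t2 = b.receive(t1[0])  # B receives A's message: (2, "B")
t3 = a.tick()          # concurrent with t2: (2, "A")
```

Comparing `(counter, node_id)` tuples gives a total order consistent with causality: `t1 < t2` because t2 causally follows t1, while the tie between the concurrent t2 and t3 is broken arbitrarily by node ID.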

Total Order Broadcast

  • Also called atomic broadcast
  • Two guarantees: reliable delivery (if message delivered to one correct node, it will be delivered to all correct nodes) and total order (all nodes deliver messages in the same order)
  • This is stronger than causal consistency (total order implies causality) but equivalent to consensus
  • Applications: database replication (all nodes apply writes in the same order → state machine replication), serializable transactions, distributed locks, consistent log (Kafka, ZooKeeper)
  • Linearizable compare-and-set can implement total order broadcast; total order broadcast can implement linearizable compare-and-set — these are equivalent to consensus

Two-Phase Commit (2PC)

  • Protocol for achieving atomic commit across multiple nodes (all commit or all abort)
  • Coordinator (transaction manager) + participants (databases/services)
  • Phase 1 (prepare): coordinator sends "prepare" to all participants; participants respond yes (promise to commit if asked) or no
  • Phase 2 (commit/abort): if all participants said yes, coordinator writes "commit" to its log and sends commit; if any said no, coordinator sends abort
  • The coordinator's write to disk is the "point of no return" — once it records commit, it must follow through even if participants crash
  • Problem: if the coordinator crashes after recording commit but before sending to all participants, those participants are stuck in an "in doubt" state waiting for a decision they can never independently make — the system is blocked
  • 2PC is sometimes called a "blocking atomic commit protocol" — manual intervention required if the coordinator fails and can't recover
  • Three-phase commit (3PC) was designed to avoid blocking but assumes bounded network delays — not safe in asynchronous networks
  • XA transactions: a standard C API for 2PC across different databases and message brokers; the transaction manager is typically a library in the application process — makes the application the coordinator, which is problematic (application crash = coordinator crash)

Consensus Algorithms (Paxos, Raft, Zab)

  • Problem: get several nodes to agree on a value despite faults
  • Properties a consensus algorithm must satisfy: uniform agreement (no two nodes decide differently), integrity (decided value was proposed by some node), validity (can't just decide an arbitrary value), termination (every non-crashed node eventually decides)
  • Single-decree Paxos: agree on one value; Multi-Paxos / Raft: extend to a log of values (total order broadcast)
  • Epoch/term numbers: each round of leader election increments the epoch; the leader checks that no higher-epoch election has occurred before proceeding — this prevents conflicting decisions from different epochs
  • Raft: a more understandable consensus algorithm; used in etcd, CockroachDB, TiKV
  • ZooKeeper uses Zab (ZooKeeper Atomic Broadcast), similar to Paxos; ZooKeeper provides: linearizable compare-and-set, total order broadcast, distributed lock, leader election, service discovery
  • Consensus limitations: requires a fixed majority of nodes (quorum) to make progress; can't scale to large numbers of nodes; leader changes are slow; sensitive to network timeouts
  • Applications of consensus or equivalent: linearizable compare-and-set registers, atomic transaction commits, total order broadcasts, locks and leases, membership/coordination services, uniqueness constraints

Chapter 11 - Batch Processing

  • Three paradigms of data processing:

    1. Online systems (services): wait for requests, process one at a time, return response; optimize for response time
    2. Batch processing systems: take a large amount of input data, run a job over it, produce output; optimize for throughput; job takes minutes to days
    3. Stream processing (near-real-time): like batch but on unbounded datasets, runs continuously
  • Unix philosophy: each tool does one thing well; programs communicate via stdin/stdout; compose via pipes — this philosophy was the inspiration for MapReduce

  • Unix pipeline: cat /var/log/nginx/access.log | awk '{print $7}' | sort | uniq -c | sort -r -n | head -5 — this is a simple batch processing chain
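The same top-5-URLs job as the pipeline above, sketched in Python (the sample log lines are invented; field 7 of the default nginx log format, index 6 zero-based, is the requested URL):

```python
from collections import Counter

# Equivalent of: awk '{print $7}' | sort | uniq -c | sort -r -n | head -5
log_lines = [
    '1.2.3.4 - - [01/Jan/2024:00:00:00 +0000] "GET /home HTTP/1.1" 200 512',
    '1.2.3.4 - - [01/Jan/2024:00:00:01 +0000] "GET /about HTTP/1.1" 200 256',
    '5.6.7.8 - - [01/Jan/2024:00:00:02 +0000] "GET /home HTTP/1.1" 200 512',
]

urls = (line.split()[6] for line in log_lines)  # awk '{print $7}'
top5 = Counter(urls).most_common(5)             # uniq -c | sort -rn | head -5
```

A design note from the book: the Unix version sorts so that repeated URLs become adjacent (streaming, spills to disk for huge inputs), while the Counter version keeps an in-memory hash table — the same aggregation, two different trade-offs.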

MapReduce

  • Programming model for large-scale parallel data processing on clusters of commodity hardware
  • Mapper: called once for each input record; outputs key-value pairs
  • Reducer: receives all values for a given key; aggregates them
  • MapReduce framework handles: partitioning mapper output by key, sorting, shuffling data to the right reducers, fault tolerance (re-run failed tasks), distributed filesystem reads/writes
  • Hadoop HDFS: distributed filesystem; files replicated across multiple nodes; block size typically 128 MB; computation is moved near the data to minimize network I/O
  • Each MapReduce job reads from and writes to HDFS — materialization of intermediate state to disk; this enables fault recovery but is slower than in-memory processing

Join Strategies in MapReduce

  • Reduce-side join (sort-merge join):
    • Both datasets are processed by mappers that emit the join key
    • The reducer receives all records for a given join key from both datasets and merges them
    • Hot key problem: a single key with millions of records overwhelms one reducer — fix with Pig's skew joins: sample to find hot keys, distribute hot-key records across multiple reducers, send the other dataset to all those reducers
  • Map-side joins (faster — no reducer needed):
    • Broadcast hash join: if one dataset is small enough to fit in memory, load it into a hash table in every mapper; then scan the large dataset
    • Partitioned hash join: if both datasets are partitioned by the same key and the same number of partitions, each mapper handles one partition of each dataset
    • Map-side merge join: if both datasets are partitioned by the same key and sorted, mappers can merge-sort them without a reducer
  • Output of batch jobs: write results to databases (for application queries), or to files in HDFS (for further batch processing), or to search indexes
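A sketch of the broadcast hash join described above (datasets invented for illustration): the small dataset is loaded into an in-memory hash table, and the large dataset is streamed past it, with no shuffle or reducer needed.

```python
# Small dataset (user profiles), broadcast to every mapper
users = {1: "alice", 2: "bob"}

# Large dataset (activity events), scanned by the mappers
events = [
    {"user_id": 1, "url": "/home"},
    {"user_id": 2, "url": "/about"},
    {"user_id": 1, "url": "/cart"},
]

def broadcast_hash_join(events, users):
    for e in events:
        name = users.get(e["user_id"])  # in-memory hash lookup, no shuffle
        if name is not None:            # inner join: drop unmatched events
            yield {**e, "name": name}

joined = list(broadcast_hash_join(events, users))
```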

MapReduce vs. MPP Databases

  • MPP (Massively Parallel Processing) databases (Teradata, Vertica, Aster Data, Greenplum): SQL queries distributed across many nodes; usually require structured data and a schema
  • MapReduce: processes arbitrary data including unstructured, semi-structured, messy files; schema-on-read; more flexible
  • "Making data available quickly — even in a quirky, difficult-to-use format — is more valuable than trying to decide on the ideal data model up front"
  • The Hadoop ecosystem blurs the line: Hive (SQL on HDFS), Pig (procedural), SparkSQL, Presto

Beyond MapReduce: Dataflow Engines

  • Spark, Tez, Flink: generalize MapReduce to support arbitrary data flow graphs (DAGs of operators)
  • Advantages over MapReduce:
    • No need to write intermediate results to HDFS between every stage — keep in memory or local disk when possible
    • Sorting is only required when it is actually needed (not forced at every stage boundary)
    • Fewer unnecessary map tasks: a mapper's work can often be folded into the preceding reduce operator
    • Better scheduler locality optimization — run the computation on the node that has the data
    • Pipelining: downstream operators can start as soon as upstream starts producing output
    • JVM reuse: Spark keeps the JVM alive across tasks on the same worker (MapReduce starts a new JVM per task)
  • Fault tolerance: Spark uses RDD lineage — remember how each partition was computed and recompute from the last checkpoint if a partition is lost; Flink uses checkpoints with barriers in the data stream
  • "Operators in dataflow engines are more flexible than the strict map-reduce pattern"

Graph and Iterative Processing

  • Graph algorithms (PageRank, shortest path, community detection) are inherently iterative — compute, check convergence, repeat
  • Bulk Synchronous Parallel (BSP): process all vertices in parallel, send messages along edges, synchronize at the end of each iteration (superstep)
  • Pregel model (Apache Giraph, GraphX in Spark): each vertex has a function called once per superstep; can send messages to other vertices; vertices vote to halt when they have no more work
  • Fault tolerance: checkpoint vertex state to disk after each superstep
  • Graph algorithms in MapReduce are possible but inefficient — many stages and lots of HDFS I/O

Chapter 12 - Stream Processing

  • Stream processing: like batch processing but on unbounded (never-ending) data streams; events are processed shortly after they happen
  • An event is a small, immutable, self-contained record of something that happened; events have a timestamp indicating when the event happened (from the source) or when it was processed
  • Stream vs. batch: in batch processing the dataset is bounded (has a beginning and end); in stream processing the dataset is unbounded and queries run continuously

Messaging Systems

  • The producer sends a message containing the event; the consumer processes it

  • Two key design questions:

    1. What happens if producers send messages faster than consumers can process? Options: drop messages, buffer in a queue (risk: what if the queue fills up or is lost?), apply backpressure (block the producer)
    2. What happens if nodes crash or go offline? Options: lose the message, persist to disk, replicate to multiple nodes
  • Direct messaging (no broker): UDP multicast (StatsD), ZeroMQ, HTTP webhooks — low latency but limited reliability; application code must handle dropped messages

  • Message brokers (queues): JMS/AMQP standard; load balance messages across consumers; redelivery on crash; inherently cause out-of-order processing when combined with redelivery

  • Log-based message brokers (Kafka, Kinesis, Pulsar):

    • Messages written to an append-only log partitioned across nodes
    • Each consumer tracks its own offset in the log — no message deletion on consumption
    • Consumers can re-read old messages by resetting the offset
    • Ordering within a partition is guaranteed; across partitions it is not
    • Throughput is very high (sequential disk writes)
    • Consumer groups: all consumers in a group jointly consume all partitions of a topic; adding consumers to a group increases parallelism (up to the number of partitions)
    • Log compaction: broker retains only the most recent value for each key, discarding older updates — makes the log act like a snapshot of a key-value store
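The effect of log compaction can be shown with a toy sketch (the log entries are invented; real brokers compact segments in the background rather than in one pass):

```python
def compact(log):
    # Keep only the most recent value for each key; older updates to
    # the same key are discarded
    latest = {}
    for key, value in log:
        latest[key] = value  # later records overwrite earlier ones
    # The dict preserves the order in which keys first appeared
    return list(latest.items())

log = [("user1", "v1"), ("user2", "v1"), ("user1", "v2"), ("user1", "v3")]
compacted = compact(log)
```

After compaction the log holds one record per key, which is why a consumer replaying it from the start effectively rebuilds a key-value snapshot.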

Change Data Capture (CDC)

  • The problem of dual writes: if you write to a database and then to an index/cache, one of the two writes might fail — your systems diverge
  • CDC: capture all writes to the primary database as a stream of change events; derive all other systems (search indexes, caches, analytics) from this stream
  • Initial load: take a snapshot of the database + the log offset of that snapshot; replay the log from that offset
  • Kafka log compaction can act as the initial snapshot: a consumer starting from the beginning of the topic gets all current values without needing a separate snapshot
  • CDC tools: Debezium (reads Postgres/MySQL WAL), Maxwell, Bottled Water, LinkedIn Databus
  • The primary database is the "source of truth"; all derived systems are secondary — this establishes a clear data flow

Event Sourcing

  • Store all changes to application state as a sequence of immutable events, rather than updating a mutable table
  • Different from CDC: CDC extracts the log from a database that was designed for mutable state; event sourcing is a design choice from the start
  • Benefits: strong audit trail; easier debugging (replay history); protection against application bugs (you can fix a bug and replay events); easier to derive multiple views from the same event log
  • Challenges: querying current state requires replaying all events or maintaining snapshots; event schemas must be carefully versioned — since old events are immutable and stay in the log, new code must remain able to read them
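A minimal event-sourcing sketch (the shopping-cart event types are invented): current state is never updated in place, only derived by folding the immutable event history.

```python
# Immutable event log — the source of truth
events = [
    {"type": "ItemAdded", "item": "book", "qty": 2},
    {"type": "ItemAdded", "item": "pen", "qty": 1},
    {"type": "ItemRemoved", "item": "book", "qty": 1},
]

def apply(state: dict, event: dict) -> dict:
    # One pure step of the fold: old state + event -> new state
    qty = state.get(event["item"], 0)
    if event["type"] == "ItemAdded":
        state[event["item"]] = qty + event["qty"]
    elif event["type"] == "ItemRemoved":
        state[event["item"]] = qty - event["qty"]
    return state

def current_state(events) -> dict:
    # Replay the full history; a periodic snapshot would shortcut this
    state = {}
    for e in events:
        state = apply(state, e)
    return state
```

Because `apply` is a pure function of the log, fixing a bug in it and replaying the events yields a corrected view — the "protection against application bugs" benefit above.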

Processing Streams

  • Transformation: convert one stream into another (e.g., map/filter events)
  • Joining streams:
    • Stream-stream join (windowed join): match events from two streams that occurred within the same time window
    • Stream-table join (enrichment): enrich each stream event by looking up a value in a database table; the table is loaded into the stream processor's local state and kept up to date via CDC
    • Table-table join (materialized view maintenance): maintain a materialized view of two tables by joining their CDC streams
  • Time-dependent joins: the "correct" answer depends on the version of the table at the time of the event, not the current version — ordering is non-deterministic across partitions

Windowing

  • Tumbling window: fixed duration, non-overlapping (e.g., events from 12:00–12:01)
  • Hopping window: fixed duration, overlapping at regular intervals (e.g., 5-minute windows every minute)
  • Sliding window: contains all events within a fixed duration of each other (e.g., last 5 minutes of events relative to each event)
  • Session window: no fixed duration; groups events with no gap greater than a configured inactivity threshold
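The first two window types reduce to simple arithmetic on event timestamps. A sketch (window sizes are illustrative; timestamps are seconds):

```python
WINDOW = 60  # tumbling window size in seconds

def tumbling_window(ts: int) -> int:
    # Each event belongs to exactly one fixed, non-overlapping window,
    # identified here by its start timestamp
    return ts - (ts % WINDOW)

def hopping_windows(ts: int, size: int = 300, hop: int = 60) -> list:
    # Overlapping windows of `size` seconds, starting every `hop`
    # seconds; an event belongs to size/hop windows
    first = (ts - (ts % hop)) - (size - hop)
    return [start for start in range(first, ts + 1, hop)
            if start <= ts < start + size and start >= 0]
```

Sliding and session windows need per-key state (the set of recent events, or the gap since the last event) rather than pure timestamp arithmetic, which is why they are harder to implement.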

Handling Late Events

  • Event time vs. processing time: network delays and clock skew mean events arrive out of order; the event's actual timestamp differs from when the stream processor receives it
  • Three timestamp options: device clock when event occurred, device clock when event was sent, server clock when received; the third is easiest but misses the true event time
  • Estimating offset: server receipt time minus device send time gives a rough clock correction
  • Watermarks: a mechanism to declare "I have now seen all events with a timestamp less than X"; events arriving after the watermark are "stragglers"
  • Straggler handling: ignore them (drop late events), or publish a correction for the affected time window
  • Alert if straggler rate exceeds threshold — indicates upstream problems

Chapter 13 - A Philosophy of Streaming Systems

  • Streaming systems represent a philosophical shift: rather than treating data as tables to be queried, treat all data as events (a stream of facts), and derive tables/views from those streams
  • The duality of streams and tables: a table is a snapshot of the current state; a stream is a changelog; you can always convert between them (stream → table by replaying; table → stream via CDC)
  • Lambda architecture: run a batch layer (Hadoop) and a stream layer (Storm) in parallel; batch layer provides correct but slow views; stream layer provides fast but approximate views; query layer merges them
    • The problem with lambda architecture: you must maintain the same logic in two different systems; keeping them in sync is operationally complex
  • Kappa architecture: use a single stream processing system for both real-time and historical reprocessing; retain the raw event log long enough to replay; reprocess when you need to fix bugs or update logic
  • "The batch/stream divide is artificial" — the same data processing logic should work on bounded and unbounded datasets; modern systems (Flink, Spark Structured Streaming, Apache Beam) support this
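
The stream → table direction of the duality is just a fold over the changelog. A minimal sketch (event shape with `op`/`key`/`value` fields is an assumption, loosely modeled on CDC output):

```python
# A table is the fold of its changelog: replaying the event stream from
# the beginning reconstructs the current state.
def apply(table: dict, event: dict) -> dict:
    """Apply one change event to the current-state table."""
    if event["op"] == "upsert":
        table[event["key"]] = event["value"]
    elif event["op"] == "delete":
        table.pop(event["key"], None)
    return table

changelog = [
    {"op": "upsert", "key": "user:1", "value": {"name": "Ada"}},
    {"op": "upsert", "key": "user:2", "value": {"name": "Bob"}},
    {"op": "upsert", "key": "user:1", "value": {"name": "Ada L."}},
    {"op": "delete", "key": "user:2", "value": None},
]

table = {}
for ev in changelog:
    apply(table, ev)
print(table)   # {'user:1': {'name': 'Ada L.'}}
```

The reverse direction (table → stream) is what CDC tools do: emit one such event per committed write.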

Unbundling Databases

  • Traditional databases bundle everything together: storage, transactions, replication, indexing, query optimizer, caching
  • The trend toward "unbundling" the database: use a combination of specialized tools (Kafka for the write-ahead log, Elasticsearch for search, Redis for caching, Flink for stream processing) connected via event streams
  • The advantage: each tool is optimized for its specialty; you compose the best tools
  • The challenge: you lose the cross-component transactions and consistency guarantees that monolithic databases provide
  • Goal: replicate the Unix philosophy at the distributed systems level — composable tools that do one thing well, connected by streams instead of pipes
  • Comparison: "mysql | elasticsearch" — no such pipe exists natively, but CDC + Kafka achieves effectively the same thing: continuously piping a database's changelog into a search index

Correctness in Unbundled Systems

  • ACID transactions in a single database provide strong correctness guarantees; unbundled systems need a different approach
  • Exactly-once semantics: process each message exactly once, not zero times (at-most-once) or more than once (at-least-once)
    • At-least-once + idempotence = effectively exactly-once; idempotent operations can be safely retried
    • Atomic commit across heterogeneous systems: requires two-phase commit; XA transactions are available but have poor performance and limited fault tolerance
  • Idempotency: design writes so that applying the same operation multiple times has the same effect as applying it once; include unique message/event IDs; consumers track processed IDs
  • End-to-end argument: correctness must be ensured at the application level, not just the infrastructure level; a database with transactions can still produce wrong answers if the application logic is wrong
  • Fault-tolerant stream processing approaches:
    • Micro-batching (Spark Streaming): divide the stream into small fixed-time batches; process each as a mini-batch job
    • Checkpointing (Flink): periodically save processing state to durable storage; restart from the last checkpoint after failure
    • Transactional producers/consumers (Kafka): Kafka supports transactions for exactly-once delivery within Kafka; Google Dataflow and VoltDB extend this to external systems

Dataflow and Reactive Systems

  • Dataflow: express computation as a directed graph of transformations on data; when input data changes, the outputs update automatically (like a spreadsheet)
  • Differential dataflow (Frank McSherry): efficiently maintain the output of a dataflow computation as inputs change — compute incremental updates rather than recomputing from scratch
  • Reactive programming / Rx: express event processing as composable operations on event streams in application code
  • The tension: stream processors provide strong fault tolerance and distributed execution but are complex; reactive frameworks in application code are simpler but limited to a single process
  • The future direction: unified frameworks that express both real-time stream processing and historical batch reprocessing with the same code, while providing exactly-once semantics
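
The spreadsheet analogy is the easiest way to see the incremental idea: when an input cell changes, propagate only the delta to derived outputs instead of recomputing everything. A toy sketch (far simpler than differential dataflow, which handles arbitrary operators, not just sums):

```python
# Maintain a derived sum incrementally: each input change propagates its
# delta in O(1), rather than re-running the full (batch) computation.
inputs = {"a": 3, "b": 4}
total = sum(inputs.values())        # initial batch computation: 7

def update(key: str, new_value: int) -> None:
    """Spreadsheet-style recalculation: apply only the change."""
    global total
    delta = new_value - inputs.get(key, 0)
    inputs[key] = new_value
    total += delta

update("a", 10)     # delta +7 -> total 14
update("c", 1)      # new input, delta +1 -> total 15
print(total)        # 15, matching sum(inputs.values())
```

The invariant to maintain is that the incrementally updated output always equals what a from-scratch recomputation would produce.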

Immutability and Audit Logs

  • Immutable event logs are inherently auditable — you can reconstruct exactly what happened and when
  • Mutable databases lose history; the current state is the only truth
  • Event sourcing + immutable logs provide "audit by design" — valuable in finance, healthcare, compliance
  • The past is immutable; only the future is uncertain — this is why append-only logs are so powerful
  • Derived views are not the truth; the event log is the truth; derived views can be discarded and rebuilt
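
The "views can be discarded and rebuilt" point is concrete if you derive two different views from one immutable log (the event shape and view functions are illustrative):

```python
# One immutable event log, two derived views. Either view can be thrown
# away and rebuilt by replaying the log; only the log is the truth.
log = [
    {"ts": 1, "account": "A", "delta": +100},
    {"ts": 2, "account": "A", "delta": -30},
    {"ts": 3, "account": "B", "delta": +50},
]

def balances(events: list) -> dict:
    """Current-state view: a mutable database would keep only this."""
    view = {}
    for e in events:
        view[e["account"]] = view.get(e["account"], 0) + e["delta"]
    return view

def audit_trail(events: list, account: str) -> list:
    """History view: every change in order — this is what 'audit by
    design' means; a mutable table could not reconstruct it."""
    return [(e["ts"], e["delta"]) for e in events if e["account"] == account]

print(balances(log))           # {'A': 70, 'B': 50}
print(audit_trail(log, "A"))   # [(1, 100), (2, -30)]
```

A mutable database storing only `balances` has irreversibly lost the information that `audit_trail` recovers for free.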

Chapter 14 - Doing the Right Thing

  • Data systems are not neutral — the choice of what data to collect, how to process it, and what decisions to make with it have real consequences for real people
  • "The world will be what we build" — engineers building data systems bear responsibility for their effects

Predictive Analytics and Discrimination

  • Machine learning and predictive analytics decisions can systematically disadvantage certain groups even without explicit intent
  • Feedback loops: if a system predicts someone is high-risk and treats them accordingly, it may create the very outcome it predicted; the prediction becomes self-fulfilling
  • Proxy discrimination: a model trained on historical data absorbs existing biases; even race-neutral features can serve as proxies for race (zip code, name, school attended)
  • Lack of transparency: "black box" models make it impossible to challenge a decision; affected people may not even know a model was used
  • Accountability gap: when things go wrong, responsibility is diffuse — the data vendor, the model builder, the company deploying it, the regulator all point at each other

Privacy and Surveillance

  • Behavioral data collection is pervasive; people often have no practical ability to opt out
  • The aggregation problem: individually innocuous data points (location at 8am daily, pharmacy visits, political donations) can be combined to reveal sensitive information (health conditions, political views, relationship status)
  • Data collected for one purpose is routinely used for another; intentions at collection time don't constrain future use
  • Data brokers: entire industries exist to collect, aggregate, and sell personal data with little regulatory oversight in many jurisdictions
  • Consent is often illusory: long privacy policies nobody reads, take-it-or-leave-it terms, no meaningful alternative
  • Data retention: data kept indefinitely becomes a liability; breaches expose it; legal processes compel its disclosure; purpose limitation and deletion are important privacy controls

Ethical Framework

  • The fact that something is technically possible does not mean it should be done
  • Five questions to ask: Who benefits? Who bears the costs? Are those groups the same? Could this data be used to harm? What are the second-order effects?
  • Power asymmetry: data systems are built by organizations with power; they are often used on individuals with less power; this asymmetry requires extra ethical care
  • Automation bias: people over-trust automated decisions and under-scrutinize them; human-in-the-loop is not a fix if the human just rubber-stamps the algorithm
  • Opacity and explainability: affected individuals have a legitimate interest in understanding why a decision was made; the right to explanation (GDPR Article 22) is increasingly recognized
  • "The goal of data engineering should be to build systems that empower people, not systems that exploit them"

Legislation and Self-Regulation

  • GDPR (EU), CCPA (California) and similar regulations: right to access, right to erasure, data portability, privacy by design
  • Regulation is necessary but insufficient: laws lag behind technology; enforcement is inconsistent; global data flows complicate jurisdiction
  • Engineers must not wait for regulation to act ethically; self-regulation through professional norms and company practices matters
  • Checklist-style compliance is not ethics; the question is whether the system causes harm, not whether it technically complies with a law

Long-Term Effects of Data Systems

  • Systems outlive the intentions of their designers; a system built for benign purposes may be repurposed maliciously
  • Data collected today may be used in political climates very different from today's; minority groups that are not persecuted today may be persecuted in the future using the data you are collecting now
  • Minimizing data collection is a form of harm reduction — data you don't have can't be breached, subpoenaed, or misused
  • "If you are asked to build a system that would help find people who meet certain criteria — even seemingly benign criteria — think carefully about what could happen if criteria change"
  • The responsibility of engineers: build systems that can be shut down, that forget, that preserve human dignity, that are auditable, and that give people meaningful control over their own data