Chapter 1 - Trade-Offs in Data Systems Architecture
- Data-intensive applications are bottlenecked by data — amount, complexity, or speed of change — rather than by raw compute
- Most data-intensive applications are composed of standard building blocks: databases, caches, search indexes, stream processors, batch processors, message queues
- The boundary between these components is increasingly blurry: Redis can be used as a message queue; Kafka has database-like durability; some databases have built-in full-text search
- When you combine several tools to serve a request, you create a new, special-purpose data system out of smaller general-purpose components
- Your application code becomes responsible for keeping those components consistent with each other
- The performance guarantees you can offer your users are now the composite of all your subsystems' guarantees
- Every architectural decision involves trade-offs between: reliability, scalability, maintainability, latency, throughput, consistency, cost, and operational complexity
- There is no single correct architecture for a data system — the right design depends on the workload, team, scale, and failure tolerance required
- The same problem (e.g., caching) can be solved by many different tools with different characteristics; understanding the trade-offs is the core skill
- Many systems claim to be "schema-less" but this just moves the schema to the application code — data always has structure, the question is where it's enforced
- The costs of a design decision compound over time: a wrong choice made early becomes expensive to change as data accumulates and services grow around it
Chapter 2 - Defining Nonfunctional Requirements
- Three concerns matter most in software systems: reliability, scalability, and maintainability
- Reliability: the system continues to work correctly even when things go wrong
- Fault: a component deviates from its spec; failure: the whole system stops providing service
- Fault-tolerant systems prevent faults from escalating into failures
- Counterintuitively, it can make sense to deliberately trigger faults (chaos engineering) to ensure fault-tolerance mechanisms are exercised regularly
- Hardware faults are independent and random — add redundancy (RAID, dual-power, hot-swap CPUs)
- Software errors are correlated — a bug can take down many nodes simultaneously; no simple solution, requires careful design, testing, process isolation, and monitoring
- Human errors are the #1 cause of outages — minimize error opportunities through good abstractions, sandboxes, easy rollback, and monitoring
- Scalability: the system's ability to handle growth — in data volume, traffic, or complexity
- Not a binary property; the right question is "if the system grows in this way, how do we cope?"
- Load parameters describe current load; examples: requests per second, database read/write ratio, concurrent users, cache hit rate
- Twitter example: home timeline reads (~300k reads/sec) vastly outnumber tweet posts (~4.6k writes/sec), but fan-out turns each post into writes to every follower's timeline
- Approach 1: query all followees' tweets at read time — simple but slow at scale
- Approach 2: pre-compute each user's home timeline cache on write — fast reads but expensive fan-out for users with millions of followers
- Twitter uses a hybrid: pre-compute for most users, query at read time for celebrities
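The hybrid approach above can be sketched in a few lines. This is an illustrative toy, not Twitter's actual implementation; the class name, the threshold, and the in-memory dicts are all invented for the example.

```python
# Hybrid fan-out sketch (illustrative only): ordinary users' tweets are pushed
# into followers' timeline caches on write; accounts above a follower threshold
# skip fan-out and are merged in at read time instead.
class Timelines:
    def __init__(self, celebrity_threshold=1_000_000):
        self.celebrity_threshold = celebrity_threshold
        self.followers = {}      # user -> set of follower ids
        self.caches = {}         # follower -> cached home timeline tweets
        self.celeb_tweets = {}   # celebrity -> their own tweets

    def post(self, user, tweet):
        fans = self.followers.get(user, set())
        if len(fans) >= self.celebrity_threshold:
            self.celeb_tweets.setdefault(user, []).append(tweet)  # no fan-out
        else:
            for f in fans:       # fan-out on write: one insert per follower
                self.caches.setdefault(f, []).append(tweet)

    def home_timeline(self, user):
        tweets = list(self.caches.get(user, []))
        for celeb, ts in self.celeb_tweets.items():   # read-time merge
            if user in self.followers.get(celeb, set()):
                tweets.extend(ts)
        return tweets
```

The trade-off is visible in the two branches: fan-out on write makes `post` expensive and `home_timeline` cheap; the celebrity path does the opposite.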
- Performance can be described as throughput (batch) or response time (online); measure with percentiles, not averages
- p50 = median; p95 = 95th percentile; p99, p999 = tail latencies
- Tail latency amplification: a request that fans out to 100 backends is as slow as the slowest one
- Head-of-line blocking: a slow request monopolizes a resource and causes fast requests behind it to appear slow
- Measure response time on the client side — only the client sees the full picture including network and queuing delays
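A quick sketch of why percentiles beat averages: a nearest-rank percentile over a batch of samples. Production systems use streaming approximations (HdrHistogram, t-digest) instead of sorting every sample; the response times below are made up.

```python
# Nearest-rank percentile over a batch of response-time samples.
def percentile(samples, p):
    xs = sorted(samples)
    k = max(0, min(len(xs) - 1, round(p / 100 * len(xs)) - 1))
    return xs[k]

response_times_ms = [12, 15, 14, 13, 250, 16, 11, 14, 13, 900]
p50 = percentile(response_times_ms, 50)   # 14 ms: the typical request
p99 = percentile(response_times_ms, 99)   # 900 ms: the tail users remember
mean = sum(response_times_ms) / len(response_times_ms)   # 125.8 ms
```

Two slow outliers drag the mean to nearly ten times the median, which is exactly why the mean describes neither the typical nor the worst experience.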
- Scaling up (vertical) vs. scaling out (horizontal): most large systems use a mix; purely horizontal stateless services are easiest; stateful systems add complexity
- Elastic scaling (auto-scaling) vs. manual: manual is simpler and predictable; elastic handles unpredictable load but adds operational complexity
- Maintainability: making life better for the engineering and operations teams who work on the system over time
- Operability: make it easy to keep running — good monitoring, automation, documentation, clear behavior under failure, easy config management
- Simplicity: reduce accidental complexity — complexity not inherent to the problem but from implementation choices; good abstractions hide it
- Evolvability (extensibility, plasticity): make it easy to change — requirements change constantly; systems that can be adapted without full rewrites are worth more over time
Chapter 3 - Data Models and Query Languages
- Data models are arguably the most important part of software development — they affect not only how software is written but how we think about the problem
- Every layer presents a data model to the layer above: hardware (bytes) → OS (files) → database (rows/documents) → application (objects)
- The three main data models: relational, document, and graph
Relational Model
- Data is organized into tables (relations) of rows and columns; relationships are represented by foreign keys
- Schema-on-write: structure is defined at write time; migrations are required to change the schema
- Excellent support for many-to-one and many-to-many relationships via joins
- Normalizing data reduces duplication and makes updates consistent (update one place, not many)
- The object-relational mismatch: application code works with objects; databases work with tables — ORMs reduce but don't eliminate this friction
Document Model
- Data is stored as self-contained documents (JSON, XML); no schema enforced by the database
- Schema-on-read: structure is implied and validated by application code at read time, not at write
- Better locality: retrieving a document fetches all related data in one query (avoids joins for typical access patterns)
- Good for one-to-many tree-structured data (e.g., a résumé: one person, many jobs, many schools)
- Poor support for many-to-many relationships; joins must be done in application code
- A document is usually retrieved and written in its entirety; partial updates require rewriting the whole thing
- Denormalization is common in document databases — the same data may be duplicated in multiple documents, complicating updates
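The résumé example above, as a self-contained document (the field names and values here are invented for illustration):

```python
import json

# One self-contained résumé document: the one-to-many data (jobs, schools)
# nests inside the record, so a single fetch returns everything (locality).
resume = {
    "user_id": 251,
    "name": "Lucy",
    "positions": [
        {"title": "CTO", "org": "Example Corp"},
        {"title": "Engineer", "org": "Widgets Inc"},
    ],
    "education": [{"school": "State University", "year": 2003}],
    "region_id": "us:91",   # a reference: resolving it is a join in app code
}

doc = json.dumps(resume)    # schema-on-read: structure lives in the data
titles = [p["title"] for p in json.loads(doc)["positions"]]
```

Note `region_id`: the moment a document references shared data (a region, a company record), you are back to doing joins yourself, which is the document model's weak spot.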
Graph Model
- Designed for highly interconnected data where many-to-many relationships are the norm
- A graph has vertices (nodes) and edges (relationships); both can have properties
- Property graph model (Neo4j, JanusGraph): vertices have a unique ID, a set of outgoing/incoming edges, and key-value properties; edges have a label, direction, and properties
- Triple-store model (Datomic, AllegroGraph): stores data as (subject, predicate, object) triples — e.g., (Lucy, age, 33) or (Lucy, marriedTo, Alain)
- Graph models are extremely flexible for evolving data: adding new types of relationships or new properties requires no schema changes
- "For highly interconnected data, the document model is awkward, the relational model is acceptable, and graph models are the most natural"
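A triple store's data model is simple enough to sketch directly. A real store would index by subject, predicate, and object; the naive scan below stands in for those indexes, and the example facts echo the (Lucy, age, 33) triples above.

```python
# (subject, predicate, object) triples; a naive scan stands in for the
# subject/predicate/object indexes a real triple store maintains.
triples = [
    ("lucy", "age", 33),
    ("lucy", "marriedTo", "alain"),
    ("lucy", "bornIn", "idaho"),
    ("idaho", "within", "usa"),
]

def objects(subject, predicate):
    return [o for s, p, o in triples if s == subject and p == predicate]

# Traversal is just following edges: where was Lucy born, and what contains it?
birthplace = objects("lucy", "bornIn")[0]
country = objects(birthplace, "within")[0]
```

Adding a new kind of relationship is just appending a triple with a new predicate, which is the schema flexibility the bullet above describes.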
Query Languages
- Declarative languages (SQL, Cypher, SPARQL, Datalog) specify what you want, not how to get it; the query optimizer decides execution
- Declarative queries lend themselves to parallelism; you don't prescribe order of operations
- Imperative queries (MongoDB's original API, Gremlin for graphs) specify exact operations step-by-step
- CSS and XSL are declarative — a useful reminder that the concept extends beyond databases
- MapReduce sits between declarative and imperative: the map and reduce functions are pure functions, but you write explicit code
- Cypher: pattern-matching query language for Neo4j property graphs
- SPARQL: query language for RDF triple-stores; predates Cypher
- Datalog: a subset of Prolog; defines rules that can be reused across queries; used in Datomic
Convergence
- Relational databases now support JSON columns (PostgreSQL, MySQL, SQL Server) — blurring the document/relational line
- Document databases increasingly support join-like operations (RethinkDB, Couchbase)
- The practical differences between models are narrowing; pick based on your data access patterns, not dogma
Chapter 4 - Storage and Retrieval
- Databases do two things: store data and retrieve it; these are more complex than they appear
- The data structures a database uses internally determine its performance characteristics for your workload
- Two main families of storage engines: log-structured and page-oriented (B-trees); they have fundamentally different trade-offs
Hash Indexes (Log-Structured)
- Simplest index: an in-memory hash map mapping each key to the byte offset in an append-only log file
- All writes are sequential appends — fast on magnetic disk; old values are not overwritten
- Compaction: periodically merge and compact log segments, keeping only the most recent value for each key
- Tombstones: mark deleted keys; removed during compaction
- Limitations: the entire key set must fit in RAM; range queries are inefficient (no ordering)
- Used by Bitcask (Riak's storage engine)
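The whole Bitcask-style design fits in a small sketch: an append-only log file plus an in-memory dict from key to byte offset. This is a minimal illustration (no compaction, no tombstones, and keys must not contain the `,` delimiter).

```python
import os, tempfile

# Minimal hash-indexed log store: writes are sequential appends, the in-memory
# index maps each key to the byte offset of its latest value, reads are one seek.
class LogStore:
    def __init__(self, path):
        self.path = path
        self.index = {}             # key -> byte offset of the latest value
        open(path, "ab").close()    # create the log file if missing

    def put(self, key, value):
        with open(self.path, "ab") as f:
            offset = f.tell()
            f.write(f"{key},{value}\n".encode())
        self.index[key] = offset    # stale values stay on disk until compaction

    def get(self, key):
        offset = self.index.get(key)
        if offset is None:
            return None
        with open(self.path, "rb") as f:
            f.seek(offset)
            line = f.readline().decode().rstrip("\n")
        return line.split(",", 1)[1]

path = os.path.join(tempfile.mkdtemp(), "data.log")
store = LogStore(path)
store.put("user1", "v1")
store.put("user1", "v2")    # overwrites in the index, appends on disk
```

Both limitations from the bullet above are visible here: `self.index` must hold every key in RAM, and a range query would have to scan the unordered log.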
SSTables and LSM-Trees
- SSTable (Sorted String Table): like a log segment but keys are sorted — makes merging segments efficient (merge-sort style)
- Building an SSTable: writes go into an in-memory balanced tree (memtable — a red-black or AVL tree); when full, flush to disk as an SSTable
- Reads: check memtable → most recent SSTable → next most recent, etc.
- Crash recovery: maintain a separate unsorted append-only log just for recovery; discard it after each successful memtable flush
- Bloom filters avoid expensive reads for keys that don't exist
- Compaction strategies: size-tiered (merge smaller SSTables into larger ones) or leveled (split key range across levels, level L+1 is ~10x larger than level L)
- LSM-tree (Log-Structured Merge-Tree): the overall pattern — buffer writes in a memtable, flush to SSTables, and merge/compact the SSTables in the background
- Used by: LevelDB, RocksDB, Cassandra, HBase, ScyllaDB, InfluxDB, Lucene (for full-text indexes)
- Write amplification: each byte written to the database may be written multiple times to disk due to compaction
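The merge-sort-style compaction step can be sketched with a k-way merge: each segment is a sorted run, and when the same key appears in several segments the newest one wins. A minimal sketch, assuming segments are lists of sorted `(key, value)` pairs ordered oldest to newest.

```python
import heapq

def compact(segments):
    """segments: sorted [(key, value)] runs, oldest first. Returns one sorted
    run in which the newest segment's value wins on duplicate keys."""
    # Tag entries with their segment's age so duplicates sort oldest-first.
    runs = [[(k, i, v) for k, v in seg] for i, seg in enumerate(segments)]
    out = []
    for key, _, value in heapq.merge(*runs):
        if out and out[-1][0] == key:
            out[-1] = (key, value)   # newer segment's value overwrites
        else:
            out.append((key, value))
    return out
```

Because every run is already sorted, the merge streams through the segments sequentially; nothing needs to fit in memory except one entry per segment, which is what makes SSTable compaction cheap.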
B-Trees
- The most widely used index structure; found in almost all relational databases
- Data organized into fixed-size pages (typically 4 KB), forming a tree; the root page is accessed first for every lookup
- Each page contains keys and references to child pages; leaf pages contain values (or references to values)
- Branching factor: typically several hundred; a four-level tree of 4 KB pages with a branching factor of 500 can store up to 256 TB
- Updates: find the leaf page, modify the value, write the page back to disk — in-place modification
- Insertions that overflow a page cause the page to split into two half-full pages; the parent is updated
- Write-ahead log (WAL / redo log): every modification is recorded to a sequential append-only log before being applied; used for crash recovery
- B-tree depth: O(log n); most databases need only 3–4 levels deep
- Optimizations: copy-on-write (instead of WAL, write a new page version; used by LMDB); sibling page pointers for sequential scans; abbreviated keys in interior nodes (B+ trees)
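The capacity figure above is simple arithmetic worth checking once:

```python
# Back-of-envelope B-tree capacity: branching factor 500, 4 KB pages,
# four levels -> 500^4 page references at the bottom.
branching, levels, page_size = 500, 4, 4 * 1024
capacity_bytes = branching ** levels * page_size
capacity_tb = capacity_bytes / 10**12    # decimal terabytes
```

This also shows why lookups stay at 3–4 page reads: capacity grows exponentially in the depth, so a few levels cover any practical dataset.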
LSM-Trees vs B-Trees
- LSM-trees are generally faster for writes (sequential writes); B-trees are generally faster for reads
- LSM-trees use sequential disk I/O and compress better — smaller files on disk
- B-trees have higher write amplification (write once to WAL, once to page); LSM-trees also have write amplification but often less
- LSM-tree compaction can interfere with ongoing read/write performance — high percentile latency can spike; this is an operational concern
- A key may appear in multiple SSTable segments (temporarily); each key exists in exactly one B-tree location — better for transaction isolation
- B-trees are better suited for strong transactional semantics (locking at exact key location)
Secondary Indexes, Clustered Indexes, Covering Indexes
- Secondary index: an index on a non-primary key; critical for query performance
- Clustered index: the actual row data is stored within the index (MySQL InnoDB primary key is always clustered)
- Heap file: the index stores a reference to the row, which lives in a separate heap file; an update that doesn't change the key can overwrite the row in place (provided the new value is no larger)
- Covering index: stores extra columns alongside the index key — some queries can be served entirely from the index without looking up the actual row
- Multi-column / concatenated index: appends columns together; only efficient for queries matching the leftmost prefix of the indexed columns
- Multi-dimensional indexes: needed for geospatial data (e.g., querying by latitude and longitude simultaneously); R-trees used by PostGIS; space-filling curves (like Z-order curves) map multi-dimensional coordinates to a 1D number
In-Memory Databases
- Disk is used for durability (WAL/snapshots), but performance is achieved by keeping everything in RAM
- Not faster just because they avoid reading from disk — the OS often caches hot data anyway; the advantage is no overhead of encoding in-memory data structures for disk
- Examples: VoltDB, MemSQL, Oracle TimesTen (relational); RAMCloud, Redis, Memcached (key-value)
- Redis and Couchbase offer weak durability by writing to disk asynchronously
- Anti-caching: evict least-recently-used data from memory to disk when RAM runs low, and load it back when accessed again — lets in-memory databases handle datasets larger than memory
- Future: non-volatile memory (NVM/persistent memory) may blur the in-memory/on-disk distinction
OLTP vs OLAP
- OLTP (Online Transaction Processing): low latency, small number of rows per query, frequent reads/writes by many users
- OLAP (Online Analytical Processing): high throughput, aggregate queries over large numbers of rows, used by analysts/data scientists
- Data warehouses: a read-only copy of OLTP data, loaded via ETL (Extract-Transform-Load); kept separate to avoid impacting OLTP performance
- Star schema: a central fact table (each row is one event — a sale, a click) surrounded by dimension tables (who, what, where, when, how, why)
- Snowflake schema: dimension tables are further normalized into sub-dimensions; star schemas are usually preferred for simplicity
| | OLTP | OLAP |
|---|---|---|
| Read | Small set of rows by key | Aggregate over many rows |
| Write | Random-access, low latency | Bulk import / event stream |
| Scale | GBs–TBs | TBs–PBs |
| Bottleneck | Disk seek time | Disk bandwidth |
Column-Oriented Storage
- Row-oriented storage: all columns for a row are stored adjacent — efficient for retrieving entire rows
- Column-oriented storage: all values for a column are stored together — efficient for analytical queries that read a few columns across many rows
- A query that reads 5 columns from a 100-column table only needs to read 5% of the data
- Column compression: columns often have low cardinality (few distinct values) — encode with bitmaps; one bitmap per distinct value, one bit per row
- Run-length encoding: further compress sparse bitmaps; "5 zeros, then 3 ones, then 20 zeros"
- Vectorized processing: CPU-level optimizations (SIMD instructions) operate on compressed column data directly
- Sort order: choose a sort key based on common queries; rows in all column files are reordered in the same order; you can maintain multiple sorted copies (like C-Store / Vertica)
- Cassandra and HBase use "column families" — a group of columns stored together — but rows within each column family are row-oriented; this is not the same as column-oriented storage
- Writing to column-oriented storage is hard: inserting a row requires updating every column file; use an LSM-tree in-memory buffer and flush to column-sorted files
- Materialized aggregates and data cubes: pre-compute aggregate values (e.g., sum of sales by product and by date) and cache them in a materialized view; avoids rescanning raw data every query; inflexible because not all query shapes benefit
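The bitmap and run-length encoding steps described above can be sketched directly; the sample column values are invented for illustration.

```python
def bitmaps(column):
    """One bitmap (list of 0/1 bits) per distinct value, one bit per row."""
    return {v: [1 if x == v else 0 for x in column] for v in sorted(set(column))}

def rle(bits):
    """Run-length encode a bitmap as alternating run lengths, zeros first."""
    runs, current, run = [], 0, 0
    for b in bits:
        if b == current:
            run += 1
        else:
            runs.append(run)
            current, run = b, 1
    runs.append(run)
    return runs

country = ["de", "de", "us", "uk", "de"]   # low cardinality: 3 distinct values
encoded = {v: rle(bits) for v, bits in bitmaps(country).items()}
```

With millions of rows and a handful of distinct values, most bitmaps are sparse and the run lengths stay tiny, which is where the compression wins come from.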
Chapter 5 - Encoding and Evolution
- Evolvability: systems need to be able to change over time — new features require new fields, old fields get removed, types change
- Backward compatibility: newer code can read data written by older code (usually easy — you know the old format)
- Forward compatibility: older code can read data written by newer code (harder — old code must gracefully ignore unknown new fields)
- Rolling upgrades (staged rollout): deploy new version to a few nodes at a time; means old and new code run simultaneously — both backward and forward compatibility are needed
Encoding Formats
- Programs represent data in two forms: in-memory (objects, structs, lists — optimized for CPU access) and byte-sequence (for storage/network — self-contained)
- Encoding (serialization) = in-memory → bytes; decoding (deserialization/parsing) = bytes → in-memory
- Language-specific formats (Java Serializable, Python pickle): only usable in one language, poor versioning support, security issues — avoid for long-term storage or cross-service communication
- JSON, XML, CSV: widely supported but have quirks
- No distinction between integers and floating-point numbers in JSON
- XML and CSV can't reliably distinguish strings from numbers
- No binary data support in JSON (must base64-encode; increases size by 33%)
- Optional/inconsistent schema support; no enforcement at write time
- Binary JSON variants (BSON, MessagePack): smaller than JSON but sacrifice human readability; not radically better
- Thrift (Facebook) and Protocol Buffers (Google): binary encoding using a schema
- Fields encoded as field tag (integer) + type + value — no field names in the binary, just numbers
- Forward compatibility: old code ignores unknown field tags
- Backward compatibility: new fields must be optional; old required fields must keep their tag numbers
- Thrift has two formats: BinaryProtocol and CompactProtocol (uses variable-length integers, more space-efficient)
- Avro (Hadoop): schema defined in JSON or IDL; no field tags in the binary
- Writer's schema and reader's schema can differ; the Avro library resolves them by matching field names
- Must include writer's schema somewhere: at file start (for files), schema version number (for databases), negotiate schema on connection (for network)
- Avro is friendlier to dynamically generated schemas (e.g., generated from a relational schema)
- More compact than Thrift/Protobuf in some cases
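The variable-length integers used by Thrift's CompactProtocol (and by Protocol Buffers) are easy to sketch: 7 bits of payload per byte, with the high bit flagging that more bytes follow. A minimal encoder/decoder for unsigned values:

```python
def encode_varint(n):
    """Encode a non-negative int, 7 bits per byte, little-endian groups."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)   # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def decode_varint(data):
    """Decode the varint at the start of a byte sequence."""
    n = shift = 0
    for byte in data:
        n |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return n
        shift += 7
```

Small numbers dominate real data (field tags, lengths, counts), so paying one byte for values under 128 instead of a fixed four or eight is a large share of the format's compactness.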
Dataflow Patterns
- Data flows between processes via: databases, service calls (REST/RPC), and asynchronous message passing
- Through databases: writer encodes, reader decodes; both backward and forward compatibility needed because data outlives code; schema migration must handle old data on disk
- Through services (REST and RPC):
- REST: uses HTTP verbs (GET, POST, PUT, DELETE); resources identified by URLs; uses JSON or XML; leverages HTTP features (caching, authentication, content negotiation)
- SOAP: XML-based; independent of HTTP; described by WSDL; complex but well-tooled
- RPC (Remote Procedure Call): makes network calls look like local function calls (location transparency) — a fundamentally flawed abstraction
- Network calls are unpredictable: they may time out, be lost, or execute twice (retries)
- Local calls return a result or throw an exception; network calls can just… not return
- Local calls have consistent types; network calls cross language boundaries requiring encoding
- Modern RPC frameworks (gRPC, Thrift, Finagle) acknowledge these differences and don't pretend to be local; gRPC uses Protocol Buffers and supports streaming
- For backward/forward compatibility in RPC: if the server is updated before clients, responses need forward compatibility (old client reads new response) and requests need backward compatibility (new server reads old request)
- Through message brokers (asynchronous):
- Producer sends to a named queue/topic; broker delivers to one or more consumers
- Benefits: buffer if consumer is down, automatic redelivery, decouple sender from receiver, fan-out (one message to multiple consumers)
- Examples: RabbitMQ, ActiveMQ, Apache Kafka, Amazon SQS
- Message brokers don't enforce schemas — the responsibility is entirely on producers and consumers
- Distributed actor frameworks (Akka, Orleans, Erlang OTP): the actor model extended across network nodes; each actor has a mailbox of messages; location transparency for actors is more reasonable than for RPC because failures are already a first-class concept
Chapter 6 - Replication
- Replication: keeping a copy of the same data on multiple nodes connected by a network
- Reasons to replicate: reduce latency (put data closer to users), increase availability (if one node fails, others continue), increase read throughput (scale reads across replicas)
- All difficulty in replication comes from handling changes to replicated data
Single-Leader (Leader-Follower) Replication
- One node is designated the leader (primary/master); all writes go to the leader
- The leader writes changes to its replication log; followers apply changes in the same order
- Reads can be served from any replica; follower reads may be stale
- Synchronous replication: leader waits for follower confirmation before acknowledging write — guarantees up-to-date follower, but one unresponsive follower blocks all writes
- Semi-synchronous: one follower is synchronous, others are asynchronous — ensures at least one follower is always up to date
- Asynchronous replication: leader doesn't wait — lower latency, but unreplicated writes are lost if leader fails before replicating
- Fully asynchronous replication is common despite the durability trade-off; used by many high-throughput systems
Setting Up New Followers
- Take a snapshot of the leader's database; copy to the new follower; request all changes since the snapshot (using replication log position); catch up
Handling Node Outages
- Follower failure: follower can catch up from its log once it reconnects
- Leader failure (failover): detect failure (typically via timeout) → elect a new leader (usually the most up-to-date replica) → reconfigure all clients and other followers to use the new leader
- Failover problems:
- Async replication: new leader may not have the latest writes — they are discarded, causing data loss
- Discarded writes can cause inconsistency with external systems (e.g., auto-increment IDs used by external services)
- Split brain: two nodes both believe they are the leader — can corrupt data; some systems fence or shut down one node when two leaders are detected (STONITH: Shoot The Other Node In The Head)
- What timeout to use for detecting leader failure? Too short = false positives; too long = slow recovery
Replication Log Implementations
- Statement-based: log the SQL statement itself; nondeterministic statements (NOW(), RAND(), auto-increment) make replicas diverge; fragile — since version 5.1, MySQL falls back to row-based replication whenever a statement is nondeterministic
- Write-ahead log (WAL) shipping: send the same low-level log the storage engine writes; tightly coupled to the storage engine format — prevents zero-downtime upgrades when leader and follower run different storage engine versions; used by PostgreSQL and Oracle
- Logical (row-based) replication: a separate log format at row granularity (which rows were inserted/updated/deleted); decoupled from storage engine internals; MySQL binlog; easier for change data capture (CDC) to downstream systems
- Trigger-based replication: application-level, using database triggers to log changes to a separate table; flexible but higher overhead and more failure-prone; used by Bucardo and Slony
Replication Lag Problems
- Eventual consistency: asynchronous replicas fall behind the leader; the inconsistency is temporary but real
- Read-your-own-writes consistency: a user should see writes they just submitted (even if replicas haven't caught up)
- Solutions: read the user's own data from the leader; or track the timestamp of the last write and refuse to read from a replica that hasn't caught up to that point; or use sticky sessions (route the user to the same replica)
- Cross-device complexity: user writes on mobile, reads on desktop — must coordinate the "last write" timestamp across devices
- Monotonic reads: user should not see older data after already seeing newer data (e.g., two reads to different replicas at different lag levels)
- Solution: route each user's reads to the same replica consistently (e.g., hash the user ID to pick a replica)
- Consistent prefix reads: if writes happen in a causal sequence (A then B), reads should not see B without having seen A
- Problem occurs in partitioned databases where different partitions lag independently
- Solution: write causally-related data to the same partition; this is not always efficient
Multi-Leader Replication
- Multiple nodes accept writes; each leader also acts as a follower to the other leaders
- Use cases: multi-datacenter (one leader per datacenter), offline clients (CouchDB — each device is a leader), collaborative editing (Google Docs — each user's session is a leader)
- Performance benefit: writes are processed locally in each datacenter; inter-datacenter replication is asynchronous
- Considered dangerous in many situations; generally avoided unless the use case clearly demands it
- The same data can be concurrently modified in two different datacenters → write conflicts
- Conflict handling:
- Avoidance: route all writes for a given record through the same datacenter — eliminates conflicts but defeats the purpose during outages
- Last-write-wins (LWW): attach a timestamp to each write; the write with the highest timestamp wins — causes data loss; prone to clock skew issues
- Higher replica ID wins: also causes data loss
- Merge/concatenate: combine both values (works for some data types like sets)
- Record conflicts: store all conflicting versions and let the user resolve (CouchDB)
- CRDTs (Conflict-free Replicated Data Types): data structures that automatically merge in a mathematically correct way (counters, sets, maps) — Riak uses these
- Operational Transformation: algorithm used by collaborative editors (Google Docs) to transform conflicting edits
- Replication topologies:
- Circular: each node forwards writes from one neighbor to the next; one broken node interrupts the chain
- Star: one central node forwards all writes; the central node is a single point of failure
- All-to-all: every leader replicates to every other leader; most fault-tolerant; causal ordering can still be violated due to network delays — use version vectors to order correctly
Leaderless Replication
- Any replica can accept writes; there is no leader; the client sends writes to multiple replicas in parallel (or a coordinator node does so on its behalf — unlike a leader, the coordinator does not enforce any ordering of writes)
- Examples: Amazon Dynamo (internal), Riak, Cassandra, Voldemort
- Stale reads: if a node was offline during a write, it may return outdated data; the client reads from multiple nodes and takes the most recent value (using version numbers)
- Read repair: when a client reads from multiple replicas and sees a stale value, it writes the newer value back to the stale node
- Anti-entropy: background process that scans replicas and copies missing data between them; no ordering guarantee; may have significant delay
- Quorum reads/writes: with n replicas, require w write confirmations and r read confirmations, where w + r > n
  - Common: n = 3, w = 2, r = 2 — tolerates one failed node for both reads and writes
  - w = n, r = 1: good for read-heavy workloads; only one node needs to respond to a read
  - w + r > n guarantees at least one node in the read set has the latest write (pigeonhole principle)
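The pigeonhole argument can be checked exhaustively for small n: if w + r > n, every possible write quorum intersects every possible read quorum, so some replica in the read set holds the latest write.

```python
import itertools

def every_read_sees_write(n, w, r):
    """True iff every w-replica write set overlaps every r-replica read set."""
    replicas = range(n)
    return all(
        set(ws) & set(rs)
        for ws in itertools.combinations(replicas, w)
        for rs in itertools.combinations(replicas, r)
    )
```

Note this only guarantees the latest value is *present* in the read set; the client still needs version numbers to tell which of the r responses is newest, and the edge cases listed below can still surface stale data.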
- Quorum edge cases where stale reads can still occur:
- Sloppy quorums: write to non-home nodes during an outage; hinted handoff sends data to the home node later — durability but no immediate consistency
- Concurrent writes where LWW loses a write
- A write that partially succeeds (some w nodes confirm, others don't)
- Read/write race on a node
Concurrent Writes and Version Vectors
- "Two operations are concurrent if neither happens before the other" — physical time doesn't matter, only causal ordering
- Last-write-wins (LWW): assign timestamps to writes; always keep the highest timestamp; popular (Cassandra uses it exclusively) but causes data loss for truly concurrent writes
- Happens-before relationships: operation A → B if B knows about A; otherwise they are concurrent
- Version numbers: a single replica maintains a version number per key, incremented on each write; clients always read the version before writing; on write, include the version number you last read
- Server can then determine which values to overwrite and which to merge
- Siblings (Riak): if writes are concurrent, keep both values — "siblings"; the client is responsible for merging; tombstones needed for deletions
- Version vectors: in a multi-replica system, each replica tracks a version number per replica it has seen; the vector of all these numbers is the version vector; transferred to clients on read, returned on write — enables detection of concurrent writes across replicas
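Comparing two version vectors is the core operation: A happened before B if every counter in A is ≤ the corresponding counter in B and at least one is strictly less; if neither dominates, the writes are concurrent. A minimal sketch, with vectors as dicts from replica id to version number:

```python
def compare(a, b):
    """Classify two version vectors (dicts: replica id -> version number)."""
    keys = set(a) | set(b)                 # missing entries count as 0
    a_le_b = all(a.get(k, 0) <= b.get(k, 0) for k in keys)
    b_le_a = all(b.get(k, 0) <= a.get(k, 0) for k in keys)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "a happened before b"
    if b_le_a:
        return "b happened before a"
    return "concurrent"                    # neither dominates: merge or keep siblings
```

A "concurrent" result is precisely the case where LWW would silently drop one write and Riak would instead keep both values as siblings.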
Chapter 7 - Sharding
- Partitioning (called sharding in MongoDB/ElasticSearch/SolrCloud, regions in HBase, tablets in Bigtable, vnodes in Cassandra/Riak, vBuckets in Couchbase): splitting a large dataset across multiple nodes
- Main reason: scalability — data and query load too large for a single machine
- Partitioning is usually combined with replication: each partition is replicated across multiple nodes for fault tolerance
- Skewed partition: uneven data distribution; hot spot: one partition receives disproportionate load
Partitioning Strategies
- Partitioning by key range:
- Assign a continuous range of keys (like encyclopedia volumes: A–D, E–H) to each partition
- Boundaries can be manually set or auto-adjusted; within each partition, keys are sorted
- Enables efficient range queries within a partition
- Risk of hot spots: e.g., sensor data keyed by timestamp routes all today's writes to one partition; fix by prefixing the key with the sensor ID
- Partitioning by hash of key:
- Hash function (e.g., MD5) distributes keys uniformly; each partition owns a range of hash values
- Eliminates hot spots from skewed keys (usually), but destroys sort order — range queries must touch all partitions
- Cassandra uses a compound primary key: first part is hashed for partitioning, remaining parts form an SSTable sort key within the partition — allows range queries on the sorted columns within a single partition
- Relieving hot spots for celebrity/viral data: even with hash partitioning, a single key can be hot (e.g., a celebrity's user ID on a write-heavy social network); add a random suffix to the key and split writes across multiple keys — but reads then must read from all these keys and combine
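Both ideas above fit in a short sketch: a stable hash maps each key to a partition, and a hot key is split across several suffixed variants. The suffix count and `#` key format are made up for illustration.

```python
import hashlib

NUM_PARTITIONS = 8

def partition_for(key):
    # A stable hash (unlike Python's salted hash()) so routing is consistent.
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

def spread_hot_key(hot_key, n_suffixes=4):
    """Suffixed variants a hot key's writes are split across; a read must
    fetch all of them and combine the results."""
    return [f"{hot_key}#{i}" for i in range(n_suffixes)]
```

The trade-off is explicit: writes to the hot key scatter across up to `n_suffixes` partitions, but every read now pays for `n_suffixes` lookups, so this is only worth doing for keys known to be hot.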
Secondary Indexes
- Partitioning by document (local indexes):
- Each partition maintains its own secondary index for documents it holds
- Reading by secondary index requires scatter-gather: query all partitions, merge results
- Tail latency amplification: scatter-gather sends queries in parallel, so response time = slowest partition
- Used by MongoDB, Riak, Cassandra, ElasticSearch, SolrCloud, VoltDB
- Partitioning by term (global index):
- A single global index covering all partitions; the index itself is partitioned (by term or hash of term)
- Reads are faster: query only the partition(s) of the index that contain the relevant terms
- Writes are more complex: updating a document may require updating the global index in several different partitions
- Global secondary indexes are usually updated asynchronously — slight staleness is acceptable
- Used by DynamoDB (global secondary indexes are eventually consistent)
Rebalancing Partitions
- Why: nodes are added or removed, node failures
- Bad approach — hash mod N: almost all keys must move when N changes
- Fixed number of partitions: create many more partitions than nodes (e.g., 1000 partitions for 10 nodes); add a new node by taking a few partitions from each existing node; only partitions (not keys) move
- Used by Riak, Elasticsearch, Couchbase, Voldemort
- Choosing the right number of partitions upfront is tricky — too few limits scaling, too many adds overhead
- Dynamic partitioning: partitions split when they exceed a configured size threshold; split partitions can be moved to other nodes; undersized partitions merge
- Used by HBase, MongoDB, RethinkDB
- One concern: empty database starts with one partition, all writes go there until the first split
- Pre-splitting: configure initial partitions upfront if key range is known
- Proportional to nodes (Cassandra): fixed number of partitions per node; adding a node creates new partitions by splitting existing ones; partition size grows with data but shrinks when nodes are added
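The difference between hash mod N and a fixed partition count can be demonstrated directly (MD5 stands in for whatever hash the store actually uses):

```python
import hashlib

def h(key: str) -> int:
    # stable hash; Python's built-in hash() is randomized per process
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

keys = ["user:%d" % i for i in range(10_000)]

# Bad approach: node = hash(key) mod N. Going from 10 to 11 nodes reassigns
# roughly 10/11 of all keys (only keys with h % 110 < 10 keep their node).
moved = sum(1 for k in keys if h(k) % 10 != h(k) % 11)
assert moved / len(keys) > 0.85

# Fixed partition count: key -> partition never changes; rebalancing moves
# whole partitions between nodes, so no individual key is ever rehashed.
NUM_PARTITIONS = 1000
assignment = {k: h(k) % NUM_PARTITIONS for k in keys}
assert all(h(k) % NUM_PARTITIONS == assignment[k] for k in keys)
```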
Request Routing (Service Discovery)
- Three approaches:
- Any node: client contacts any node; if the node doesn't own the partition, it forwards the request
- Routing tier: a dedicated load balancer or routing tier knows the partition-to-node mapping; all requests go through it
- Client-aware: client has the partition map and connects directly to the right node
- ZooKeeper: many distributed databases (HBase, SolrCloud, Kafka, Helix for LinkedIn) use ZooKeeper to track partition-to-node mappings; routing tiers subscribe to ZooKeeper for updates
- Cassandra uses a gossip protocol: each node knows the cluster state; clients contact any node, which forwards appropriately
- MongoDB has its own config server + mongos router tier
Chapter 8 - Transactions
- Transactions: a way for the application to group several reads and writes into a logical unit — either all of them succeed (commit) or all fail (abort/rollback)
- The application can ignore certain classes of errors and concurrency issues because the database handles them
- Not every application needs transactions; some trade them away for performance or availability
- ACID: Atomicity, Consistency, Isolation, Durability — the properties that transactions provide (though the definitions are fuzzy and inconsistently applied)
ACID Properties
- Atomicity: all writes in a transaction either happen or are aborted as a whole; not about concurrency — about being able to safely retry after a fault midway through
- Consistency: invariants about the data (e.g., accounting records must balance) are always true; this is actually an application-level property — the database can only enforce specific constraints (uniqueness, foreign keys)
- Isolation: concurrent transactions appear to execute serially — each transaction pretends it's the only one running; serializability is the strongest form
- Durability: data committed to a transaction is not lost, even after a crash; typically involves write-ahead logs or replication; "perfect durability does not exist" — correlated failures (datacenter fire) can destroy all copies
Isolation Levels
- Read uncommitted: transactions can read writes that haven't been committed yet — dirty reads; rarely useful
- Read committed: no dirty reads (only see committed data) and no dirty writes (only overwrite committed data); the default in Oracle, SQL Server, PostgreSQL
- Row-level read locks would prevent dirty reads, but this degrades read performance; instead, databases keep both old and new values and give readers the old committed value
- Does not prevent read skew (nonrepeatable read): reading the same row twice in a transaction can yield different values if another transaction commits between the two reads
- Snapshot isolation (repeatable read): each transaction sees a consistent snapshot of the database as of the start of the transaction
- Implemented via Multi-Version Concurrency Control (MVCC): each row has a creation transaction ID and an optional deletion transaction ID; a transaction ignores rows created after it started and rows deleted before it started
- "Readers never block writers, and writers never block readers"
- Essential for long-running queries (backups, analytics) that must not see inconsistent data mid-query
- Called "serializable" in Oracle, "repeatable read" in MySQL/PostgreSQL — the terminology is inconsistent and confusing; the SQL standard definitions are also flawed
Write Anomalies
- Dirty writes: overwriting a write that hasn't committed yet; almost all databases prevent this with row-level locks
- Lost updates: two transactions read-modify-write the same value; one's update is silently overwritten
- Prevention: atomic operations (UPDATE counter SET val = val + 1), explicit locks (SELECT FOR UPDATE), automatic lost-update detection (PostgreSQL's repeatable read detects the conflict and aborts, so the application retries), compare-and-set
- Write skew: each of two concurrent transactions reads a set of objects, and each writes based on what it read — but the writes to different objects violate an invariant
- Example: two doctors both see that two other doctors are on call; each removes themselves, leaving zero doctors on call
- Phantom: the effect of a write in one transaction changes the search results of a query in another transaction
- Prevention requires serializability or materializing conflicts (creating explicit lock rows for things that don't yet exist)
- Serializable isolation is the strongest guarantee — it ensures that the outcome is equivalent to some serial execution
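The compare-and-set defense against lost updates can be sketched with a toy register; the lock inside compare_and_set stands in for the atomicity a real database provides.

```python
import threading

class Register:
    """Toy store exposing only read and compare-and-set."""
    def __init__(self, value: int):
        self._value = value
        self._lock = threading.Lock()

    def read(self) -> int:
        return self._value

    def compare_and_set(self, expected: int, new: int) -> bool:
        with self._lock:  # stands in for the database's atomic CAS
            if self._value != expected:
                return False  # a concurrent writer got in first
            self._value = new
            return True

def increment(reg: Register) -> None:
    """Read-modify-write made safe: retry until nobody snuck in between."""
    while True:
        old = reg.read()
        if reg.compare_and_set(old, old + 1):
            return

counter = Register(0)
workers = [threading.Thread(target=lambda: [increment(counter) for _ in range(1000)])
           for _ in range(4)]
for w in workers: w.start()
for w in workers: w.join()
assert counter.read() == 4000  # a naive read-then-write would lose updates here
```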
Implementing Serializability
- Actual serial execution (single-threaded): execute one transaction at a time, in order; surprisingly viable if transactions are short and the active dataset fits in RAM
- VoltDB, Redis, Datomic use this approach
- Stored procedures: instead of interactive multi-round-trip transactions, submit the entire transaction as a stored procedure — eliminates network latency within a transaction
- Limitation: throughput is limited to a single CPU core; partitioning lets different CPU cores handle different partitions, but cross-partition transactions are expensive
- Two-phase locking (2PL): historically the standard approach for about 30 years
- Shared lock for reads (multiple readers OK); exclusive lock for writes (blocks all others)
- Phase 1: acquire locks; Phase 2: release locks at commit/abort — locks are held for the full duration
- Deadlocks: transaction A holds lock that B needs and vice versa — the database detects and aborts one of them
- Predicate locks: lock all rows matching a WHERE condition, even rows that don't yet exist — prevents phantom reads
- Index-range locks: coarser-grained version of predicate locks using an index entry — more efficient in practice
- Performance: significantly lower throughput than weaker isolation; high latency; deadlocks add overhead
- Serializable Snapshot Isolation (SSI): introduced in 2008; optimistic concurrency control
- Allow transactions to proceed under snapshot isolation; track which reads may have been outdated by concurrent writes; at commit time, abort transactions that executed based on stale data
- Better performance than 2PL under low contention; degrades gracefully under high contention
- Compared to serial execution: allows multiple CPU cores; doesn't require all data in RAM
- Used in FoundationDB and PostgreSQL's serializable mode
Chapter 9 - The Trouble with Distributed Systems
- Writing software that runs on a single machine is fundamentally different from writing software for a distributed system
- The fundamental difference: in a single process, a fault typically causes a total failure (crash); in a distributed system, some components fail while others continue — partial failures are nondeterministic and unpredictable
- The goal: build reliable systems from unreliable components (like TCP being reliable on top of unreliable IP)
Unreliable Networks
- Distributed systems use asynchronous packet networks — no guarantee on delay, ordering, or delivery
- Possible outcomes of sending a message: the request was lost, the request is queued, the remote node failed, the remote node is slow, the response was lost, the response is queued
- You cannot tell the difference from the sender's perspective — a timeout does not tell you whether the request was received or not
- Detecting node failures: TCP connection refused (quick); timeout (slow); OS notification; network switch monitoring (if you have access); external monitoring services
- Timeout choice: too short → false positives, premature failover, cascading load; too long → slow recovery; some systems use adaptive timeouts based on observed round-trip times
- Network partitions: some links between nodes are interrupted while the nodes themselves are fine — the system appears split
- Unbounded delays: network queues can grow without bound; routers, OS network buffers, VMM hypervisors, TCP congestion control — all add variable delay
- "In a system with thousands of nodes, something is always broken"
- Human error is the primary cause of network outages, not hardware failure
Unreliable Clocks
- Each node has its own clock (quartz oscillator); clocks drift over time; NTP synchronizes them but imperfectly
- Two kinds of clocks:
- Time-of-day clock: wall clock time (milliseconds since epoch); can jump backward (NTP adjustment, leap seconds); NTP accuracy is ~35ms over the internet; not safe to use for ordering events or measuring elapsed time
- Monotonic clock: guaranteed to move forward; suitable for measuring elapsed time on a single node; absolute value is meaningless; cannot be compared across nodes
- Clock skew issues:
- Last-Write-Wins using timestamps: if clocks are skewed, a "later" write can have an "earlier" timestamp and get silently discarded
- Google assumes up to 6 ms of clock drift for a clock synced with NTP every 30 seconds, or up to 17 seconds of drift for a clock synced only once a day
- Leap seconds: a minute has 59 or 61 seconds; systems have crashed and lost data due to leap second handling
- Virtual machine clocks: may jump during live migration or when the VM is paused
- Logical clocks are safer for ordering events — they measure relative event ordering rather than physical time
- Google Spanner TrueTime API: returns the current time as an interval [earliest, latest] with a confidence bound; Spanner waits for the interval to pass before committing a transaction to guarantee causality — requires GPS receivers and atomic clocks in every datacenter
Process Pauses
- GC pauses: a "stop-the-world" GC can pause a process for minutes
- A process may be preempted by the OS scheduler at any time and not run for an arbitrary duration
- A node can be declared dead by others while it is paused, then resume and act as if it still holds locks/leases it no longer holds
- "A node in a distributed system must assume that its execution can be paused for a significant length of time at any point, even in the middle of a function"
- There is no safe way for a process to verify that it hasn't been paused mid-operation
Fencing Tokens
- Distributed locks and leases have expiry times — a paused process may resume after its lock has expired and another node has taken the lock
- Solution: fencing tokens — a monotonically increasing number issued by the lock server with each lock grant; the storage server rejects any write with a token older than the last one it's seen
- This requires the storage server to actively check tokens, not just the lock holder to behave correctly
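A minimal sketch of the storage-side fencing check (class and method names are illustrative):

```python
class FencedStorage:
    """Storage that rejects writes carrying a fencing token older than one it has seen."""
    def __init__(self):
        self.max_token_seen = -1
        self.data = {}

    def write(self, key: str, value: str, token: int) -> bool:
        if token < self.max_token_seen:
            return False  # stale token: the writer's lease has expired
        self.max_token_seen = token
        self.data[key] = value
        return True

storage = FencedStorage()
# Client A acquires the lock with token 33, then pauses (a long GC, say).
# Its lease expires; client B acquires the lock with token 34 and writes.
assert storage.write("file", "written by B", token=34)
# A wakes up, still believing it holds the lock; its stale token is rejected.
assert not storage.write("file", "written by A", token=33)
assert storage.data["file"] == "written by B"
```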
Byzantine Faults
- Crash-stop / crash-recovery fault: a node fails silently or restarts; it doesn't lie
- Byzantine fault: a node sends incorrect or contradictory messages — may be due to a bug or malicious behavior
- Byzantine-fault-tolerant algorithms exist but are complex; most distributed database systems assume non-Byzantine faults (nodes are honest but may crash or be slow)
- Relevant for: blockchains (untrusted peers), aerospace (cosmic ray bit flips), multi-organization systems
- Input validation and sanitization: a weaker defense against Byzantine-like behavior from external actors
System Models
- Timing assumptions: synchronous (bounded delays — unrealistic), partially synchronous (usually bounded, occasionally not — most realistic), asynchronous (no timing assumptions — very restrictive)
- Node failures: crash-stop (fail permanently), crash-recovery (fail and restart), byzantine (arbitrary behavior)
- The partially synchronous + crash-recovery model best matches real-world distributed systems
- Safety vs. liveness:
- Safety: "nothing bad happens" — a safety violation is permanent; can be checked at any point in time
- Liveness: "something good eventually happens" — may not hold at a given instant but will eventually
- Distributed algorithms must guarantee safety always and liveness only when things go well enough (e.g., network eventually recovers)
Chapter 10 - Consistency and Consensus
Linearizability
- Also called: atomic consistency, strong consistency, immediate consistency, external consistency
- Informal definition: make a replicated system appear as if there were only one copy of the data and all operations are atomic
- Any read must return the most recent write — once a client reads a new value, no subsequent read from any client should return the old value
- Linearizability is a recency guarantee; serializability is a transaction isolation guarantee — they are different concepts that can be confused
- A system can be serializable but not linearizable (e.g., serializable snapshot isolation: reads come from a consistent snapshot, which is not a recency guarantee)
- A system can be linearizable but not serializable (single-object compare-and-swap)
- "Strict serializability" = serializable + linearizable
- Replication and linearizability:
- Single-leader: linearizable if reads go to the leader or synchronously replicated followers
- Multi-leader: not linearizable (concurrent writes to different leaders)
- Leaderless: not linearizable despite quorums — concurrent operations and network delays can still yield stale reads
- Consensus algorithms (ZooKeeper, etcd): linearizable
- Use cases: distributed locks and leader election (require exactly one winner), uniqueness constraints (only one user can claim a username), cross-channel ordering (write file to storage, then notify processing queue — must be linearizable to guarantee the processor finds the file)
- Cost: "Linearizability is slow, and this is true all the time, not just during a network fault" — the CAP theorem says you can't have both linearizability and availability during a network partition
CAP Theorem
- Consistency (linearizability), Availability, Partition tolerance — pick two; but this is a misleading framing
- In practice: partition tolerance is not optional (networks do partition); the real choice is between linearizability and availability during a partition
- CAP theorem does not say much about latency; "PACELC" is a more nuanced framing (latency vs consistency even without partitions)
- "CAP is sometimes presented as Consistency, Availability, Partition tolerance: pick 2 out of 3. Unfortunately this way of phrasing it is misleading"
Causal Consistency
- Causality imposes a partial ordering on events (A happened before B if B depends on A)
- Causal consistency: all operations that are causally related are seen in the same order by all nodes; concurrent operations may be seen in any order
- "Causal consistency is the strongest possible consistency model that does not slow down due to network delays and remains available in the face of network failures" — it's a sweet spot
- Causal consistency is weaker than linearizability but stronger than eventual consistency
Lamport Timestamps
- (counter, nodeID) pairs; nodes increment counter before each operation; max rule on receiving a message: counter = max(local, received) + 1
- Provide a total ordering consistent with causality: if A happened before B, A's timestamp is less than B's
- Insufficient alone for uniqueness constraints: to enforce that only one user registers a given username, you need to know that you've seen all requests from all nodes — Lamport timestamps don't tell you when you have complete information
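The counter-plus-node-ID scheme and the max rule fit in a few lines:

```python
class LamportClock:
    def __init__(self, node_id: str):
        self.counter = 0
        self.node_id = node_id

    def tick(self):
        """Before each local event: bump the counter, stamp the event."""
        self.counter += 1
        return (self.counter, self.node_id)

    def receive(self, msg_counter: int):
        """Max rule: on receiving a message, jump past its counter."""
        self.counter = max(self.counter, msg_counter) + 1

a, b = LamportClock("A"), LamportClock("B")
t1 = a.tick()        # (1, 'A')
b.receive(t1[0])     # B hears about A's event, so B's clock jumps to 2
t2 = b.tick()        # (3, 'B')
# total order consistent with causality: compare counters, node ID breaks ties
assert t1 < t2
```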
Total Order Broadcast
- Also called atomic broadcast
- Two guarantees: reliable delivery (if message delivered to one correct node, it will be delivered to all correct nodes) and total order (all nodes deliver messages in the same order)
- This is stronger than causal consistency (total order implies causality) but equivalent to consensus
- Applications: database replication (all nodes apply writes in the same order → state machine replication), serializable transactions, distributed locks, consistent log (Kafka, ZooKeeper)
- Linearizable compare-and-set can implement total order broadcast; total order broadcast can implement linearizable compare-and-set — these are equivalent to consensus
Two-Phase Commit (2PC)
- Protocol for achieving atomic commit across multiple nodes (all commit or all abort)
- Coordinator (transaction manager) + participants (databases/services)
- Phase 1 (prepare): coordinator sends "prepare" to all participants; participants respond yes (promise to commit if asked) or no
- Phase 2 (commit/abort): if all participants said yes, coordinator writes "commit" to its log and sends commit; if any said no, coordinator sends abort
- The coordinator's write to disk is the "point of no return" — once it records commit, it must follow through even if participants crash
- Problem: if the coordinator crashes after recording commit but before sending to all participants, those participants are stuck in an "in doubt" state waiting for a decision they can never independently make — the system is blocked
- 2PC is sometimes called a "blocking atomic commit protocol" — manual intervention required if the coordinator fails and can't recover
- Three-phase commit (3PC) was designed to avoid blocking but assumes bounded network delays — not safe in asynchronous networks
- XA transactions: a standard C API for 2PC across different databases and message brokers; the transaction manager is typically a library in the application process — makes the application the coordinator, which is problematic (application crash = coordinator crash)
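The two phases can be sketched as a toy in-process simulation; in a real deployment the coordinator durably logs its decision at the point of no return, which here is only a comment.

```python
class Participant:
    def __init__(self, name: str, healthy: bool = True):
        self.name = name
        self.healthy = healthy
        self.state = "init"

    def prepare(self) -> bool:
        """Phase 1: voting yes is a promise to commit later, no matter what."""
        if self.healthy:
            self.state = "prepared"
        return self.healthy

    def commit(self):
        assert self.state == "prepared"  # commit without prepare violates the protocol
        self.state = "committed"

    def abort(self):
        self.state = "aborted"

def two_phase_commit(participants) -> bool:
    votes = [p.prepare() for p in participants]   # phase 1: collect votes
    if all(votes):
        # point of no return: a real coordinator writes "commit" to its log here
        for p in participants:                    # phase 2: commit everywhere
            p.commit()
        return True
    for p in participants:                        # any "no" vote aborts everyone
        p.abort()
    return False

assert two_phase_commit([Participant("db1"), Participant("db2")])
assert not two_phase_commit([Participant("db1"), Participant("db2", healthy=False)])
```

The blocking problem is visible in the structure: a participant that voted yes is stuck in "prepared" until the coordinator tells it which way to go.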
Consensus Algorithms (Paxos, Raft, Zab)
- Problem: get several nodes to agree on a value despite faults
- Properties a consensus algorithm must satisfy: uniform agreement (no two nodes decide differently), integrity (decided value was proposed by some node), validity (can't just decide an arbitrary value), termination (every non-crashed node eventually decides)
- Single-decree Paxos: agree on one value; Multi-Paxos / Raft: extend to a log of values (total order broadcast)
- Epoch/term numbers: each round of leader election increments the epoch; the leader checks that no higher-epoch election has occurred before proceeding — this prevents conflicting decisions from different epochs
- Raft: a more understandable consensus algorithm; used in etcd, CockroachDB, TiKV
- ZooKeeper uses Zab (ZooKeeper Atomic Broadcast), similar to Paxos; ZooKeeper provides: linearizable compare-and-set, total order broadcast, distributed lock, leader election, service discovery
- Consensus limitations: requires a fixed majority of nodes (quorum) to make progress; can't scale to large numbers of nodes; leader changes are slow; sensitive to network timeouts
- Applications of consensus or equivalent: linearizable compare-and-set registers, atomic transaction commits, total order broadcasts, locks and leases, membership/coordination services, uniqueness constraints
Chapter 11 - Batch Processing
- Three paradigms of data processing:
- Online systems (services): wait for requests, process one at a time, return a response; optimize for response time
- Batch processing systems: take a large amount of input data, run a job over it, produce output; optimize for throughput; a job takes minutes to days
- Stream processing (near-real-time): like batch but on unbounded datasets, runs continuously
- Unix philosophy: each tool does one thing well; programs communicate via stdin/stdout; compose via pipes — this philosophy was the inspiration for MapReduce
- Unix pipeline example: cat /var/log/nginx/access.log | awk '{print $7}' | sort | uniq -c | sort -r -n | head -5 — a complete batch processing chain in one line
MapReduce
- Programming model for large-scale parallel data processing on clusters of commodity hardware
- Mapper: called once for each input record; outputs key-value pairs
- Reducer: receives all values for a given key; aggregates them
- MapReduce framework handles: partitioning mapper output by key, sorting, shuffling data to the right reducers, fault tolerance (re-run failed tasks), distributed filesystem reads/writes
- Hadoop HDFS: distributed filesystem; files replicated across multiple nodes; block size typically 128 MB; computation is moved near the data to minimize network I/O
- Each MapReduce job reads from and writes to HDFS — materialization of intermediate state to disk; this enables fault recovery but is slower than in-memory processing
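The mapper/reducer contract, with the framework's sort-and-shuffle in between, can be sketched in memory (the log records and token counting are just an illustrative workload):

```python
from itertools import groupby
from operator import itemgetter

def mapper(record: str):
    """Called once per input record; emits (key, value) pairs."""
    for token in record.split():
        yield (token, 1)

def reducer(key, values):
    """Called once per key with all of that key's values; aggregates them."""
    return (key, sum(values))

def map_reduce(records):
    # the framework's job: run mappers, sort the emitted pairs by key
    # (the "shuffle"), then hand each key's group to a reducer
    pairs = sorted(kv for r in records for kv in mapper(r))
    return [reducer(k, (v for _, v in grp))
            for k, grp in groupby(pairs, key=itemgetter(0))]

logs = ["GET /home", "GET /about", "GET /home"]
assert map_reduce(logs) == [("/about", 1), ("/home", 2), ("GET", 3)]
```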
Join Strategies in MapReduce
- Reduce-side join (sort-merge join):
- Both datasets are processed by mappers that emit the join key
- The reducer receives all records for a given join key from both datasets and merges them
- Hot key problem: a single key with millions of records overwhelms one reducer — fix with Pig's skew joins: sample to find hot keys, distribute hot-key records across multiple reducers, send the other dataset to all those reducers
- Map-side joins (faster — no reducer needed):
- Broadcast hash join: if one dataset is small enough to fit in memory, load it into a hash table in every mapper; then scan the large dataset
- Partitioned hash join: if both datasets are partitioned by the same key and the same number of partitions, each mapper handles one partition of each dataset
- Map-side merge join: if both datasets are partitioned by the same key and sorted, mappers can merge-sort them without a reducer
- Output of batch jobs: write results to databases (for application queries), or to files in HDFS (for further batch processing), or to search indexes
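The broadcast hash join is the simplest of these to sketch: the small side becomes an in-memory hash table, and the large side streams past it (the datasets and column names here are invented for illustration):

```python
def broadcast_hash_join(small_side, large_side, key):
    """Build a hash table from the small dataset (loaded into every mapper),
    then probe it once per record of the large dataset."""
    table = {row[key]: row for row in small_side}
    for row in large_side:
        match = table.get(row[key])
        if match is not None:
            yield {**row, **match}

users = [{"user_id": 1, "name": "Alice"}, {"user_id": 2, "name": "Bob"}]
clicks = [{"user_id": 1, "url": "/home"}, {"user_id": 3, "url": "/cart"},
          {"user_id": 1, "url": "/pay"}]
joined = list(broadcast_hash_join(users, clicks, "user_id"))
assert len(joined) == 2  # the user_id=3 click has no matching user
assert joined[0] == {"user_id": 1, "url": "/home", "name": "Alice"}
```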
MapReduce vs. MPP Databases
- MPP (Massively Parallel Processing) databases (Teradata, Vertica, Aster Data, Greenplum): SQL queries distributed across many nodes; usually require structured data and a schema
- MapReduce: processes arbitrary data including unstructured, semi-structured, messy files; schema-on-read; more flexible
- "Making data available quickly — even in a quirky, difficult-to-use format — is more valuable than trying to decide on the ideal data model up front"
- The Hadoop ecosystem blurs the line: Hive (SQL on HDFS), Pig (procedural), SparkSQL, Presto
Beyond MapReduce: Dataflow Engines
- Spark, Tez, Flink: generalize MapReduce to support arbitrary data flow graphs (DAGs of operators)
- Advantages over MapReduce:
- No need to write intermediate results to HDFS between every stage — keep in memory or local disk when possible
- Sorting is only required when it is actually needed (not forced at every stage boundary)
- No unnecessary map tasks: a mapper's work can often be folded into the preceding reduce operator
- Better scheduler locality optimization — run the computation on the node that has the data
- Pipelining: downstream operators can start as soon as upstream starts producing output
- JVM reuse: Spark keeps the JVM alive across tasks on the same worker (MapReduce starts a new JVM per task)
- Fault tolerance: Spark uses RDD lineage — remember how each partition was computed and recompute from the last checkpoint if a partition is lost; Flink uses checkpoints with barriers in the data stream
- "Operators in dataflow engines are more flexible than the strict map-reduce pattern"
Graph and Iterative Processing
- Graph algorithms (PageRank, shortest path, community detection) are inherently iterative — compute, check convergence, repeat
- Bulk Synchronous Parallel (BSP): process all vertices in parallel, send messages along edges, synchronize at the end of each iteration (superstep)
- Pregel model (Apache Giraph, GraphX in Spark): each vertex has a function called once per superstep; can send messages to other vertices; vertices vote to halt when they have no more work
- Fault tolerance: checkpoint vertex state to disk after each superstep
- Graph algorithms in MapReduce are possible but inefficient — many stages and lots of HDFS I/O
Chapter 12 - Stream Processing
- Stream processing: like batch processing but on unbounded (never-ending) data streams; events are processed shortly after they happen
- An event is a small, immutable, self-contained record of something that happened; events have a timestamp indicating when the event happened (from the source) or when it was processed
- Stream vs. batch: in batch processing the dataset is bounded (has a beginning and end); in stream processing the dataset is unbounded and queries run continuously
Messaging Systems
- The producer sends a message containing the event; the consumer processes it
- Two key design questions:
- What happens if producers send messages faster than consumers can process them? Options: drop messages, buffer in a queue (risk: what if the queue fills up or is lost?), apply backpressure (block the producer)
- What happens if nodes crash or go offline? Options: lose the message, persist to disk, replicate to multiple nodes
- Direct messaging (no broker): UDP multicast (StatsD), ZeroMQ, HTTP webhooks — low latency but limited reliability; application code must handle dropped messages
- Message brokers (queues): JMS/AMQP standard; load balance messages across consumers; redelivery on crash; load balancing combined with redelivery inherently causes out-of-order processing
- Log-based message brokers (Kafka, Kinesis, Pulsar):
- Messages written to an append-only log partitioned across nodes
- Each consumer tracks its own offset in the log — no message deletion on consumption
- Consumers can re-read old messages by resetting the offset
- Ordering within a partition is guaranteed; across partitions it is not
- Throughput is very high (sequential disk writes)
- Consumer groups: all consumers in a group jointly consume all partitions of a topic; adding consumers to a group increases parallelism (up to the number of partitions)
- Log compaction: broker retains only the most recent value for each key, discarding older updates — makes the log act like a snapshot of a key-value store
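Consumer-managed offsets and log compaction can both be sketched against a toy append-only log (keys and values invented for illustration):

```python
class Log:
    """Append-only log: nothing is deleted when a consumer reads."""
    def __init__(self):
        self.entries = []  # (key, value) pairs in append order

    def append(self, key, value):
        self.entries.append((key, value))

    def read_from(self, offset):
        """Each consumer tracks its own offset; re-reading is just rewinding it."""
        return self.entries[offset:]

    def compacted(self):
        """Log compaction: keep only the most recent value for each key."""
        latest = {}
        for key, value in self.entries:
            latest[key] = value
        return latest

log = Log()
log.append("user:1", "alice@old.example")
log.append("user:2", "bob@example.com")
log.append("user:1", "alice@new.example")

# a consumer that has processed two entries resumes exactly where it left off
assert log.read_from(2) == [("user:1", "alice@new.example")]
# the compacted log behaves like a key-value snapshot of current state
assert log.compacted() == {"user:1": "alice@new.example", "user:2": "bob@example.com"}
```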
Change Data Capture (CDC)
- The problem of dual writes: if you write to a database and then to an index/cache, one of the two writes might fail — your systems diverge
- CDC: capture all writes to the primary database as a stream of change events; derive all other systems (search indexes, caches, analytics) from this stream
- Initial load: take a snapshot of the database + the log offset of that snapshot; replay the log from that offset
- Kafka log compaction can act as the initial snapshot: a consumer starting from the beginning of the topic gets all current values without needing a separate snapshot
- CDC tools: Debezium (reads Postgres/MySQL WAL), Maxwell, Bottled Water, LinkedIn Databus
- The primary database is the "source of truth"; all derived systems are secondary — this establishes a clear data flow
Event Sourcing
- Store all changes to application state as a sequence of immutable events, rather than updating a mutable table
- Different from CDC: CDC extracts the log from a database that was designed for mutable state; event sourcing is a design choice from the start
- Benefits: strong audit trail; easier debugging (replay history); protection against application bugs (you can fix a bug and replay events); easier to derive multiple views from the same event log
- Challenges: querying current state requires replaying all events or maintaining snapshots; event schemas must be carefully versioned; only forward-compatible changes are possible
Processing Streams
- Transformation: convert one stream into another (e.g., map/filter events)
- Joining streams:
- Stream-stream join (windowed join): match events from two streams that occurred within the same time window
- Stream-table join (enrichment): enrich each stream event by looking up a value in a database table; the table is loaded into the stream processor's local state and kept up to date via CDC
- Table-table join (materialized view maintenance): maintain a materialized view of two tables by joining their CDC streams
- Time-dependent joins: the "correct" answer depends on the version of the table at the time of the event, not the current version — ordering is non-deterministic across partitions
Windowing
- Tumbling window: fixed duration, non-overlapping (e.g., events from 12:00–12:01)
- Hopping window: fixed duration, overlapping at regular intervals (e.g., 5-minute windows every minute)
- Sliding window: contains all events within a fixed duration of each other (e.g., last 5 minutes of events relative to each event)
- Session window: no fixed duration; groups events with no gap greater than a configured inactivity threshold
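Assigning an event's timestamp to windows is simple arithmetic; a sketch for the tumbling and hopping cases (timestamps in seconds, sizes assumed to be multiples of the hop):

```python
def tumbling_window(ts, size):
    """Each event belongs to exactly one non-overlapping [start, start + size) window."""
    return ts - ts % size

def hopping_windows(ts, size, hop):
    """Windows of `size` start every `hop` seconds; an event falls in size/hop of them."""
    first = tumbling_window(ts, hop) - size + hop
    return [s for s in range(first, ts + 1, hop) if s <= ts < s + size]

# 1-minute tumbling windows: an event at t=425 lands in the window starting at 420
assert tumbling_window(425, 60) == 420
# 5-minute windows hopping every minute: the same event falls in five of them
assert hopping_windows(425, 300, 60) == [180, 240, 300, 360, 420]
```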
Handling Late Events
- Event time vs. processing time: network delays and clock skew mean events arrive out of order; the event's actual timestamp differs from when the stream processor receives it
- Three timestamp options: device clock when event occurred, device clock when event was sent, server clock when received; the third is easiest but misses the true event time
- Estimating offset: server receipt time minus device send time gives a rough clock correction
- Watermarks: a mechanism to declare "I have now seen all events with a timestamp less than X"; events arriving after the watermark are "stragglers"
- Straggler handling: ignore them (drop late events), or publish a correction for the affected time window
- Alert if straggler rate exceeds threshold — indicates upstream problems
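A toy watermark treats "max event time seen so far, minus an allowed lateness" as the claim that everything earlier has arrived: windows are finalized once the watermark passes their end, and anything arriving for an already-finalized window is a straggler. The window size and lateness parameters are invented for illustration.

```python
def run(events, window=60, lateness=10):
    counts, finalized, stragglers = {}, {}, []
    max_seen = 0
    for ts in events:
        max_seen = max(max_seen, ts)
        watermark = max_seen - lateness   # "all events before this have arrived"
        start = ts - ts % window
        if start in finalized:
            stragglers.append(ts)         # too late: drop, or publish a correction
        else:
            counts[start] = counts.get(start, 0) + 1
        # finalize every window whose end the watermark has now passed
        for s in [s for s in counts if s + window <= watermark and s not in finalized]:
            finalized[s] = counts[s]
    return finalized, stragglers

finalized, stragglers = run([5, 30, 70, 130, 20])
assert finalized == {0: 2, 60: 1}   # the window starting at 120 is still open
assert stragglers == [20]           # arrived after its window was finalized
```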
Chapter 13 - A Philosophy of Streaming Systems
- Streaming systems represent a philosophical shift: rather than treating data as tables to be queried, treat all data as events (a stream of facts), and derive tables/views from those streams
- The duality of streams and tables: a table is a snapshot of the current state; a stream is a changelog; you can always convert between them (stream → table by replaying; table → stream via CDC)
- Lambda architecture: run a batch layer (Hadoop) and a stream layer (Storm) in parallel; batch layer provides correct but slow views; stream layer provides fast but approximate views; query layer merges them
- The problem with lambda architecture: you must maintain the same logic in two different systems; keeping them in sync is operationally complex
- Kappa architecture: use a single stream processing system for both real-time and historical reprocessing; retain the raw event log long enough to replay; reprocess when you need to fix bugs or update logic
- "The batch/stream divide is artificial" — the same data processing logic should work on bounded and unbounded datasets; modern systems (Flink, Spark Structured Streaming, Apache Beam) support this
Unbundling Databases
- Traditional databases bundle everything together: storage, transactions, replication, indexing, query optimizer, caching
- The trend toward "unbundling" the database: use a combination of specialized tools (Kafka for the write-ahead log, Elasticsearch for search, Redis for caching, Flink for stream processing) connected via event streams
- The advantage: each tool is optimized for its specialty; you compose the best tools
- The challenge: you lose the cross-component transactions and consistency guarantees that monolithic databases provide
- Goal: replicate the Unix philosophy at the distributed systems level — composable tools that do one thing well, connected by streams instead of pipes
- Comparison: "mysql | elasticsearch" — if such a pipe existed (which it doesn't, natively), it would be what CDC + Kafka achieves
Correctness in Unbundled Systems
- ACID transactions in a single database provide strong correctness guarantees; unbundled systems need a different approach
- Exactly-once semantics: process each message exactly once, not zero times (at-most-once) or more than once (at-least-once)
- At-least-once + idempotence = effectively exactly-once; idempotent operations can be safely retried
- Atomic commit across heterogeneous systems: requires two-phase commit; XA transactions are available but have poor performance and limited fault tolerance
- Idempotency: design writes so that applying the same operation multiple times has the same effect as applying it once; include unique message/event IDs; consumers track processed IDs
- End-to-end argument: correctness must be ensured at the application level, not just the infrastructure level; a database with transactions can still produce wrong answers if the application logic is wrong
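The "at-least-once + idempotence" recipe can be made concrete with a consumer that tracks processed event IDs. This is a minimal sketch: in production the processed-ID set and the state update would be stored together atomically (e.g. in one database transaction), which the in-memory version below glosses over.

```python
# At-least-once delivery means the broker may redeliver a message; the
# consumer deduplicates by event ID so each event's effect is applied
# exactly once.

class IdempotentConsumer:
    def __init__(self):
        self.processed_ids = set()  # in production: persisted with the state
        self.balance = 0

    def handle(self, event):
        event_id, amount = event
        if event_id in self.processed_ids:
            return                  # duplicate delivery: ignore
        self.balance += amount      # apply the effect
        self.processed_ids.add(event_id)

# Simulated at-least-once delivery: event "e1" arrives twice.
consumer = IdempotentConsumer()
for event in [("e1", 100), ("e2", -30), ("e1", 100)]:
    consumer.handle(event)
# consumer.balance == 70, not 170
```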
- Fault-tolerant stream processing approaches:
- Micro-batching (Spark Streaming): divide the stream into small fixed-time batches; process each as a mini-batch job
- Checkpointing (Flink): periodically save processing state to durable storage; restart from the last checkpoint after failure
- Transactional producers/consumers (Kafka): Kafka supports transactions for exactly-once delivery within Kafka; Google Dataflow and VoltDB extend this to external systems
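Checkpoint-based recovery can be sketched as saving the processing state together with the input offset, so a restarted job reprocesses only the events since the last checkpoint. This is a simplified illustration in the spirit of Flink's mechanism, not its actual API; `durable_store` is a stand-in for real durable storage, and the offset and state are assumed to be saved atomically.

```python
# Checkpointing sketch: (offset, state) are snapshotted together every
# few events; after a crash, processing resumes from the last snapshot.

durable_store = {}

def run(events, checkpoint_every=3, crash_at=None):
    offset, total = durable_store.get("ckpt", (0, 0))  # recover last checkpoint
    for i in range(offset, len(events)):
        if i == crash_at:
            raise RuntimeError("simulated crash")
        total += events[i]
        if (i + 1) % checkpoint_every == 0:
            durable_store["ckpt"] = (i + 1, total)     # atomic snapshot
    return total

events = [1, 2, 3, 4, 5]
try:
    run(events, crash_at=4)   # crashes after checkpointing offset 3
except RuntimeError:
    pass
result = run(events)          # restarts from offset 3, reprocesses 4 and 5
# result == 15
```

Note that events after the last checkpoint are reprocessed on restart, which is why checkpointing alone gives at-least-once behavior; exactly-once additionally requires either idempotent sinks or transactional output.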
Dataflow and Reactive Systems
- Dataflow: express computation as a directed graph of transformations on data; when input data changes, the outputs update automatically (like a spreadsheet)
- Differential dataflow (Frank McSherry): efficiently maintain the output of a dataflow computation as inputs change — compute incremental updates rather than recomputing from scratch
- Reactive programming / Rx: express event processing as composable operations on event streams in application code
- The tension: stream processors provide strong fault tolerance and distributed execution but are complex; reactive frameworks in application code are simpler but limited to a single process
- The future direction: unified frameworks that express both real-time stream processing and historical batch reprocessing with the same code, while providing exactly-once semantics
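The spreadsheet analogy for dataflow can be illustrated with a toy cell graph: changing an input automatically re-evaluates every derived cell. This naive version recomputes formulas to a fixpoint; differential dataflow's contribution is to propagate only the incremental change instead. All names here are invented for illustration.

```python
# Toy dataflow: cells hold values, formulas derive cells from other
# cells, and any input change propagates through the graph.

class Dataflow:
    def __init__(self):
        self.values = {}
        self.formulas = {}   # cell -> (function, list of input cells)

    def set_input(self, cell, value):
        self.values[cell] = value
        self._propagate()

    def define(self, cell, fn, inputs):
        self.formulas[cell] = (fn, inputs)
        self._propagate()

    def _propagate(self):
        # naive fixpoint: re-evaluate formulas until nothing changes
        changed = True
        while changed:
            changed = False
            for cell, (fn, inputs) in self.formulas.items():
                if all(i in self.values for i in inputs):
                    new = fn(*(self.values[i] for i in inputs))
                    if self.values.get(cell) != new:
                        self.values[cell] = new
                        changed = True

flow = Dataflow()
flow.set_input("a", 2)
flow.set_input("b", 3)
flow.define("sum", lambda x, y: x + y, ["a", "b"])
flow.define("double", lambda s: s * 2, ["sum"])
flow.set_input("a", 10)
# flow.values["double"] == 26 — derived cells updated automatically
```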
Immutability and Audit Logs
- Immutable event logs are inherently auditable — you can reconstruct exactly what happened and when
- Mutable databases lose history; the current state is the only truth
- Event sourcing + immutable logs provide "audit by design" — valuable in finance, healthcare, compliance
- The past is immutable; only the future is uncertain — this asymmetry is what makes append-only logs such a solid foundation: facts, once recorded, never change
- Derived views are not the truth; the event log is the truth; derived views can be discarded and rebuilt
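"The log is the truth" can be demonstrated by building two independent derived views from the same immutable event log; either view can be discarded and rebuilt at any time by replaying from the start. The event schema below is invented for illustration.

```python
# Two derived views over one immutable event log: a balances table and
# a per-account audit trail. Both are pure functions of the log.

event_log = [
    {"type": "deposit",  "account": "a", "amount": 100},
    {"type": "withdraw", "account": "a", "amount": 30},
    {"type": "deposit",  "account": "b", "amount": 50},
]

def build_balances(log):
    balances = {}
    for e in log:
        delta = e["amount"] if e["type"] == "deposit" else -e["amount"]
        balances[e["account"]] = balances.get(e["account"], 0) + delta
    return balances

def build_audit_trail(log):
    # a second, independent view: full history per account
    trail = {}
    for e in log:
        trail.setdefault(e["account"], []).append((e["type"], e["amount"]))
    return trail

balances = build_balances(event_log)   # current state, disposable
trail = build_audit_trail(event_log)   # full history, also disposable
```

Because every view is derived, adding a new view later (say, daily totals) requires no migration: write the new function and replay the log.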
Chapter 14 - Doing the Right Thing
- Data systems are not neutral — the choice of what data to collect, how to process it, and what decisions to make with it have real consequences for real people
- "The world will be what we build" — engineers building data systems bear responsibility for their effects
Predictive Analytics and Discrimination
- Decisions made by machine learning and predictive analytics systems can systematically disadvantage certain groups, even without any explicit discriminatory intent
- Feedback loops: if a system predicts someone is high-risk and treats them accordingly, it may create the very outcome it predicted; the prediction becomes self-fulfilling
- Proxy discrimination: a model trained on historical data absorbs existing biases; even race-neutral features can serve as proxies for race (zip code, name, school attended)
- Lack of transparency: "black box" models make it impossible to challenge a decision; affected people may not even know a model was used
- Accountability gap: when things go wrong, responsibility is diffuse — the data vendor, the model builder, the company deploying it, the regulator all point at each other
Privacy and Surveillance
- Behavioral data collection is pervasive; people often have no practical ability to opt out
- The aggregation problem: individually innocuous data points (location at 8am daily, pharmacy visits, political donations) can be combined to reveal sensitive information (health conditions, political views, relationship status)
- Data collected for one purpose is routinely used for another; intentions at collection time don't constrain future use
- Data brokers: entire industries exist to collect, aggregate, and sell personal data with little regulatory oversight in many jurisdictions
- Consent is often illusory: long privacy policies nobody reads, take-it-or-leave-it terms, no meaningful alternative
- Data retention: data kept indefinitely becomes a liability; breaches expose it; legal processes compel its disclosure; purpose limitation and deletion are important privacy controls
Ethical Framework
- The fact that something is technically possible does not mean it should be done
- Five questions to ask: Who benefits? Who bears the costs? Are those groups the same? Could this data be used to harm? What are the second-order effects?
- Power asymmetry: data systems are built by organizations with power; they are often used on individuals with less power; this asymmetry requires extra ethical care
- Automation bias: people over-trust automated decisions and under-scrutinize them; human-in-the-loop is not a fix if the human just rubber-stamps the algorithm
- Opacity and explainability: affected individuals have a legitimate interest in understanding why a decision was made; the right to explanation (GDPR Article 22) is increasingly recognized
- "The goal of data engineering should be to build systems that empower people, not systems that exploit them"
Legislation and Self-Regulation
- GDPR (EU), CCPA (California) and similar regulations: right to access, right to erasure, data portability, privacy by design
- Regulation is necessary but insufficient: laws lag behind technology; enforcement is inconsistent; global data flows complicate jurisdiction
- Engineers must not wait for regulation to act ethically; self-regulation through professional norms and company practices matters
- Checklist-style compliance is not ethics; the question is whether the system causes harm, not whether it technically complies with a law
Long-Term Effects of Data Systems
- Systems outlive the intentions of their designers; a system built for benign purposes may be repurposed maliciously
- Data collected today may be used in political climates very different from today's; minority groups that are not persecuted today may be persecuted in the future using the data you are collecting now
- Minimizing data collection is a form of harm reduction — data you don't have can't be breached, subpoenaed, or misused
- "If you are asked to build a system that would help find people who meet certain criteria — even seemingly benign criteria — think carefully about what could happen if criteria change"
- The responsibility of engineers: build systems that can be shut down, that forget, that preserve human dignity, that are auditable, and that give people meaningful control over their own data
