pk.org: CS 417/Exams

Final Exam Study Guide

The three-hour study guide for the final exam

Paul Krzyzanowski

Disclaimer: This study guide attempts to touch upon the most important topics that may be covered on the exam but does not claim to cover everything one needs to know for the exam. Finally, don’t take the three-hour time window in the title literally.

Last update: Tue May 05 13:42:51 2026

Week 1: Introduction and Network Communication

What Is a Distributed System?

A distributed system is a collection of independent computers connected by a network that cooperate to accomplish some goal. Each computer has its own processor, memory, operating system, and clock. There is no shared address space and no shared notion of time.

Processes on different machines each have access to their local operating system mechanisms, but those mechanisms apply only within a single system. Shared memory, pipes, message queues, and kernel-managed synchronization primitives such as semaphores or mutexes cannot be used for coordination across machines.

All coordination in a distributed system must therefore be performed explicitly through message passing.

A well-designed distributed system presents a single system image: it appears to users as a single coherent system, hiding the complexity of distribution behind a unified interface.

Failures are expected and are often partial failures: some components fail while others continue to operate.

No global knowledge exists in a distributed system. Each component knows only its own state and information received from others, which may be delayed or outdated.

Why Distributed Systems Exist

Distributed systems are built to overcome the limitations of single machines and centralized designs.

Scale is a primary driver. Vertical scaling is limited by hardware constraints, power, and cost. Horizontal scaling allows systems to grow incrementally by adding machines.

Moore’s Law historically enabled performance gains through faster hardware, but those gains have slowed. Performance gains shifted toward multicore and heterogeneous systems.

Amdahl’s Law shows there are limits to parallel processing. Speedup is limited when some portion of a workload remains sequential.
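In its usual form (standard formulation; the notation here is not from the original guide): if a fraction p of a workload can be parallelized across N processors, the speedup is

\[\text{speedup} = \frac{1}{(1 - p) + \frac{p}{N}}\]

so even with unlimited processors the speedup can never exceed \(1/(1-p)\). A program that is 90% parallelizable, for example, can be sped up by at most a factor of 10.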

Collaboration and network effects increase the value of systems as more participants join.

Other motivations include reducing latency through geographic distribution, supporting mobility across devices such as phones and IoT sensors, allowing incremental growth from small deployments to large-scale systems, and delegating infrastructure to cloud providers.

Transparency

Transparency is the design goal of hiding the fact that resources are distributed across multiple computers. Users and applications interact with the system as if it were a single machine. Examples include hiding where resources are located, masking failures, and allowing resources to move or be replicated without affecting access.

Full transparency is rarely achievable due to network delays, partial failures, and consistency constraints.

Failure and Fault Tolerance

Distributed systems experience partial failure, unlike centralized systems, which typically fail all-or-nothing.

Fault tolerance relies on redundancy and avoiding single points of failure.

In series systems, the failure of any component causes system failure, making large systems unreliable when all components must function. In parallel systems, redundant components improve availability by allowing the system to continue operating as long as some components remain functional.
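As a concrete illustration, using the standard series and parallel availability formulas (notation introduced here for reference, not taken from the guide): if component i is up with probability \(A_i\), then

\[A_{\text{series}} = \prod_i A_i \qquad\qquad A_{\text{parallel}} = 1 - \prod_i (1 - A_i)\]

For example, five components in series, each 99% available, yield roughly 95% overall availability, while two 99% components in parallel yield roughly 99.99%.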

Availability measures how often a system is usable and is often expressed in “nines” (e.g., “five nines” means 99.999% uptime). Reliability concerns correctness and time-to-failure.

Failure Models

Systems fail in different ways: fail-stop (a component halts and stays halted), fail-restart (a component halts and later restarts, possibly with stale state), omission failures (messages are not sent or not received), timing failures (responses arrive too late), partial failures (some components fail while others keep working), and Byzantine failures (components produce arbitrary or erroneous output).

Caching vs. Replication

Keeping multiple copies of data is a recurring theme in distributed systems design.

Replication creates multiple authoritative copies of data to improve availability and fault tolerance. Replicas must be kept consistent, and replica failures trigger recovery procedures.

Caching stores temporary, derived copies to reduce latency and load. Caches are expendable; a cache miss simply fetches from the authoritative source.

The key distinction is that losing a cache incurs performance penalties; losing all replicas loses data.

Network Timing Models

Network technologies exhibit different behaviors and provide different latency, bandwidth, and reliability guarantees.

Synchronous networks have a known upper bound on message delivery time, making failure detection straightforward.

Partially synchronous networks have an upper bound that exists but is not known in advance.

Asynchronous networks have no upper bound on message delivery time. The Internet is asynchronous. This makes it impossible to distinguish between a failed node and a slow one, complicates failure detection, and limits the guarantees protocols can provide.

Security

Security in distributed systems differs from centralized systems because services run on remote machines, communication travels over public networks, and trust boundaries are unclear.

Key issues include authentication (who is making a request), authorization (what they are allowed to do), encryption (protecting data in transit), integrity checking (detecting tampering), and audit logging (recording actions).

Service Architectures

The client-server model has clients send requests to servers. It’s simple, but the server can become a bottleneck or single point of failure. It’s the core mechanism on which most other models are built.

Multi-tier architectures separate concerns into layers (presentation, application logic, data storage) that can be scaled independently.

Microservices architectures decompose applications into small, autonomous services with well-defined interfaces. Flexible but complex, with availability challenges from long dependency chains.

Peer-to-peer (P2P) systems have no central server; all participants communicate directly. Most practical P2P systems use a hybrid P2P model with servers for coordination.

Worker pools (also called processor pools or compute clusters) assign tasks to available computing resources on demand.

Cloud computing provides resources as a network service: IaaS (virtual machines), PaaS (application platforms), and SaaS (complete applications).

Communication Fundamentals

Distributed systems rely exclusively on message passing for coordination.

Network communication is slower, more variable, and less reliable than local computation. Messages may be delayed, duplicated, reordered, or lost, and these behaviors must be handled explicitly.

Internet Design Principles

The Internet is a packet-switched network designed to scale and survive failures. It follows the end-to-end principle, which places complexity at the endpoints rather than inside the network.

The network provides best-effort delivery, meaning packets are attempted but not guaranteed to arrive, to arrive once, to arrive in order, or to arrive within a fixed time. Recovery, reliability, ordering, and security are implemented by software at the endpoints.

Fate sharing keeps communication state at the endpoints rather than in the routers, so a failure affects only the participants already involved in that communication.

Latency and Throughput

Two core metrics of a network are latency and throughput.

Latency is the time it takes for a single message or request to travel from the sender to the receiver.

Throughput (bandwidth) measures how much data can be transferred per unit time.

Latency and throughput are related but distinct. A system can have high throughput but high latency, or low latency but low throughput.

Many design choices in distributed systems trade latency for throughput. Reliability, ordering, and congestion control can improve throughput for sustained transfers while increasing latency for individual messages.

Layered Networking Model

Networking is structured as a layered stack.

The data link layer handles communication on a local network.

The network layer, implemented by IP, routes packets between machines across different physical networks.

The transport layer provides process-to-process communication using ports.

Higher layers implement application-specific protocols.

IP Networking

IP provides connectionless, unreliable datagram delivery between machines. Each machine is assigned an IP address. Data is broken into variable-size chunks called packets, and each packet contains an IP header that includes source and destination IP addresses.

Each packet is routed independently, with no guarantees of delivery or ordering.

Transport Protocols: TCP and UDP

TCP provides a reliable, ordered byte stream with congestion control and retransmission. It simplifies application development but can increase latency due to ordering constraints.

Head-of-line blocking occurs when delivery of later data is delayed because earlier data has not yet arrived, even if the following data has already been received. This can increase latency even when sufficient network capacity is available.

UDP provides best-effort, unordered datagram delivery with minimal overhead. Reliability and ordering, if needed, must be implemented by the application.

Port numbers in TCP and UDP headers complete the addressing needed for process-to-process communication. While an IP address identifies a machine, a port number identifies a specific socket on that machine. A process may open multiple sockets, each with its own port.

Choosing Between TCP and UDP

TCP is widely used because it provides a simple and powerful abstraction for reliable communication.

UDP is used when low latency matters, when applications can tolerate loss, or when applications want control over reliability and timing.

Protocols such as DNS (domain lookups) and NTP (setting time) use UDP because they send short messages, don’t want the overhead of setting up a connection, and the servers don’t want the overhead of maintaining connection state for potentially a huge number of clients. For NTP, any retries because of lost packets can result in asymmetric send/receive times, throwing off synchronization.

Choosing between TCP and UDP is a design decision about where responsibility for correctness and recovery should reside.

QUIC

QUIC is a modern transport protocol built on UDP. It provides TCP-like reliability and congestion control while supporting multiple independent streams to avoid head-of-line blocking.

QUIC runs in user space and exemplifies the end-to-end principle.

Key Points

Distributed systems trade local simplicity for scalability, availability, and flexibility.

The architecture determines whether adding components improves or harms availability.

Transparency is a design goal, but is rarely fully achievable.

Network timing assumptions affect what guarantees a system can provide.

Networking assumptions, particularly best-effort delivery and endpoint responsibility, shape all distributed system designs.

Understanding communication semantics is essential for reasoning about correctness, performance, and failure.

What You Don’t Need to Study


Week 2: Remote Procedure Calls and Web Services

The Remote Procedure Call (RPC) provides an abstraction for process-to-process communication that enables calling functions on remote systems without having to handle data formatting and message parsing. RPC transforms network communication from I/O-based read/write interfaces to familiar procedure call semantics.

RPC Operation

The ability to call a remote function is provided by stub functions. Instead of calling a remote function, the client calls a local client stub. On the server side, the server stub receives the request and calls the local function on that machine.

A Client stub (proxy) is a local function with the same interface as the remote procedure. It packages the parameters, sends them to the server, waits for a reply, extracts the results, and returns them to the caller. The client calls the stub as if it were a local function: it is one.

A Server stub (skeleton) registers the service, awaits incoming requests, extracts data from requests, calls the actual procedure, packages results, and sends responses back to clients. The server’s actual procedure doesn’t know it’s being called remotely.

Static stub generation uses an RPC compiler to read an interface definition and generate stub code before compilation. Dynamic stub generation creates proxy objects at runtime via reflection, eliminating the need for separate compilation. Languages like Java, Python, Ruby, JavaScript, and C# support dynamic generation. Languages like C, C++, Rust, and Go require static generation.

Marshalling is the process of packaging data into a network message. Serialization converts data elements into a flat byte array suitable for transmission. These terms are often used interchangeably in RPC contexts, though marshalling may include additional metadata like function identifiers or version numbers.

RPC is synchronous by default: the client blocks until the server responds. This matches local procedure call behavior but can be problematic for slow services. Some frameworks offer asynchronous variants where the client continues execution and handles responses later through callbacks or futures.

Challenges in RPC

Partial failure is the fundamental challenge that distinguishes distributed systems from local computing. Local procedure calls either work or the entire process crashes. With remote calls, the server may fail, the network may be slow or unreliable, or responses may be lost. The client cannot tell the difference between a slow server and a failed one.

Parameter passing is straightforward for parameters passed by value: just include the data in the message. Pass-by-reference is the harder case, because a reference is a memory address that has no meaning on the remote machine. The common solution is copy-restore (also called copy-in/copy-out): send the referenced data to the remote side, let the server function use a local reference, then send the possibly modified data back to the client. This can be expensive for large data structures.

Data representation differs across machines. Different processors use different byte ordering: big-endian systems store the most significant byte first, while little-endian systems store the least significant byte first. Intel and AMD processors use little-endian. Network protocols traditionally use big-endian (network byte order). Serialization formats must handle these differences so that data can be correctly interpreted on any machine.

Two strategies exist for handling data representation differences:

  1. A canonical format requires all senders to convert data to a standard format before transmission. ONC RPC used this approach with XDR, which always uses big-endian byte order. The disadvantage is that conversion happens on every message, even when communicating between identical architectures.

  2. Receiver-makes-right allows the sender to transmit data in its native format, and the receiver converts only if necessary. DCE RPC used this approach with NDR. This avoids unnecessary conversion when machines share the same architecture.

Interface Definition and Type Systems

Interface Definition Languages (IDLs) describe service interfaces in a language-neutral way. IDLs enable stub generation for multiple languages and provide a contract between clients and servers.

Schemas define data structure, field types, and encoding rules. They enable validation, code generation from definitions, and structured evolution as interfaces change over time. Common schema formats include Protocol Buffers, JSON Schema, Avro, and Thrift IDL.

Schemas define the data that gets serialized. Serialization is important for marshalling and for data storage in file systems or object stores.

Serialization formats use one of two approaches for encoding data:

  1. Implicit typing transmits only values without type information, requiring the receiver to know the expected sequence of parameters. This is efficient but does not easily support optional parameters.

  2. Explicit typing transmits type information with each value, making data self-describing but larger. JSON and XML use explicit typing.
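A small Python sketch can make the contrast concrete (the field names and binary layout here are illustrative, not from any particular RPC system): implicit typing sends only the values in an agreed-upon layout, while explicit typing sends self-describing data.

```python
import json
import struct

# Implicit typing: only the values are sent, in an agreed-upon order and layout.
# The receiver must already know the schema: a 4-byte int followed by an 8-byte float.
implicit = struct.pack("!id", 42, 98.6)            # 12 bytes, network byte order
user_id, temperature = struct.unpack("!id", implicit)

# Explicit typing: field names and types travel with the data (self-describing).
explicit = json.dumps({"id": 42, "temperature": 98.6}).encode()

print(len(implicit), len(explicit))                # the explicit encoding is noticeably larger
```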

Protocol Buffers (often called protobuf) is a binary serialization format developed at Google. Each field in a message has a unique numeric tag that identifies it in the binary format. Fields may be omitted and will take default values. Protocol Buffers is compact, efficient, and strongly typed. It supports schema evolution because new fields can be added with new tags, and old code ignores fields it does not recognize. It is typically 3 to 10 times smaller than equivalent JSON and is parsed 10 to 100 times faster. Other binary serialization formats include Apache Avro (used in big data ecosystems) and Apache Thrift.

Versioning allows interfaces to change while maintaining compatibility. Backward compatibility means new code can read data written by old code. Forward compatibility means old code can read data written by new code, typically by ignoring unknown fields. Strategies include adding optional fields, avoiding field removal, and maintaining multiple service versions simultaneously.

Versioning is critical in distributed environments because you cannot count on all clients getting updates. For serialized stored data, versioning is critical because new software may encounter older data formats.

Early RPC Systems

ONC RPC (Sun’s Open Network Computing RPC) was one of the first RPC systems to achieve widespread use, thanks to the popularity of Sun workstations and NFS. It introduced several concepts that became standard: an interface definition language with a stub compiler (rpcgen), the XDR (eXternal Data Representation) format for machine-independent serialization, and a port mapper (rpcbind) that clients query to find the port on which a service is listening.

DCE RPC (Distributed Computing Environment RPC), defined by the Open Group, addressed some ONC RPC limitations. It introduced: the NDR (Network Data Representation) format with its receiver-makes-right approach, universally unique identifiers (UUIDs) for naming interfaces, and an endpoint mapper for locating services on a host.

DCE RPC became the foundation for Microsoft’s RPC implementation and later DCOM.

Service Discovery

Services must be located before they can be invoked. Service discovery mechanisms include name servers that map service names to network locations, DNS for domain-based resolution, configuration files with hardcoded endpoints, and service meshes that manage service-to-service communication with sidecars.

Security

RPC calls often traverse networks where messages could be intercepted or modified. Security concerns include authentication (verifying the identity of clients and servers) and encryption (preventing eavesdroppers from reading message contents). Modern RPC systems typically use TLS to address both concerns.

Reliability and Failure Handling

Timeouts prevent clients from waiting indefinitely for unresponsive servers. A timeout specifies a duration, such as waiting at most 5 seconds.

Deadlines specify an absolute time by which an operation must complete, such as completing by 10:30:00 UTC. Deadlines are propagated through call chains so downstream services know how much time remains. With deadline propagation, if insufficient time remains, a service can fail fast rather than starting work that it cannot complete.
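A minimal sketch of the idea, using a hypothetical helper and a local monotonic clock (real frameworks such as gRPC carry the deadline in request metadata):

```python
import time

def call_downstream(work, deadline, min_needed=0.05):
    """Hypothetical helper: honor a propagated absolute deadline by failing fast."""
    remaining = deadline - time.monotonic()
    if remaining < min_needed:                  # not enough budget left to even start
        raise TimeoutError("deadline exceeded before starting work")
    return work(remaining)                      # pass the remaining budget downstream

# The entry point converts a relative timeout into an absolute deadline once;
# every hop in the call chain reuses that same deadline instead of its own timeout.
deadline = time.monotonic() + 5.0
print(call_downstream(lambda budget: f"finished with {budget:.2f}s of budget left", deadline))
```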

Cancellation allows clients to terminate in-progress requests when results are no longer needed, freeing resources on both client and server.

Retries handle transient failures but must be used carefully.

Exponential backoff progressively increases the delay between retry attempts to avoid overwhelming struggling services. Random jitter is added to that delay to prevent synchronized retry storms in which many clients retry at exactly the same time.
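A minimal Python sketch of retries with exponential backoff and full jitter (the parameter values and the choice to retry only on ConnectionError are illustrative assumptions):

```python
import random
import time

def retry_with_backoff(call, max_attempts=5, base=0.1, cap=10.0):
    """Retry a flaky call, doubling the delay each attempt and adding random jitter."""
    for attempt in range(max_attempts):
        try:
            return call()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise                                   # out of attempts: surface the error
            delay = min(cap, base * (2 ** attempt))     # exponential backoff, capped
            time.sleep(random.uniform(0, delay))        # full jitter spreads retries out
```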

Circuit breakers prevent cascading failures by failing fast when a dependency is unhealthy. A circuit breaker monitors failure rates and trips when failures exceed a threshold, immediately failing subsequent requests without attempting the call. After a cooling-off period, the circuit breaker allows test requests through, and if these succeed, normal operation resumes. This prevents requests from piling up waiting for timeouts and gives the failing service time to recover.
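The same idea as a rough Python sketch (the threshold, cooldown, and single-probe recovery behavior are simplified assumptions, not a production implementation):

```python
import time

class CircuitBreaker:
    """Minimal sketch: open the circuit after repeated failures, probe again after a cooldown."""
    def __init__(self, threshold=5, cooldown=30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None            # None means the circuit is closed (normal operation)

    def call(self, fn):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown:
                raise RuntimeError("circuit open: failing fast")   # don't even attempt the call
            self.opened_at = None        # cooldown over: let one test request through
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()                  # trip the breaker
            raise
        self.failures = 0                # success closes the circuit again
        return result
```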

Distributed Objects and Lifecycle Management

Traditional RPC invokes stateless procedures. Distributed objects extend RPC to support object-oriented programming with stateful remote objects. Objects have identity (each instance is distinct), state (maintained between method calls), and lifecycle (objects must be instantiated and destroyed). This introduces challenges around object identity, state management, and lifecycle.

Microsoft DCOM (Distributed Component Object Model) extended COM to support remote objects. It used a binary interface standard supporting cross-language interoperability and tracked object instances via interface pointer identifiers.

DCOM introduced surrogate processes, which are generic host processes on the server that dynamically load components as clients request them. This eliminated the need for developers to write custom server processes and provided fault isolation since component crashes affected only the surrogate, not the client.

DCOM faced challenges with configuration complexity, platform dependence (Windows only), and stateful object management. It evolved into COM+, which added transactions, security, and object pooling.

Java RMI (Remote Method Invocation) is Java’s native approach to distributed objects. Remote objects implement interfaces extending java.rmi.Remote and are registered with an RMI registry. Clients receive stubs that transparently handle marshalling and communication. RMI uses dynamic stub generation through reflection and Java’s native serialization. It remains relevant primarily in legacy enterprise Java applications.

Distributed Garbage Collection

Because objects are created when needed, they must be deleted when no longer needed. In a local program, the garbage collector tracks object references and frees memory when an object has no more references. In a distributed system, references span machine boundaries, and the garbage collector on one machine cannot see references held by another.

Reference counting tracks how many clients hold references to each object and deletes an object when its count reaches zero. This fails when clients crash without releasing references, when messages are lost, or during network partitions. Systems like Python’s RPyC use reference counting. Microsoft COM uses reference counting locally; DCOM extends it with keep-alive pinging for distributed scenarios because reference counting alone cannot survive a crashed client.

Leases provide time-limited references to objects. Clients must periodically renew leases before they expire. If a client crashes or loses connectivity, the lease expires, and the server can safely delete the object. Leases use explicit time bounds: “you own this reference until time T, unless you renew.”

Heartbeats (also called keep-alive pinging) require clients to periodically send messages listing all objects they are using. The server treats silence as abandonment. Both leases and heartbeats require periodic messages from the client; the difference is in how the server decides the client is gone. With leases, each reference has an explicit expiration time and the server checks whether that time has passed. With pinging, the server tracks how long it has been since the last ping and assumes the client is dead if too many consecutive pings are missed. Leases are declarative (“you own this until time T”); pinging is inferential (“I have not heard from you in a while”). Java RMI uses lease-based garbage collection with explicit dirty/clean messages. Microsoft DCOM/COM+ uses keep-alive pinging, where clients periodically send ping sets listing active objects.

Both approaches accept that objects might occasionally be deleted prematurely if network delays prevent timely renewal, or kept alive slightly longer than necessary. These tradeoffs are generally acceptable given the alternative of memory leaks from crashed clients.

Why Web Services

Traditional RPC systems (ONC RPC, DCE RPC, DCOM) were designed for enterprise networks. They faced challenges on the internet: proprietary binary formats created interoperability problems across organizations, dynamic ports were blocked by firewalls, stateful objects complicated replication and load balancing, and strict request-response patterns could not handle streaming or notifications.

Web services emerged to solve these problems. By using HTTP as the transport, services could work through firewalls and proxies. By using text-based formats such as XML and JSON, services could achieve interoperability across different platforms and languages. The web’s existing infrastructure for security, caching, and load balancing could be reused.

XML-RPC was one of the first web services, marshalling data into XML messages transmitted over HTTP. It was human-readable, used explicit typing, and worked through firewalls. It failed to gain widespread adoption due to limited data types, lack of extensibility, XML’s verbosity, and missing features for interface definitions and schemas.

SOAP (formerly Simple Object Access Protocol) extended XML-RPC with user-defined types, message routing, and extensibility. SOAP supported various interaction patterns: request-response, request-multiple-response, asynchronous notification, and publish-subscribe. WSDL (Web Services Description Language) served as SOAP’s IDL for describing data types, messages, operations, and protocol bindings. SOAP declined due to complexity, heavyweight frameworks, interoperability issues between implementations, and verbosity.

REST

REST (Representational State Transfer) treats the web as a collection of resources rather than remote procedures. Resources are data identified by URLs. HTTP methods operate on resources using CRUD operations (Create, Read, Update, Delete): POST creates a resource, GET reads it, PUT (or PATCH) updates it, and DELETE removes it.

REST is stateless, meaning each request contains all information needed to process it. Servers do not maintain client session state between requests. This improves scalability but shifts state management to clients or to the server application using session IDs and data stores.

REST typically uses JSON for data serialization. JSON is human-readable, lightweight, and has native browser support. However, it lacks schemas, requires parsing overhead, and has larger message sizes than binary formats.

REST emphasizes discoverability, with responses containing links to related resources that allow clients to navigate the API. REST is well-suited for CRUD operations but can be awkward for complex operations that do not map cleanly to resources.

gRPC

gRPC was developed at Google and uses Protocol Buffers for serialization and HTTP/2 for transport. HTTP/2 allows multiple requests and responses to flow over a single TCP connection simultaneously, uses compact binary headers, and supports server-initiated messages.

gRPC supports four communication patterns: unary (traditional request-response), server streaming (one request, multiple responses), client streaming (multiple requests, one response), and bidirectional streaming (multiple requests and responses interleaved).

gRPC includes built-in support for deadlines, cancellation, and metadata propagation. It has become the dominant choice for internal microservice communication.

gRPC is preferred for internal service-to-service communication, performance-critical paths, streaming requirements, and strongly typed contracts. REST over HTTP remains the default for public APIs, browser-based applications, basic CRUD operations, and situations where human readability is important.

Service-Oriented Architecture and Microservices

Service-Oriented Architecture (SOA) is an architectural approach that treats applications as integrations of network-accessible services with well-defined interfaces. Services are designed to be reusable components that communicate through standardized protocols. SOA emerged in the late 1990s and early 2000s alongside SOAP. SOA emphasized loose coupling between services, allowing them to evolve independently.

Microservices are a modern evolution of SOA principles. Both approaches structure applications as collections of services with well-defined APIs. The key differences are in implementation philosophy: SOA typically relied on heavyweight standards such as SOAP and enterprise service buses with centralized governance, while microservices favor small, independently deployed services that communicate over lightweight protocols such as REST and gRPC, own their own data, and are built and operated by small teams.

Microservices can be viewed as SOA implemented with modern tooling and a bias toward lightweight communication. The core insight is the same: decompose applications into independent services that can evolve separately.

Benefits of microservices include independent scaling of components, technology flexibility per service, fault isolation, and parallel development by multiple teams. Drawbacks include distributed system complexity, network overhead, complex debugging and testing, operational burden, and eventual consistency challenges.

The modular monolith alternative keeps a single deployable unit while enforcing internal module boundaries. It captures organizational benefits like clearer ownership and cleaner interfaces without paying the cost of distributed communication.

Microservices make sense when independent change is the real problem: different teams, different release schedules, or very different scaling requirements. They are a poor fit when the main problem is product iteration, when teams are small, or when operational overhead is unaffordable.

Observability

Observability is the ability to reconstruct what happened from the signals a system emits. When requests cross multiple services, failures and slowdowns rarely show up where the problem actually is.

Logs record discrete events and are most useful when they include a request ID.

Traces record the path of a single request through the system across multiple services. The general idea is that tracing tells you where the time went when one user request fans out into many internal calls. You do not need to remember the implementation details.

Request IDs (also called correlation IDs or trace IDs) are generated when a request enters the system and propagated through all downstream calls. The ID is included in logs and trace context to reconstruct the full request path during debugging. Request IDs are for observability, helping you understand what happened. They differ from idempotency keys, which ensure operations are not duplicated.

Circuit breakers prevent cascading failures by failing fast when a dependency is unhealthy, rather than letting requests pile up waiting for timeouts.

General Summary

RPC transforms low-level network communication into familiar procedure call interfaces through generated stubs, marshalling, and interface definitions. The evolution from traditional RPC through SOAP and REST to gRPC reflects how distributed computing has changed: from enterprise networks to the internet, from XML to JSON and Protocol Buffers, from request-response to streaming.

Communication in distributed systems requires careful attention to reliability and failure handling. Timeouts prevent resource exhaustion. Retries handle transient failures. Idempotency and idempotency keys make operations safe to retry. Circuit breakers prevent cascading failures. Observability with request IDs and tracing enables understanding system behavior.

The right technology choice depends on context: REST for public APIs that need universal client support and human-readable payloads, gRPC for internal communication and performance, with many systems using multiple technologies for different purposes.

Microservices are a tool for organizational scaling, not a technical silver bullet. They make sense when independent deployment is the real problem, but introduce significant operational complexity.

What You Don’t Need to Study


Week 3: Clock Synchronization

UTC (Coordinated Universal Time) is the primary time standard used worldwide to track time. Time zones are an offset from UTC. Computers track time and try to stay close to UTC. They accomplish this by synchronizing their clocks with a system that knows the time.

How Computers Keep Time

Computers maintain time using hardware and software components. When a computer boots, the operating system reads time from a battery-powered real-time clock (RTC), a chip designed for low power consumption to survive power outages, not for accuracy. Once the OS is running, it maintains a more accurate system clock by reading high-resolution hardware counters, such as the Intel timestamp counter (TSC) or the ARM Generic Timer.

Most systems represent time as elapsed seconds since an epoch: January 1, 1970, 00:00:00 UTC for Unix systems, or January 1, 1601 for Windows. This representation avoids timezone confusion, daylight saving time ambiguities, and reduces time arithmetic to integer operations. The system clock that applications see is called wall time: it tracks UTC but can jump when synchronization corrections are applied.

Accuracy, Precision, and Resolution

Accuracy measures how close a measurement is to the true value. If your clock shows 12:00:00.005 and true UTC is 12:00:00.000, your clock has 5ms of error.

Resolution is the smallest time increment a clock can represent. A nanosecond-resolution clock can distinguish events 1 nanosecond apart. Higher resolution does not guarantee accuracy.

Precision is the consistency of repeated measurements. A clock consistently 5ms fast is precise but not accurate.

When we say “NTP achieves 10ms accuracy,” we mean clocks are within 10ms of true UTC, not that they measure time in 10ms increments.

Why Physical Clocks Drift

All physical clocks drift. A quartz oscillator’s frequency depends on temperature, manufacturing variations, atmospheric pressure, humidity, and aging. Consumer hardware typically drifts at 50-100 parts per million (ppm), meaning clocks can drift apart by almost nine seconds per day. Without synchronization, distributed systems quickly lose any agreement on time.

The Clock Model

A physical clock can be modeled as:

\[C(t) = \alpha t + \beta\]

where C(t) is the value the clock reports at true time t, α is the clock’s rate (1 for a perfect clock; its deviation from 1 is the drift), and β is the clock’s offset at time zero.

Drift is the rate error: how fast a clock runs compared to true time. Offset is the instantaneous difference between a clock and true time. Even after perfect synchronization (zero offset), drift causes the offset to grow again. This is why periodic resynchronization is essential.

Clock Adjustment

When synchronization detects an offset, systems prefer slewing over stepping:

Slewing gradually adjusts the rate at which the clock advances, so each tick of real time advances the system clock by slightly more or slightly less than one tick. The displayed time catches up or falls back without ever going backward. This is preferred for small offsets (typically below 128ms).

Stepping instantly jumps to the correct time. This may be used for larger offsets (often ≥ 128ms). Stepping may break applications measuring durations or using timestamps (e.g., software build environments).

Cristian’s Algorithm

The most direct synchronization approach sends a request to a time server and receives a timestamped reply. The challenge is network delay: by the time the response arrives, it no longer reflects the current time. Cristian’s algorithm assumes the delay is symmetric and the timestamp was generated at the midpoint of that delay.

Algorithm:

  1. Client sends request at local time t0

  2. Server responds with timestamp TS

  3. Client receives reply at t1

  4. Client estimates time as \(T_S + \frac{t_1 - t_0}{2}\)

In reality, the server’s timestamp may have been generated before or after the midpoint of the delay, so the estimate carries some error. If we know the best-case (minimum) network transit time, we can bound that error more tightly than the round-trip delay alone allows.

Error bound: If the minimum one-way delay is tmin, the error will be:

\[\epsilon \leq \frac{(t_1 - t_0) - 2t_{\min}}{2}\]

Clients can retry to find the lowest round-trip time, which yields the tightest error bound.
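A small Python sketch of the calculation, using made-up timestamp values (in seconds) to show both the estimate and the error bound:

```python
def cristian_estimate(t0, t1, server_time, t_min=0.0):
    """Estimate the current time and error bound from one request/response exchange."""
    rtt = t1 - t0
    estimate = server_time + rtt / 2        # assume the reply took half the round trip
    error = (rtt - 2 * t_min) / 2           # uncertainty shrinks if a minimum one-way delay is known
    return estimate, error

# Request sent at t0 = 10.000 s, reply received at t1 = 10.030 s, server reported 10.020 s.
est, err = cristian_estimate(10.000, 10.030, 10.020, t_min=0.005)
print(f"time = {est:.3f} s +/- {err:.3f} s")   # time = 10.035 s +/- 0.010 s
```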

Additive errors: When machines synchronize in chains (A from B, B from C), errors accumulate: A’s total error is \(\epsilon_A + \epsilon_B\). This is why systems generally try to avoid deep synchronization hierarchies.

A limitation of Cristian’s algorithm is that it has a single point of failure: the server.

Network Time Protocol (NTP)

NTP solves the single point of failure problem through a hierarchical architecture: stratum 0 devices are reference clocks such as GPS receivers and atomic clocks; stratum 1 servers are directly connected to stratum 0 sources; stratum 2 servers synchronize from stratum 1 servers, and so on. A lower stratum number means the server is closer to an authoritative time source.

Fault tolerance through multiple sources: NTP encourages systems to query multiple time servers and use statistical techniques to identify and reject outliers. NTP combines the remaining time offset estimates using a weighted average, with more weight given to more reliable servers. NTP tracks each server’s jitter (delay variation) and dispersion (accumulated timing uncertainty), favoring more reliable sources.

Synchronization algorithm uses four timestamps: T1, when the client sends its request; T2, when the server receives it; T3, when the server sends its reply; and T4, when the client receives the reply.

Offset: \(\theta = \frac{(T_2 - T_1) + (T_3 - T_4)}{2}\)

The network delay is the round-trip time minus the estimate of the processing delay on the server:

Delay: \(\delta = (T_4 - T_1) - (T_3 - T_2)\)
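A short Python sketch of the offset and delay computation, with illustrative timestamp values in seconds:

```python
def ntp_offset_delay(t1, t2, t3, t4):
    """Compute clock offset and network delay from the four NTP timestamps.

    t1: client send, t2: server receive, t3: server send, t4: client receive.
    """
    offset = ((t2 - t1) + (t3 - t4)) / 2    # how far the client clock is behind the server
    delay = (t4 - t1) - (t3 - t2)           # round-trip time minus server processing time
    return offset, delay

# Example: the client's clock is about 5 ms slow and the round trip adds about 20 ms.
print(ntp_offset_delay(t1=100.000, t2=100.015, t3=100.016, t4=100.021))
# approximately (0.005, 0.020)
```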

Clock discipline gradually adjusts the system clock. For small offsets (< 128ms), it slews. For large offsets, it steps. The discipline learns and compensates for drift over time by adjusting the tick frequency of the system clock.

SNTP is a stripped-down subset suitable for clients that only consume time. It omits the sophisticated filtering and clock discipline of full NTP.

Precision Time Protocol (PTP)

PTP achieves sub-microsecond synchronization through hardware timestamping. Network interface cards with PTP support capture packet transmission and receipt timestamps at the physical layer, eliminating millisecond-level variability from software network stacks.

Architecture: A grandmaster clock provides authoritative time. Unlike NTP, where clients initiate requests, PTP is master-initiated: the grandmaster periodically multicasts sync messages.

The Best Master Clock Algorithm (BMCA) automatically selects the most suitable grandmaster based on priority, clock quality, accuracy, and stability.

PTP uses a four-message exchange:

  1. Sync message at T1

  2. Follow_Up containing T1

  3. Delay_Req from slave at T3

  4. Delay_Resp containing T4

The first two messages are split because some hardware cannot embed an accurate timestamp in the Sync message itself. The Follow_Up message exists only to deliver the timestamp T1 that the master captured when it sent Sync.

Offset: \(\frac{(T_2 - T_1) - (T_4 - T_3)}{2}\)

Cost: Unlike NTP, PTP requires specialized network cards and switches with hardware timestamping support.

TrueTime

Google’s Spanner database uses a system called TrueTime that takes a different approach to distributed time. Instead of returning a single time value, TrueTime returns an interval that is guaranteed to contain true UTC. A typical interval might be 7 milliseconds wide. Spanner uses these intervals to order globally distributed transactions: when a transaction commits, Spanner waits out the uncertainty interval before acknowledging, so any later transaction is guaranteed to have a strictly later timestamp.

TrueTime requires GPS clocks and atomic clocks in every data center, so it is out of reach for most systems. The interesting contrast with NTP and PTP is that TrueTime makes the uncertainty in distributed time explicit rather than hiding it inside a point estimate.

Cloud Time Services

Cloud providers offer high-accuracy time services as managed infrastructure. AWS Time Sync, Azure Precision Time Protocol, and Google Public NTP each combine GPS and atomic clocks to deliver microsecond to sub-millisecond accuracy without requiring customers to deploy GPS hardware. Google’s NTP service uses leap second smearing, spreading the leap second over many hours so applications never see a 23:59:60 second or a sudden one-second jump. Hybrid and cross-cloud deployments still need to account for skew between providers.

When Physical Time Is Not Enough

Even perfectly synchronized physical clocks cannot order events that occur faster than clock resolution. At hundreds of thousands of events per second, many events share the same timestamp. More fundamentally, network delays obscure true ordering: an event timestamped earlier at one machine might arrive at another machine after local events with later timestamps.

For distributed databases, the question that has to be answered is one of causality: whether one update could have seen another, not chronology. This leads to logical clocks.

What You Don’t Need to Study


Week 3: Logical Clocks

The Limits of Physical Time

Physical clock synchronization cannot solve all ordering problems in distributed systems. Even with perfectly synchronized clocks, events can occur faster than clock resolution. Many events share the same timestamp. More fundamentally, network transmission delays mean that when an event is timestamped at one machine, its timestamp may not reflect when it becomes visible elsewhere.

Consider a distributed database where two clients concurrently update the same record. The question is not when the updates occurred in absolute time, but whether one update could have seen the other. If the updates are truly concurrent (neither saw the other), the system needs conflict resolution. If one happened after the other, the later one supersedes the earlier. This is a question of causality, not chronology.

The Happened-Before Relationship

Leslie Lamport’s 1978 paper recast event ordering in distributed systems around causality rather than physical time. The relevant question is not “when did this happen?” but “could this event have influenced that one?” Lamport defined the happened-before relationship, written \(\rightarrow\), as a partial ordering on events. A partial ordering allows some pairs of elements to be incomparable: concurrent events have no ordering. This contrasts with a total ordering where every pair must be ordered consistently.

Definition: For events a and b:

  1. If a and b occur on the same process and a occurs before b in that process’s execution, then \(a \rightarrow b\)

  2. If a is the event of sending a message and b is the event of receiving that message, then \(a \rightarrow b\)

  3. If \(a \rightarrow b\) and \(b \rightarrow c\), then \(a \rightarrow c\) (transitivity)

If neither \(a \rightarrow b\) nor \(b \rightarrow a\), we say a and b are concurrent, written \(a \parallel b\). Concurrent events happened independently with no potential causal influence.

The happened-before relationship captures potential causality: if \(a \rightarrow b\), then information from event a could have reached event b through the system’s communication channels.

Lamport Timestamps

Lamport timestamps assign each event a logical clock value such that if \(a \rightarrow b\), then timestamp(a) < timestamp(b).

Note the direction: we guarantee causally ordered events have increasing timestamps, but the converse is not true. If timestamp(a) < timestamp(b), we cannot conclude that \(a \rightarrow b\). The events might be concurrent.

Algorithm:

  1. Each process maintains a counter, initially zero

  2. On an internal event: increment the counter

  3. When sending a message: increment the counter and include its value in the message

  4. When receiving a message with timestamp T: set counter = max(counter, T) + 1

This ensures the happened-before property: if \(a \rightarrow b\) through a chain of events and messages, each step increases the counter, so timestamp(a) < timestamp(b).

Creating a total ordering: Lamport timestamps only provide a partial ordering: some timestamps may be identical. We can extend them to make each timestamp unique by combining Lamport timestamps with process IDs:

\((t_1, p_1) < (t_2, p_2)\) if \(t_1 < t_2\), or if \(t_1 = t_2\) and \(p_1 < p_2\)

This breaks ties by process ID, giving every pair of events a definite order even if they are concurrent.
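A minimal Python sketch of the algorithm, including the (timestamp, process ID) tie-break used to create a total ordering (class and method names are illustrative):

```python
class LamportClock:
    """Minimal sketch of Lamport timestamps with process-ID tie-breaking."""
    def __init__(self, pid):
        self.pid = pid
        self.counter = 0

    def local_event(self):
        self.counter += 1
        return (self.counter, self.pid)

    def send(self):
        self.counter += 1
        return self.counter                 # timestamp included in the outgoing message

    def receive(self, msg_timestamp):
        self.counter = max(self.counter, msg_timestamp) + 1
        return (self.counter, self.pid)

# (timestamp, pid) tuples compare lexicographically, giving a total order:
# ties on the counter are broken by process ID.
a, b = (5, 1), (5, 2)
assert a < b
```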

Limitation: Lamport timestamps cannot detect concurrency. If you observe timestamp(a) < timestamp(b), you cannot determine whether a causally precedes b or whether they are concurrent.

Vector Clocks

Vector clocks fully capture causal relationships and can detect concurrent events. Instead of maintaining a single counter, each process maintains a vector of counters: one for each process in the system.

For a process Pi, the entry Vi[j] represents “process i’s knowledge of how many events process j has executed.” Initially all entries are zero.

Algorithm:

  1. On an internal event at Pi: increment Vi[i]

  2. When sending a message from Pi: increment Vi[i] and include the entire vector in the message

  3. When receiving a message with vector Vmsg:

    • For all k: set Vi[k] = max(Vi[k], Vmsg[k])

    • Then increment Vi[i]

In words, on each event a process increments its own position in the vector. When a message is received, the process first updates each element of its vector with the corresponding value in the received vector if that value is greater, then increments its own position.

This propagates knowledge: when you receive a message, you learn everything the sender knew.

Comparing vector clocks:

\(V_a \leq V_b\) if every element of \(V_a\) is less than or equal to the corresponding element of \(V_b\). Event a happened-before event b exactly when \(V_a < V_b\), meaning \(V_a \leq V_b\) and the vectors are not equal. If neither \(V_a \leq V_b\) nor \(V_b \leq V_a\), the events are concurrent (\(a \parallel b\)).
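A minimal Python sketch, representing the vector as a dictionary keyed by process ID, as real systems often do (names are illustrative):

```python
class VectorClock:
    """Minimal sketch of a vector clock, stored as a dict {process_id: counter}."""
    def __init__(self, pid):
        self.pid = pid
        self.clock = {pid: 0}

    def local_event(self):
        self.clock[self.pid] += 1                # count our own events

    def send(self):
        self.local_event()
        return dict(self.clock)                  # the vector travels with the message

    def receive(self, msg_clock):
        for k, v in msg_clock.items():           # element-wise maximum: learn what the sender knew
            self.clock[k] = max(self.clock.get(k, 0), v)
        self.clock[self.pid] += 1                # then count the receive event itself

def happened_before(va, vb):
    """True if va < vb: no entry is larger and at least one is strictly smaller."""
    keys = set(va) | set(vb)
    not_larger = all(va.get(k, 0) <= vb.get(k, 0) for k in keys)
    smaller = any(va.get(k, 0) < vb.get(k, 0) for k in keys)
    return not_larger and smaller

def concurrent(va, vb):
    return not happened_before(va, vb) and not happened_before(vb, va)
```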

Implementation: Real systems usually implement vector clocks as sets of (processID, counter) tuples rather than fixed-size arrays. This handles systems where processes join dynamically and not all processes communicate with all others. You only track processes you have heard from.

Scalability: Each process maintains O(n) state where n is the number of processes. This works well for dozens to hundreds of processes, but at larger scale the vector itself can dominate the message size.

Hybrid Logical Clocks (HLC)

Hybrid logical clocks bridge the gap between physical and logical time. Physical clocks provide real-world time but drift and jump. Logical clocks provide perfect causality but lose connection to wall-clock time. For some applications, like databases or version control systems, it is useful to have both: human-friendly wall time and the ability to track causal relationships.

An HLC timestamp consists of two components: L, the largest physical clock time the process has seen so far, and C, a logical counter used to order events that share the same L value.

Together, (L, C) behaves like a logical clock while staying close to physical time.

Properties: if event a happened-before event b, then a’s (L, C) timestamp is less than b’s, and the L component stays within a bounded distance of the local physical clock, so timestamps remain meaningful as wall-clock times.

HLC works with commodity servers that synchronize their clocks (typically via NTP).

Trade-off: HLCs are more complex than pure logical clocks, with timestamps that are slightly larger (two components), but they fill a gap that pure physical or pure logical clocks could not for systems that need both causal consistency and time-based queries.
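A rough Python sketch of the usual HLC update rules (this follows the commonly published algorithm; the notation in lecture may differ):

```python
import time

class HybridLogicalClock:
    """Sketch of a hybrid logical clock: (L, C), where L tracks physical time and C breaks ties."""
    def __init__(self):
        self.l = 0    # largest physical time seen so far
        self.c = 0    # logical counter used when physical time does not advance

    def now(self):
        return time.time_ns()       # physical clock (assumed NTP-synchronized)

    def local_or_send(self):
        pt = self.now()
        if pt > self.l:
            self.l, self.c = pt, 0  # physical time moved forward: reset the counter
        else:
            self.c += 1             # physical time stalled or went backward: bump the counter
        return (self.l, self.c)

    def receive(self, l_msg, c_msg):
        pt = self.now()
        new_l = max(self.l, l_msg, pt)
        if new_l == self.l == l_msg:
            self.c = max(self.c, c_msg) + 1
        elif new_l == self.l:
            self.c += 1
        elif new_l == l_msg:
            self.c = c_msg + 1
        else:
            self.c = 0
        self.l = new_l
        return (self.l, self.c)
```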

Process Identification

In distributed systems, “process ID” refers to a globally unique identifier for a computation entity, not a local Unix process ID. Common approaches to identify a process are combining a network address with a port or local process identifier, adding a start time or incarnation number to distinguish restarts, or generating a unique random identifier (UUID) when the process starts.

The reincarnation problem: If a process crashes and restarts, should it be the same process or a new one? Most systems treat it as new to avoid causality violations from lost state.

Choosing a Clock

For most distributed systems, the choice is between vector clocks and hybrid logical clocks:

Use vector clocks when you need to detect concurrent updates and resolve conflicts explicitly, as in replicated data stores, and the number of participating processes is small enough that the vectors stay manageable.

Use hybrid logical clocks when you want causally meaningful ordering together with timestamps that stay close to wall-clock time, as in distributed databases that support time-based queries.

Lamport timestamps provide the conceptual foundation but are rarely used directly: systems that need ordering usually also need concurrency detection.

Matrix clocks exist for specialized applications requiring common knowledge tracking. They require O(n²) space and are rarely deployed.

What You Don’t Need to Study


Week 4: Group Communication, Mutual Exclusion, and Leader Election

Sending, Receiving, and Delivering

These distinctions apply to any protocol handling reliability or ordering, not just multicast.

Sending is the act of transmitting a message from an application through the communication layer to the network.

Receiving is the act of a machine accepting a message from the network. The message has arrived at the machine but is not yet visible to the application.

Delivering is the act of passing a received message to the application. This is when the application actually sees and processes the message.

Between receiving and delivering, the communication layer may take one of three actions: deliver the message to the application immediately, hold it in a buffer until ordering or reliability constraints are satisfied, or discard it.

All ordering and reliability guarantees refer to the delivery order, not the receipt order.

IP Multicast

IP Multicast provides network-layer one-to-many delivery. It uses two protocols: IGMP handles communication between hosts and their local routers, while PIM handles the routing of multicast traffic between routers.

IGMP (Internet Group Management Protocol) operates between hosts and their directly connected routers. When a host wants to receive multicast traffic for a group, it sends an IGMP membership report (join message) to its local router. The router then ensures that multicast traffic for that group flows to that network segment. Routers periodically send IGMP queries to discover which groups have active members, and hosts respond with membership reports.

PIM (Protocol Independent Multicast) routes multicast traffic across the network. It is called “protocol independent” because it uses the existing unicast routing table rather than building its own. PIM operates in two modes that reflect different assumptions about how receivers are distributed.

PIM Dense Mode uses a flood-and-prune approach. The protocol initially floods multicast traffic to all routers, and routers with no interested receivers send prune messages upstream to stop receiving traffic. This mode is appropriate when most subnets have receivers interested in the traffic.

PIM Sparse Mode requires receivers to explicitly request traffic. The protocol uses a Rendezvous Point (RP) that serves as a meeting point for sources and receivers. When a host joins a multicast group, routers send Join messages toward the RP, building a shared distribution tree. Sources send their traffic to the RP, which then distributes it down the tree to all receivers. This means a source only needs to send one stream to the RP regardless of how many receivers exist. Sparse mode is appropriate when receivers are sparsely distributed across the network.

IP Multicast works well in controlled environments such as cable TV networks and data center trading systems. However, most ISPs block it at network boundaries due to traffic engineering complexity, billing challenges, and security concerns.

Application-Level Multicast

Production distributed systems implement application-level multicast over reliable unicast. Reliability and ordering are two independent dimensions that can be combined as needed.

Reliability Levels

Unreliable multicast provides best-effort delivery with no guarantees. Messages may be lost, duplicated, or delivered to only some recipients.

Best-effort reliable multicast guarantees that if the sender completes without crashing, all live recipients receive the message. It also guarantees no duplication and no spurious messages.

The implementation uses timeouts and retransmission: the sender transmits to all recipients using reliable unicast (such as TCP), waits for acknowledgments, and retransmits if an acknowledgment does not arrive within the timeout period. However, this approach does not guarantee consistency if the sender crashes mid-transmission, and messages do not survive system restarts.

Reliable multicast provides strong consistency guarantees even when the sender crashes during transmission. It guarantees three properties: validity (a message multicast by a correct process is eventually delivered by correct processes), agreement (if any correct process delivers a message, then all correct processes deliver it), and integrity (a message is delivered at most once, and only if it was actually multicast).

Reliable multicast does not guarantee persistence across system restarts.

Durable multicast adds persistence to reliable multicast. Messages are written to stable storage before being acknowledged, so they survive crashes and restarts. Apache Kafka is an example of a system that provides durable multicast.

Publish-subscribe (pub/sub) systems apply group communication at scale. Publishers send to named topics; subscribers register interest in topics. The topic acts as a named multicast group. Pub/sub is covered in detail later.

Ordering Levels

Unordered delivery provides no guarantees about message sequence. Messages may arrive in any order at different recipients.

Single-source FIFO ordering (SSF) guarantees that messages from the same sender are delivered in the order they were sent.

Formally: if a process sends multicast(G, m) and later sends multicast(G, m′), then every correct process that delivers m′ will have already delivered m.

The implementation uses per-sender sequence numbers: each sender maintains a counter and attaches it to each message, and receivers buffer messages until they can be delivered in sequence.

Causal ordering guarantees that if message m1 happened-before message m2, then m1 is delivered before m2 at all processes.

Formally: if multicast(G, m) → multicast(G, m′), where → denotes happened-before, then every correct process that delivers m′ will have already delivered m. Causal ordering implies single-source FIFO ordering because messages from the same sender are causally related.

The implementation uses vector clocks (the same data structure from the logical clocks notes). Each message carries the sender’s vector, and the receiver buffers messages until all causally preceding messages have been delivered.
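One common formulation of the delivery test, sketched in Python (this assumes the vector counts messages multicast by each process; the exact rule presented in lecture may be stated differently):

```python
def can_deliver(v_msg, v_local, sender):
    """Causal delivery check: deliver a multicast from `sender` carrying vector v_msg
    only if it is the next message expected from that sender and every message it
    causally depends on has already been delivered locally (vector clock v_local)."""
    keys = set(v_msg) | set(v_local)
    next_from_sender = v_msg.get(sender, 0) == v_local.get(sender, 0) + 1
    deps_satisfied = all(
        v_msg.get(k, 0) <= v_local.get(k, 0) for k in keys if k != sender
    )
    return next_from_sender and deps_satisfied

# A message that depends on an undelivered message is held back:
print(can_deliver({"A": 1, "B": 1}, {"A": 0, "B": 0}, sender="B"))  # False: A's first message is missing
print(can_deliver({"A": 1, "B": 1}, {"A": 1, "B": 0}, sender="B"))  # True
```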

Total ordering guarantees that all processes deliver all messages in the same order.

Formally: if a correct process delivers m before m′, then every correct process that delivers both messages delivers m before m′.

Total ordering does not imply causal or single-source FIFO ordering; it only requires that everyone agrees on the same order. One implementation uses a central sequencer; another uses distributed agreement where processes propose and converge on sequence numbers.

Synchronous ordering uses a sync primitive (a special set of messages) that acts as a barrier. When a process issues a sync, it blocks until all in-flight messages have been delivered to all recipients.

Formally: if a process sends messages M, then issues a sync, then sends messages M′, every correct process will deliver all messages in M before delivering any message in M′.

This creates logical message groups or epochs with clean boundaries between them. The sync primitive is used for view changes to ensure all old-view messages are delivered before transitioning to a new view.

Real-time ordering would deliver messages in actual physical time order (mirroring exactly when they were sent). This is impossible to implement perfectly because clocks cannot be perfectly synchronized.

Atomic multicast combines reliable multicast with total ordering. It guarantees that all correct processes deliver the same set of messages in the same order. Some systems additionally require the total order to respect causality.

Failure Detection

Failure detection determines which processes have crashed. In asynchronous systems, perfect failure detection is impossible because a slow process is indistinguishable from a crashed one. This observation underlies the FLP impossibility result, which proves that consensus cannot be guaranteed in asynchronous systems where even one process might crash.

Real systems work around FLP by adding timeouts (accepting occasional false suspicions), assuming partial synchrony (eventual timing bounds), or using randomness.

False positives occur when a failure detector incorrectly suspects a live process has crashed. False negatives occur when a failure detector fails to detect that a process has actually crashed.

Heartbeat-based detection uses periodic messages to indicate liveness. There are two approaches: in one, each monitored process periodically pushes “I am alive” heartbeat messages to its monitor; in the other, the monitor periodically pings processes and expects an acknowledgment in response.

A monitor that does not receive heartbeats within a timeout period suspects the process has failed. The choice of timeout involves a tradeoff: shorter timeouts detect failures faster but generate more false positives.

The Phi Accrual Failure Detector

The phi accrual failure detector outputs a continuous suspicion level (φ) rather than a binary alive/dead judgment.

The detector maintains a sliding window of time gaps between consecutive heartbeats and learns what “normal” timing looks like for this particular connection. When a heartbeat is late, the detector calculates how surprising this silence is given the learned distribution.

The φ value is on a logarithmic scale: φ = k means roughly a \(10^{-k}\) probability that this delay is a normal variation. For example, φ = 3 means about a 0.1% (one in a thousand) chance that the process is still alive and the heartbeat is just delayed; φ = 8 means about a 0.000001% chance.

Applications choose a threshold based on their needs. A lower threshold means faster detection but more false positives. Apache Cassandra (a distributed database) uses this detector with a configurable threshold that defaults to 8. The key advantage is that it separates monitoring from decision-making: the detector provides suspicion information, and applications decide when to act.
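A rough Python sketch of the idea; for simplicity it models heartbeat intervals with an exponential distribution, whereas the original detector fits a normal distribution to the observed window:

```python
import math
from collections import deque

class PhiAccrualDetector:
    """Sketch of a phi accrual failure detector.

    Keeps a sliding window of heartbeat inter-arrival times and converts
    "how late is the current heartbeat?" into a suspicion level phi.
    """
    def __init__(self, window=100):
        self.intervals = deque(maxlen=window)
        self.last_heartbeat = None

    def heartbeat(self, now):
        if self.last_heartbeat is not None:
            self.intervals.append(now - self.last_heartbeat)   # learn the normal gap
        self.last_heartbeat = now

    def phi(self, now):
        if not self.intervals:
            return 0.0
        mean = sum(self.intervals) / len(self.intervals)
        elapsed = now - self.last_heartbeat
        p_later = math.exp(-elapsed / mean)        # chance the heartbeat is merely late
        return -math.log10(p_later) if p_later > 0 else float("inf")

# The application picks the threshold: for example, suspect failure when phi exceeds 8.
```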

Virtual Synchrony and Group Membership

Group membership maintains a consistent view of which processes are in a group. The membership is tracked collectively by all members rather than by a central server. Each process runs a group membership service (GMS) layer that monitors other members, participates in view change protocols, and notifies the application when membership changes.

A view is a snapshot of group membership containing a unique identifier (typically a monotonically increasing number) and a list of member processes. All processes in a view agree on its membership.

Message Stability

When a process multicasts a message, the message is not delivered to applications immediately. The protocol works as follows:

  1. The sender sends the message to all group members.

  2. Each member receives the message, buffers it, and sends an acknowledgment back to the sender.

  3. When the sender receives acknowledgments from all members of the current view, the message is stable.

  4. The sender sends a stability announcement to all members.

  5. Only after receiving this announcement can members deliver the message to the application.

A message is unstable if it has been received by some members but the sender has not yet confirmed stability (either because acknowledgments are still pending or because the sender crashed before completing the protocol). Unstable messages are held in a buffer and cannot be delivered to applications.

This protocol ensures that if a sender crashes mid-transmission, no member will have delivered a partially-sent message. Either the message becomes stable (all members have it and can deliver), or it remains unstable and is eventually discarded.
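A sender-side sketch of steps 3 and 4 is below; it tracks which members have acknowledged each message and announces stability once the acknowledgment set matches the current view. The transport (`send_stability`) is an assumed stand-in.

```python
class StabilityTracker:
    """Sketch: a message becomes stable once every member of the
    current view has acknowledged it."""

    def __init__(self, view_members):
        self.view = set(view_members)
        self.acks = {}                    # msg_id -> set of members that acked

    def on_ack(self, msg_id, member):
        acked = self.acks.setdefault(msg_id, set())
        acked.add(member)
        if acked == self.view:            # all current-view members hold it
            self.send_stability(msg_id)   # members may now deliver it

    def send_stability(self, msg_id):
        pass                              # assumed: multicast the announcement
```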

View Changes

A view change is a protocol that transitions all members from one view to another when membership changes due to a join, leave, or failure. The protocol ensures that all processes agree on which messages were delivered in the old view before transitioning.

The challenge is handling in-flight messages: messages that have been received and buffered by some members but are unstable because the sender crashed or has not yet confirmed stability. These messages are sitting in member buffers, waiting for a stability announcement that may never come.

The view change protocol works in three phases:

  1. Flush: Processes stop sending new application messages and exchange information about which messages they have buffered but not yet delivered. This propagates any in-flight messages to all surviving members.

  2. Collect: Each process waits to receive flush messages from all surviving members. After this phase, all survivors have the same set of buffered messages.

  3. Commit: Processes agree on the new view membership. Messages that all survivors have are marked stable and delivered. Messages that only some members received are discarded. All processes then atomically transition to the new view.

When a process fails, the failure detector notices the unresponsive process, and any detecting process can initiate a view change. The failed process is excluded from the new view. A recovered process must rejoin as a new member.

Recovery and State Transfer

If a failed process recovers, it cannot simply rejoin the group with its old state. While the process was dead, the group continued operating: messages were delivered, state changed, and views advanced. The recovered process has stale state and an outdated view of the world.

To rejoin, the recovered process must perform a state transfer:

  1. The recovering process contacts an existing group member and requests a state transfer.

  2. The existing member sends its current state (either a full snapshot or a recent checkpoint plus subsequent updates).

  3. The recovering process initializes its replica to this state.

The state transfer is treated as an atomic event: no other processing occurs at the recovering process until the transfer is complete. This ensures the process does not attempt to participate in the group with partially updated state.

Once the state transfer completes, the recovered process joins the group as a new member through the normal join protocol, triggering a view change that adds it to the membership.

Virtual synchrony coordinates membership changes with message delivery. The key guarantee is view synchrony: if a message is delivered in a view, it is delivered in that same view at all processes that deliver it. This ensures that all processes agree on what the group membership was when each message was delivered.

Distributed Mutual Exclusion

Distributed mutual exclusion ensures that at most one process is in a critical section at any time without relying on shared memory. The algorithms must satisfy three properties: mutual exclusion (at most one process in the critical section at a time), progress (requests are eventually granted, with no deadlock or starvation), and fairness (requests are granted in a reasonable order, such as the order in which they were made).

Centralized Mutual Exclusion Algorithm

The centralized approach designates one process as the coordinator. When a process wants to enter the critical section, it sends a request message to the coordinator. If the critical section is free, the coordinator sends a grant message back immediately. If the critical section is occupied, the coordinator queues the request. When the process in the critical section finishes, it sends a release message to the coordinator, which then sends a grant to the next process in the queue.

This approach requires only three messages per critical section entry (request, grant, release) and is simple to implement. The drawback is that the coordinator is a single point of failure and a potential performance bottleneck.
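A coordinator sketch under stated assumptions: `send` is an injected transport primitive, and requesters block until a GRANT arrives. This is illustrative, not a fault-tolerant implementation.

```python
from collections import deque

class Coordinator:
    """Sketch of the centralized mutual exclusion coordinator."""

    def __init__(self, send):
        self.send = send
        self.holder = None                 # pid currently in the critical section
        self.queue = deque()               # waiting requesters, FIFO

    def on_request(self, pid):
        if self.holder is None:
            self.holder = pid
            self.send(pid, "GRANT")
        else:
            self.queue.append(pid)         # no reply yet: the requester waits

    def on_release(self, pid):
        assert pid == self.holder
        self.holder = self.queue.popleft() if self.queue else None
        if self.holder is not None:
            self.send(self.holder, "GRANT")
```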

Lamport’s Mutual Exclusion Algorithm

Lamport’s algorithm is fully distributed with no central coordinator. Each process maintains a request queue ordered by Lamport timestamp, with ties broken by process ID to ensure a total order.

When a process wants to enter the critical section, it timestamps its request and sends it to all other processes. When a process receives a request, it adds it to its local queue and sends an acknowledgment back to the requester. A process may enter the critical section when two conditions are met:

  1. Its own request is at the head of its queue (meaning it has the earliest timestamp), and

  2. It has received acknowledgments from all other processes.

When a process exits the critical section, it sends a release message to all other processes. Upon receiving a release, each process removes that request from its queue.

The algorithm requires 3(N−1) messages per critical section entry: N−1 requests, N−1 acknowledgments, and N−1 releases. It guarantees fairness because requests are ordered by timestamp. However, it assumes all processes are correct and responsive; if one process crashes and stops sending acknowledgments, other processes will block indefinitely.

Ricart-Agrawala Algorithm

The Ricart-Agrawala algorithm optimizes Lamport’s algorithm by eliminating the release messages. Instead of always acknowledging immediately, a process defers its reply if it also wants the critical section and has a higher priority (earlier timestamp).

When a process wants to enter the critical section, it timestamps its request and sends it to all other processes. When a process receives a request, it compares the request’s timestamp to its own. If it does not want the critical section, or if its own request has a later timestamp, it sends a reply immediately. Otherwise, it defers the reply until after it exits the critical section.

A process enters the critical section when it has received replies from all other processes. Upon exiting, it sends all the deferred replies.

This reduces the message count to 2(N−1) per entry: N−1 requests and N−1 replies. Like Lamport’s algorithm, it requires all processes to respond.
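The heart of Ricart-Agrawala is the reply-or-defer decision. A small sketch, assuming requests are (Lamport timestamp, pid) tuples so that Python's tuple comparison gives the required total order:

```python
def should_reply_now(my_state, my_request, incoming_request):
    """Ricart-Agrawala reply rule. States: RELEASED, WANTED, HELD.
    Requests are (lamport_time, pid); a smaller tuple has priority."""
    if my_state == "RELEASED":
        return True                        # we don't want the CS: reply now
    if my_state == "HELD":
        return False                       # we're inside: defer until exit
    return incoming_request < my_request   # WANTED: earlier request wins
```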

Token Ring Mutual Exclusion

The token ring algorithm organizes processes in a logical ring and uses a circulating token. Only the process holding the token may enter the critical section.

The token circulates continuously around the ring. When a process receives the token and wants the critical section, it enters. When it finishes (or if it does not want the critical section), it passes the token to its neighbor.

This approach provides bounded waiting and is simple to implement. The drawbacks are that the token generates continuous network traffic even when no process wants the critical section, and if the token is lost due to a crash, a recovery mechanism must regenerate it while avoiding duplicate tokens.

Algorithm Comparison

Algorithm | Messages | Fault Tolerance | When to Use
---|---|---|---
Centralized | 3 | Coordinator failure blocks progress | When simplicity is important and fast coordinator recovery is available
Lamport | 3(N−1) | Requires all processes respond | When fairness and full distribution are needed
Ricart-Agrawala | 2(N−1) | Requires all processes respond | Optimal choice for permission-based mutual exclusion
Token Ring | 1 to N−1 | Token loss requires recovery | When bounded waiting is important and requests are frequent

Leader Election

Leader election selects a single coordinator from a group of processes. The elected leader typically has the highest process ID among surviving processes.

Bully Algorithm

The bully algorithm assumes a synchronous model with timeouts for failure detection. It uses three message types: ELECTION (to announce a new election), OK (to acknowledge an election message and tell the sender a higher-ID process is alive), and COORDINATOR (to announce the winner).

When a process P detects that the coordinator has failed, it initiates an election by sending ELECTION messages to all processes with higher IDs. If P receives no OK responses within a timeout, it declares itself coordinator and sends COORDINATOR messages to all other processes. If P does receive an OK response, it knows a higher-ID process will take over and waits for a COORDINATOR message.

When a process receives an ELECTION message from a lower-ID process, it sends an OK response and starts its own election if it has not already. The highest-ID surviving process always wins.

The worst-case message complexity is O(n^2), occurring when the lowest-ID process initiates the election. The best case is O(n).
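The initiator's side of the protocol fits in a few lines. In this sketch, `send_election`, `await_ok`, and `announce` are assumed messaging helpers (not from any real library); `await_ok` returns True if any OK arrives before the timeout.

```python
def on_coordinator_failure(my_id, all_ids, send_election, await_ok, announce):
    """Bully sketch: challenge everyone with a higher ID, win on silence."""
    for pid in (p for p in all_ids if p > my_id):
        send_election(pid)                 # ELECTION to each higher-ID process
    if not await_ok():                     # silence: nobody higher is alive
        announce(my_id)                    # broadcast COORDINATOR(my_id)
    # else: a higher-ID process took over; wait for its COORDINATOR message
```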

Ring Election Algorithm

The ring election algorithm organizes processes in a logical ring. When a process notices the coordinator has failed, it creates an election message containing its own ID and sends it clockwise to its neighbor.

When a process receives an election message, it compares the ID in the message with its own: if the ID in the message is larger, it forwards the message unchanged to its neighbor; if its own ID is larger, it replaces the message's ID with its own and forwards that.

When a process receives an election message containing its own ID, it knows its message has traveled all the way around the ring and its ID is the largest. It then sends an ELECTED message around the ring to announce itself as the new coordinator.

The worst case is 3N−1 messages, occurring when the process immediately following the highest-ID process initiates the election.

What You Do Not Need to Study

You do not need to memorize exact pseudocode, PIM protocol details, or the phi calculation formula. Focus on understanding the concepts: why IGMP and PIM exist (host membership vs. router distribution), why sparse mode uses an RP (explicit joins vs. flooding), how phi adapts to observed network conditions, and why virtual synchrony coordinates membership with delivery.


Week 5: Consensus

The Core Problem

Consensus is the problem of getting a group of nodes to agree on a single value, even when some nodes crash or messages are delayed. It comes up in leader election, transaction commit, log ordering, and configuration management. Every fault-tolerant distributed system is built on top of some form of consensus.

Without proper consensus, a network partition can cause both sides to elect their own leader and accept writes independently. This condition is called split-brain: the system ends up with two divergent versions of state that must be reconciled or rolled back when the partition heals.

Replicated State Machines

A state machine is deterministic: the same inputs in the same order always produce the same state. State machine replication runs identical copies of the state machine on multiple servers, ensuring they all apply the same commands in the same order. A replicated log is the data structure that imposes that order – consensus ensures all servers agree on the contents of the log.

Key property: if every server starts from the same initial state and executes the same log entries in order, they will all reach the same state. This is why log ordering is the central challenge.

Consensus Properties

Three properties required for consensus are: agreement (all correct processes decide the same value), validity (the decided value must have been proposed by some process), and termination (every correct process eventually decides).

Agreement and validity are safety properties (nothing bad happens). Termination is a liveness property (something good eventually happens). The tension between them is central to the difficulty of consensus.

FLP Impossibility

The FLP Impossibility Result proves that in a purely asynchronous distributed system, no deterministic algorithm can guarantee consensus if even one process may crash.

The core obstacle: in an asynchronous system, there is no way to distinguish a crashed process from a very slow one. This makes it impossible to deterministically break certain deadlocked states without risking a safety violation.

FLP does not mean consensus is impossible in practice. All real protocols handle it by guaranteeing safety unconditionally but sacrificing liveness under extreme instability (for example, if no leader can be elected because the network is too chaotic). Eventually, when the system stabilizes, progress resumes.

Paxos

Paxos was created by Leslie Lamport around 1989 and finally published in 1998. It is the foundational consensus algorithm in the field.

Paxos has three roles: proposers (initiate proposals), acceptors (vote), and learners (learn the decided value). In practice, servers typically play all three roles.

The protocol relies on a simple but powerful property: any two majorities of acceptors in a group of n share at least one member. That overlap means a new majority cannot form without including at least one acceptor that participated in an earlier majority. Combined with Paxos’s promise and value-selection rules, this prevents two different values from being chosen for the same decision.

The algorithm runs in two phases:

  1. Prepare/Promise: a proposer picks a unique proposal number n and sends a prepare request to the acceptors. An acceptor that has not already promised a higher number responds with a promise not to accept any proposal numbered below n, reporting the highest-numbered proposal it has already accepted, if any.

  2. Accept: once the proposer holds promises from a majority, it sends an accept request carrying n and a value – the value of the highest-numbered proposal reported in phase 1, or its own value if none was reported. An acceptor accepts unless it has since promised a higher number.

A value is decided once a majority of acceptors have accepted it.
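The acceptor's rules are the safety-critical part. A minimal sketch (in a real system, `promised` and `accepted` must be written to stable storage before replying):

```python
class Acceptor:
    """Single-decree Paxos acceptor sketch."""

    def __init__(self):
        self.promised = -1      # highest proposal number promised
        self.accepted = None    # (number, value) of the last accepted proposal

    def on_prepare(self, n):
        if n > self.promised:
            self.promised = n
            return ("PROMISE", self.accepted)  # report prior acceptance, if any
        return ("NACK",)

    def on_accept(self, n, value):
        if n >= self.promised:
            self.promised = n
            self.accepted = (n, value)
            return ("ACCEPTED",)
        return ("NACK",)
```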

Multi-Paxos extends single-decree Paxos to decide a sequence of values (a log) by running a separate Phase 2 per log slot, with a stable leader that skips Phase 1 after the first slot. When the leader changes, the new leader must run Phase 1 again for any log slots it has not yet resolved, which is why leader instability is expensive in Paxos.

Paxos is notoriously difficult to implement correctly. It leaves important practical questions unspecified: conflict resolution between concurrent proposers, cluster membership changes, and recovery from partial failures. Real deployments (Google Chubby, Apache ZooKeeper’s Zab variant, Google Spanner) required substantial engineering beyond the base algorithm.

Raft

Raft was developed to provide the same safety guarantees as Multi-Paxos but is designed to be significantly easier to understand and implement. It is used in many newer systems, including etcd, CockroachDB, TiKV, Consul, and YugabyteDB.

Terms

Raft divides time into terms, each numbered with a consecutive integer. A term begins with an election. If a candidate wins, it serves as leader for that term. If no one wins (split vote), a new term begins. Terms serve as a logical clock – servers reject messages from older terms and update their term when they see a higher one.

Server States

Every server is in exactly one of three states. A follower is passive: it responds to requests from leaders and candidates but does not initiate any. All servers start as followers. A candidate is a follower that has timed out waiting for a heartbeat and has initiated an election. A leader handles all client requests, replicates log entries to followers, and sends periodic heartbeats to prevent new elections.

Leader Election

Each follower maintains a randomized election timeout (typically 150–300 ms). If it expires without hearing from a leader, the follower starts an election:

  1. Increment the current term.

  2. Transition to candidate state and vote for itself.

  3. Send RequestVote RPCs to all other servers.

  4. A server grants its vote if it has not already voted this term and the candidate’s log is at least as up-to-date as its own.

  5. If the candidate receives votes from a majority, it becomes leader and immediately sends heartbeats to suppress new elections.

  6. If no candidate wins (split vote), the term ends with no leader and a new election begins with a higher term.

“More up-to-date” is defined precisely: a log is more up-to-date if its last entry has a higher term, or if the terms are equal, the longer log wins. This restriction ensures that a candidate cannot win unless its log contains all committed entries. The randomized timeout makes it unlikely that multiple candidates start elections at the same time.
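The voter's check is mechanical. A sketch, taking the term and index of the last entry in each log (names are illustrative):

```python
def candidate_log_ok(cand_last_term, cand_last_index,
                     my_last_term, my_last_index):
    """Grant a vote only if the candidate's log is at least as up-to-date:
    higher last term wins; equal terms, the longer log wins."""
    if cand_last_term != my_last_term:
        return cand_last_term > my_last_term
    return cand_last_index >= my_last_index
```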

Log Replication

Once a leader is elected, it handles all client requests. For each command, the sequence is:

  1. The leader appends the command to its own log, tagged with the current term and index.

  2. It sends AppendEntries RPCs to all followers in parallel.

  3. Once a majority of servers have acknowledged the entry, the leader commits it.

  4. The leader applies the entry to its state machine and returns the result to the client.

  5. Subsequent AppendEntries messages inform followers of the commit index, at which point followers apply the committed entries to their own state machines.

Each AppendEntries RPC includes the index and term of the entry immediately preceding the new one. A follower rejects the RPC if its own log does not match at that position. When this happens, the leader backs up and retries from an earlier entry until it finds a point of agreement, then overwrites any conflicting entries from that point forward.

The Log Matching Property guarantees that if two entries in different logs share the same index and term, the logs are identical through that index. This invariant is what makes the consistency check in AppendEntries sufficient to detect and repair divergence.
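A follower-side sketch of that consistency check, simplified so that the new entries always overwrite everything after the point of agreement (a full implementation only truncates on an actual conflict):

```python
def handle_append_entries(log, prev_index, prev_term, entries):
    """`log` is a list of (term, command) pairs, 0-indexed here;
    prev_index == -1 means the entries start at the beginning."""
    if prev_index >= 0:
        if prev_index >= len(log) or log[prev_index][0] != prev_term:
            return False                   # mismatch: leader backs up and retries
    log[prev_index + 1:] = entries         # simplified: replace the suffix
    return True
```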

Commit Rules

An entry is committed once stored on a majority. However, the leader may not commit entries from previous terms directly – it must first commit an entry from its own current term. Old entries become committed implicitly when the current-term entry is committed. This prevents a subtle safety bug involving overwriting entries that were transiently replicated but not truly committed.

Safety: The Leader Completeness Property

If an entry is committed in a given term, it will appear in the log of every leader in all subsequent terms. This follows from the election restriction: a candidate can only win with votes from a majority, and its log must be at least as up-to-date as each voter's, so it must contain every committed entry. Safety in Raft is unconditional – the protocol never allows two servers to commit different entries at the same index; a majority of live servers is needed only to make progress.

Liveness

Liveness is conditional. Raft requires a stable elected leader to make progress. If elections repeatedly fail due to network instability, the system stalls. In practice, randomized timeouts prevent this from being a persistent problem.

Cluster Membership Changes

Adding or removing servers requires care. Raft uses joint consensus, a two-phase approach: the cluster transitions through a configuration that includes both old and new member sets, requiring majority agreement from both before switching to the new configuration alone. Even when adding or removing one server at a time, the reconfiguration must ensure majority intersection across the transition – joint consensus is how Raft guarantees that property.

Log Compaction

Logs grow indefinitely. Servers periodically take a snapshot of the state machine and discard all log entries before that point. If a follower falls too far behind, the leader sends it the snapshot directly via an InstallSnapshot RPC.

Key Takeaways

Consensus is hard because, in an asynchronous system, a crashed process cannot be distinguished from a slow one. FLP tells us something fundamental about this: in a fully asynchronous model, no deterministic algorithm can be both safe and live if any process can crash. Real protocols choose safety and accept that they may stall temporarily.

Raft improves on Paxos primarily through clarity of design: a single strong leader, randomized election timeouts, and unified handling of log replication and leader completeness. The safety guarantees are equivalent; the difference is in how easy the protocol is to reason about and implement correctly.

What You Don’t Need to Study


Week 6: Coordination Services and Network-Attached Storage

Coordination Services

Distributed systems frequently need to answer questions like: who is the current leader? Is this lock held? What is the current configuration? Getting these answers wrong can be catastrophic. If two nodes both believe they are the leader, they will independently accept writes and the system state will diverge.

The natural approach is a dedicated replicated coordinator made fault-tolerant through consensus. A coordination service provides that. It is a small, strongly consistent, highly available store that distributed applications use to coordinate decisions and share small amounts of state. Every coordination service we study (Chubby, ZooKeeper, and etcd) uses consensus internally.

What Coordination Services Provide

All three services share a core set of capabilities: a small, replicated data store kept consistent through consensus; ephemeral state tied to client sessions, so a crashed client's entries are cleaned up automatically; notifications when watched data changes; and primitives from which locks and leader election can be built.

Chubby

Chubby is a lock service and configuration store from Google. A Chubby deployment is a cell of five replicas, one of which is the master elected via Paxos. The other replicas participate in replicating data via consensus but redirect client requests to the master. Three of five replicas must be alive for the cell to function, allowing two simultaneous failures. For performance, Chubby caches all contents in memory.

Chubby exposes a file system interface: locks, configuration data, and service addresses are all stored as named files in a hierarchical namespace. Locks are advisory and coarse-grained, meaning they are held for long periods and are not enforced by the system if code chooses to ignore them.

When a client opens a file, it receives a lease – a time-bounded guarantee that its cached copy is valid. The master sends cache invalidations to other clients when a file is written. If the master fails, a new one is elected, broadcasts a new epoch, and gives clients a grace period to reconnect. Clients that do not reconnect within the grace period have their sessions and locks released.

ZooKeeper

ZooKeeper was developed at Yahoo and is open-source. Rather than providing locks as a primitive, it provides building blocks from which locks, leader election, barriers, and other coordination patterns can be constructed.

Data is stored in a tree of znodes, each holding a small amount of data. There are two types of znodes:

  1. Persistent znodes survive client disconnection and remain until explicitly deleted.

  2. Ephemeral znodes are automatically deleted when the client session that created them ends. This is the key mechanism for failure detection.

Either type can optionally be created as a sequential znode, which causes ZooKeeper to append a monotonically increasing integer to the name. This is essential for implementing locks without causing thundering herd problems.

The thundering herd problem occurs when many clients are all waiting for the same condition, and a single state change wakes them all simultaneously. They all rush to retry at once, creating a burst of load on the coordination service, while only one of them can make progress. In a ZooKeeper lock implementation, the fix is to have each waiting client watch only its immediate predecessor in the queue rather than the lock node itself. When the lock is released, exactly one client is notified instead of all of them.
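A sketch of that lock recipe follows. The client object is hypothetical: `create`, `get_children`, and `wait_for_delete` stand in for a real ZooKeeper client's create, children-listing, and watch operations, and are not an actual library API.

```python
def acquire_lock(zk, lock_path):
    """Queue-based lock: lowest sequence number holds the lock; each
    waiter watches only its immediate predecessor."""
    me = zk.create(lock_path + "/lock-", ephemeral=True, sequential=True)
    my_name = me.rsplit("/", 1)[-1]
    while True:
        children = sorted(zk.get_children(lock_path))
        if children[0] == my_name:
            return me                      # we are at the head: lock acquired
        predecessor = children[children.index(my_name) - 1]
        zk.wait_for_delete(lock_path + "/" + predecessor)  # one waiter wakes
```

Because the znode is ephemeral, a crashed holder's session expiry deletes it, waking exactly the next waiter in line.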

ZooKeeper uses a consensus protocol similar to Raft to replicate writes through a leader in a globally consistent order. Reads can be served by any replica and are sequentially consistent; clients can issue a sync to ensure a replica has caught up to recently committed writes before reading.

Watches are one-shot notifications. A client sets a watch when it reads a znode (checking existence, data, or children), and ZooKeeper delivers an event if the relevant state changes. The watch fires once and is then removed; the client must re-register if it wants continued monitoring. The usual pattern is to treat the event as “something changed,” re-read the current state from ZooKeeper, and re-register the watch. This keeps clients consistent even if multiple changes occur quickly or during a brief disconnection.

etcd

etcd is the authoritative store for Kubernetes cluster state. It uses Raft for consensus. Unlike Chubby and ZooKeeper’s hierarchical namespace, etcd stores a flat key-value map: keys are arbitrary byte strings, and hierarchy is a naming convention, not an enforced structure. Prefix range queries and prefix watches provide directory-like behavior.

etcd routes reads through the leader by default, giving linearizable reads: reads that reflect the most recently committed write, as if the entire system had a single consistent view at that instant. Serializable reads (served locally by any replica) are available as an opt-in for workloads that can tolerate slight staleness. Leases with TTLs provide the same self-cleaning behavior as ZooKeeper’s ephemeral znodes. Transactions with compare-and-swap conditions enable atomic leader election and lock acquisition.
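As an illustration of compare-and-swap election, here is a sketch against a hypothetical etcd-like transaction API (the `kv.txn` call and its argument shapes are assumptions, not the real client library): the put succeeds only if the key has never been created.

```python
def try_become_leader(kv, my_id, lease_id):
    """Claim /leader atomically; the claim expires with our lease."""
    succeeded = kv.txn(
        compare=[("create_revision", "/leader", "==", 0)],  # key must not exist
        on_success=[("put", "/leader", my_id, lease_id)],   # claim under lease
        on_failure=[],
    )
    return succeeded
```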

Common Coordination Patterns

These patterns apply to all three coordination services; only the specific primitives differ.

Leader election. Replicas contend for a well-known name in the coordination service. Exactly one wins. The winner’s claim disappears when its session expires, allowing others to compete again.

Distributed locks. Acquiring the lock is a write-through consensus that provides global ordering. Locks built on ephemeral nodes or leases are self-cleaning: a crashed holder’s session expires, and the lock releases automatically. Coordination services are suited to coarse-grained locks: locks held for long periods protecting large resources (a master election, a configuration update). They are not suited to fine-grained locks held for milliseconds to protect individual rows or records. High-frequency lock acquisitions and releases would overwhelm a system built around consensus.

Configuration management. Services store configuration in the coordination service. Updates go through consensus and are applied consistently. Clients watch configuration keys for changes.

Service discovery. A running instance registers its address under a known prefix using an ephemeral key. The list of active instances stays current because dead servers’ keys expire automatically.

Fencing tokens. A monotonically increasing number associated with each lock grant. The protected resource rejects any request carrying a token lower than the highest it has seen, preventing a stale lock holder that woke up after a pause from corrupting shared state.
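The resource-side check is simple. A sketch, with `apply_write` injected as a stand-in for the actual mutation:

```python
class ProtectedResource:
    """Reject any request whose fencing token is lower than the highest
    token already seen; such a request comes from a stale lock holder."""

    def __init__(self):
        self.highest_token = -1

    def write(self, token, data, apply_write):
        if token < self.highest_token:
            raise PermissionError("stale fencing token; request rejected")
        self.highest_token = token
        apply_write(data)
```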

What Coordination Services Do Not Do

A coordination service is designed to store small amounts of metadata. It is not a database or a message queue, and it is not suitable for large files, frequent access, or high-throughput writes. A useful rule of thumb: if the data is on the critical path of every client request, it does not belong in a coordination service.

More fundamentally, electing a leader through a coordination service does not guarantee that the application as a whole behaves correctly. Coordination serializes decisions; application correctness is still the developer’s responsibility.

Network-Attached Storage

Access Transparency and VFS

The goal of networked file systems is access transparency: applications use standard file system calls (open, read, write, close) against remote files without any awareness that the storage is remote. This is achieved through the Virtual File System (VFS) layer, adopted by every major Unix-derived OS. VFS defines a standard interface that any file system driver must implement. The kernel always talks to this interface; whether the driver beneath it issues disk commands or sends network requests is invisible to applications.

Mount points attach different file systems into a single directory tree. A remote file system client is a VFS driver that translates standard file operations into network requests. When the response arrives, the result passes back through the VFS interface to the application.

Design Dimensions

Every networked file system must navigate three fundamental tradeoffs:

Consistency. Multiple clients may cache the same file. Keeping those caches consistent requires either frequent polling against the server or a protocol where the server pushes invalidations to clients.

State. A stateless server holds no information about client activity between requests. Every request is self-contained. Crash recovery is trivial because there is nothing to recover. But statelessness makes locks, open file tracking, and cache invalidation impossible. A stateful server enables richer semantics at the cost of recovery complexity: after a crash, open files, locks, and cached state must be rebuilt or cleaned up.

Caching. Options range from write-through (immediate server update), to write-behind (delayed batch), to write-on-close / session semantics (send changes only on close). Callbacks, where the server tracks which clients have cached a file and pushes invalidations on modification, require statefulness but eliminate polling.

NFS

NFS was designed to be simple, stateless, and interoperable across any networked system. It was built on openly published RPC and data encoding standards and was ported to many operating systems in both client and server roles.

Because the server is stateless, NFSv2 has no OPEN, CLOSE, LOCK, SEEK, or APPEND procedures. Clients identify files by file handles: opaque server-generated identifiers that persist across server restarts. The server uses UDP as the transport because the stateless, idempotent design makes retries safe.

Key limitations of stateless NFS: the server cannot support file locking, does not know which clients have a file open (so a file can be removed while another client is still using it), must validate each request independently, and has no way to notify clients when their cached data becomes stale.

NFS caches data in blocks and validates cached data using timestamp comparison: the client checks the file’s modification time on the server when a file is opened, and after a short validity timeout. This gives close-to-open consistency: stale reads are possible between opens.

AFS

AFS was designed to fix the scalability problem of NFS. Workload measurements showed that most file accesses are reads, files are usually accessed by one user at a time, and most files are small enough to cache entirely. This motivated the upload/download model and whole-file caching: when a file is opened, the entire file is downloaded to the client’s local disk. Reads and writes operate on the local copy. On close, if modified, the file is uploaded back. This gives session semantics: changes are visible to other clients only after the file is closed.

The mechanism that makes aggressive caching safe is the callback promise: when the server delivers a file to a client, it promises to notify the client if the file is modified. When a client uploads a modified file, the server sends callback revocations to all other clients that hold the file. Those clients invalidate their cached copies. Because files are read far more than they are written, most accesses proceed from the local cache with no server interaction at all.

AFS enforces a uniform global namespace: all AFS content appears under /afs on every client machine, with the cell name (e.g., cs.rutgers.edu) as the second path component. The same path resolves to the same file regardless of which client machine the user is on. NFS has no such guarantee; administrators mount remote directories at arbitrary local paths. File system content is organized into volumes that administrators can move between servers transparently via referrals.

Coda

Coda extended AFS to support laptops and mobile workstations that might lose network connectivity.

Coda allows a volume to be replicated across a Volume Storage Group (VSG). The subset of VSG servers reachable at any moment is the Accessible Volume Storage Group (AVSG).

When no server is reachable, the client enters disconnected operation mode and works entirely from its local disk cache.

AFS and Coda are no longer widely deployed. AFS survives at some universities and research institutions, but its operational complexity and aging authentication model have made it difficult to justify in new deployments. Coda remained a research prototype.

SMB

Microsoft’s Server Message Block protocol was designed with the opposite philosophy from NFS: stateful, connection-oriented, and built to enforce Windows file-sharing semantics. SMB tracks every open file, every lock, and every byte range under lock at the server. This enabled mandatory locking, byte-range locks, and the semantics Windows applications expected. The cost was that server crashes lost all session state.

Opportunistic locks (oplocks) give the server a way to grant clients caching rights. The server monitors file access and sends an oplock break to the caching client when a conflict arises, requiring it to flush writes before the server allows the competing open. This is the same idea as AFS callbacks, applied at finer granularity. Later versions of Windows generalized oplocks into leases with cleaner semantics that can also cover directory metadata.

SMB 2 dramatically modernized the protocol with several performance improvements: a much smaller command set, request compounding (batching multiple operations into a single message), pipelining of multiple in-flight requests with credit-based flow control, larger read and write sizes, and durable handles that let clients reconnect after brief network outages.

macOS adopted SMB 2 as its default file sharing protocol (replacing AFP). macOS also supports NFS for Unix-oriented environments, but SMB is the default.

NFSv4

NFSv4 abandoned statelessness. Clients now open and close files explicitly, and the server tracks state. Key improvements over NFSv2/v3: stateful opens with locking, compound RPCs that batch multiple operations into one round trip, delegations that grant clients caching rights backed by server callbacks, referrals for distributing the namespace across servers, mandatory strong authentication, and TCP as the only transport.

The Convergence

The key mechanisms that modern NFS and SMB both now provide, starting from very different origins:

Mechanism | NFS v2/v3 | NFSv4 | SMB 1 | SMB 2+
---|---|---|---|---
Stateful server | No | Yes | Yes | Yes
Compound/pipelined requests | No | Yes | No | Yes
Client caching grants | No | Yes (delegations) | Yes (oplocks) | Yes (oplocks + leases)
Server-to-client notification | No | Yes | Yes | Yes
Referrals | No | Yes | Yes (via DFS) | Yes (via DFS)
Strong authentication | Optional | Mandatory | NTLM/Kerberos | Kerberos/NTLMv2
Transport | UDP or TCP | TCP only | TCP | TCP

NFS is dominant in Linux, Unix, and HPC environments. SMB is dominant in Windows enterprise environments and is the default on macOS.

Microsoft’s referral support comes via DFS (Distributed File System), a separate namespace service that has worked alongside SMB since the late 1990s. DFS maps logical paths to physical server locations and issues referrals when clients access those paths. It is not specific to SMB 2 or later; it predates SMB 2 and works across SMB versions.

Consistency Semantics Summary

What You Do Not Need to Memorize


Week 7: Decentralized Storage

We cover three distinct approaches to distributing storage and lookup across many nodes: data-intensive distributed file systems (GFS and HDFS), distributed hash tables (CAN, Chord, and Dynamo), and DNS as a planet-scale distributed data store.

Distributed File Systems: GFS and HDFS

Goals and Workload

The Google File System (GFS) was designed for a very specific workload: enormous files (multi-GB), sequential reads and writes, a high rate of concurrent appends, and an environment where commodity hardware failures are routine. It prioritizes throughput over latency and fault tolerance by design rather than by exception.

The key insight driving the design is that if you know your workload in advance, you can build a system optimized for it rather than a general-purpose system that handles everything adequately.

Separation of Data and Metadata

GFS separates the control plane (metadata) from the data plane (file content). A single master node holds all metadata in memory: the namespace, the mapping of files to chunks, and chunk replica locations. Clients contact the master only to find out where data lives. All actual data transfer happens directly between clients and chunkservers, bypassing the master entirely.

This design keeps the master out of the data path, allowing it to handle many concurrent clients without becoming a throughput bottleneck.

Chunks and Replication

Files are divided into fixed-size chunks of 64 MB, each identified by a unique 64-bit handle. Each chunk is stored as an ordinary file on a chunkserver’s local disk and replicated on three chunkservers by default. Replication is what makes the system fault-tolerant: if one chunkserver fails, two others still hold the data.
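The fixed chunk size makes locating data pure arithmetic, which is part of why the master can stay out of the data path. A small worked example:

```python
CHUNK_SIZE = 64 * 1024 * 1024               # 64 MB

def chunk_for(offset):
    """Map a byte offset in a file to (chunk index, offset within chunk)."""
    return offset // CHUNK_SIZE, offset % CHUNK_SIZE

# Example: byte 200,000,000 falls in chunk 2, at offset 65,782,272
print(chunk_for(200_000_000))
```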

Master State and Fault Tolerance

The master stores metadata in memory for fast access. Namespace and file-to-chunk mappings are persisted to an operation log that is replicated on remote machines. Chunk locations are not persisted; the master rebuilds them by polling chunkservers at startup.

The operation log is the authoritative record of the file system. The master periodically checkpoints its state so that log replay after a crash is fast.

The Two-Phase Write

Writes in GFS involve two distinct phases, and understanding why they are separated matters.

Phase 1 (data transfer): The client pushes data to all replicas in a pipelined chain, from one chunkserver to the next. No writes occur yet; the data is simply buffered at each replica.

Phase 2 (write request): Once all replicas confirm receipt, the client sends a write request to the primary replica, the one holding the current lease for that chunk. The primary assigns a serial number to the mutation and applies it locally. It then forwards the write request to all secondaries, which apply the mutation in the same order. The primary acknowledges success to the client after all secondaries respond.

Separating data flow from control flow improves network efficiency. The pipelining in phase 1 ensures each network link carries the data exactly once. The primary’s lease serializes concurrent mutations in phase 2, ensuring all replicas apply changes in the same order.

Additional Operations

GFS adds two operations beyond the standard file interface. Record append lets GFS choose the offset and guarantees that the data appears atomically at least once, even with concurrent appenders; this supports producer-consumer workflows without explicit locking. Snapshot creates a copy of a file or directory tree cheaply using copy-on-write: chunks are shared with the original until one is modified.

Fault Detection

Chunkservers send heartbeats to the master. A chunkserver that stops sending heartbeats is marked as failed; its chunks are re-replicated from surviving copies. Each chunk stores checksums over its data; a chunkserver detects corruption by verifying checksums on read or during background scans.

When granting a lease, the master also increments the chunk version number and notifies all replicas. Any replica that missed the update because it was offline will have a stale version number; the master will not direct clients to it and will schedule re-replication from an up-to-date copy.

Client Caching

GFS clients do not cache file data. They do cache chunk location metadata returned by the master, with a timeout, so they can read from chunkservers repeatedly without re-querying the master for every request.

The Namespace

GFS does not use per-directory data structures. The namespace is a single flat lookup table mapping full pathnames to metadata. There are no hard links or symbolic links.

HDFS: What Carried Over and What Changed

HDFS (Hadoop Distributed File System) is essentially an open-source re-implementation of GFS in Java. The one-to-one mapping is: the GFS master is the HDFS NameNode, GFS chunkservers are HDFS DataNodes, and GFS chunks are HDFS blocks.

The architecture, replication strategy, and fault-detection mechanisms remain the same.

Some key differences are: HDFS uses a larger default block size (128 MB in current versions), and HDFS files are write-once – a file has a single writer and cannot be randomly overwritten (append support came later), whereas GFS supports concurrent record appends.

Beyond GFS

GFS served Google well for nearly a decade. As files became smaller and more numerous, the single master became a metadata bottleneck. Google replaced GFS with Colossus, which distributes the metadata function. HDFS followed the same direction with Federation. Both converged on the same lesson: at sufficient scale, metadata is itself a distributed systems problem.

Dropbox

Dropbox applies the same core principle as GFS (separate data from metadata) to a consumer file synchronization service. Its design decisions are worth understanding because it faced a different workload and scaling problem than internal batch-processing systems.

The original design goal of Dropbox was to keep a designated folder synchronized across all of a user’s devices. Any change on one device should propagate to the server and then to all other connected devices, transparently and in the background.

Unusual read/write ratio: Most content sites (social media, news, streaming) are heavily read-dominated: data is written once and read many times. Dropbox is close to a 1:1 ratio, because stored data is rarely read except to push a change to another device. This means Dropbox must handle an unusually high volume of uploads relative to its user base.

Chunking and deduplication: Dropbox splits files into fixed-size blocks, identifies each block by its SHA-256 hash, and uploads only the blocks the server does not already have. If two users store identical content, only one copy exists on the server. This dramatically reduces storage and upload volume.
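A sketch of the client-side logic follows. The 4 MB block size and the `server_has` set (standing in for a query against the server's block index) are assumptions for illustration; the SHA-256 hashing matches the description above.

```python
import hashlib

BLOCK_SIZE = 4 * 1024 * 1024                # block size is an assumption here

def blocks_to_upload(path, server_has):
    """Split a file into fixed-size blocks, hash each with SHA-256, and
    return only the blocks the server does not already have."""
    missing = []
    with open(path, "rb") as f:
        while block := f.read(BLOCK_SIZE):
            digest = hashlib.sha256(block).hexdigest()
            if digest not in server_has:
                missing.append((digest, block))
    return missing
```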

The metadata server: Like GFS, Dropbox separates block data from metadata. Block data (actual file content) lives in a scalable block store. The metadata server stores file names, directory structure, the list of blocks that make up each file, and version history. When a client downloads a changed file, it asks the metadata server for the current block list and then fetches only the blocks it is missing from the block store. The metadata server is never in the data path for actual file content.

Polling vs. notifications: Early Dropbox clients polled the server every few seconds, asking “anything new?” With tens of thousands of clients polling simultaneously, most server responses were just “no.” The solution was a notification server. Clients hold a persistent connection to the notification server. When a change occurs, the server pushes a message to the affected clients, telling them to sync. Clients then contact the metadata server to find out what changed. This shifts the server from constantly answering empty polls to only sending messages when there is actually something to say.

Distributed Hash Tables

A Distributed Hash Table (DHT) is a decentralized system that provides a key-value lookup interface: given a key, find the value. There is no central server. Responsibility for keys is divided among participating nodes, and any node can route a query to the correct node in a bounded number of hops.

The motivation was peer-to-peer networks. Centralized index servers (like Napster) are vulnerable to shutdown. A DHT provides the same lookup functionality without any central authority.

Consistent Hashing

All DHTs rely on consistent hashing. Keys are hashed into a large identifier space, and each node is responsible for a contiguous range of values within that space. Adding a new node takes over a portion of an existing node’s range; removing a node transfers its range to neighboring nodes. In either case, only the keys in the affected range move. This minimizes key movement under membership changes. Different DHTs implement the range assignment differently: CAN uses geometric zones in a multi-dimensional space; Chord uses a one-dimensional ring with successor pointers.
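A minimal sketch of the ring-based variant: hash the nodes onto a circle, and assign each key to the first node clockwise from the key's hash. Class and function names are illustrative.

```python
import bisect
import hashlib

def _hash(s):
    return int(hashlib.sha1(s.encode()).hexdigest(), 16)

class HashRing:
    """A key belongs to its successor node, wrapping past the top."""

    def __init__(self, nodes):
        self.points = sorted((_hash(n), n) for n in nodes)
        self.keys = [p for p, _ in self.points]

    def owner(self, key):
        i = bisect.bisect_left(self.keys, _hash(key))
        return self.points[i % len(self.points)][1]

ring = HashRing(["node-a", "node-b", "node-c"])
print(ring.owner("shopping-cart:42"))       # same key always maps to same node
```

Adding a node to `points` shifts ownership of only the keys between the new node and its predecessor, which is exactly the minimal-movement property described above.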

CAN

CAN (Content Addressable Network) maps keys to points in a d-dimensional Cartesian coordinate space. Each node owns a rectangular zone. A lookup for a key hashes to a point in the space and routes to that point: each hop moves closer. The expected lookup time is O(d · n^(1/d)) hops with O(d) neighbors per node (roughly 2d neighbors for a d-dimensional space). Node joins split an existing zone; departures merge zones with a neighbor.

Chord

Chord organizes nodes and keys on a one-dimensional ring using SHA-1 hashes. A key belongs to its successor: the node with the smallest ID greater than or equal to the key’s hash. Pure successor-pointer routing takes O(n) hops. The finger table at each node stores pointers to nodes at exponentially increasing distances around the ring, enabling O(log n) lookup. Node joins and departures are managed by a stabilization protocol that keeps successor pointers and finger tables up to date.

Amazon Dynamo

Dynamo is Amazon’s internal key-value store, designed to power shopping cart and similar services with strict availability and latency requirements. It builds on consistent hashing but adds mechanisms that make it a complete production storage system rather than a lookup primitive.

Virtual nodes: Each physical server owns many positions (vnodes) on the ring. This balances the load more evenly and spreads the impact of failures and joins across many existing nodes rather than concentrating it on one.

Replication: Each key is replicated on N consecutive nodes on the ring (N is typically 3). This preference list is known to all nodes.

Quorum reads and writes: A write completes after W replicas acknowledge it; a read completes after R replicas respond. With R + W > N, every read is guaranteed to overlap with the most recent write. With N=3, R=2, W=2, the system tolerates one replica failure without blocking either reads or writes.

Conflict resolution: Dynamo uses eventual consistency. Concurrent writes may produce conflicting versions. Dynamo attaches vector clocks to values so that the system (and application) can detect divergence. Applications are responsible for merging conflicts; for a shopping cart, this means taking the union of both versions.

Hinted handoff: When a preferred replica is unavailable, a write goes to a different node with a “hint” indicating its intended destination. The hinted node delivers the write to the intended replica once it recovers.

Gossip protocol: Membership and failure information propagate through the cluster via gossip: periodic exchanges with randomly selected peers rather than through a central coordinator.

Dynamo vs. Classic DHTs

Property | Chord | Dynamo
---|---|---
Purpose | Decentralized routing | Production key-value store
Routing | O(log n) hops through intermediaries | One hop to target (full ring state known)
Consistency | Not a data store; n/a | Eventual consistency
Replication | Not included | N replicas, quorum (R, W)
Deployment | Designed for open internet/churn | Stable data center environment
Conflict handling | n/a | Vector clocks + application merge

Domain Name System (DNS)

DNS maps human-readable domain names to IP addresses. It is a hierarchical, distributed, cacheable database that handles hundreds of billions of queries per day without a central server.

Hierarchy and Delegation

The DNS namespace is a tree. Responsibility is delegated at each level: the root delegates top-level domains (TLDs, like .edu), TLD operators delegate second-level domains (like rutgers.edu), organizations delegate subdomains (like cs.rutgers.edu). Each delegated zone is managed independently by its owner. No single server knows all mappings.

Resolution

When an application looks up a name, it calls a local library function that hands the query to a stub resolver: a minimal client built into the operating system that checks the local hosts file, checks its cache, and if neither has the answer, forwards the query to a recursive resolver.

The recursive resolver (typically provided by an ISP or a public service like 8.8.8.8) does the actual work. It performs iterative resolution by walking down the hierarchy: it queries a root server, which refers it to the TLD servers (e.g., for .edu); those refer it to the domain's authoritative servers (e.g., for rutgers.edu), and so on until a server returns the answer.

Each level in this chain is authoritative for its own zone and only its own zone. No server needs global knowledge; each one knows only who to refer to next.

Caching and TTL

Every DNS response carries a Time To Live (TTL). Resolvers cache responses for the TTL duration, answering subsequent queries without hitting authoritative servers. Caching is aggressive and layered: the recursive resolver, the OS, and the browser all cache. TTL is a tunable trade-off: longer TTL means lower load on authoritative servers but slower propagation of updates; shorter TTL means faster updates but higher query load.

Limitations

DNS is not strongly consistent: clients may hold stale cached answers until the TTL expires. It is read-mostly: updates propagate through TTL expiry, not through a distributed write protocol. It is not searchable: you can look up a name but not query across names. And it was not designed with security: DNS spoofing and cache poisoning are real attacks; DNSSEC adds cryptographic signatures to mitigate them but deployment is incomplete.

DNS as a Design Pattern

DNS illustrates what makes a distributed system scale globally: hierarchical organization, delegation, aggressive caching with bounded staleness, and a workload that is overwhelmingly read-heavy. These properties keep the query load from concentrating on any single server while allowing each zone owner to manage its own data independently.


What You Don’t Need to Study


Week 8: Distributed Transactions

Transactions and ACID

A transaction is a sequence of operations treated as a single logical unit of work. A transaction either commits (all changes made permanent) or aborts (all changes rolled back). Transactions are specifically designed to be abortable: aborting leaves no partial state behind. The write-ahead log makes this work: changes are written to a sequential log before being applied to data, allowing recovery to redo committed transactions and undo incomplete ones after a crash.

The ACID properties define correctness for transactions: Atomicity (all-or-nothing), Consistency (valid state to valid state), Isolation (concurrent transactions do not observe each other’s partial results), and Durability (committed changes survive crashes). These properties are straightforward to provide on a single machine. In a distributed setting, each property requires expensive coordination: atomicity needs 2PC, isolation needs distributed locking, and the availability cost multiplies with every participant.

Note that the “consistency” in ACID refers to application-level data integrity constraints. The “consistency” in distributed systems and the CAP theorem refers to what values a read is allowed to return across replicas. The two uses of the word are unrelated.

Concurrency Control

Concurrency control enforces isolation. The standard goal is serializability: concurrent transactions must produce results equivalent to some serial execution. A schedule is a sequence of interleaved operations from concurrent transactions; a serializable schedule is one equivalent to some serial execution.

There are two main approaches: pessimistic concurrency control assumes conflicts are likely and prevents them using locks; optimistic concurrency control assumes conflicts are rare and checks for them only at commit time.

Two-phase locking (2PL) is the standard pessimistic protocol. A transaction has a growing phase (acquires locks, releases none) and a shrinking phase (releases locks, acquires none). This rule prevents the interleavings that produce inconsistent reads. Strict 2PL holds write locks until commit or abort, preventing cascading aborts. SS2PL holds all locks until commit or abort and is implemented by many commercial databases.

Read (shared) locks allow multiple concurrent readers. Write (exclusive) locks grant exclusive access. Multiple read locks can coexist; a write lock conflicts with all other locks.

Optimistic concurrency control (OCC) proceeds in three phases: working (reads and writes go to a private workspace, no locks held), validation (check for conflicts at commit time), and update (apply the workspace if validation passes). OCC is deadlock-free and efficient when conflicts are rare, but transactions may be aborted and restarted after completing all their work.

Multi-Version Concurrency Control (MVCC) maintains multiple versions of each data item. A common design gives each transaction a snapshot at start time and returns the newest committed version visible in that snapshot. This is called snapshot isolation. Different systems implement the exact visibility rules differently, but reads never block because they always draw from a stable snapshot. Write-write conflicts are resolved at commit time by a first-committer-wins rule.
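The read-side visibility rule reduces to a scan over versions. A sketch, assuming each item keeps its versions as (commit timestamp, value) pairs in commit order:

```python
def snapshot_read(versions, snapshot_ts):
    """Return the newest version committed at or before the reader's
    snapshot timestamp; never blocks, never sees uncommitted data."""
    visible = None
    for commit_ts, value in versions:
        if commit_ts <= snapshot_ts:
            visible = value                # keep the latest qualifying version
    return visible
```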

Deadlock

Deadlock arises from locking: a set of transactions each hold locks needed by another in the set, forming a cycle nobody can break. Four conditions must hold simultaneously: mutual exclusion, hold and wait, non-preemption, and circular wait. OCC and MVCC do not hold blocking locks and are therefore deadlock-free.

The wait-for graph (WFG) represents lock dependencies: an edge from T1 to T2 means T1 is waiting for a lock held by T2. A cycle indicates deadlock. In a distributed system, each node sees only its local WFG edges; a deadlock can span multiple machines with no single node seeing the full cycle.

Three practical approaches exist. Ignoring deadlocks relies on application-level timeouts; this is acceptable in some systems but not in transactional databases, where a slow-but-live node triggers the same timeout as a genuinely deadlocked one. Detection finds cycles; prevention makes cycles structurally impossible.

Centralized detection has one node collect all local WFGs and search for cycles. It is simple but produces phantom deadlocks: false positives caused by asynchronous snapshot collection, where an edge appears in the global graph after the underlying lock has already been released. The Chandy-Misra-Haas algorithm avoids the global snapshot by chasing edges with probe messages. A blocked transaction T0 sends a probe to the node holding the resource. The probe propagates along dependency edges; if it returns to T0, a cycle exists. When a deadlock is confirmed, the system aborts at least one transaction, typically the youngest or the one that has done the least work.

Timestamp-based prevention assigns each transaction a unique timestamp at start time and uses it to decide who waits and who aborts on a conflict, ensuring that WFG edges always point in the same direction so cycles are impossible. The two standard schemes are wait-die and wound-wait.
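Both schemes reduce to a one-line decision at each lock conflict. A sketch (smaller timestamp means older, hence higher priority):

```python
def on_lock_conflict(requester_ts, holder_ts, scheme):
    """Wait-for edges can only point one way, so no cycle can form."""
    if scheme == "wait-die":
        # an older requester may wait; a younger one aborts ("dies") and retries
        return "WAIT" if requester_ts < holder_ts else "ABORT_REQUESTER"
    elif scheme == "wound-wait":
        # an older requester preempts ("wounds") the holder; a younger one waits
        return "ABORT_HOLDER" if requester_ts < holder_ts else "WAIT"
    raise ValueError(scheme)
```

An aborted transaction restarts with its original timestamp, so it ages and eventually wins.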

Two-Phase Commit (2PC)

Two-Phase Commit ensures all nodes in a distributed transaction either all commit or all abort.

The protocol uses a coordinator-participant model. In Phase 1 (Prepare/Voting), the coordinator sends a PREPARE message to all participants. Each participant checks whether it can commit, writes a prepare record to stable storage, and responds YES or NO. A YES vote is a durable promise: the participant must be able to commit even after a crash. If a participant fails to respond, the coordinator waits and keeps retrying; it does not treat silence as a NO. The protocol assumes a fail-recover model.

In Phase 2 (Commit or Abort), if all participants voted yes, the coordinator writes a commit record and broadcasts COMMIT. If any participant voted no, the coordinator broadcasts ABORT. Participants execute the decision and release their locks.

2PC requires unanimous agreement, not a majority. Any single participant can veto, and any unresponsive participant blocks the protocol indefinitely. This is inherent: the transaction must complete everywhere or nowhere, so no majority shortcut is available.
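A coordinator sketch of the two phases follows. The participants' `prepare`/`commit`/`abort` methods stand in for RPCs, and `log` stands in for writing a durable decision record; note that a real coordinator retries silent participants rather than treating silence as a NO vote, as described above.

```python
def two_phase_commit(participants, log):
    """Coordinator sketch: unanimous YES commits; any NO aborts."""
    votes = [p.prepare() for p in participants]     # Phase 1: collect votes
    if all(v == "YES" for v in votes):
        log("COMMIT")                               # decision is now durable
        for p in participants:
            p.commit()                              # Phase 2: everyone commits
        return "COMMITTED"
    log("ABORT")
    for p in participants:
        p.abort()
    return "ABORTED"
```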

The critical vulnerability is the uncertain state: a participant that has voted yes but has not received the coordinator’s decision cannot unilaterally commit or abort. If the coordinator fails in this window, all participants block with locks held. 2PC is a blocking protocol. The practical remedy is to replace the single coordinator with a Raft- or Paxos-replicated group.

Availability cost: 2PC chains availability multiplicatively. At 99.9% per database, a five-database transaction has only 0.999^5 ≈ 99.5% availability, close to two days of downtime per year. This cost is a major driver of BASE-style architectures at internet scale.

Three-Phase Commit (3PC)

Three-Phase Commit was designed to eliminate 2PC’s blocking behavior. It inserts a PreCommit phase between voting and the final commit so a recovery coordinator can determine the intended decision: if any participant has seen PreCommit, commit; if none has, abort.

3PC eliminates blocking under single-node failures but assumes a synchronous network with bounded message delay. A partition during the PreCommit phase can cause split-brain behavior. Because of this, 3PC is not used in practice.

How 2PC Relates to Raft, Paxos, and Virtual Synchrony

Virtual Synchrony provides atomic multicast within a process group and is fast, but cannot survive network partitions. Raft and Paxos are fault-tolerant consensus algorithms that use majority agreement and are useful for making the 2PC coordinator fault-tolerant, but neither has a concept of a participant vetoing a decision and neither can substitute for 2PC itself. 2PC uses unanimous agreement and is designed specifically for transactional atomicity. In practice, Raft or Paxos replicates state within each participant group, and 2PC coordinates across groups.

ACID

ACID formalizes the correctness requirements that concurrency control and 2PC are designed to satisfy.

Atomicity is all-or-nothing; 2PC achieves this across multiple nodes. Consistency ensures valid state transitions; the database enforces integrity constraints by aborting violations. Isolation prevents concurrent interference; locking, OCC, and MVCC enforce this. Durability ensures committed changes survive crashes; the write-ahead log provides this.

Consistency Models

Consistency models define what values a read is allowed to return given a history of writes across replicas. Stronger models give more intuitive guarantees at higher coordination cost.

Linearizability is the strongest practical model. Every operation appears to take effect instantaneously at some point between its invocation and completion, in an order consistent with real time. Two key ideas follow from this:

  1. Each operation appears atomic (all at once, not partially), and

  2. If one operation finishes before another begins, the first must appear earlier everywhere.

Linearizability does not require wall-clock timestamps for overlapping operations: if two operations overlap in time, either order is valid; only non-overlapping operations must respect real-time ordering. etcd provides linearizability for all operations; ZooKeeper provides linearizable writes but sequentially consistent reads by default. Linearizability is the definition of “C” in CAP.

Sequential consistency relaxes the real-time requirement of linearizability. There must exist some global total order of all operations consistent with each process’s program order, but that order need not match wall-clock time.

Under linearizability, if a write completes before a read begins, the read must see that write. Under sequential consistency, the system may order them differently as long as each client’s own program order is preserved.

Causal consistency only requires causally related operations to appear in the same order for all processes. Causally independent operations may be seen in different orders. Vector clocks implement this model.
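To make the causal ordering concrete, here is a minimal vector-clock sketch; the dictionary representation and function names are illustrative, not from the lecture notes.

    # Clocks are dicts mapping process id -> event count; missing entries are 0.

    def merge(a, b):
        # Elementwise maximum, applied when a process receives a message.
        return {p: max(a.get(p, 0), b.get(p, 0)) for p in a.keys() | b.keys()}

    def happened_before(a, b):
        # a causally precedes b iff a <= b elementwise and a != b.
        keys = a.keys() | b.keys()
        return a != b and all(a.get(p, 0) <= b.get(p, 0) for p in keys)

    w1 = {"p1": 1}                  # p1 writes
    w2 = merge(w1, {"p2": 1})       # p2 saw w1 before writing: causally later
    print(happened_before(w1, w2))  # True: every replica must order w1 first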

Eventual consistency is the weakest useful model. All replicas will eventually converge if no new updates arrive, but there is no constraint on what reads return in the interim.

Serializability and Linearizability

Serializability is a property of transactions (multi-step, multi-object operations). It requires that concurrent transactions produce results equivalent to some serial execution, with no real-time constraint on which order is chosen.

Linearizability is a property of individual operations on a single object. It requires each operation to appear instantaneous in an order consistent with real time.

The two properties are independent: a database can be serializable without being linearizable, and a key-value store can be linearizable without supporting transactions.

The CAP Theorem

The CAP theorem states that when a network partition occurs, a distributed system cannot simultaneously guarantee both consistency (specifically linearizability) and availability. Partition tolerance is not a design choice; real networks partition. The practical choice is between C and A when a partition actually happens.

CAP is commonly summarized as “you can have at most two of C, A, and P.” That framing is imprecise: partition tolerance is a constraint imposed by the network, not a property you trade away. The C-versus-A trade-off only arises during a partition; when the network is healthy, a well-designed system can provide both.

PACELC

PACELC extends CAP by observing that even during normal operation, there is a trade-off between latency and consistency. Strong consistency requires coordinating writes across a quorum before responding, which adds latency. Eventual consistency allows a local replica response, which is faster. PACELC classifies systems as PA/EL, PA/EC, or PC/EC. Cassandra and Dynamo are PA/EL; Spanner and HBase are PC/EC.

BASE

BASE (Basically Available, Soft State, Eventually Consistent) is the design philosophy adopted by large-scale internet systems as a response to the constraints revealed by CAP and PACELC. If strong consistency imposes availability costs during partitions and latency costs during normal operation, you design systems that accept weaker consistency in exchange for higher availability and lower latency. BASE is not a protocol; it shifts the burden of handling inconsistency from the system to the application.

ACID vs. BASE

The choice is driven by application requirements. Financial transfers and medical records need ACID; social feeds and product catalogs can tolerate BASE semantics. Many modern systems are hybrid, using ACID for core transactional data and BASE for derived or display data.

Key Relationships

Concurrency control
Pessimistic (2PL/SS2PL) vs. optimistic (OCC) vs. versioned (MVCC)
Commit protocol
2PC (unanimous, blocking, practical) vs. 3PC (non-blocking but impractical)
Consistency model strength
Linearizability → Sequential → Causal → Eventual
CAP choice during partition
CP (block rather than serve stale) vs. AP (serve stale rather than block)
Design philosophy
ACID (correctness over availability) vs. BASE (availability over consistency)

You Don’t Need to Study

The following topics are covered in the lecture notes for completeness but will not appear on exams or homework.


Week 9: Distributed Databases

NoSQL and Data Model Categories

NoSQL systems are databases designed to scale horizontally across many machines. They often relax some of the assumptions of traditional relational databases, such as fixed schemas or full transactional support across all data, in order to improve scalability, availability, or flexibility. The term does not describe one single design. It covers several different data models.

The main categories are the following:

  1. Key-value stores store data as a mapping from a key to a value. The system treats the value as opaque. This makes the model simple and easy to distribute, but limits queries beyond direct key lookup. Amazon Dynamo and distributed hash tables (DHTs) are the main examples to remember here.

  2. Column-family stores, also called wide-column stores, organize data into rows and columns, but allow different rows to have different columns. They are designed for large-scale storage and high write throughput. Bigtable, HBase, and Cassandra are the main examples.

  3. Document stores store structured documents, usually in a JSON-like form. They keep more structure than key-value stores and often support richer queries over document fields.

  4. Graph databases store data as nodes and edges and are optimized for traversing relationships among entities.

In this material, the main focus is on column-family stores and on Spanner, which brings strong transactional guarantees back into a distributed setting.

Bigtable

Bigtable is a distributed column-family store that organizes data as a sparse, sorted map indexed by row key, column, and time. It was designed for large-scale structured data and for applications that need high throughput across many machines.

Data Model

Bigtable’s data model is easiest to understand by breaking it into its three indexing dimensions.

  1. Row keys are arbitrary byte strings stored in sorted lexicographic order. That sorted order is important because it makes range scans efficient. Rows are also the unit of atomicity, so a read or write to a single row is atomic.

  2. Column families group related columns and must be declared when the table is created. Within a family, column qualifiers can be created dynamically by any client. Column names therefore have the form family:qualifier.

  3. Timestamps allow multiple versions of a value to be stored in a single cell. Version retention is configurable, so the system can keep several recent versions or discard older ones automatically.

Bigtable is also sparse. Empty cells are never stored. A row can have no columns at all in a given family, and only the cells that actually contain data occupy storage.

Partitioning and Structure

Bigtable scales by partitioning the table by row-key range.

A tablet is a contiguous range of rows. Each tablet is served by one tablet server, and tables are split into multiple tablets as they grow. Since rows are stored in sorted row-key order, each tablet is a slice of that sorted key space.

A master server tracks which tablet server owns which tablet and handles reassignment when failures occur. The important point is that data does not flow through the master. Clients use metadata to locate the right tablet server and then talk to that server directly.

Storage and Writes

Bigtable stores recent writes in memory and older data on disk.

The write path works as follows:

  1. A write is recorded in a log for recovery.

  2. The write is placed in a memtable, which is an in-memory sorted buffer.

  3. When the memtable fills, it is flushed to disk as an SSTable.

An SSTable is an immutable, sorted file of key-value pairs stored in GFS. Since SSTables are immutable, the system periodically compacts them to merge files and discard obsolete data. Reads may therefore need to combine information from the memtable and multiple SSTables.
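A toy version of this write path, with hypothetical names and an assumed flush threshold; real SSTables are indexed files in GFS, not Python lists.

    class MiniLSM:
        def __init__(self, flush_threshold=4):
            self.log = []            # stands in for the commit log
            self.memtable = {}       # in-memory buffer (sorted on flush)
            self.sstables = []       # immutable, sorted key-value runs
            self.flush_threshold = flush_threshold

        def write(self, key, value):
            self.log.append((key, value))    # 1. record for recovery
            self.memtable[key] = value       # 2. buffer in memory
            if len(self.memtable) >= self.flush_threshold:
                self.sstables.append(sorted(self.memtable.items()))  # 3. flush
                self.memtable = {}

        def read(self, key):
            # Reads combine the memtable with SSTables, newest first.
            if key in self.memtable:
                return self.memtable[key]
            for table in reversed(self.sstables):
                for k, v in table:
                    if k == key:
                        return v
            return None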

Key Concepts in Bigtable

The main ideas to remember are:

Bigtable scales very well, but its atomicity guarantee stops at the single row: it does not provide general transactional support across rows, and it offers no relational operations such as joins across tables.

Cassandra

Cassandra is a distributed column-family store designed for high availability, decentralized control, and horizontal scalability. It combines ideas from Bigtable, especially the wide-column model and SSTable-style storage, with ideas from Dynamo, especially decentralized partitioning and replication.

Architecture and the Hash Ring

Cassandra uses a peer-to-peer architecture. There is no master node, and all nodes are equal. Any node can accept a client request and coordinate the work needed to satisfy it.

Cassandra distributes data using a distributed hash table organized as a hash ring. Each node owns one or more tokens, which correspond to positions on that ring. A node is responsible for the range of hash values between its token and the previous token on the ring.

When Cassandra stores a row, it hashes the row’s partition key. The row is then routed to the node whose token is the first one that follows that hash value on the ring.

This design has an important scaling property. When a node is added or removed, only the data in the affected adjacent ring arc has to move. The rest of the data stays where it is. That is one of the main reasons DHT-style partitioning is attractive.
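A minimal consistent-hashing sketch, under the simplifying assumption of one token per node (Cassandra typically assigns many virtual tokens per node):

    import bisect, hashlib

    def h(key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    tokens = sorted((h(f"node{i}"), f"node{i}") for i in range(4))
    points = [t for t, _ in tokens]

    def owner(partition_key):
        # Route to the first token that follows the key's hash on the ring.
        i = bisect.bisect_right(points, h(partition_key)) % len(tokens)
        return tokens[i][1]

    print(owner("restaurant:1234"))   # e.g. 'node2'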

Data Model

Cassandra separates distribution across the cluster from ordering within one partition.

The key ideas are these:

  1. The partition key determines where the data lives. Rows with the same partition key are placed in the same partition and stored on the same set of replica nodes.

  2. The clustering columns determine how rows are ordered within that partition.

  3. Columns store the actual data values within each row.

That separation is central to understanding Cassandra.

An example is a database of McDonald’s restaurants. If the partition key is restaurant_id, each restaurant hashes independently, so restaurants are scattered across the cluster. That is good for load balancing, but not very useful if queries often want all restaurants in one country.

If the partition key is country_code, then all restaurants in the same country live in the same partition and therefore on the same set of machines. If the clustering columns are state and county, then the rows within that country partition are sorted first by state and then by county. This makes it efficient to iterate through all restaurants in one country, one state, or one county.

So the division of roles is:

Replication and Consistency

Cassandra replicates data across multiple nodes for fault tolerance. Each partition is stored on several nodes according to a chosen replication factor. This allows the system to continue operating even when some nodes fail.

Cassandra provides tunable consistency. The application can choose how many replicas must respond to a read or write. Waiting for more replicas gives stronger consistency, while waiting for fewer improves availability and often reduces latency.

The basic trade-off is:

Storage Model

Cassandra’s storage engine resembles Bigtable’s.

Writes are first recorded durably and placed in memory. Later, they are flushed to disk as immutable files. Reads may have to combine recent in-memory data with older on-disk files. Compaction merges those files over time.

The important connection is that Cassandra resembles Dynamo in how it distributes and replicates data across the cluster, but resembles Bigtable in how each node stores and retrieves data locally.

What to Remember About Cassandra

The main ideas to remember are:

Spanner (NewSQL)

Spanner is a globally distributed relational database that combines horizontal scalability with strong transactional guarantees. It keeps the key-range partitioning ideas of systems like Bigtable, but adds distributed transactions, multiversion data, and globally meaningful timestamps.

A compact mental model is this: Spanner combines Bigtable-style partitioning, distributed transaction mechanisms, and carefully managed time.

What Spanner Combines

Spanner is best understood as a combination of several mechanisms, each solving a different problem.

The main components are:

  1. Key-range partitioning into splits

  2. Paxos replication within each split

  3. Two-phase commit (2PC) for transactions that span multiple splits

  4. Strict two-phase locking (2PL) for read-write transactions

  5. Wound-wait for deadlock prevention

  6. Multiversion concurrency control (MVCC) for versioned data

  7. TrueTime for bounded clock uncertainty and globally meaningful timestamps

Partitioning gives scale. Paxos gives fault tolerance and a consistent order of updates within each split. Two-phase commit coordinates work across splits. Strict 2PL and wound-wait control concurrency. MVCC supports snapshot reads. TrueTime and commit wait make timestamp order line up with real time.

Physical and Logical Structure

Spanner has both a physical and a logical structure.

At the physical level, data is stored on spanservers organized into zones. A zone is a large administrative and failure domain, roughly at the datacenter level. Replicating data across zones allows the system to survive larger failures than a single machine crash.

At the logical level, a Spanner deployment is called a universe. A universe contains databases, databases contain tables, and tables are divided into splits.

A split is a contiguous range of keys. This is directly analogous to a Bigtable tablet. Rows are stored in sorted key order, and a split is one slice of that sorted key space.

Replication and Paxos Groups

Each split is replicated across multiple servers. The set of servers maintaining replicas of one split is called a Paxos group. These replicas use Paxos to agree on the order of updates.

The core structural idea is worth stating clearly:

As long as a majority of replicas in a Paxos group are available, that split can continue to make progress.

Transactions Across Splits

If a transaction touches data in only one split, that split’s Paxos group can handle the transaction locally.

If a transaction touches data in more than one split, Spanner uses two-phase commit. This is the same protocol discussed earlier: a coordinator drives a prepare phase and then a commit phase across all participating groups.

This leads to an important distinction:

Read-Write Transactions, Locks, and Wound-Wait

For read-write transactions, Spanner uses strict two-phase locking.

A read acquires a shared lock, and a write acquires an exclusive lock. Locks are held until commit. Holding locks until commit prevents other transactions from seeing partial results and gives serializable behavior.

Shared and exclusive locks affect concurrency in different ways. Multiple readers can hold shared locks at the same time, but an exclusive lock prevents other reads and writes on that data item.

Locking creates the possibility of deadlock, so Spanner uses wound-wait.

In wound-wait:

  1. If an older transaction needs a lock held by a younger transaction, the younger transaction is aborted and retried.

  2. If a younger transaction wants a lock held by an older transaction, it waits.

That policy prevents deadlock while giving older transactions priority.

Snapshot Reads and MVCC

Spanner stores multiple committed versions of data, each tagged with a timestamp. That is the role of MVCC.

A snapshot read returns the state of the database at a specific time \(t\). More precisely, for each item, the system returns the most recent committed version whose timestamp is less than or equal to \(t\).

That gives a consistent cut through time.
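A minimal sketch of that rule: each item keeps committed (timestamp, value) versions, and a snapshot read at time t returns the newest version not after t. The data here is illustrative.

    versions = {
        "balance": [(5, 100), (8, 70), (12, 150)],  # committed, sorted by ts
    }

    def snapshot_read(item, t):
        best = None
        for ts, value in versions[item]:
            if ts <= t:
                best = value      # keep the most recent version not after t
        return best

    print(snapshot_read("balance", 10))   # 70: the state as of time 10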

Snapshot reads are especially useful for large searches, scans, and reports. Because they read committed older versions, they usually do not need to lock the current data. Writers can continue updating the newest versions while the read-only operation sees a stable snapshot.

This is one of the main practical advantages of MVCC. Large read-only workloads can proceed without blocking current writes.

Snapshot reads also motivate one of Spanner’s most important requirements. Since a snapshot may span many splits and datacenters, commit timestamps must be meaningful across the whole system. Otherwise, “the state of the database at time \(t\)” would not define one coherent global state.

External Consistency

Spanner provides external consistency, also called strict serializability.

The guarantee is this: if transaction \(T_1\) commits before transaction \(T_2\) begins in real time, then \(T_1\)’s commit timestamp must be less than \(T_2\)’s commit timestamp.

This is the transaction-level version of linearizability. Linearizability is usually stated for single operations on shared objects: if one operation finishes before another begins, the first must appear before the second. External consistency applies that same real-time ordering idea to transactions.

This guarantee is stronger than ordinary serializability. Serializability only requires that the result be equivalent to some serial order. That order does not have to match wall-clock time. External consistency requires both a serial order and agreement with real time.

An example is a bank transfer. Suppose one transaction transfers money from checking to savings and returns “committed” to the client. If another transaction starts afterward and reads the balances, external consistency requires that the second transaction see the transfer. A merely serializable system could still choose some serial order, but that order might not match real time. External consistency rules that out.

TrueTime

Spanner cannot enforce external consistency using naive local clock readings, because clocks in distributed systems are never perfectly synchronized.

Spanner’s solution is TrueTime, which represents time as an interval rather than a single exact value:

\[TT.now() = [earliest, latest]\]

The real current time is guaranteed to lie somewhere inside that interval.

The two bounds provided by the TrueTime API are earliest, the earliest time that the true current time could be, and latest, the latest time it could be.

The width of the interval reflects clock uncertainty. Google keeps that uncertainty small by synchronizing clocks using GPS receivers at each data center, with atomic clocks as backup references.

The key idea is that Spanner does not need perfect synchronization. It only needs the uncertainty to be bounded.

Commit Wait

TrueTime by itself is not enough. Spanner also uses commit wait.

When a read-write transaction is ready to commit, Spanner follows this pattern:

  1. It chooses a commit timestamp \(t = TT.now().latest\).

  2. It waits until \(TT.now().earliest > t\).

  3. It then makes the transaction visible.

That waiting step ensures that the chosen timestamp is definitely in the past relative to real time before the commit becomes visible.

The logic is straightforward. If transaction \(T_1\) becomes visible only after time \(t\) is definitely in the past, then any later transaction \(T_2\) that begins afterward must receive a later timestamp. That is how Spanner enforces external consistency.
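A runnable sketch of the pattern, with a hypothetical tt_now() standing in for the TrueTime API and an assumed uncertainty bound:

    import time

    EPSILON = 0.004   # assumed clock uncertainty (~4 ms in Google's setting)

    def tt_now():
        t = time.time()
        return (t - EPSILON, t + EPSILON)   # [earliest, latest]

    def commit(make_visible):
        t_commit = tt_now()[1]           # 1. choose t = TT.now().latest
        while tt_now()[0] <= t_commit:   # 2. wait until TT.now().earliest > t
            time.sleep(0.001)
        make_visible(t_commit)           # 3. t is now safely in the past

    commit(lambda ts: print("visible at", ts))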

Commit Wait and Replication

Commit wait sounds expensive, but Spanner overlaps it with replication.

The sequence is:

  1. Acquire locks and perform the transaction’s work.

  2. Choose a commit timestamp.

  3. In parallel, run Paxos so the commit is replicated durably and perform commit wait so real time advances past the chosen timestamp.

  4. Once both are complete, commit and release locks.

This overlap hides much of the waiting cost. In Google’s environment, thanks to accurate time sources at each datacenter, the uncertainty window is typically only a few milliseconds, and the average commit wait time is on the order of 4 ms.

What to Remember About Spanner

The main ideas to remember are:

Key Points

Across these systems, the main design questions are about partitioning, replication, concurrency control, and time.

The high-level distinction among the systems is:

You should also be comfortable with these ideas:

  1. Key-range partitioning keeps nearby keys together and supports efficient range scans.

  2. Hash partitioning spreads load evenly but does not preserve global key order.

  3. Paxos provides replication and ordered updates within one partition.

  4. 2PC coordinates transactions across multiple partitions.

  5. Strict 2PL and wound-wait manage read-write concurrency.

  6. MVCC enables snapshot reads by keeping multiple committed versions.

  7. TrueTime and commit wait allow Spanner to align transaction order with real time.

You Do Not Need to Study

Some details in the notes are useful for understanding the systems, but are too detailed to matter for the exam.

The following details are not required for upcoming exams:


Week 10: Distributed Computation

Distributed computation frameworks solve the same core problem in different ways: how to divide a large computation across many machines, move data where it needs to go, recover from failures, and keep the overall job efficient. The main goal of this topic is to understand the execution model each framework introduces and the kinds of workloads that model supports well.

MapReduce

MapReduce is a batch-processing framework built around a master-worker architecture. The master coordinates the job, assigns map and reduce tasks to workers, tracks their progress, and reassigns work when failures occur. The programmer supplies a map function that emits intermediate key-value pairs and a reduce function that processes one key together with all values associated with that key.

The execution model has a fixed structure. Input is divided into map shards, map workers process those shards in parallel, intermediate key-value pairs are partitioned by key, reducers fetch their assigned partitions, and the framework then performs shuffle and sort. In this phase, data is moved across the cluster so that all records with the same key reach the same reducer, and the reducer’s input is sorted and grouped by key before reduction begins. Reduce workers then process each key and produce the final output.
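As a concrete illustration, here is word count, the canonical MapReduce example, with the shuffle-and-sort phase simulated by a dictionary; the framework plumbing (shards, workers, failure handling) is reduced to a few lines.

    from collections import defaultdict

    def map_fn(document):
        for word in document.split():
            yield word, 1                 # emit intermediate key-value pairs

    def reduce_fn(word, counts):
        return word, sum(counts)          # one key with all of its values

    documents = ["the cat sat", "the dog sat"]

    groups = defaultdict(list)            # shuffle/sort: group pairs by key
    for doc in documents:                 # map phase (parallel in reality)
        for word, n in map_fn(doc):
            groups[word].append(n)

    print([reduce_fn(w, c) for w, c in sorted(groups.items())])
    # [('cat', 1), ('dog', 1), ('sat', 2), ('the', 2)]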

Key concepts in MapReduce include:

Failure handling follows the structure of the job. If a map worker fails, its map task is rerun, and any lost intermediate output is regenerated by re-executing that map task. If a reduce worker fails, the reduce task is rerun after the reducer again fetches its required intermediate partitions. Stragglers are handled through speculative execution, where a slow task may be launched on another worker and the first result to finish is used.

MapReduce is best suited for large batch jobs. It is a poor fit for iterative workloads because each stage typically writes its output to storage before the next stage begins.

BSP

Bulk Synchronous Parallel, or BSP, organizes a distributed computation into supersteps. Each superstep has three parts: local computation, communication, and barrier synchronization. Messages sent during one superstep become available in the next.

The key idea is that BSP gives computation a round-based structure. That makes communication easier to reason about and creates natural points for synchronization and checkpointing. The cost is that fast workers must wait for slow workers at each barrier.

Key concepts in BSP include:

BSP itself is a general model of round-based computation, not a specific graph-processing system with a built-in rule such as vote to halt. In a BSP-style program, the stopping condition is defined by the algorithm or the framework built on top of BSP. A computation may stop after a fixed number of rounds or when no worker has any further useful work to do.

BSP is a better fit than MapReduce for iterative algorithms because it keeps repeated rounds of computation explicit.
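A sketch of the superstep skeleton under simplified assumptions (single process, no failures). The stopping rule used here, halting when no messages are pending, is one algorithm-level choice, as noted above.

    def bsp_run(workers, max_supersteps=10):
        # workers: name -> compute(step, messages) returning [(dest, msg), ...]
        inboxes = {name: [] for name in workers}
        for step in range(max_supersteps):
            outboxes = {name: [] for name in workers}
            for name, compute in workers.items():       # 1. local computation
                for dest, msg in compute(step, inboxes[name]):
                    outboxes[dest].append(msg)           # 2. communication
            inboxes = outboxes                           # 3. barrier: messages
            if not any(inboxes.values()):                #    arrive next round
                break                                    # algorithm-defined stop

    def ping(step, messages):
        return [("w2", step)] if step < 3 else []        # quiet after 3 rounds

    bsp_run({"w1": ping, "w2": lambda step, msgs: []})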

Pregel and Giraph

Pregel applies the BSP model to graph processing. Its computation is vertex-centric: each vertex receives messages, updates its state, sends messages to other vertices, and may vote to halt. The graph remains present across iterations rather than being reconstructed as key-value data at each round.

In Pregel, vote to halt means that a vertex declares that it currently has no more work to do. The vertex becomes inactive and is skipped in later supersteps unless a new message arrives for it. If a message does arrive, the vertex becomes active again.

This structure is a natural fit for graph algorithms such as shortest paths and PageRank, where information repeatedly propagates along edges. A vertex becomes active when it has work to do, may become inactive when it does not, and may become active again if it later receives a message. The computation terminates only when every vertex is inactive, having voted to halt, and no messages remain in transit anywhere in the system.
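A minimal single-process simulation of a Pregel-style single-source shortest-paths computation; the graph, names, and message plumbing are illustrative.

    INF = float("inf")

    edges = {"a": [("b", 1), ("c", 4)], "b": [("c", 1)], "c": []}
    dist = {v: INF for v in edges}
    inbox = {v: [] for v in edges}
    inbox["a"].append(0)                  # the source receives distance 0

    active = True
    while active:                          # one iteration = one superstep
        outbox = {v: [] for v in edges}
        for v in edges:                    # vertex-centric compute
            if inbox[v]:                   # inactive vertices are skipped
                best = min(inbox[v])
                if best < dist[v]:
                    dist[v] = best
                    for n, w in edges[v]:
                        outbox[n].append(best + w)   # message to neighbor
            # the vertex then votes to halt; a new message reactivates it
        inbox = outbox
        active = any(outbox.values())      # stop: all halted, no messages

    print(dist)   # {'a': 0, 'b': 1, 'c': 2}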

Key concepts in Pregel and Giraph include:

Giraph is an Apache open-source system based on the Pregel model.

Spark

Spark was designed for computations that require more flexibility than MapReduce, especially multi-stage and iterative workloads. Its original core abstraction is the Resilient Distributed Dataset, or RDD, which represents a partitioned collection of data distributed across a cluster.

Spark includes several major architectural components:

Spark organizes computation as a dataflow graph. Transformations create new RDDs lazily, while actions trigger execution. This design allows Spark to support multi-stage pipelines without forcing every stage to materialize its output before the next one begins.

An RDD has several important properties:

Spark performance depends heavily on dependencies between partitions. Narrow dependencies usually allow computation to proceed without redistributing data. Wide dependencies usually require a shuffle. In Spark, a shuffle means that data is moved across workers and reorganized into new partitions so that related records end up together for the next stage. It is therefore both communication across the cluster and repartitioning of the data.
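A short PySpark sketch, assuming a local Spark installation: the two narrow transformations build lineage lazily, reduceByKey introduces a wide dependency (a shuffle), and nothing executes until the collect action.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()
    sc = spark.sparkContext

    lines = sc.parallelize(["the cat sat", "the dog sat"])
    counts = (lines.flatMap(lambda line: line.split())   # narrow dependency
                   .map(lambda w: (w, 1))                # narrow dependency
                   .reduceByKey(lambda a, b: a + b))     # wide: needs a shuffle

    print(counts.collect())   # action: only now does any computation run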

Key concepts in Spark include:

Comparing the Frameworks

Each framework organizes distributed computation around a different core abstraction.

The main goal is to understand what each framework makes easy, what costs it exposes, and what kinds of workloads it handles best.

What You Do Not Need to Study

Focus on the core abstractions, execution models, and major tradeoffs of each framework. You do not need to study details that are outside that scope.

You should not need to study:


Week 11: Data In Motion

Part 1: Message Queues and Event Streaming

The Publish-Subscribe Model

A message broker decouples producers from consumers: neither side needs to know anything about the other, they do not need to be running at the same time, and they do not need to operate at the same speed. This last point matters: a producer can generate messages faster than consumers can process them, and the broker absorbs the difference. Producers write messages to the broker; consumers read from it. Messages are organized by topic (a named category or stream).

Delivery Semantics

Three delivery guarantees apply to messaging systems, the same ones that apply to RPC: at-most-once, at-least-once, and exactly-once.

RabbitMQ

RabbitMQ is a message broker where routing logic lives entirely in the broker. Producers publish messages to an exchange, which routes them to one or more queues based on configured rules. Once a consumer acknowledges a message, the broker removes it. This model works well for task queues and routing-heavy workflows, but messages cannot be replayed and RabbitMQ is not designed for Kafka-style scale-out.

Apache Kafka

Kafka treats the log as the central architectural primitive: an append-only, totally ordered sequence of records that persists for a configurable retention period. Unlike a traditional queue, messages are not deleted after consumption. The following core concepts are essential:

Topic
A named log, divided into partitions.
Partition
An ordered log that grows only by appending; records are never modified once written. Each record is identified by an offset.
Ordering guarantee
Total order is per-partition only. To preserve order for related events, route them to the same partition using a consistent key.
Offset
A sequential integer that identifies a record’s position in a partition; each consumer group independently tracks the offset it has read up to.
Consumer group
A set of consumers that collectively consume a topic; each partition is assigned to one consumer at a time. Within a group this gives a queuing model; across independent groups it gives a pub-sub model.
Leader/follower replication
Each partition has one leader (handles reads and writes) and zero or more follower replicas. If the leader fails, a follower is elected.
Log compaction
An alternative to time- or size-based retention. Kafka retains only the most recent record per key, making the log a durable store of current state.

Producers control durability via the acks setting: acks=0 means fire and forget; acks=1 means the leader acknowledges on write; acks=all means all in-sync replicas must acknowledge before the producer receives confirmation.
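A hedged producer sketch using the kafka-python client; the broker address and topic name are assumptions for illustration.

    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",   # assumed local broker
        acks="all",                           # all in-sync replicas acknowledge
    )

    # Keying by user id routes all of one user's events to the same
    # partition, preserving their relative order.
    producer.send("events", key=b"user-42", value=b"clicked checkout")
    producer.flush()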

Kafka is fast despite writing to disk because it relies entirely on sequential I/O, which is orders of magnitude faster than random I/O, and it exploits the OS page cache aggressively.

Stream Processing

Backpressure is the problem that arises when producers generate data faster than consumers can handle it. Systems address it in three main ways: buffering (absorbing bursts in a queue, which is what Kafka does by design), dropping (discarding messages when the buffer is full, only acceptable for loss-tolerant data), and slowing the producer (explicit flow control, which is backpressure in its strict sense).

Event time is when an event occurred; processing time is when the system received it. Using event time gives correct results for time-based aggregations, while processing time is easier to implement but inaccurate when data arrives late or out of order.

A window defines how events are grouped for aggregation over time. Stream processors support three main window types:

A watermark is the system’s estimate of how far event time has progressed. Events with timestamps earlier than the watermark are considered unlikely to still arrive, and the system uses the watermark to decide when to close a window and emit results. The stream processor derives it by taking the maximum event timestamp seen so far and subtracting a configured lag. The lag is a tradeoff: too small and late-arriving events are dropped; too large and result latency and memory use increase.
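The bookkeeping itself is tiny. A sketch with an assumed fixed lag:

    def make_watermark(lag_seconds):
        max_event_time = 0.0
        def observe(event_time):
            nonlocal max_event_time
            max_event_time = max(max_event_time, event_time)
            return max_event_time - lag_seconds   # current watermark
        return observe

    observe = make_watermark(lag_seconds=10)
    print(observe(100.0))   # watermark 90: windows ending before 90 may close
    print(observe(95.0))    # a late-ish event; the watermark stays at 90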

Spark Structured Streaming

Spark Structured Streaming uses a micro-batch model: events are collected into small batches and processed using the standard Spark API. The stream is treated as an unbounded table that grows over time. Event-time windows and watermarks are supported.

Spark provides three output modes for writing results: append writes only newly completed rows; complete rewrites the full result table on each trigger; update writes only rows that changed since the last trigger.

Exactly-once semantics require checkpointing plus an idempotent or transactional sink. Checkpointing alone gives at-least-once: no events are skipped on recovery, but some may be reprocessed. When the source supports offset-based replay and the sink supports idempotent writes, they can together provide an exactly-once effect, but only under those specific conditions.
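A hedged sketch tying these pieces together, using Spark's built-in rate test source; the intervals, output format, and checkpoint path are assumptions.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import window, col

    spark = SparkSession.builder.appName("stream-demo").getOrCreate()

    events = spark.readStream.format("rate").load()   # test source: timestamp, value

    counts = (events
              .withWatermark("timestamp", "10 seconds")         # bounded lateness
              .groupBy(window(col("timestamp"), "5 seconds"))   # tumbling window
              .count())

    query = (counts.writeStream
             .outputMode("update")                      # emit only changed rows
             .option("checkpointLocation", "/tmp/ckpt") # required for recovery
             .format("console")
             .start())
    # query.awaitTermination()   # run until stopped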

Apache Flink

Apache Flink is designed around continuous record-at-a-time streaming rather than Spark’s micro-batch model, giving lower latency at the cost of higher operational complexity.

Part 2: Content Delivery Networks

The Flash Crowd Problem

A flash crowd occurs when a sudden surge in demand overwhelms a single origin server. CDNs solve this by distributing cached copies of content globally so requests are served by nearby servers rather than the origin.

Pre-CDN Approaches and Their Limits

Before CDNs, operators tried several techniques to handle load, each with significant limitations:

CDN Architecture

A CDN has three tiers: edge servers (close to users, often inside ISPs), parent servers (regional aggregators), and the origin (the content provider’s infrastructure). The tiered lookup reduces origin load because popular content is served entirely from caches.

CDNs come in two operational models. A push CDN requires the content provider to pre-position content on storage nodes before demand arrives, which is suitable for large files such as software packages or video assets. A pull CDN has edge servers fetch from the origin on the first cache miss and then cache the result, which is simpler to operate and works well for general web assets.

CDN Providers

There are several high-volume CDN providers. Three popular ones are:

Request Routing

CDNs use two main approaches to direct users to the nearest edge server. With DNS-based routing, the content provider creates a CNAME record pointing to the CDN, and the CDN’s dynamic DNS servers return different IP addresses based on user location, server load, and server health.

With anycast routing, all CDN nodes share the same IP address, and the connection is directed to the nearest advertising node based on network routing state rather than a DNS lookup. Many CDNs combine both approaches.

Caching: Content Types

CDNs cache different types of content with different strategies. The main cases are:

CDN Overlay Network

When an edge server must contact the origin, the CDN routes traffic through its own overlay network rather than the public internet. Nodes continuously measure latency and packet loss to their peers and select paths based on measured performance rather than BGP routing policy.

Security Benefits

A CDN shields the origin’s real IP address from the public internet, so attack traffic hits the CDN’s distributed infrastructure rather than the origin. TLS termination at the edge reduces handshake latency and offloads cryptographic work from the origin.

BitTorrent: Peer-to-Peer Content Delivery

BitTorrent inverts the CDN model: every downloader becomes an uploader, so as more peers join, supply grows automatically. The protocol works through the following mechanisms:

.torrent file
Contains file metadata and cryptographic hashes for each piece.
Tracker
A central server that maintains the list of peers in the swarm; often supplemented or replaced by DHT in modern implementations.
Pieces
Fixed-size chunks of the file, each independently verified by hash.
Rarest-first
Peers preferentially download the pieces that the fewest other peers currently have, ensuring rare pieces spread quickly through the swarm.
DHT
A decentralized alternative to the tracker; peer-list information is distributed across the swarm using a protocol conceptually similar to Chord.

BitTorrent is not well-suited for streaming because pieces are downloaded out of order, it requires upload bandwidth from clients, and it depends on community participation.

Edge Computing

Edge computing runs application logic on CDN edge nodes rather than at the origin, reducing round-trip latency for dynamic operations. Cloudflare Workers runs JavaScript in V8 isolates, which are lightweight sandboxes that provide memory isolation between concurrent workers. Workers can handle authentication, routing, personalization, and similar tasks at the edge, sometimes returning a response directly and sometimes modifying the request before forwarding it to the origin.

The key constraint is state. Reaching a central database from an edge node can add enough latency to erase the benefit, so edge platforms provide local data stores for low-latency state. Complex transactional logic stays at the origin. Edge compute is a complement to the origin, not a replacement.

Key Comparisons

Kafka vs. RabbitMQ: Kafka is a durable, replayable log where consumers track their own position; RabbitMQ routes messages to queues via exchanges and deletes them after acknowledgment. Kafka scales better and supports replay; RabbitMQ offers more flexible routing but does not distribute workload the way Kafka does.

CDN vs. BitTorrent: CDNs are centrally operated, commercially provisioned, and predictably performant; BitTorrent is decentralized, requires no infrastructure investment, and scales with the number of participants.

DNS routing vs. anycast: DNS routing selects a server at resolution time based on observed conditions; anycast routes based on network routing state rather than DNS lookup. Many CDNs use both.

What You Don’t Need to Study

Focus on architectural concepts and how the systems compare. You do not need to memorize:


Week 12: Security in Distributed Systems

Focus on the concepts and design principles, not the cryptographic details. You should understand why these mechanisms exist in distributed systems and what goes wrong when they are misapplied, not whether you can recall algorithm parameters, protocol steps, or product names.

What you don’t need to study

Security goals

Know these five goals and be able to explain what each one means.

Confidentiality means keeping data secret from parties who are not authorized to see it. Encryption is the primary mechanism.

Integrity means detecting unauthorized modification. A message has integrity if the receiver can confirm it has not been altered in transit.

Authentication means establishing who is on the other end of a connection or who created a message. A service needs to know whether it is really talking to the orders service or to an attacker pretending to be it.

Authorization means deciding what an authenticated party is allowed to do. A principal is any entity that can be identified and granted access: a user, a service, a device, or a background process.

Non-repudiation means being able to prove that a specific party created or approved some data. Digital signatures and audit logs are the typical mechanisms.

These are distinct properties. Encrypting a message does not prevent it from being modified in transit if there is no integrity check. A message can arrive unmodified but fully visible to an attacker. Authentication can succeed while authorization fails. Getting one right does not mean you have the others.

Why distributed systems change the security problem

In a monolithic application, many trust decisions are implicit. The OS enforces boundaries. There is one identity model and one administrator. Distributed systems remove those guarantees.

Several properties define where the problem gets harder:

The cryptographic building blocks

Understand the conceptual role of each mechanism, not the math.

Symmetric encryption uses the same key on both sides. It is fast and suited for bulk data, but it requires both parties to already share a key. It cannot establish a secure channel between strangers on its own.

Asymmetric cryptography uses a key pair. The public key can be shared openly. The private key stays secret. It makes key establishment practical without pre-shared secrets: two parties who have never shared a secret can establish a shared key. It also enables digital signatures.

Hashes, MACs, and digital signatures serve different purposes. A hash function maps data of any length to a fixed-size digest. The same input always produces the same digest, and it is computationally infeasible to reverse the process or find two inputs with the same digest. A plain hash provides no protection against an active attacker because anyone can modify a message and recompute the hash. It is useful for detecting accidental corruption, but it proves nothing about who sent the message.

MACs and digital signatures both extend the hash concept to provide authentication as well as integrity.

A MAC (message authentication code) computes a hash of the message combined with a shared secret key. A receiver with the same key can verify the hash, confirming both that the message was not modified and that it came from someone who knows the key.

A digital signature also operates on a hash of the message, but instead of a shared secret, the sender computes the signature using their private key. Any party with the corresponding public key can verify the signature, confirming the message’s origin and integrity without a pre-shared secret. Signatures also support non-repudiation.

The key differences among these mechanisms:

Hash
Does not protect against an active attacker; provides no origin authentication; requires no pre-shared secret.
MAC
Protects against an active attacker; authenticates the origin to parties holding the shared secret; requires a pre-shared secret.
Digital signature
Protects against an active attacker; the origin is verifiable by anyone with the public key; requires no pre-shared secret.

Replay attacks are easy to miss. Integrity checks tell you a message was not modified. They tell you nothing about whether it is a copy of an older message. A valid signed request captured and retransmitted later will still pass an integrity check. Distributed protocols add freshness mechanisms: nonces, timestamps, sequence numbers, or expiration times.

Transport Layer Security (TLS) combines asymmetric key exchange, symmetric bulk encryption, and integrity checks into a secure channel. It provides confidentiality, integrity, and server authentication. With mutual configuration, it also authenticates the client. Know what TLS provides, not the handshake state machine.

Certificates bind a public key to an identity. A certificate authority (CA) signs the binding. Trust is organized as a chain from a root CA through intermediate CAs to the certificates issued to specific services, workloads, or users. In distributed systems, the identity in a certificate may be a service name or workload identifier rather than a domain name. Certificate lifecycle (expiration, rotation, revocation) is a security design constraint, not just an operational concern.

Mutual TLS (mTLS)

Mutual TLS (mTLS) authenticates both sides of a connection. Standard TLS authenticates only the server. With mTLS, each side presents a certificate and verifies the other’s. The result is a cryptographically verified identity for both caller and receiver, not just an IP address.

In microservice systems, mTLS turns “inside the cluster” from a passive trust assumption into a verifiable property of each connection. Once both sides have cryptographically verified identities, authorization decisions can be based on who is calling rather than where the call came from.

Authentication and authorization

Know this distinction cold. Authentication asks who you are. Authorization asks what you are allowed to do.

A system can authenticate correctly but authorize incorrectly. The most common failure pattern is broken object-level authorization (BOLA): a service checks the token but does not verify that the caller is allowed to access the specific resource. An attacker with any valid token can read any object by changing an ID in the request.

In distributed systems, authorization must happen at multiple layers independently. The API gateway can check whether a client is allowed to call the API at all. A backend service must check whether the caller is allowed to invoke this specific operation. A data service must check whether this caller can access this specific record. None of these can be fully delegated to the layer above.

OAuth, OIDC, and JWTs

These three are not interchangeable.

OAuth is an authorization framework. It produces an access token that a service validates to determine what access to grant. It answers “what access is being delegated?” not “who is this user?”

OpenID Connect (OIDC) is an identity layer on top of OAuth. It produces an ID token that describes who the authenticated user is. It answers the authentication question that OAuth does not.

A JWT (JSON Web Token) is a compact token format, not a protocol. A JWT carries claims: key-value assertions about the subject, such as user identity, expiration time, issuer, and granted scopes. OAuth access tokens and OIDC ID tokens are often encoded as JWTs, but a JWT by itself has no security meaning without knowing the protocol context.

Signed JWTs can be validated locally without contacting the issuer, which reduces latency but makes revocation hard. The tradeoff is between local validation and centralized revocation control: local validation is fast and requires no runtime dependency on the issuer, but enforcing revocation globally requires coordination with the issuer. The most widely used response is short-lived access tokens paired with a refresh token, a longer-lived credential the client uses to obtain a new access token at renewal; revocation takes effect when the client next tries to renew.
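A sketch of local validation using the PyJWT package; the key variable, audience value, and algorithm choice are assumptions for illustration. Nothing here contacts the issuer, which is precisely why revocation requires separate machinery.

    import jwt  # the PyJWT package

    def validate(token, public_key):
        # Verifies the signature and standard claims locally; expiration (exp)
        # is checked automatically. Revocation is NOT checked anywhere here.
        return jwt.decode(
            token,
            public_key,
            algorithms=["RS256"],        # pin the algorithm; ignore the header
            audience="orders-service",   # assumed expected audience
        )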

Workload identity

Users are not the only principals. Services, containers, and batch jobs also need identities. Workload identity lets services prove which workload they are cryptographically rather than by network location. SPIFFE (Secure Production Identity Framework for Everyone) is a widely adopted standard for workload identity credentials. The key concept is that workload identities should be short-lived and automatically rotated, just like access tokens.

Cloud IAM

Cloud identity and access management (IAM) policies bind identities to permissions on cloud resources. Least privilege means granting each service only the permissions it actually needs and no more.

Over-privileged service accounts and roles are a major escalation path: a microservice with broad permissions becomes a much larger incident when compromised than one with least-privilege access. Short-lived credentials issued via workload identity federation are preferred over long-lived service account keys embedded in container images.

Zero Trust

Zero Trust is an architectural principle: network location does not imply trust. A request from inside the cluster still needs to be authenticated and authorized. The slogan “never trust, always verify” is a shorthand for the principle that trust must be earned by presenting verified credentials, not assumed based on where a connection originates.

Micro-segmentation

Micro-segmentation divides a system into fine-grained trust domains and explicitly controls which services can communicate with which others. It limits the blast radius of a compromise: if the orders service cannot connect directly to the database, compromising the orders service does not automatically give an attacker database access.

Service meshes and API gateways

Know the difference in scope.

An API gateway handles external traffic entering or leaving the system (called north-south traffic). It centralizes TLS termination (often decrypting incoming HTTPS connections at the boundary so internal services communicate over separate connections), token validation, rate limiting, and coarse-grained authorization at the edge.

A service mesh handles internal service-to-service calls (called east-west traffic). It provides mTLS, workload identity, and authorization enforcement consistently across all internal calls without requiring each service to reimplement those mechanisms. A sidecar proxy running alongside each service intercepts its network traffic to enforce these policies transparently.

A gateway protects the front door. A service mesh protects what happens inside.

Secret management

Secrets (API keys, database passwords, TLS private keys) must be distributed, rotated, revoked, and audited. Baking secrets into container images or source repositories is one of the most common and damaging mistakes. A well-designed system issues short-lived credentials to authenticated workloads at runtime rather than using long-lived static secrets.

Base64 encoding is not encryption. A base64-encoded secret is fully readable to anyone who can see the string.

Key and certificate rotation should be routine and automated. A system that requires manual edits or downtime to rotate a certificate is brittle by design.

Common design mistakes

Each of these reflects a trust assumption that was wrong from the start, not just a coding mistake.

Understanding why each one is wrong is understanding the security design principles covered in these notes.


Week 13: Clusters

Clusters are groups of independent machines that are managed so they behave like one larger system. The goal is not to make hardware stop failing. The goal is to use software to detect failures, move work, replicate state, and keep the service usable.

A cluster gives users and applications a single system image. A client sees one service name, one virtual address, or one storage namespace, even though many machines may be doing the work behind it.

The commodity hardware idea

Modern clusters are usually built from many ordinary servers rather than a few expensive machines. Adding capacity means adding more nodes.

This design works because many distributed workloads can be split across machines. The tradeoff is that hardware failure becomes normal. At large scale, disks fail, machines crash, switches break, and network links have problems. Cluster software is built with that assumption.

The main lesson is that reliability comes from software mechanisms such as replication, redundancy, failure detection, failover, and coordination.

Cluster types

High availability (HA) cluster
A cluster designed to keep a service usable when machines or processes fail.
High performance computing (HPC) cluster
A cluster designed to run one large, tightly coupled computation across many nodes.
Load-balancing cluster
A cluster that spreads client requests across a group of interchangeable servers.
Storage cluster
A cluster that combines storage from many machines into a larger storage service or namespace.
Scheduling cluster
A cluster that accepts work from many users or applications and decides which machine should run each task.

Real systems often combine these roles. For example, a Kubernetes cluster schedules work, runs replicated services, depends on storage systems, and uses load balancers to route traffic.

Single system image

A single system image is the illusion that many machines behave like one system. A user submits a job, calls a service, or accesses a storage namespace without choosing a particular machine.

This abstraction is created by software: schedulers choose machines, load balancers route requests, service discovery maps names to current backends, and storage systems hide where data is stored.

The abstraction is useful because machines can be added, removed, replaced, or restarted without changing how clients address the service.

Cluster networking

Servers are usually mounted in racks. A rack often holds a few dozen servers. Each rack connects to a top-of-rack (ToR) switch. ToR switches connect upward into a spine layer, producing a spine-leaf or Clos topology.

A spine-leaf topology gives many parallel paths between racks. It is designed for heavy server-to-server traffic.

East-west traffic is traffic between servers inside the data center. North-south traffic is traffic entering or leaving the data center. Modern services often generate much more east-west traffic than north-south traffic because one external request may trigger many internal service calls.

Bisection bandwidth is the network capacity available across a cut that splits the cluster into two parts. Higher bisection bandwidth means the network can support more communication between different parts of the cluster.

High-speed communication

Most services run well on normal Ethernet and TCP. Some workloads, especially HPC and large machine learning jobs, need lower latency and higher bandwidth.

NIC offloads reduce CPU overhead by moving selected packet-processing work to the network card.

RDMA (Remote Direct Memory Access) lets one machine read or write memory on another machine with little CPU involvement on the remote side.

InfiniBand is a high-speed network technology used for low-latency cluster communication, especially in HPC and AI training systems.

You do not need to know the details of specific GPU interconnects or RDMA-over-Ethernet technologies for this topic. The key idea is that tightly coupled computations need faster communication than ordinary web services.

Failure handling

Cluster availability depends on detecting failures, moving work away from failed machines, and preventing two machines from acting as the same authority.

Heartbeats are periodic messages that indicate a node is still alive. If enough heartbeats are missed, the system may treat the node as failed and move its work elsewhere. A short timeout detects failures quickly but risks false positives. A long timeout avoids false positives but delays recovery.

Leases give a node time-limited authority to do something, such as act as leader or write to shared state. A node must renew its lease before it expires. Heartbeats and leases often work together: heartbeats help detect liveness, while leases control authority.
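A minimal sketch of heartbeat-based failure suspicion; the timeout value is an assumption and encodes the tradeoff just described.

    import time

    class HeartbeatMonitor:
        def __init__(self, timeout=5.0):
            self.timeout = timeout
            self.last_seen = {}

        def heartbeat(self, node):
            self.last_seen[node] = time.time()

        def suspected_failed(self, node):
            # Short timeout: fast detection, more false positives.
            # Long timeout: fewer false positives, slower recovery.
            return time.time() - self.last_seen.get(node, 0) > self.timeout

    m = HeartbeatMonitor(timeout=5.0)
    m.heartbeat("node-1")
    print(m.suspected_failed("node-1"))   # False while heartbeats keep arriving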

Failover moves work from a failed node to another node. In an active/passive design, one node serves traffic and a standby takes over on failure. In an active/active design, multiple nodes serve traffic and load shifts to the survivors when one fails.

Fencing prevents a suspected failed node from continuing to write after another node takes over. This is necessary because a node may be alive but unreachable due to a network partition.

Split brain occurs when two parts of a system both believe they are authoritative. If both accept writes, the system may end up with conflicting histories that cannot be merged safely.

Quorum prevents split brain by requiring a majority of replicas to agree before electing a leader or committing an update. In a 5-node group, a majority is 3. If a partition splits the group into 3 nodes and 2 nodes, only the 3-node side can continue making authoritative decisions.

Borg

Borg is Google’s internal cluster management system. It is important because it shaped many of the ideas later seen in Kubernetes.

The basic structure is:

Borg uses declarative configuration. A user describes what should run, how many copies are needed, and what resources are required. Borg decides where to place the work and keeps it running.

Borg also uses priority and quota. Priority decides which work should win when resources are scarce. Quota limits how much a user or team can consume. Higher-priority work can preempt lower-priority work.

The scheduler has two broad steps: feasibility checking, which finds the machines that could run the task, and scoring, which picks the best of the feasible machines.

Tasks are isolated using Linux mechanisms such as cgroups and namespaces. Cgroups enforce resource limits such as CPU and memory. If a task exceeds its memory limit, the kernel may kill it, and Borg can restart it.

The main Borg lessons are that declarative control works at large scale, replicated control-plane state is essential, and mixed workloads can share machines when priority, quota, and isolation are handled carefully.

Kubernetes

Kubernetes is the open-source cluster manager most directly influenced by Borg. It provides a portable way to run containers across machines.

Kubernetes uses a declarative model. Users submit objects that describe desired state, and controllers then work to make actual state match desired state; a sketch of one such object follows the list below.

The main workload objects are:

Pod
The smallest unit Kubernetes schedules; it contains one or more tightly related containers that share networking and volumes.
Deployment
An object that keeps a desired number of identical pods running and supports rolling updates.
Service
A stable virtual IP address and DNS name that routes traffic to a changing set of pods.
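
Here is the promised sketch: a minimal Deployment written as a Python dict (the same structure users typically write in YAML). The name web and the image are placeholders.

```python
# A minimal Deployment: "keep 3 identical pods of this container running."
# Kubernetes controllers work to make the cluster match this description.
deployment = {
    "apiVersion": "apps/v1",
    "kind": "Deployment",
    "metadata": {"name": "web"},                     # placeholder name
    "spec": {
        "replicas": 3,                               # desired pod count
        "selector": {"matchLabels": {"app": "web"}},
        "template": {                                # pod template
            "metadata": {"labels": {"app": "web"}},
            "spec": {
                "containers": [{
                    "name": "web",
                    "image": "example.com/web:1.0",  # placeholder image
                    "ports": [{"containerPort": 8080}],
                }],
            },
        },
    },
}
```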

Kubernetes control plane

The API server is the entry point to the cluster. Users and components communicate through it.

etcd stores cluster state using Raft replication.

The scheduler assigns pending pods to nodes. It filters out nodes that cannot run the pod, scores the remaining nodes, and writes the chosen node back to the API server.
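
A hedged Python sketch of the filter-then-score pattern, with a deliberately simple score; the real scheduler weighs many more factors, such as affinity rules, taints, and spreading.

```python
def schedule(pod_cpu, nodes):
    """Pick a node for a pod: filter out infeasible nodes, then score the rest.

    nodes: dict of node name -> free CPU.
    """
    # Filtering: drop nodes that cannot run the pod at all.
    feasible = {name: free for name, free in nodes.items() if free >= pod_cpu}
    if not feasible:
        return None  # the pod stays pending until some node can take it

    # Scoring: here, simply prefer the node with the most free CPU.
    return max(feasible, key=feasible.get)

# The chosen node is then written back to the API server (binding).
print(schedule(2, {"node-a": 1, "node-b": 4, "node-c": 3}))  # node-b
```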

The controller manager runs controllers that keep actual state close to desired state. For example, if a Deployment says 10 pods should exist and only 9 are running, a controller creates another pod.

The cloud controller manager connects Kubernetes to cloud-provider features such as load balancers and block storage.

Kubernetes worker nodes

Each worker node runs a few core components.

Kubelet is the local node agent. It watches for pods assigned to the node, asks the container runtime to start them, and reports status.

The container runtime runs the containers. Common runtimes include containerd and CRI-O.

Kube-proxy helps implement Service networking so traffic sent to a Service reaches one of the current backing pods.

Controllers and reconciliation

A controller is a control loop. It watches the API server, compares desired state with actual state, and takes action when they differ.

This process is called reconciliation. The controller repeatedly observes the cluster, compares what exists with what should exist, and acts to close the gap.
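
A reconciliation loop fits in a few lines. This is an illustrative Python fragment, not the real controller framework; observe, create_pod, and delete_pod stand in for calls to the API server.

```python
import time

def reconcile(desired_replicas, observe, create_pod, delete_pod):
    """One control loop: compare desired vs. actual state and close the gap."""
    while True:
        actual = observe()               # e.g., count of running pods
        if actual < desired_replicas:
            create_pod()                 # 9 running, 10 desired -> add one
        elif actual > desired_replicas:
            delete_pod()                 # too many -> remove one
        time.sleep(1)                    # then observe again
```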

Kubernetes gets much of its behavior by composing many small controllers. One controller manages Deployments, another manages ReplicaSets, another manages nodes, and others manage Services and endpoints.

Kubernetes can also be extended with new API object types and custom controllers. This is how operators, storage integrations, and service meshes plug into Kubernetes without changing the Kubernetes core.

Services and discovery

Pods are temporary. They can be created, destroyed, moved, or replaced, and their IP addresses change.

A Service gives a changing set of pods a stable virtual IP address and DNS name. Clients talk to the Service, not to individual pods.

Kubernetes keeps track of which pods currently back the Service. As pods come and go, the Service name stays the same while the endpoint list changes.

External traffic usually enters through a LoadBalancer Service or an Ingress. A LoadBalancer Service asks the environment for an external load balancer. An Ingress provides HTTP-level routing to one or more Services.

Load balancing

A load balancer sends each request or connection to one healthy backend. It spreads load, hides individual backend failures, gives clients a stable address, and supports rolling updates by draining traffic before replacement.

Layer 4 load balancing operates on TCP or UDP. It sees IP addresses and ports, not HTTP requests. It is fast and works for many protocols.

Layer 7 load balancing operates at the application layer, usually HTTP. It can route based on URL path, headers, cookies, or method.

With HTTPS, a Layer 7 load balancer can inspect the HTTP request only if TLS terminates at the balancer. The balancer decrypts the request, makes the routing decision, and then forwards the request to the backend. The backend connection may be re-encrypted.

Common load-balancing algorithms are:

Round robin
Sends requests to backends in rotation.
Least connections
Sends the next request to the backend with the fewest active connections.
Weighted balancing
Sends different shares of traffic to different backends, often based on capacity or rollout policy.
Latency-aware routing
Prefers backends that have been responding quickly.
Power of two choices
Samples two backends and sends the request to the less loaded one; a small sketch of this follows the list.
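
As promised above, a small Python sketch of power of two choices; the backend names and connection counts are made up.

```python
import random

def pick_backend(connections):
    """Power of two choices: sample two backends, take the less loaded one.

    connections: dict of backend name -> current active connections.
    This captures most of the benefit of least-connections without
    needing a precise global view of load.
    """
    a, b = random.sample(list(connections), 2)
    return a if connections[a] <= connections[b] else b

backends = {"s1": 12, "s2": 3, "s3": 7}
print(pick_backend(backends))  # usually avoids the busiest backend
```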

Health checks

A load balancer needs an accurate view of backend health. It usually checks backends with periodic probes.

Kubernetes distinguishes three probe types. Liveness asks whether the process should be restarted.

Readiness asks whether the process should receive traffic.

Startup gives slow-starting services time before liveness checks apply.

Health checks should be cheap, frequent, and specific to the service. A TCP connection can succeed even when the application is stuck, so application-aware checks are often better.
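
A probe is just a cheap request repeated on a schedule. Here is a minimal Python sketch of an HTTP health probe, assuming the service exposes a /healthz endpoint (a common convention, not a requirement):

```python
import urllib.request

def probe(url, timeout=1.0):
    """Return True if the backend answers its health endpoint with 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        return False  # connection refused, timeout, non-200 response, etc.

# A backend is usually marked unhealthy only after several consecutive
# failures, so one dropped packet does not remove it from rotation.
healthy = probe("http://10.0.0.7:8080/healthz")
```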

Stateless and stateful services

A stateless service keeps no important local state between requests. Any instance can serve any request. This makes the service easy to load balance, restart, scale, and update.

Stateless does not mean the service never uses state. It means durable state lives somewhere else, such as a database, cache, object store, or queue.

A stateful service must preserve data or identity across restarts. Databases, caches, and coordination systems are common examples.

Stateful services require more careful handling: replication, sharding, backup, failover, recovery, and migration all become part of the design. Kubernetes StatefulSets help by giving pods stable identities and ordered startup and rollout behavior.

What you should know

Focus on the role each part plays in the cluster.

You should be able to explain what problem each component solves, how the components interact, and how the system detects and recovers from failures.

What you don’t need to study

You do not need to study implementation details or vendor-specific mechanisms. Focus on the concepts: what problem each cluster component solves and how the pieces fit together.

Focus your study on the main ideas: clusters present many machines as one managed system, schedulers place work on machines, Kubernetes runs pods through node agents, HA uses redundancy and failover, load balancers route traffic to healthy replicas, and stateless services are easier to scale and replace.