Final exam study guide

The three-hour study guide for the final exam

Paul Krzyzanowski

Latest update: Tue Dec 12 15:04:43 PST 2017

Disclaimer: This study guide attempts to touch upon the most important topics that may be covered on the final exam but does not claim to necessarily cover everything that one needs to know for the exam. Finally, don't take the three hour time window in the title literally.


Go here for lecture notes

Why are distributed systems more interesting now than they may have been one or two dozen years ago? Several advances in various areas of computing technology had a profound effect on the value and design of distributed systems. Since computer networking went mass market in the 1980s, local area network speeds increased by a factor of a thousand and wide-area (Internet) speeds by even more. Connectivity within a local area network (LAN) moved from shared to switched networking, allowing the network to scale without increasing congestion. On the wide area, Internet access has become available to the population at large, not just to researchers on Department of Defense projects. In 1965, Intel co-founder Gordon Moore predicted that the number of transistors on integrated circuits, and hence processor performance, would double approximately every two years. This prediction, known as Moore’s Law has turned out to hold (approximately) true for the past fifty years. Processor performance, system memory, and disk capacity has also increased by more than a thousandfold over the past couple of decades.

Even though these improvements make it easier to store, process, and move data between systems, what are the motivations for distributing computing? There are several. Performance does not scale linearly with an increase in price with a single computer; with a collection of computers this scaling may be possible. Secondly, distributing systems makes sense in certain environments: databases may reside in different locations than the user, for example. Thirdly, we may need to interact with other people or remote data that are geographically dispersed. Metcalfe’s “Law” states that the value of a telecommunications network is proportional to the square of the number of connected users of the system. As with Moore’s Law, this isn’t a real law, of course. The “square” part comes from the fact that the number of edges in a fully-connected graph is proportional to the square of the number of vertices. A vertex represents a person and an edge represents the communication path from one person to another. Simply put, there’s a lot of value in being able to communicate with a lot of people. Without it, services such as Skype, Google, Twitter, eBay, Instagram, Facebook, and countless others would not be nearly as useful.


One way of classifying system architectures is via Flynn’s taxonomy, proposed by Michael J. Flynn way back in 1966. He categorized computers based on the number of concurrent instruction streams and the number of data streams.

(single instruction stream, single data stream) refers to conventional single processor systems.
(single instruction stream, multiple data streams) refers to single processor computers where each instruction may process a collection of data. Vector and array processors fall into this category. SIMD includes graphics processors, cell processors, Intel’s Streaming SIMD Extensions (SSE4) in Intel’s Core microarchitecture and AMD’s K10 family of processors, Intel’s Advanced Vector Extensions, (AVX), the PowerPC’s AltiVec instructions, and the ARM® NEONTM SIMD engine.
(multiple instruction streams, single data stream) does not really make a lot of sense since it implies that multiple processors all process the same data. The term has occasionally been used to refer to used to replicated fault-tolerant systems.
(multiple instruction streams, multiple data streams) refer to any computers with multiple processors, where each processor operates on its own stream of data. This category covers both parallel (multiprocessor) and distributed systems.

MIMD can be further categorized by identifying whether the system has shared memory or not. Systems with shared memory are known as multiprocessor systems. Examples are conventional PCs with multiple processors on a single system bus or multi-core systems.

An architecture where multiple identical processors communicate with a single shared memory is called a multiprocessor. Systems without shared memory are collections of separate computers, each with its own memory. They have to rely on a network to communicate and are sometimes referred to as networked computers or multicomputers.

Multiprocessor systems are characterized by three features: (1) the processors all share the same memory, (2) they all share the same clock, and (3) they exhibit an all-or-nothing property to system failure. What this last item means is that if the system is dead, none of the processors are working. With a multicomputer system, it is certainly possible to have one computer functioning while another is not.

The most common architecture for a multiprocessor system is symmetric multiprocessing, or SMP. In this kind of system, all processors are connected to the same shared memory and run the same operating system. No one processor has faster or prioritized access to memory or system peripherals than any other.


When we switch focus away from multiprocessors to multicomputers, the fundamental difference is that the processing elements no longer have access to the same shared memory system. Since computers need to communicate, an alternate communication mechanism is needed. This mechanism is the network interconnect. As with multiprocessors, the network interconnect may be bus-based or switched. A bus-based interconnect means that all systems are connected to the same communications bus and can see all the traffic on the network. The original design of the Ethernet was bus-based. Today, it is generally not seen on local area networks as switches are cost-effective. A switched interconnect allows any pair of computers to communicate without affecting the bandwidth of other systems: it provides scalable bandwidth. Ethernet switches simulate some the behavior of a bus-based network to allow things like network broadcasts and multicasts to work.

Fault tolerance

While we do not expect our personal computers to fail at any given time, failure is a fact of life in distributed systems. It’s simply a matter of statistics: if you have a collection of thousands of systems, it is very likely that on any given day something goes wrong: a computer or disk dies, a switch goes bad, a network cable is unplugged, or something loses power. Should our local computer die, it is an all-or-nothing failure: the entire system is dead. In distributed systems, one system may fail while others continue to work. This is a partial failure. Partial failures can be insidious. If one computer sends a message to another and does not get a reply, what happened? It could be any or all of several things: the system is slow to respond, the receiving process failed, the entire receiving computer failed, a network connection was broken, or a routing issue incurred huge delays.

Handling failure is one of the central themes of distributed system design. We need to be able to handle detection, recovery, and restart.

Identify the cause of the failure
Services - the distributed algorithms - need to work around the failure and continue to function properly. This might involve starting a service on a different system, electing a new coordinator, or stopping any attempts at communicating with the failed system.
At some point, the failed element may be brought back into the distributed system. It may be moments later, when a network cable is reinserted, or a far longer time if, for example, a failed processor needs to be replaced. Regardless of the elapsed time. the restarted system needs to reintegrate itself into the whole system. It may have missed messages and its knowledge of the world is outdated. For example, it may have hosted a replicated object store and missed getting information about new, updated, and deleted objects.

Fault tolerance needs to address both availability and reliability. Availability refers to the fraction of time that the system as a whole is usable. Since individual systems may fail, we tend to achieve high availability via redundancy: deploying duplicate, triplicate (and more) systems. The design of redundant systems has to consider all of the three aforementioned points. Failure to properly address the restart of a system may create consistency problems, where one replicated server returns different data than another one. Reliability deals with the integrity of data. We can have systems that appear to be functioning well but transmit garbled data. Or we might have malicious interference where an intruder is sending messages to confuse the systems. We will can address message integrity with error detection but will also need to address issues of message authentication.

Failure can manifest itself in different ways:

With fail-stop failure, the failed component simply stops functioning. Ideally, it will be able to detect its own failure and notify other members of the system first but this is most often not possible. Halting refers to the explicit case where a component stops without any notice. We can try to detect failed components by sending messages over a network and setting timeouts for a response. Unfortunately, this is not foolproof because network latency is variable and the response might arrive after our timeout. Moreover, we can have problems with network connectivity between some hosts.
Fail-restart is when a component restarts after a failure. The restart may be nearly instantaneous, so other systems didn’t notice, or it may be after a long interval. As we discussed earlier, the danger is stale state. The restarted component may have missed messages and hence has a view of the world that is obsolete.
Omission failure deals with networking. It is the failure to send or receive messages. This can be due to data corruption, queue overflows in routers, or overflows in the receive buffer in the operating system. An omission failure may cause a query or its response to get dropped, resulting in one system assuming that another one has failed.
With asynchronous networks such as IP, messages may take longer to arrive than we might expect. This can lead us to assume that a system is not responding and hence not functioning when it acturally is operating. Another problem that is based on timing is that each system has its own clock and hence its own concept of time of day. This can create undesirable behavior with process coordination, message ordering, and system logs.
A network of computers may be working but a link between two groups of systems may fail. For example, an Ethernet switch connecting two racks may fail or a cable may be disconnected. In this case, the network effectively fragments into two or more sub-networks that cannot communicate with each other. Each group of systems thinks the other group is dead.
Byzantine failures cover any failures where a component does not cease to function but instead produces faulty data. This can be due to bad hardware, software logic errors, network problems or it can be due to malicious interference. To a large extent, we will address byzantine failures on the network with the use of cryptography.

Regardless of the type of failure, a basic goal in distributed systems is to design a system that avoids a single point of failure. This is the case where one component is crucial to the functioning of the entire system. For example, we might have one process on one system that serves as a coordinator to dispatch and check on computation taking place on thousands of other systems (or keeps track where various blocks of data live in a distributed storage system). A failure of the coordinator effectively causes the entire system to fail.

Global state

In a distributed environment, it helps helps for one process to know what other systems are doing. For instance, a process may need to know the currently active members of a group of processes that hold replicated data. A problem with distributed systems design is that nobody has the true global state of a system. Because we lack shared memory, we cannot instantaneously see the data or liveness of other processes. Any data that changes over the execution of a program is referred to as state. For example, state may be lists of live processes, group members, contents of a database, computation progress, lists of processes that have remote files open, etc.

A process obviously knows its own state. It can periodically report its state to other processes via network messages and it may receive updates from other processes over the network. However, these updates are neither continuous nor instantaneous. A process will only know the last reported state of other processes, which may not be equivalent to the current state of those processes.

One type of state is not just information about group membership or the status of processes but the data stored by network file systems, databases, and object stores. This data is shared among systems that are designed to act as replicas – redundant systems that will give us instantaneous backups in case one system dies or allow us to load balance requests among multiple systems. Here we also have to deal with the fact that not all processes will be updated instantaneously.

A restricted form of replication is a cache. A cache is simply local storage of frequency-accessed data to reduce access latency. Instead of making a network request, a process can have a stored copy of the results. For example, a process may store the result set of common database queries. Caching can do a wonderful job in improving performance but also poses the risk of stale data – cached copies of data that are no longer valid since the original data has been modified.


One goal in some distributed systems software is providing a single-system image. This is software that makes a collection of independent computers appear as a single system to the users. This involves hiding the fact that system or computation is distributed. The software should “just work.” A few areas when we want to provide transparency include:

The user should not be aware of where the software is actually running or where resources reside.
The user should not be aware of the fact that the location of resources may have moved from one place to another or that a process was restarted on another computer, possibly in another data center.
The user should not be aware that data might be replicated for fault tolerance or for proximity in order to provide faster access.
The user should not be aware that multiple processes might be accessing resources at approximately the same time. Results should appear as if all the processes ran one after another in some order. This means that some processes might be temporarily locked from being able to access a set of resources while another process is using them.

Service models

In software design, we often turn to layered architectures, where we break up application functionality into multiple layers of abstraction. each layer presents well-defined interfaces and hides the specifics of its implementation. For example, a typical computer system has an operating system that provides well-defined access to system resources, middleware that is linked to the application as a set of libraries that abstract things such as message encoding, communication, encryption, and database access, and various layers of abstraction created by the application designer.

With network systems, we often experience similar layers of abstraction but this time across systems. When our network-based software architecture mimics a layered design, we use autonomous processes that communicate with each other via a network interface rather than procedure calls. Each such layer of abstraction is known as a tier in a multi-tier model. It is a generalization of a client-server model.

The original, non-networking, computing model is a centralized one, where all computing takes place on a single system.
This is the dominant model of interaction in a networked system. One application, called the client (and usually run by the end user), requests something from another application, called a server. The server provides a service. Examples of this are a web browser (client) requesting a web page from a web server, aa mail application (client) accessing a mail server to get mailbox contents, or a print server being given content to print. In this model, clients communicate with the server and not with other clients. The model can be enhanced with multiple “layers,” or services, to mimic a layered architecture, resulting in a build a multi-tier system.
A peer-to-peer architecture employs a collection of applications, any of which can talk to any other.These applications are peers and are generally run by a collection of end users rather than some service provider. The name peer implies that there is no leader: applications all have equal capabilities. An appealing aspect of a peer to peer design is self-scalability. As more and more computers join the collection of peers, the system has more peers to do the work and can hence handle a large workload. Examples of peer-to-peer architectures are BitTorrent and Skype.
A difficulty with peer-to-peer architectures is that one often needs to do things such as keep track of peers, identify which system can take on work or has specific content, and handle user lookup and authentication. This led to a variation of the peer-to-peer model where a coordinator, a central server, is in place to deal with these centralized needs. However, the peers still handle all the bandwidth-intensive or compute-intensive work.


Go here for lecture notes

Goal: Enable computers to communicate with each other; create the illusion of machine-to-machine and process-to-process communication channels.

Without shared memory, we need a way for collections of systems (computers or other endpoint devices) to communicate. To do so, they use a communication network. If every communicating pair of hosts would have a dedicated physical connection between them, then there would be no sharing of the network infrastructure and we have a true physical circuit. This is not practical since it limits the ability for arbitrary computers to talk with each other concurrently. It is also incredibly wasteful in terms of resource use: the circuit would be present even if no data is flowing.

What is needed is a way to share the network infrastructure among all connected systems. The challenge in doing so is to allow these systems to talk but avoid collisions, the case when two nodes on a network transmit at the same time, on the same channel, and on the same connection. Both signals then get damaged and data does is not transmitted, or is garbled. This is the multiple access problem: how do you coordinate multiple transmitters on a network to ensure that each of them can send their messages?

There are three broad approaches that enable us to do this:

  1. Channel partitioning divides the communication channel into “slots”. If the network is divided into short, fixed-length time slots, we have Time Division Multiplexing, or TDM. Each host must communicate only during its assigned time slot. Routers may connect multiple networks together. When two hosts need to communicate, they establish an end-to-end route called a virtual circuit. It is called a virtual circuit because the setup of this route configures routers and assigns communication slots. This provides the illusion of a true circuit switched network in that all messages are guaranteed to arrive in order with constant latency and a guaranteed bandwidth. The switched telephone network is an example of virtual circuit switching, providing a maximum delay of 150 milliseconds and digitizing voice to a constant 64 kbps data rate.

    If the network is partitioned into frequency bands, or channels, then we have Frequency Division Multiplexing, or FDM. This defines a broadband network. Cable TV is an example of a broadband network, transmitting many channels simultaneously, each in using a well-defined frequency band.

    The problem with a channel partitioning approach is that it is wasteful. Network bandwidth is allocated even if there is nothing to transmit.

  2. Taking turns requires that we create some means of granting permission for a system to transmit. A polling protocol uses a master node to poll other nodes in sequence, offering each a chance to transmit their data. A token passing protocol circulates a special message, called a token, from machine to machine. When a node has a token, it is allowed to transmit and must then pass the token to its neighbor.

    The problem with the taking turns approach is that a dead master or lost token can bring the network to a halt. Handling failure cases is complex. This method was used by networks such as IBM’s Token Ring Network but is largely dead now.

  3. A random access protocol does not use scheduled time slots and allows nodes to transmit at arbitrary times in variable size time slots. This technique is known as packet switching. Network access is accomplished via statistical multiplexing. A data stream is segmented into multiple variable-size packets. Since these packets will be intermixed with others, each packet must be identified and addressed. Packet switched networks generally cannot provide guaranteed bandwidth or constant latency. Ethernet is an example of a packet-switched network.

Packet switching is the dominant means of data communication today. The packets in a packet-switched network are called datagrams and are characterized by unreliable delivery with no guarantees on arrival time or arrival order. Each datagram is fully self-contained with no reliance on previous or future datagrams. This form of communication is also known as connectionless service. There is no need to set up a communication session and hence no concept of a connection. Neither routers nor endpoints need to maintain any state as they have to do with a virtual circuit; there is no concept of where a system is in its conversation.

OSI reference model

Data networking is generally implemented as a layered stack of several protocols — each responsible for a specific aspect of networking. The OSI reference model defines seven layers of network protocols. Some of the more interesting ones are: the network, transport, and presentation layers.

1. Physical
Deals with hardware, connectors, voltage levels, frequencies, etc.
2. Data link
Sends and receives packets on the physical network. Ethernet packet transmission is an example of this layer. Connectivity at the link layer defines the local area network (LAN).
3. Network
Relays and routes data to its destination. This is where networking gets interesting because we are no longer confined to a single physical network but can route traffic between networks. IP, the Internet Protocol, is an example of this layer.
4. Transport
Provides a software endpoint for networking. Now we can talk application-to-application instead of machine-to-machine. TCP/IP and UDP/IP are examples of this layer.
5. Session
Manages multiple logical connections over a single communication link. Examples are SSL (Secure Sockets Layer) tunnels, remote procedure call connection management, and HTTP 1.1.
6. Presentation
Converts data between machine representations. Examples are data representation formats such as XML, JSON, XDR (for ONC remote procedure calls), NDR (for Microsoft COM+ remote procedure calls), and ASN.1 (used for encoding cryptographic keys and digital certificates).
7. Application
This is a catch-all layer that includes every application-specific communication protocol. For example, SMTP (sending email), IMAP (receiving email), FTP (file transfer), HTTP (getting web pages).

Data link layer

Ethernet and Wi-Fi (the 802.11 family of protocols) are the most widely used link-layer technologies for local area networks. Both Wi-Fi and Ethernet use the same addressing format and were designed to freely interoperate at the link layer.

Ethernet provides packet-based, in-order, unreliable, connectionless communications. It occupies layers one and two of the OSI model. There is no acknowledgement of packet delivery. This means that a packet may be lost or mangled and the sender will not know. Communication is connectionless, which means that there is no need to set up a path between the sender and receiver and no need for either the sender or receiver to maintain any information about the state of communications; packets can be sent and received spontaneously. Messages are delivered in the order they were sent. Unlike IP-based wide-area networking, there are no multiple paths that may cause a scrambling of the sequence of messages.

Interfaces communicating at the link layer must use link-layer addressing. A MAC address (for example, an Ethernet address) is different from, and unrelated to, an IP address. An Ethernet MAC address is globally unique to a device and there is no expected grouping of such addresses within a local area network. IP addresses on a LAN, on the other hand, will share a common network prefix.

Network layer: IP Networking

The Internet Protocol (IP) is a network layer protocol that handles the interconnection of multiple local and wide-area networks and the routing logic between the source and destination. It is a logical network whose data is transported by physical networks (such as Ethernet, for example). The IP layer provides unreliable, connectionless datagram delivery of packets between nodes (e.g., computers).

The key principles that drove the design of the Internet are:

  1. Support the interconnection of networks. The Internet is a logical network that spans multiple physical networks, each of which may have different characteristics. IP demands nothing of these underlying networks except an ability to try to deliver packets.

  2. IP assumes unreliable communication. That does not mean that most packets will get lost! It means that delivery is not guaranteed. If reliable delivery is needed, software on the receiver will have to detect lost data and ask the sender to retransmit it. Think of mail delivery: most mail gets to its destination but once in a while, a letter gets lost or takes a really long time to arrive.

  3. Routers connect networks together. A router is essentially a dedicated computer with multiple network links. It receives packets from one network and decides which outgoing link to send the packet.

  4. No central control of the network. The precursor of the Internet was the ARPAnet, built to connect companies and universities working on Department of Defense projects. As such, it was important that there wouldn’t be a single point of failure – a key element that could be taken out of service to cause the entire network to stop functioning.

Since IP is a logical network, any computer that needs to send out IP packets must do so via the physical network, using the data link layer. Often, this is Ethernet, which uses a 48-bit Ethernet address that is completely unrelated to a 32-bit IP address (or a 128-bit IPv6 address). To send an IP packet out, the system needs to identify the link layer destination address (MAC, or Media Access Control address) on the local area network that corresponds to the desired IP destination (it may be the address of a router if the packet is going to a remote network). The Address Resolution Protocol, or ARP, accomplishes this. It works by broadcasting a request containing an IP address (the message asks, do you know the corresponding MAC address for this IP address?) and then waiting for a response from the computer with the corresponding IP address. To avoid doing this for every outgoing packet, ARP maintains a cache of most recently used addresses.

Transport layer: TCP and UDP

IP is responsible for transporting packets between computers. The transport layer enables applications to communicate with each other by providing logical communication channels so that related messages can be abstracted as a single stream at an application.

There are two transport-layer protocols on top of IP: TCP and UDP.

TCP (Transmission Control Protocol) provides reliable byte stream (connection-oriented) service. This layer of software ensures that packets arrive at the application in order and lost or corrupt packets are retransmitted. The transport layer keeps track of the destination so that the application can have the illusion of a connected data stream.

UDP (User Datagram Protocol) provides datagram (connectionless) service. While UDP drops packets with corrupted data, it does not ensure in-order delivery or reliable delivery.

Port numbers in both TCP and UDP are used to allow the operating system to direct the data to the appropriate application (or, more precisely, to the communication endpoint, or socket, that is associated with the communication stream).

TCP tries to give a datagram some of the characteristics of a virtual circuit network. The TCP layer will send packet sequence numbers along with the data, buffer received data in memory so they can be presented to the application in order, acknowledge received packets, and request a retransmission of missing or corrupt packets. The software will also keep track of source and destination addresses (this is state that is maintained at the source and destination systems). We now have the illusion of having a network-level virtual circuit with its preset connection and reliable in-order message delivery. What we do not get is constant latency or guaranteed bandwidth. TCP also implements flow control to ensure that the sender does not send more data than the receiver can receive. To implement this, the receiver simply sends the amount of free buffer space it has when it sends responses. Finally, TCP tries to be a good network citizen and implements congestion control. If the sender gets notification of a certain level of packet loss, it assumes that some router’s queue must be getting congested. It then lowers its transmission rate to relieve the congestion.

The design of the Internet employs the end-to-end principle. This is a design philosophy that states that application-specific functions should, whenever possible, reside in the end nodes of a network and not in intermediary nodes, such as routers. Only if the functions cannot be implemented “completely and correctly,” should any logic migrate to the network elements. An example of this philosophy in action is TCP. TCP’s reliable, in-order delivery and flow control are all is implemented via software on the sender and receiver: routers are blissfully unaware of any of this.

A related principle is fate sharing, which is also a driving philosophy of Internet design. Fate sharing states that it is acceptable to lose the state information associated with an entity if, at the same time, the entity itself is lost. For example, it is acceptable to lose a TCP connection if a client or server dies. The argument is that the connection has no value in that case and will be reestablished when the computer recovers. However, it is not acceptable to lose a TCP connection if a router in the network dies. As long as alternate paths for packet delivery are available, the connection should remain alive.


One huge difference in moving from a local area to a wide area environment is that network delays become substantially longer, jitter (the variation in delay) increases, packet loss is more likely, and packets may occasionally arrive out of order.

Network delay is due to four factors:

  1. Processing delay. The processing delay is the work that a router has to do to examine the header, check for packet errors, figure out the outbound port (route), and move data around. It is usually not a significant contributor to the overall delay and consumes a few microseconds.

  2. Queuing delay. We can only transmit one packet onto a link at a time. Any other packets that need to go out on that link will need to wait in a queue. The queuing delay is a function of the amount of bits that are ahead of the packet (number of packets × the size of each packet) and the transmission rate of the outbound link. Queuing delay can vary a lot depending on how much data traffic is flowing over any particular link. It is dependent on how much traffic arrives at a router at approximately the same time that needs to go out on the same link and on how quickly the router can transmit the data out (see transmission delay).

  3. Transmission delay. The transmission delay is the time that it takes to get a complete packet out onto the network. This is a function of the speed of the link (e.g., 1 Gbps) and the number of bits in the packet: (packet size ÷ transmission speed). If the packet size is L and the transmission rate is R, the transmission delay is L/R. For example the transmission delay of a 1,500-byte packet on a gigabit per second link is approximately (1500*8)÷(10^9), or 12 microseconds.

  4. Propagation delay. The propagation delay is the time it actually takes the signal to move from one end of the medium to the other. While we might transmit the bits onto the network at, say, 100 megabits per second, there is a delay between the time that the signal is sent and the signal is received. This is the speed of signal propagation in the medium. For electrical signals in unshielded twisted pair or for light pulses in fiber optics, this value is approximately 2×108 m/s (about 67% of the speed of light in a vacuum). An electrical signal propagates in air on a wireless network at approximately 3×108 m/s. Depending on the distance the packet needs to travel, the delay may be from a few nanoseconds to a few tens of milliseconds. It might be considerably longer for satellite transmission due to the longer distance covered.


To achieve reliable delivery on an unreliable network, we rely on detecting lost or corrupted packets and requesting retransmissions.

The simplest possible mechanism is to send a packet and wait for the receiver to acknowledge it … then send the next one and wait for that to get acknowledged. This, unfortunately, is horribly inefficient since only a single packet is on the network at any time. It is more efficient to use pipelining and send multiple packets before receiving any acknowledgements. Acknowledgements can arrive asynchronously and the sender needs to be prepared to retransmit any lost packets.

It would be a waste of network resources for the TCP layer to send back a packet containing nothing an acknowledgement number. While this is inevitable in some cases, if the receiver happens to have data to transmit back to the sender, the acknowledgement number is simply set in the TCP header of the transmitted segment, completely avoiding the need to send a separate acknowledgement. Using an outgoing data segment to transmit an acknowledgement is known as a piggybacked acknowledgement.

TCP also uses cumulative acknowledgements. Instead of sending an acknowledgement per received message, TCP can acknowledge multiple messages at once.


Sockets are a general-purpose interface to the network provided to applications by the operating system. By this, we mean that they were not designed to support one specific network but rather provide a generic mechanism for inter-process communication. They are the only way that an application can interact with the network.

They are created with the socket system call and assigned an address and port number with the bind system call. For connection-oriented protocols (e.g., TCP), a socket on the server can be set to listen for connections with the listen system call. The accept call blocks until a connection is received, at which point the server receives a socket dedicated to that connection. A client establishes a connection with the connect system call. The “connection” is not a a configuration of routers as with virtual circuits; it is just state that is maintained by the transport layer of the network stack in the operating system at both endpoints. After this, sending and receiving data is compatible with file operations: the same read/write system calls can be used. When communication is complete, the socket can be closed with the shutdown or close system calls.

With sockets that use a connectionless protocol (e.g., UDP), there is no need to establish a connection or to close one. Hence, there is no need for the connect, listen, or shutdown system calls. The sendto and recvfrom system calls were created to send and receive datagrams since the read and write system calls do not enable you to specify the remote address. sendto allows you to send a datagram and specify its destination. recvfrom allows you to receive a datagram and identify who sent it.

Protocol encapsulation

We saw that if we want to send an IP packet out on an Ethernet network (IP is a logical network, so there is no physical IP network), we needed to send out an Ethernet packet. The entire IP packet becomes the payload (data) of an Ethernet packet. Similarly, TCP and UDP have their own headers, distinct from IP headers (they need a port number, for example). A TCP or UDP packet is likewise treated as data by the IP layer. This wrapping process is known as protocol encapsulation.

Remote Procedure Calls

Go here for lecture notes

Goal: Provide a layer of abstraction for process-to-process communication that enables a process on one system to invoke a function, or service, on a another system without having to deal with the problems of formatting data and parsing the request.

One problem with the interface offered by sockets was that it offered a send-receive model of interaction. However, most programs use a functional (procedure call) model. Remote procedure calls are a programming language construct (something provided by the compiler, as opposed to an operating system construct such as sockets). They provide the illusion of calling a procedure on a remote machine. During this time, execution of the local thread stops until the results are returned. The programmer is alleviated from packaging data, sending and receiving messages, and parsing results.

The illusion of a remote procedure call is accomplished by generating stub functions. On the client side, the stub (sometimes known as a proxy) is a function with the same interface as the desired remote procedure. Its job is to take the parameters, marshal them into a network message, send them to the server, await a reply, and then unmarshal the results and return them to the caller. On the server side, the stub (sometimes known as a skeleton) is responsible for being the main program that registers the service and awaits incoming requests for running the remote procedure. It unmarshals the data in the request, calls the user’s procedure, and marshals the results into a network message that is sent back to the recipient.

There are a few hurdles to overcome in implementing remote procedure calls:

Parameter passing
Most parameters in our programs are passed by value. That is easy to do remotely: just send the data in a network message. Some parameters, however, are passed by reference. A reference is a memory address of the parameter. The problem with passing this is that memory is local and the memory address passed from a client to a server will now refer to memory on the server rather than to the contents on the client. There is no good solution to this except to understand the data that is being referenced, send it to the remote side (pass by value), where it will be placed in some temporary memory. A local reference can then be passed to the server function. Because the contents might have been modified by the function, the data will need to be sent back to the calling client and copied back to its original location.
All data that is sent needs to be represented as a series of bytes that can be placed into one or more network messages. This is known as marshaling. Not only must any data structure be sent in a serialized format: a sequence of bytes with no pointers, but the format of this marshaled data must be standardized between the client and server so that the server can make sense of the data it receives and vice versa. Different processors and languages may use different conventions for integer sizes, floating point sizes and formats, placement of most significant bytes, and alignment of data.

The marshaled data that is sent over the network may contain data that makes it self-describing: identifying the individual parameters by name and type. Such a format is known as explicit typing. JSON and XML formats are examples of this. In contrast, implicit typing does not send such information and requires the remote side to know the precise expected sequence of parameters.

Generating stubs
Since most languages do not support remote procedure calls natively, something has to generate client and server stubs. That is often a stand-alone program known as an RPC compiler. The RPC compiler takes an interface specification as input and generates client-side stubs (proxies) and a server-side proxy (skeleton). The interface specification is written in an interface definition language (IDL) and defines remote classes, methods, and data structures. It contains all the information that the RPC compiler needs to generate stub functions.
Looking up services
A client process needs to find out how to set up a network connection to an appropriate service that hosts the remote procedures: which host and port to use. An RPC name server is a network service that a client can communicate with to query the host and port of a desired remote interface. The client sends the server a name identifying the interface. That “name” is often a number that uniquely identifies the service that hosts a set of functions on the server. The server returns a host and port number for the service that implements the functions. In many cases, the name server resides on the machine where the remote procedures run and the server will return only the port number. When a service starts up, it will register its interfaces with the RPC name server.
Handling failures
We don’t have a concept of local procedure calls not working. With remote calls, however, problems can arise. The server can stop working or network connectivity may break or experience unexpected delays. These may prevent or delay requests reaching the server or responses reaching the client. To combat this, RPC libraries may attempt to retransmit requests if a response is not received in time. This may have the side-effect of invoking a procedure more than once if the network is slow. In some cases, no harm is done in doing this. Functions that may be run multiple times without undesirable side-effects are called idempotent functions. Functions that have undesirable side-effects if run multiple times (e.g., transfer $500 from my checking account to my savings account) are called non-idempotent functions. Most RPC systems offer at least once semantics, in which case a remote procedure will be executed one or more times (if there are network delays) or at most once semantics, in which case a remote procedure library will avoid resending the procedure request even if it does not get a timely response. Software that uses remote procedure calls has to be prepared to deal with errors that can arise from problems in contacting or getting a response from a remote procedure.

Remote Procedure Calls: case studies

Go here for lecture notes


Sun’s RPC, formally called ONC (Open Network Computing) RPC was one of the first RPC systems to achieve widespread use. It is still in use on virtually all UNIX-derived systems (Linux, OS X, *BSD, SunOS). It uses an RPC compiler called rpcgen that takes input from an interface definition language (IDL) file. This is a file that defines the interfaces to the remote procedures. From this, rpcgen creates client stub functions and a server stub program. These can be compiled and linked with the client and server functions, respectively.

Every interface (set of functions) is assigned a unique 32-bit number, known as a program number. When the server starts up, it binds a socket to any available port and registers that port number and interface’s program number with a name server, known as the portmapper, running on the same machine. A client, before invoking any remote procedure calls, contacts the portmapper on the desired server with the program number to find the port to which it needs to send its requests.

The choice of transport protocol, UDP or TCP, can be specified at run-time. All incoming and return parameters are marshaled into a standard format called XDR, or eXternal Data Representation.


The Distributed Computing Environment, defined by the Open Group, created its own flavor of RPC which was similar to Sun’s. They also had the programmer specify an interface in an IDL, which they called the Interface Definition Notation (IDN).

To avoid the problem of picking a unique 32-bit identifier for the interface, DCE RPC provides the programmer with a program called uuidgen. This generates a unique universal ID (UUID) – a 128-bit number that is a function of the current time and ethernet address.

The Distributed Computing Environment also introduced the concept of a cell, which is an administrative grouping of machines. Each cell has a cell directory server that maintains information about the services available within the cell. Each machine in the cell knows how to contact its cell directory server. When a server program starts up under DCE RPC, it registers its port and the interface’s UUID with a local name server (the DCE host dæmon, dced, which is similar to the portmapper). It also registers the UUID-to-host mapping with the cell directory server. This allows for location transparency for services: a client does not need to know what machine a service lives on a priori.

A standard way of encapsulating data (marshaling) is crucial since encodings may differ between different machines. Sun defined a standard format called XDR (eXternal Data Representation). Every participating system must convert (marshal) data into this format. DCE defined a format called NDR (Network Data Representation). However, instead of creating a single set of definitions, NDR defines a set of data representations that can be used. The hope is that the sender can find a format that will require minimal or no data conversion (hence, greater efficiency). If the client uses the same system architecture, it also will not need to convert data. This is known as a multi-canonical approach to data conversion.

As object oriented languages gained popularity in the late 1980s and 1990s, RPC systems like Sun’s and DCE’s proved incapable of handling some object oriented constructs, such object instantiation or polymorphism (different functions sharing the same name, with the function distinguished by the incoming parameters). Creating objects, in particular, requires a need for memory allocation on the server and cleanup when these remote objects are no longer needed. This is called distributed garbage collection. A new generation of RPC systems dealt with these issues.

Microsoft COM+/DCOM & ORPC (MS-RPC)

Microsoft already had a mechanism in place for dynamically loading software modules, called components, into a process. This was known as COM, the Component Object Model and provided a well-defined mechanism for a process to identify and access interfaces within the component. The same model was extended to invoke remotely-located components and became the Distributed Component Object Model (DCOM), later fully merged with COM and called COM+. Because remote components cannot be loaded into the local process space, they have to be loaded by some process on the remote system. This process is known as a surrogate process. It runs on the server (under the name dllhost.exe), accepting remote requests for loading components and invoking operations on them.

COM+ is implemented through remote procedure calls. The local COM object simply makes RPC requests to the remote implementation. Microsoft enhanced the DCE RPC protocol slightly to create what they called Object RPC (ORPC). This is essentially DCE RPC with the addition of support for an interface pointer identifier (IPID). The IPID provides the ability to identify a specific instance of a remote class. Interfaces are defined via the Microsoft Interface Definition Language (MIDL) and compiled into client and server side stubs. The client-side stub becomes the local COM object that is loaded when the object is activated. Like DCE, ORPC supports multi-canonical data representation.

Since objects can be instantiated and deleted remotely, the surrogate process needs to ensure that there isn’t a build-up of objects that are no longer needed by any client. DCOM accomplishes this via remote reference counting. This is an explicit action on the part of the client where the client sends requests to increment or decrement a reference count on the server. When the reference count for an object drops to zero, the surrogate process deletes that object. To guard against programming errors or processes that terminated abnormally, a secondary mechanism exists, called pinging. The client must periodically send the server a ping set – a list of all the remote objects that are currently active. If the server does not receive this information within a certain period, it deletes the objects. This is a form of leasing, where the object expires if a lease is not renewed periodically. However, there is a subtle difference. With leasing, the lifetime of an object is generally renewed whenever an object is accessed, so there is no need for the client to ping the server to renew a lease unless it has not accessed the object for the duration of the lease.

Java RMI

When Java was created, it was designed to be a language for deploying downloadable applets. In 1995, Sun extended Java to support Remote Method Invocation (RMI). This allows a programmer to invoke methods on objects that are resident on other JVMs.

Since RMI is designed for Java, there is no need for OS, language, or architecture interoperability. This allows RMI to have a simple and clean design. Classes that interact with RMI must simply play by a couple of rules. All parameters to remote methods must implement the serializable interface. This ensures that the data can be serialized into a byte stream (marshaled) for transport over the network. Serialization is a core aspect of marshaling: converting data into a stream of bytes so that it can be sent over a network or stored in a file or database. All remote classes must extend the remote interface. A remote interface is one whose methods may be invoked from a different Java virtual machine. Any class that is defined as extends Remote can be a remote object. Remote methods within that class must be capable of throwing a java.rmi.RemoteException. This is an exception that the client RMI library will throw if there is a communication error in calling the remote method.

RMI provides a naming service called rmiregistry to allow clients to locate remote object references. These objects are given symbolic names and looked up via a URI naming scheme (e.g., rmi://

Java’s distributed garbage collection is somewhat simpler than Microsoft’s COM+. There are two operations that a client can send to the server: dirty and clean. When the first reference to a remote object is made, the client JVM sends a dirty message to the server for that object. As long as local references exist for the object, the client will periodically send dirty messages to the server to renew the lease on the object. When the local JVM’s garbage collector detects that there are no more references to the object, it sends a clean message for that object to the server.

Web Services

In the late 1990s, the web browser became the dominant model for user interaction on the Internet. It used HTTP, the Hypertext Transfer Protocol, over TCP for requests and responses and formatted web pages with HTML, the Hypertext Markup Language. While this created a decent user experience, dealing with fully-formatted pages was not good for programmatic access to that data. The user interface content (tables, images, text formatting) was a major component of the content. Approaches such as site scraping were often employed, where a program would request and receive an HTML page and then parse through tons of formatting directives to access the data it needed.

What we wanted was remotely-hosted services that programs, not users, can access. This enables machine-to-machine communication. Remote procedure calls would seem to be a natural choice for this but they also had issues when used on the Internet outside of an organizations. Because of the convenience of “you don’t need to pick a port”, RPC solutions typically ran services over an arbitrary range of ports where the operating system selected some unused port and registered it with an RPC name server. This led to an administrative nightmare where an administrator could not set a firewall rule to allow or block a specific port. Even though some RPC solutions were designed to support multiple languages, most worked well with just one.

Web services are a set of protocols by which services can be published, discovered, and used over the Internet in a technology neutral form. This means that they are designed to be language and architecture independent. Applications will typically invoke multiple remote services across different systems, sometimes offered by different organizations.

The general principles of web services are:

  • Use text-based payloads, called documents, marshaling all data into formats such as XML or JSON. This ensures that the marshaling format does not favor a specific processor architecture or programming language. It also ensures that content-inspecting firewalls do not cause problems.

  • Use HTTP over TCP/IP for transport. This allows us to use existing infrastructure: web servers, firewalls, and load balancers.

  • Tolerate high latency. Servers are likely not to be on a local area network and may be slow to respond either due to their own load or due to network latency. This means that, where possible, programmers should strive for asynchronous interactions: dispatch a request and see if you can do something else useful before you get the response.

  • Tolerate a large number of clients. Applications that interact with web services tend to be more loosely-coupled than those that use distributed objects (remote procedure calls). Services may be run by different organizations and servers cannot count on well-behaved clients or a small number of them. When possible, the ideal design is a stateless one where each client request contains all the necessary data and the server does not have to store any state between requests. Documents, the unit of message exchange in web services, tend to be self-describing. That is, they will identify and itemize all the parameters (explicit typing) and also describe any additional state needed for the interaction.

The use of web services leads to a programming model called Service Oriented Architecture (SOA). Under SOA, an application is the integration of network-accessible services where each service has a well-defined interface. These services, or components, are unassociated and loosely coupled. By unassociated we mean that neither service depends on the other one; they are all mutually independent. By loosely coupled we mean that neither service needs to know about the internal structure of other services. To maintain this independence, web services generally forgo distributed garbage collection.

Functionally, you can do anything with web services that you can with distributed objects (RPC). The differences are usually philosphical. Web services focus on document exchange and are designed with high latency in mind. Document design is central to web services. Distributed objects tend to look at the world in a way where interfaces are the key parts of the design. The data structures (“documents”) passed in these interfaces just package the data for use by them.


XML-RPC was created in 1998 as a simple protocol that marshals all requests and responses into XML messages. It is essentially a marshaling protocol and the standard does not define an IDL or stub function generator. There are a lot of libraries to support XML RPC and some languages implement it more transparently than others.


XML RPC took an evolutionary fork and, with the support of companies such as Microsoft and IBM, evolved into something known as SOAP, the Simple Object Access Protocol. The acronym has since been deprecated since SOAP has grown to the point where it is neither simple nor confined to accessing objects. XML RPC is a subset of SOAP. In addition to remote procedure calls, SOAP added support for general purpose messaging (send, receive, asynchronous notification) of messages. SOAP invocations are XML messages sent via an HTTP protocol. SOAP services can be described via a Web Services Description Language (WSDL) document. This is another XML document that essentially serves as an interface definition and defines all the names, operations, parameters, destination, and format of requests. WSDL is somewhat complex for human consumption. Typically, one creates an interface definition in a language such as Java and then uses a tool to translate that definition into a WSDL document. That WSDL document can then be fed to another tool (often by another programmer) to generate code that can be used to invoke remote functions or send remote messages.

In addition to WSDL, SOAP was also augmented with a directory service for storing and serving information about web services. The service is called UDDI, for Universal Description, Discovery, and Integration and uses SOAP to communicate, providing WSDL documents as a result. UDDI never really achieved widespread popularity or deployment.

JAX-WS: Java Web Services

As web services became popular, quite a few services to support them were created for the Java platform. One of the more popular ones, and supported by the Oracle (the owner of Java), is JAX-WS (Java API for XML Web Services). The goal of JAX-WS is to invoke Java web services using Java RMI. Unlike traditional Java RMI, interoperability is important since the remote side (client or server) may not necessarily use Java. JAX-WS uses SOAP and WSDL to achieve platform independence.


REST (REpresentational State Transfer) is a departure from the approach of SOAP. Instead of using HTTP simply as a conduit for sending and receiving XML messages where everything you need is contained in the body, REST incorporates itself into the HTTP protocol. In particular, the URL incorporates the request and list of parameters. The protocol intself defines the core nature of the operation:

  • HTTP PUT: create something
  • HTTP GET: read something
  • HTTP POST: update something
  • HTTP DELETE: delete something

The body of the message will contain the document, which will be formatted data and not, as in the case, a structure that also identifies the operations to be performed on the document.

Marshaling formats

When web services were first developed, the obvious marshaling format to use was XML. This was, in a rough way, what HTML used for describing the content of web pages. Its use was adopted by XML-RPC and SOAP and it remains heavily in use. However, it turned out to be a rather text-heavy protocol that was complex to parse. A lightweight alternative that gained much popularity is JSON (JavaScript Object Notation). Despite the JavaScript in the name, it was designed to be language-independent and easy to parse. It was touted as the “fat-free alternative to XML.” Even more efficient is the use of Google Protocol Buffers. This is a binary marshaling protocol and is not always suited for web services over HTTP but is phenomenally efficient for local services and for storing serialized data (e.g., saving objects in a file system).

Clock synchronization

Go here for lecture notes

Goal: Enable clocks on multiple machines to be synchronized to the same value.

No two clocks tick in perfect synchrony with each other. The difference between two clocks at any given instant is the clock offset. The rate at which the clocks are drifting is the clock drift. A linear compensation function adjusts the rate at which time is measured on a computer (e.g., number of ticks that make up a second).

Cristian’s algorithm sets the time on a client to the time returned by the server plus an offset that is one half of the transit time between the request and response messages: Tclient = Tserver + ½(Treceived - Tsent). It also allows one to compute the maximum error of the new time stamp. The error is ± ½[(round-trip time) - (best-case round-trip time)]. Errors are additive. If you incur an error of ±50 msec and the server’s clock source has an error of ±80 msec, your clock’s error is now ±130 msec.

The Berkeley algorithm does not assume the presence of a server with an accurate time (i.e., one that keeps track of UTC time). Instead, one system is chosen to act as a coordinator. It requests the time from all systems in the group (including itself) and computes a fault-tolerant average (an arithmetic average, dismissing systems whose time values differ by more than a certain amount). It then sends each machine an offset by which to adjust its clock.

The Network Time Protocol, NTP, was created to allow a large set of machines to synchronize their clocks. A set of machines act as time servers. This collection of machines is known as the synchronization subnet. The subnet is hierarchical, with the time server’s stratum defined as the number of hops it is from a machine that synchronizes from a direct time source. Machines that are directly connected to a time source (e.g., a GPS receiver) are at stratum 0. Machines that synchronize from a system at stratum 0 are at stratum one, and so on. The Simple Network Time Protocol, SNTP, is a restricted form of NTP that does not support peer-to-peer synchronization of time servers. The peer-to-peer mode maintains state on drift and synchronization time and is intended for time servers to synchronize with each other. SNTP is essentially the same as Cristian’s algorithm. The formula for NTP and SNTP is time_offset = ½ (T2 - T1 + T3 - T4) where T1 is the time the message left the client, T2 is the time it arrived at the server, T3 is the time the response left the server, and T4 is the time that it arrived at the client. If we let TS = ½(T2+T3) then we arrive at Cristian’s formula.

NTP encourages clients to try several servers. For each server contacted, the protocol computes

The difference between the client’s clock and the server’s. This is how much the client needs to offset its clock to set it to the server’s time.
This is an estimate of the time spent sending or receiving the message. It is one half of the round-trip time minus the estimate of time spent on the server.
Jitter measures the variation in delay among multiple messages to the server. It gives us an idea of how consistent the latency is between the client and server.
Dispersion is the estimated maximum error for the computed offset. It takes into account the root delay (total delay, not just to the server but the delay from the server to the ultimate time source), estimated server clock drift, and jitter.

Given the choice of several NTP servers, NTP picks the server with the lowest dispersion. That is, the server from which it can get the most consistent and accurate time. If there is a tie, it then picks one with the lowest stratum; that is, one closest to the master time source. Note that this may result having a client synchronize from a higher-stratum server even if it can contact a lower-stratum one (one that is closer to the time source). This may happen if the client can access that server more reliably and with less delay and jitter than a “better” server.

The Precision Time Protocol (PTP, an IEEE standard), was designed with different goals than NTP. While NTP was designed for synchronizing time with machines across the Internet and remote servers, PTP was designed for LANs: networks with low jitter and low latency. PTP is intended to achieve sub-microsecond precision among cooperating machines. Clock synchronization is initiated by a PTP master (unlike NTP, where the client host initiates the sync). The master announces its time to clients via a sync message. If a client is then interested in synchronizing its clock, it must communicate back to the server in order to discern the delay in communication. The client sends a delay request message and notes the time it was sent. The server sends back a delay response message containing the time of arrival of the delay request message. With this data, the client can compute the server’s timestamp with an adjustment for the network transit delay.

Logical clocks

Go here for lecture notes

Goal: Allow processes on different systems to identify causal relationships among events, particularly among messages between these systems.

Lamport clocks allow one to assign sequence numbers (“timestamps”) to messages and other events so that all cooperating processes can agree on the order of related events. There is no assumption of a central time source and no concept of total ordering (when events took place). The central concept with logical clocks is the happened-before relation: a→b represents that event a occurred before event b. This order is imposed upon consecutive events at a process and also upon a message being sent before it is received at another process. Beyond that, we can use the transitive property of the relationship to determine causality: if a→b and b→c then a→c. If there is no causal relationship between two events (e.g., they occur on different processes that do not exchange messages or have not yet exchanged messages), the events are concurrent.

Lamport’s algorithm states that every event is timestamped (assigned a sequence number) and each message carries a timestamp of the sender’s clock (sequence number). A message comprises two events: (1) at the sender, we have the event of sending the message and (2) at the receiver, we have the event of receiving the message. The clock is a process-wide counter (e.g., a global variable) and is always incremented before each event. When a message arrives, if the receiver’s clock is less than or equal to the message’s timestamp, the clock is set to the message timestamp + 1. This ensures that the timestamp marking the event of a received message will always be greater than the timestamp of that sent message.

Vector clocks

One result of Lamport timestamps is that multiple events on different processes may all be tagged with the same timestamp. We can force each timestamp to be unique by suffixing it with a globally unique process number. While these new timestamps will not relate to real time ordering, each will be a unique number that can be used for consistent comparisons of timestamps among multiple processes (e.g., if we need to make a decision on who gets to access a resource based on comparing two timestamps).

A second deficiency with Lamport timestamps is that, by looking at timestamps, one cannot determine whether there is a causal relationship between two events. For example, just because event a has a timestamp of 5 and event b has a timestamp of 6, it does not imply that event a happened before event b.

A way to create timestamps that allow us to discern causal relationships is to use a vector clock. A vector clock is no longer a single value but rather a vector of numbers, with each element corresponding to a process. Before affixing a vector timestamp to an event, a process increments the element of its local vector that corresponds to its position in the (for example, process 0 increments element 0 of its vector; process 1 increments element 1 of its vector). When a process receives a message, it sets the vector of the event to one that contains the higher of two values when doing and element-by-element comparison of the original event’s vector and the vector received in the message. This becomes the new per-process vector from which future events on that process will be timestamped. Think of a vector timestamp as a set of version numbers, each representing a different author.

For example, in an environment of four processors, P0, … P3, P1 will “own” the second element of the vector. If one event on P1 is (2, 4, 0, 1) then the next event will be (2, 5, 0, 1). If the event after that is the receipt of a message with a timestamp of (3, 2, 9, 8) then the timestamp will be created by setting an element-by-element maximum of (2, 6, 0, 1) and (3, 2, 9, 8), which is (3, 6, 9, 8). We can illustrate this with the following pseudocode where e is the system’s vector counter and r is the received vector timestamp:

/* receive message with vector timestamp r */
e[myproc]++;    /* increment our process' index */
for (i=0; i < num_procs; i++) {
    if (e[i] < r[i])
        e[i] = r[i];

Two events are concurrent if one vector timestamp is neither greater than or equal nor less than or equal to the other element when doing an element-by-element comparison. For example, events (2, 4, 6, 8) and (3, 4, 7, 9) are not concurrent (i.e., they are causally related) because every element of the first vector is less than or equal to the corresponding element of the second vector. The vectors (2, 4, 6, 8) and (1, 5, 4, 9) represent concurrent events. Neither vector is less than or equal to the other. For instance, 2 ≥ 1 (first element of the first vector is greater than the first element of the second vector) but 4 ≤ 5 (second element of the first vector is less than the second element of the second vector).

Note that in an environment where the overall set of processes is not known by each process, a vector timestamp may consist of a set of <process, number> tuples. In such a case, the timestamping and comparison process is exactly the same: numbers are compared with those of matching processors and any missing process is treated as having a value of 0 in that vector.

Group communication

Goal: Provide mechanisms for a process to send a message to a group of processes instead of to one process. Consider the issues of reliability and message ordering.

Point-to-point communication is known as unicast. This is what we generally use to communicate a single client and server. There are other modes of communication. Broadcast is the sending of data to every host on the network. Anycast is point-to-point communication (as in unicast) but the receiver is the nearest one of receivers with the same address (for example, IPv6 uses this to allow a host to update the routing table of the nearest host). Anycast is a special purpose form of unicast that will will not discuss here.

An alternative to point-to-point communication is group communication, or point-to-multipoint messaging. Group communication is known as multicast and provides the abstraction of allowing a process to send a single message that is delivered to all group members. True multicast is handled by the network and offers the efficiency of avoiding replicated messages. The sender sends a single message on a special multicast address. Interested parties listen on that address and the network takes care of the delivery. If multicast is not available, it can be simulated by invoking multiple unicasts, one to each recipient.

There are two considerations in group communication (multicast): reliability and message ordering.

Message receipt versus message delivery

We talk about a process receiving a message, which means that it is received from the network and handled by a multicast receiving algorithm. This algorithm decides when to deliver the message to the application logic.

Since receivers cannot control the order in which messages are received over the network, a layer of software is responsible for transmitting a multicast message and, at the receiver, receiving it and deciding when and if to make it available to the application (i.e., deliver it). When a message is received, the multicast receiving algorithm may take one of three actions:

  1. Discard the message. This may be done if the receiver is no longer a member of the group or if the message is a duplicate.

  2. Deliver the message. The message is placed into a FIFO (first-in, first-out) queue from which the application reads incoming multicast messages.

  3. Hold the message. The message is not ready to be delivered to the application. Most likely, this is because it has not arrived in the expected order and the algorithm must first receive an earlier message. Alternatively, it may need to be held to get feedback that all other members have received it before passing it to the application. In cases like this, the message is placed on a hold-back queue. When the next message is received, the algorithm may check the hold-back queue to determine whether any held messages can now be delivered or discarded.


An atomic multicast requires that a message must reach all group members (if one host cannot get the message, no others can process it). This multicast must survive machines going down – both the sender and/or receivers. If a recipient is not certain that a message has been received by all group members, it cannot deliver it to the application. Because of this, it requires the most overhead in implementation, often employing persistent logs.

A reliable multicast is a multicast where the sender tries its best to ensure that messages get to group members. The sender sends a message and waits for an acknowledgement. If it doesn’t receive the acknowledgement in time, it will retransmit the message. It will try this again and again until, eventually, after a longer interval of time, it will give up and assume the receiver is dead.

An unreliable multicast doesn’t wait for acknowledgements and will generally use whatever underlying multicast mechanism is provided.

Message ordering

The issue in message ordering is that multiple hosts can be sending messages to the entire group at the same time. Will each host receive all the messages in the exact same order? Will each host receive all the messages in the order they were sent?

Global time ordering requires that all messages arrive in the exact order they were sent: if host A sent a message 5 nanoseconds before host B did then all hosts should receive the message from host A first. This is impossible to implement (clocks can’t be that perfectly synchronized and the chance of a tie is always present; also, networks will not be able to guarantee this ordering if routing is involved). A more practical approach is total ordering. This requires that all messages arrive in the same order at all hosts. This can be easily achieved by providing a group-wide mechanism for obtaining a totally sequenced message ID: for example, from a sequence number server.

Causal ordering means that messages that are causally related (according to Lamport’s happened before definition) will be delivered in order to all group members. Concurrent messages may be delivered in any order. One way of implementing causal ordering is by having each process keep a precedence vector and sending it along with every message that is sent to the group. A precedence vector is similar to a vector timestamp with the exception that an event counter is not incremented for received messages. The vector applies only to events that relate to sending messages to a specific group. Each entry in the precedence vector represents the latest message sequence number that the corresponding process (group member) knows about.

When a process Psender sends a message, it increments the element of the vector that corresponds to its own entry:

Vsender[sender] = Vsender[sender] + 1

This vector is sent along with the message.

When a process Preceiver receives a message from Psender, it checks two conditions:

(1) The message must be the very next message from Psender. That is, the value of the sequence number in the received vector that corresponds to Preceiver must be exactly one greater than the one we have for that process in our vector:

(Vsender[i] == Vreceiver[i] + 1) ?

This tells us that we are receiving messages in sequnce from Psender. If the value is more than one greater, it means that we missed one or more messages from that sender. This message will have to go on the hold-back queue until the the earlier messages arrive.

(2) The message should not be causally dependent on another message that the receiver has not yet seen. This means that every other element of the vector has to be less than or equal to the corresponding element of the vector at the receiver.

∀i, i ≠ sender: (Vsender[i] ≤ Vreceiver[i]) ?

If the vector from the sender contains some sequnce number that is greater than a corresponding sequence number in the receiver’s vector, that means that the sending process has seen a message from some other process that the receiver has not yet processed. Since the precedence vector is used for group communication and the reciver is part of the group, that means the receiver needs to wait for that message to come.

If both conditions are satisfied, then the received message can be delivered to the application immediately. Otherwise, it is placed in the hold-back queue until the conditions can be satisfied. Causal ordering has the advantage that there is no need for a global sequencer as in total ordering. Note that the precedence vector technique requires reliable message delivery. Otherwise, the receiver will not know if a message was lost or if a message has just not yet arrived.

Sync ordering requires a special type of message — a sync message. When this is issued, any messages that were already sent have to be processed (and acknowledged to the senders). The sync assures that any messages sent before the sync will be processed before any messages after the sync (globally – for all hosts).

FIFO (first in, first out) ordering on messages ensures that messages from each source are delivered in order but messages from multiple sources may be interleaved in any order at the receiver. For example, if host A sends messages m1, m2, m3 and host B sends messages n1, n2, n3, it is valid for host C to receive the sequence m1, m2, m3, n1, n2, n3 and for host D to receive the sequence m1, n1, n2, m2, n3, m3.

Finally, an unordered multicast doesn’t impose any message ordering among messages from other hosts.

IP multicast

IP multicasting is designed, like IP, to span multiple physical networks. Membership is dynamic: a machine can join or leave a multicast group at any time. Moreover, there is no central coordinator and no restriction on the number of hosts that can be in a group. Multicasting provides network efficiency. Packets in a multicast stream only need to be replicated when a router needs to send them to multiple network links. Only one stream of packets is needed on any network segment regardless of the number of receivers.

An IP multicast address (also known as a class D address) is an IP address that starts with 1110 and contains a 28-bit multicast address. A host may join this address and receive messages addressed to that multicast ID. Within a LAN, an IP class D address is mapped onto an Ethernet multicast address by copying the least-significant 23 bits of the address onto an Ethernet multicast address. Within a LAN, an ethernet chip is programmed to to perform an exact match on a small set of addresses or to accept addresses that hash to particular values. The ethernet driver will need to remove any unneeded addresses that pass through. The ethernet chip can also be set to multicast promiscuous mode, where it will accept all multicast ethernet packets.

Routers have to get involved to support multicasting beyond the local area network. Two protocols are used to implement multicasting. The Internet Group Management Protocol (IGMP) is designed for hosts to inform the routers on their LAN that they are interested in joining a multicast group. The Protocol Independent Multicast (PIM) protocol enables routers to tell their neighboring routers that they are, or are no longer, interested in receiving packets for a particular multicast group.

A host uses IGMP to send a multicast join message (also known a a membership report) to join a specific group. A multicast-aware router will get this message and now know that the the link on which the message arrived needs to receive any packets addressed to that multicast group.

Periodically, a router will send a membership query message to all hosts on the LAN. If any node is still interested in the group, it must re-send a join message. In version 1 of the protocol, if no join messages are received, then the router would stop responding to join messages and the LAN will no longer receive packets for that group. With IGMP v2, a leave message was added to avoid having to wait for the timeout to realize that nobody is interested in a group. That avoids needlessly sending multicast traffic on a LAN where no hosts are interested in receiving the messages anymore.

A lingering problem was that multicast IP uses no centralized coordinator and anyone can send multicast messages to any multicast addresses. IGMP v3 adds the ability for a host that joins a multicast group to specify the source address (originator) of the multicast. The router will then not forward packets originating from unwanted source addresses onto the LAN.

IGMP allows edge routers (those connected to LANs) to know what multicast groups the nodes on its connected LANs are interested in. PIM, Protocol Independent Multicast, is responsible for conveying multicast membership information among routers within the wide area Internet. It assumes the presence of other protocols to know the network topology and which routers are connected together. There are two basic approaches to multicasting on the WAN (wide-area network): dense mode (flooding) and sparse-mode multicast.

Dense Mode multicast, also known as flooding, originates from the multicast sender. The message is duplicated and sent to all connected routers. Each of those routers, in turn, duplicates and sends the message to all of its connected routers, and so on. To avoid routing loops, each router uses reverse path forwarding (RFP). A received packet is forwarded only if it was received via the link that the router knows is the shortest path back to the sender (it finds this by checking its forwarding table, which is what it would use if it was sending a packet to that address). PIM Dense Mode floods the entire network of connected multicast-aware routers.

If an edge router receives this multicast packet and is not interested the data stream (i.e., it has not received IGMP join messages), it will send a prune message to the router that delivered that packet. If that router receives prune messages from all interfaces, it will in turn send a prune message to the router that is sending it the multicast messages. A router sends prune messages if it is getting redundant traffic from another link or if its downstream router or LAN connections are not interested in the stream. If a node on a LAN joins a multicast group at a later time, sending an IGMP message to a router, that router would then send a PIM Graft message to its connected routers to state interest in the stream. Dense mode only makes sense when there are receivers spread through most locations covered my multicast-aware routers. It is rarely used.

In contradistinction to Dense Mode, PIM Sparse Mode starts with requests from multicast receivers rather than flooding the network with traffic from the sender. Each router must send a Join message to its connected routers in order to request multicast traffic (and a Prune message when it no longer is). This causes multicast packets to only go to the routers where it is needed. The trick to getting this to work is that a multicast group must be associated with a router known as a rendezvous point (RP). The RP acts as a central point that senders know how to contact to register that they are transmitting multicast streams and for receivers to contact to join multicast streams. Join messages initially are routed to the RP to avoid flooding the network. From there, they are routed to participating routers – routers that expressed an interest in that multicast group.

A sender that transmits multicast packets will simply send them only to the rendezvous point (this is effectively a unicast stream). The RP then multicasts the packet to any routers from which it has received Join messages.

To ensure that the RP does not become a bottleneck, after a the aggregate bandwidth exceeds a defined threshold, routers closer to the receiver will try to Join routers that are more directly connected to the source since they have seen the multicast traffic and know the source address.

Sparse mode is ideal when the receivers are concentrated among a few network segments.

State Machine Replication and Virtual Synchrony

Go here for lecture notes

Goal: Create a software framework that gives everyone the same view of live group members and ensures that messages are multicast to all of them in the same order. There should never be a case where only some group members receive a message.

State Machine Replication

In a clustered computing environment, we often would like to create replicas of certain components, such as name servers, compute servers, databases, file servers, or object stores. That is, we’d like multiple computers to have identical content or have their processes be in the same state. This is useful for both scalability and high availability (fault tolerance).

Scalability means that we can handle an ever increasing number of requests by adding more servers. For serving files or web pages, where reads far outnumber writes, keeping replicated computers allows us to distribute read requests among the set of machines holding replicas.

High availability means that the service can survive computer or network failures. Redundancy helps achieve these goals by allowing a request to be sent to any surviving computer even if a process, computer, rack, or entire data center is down. Replicated components can take the place of those that stopped working. Active-passive replication means that the replicated systems do not process requests when the main system is up but are standing by in case it fails. Google’s Chubby lock manager/file system (we’ll look at this later) is an example of this, where requests must always go to the master. Any changes at the master get propagated (replicated) to the replicas. With active-passive systems, we get fault tolerance by having standby systems but achieve no scalability of performance since one process has to handle all requests. Active-active replication is where all components are working and accepting requests. Because active-active systems allow clients to contact send a request to any available system, one can distribute, or balance, client load across the set of replicas.


Faults may be fail-silent or byzantine. With fail-silent faults, the faulty process is dead: it does not accept or transmit communication. With byzantine faults, a process is running but producing erroneous data. Byzantine faults are more difficult to handle and, for our discussion, we will assume that our processes can detect faulty messages (e.g., checksums, digital signatures, encryption, and time stamps will be used and we’ll examine these techniques in a future lecture) and disregard them. With fail-silent faults, a process may remain dead. In that case, it exhibits fail-stop behavior. If a failed process recovers, it exhibits fail-recover behavior. Recovery is good, of course, but we need to realize that the process has not received any updates to the state of the group during the time it was dead.

The two-army problem demonstrates that reliable communication can never be achieved with faulty communication lines. The story goes like this:

Two armies, A & B, need to decide whether to attack the enemy. If both attack, they win. If only one attacks, it will die. A sends a messenger to B: “let’s attack in the morning”. But A needs to know that B received the message so it asks for an acknowledgement as a return message. If the messenger bearing the acknowledgement does not arrive, A does not know if the return messenger didn’t make it and B got the message or if B never received the message. If A receives a response, B isn’t sure if the messenger made it to A or not. B can ask for an acknowledgement to the acknowledgement but then A will not be sure if the second acknowledgement made it to B. We can do this indefinitely…

In asynchronous networks, such as the IP network, there is no time limit by which one can reliably expect a packet to arrive. The two-army problem shows us that we can never achieve 100% certainty that a process is truly dead (fail-silent) just because we do not hear from it. This is something we have to live with. Because of this, one process cannot reliably detect that another one has failed. Instead, a process may suspect the another process has failed and share that knowledge with all other members of the group, taking the “failed” process out of the group even if it is actually alive. Virtual synchrony deals with managing this type of group-wide list of active group members. As with active-passive systems, changes at one process must be propagated to all others. However, consistency becomes a consideration: if all replicas do not get the updates at the same time (atomically), then there is a risk that clients may read inconsistent or old data depending on which server handles their requests.

For both scalability and high availability, all replicas should share the same software and same data. This data reflects the state of the system. It would be confusing if data in a file sometimes looks different because it is accessed from different servers that received write requests in a different order, or if only some systems were told that a lock has been released.

To accomplish this synchronization among replicas, processes across different systems must see the same sequence of events (events that modify the state of the system). These events can include lease/lock information, file or object writes, or any inputs into a process that is running on a machine.

Virtual synchrony

Virtual synchrony is a technique used to provide applications with the abstraction of multicasting a message to a group of processes and ensuring that all data is replicated to all processes in the group in such a manner that it is indistinguishable from using a single non-faulty system. This involves managing group membership and ensuring that causal dependencies on messages from multiple systems are taken into account.

Virtual synchrony focuses on process groups. Each process group is a collection of processes. When a process sends a message to the group, it must be received by all current group members. Processes may join groups, leave groups, and send messages to groups. The processes that send messages to the group are often, but not necessarily, outside of the group (e.g., think of systems communicating with a fault tolerant server).

A group view is the set of processes that are currently in a given process group. Every multicast is associated with a group view and this group view must be identical for every process that will send multicasts to the group.

The virtual synchrony framework tracks group membership. Over time, the membership of a process group may change. Processes may be added and processes might leave (e.g., a computer may crash). This change of membership is called a view change. Any processes in the system should be made aware of this change since future multicasts will have to be sent to the new group view. Message delivery should not span a view change. That is, if a view change takes place while a message was being multicast, that message should be delivered to all the processes in the group before the view change message is delivered to any member.

Although our discussion is general, we will focus on the Isis framework, which is deployed in places such as the New York Stock Exchange and the Swiss Exchange. Similar services are also used by IBM’s Distribution and Consistency Services, Microsoft’s Cluster Service, CORBA remote objects, and many others.

Group Membership Service

A Group Membership Service (GMS) keeps track of which processes are in the group. If a process, p, reports another process, q, as faulty, the GMS will take the faulty process out of the group and tell every process with a connection to q that q is no longer in the group. With asynchronous networks, we cannot be 100% sure that the process was really faulty; it might have just been slow to respond. However, if any process suspects that process is dead, we will just take it out of the process group and nobody will send messages to it. Once it is taken out of the group, it will have to rejoin just as if was a new process.

When a process joins a group, it first needs to import the current state of the synchronized group. This is called a state transfer. The process contacts an existing member and transfers the state to initialize itself to the most recent checkpoint. During this time, it is not processing any requests. After the state transfer, it is part of the group.

Reliability and message ordering

An enhanced version of Isis, Isis2, supports four types of multicast protocols depending on the order of delivery that is needed: unordered, causally ordered, total ordered, and sync ordered (barriers). Isis2 also includes a SafeSend API that is a full Paxos implementation, which is a relatively recent addition to the software (we will look at Paxos, a consensus protocol, in a future lecture).

Systems can think of the process group and virtual synchrony as one abstract entity. Isis uses reliable TCP point-to-point links and the client library iterates through group members to deliver the message to each member. Upon receiving a message, each receiver adds the message to its queue (but does not yet deliver to the application). At this point, the message is marked unstable at the receiver. The receiver then sends an acknowledgement to the sender. When the sender receives acknowledgements from all members, it sends a confirmation to all processes in the group, who then mark the message as stable and deliver it to the application. To improve performance, not every message is actually acknowledged. A receiver may collect a set of messages, each with a per-source sequence number and periodically send a cumulative acknowledgement containing the highest sequence number it received. Likewise, the sender can send a cumulative confirmation with a sequence number that tells each receiver that it is safe to deliver all messages with a lower sequence number. If a multicast sender dies before it is able to confirm delivery of messages to the entire group, these messages will remain in an unstable state at the receivers and are not delivered to applications. When the death of the sender is eventually detected (either by the Group Membership Service or by a process that then alerted the GMS) or a new member joins, the GMS instantiates a view change since the group membership has now changed. The view change will ensure that unstable messages are received by all current group members, thus enforincing atomic multicasting (the all or none property).

Processing a view change

If some process P receives a view change message because another process wants to join the group or a process has left the group, P will forward a copy of any unstable messages to every process in the group. This will ensure that the messages will become stable and all current group members will get to deliver them to their applications. Process P then needs to be sure that all group members are ready and have no unstable message of their own. To do this, P sends a flush message to the group. Upon receiving a flush message, a process will do the same thing that P did: multicast all unstable messages (if it has not done so yet) and then acknowledge the flush. When a process has received a flush message from every other process in the group, it knows that there are no unstable messages left in the system and can switch to the new group view. We thus achieve atomicity of multicasts. The flush is the implementation of a barrier: all outstanding messages must be delivered to applications before the barrier allows new message traffic to continue.

Mutual exclusion

Go here for lecture notes

Goal: Allow processes to request and be granted exclusive access to named resources. The algorithms should be designed to avoid starvation and ensure that exactly one requesting process gets the resource even if multiple processes make requests concurrently.

Mutual exclusion is responsible for making sure that only one process or thread accesses a resource at a time. In non-distributed systems, it is accomplished through mechanisms such as semaphores and monitors and, at a low level, through instructions such as test-and-set locks. None of these mechanisms work across networks of computers. We need an algorithm that will allow processes to compete for access to a resource and ensure that at most one process will be granted that access. The algorithm must be fair: it has to ensure that all processes that want a resource will eventually get a chance to get it. If this is not the case, starvation occurs, where a process will have to wait indefinitely for a resource.

The “resource” is an arbitrary name that all processes agree to use. It may refer to a critical section of code, a physical device, or some network service. A mutual exclusion algorithm allows a process to request access to this resource and be granted exclusive access to it. This is known as a lock. If the lock has a timeout associated with it, then it is known as a lease.

Distributed algorithms fall into three categories. A centralized approach uses a central coordinator that is responsible for granting access permissions.

A token-based approach allows a process to access a resource if it is holding a “token”; that is, it received an unsolicited message where ownership of the message allows the process to access the resource. Sending the message (token) means forfeiting access.

A contention-based algorithm is one where all processes coordinate together on deciding who gets access to the resource.

The centralized algorithm is a server that accepts REQUEST messages for a resource. If nobody is using the resource, it responds with a GRANT message. If somebody is using the resource, that client simply does not respond. When a client is done with a resource, it sends the server a RELEASE message. The server then sends a GRANT message to the next client in the queue (if there is one).

The token ring algorithm creates a logical communication ring among the processes in the group. A message, called a token, is created for each resource. This token is passed from process to process along the ring. If a process receives a token and does not need to access that resource, it simply passes the token to the next process. Otherwise, it will hold on to the token until it is done with the resource.

Lamport’s mutual exclusion algorithm requires each process that wants a resource to send a timestamped request for the resource to all processes in the group, including itself. Receipt of each message is immediately acknowledged by every process. Each process places the received message in a local priority queue that is sorted by the Lamport timestamp that is in each message (or any totally unique timestamp). A process decides whether it can access the resource by checking whether its own request is the earliest (first) one in the queue of all requests that it has received. If so, it accesses the resource and, when done, sends a release to all members (including itself). The receipt of a release message causes each process to remove the process from the ordered queue. If a process now finds itself as the earliest process in the queue, it knows that it is now its turn to access the resource.

Lamports mutual exclusion algorithm summary: send a request message to everyone in the group, including yourself. Get an acknowledgement from everyone. Each process stores request messages in a priority queue sorted by timestamp. The process ID at the head of the queue can access the resource. When it is done, it sends a release message to everyone.

The Ricart & Agrawala algorithm. like Lamport’s, is also based on using reliable multicasts. A process that wants to access a resource sends a request to all other processes in the group and waits for all of them to respond. If another process is currently using the resource, that process delays its response until it is done. If process A and process B sent out requests concurrently (i.e., two systems want the same resource concurrently), each system compares the timestamp of that request with that of its own request. If process A received a request from process B that is older (a lower timestamp) than the one it sent, then process A will give process B priority to access the resource by sending a response to process B . Otherwise, if process A has the earlier timestamp, it will queue the request from B and continue to wait for all acknowledgements, sending a response to B (and any other processes who wanted the resource) only when it is done using the resource. The key point is that processes A and B will make the same comparison and exactly one will hold back on sending the response.

Ricart & Agrawala’s algorithm summary: send a request message to everyone in the group and wait for acknowledgements from everyone. Any process using the resource will delay sending an acknowledgement.

The Lamport and Ricart & Agrawala algorithms are similar in that they are both contention based and truly distributed. Ricart & Agrawala’s requires fewer messages since there is no explicit release message that needs to be sent; acknowledgements are simply delayed. Neither are efficient compared with the centralized algorithm.

Election algorithms

Go here for lecture notes

Goal: Allow all processes in a group to elect exactly one of the processes in the group. All processes should agree to elect the same process.

Election algorithms are designed to allow a collection of processes to agree upon a single coordinator. All candidate processes are considered to be equally capable of acting as a coordinator. The task is to select exactly one of these processes — and make sure that every process is aware of the selection. In designing these algorithms, we assume that every process has a unique ID (for example, a process ID combined with the machine’s address). The algorithms we examine in this section will choose a winner (a coordinator) by selecting either the highest available process ID or the lowest one. It is important that whatever criteria is used will never result in one process choosing a different winner than another.

The bully algorithm selects the largest process ID as the coordinator. If a process detects a dead coordinator, it sends an ELECTION message to all processes with a higher ID number and waits for any replies. If it gets none within a certain time, it announces itself as a coordinator. When a process receives an ELECTION message it immediately sends a response back to the requestor (so the requestor won’t become the coordinator) and holds an election to see if there are any higher-numbered processes that will respond. If not, then it declares itself as coordinator.

The ring election algorithm requires a logical communication ring among the processes in the group (as we had in the token ring mutual exclusion algorithm). When a process detects a non-responding coordinator, it creates an ELECTION message containing its own process ID and sends it to the next process in the ring. If the process is dead, then it sends the message to the following process. This sequence continues, with each process sending an ELECTION message to the next process in the ring, skipping dead processes until it finds a process that can receive the message. When a process receives an ELECTION message, it adds its own process ID to the body and sends that message out to its neighboring process. When the election message circulates back to the original sender (the sender detects this by seeing its process ID at the head of the list, indicating that it started the election), the sender looks at the list of active processes in the message body and chooses a coordinator from among them. Since multiple elections may have been started concurrently, the algorithm for choosing the leader must yield the same result among the different list. Hence, selecting the highest or the lowest process ID will work. Selecting the first or last process in the list will not.

The Chang and Roberts ring algorithm optimizes the ring algorithm in two ways. First, there is no need to keep the entire list of live processes in the election messages; we can perform the election as we go along. If the goal is to vote for the highest-numbered live process ID and a process receives an election message, it does one of two things: (1) pass it untouched if its process ID is smaller than the one in the message, or (2) replace the process ID In the message with its own if the process ID is greater than the one in the message. When a process receives a message that contains its own process ID, it knows that the message has come full circle and it is the winner. It will then notify each process of the outcome.

The second optimization is to avoid having multiple election messages circulate if multiple processes have initiated elections concurrently. To do this, a process keeps track if it has participated in an election (i.e., has initiated or forwarded an election message). If a process receives a message with a smaller process ID than its own and it is a participant in an election, it simply discards the message. If, on the other hand, the received message contains a greater process ID than its own, it will forward the message even if it is a participant in an election since the message contains a candidate that takes priority over the previous one that was forwarded. Once election results are announced, each process clears its status of participaing in an election.

One problem with election algorithms is when a network gets partitioned (also known as segmented): one set of processes is separated from another and cannot communicate with the other. In this case, each segment may elect its own coordinator and problems can arise. This is known as a split brain situation. To combat this, a redundant network is needed or some alternate communication mechanism to enquire about the state of remote processes.

Another approach that is often taken is to require a majority of prospective coordinators to be available. A quorum is the minimum number of processes that need to be available to participate in an operation. In this case, the operation is an election and we need a quorum of over 50% of the processes. If a network is partitioned into two or more segments, at most one of the partitions will have over 50% of the processes. Those processes that do not will refuse to elect a coordinator.


Go here for lecture notes

Goal: Create a fault-tolerant distributed algorithm that enables a bunch of processes to agree on a single value.

The goal of a consensus algorithm is to have a group of processes agree on a common value. The value must be one of the values that was proposed by at least one of the processes. Once the processes agree on the value, it is considered final.

A common use of consensus is in replicated state machines to build fault-tolerant systems. Processes send messages (commands, such as data updates) to the group of replicated servers. These messages are received by each of the servers and added to a log on the server. If one process wants to set “name=value_1” and, concurrently, another process tries to set “name=value_2”, we’d like every instance of our database to consistent. Every copy should have either “name=value_1” or “name=value_2”. It is not acceptable to have some instances have “name=value_1” while others have “name=value_2”. This log must be consistent across all servers so that when the commands in the log are applied to the state machine (i.e., the processes on the servers read their logs and apply the commands) they are all processed in the same sequence on each server. Consensus enables all processes to agree on the order in which commands are added to the replicated logs. Another example of choosing a value is running an election: each process may propose itself as a contender and the consensus algorithm will pick one winner. Yet another example is mutual exclusion: agreement on who gets a lock on a resource.

Achieving consensus is easy if processes don’t fail or we are willing to wait for all processes to recover if they die (as with the two-phase commit protocol). The challenge with a consensus algorithm is to achieve consensus in the presence of faults.

A consensus algorithm must satisfy four goals:

The outcome must be one of the proposed values.
Uniform agreement
No two processes may ultimately agree on different values.
A process must ultimately agree on a single value. It cannot change its mind later.
The algorithm must eventually terminate such that every process will decide on the same value.


Paxos is a fault-tolerant distributed consensus algorithm. It enables participants to agree on a proposed value (arbitrary data) as long as a majority of processes running the Paxos algorithm are alive (more details on this in a little while). Paxos is a general-purpose consensus algorithm. Among other things, it can be used to create agreement among participants on a globally consistent (total) order for client messages. The agreed-upon data, for example, can be commands to update the state of a system or data in a file.

Paxos is made up of three groups of agents: proposers, acceptors, and learners. These need not be separate systems and one system may, and usually does, take on multiple roles. Proposers receive requests from clients. They are responsible for running the protocol by sending messages to and receiving messages from acceptors. Acceptors provide the basis of fault tolerance in the system. Once acceptors agree to a value, they propagate the result to learners, which simply forward the results to the processes that need to act on them.

For now, let’s assume that there is only a single proposer to which all clients send requests. Also, let’s assume there is a single learner.

Without contention from other processes or any failure, what Paxos does is trivial. A client gives the proposer the message it wants to add to the servers. The proposer picks the next highest sequence number that it plans to associate with the requested event and asks all the acceptors to reserve that number. The acceptors agree (there is no reason to disagree!) and the proposer sends the requested message from the client associated with that value. This sequenced message then gets propagated to wherever it has to go by the learner.

Let us now consider the case of several processes making concurrent requests. A proposer picks a sequence number and asks the acceptors to reserve that number, each acceptor makes a promise to not accept any proposals for lower sequence numbers. If any acceptor already accepted a higher sequence number from someone else, it will reject this lower-numbered proposal and the client will have to try again and have the proposer issue a request with another, higher, sequence number. Lower-numbered proposals are rejected to handle the case of possibly old messages reaching acceptors. A rejection includes the highest accepted sequence number and its corresponding value from that acceptor. The proposer is obligated to continue the algorithm using that data instead of its original proposal, which has been rejected. If needed, the client will have to retry the original proposal later by re-running the algorithm. For now, the proposer has to finish this instance of the algorithm to push for group consensus.

Now let’s consider failure. A majority of the acceptors must be alive for the algorithm to function. This majority of live acceptors is called a quorum. Suppose the quorum of acceptors makes a promise to accept a particular sequence number. If some of those acceptors die but others (with old state) come alive, there is guaranteed to be at least one acceptor that remembers the promise and will therefore be able to reject any requests with lower numbers. The quorum is important for two reasons:

  1. It ensures continuity of proposal number information.
  2. It ensures that split-brain problems don’t occur if the network becomes partitioned. It is not possible for a minority of acceptors to continue running the algorithm.

When the entire quorum of acceptors accepts the proposal, the accepted message is sent the learners, which can act on the request (e.g., update system state on a set of machines).

Paxos is a two-phase protocol:

Phase 1: Prepare
The proposer sends the client’s message with a unique proposal number to as many acceptors as it can (at least a majority). Each successive proposal must have a higher number.

When an acceptor receives a proposal, it checks to see if this is the highest proposal number that it has seen. If so, it promises to ignore any smaller proposal numbers it gets. It responds back with either the current proposal (if that’s the highest one it has seen) or the highest proposal that it has received so far (along with the corresponding message).

Phase 2: Accept
After sending proposals to the group of acceptors, the proposer must select the highest numbered proposal that it received and send an accept message to the acceptors. This allows information about the current highest proposal number to propagate to acceptors that may not have received it (for example, because they just came up).

As long as no acceptor receives a higher proposal number during this time, it accepts the proposal, sends an acknowledgement to the proposer, and sends the sequenced message to the learner. The learner discards duplicate messages (that came from multiple acceptors) and sends them to the servers that act on these messages.

One proposer or many?

We temporarily assumed there was only one proposer. For fault tolerance, there could be any number of proposers with which clients communicate. However, that increases the chance that one proposer will pick a proposal number that is less than one that anther proposer chose, resulting in rejection of the proposal. To minimize message rejection, Paxos chooses one of the proposers as the leader, which receives all client requests. Should it die, any other proposer can step in as the leader. A single leader just makes it easier to ensure that we have unique, incrementing sequence numbers. While the algorithm works without a leader, the protocol guarantees progress only if there is a leader. If two proposers act as leaders, it is possible to have a situation where each proposer proposes conflicting updates, leading to no progress in the algorithm.


Raft is another consensus protocol, created as an easier-to-understand alternative to Paxos. Like Paxos, Raft requires a majority of servers to function. Unlike Paxos, Raft requires a single leader to receive all client requests. Any server can act as a leader and an election takes place to select a leader.

A server can be in a follower, leader, or candidate state. If it is the leader then it receives all client requests. If it is the follower then it is a non-leader and the system has a leader. If it is a candidate then we are holding an election to pick a leader and it may become a leader.

Raft elections

Leaders send periodic heartbeats to all followers. Servers start in a follower state. A server remains in a follower state as long as it receives valid heartbeat messages from a leader or a candidate. If it receives no heartbeat within a specific time interval, the follower may need to become a leader, so it becomes a candidate and runs an election by requesting votes from other servers. If a candidate receives a majority of votes from the other servers, it becomes the leader.

To start an election, each candidate picks a random election timeout. If it receives no heartbeat during that time, it sets its role to candidate, assumes there is no leader, and starts an election.

To start an election, a follower picks a random time interval before it makes itself a candidate. It then votes for itself and sends a request vote message to all other servers. If a server receives a “request vote” message and has not voted yet, it then votes for that candidate. When a candidate gets a majority of votes, it becomes the leader.

Each election marks a new term, identified by an incrementing number.

If a candidate receives a heartbeat from another server and that leader’s term number is at least as large as the candidate’s current term, then the candidate recognizes the leader as legitimate and becomes a follower. On the chance that the candidate does not win or lose an election, it simply times out and starts a new election. Randomized timeouts ensure that split votes rarely occur.

Raft consensus

The elected leader takes client requests. Each message sent by clients is a log entry that will be replicated among the servers. The leader first adds the message to its own log. This entry is considered to be uncommitted. The leader then sends the message to the other servers (the followers). Each follower acknowledges the message. When a majority of followers have acknowledged the message, the entry is applied to the state machine and considered to be committed. The leader then notifies the followers that the entry has been committed and they apply the message to their state machines. Consensus has now been achieved.

Locks vs. leases

When processes rely on locks, there is an implicit assumption that fault-tolerance is lost. The process holding a lock may forget to release a lock or may die. A way to avoid this problem is to use leases instead of locks. A lease is a lock with an expiration time. The downside with a leasing approach is that the resource is unavailable to others until the lease expires. Now we have a trade-off: have long leases with a possibly long wait after a failure or have short leases that need to be renewed frequently.

Paxos is a fault-tolerant way to achieve consensus and can be used for granting leases to resources. However, the overhead of the algorithm isn’t attractive for all situations. A compromise is to use hierarchical leases, or sub-leases. The top-level lease is a coarse-grained lease while sub-leases are typically fine-grained leases. A consensus algorithm such as Paxos is used to elect a coordinator. This coordinator is granted a lease on a set of resources (or all resources) in the system. Now the coordinator hands out leases to processes that need those resources. When the coordinator’s main lease expires, a consensus algorithm has to be run again to grant a new lease and possibly elect a new coordinator but it does not have to be run for every client’s lease request; that is simply handled by the coordinator.

Distributed transactions

Go here for lecture notes

Goal: Create a fault tolerant algorithm that ensures that all group members agree to commit (make their actions permanent). If agreement is not reached, then all group members must agree to abort (undo any changes).

A transaction is a set of operations that operates on, and typically modifies, data. A key facet of a transaction is that it has to be atomic — all results have to be made permanent (commit) and appear to anyone outside the transaction as an indivisible action. If a transaction cannot complete, it must abort, reverting the state of the system to that before it ran. If several transactions run concurrently, the end result must be the same as if those transactions ran in some (any) serial order. The specific order in which transactions execute is known as a schedule.

A transaction-based model guarantees a set of properties known as ACID:

Atomic : The transaction happens as a single indivisible action. Everything succeeds (and the transaction commits) or else the entire transaction is rolled back (the transaction aborts). Other transactions do not see intermediate results.

Consistent : A transaction cannot leave the data in an inconsistent state. If the system has invariants, they must hold after the transaction. For example, the total amount of money in all accounts must be the same before and after a “transfer funds” transaction.

Isolated (Serializable) : Transactions cannot interfere with each other. If transactions run at the same time, the final result must be the same as if they executed in some serial order.

Durable : Once a transaction commits, the results are made permanent. No failures after a commit will cause the results to revert.

A write-ahead log (or transaction log) is used to enable rollback: reverting to the previous state when aborting a transaction. It is also crucial in maintaining the state of the transaction in a stable place in case the process or computer should die. The system will be able to read the log and recover to where it was in the transaction commit protocol.

In a distributed transaction environment, multiple processes participate in a transaction, each executing its own sub-transaction that can commit only if there is unanimous consensus by all participants to do so. Each system runs a transaction manager, a process that is responsible for participating in the commit algorithm algorithm to decide whether to commit or abort its sub-transaction. One of these transaction managers may be elected as the coordinator and initiates and runs the commit algorithm. Alternatively, the coordinator could be a separate process from any of the transaction participants.

Two-phase commit protocol

The two-phase commit protocol uses atomic multicasts to reach a consensus among the group on whether to commit or abort. It uses a coordinator to send a request (“can you commit?”) to every member of the group (reliably, retransmitting as often as needed until all replies are received). Phase 1 is complete when every member of the group (each participant) responds. If the coordinator gets even a single abort response from a participant, it must tell all participants to abort the entire transaction. Otherwise, it will tell all participants to commit it. In phase 2, the coordinator sends the commit or abort order and waits for a response from everyone. In summary, in phase 1, the coordinator gets everyone’s agreement and in phase 2, the coordinator sends the directive to commit or abort.

The write-ahead log in stable storage is crucial for ensuring atomic multicasts (the write-ahead log is also important for transaction rollback, which is used for aborts!). For example, if a participant sent the coordinator a commit response for phase 1 and then died, it must be able to reboot and reconstruct the transaction state from the log; it cannot change its mind after rebooting. The two-phase commit protocol cannot proceed until every participant acknowledges every message.

The two phase commit stalls if any member, coordinator or any participant, dies. It has to wait for the recovery of that member before proceeding with the protocol. A recovery coordinator can step in in certain circumstances. If the coordinator died and a recovery coordinator took over, it queries the participants. If at least one participant has received a commit message then the new coordinator knows that the vote to commit must have been unanimous and it can tell the others to commit. If no participants received a commit message then the new coordinator can restart the protocol. However, if one of the participants died along with the coordinator, confusion may arise. If all the live participants state that they have not received a commit message, the coordinator does not know whether there was a consensus and the dead participant may have been the only one to receive the commit message (which it will process when it recovers). As such, the coordinator cannot tell the other participants to make any progress; it must wait for the dead participant to come back.

Three-phase commit protocol

The two-phase commit protocol is a blocking protocol. If the coordinator or any participant stops running, the entire protocol stalls until the failed process is restarted. The three-phase commit protocol is similar to the two-phase commit protocol but allows entities to time out in certain cases to avoid indefinite waits.

The protocol also supports the use of a recovery coordinator by introducing an extra phase where participants are told what the consensus was so that any participant that received this information before a coordinator died could inform a standby coordinator whether there was a unanimous decision to commit or abort. The three-phase commit protocol propagates the knowledge of the outcome of the election to all participants before starting the commit phase.

In phase 1 of the three-phase commit protocol, the coordinator sends a query to commit request (“can you commit?”) to every member of the group (reliably, retransmitting as often as needed until all replies are received over a period of time). If any of the participants respond with a no or if any failed to respond within a defined time, then send an abort to every participant.

In phase 2, the coordinator sends a agreed to commit message to all participants and gets acknowledgements from everyone. When this message is received, a participant knows that the unanimous decision was to commit. If a participant fails to receive this message in time, then it aborts. At this point, the participants do not commit. However, if a participant receives an abort message then it can immediately abort the transaction.

In phase 3, the coordinator sends a commit message to all participants telling them to commit. If a participant fails to receive this message, it commits anyway since it knows from phase 2 that there was the unanimous decision to commit. If a coordinator crashes during this protocol, another one can step in and query the participants for the commit decision. If every participant received the prepare-to-commit message then the coordinator can issue the commit directives. If some participants received the message, the coordinator now knows that the unanimous decision was to commit and can re-issue the prepare-to-commit request followed by a commit. If no participant received the message, the coordinator can restart to protocol or, if necessary, restart the transaction.

The three-phase commit protocol accomplishes two things:

  1. Enables use of a recovery coordinator. If a coordinator died, a recovery coordinator can query a participant.

    • If the participant is found to be in phase 2, that means that every participant has completed phase 1 and voted on the outcome. The completion of phase 1 is guaranteed. It is possible that some participants may have received commit requests (phase 3). The recovery coordinator can safely resume at phase 2.

    • If the participant was in phase 1, that means NO participant has started commits or aborts. The protocol can start at the beginning

    • If the participant was in phase 3, the coordinator can continue in phase 3 – and make sure everyone gets the commit/abort request

  2. Every phase can now time out – there is no indefinite wait like in the two-phase commit protocol.

    Phase 1:
    Participant aborts if it doesn’t hear from a coordinator in time
    Coordinator sends aborts to all if it doesn’t hear from any participant
    Phase 2:
    If coordinator times out waiting for a participant – assume it crashed, tell everyone to abort
    If participant times out waiting for a coordinator, elect a new coordinator
    Phase 3:
    If a participant fails to hear from a coordinator, it can contact any other participant for results

The three-phase commit protocol suffers from two problems. First, a partitioned network may cause a subset of participants to elect a new coordinator and vote on a different transaction outcome. Secondly, it does not handle fail-recover well. If a coordinator that died recovers, it may read its write-ahead log and resume the protocol at what is now an obsolete state, possibly issuing conflicting directives to what already took place. The protocol does not work well with fail-recover systems.

Paxos commit

The two-phase commit protocol requires all processes in the group to be available to complete the transaction. The three-phase commit introduces timeouts but does not work correctly in all situations. If the two-phase or three-phase protocols are enhanced for fault tolerance to elect an alternate coordinator, one runs into problems where different participants may have received messages from different coordinators.

Using Paxos for distributed commit enables us to have a fault-tolerant infrastructure that reaches agreement on commit or abort decisions. Multiple systems run the Paxos algorithm for fault tolerance. Each participant contacts its own instance of the Paxos algorithm on these systems to ensure that Paxos agrees on that participant’s commit or abort decision. Paxos then forwards that decision to learner processes, which decide on the overall commit or abort based on whether all participants sent a commit message (each learner gets the same data and hence reaches the same decision; the multiple learners are there for fault tolerance). They then notify each of the participants with the commit/_abort_ decision.

Brewer’s CAP Theorem

Ideally, we’d like three properties in a replicated distributed system:

The data retrieved from the system should be the same regardless of which server is contacted.
The system should always be available to handle requests.
Partition tolerance
The system should continue to function even if some network links do not work and one group of computers cannot talk to another. For example, a link between two data centers may go down.

Eric Brewer’s CAP Theorem states that you can have either consistency or availability in the presence of partitions but not both. Quite often, the theorem is summarized as: if you want consistency, availability, and partition tolerance, you have to settle for at most two out of three of these.

With distributed architectures, we generally strive for high availability and the ability to withstand partitions (occasional breaks in the ability of nodes to communicate). Hence, we will have to give up on consistency and break the guarantees of ACID. An alternative to the requirements of ACID is BASE.

BASE stands for Basic Availability, Soft-state, Eventual consistency. Instead of requiring consistency after every transaction, it is enough for the data to eventually get to a consistent state. The downside is that some processes may access stale data which has not yet been brought into a consistent state.

Concurrency control

Go here for lecture notes

Goal: Allow multiple transactions to run concurrently but ensure that resource access is controlled to give results that are identical if they ran in some serial sequence. This preserves the “Isolated” guarantee of ACID transaction semantics.

The goal of concurrency control is to allow multiple transactions to run concurrently (that’s great for performance!) but to ensure that data access is controlled such that the net effect is the same as if the transactions all ran in some serialized order. That is, we cannot have the net result be one where transactions read an interim state of data from another transaction.

Exclusive locks, via a lock manager, are an easy way to accomplish this. A transaction can grab locks for the resources it needs. That ensures mutual exclusion. To ensure serializability, it is important that a transaction does not acquire any new locks after it has released a lock. If we do not do this, transaction A may create a result based on the completion of transaction B while transaction B might have used data that A wrote and unlocked. This technique is known as two-phase locking. The first phase is the growing phase, in which locks are acquired. The second phase is the shrinking phase, in which locks are released.

The problem with two-phase locking is that, if a transaction that has released some locks now aborts, there is a chance that other transactions have used data that was modified by the transaction. In that case, those transactions (and all transactions that depend on them) have to abort as well. This condition is called cascading aborts. Strict two-phase locking avoids this problem by requiring all locks to be held until the end. The shrinking phase, in effect, is an atomic operation that occurs at the very end of the transaction. You lose concurrency this way but avoid having to process cascading aborts.

Exclusive locking for every resource access is a bit aggressive. Consider a transaction that just reads data. It needs to lock that data to ensure that no other transaction modifies it but nothing would go wrong if another transaction also wanted to read that same data. We can achieve greater concurrency by distinguishing read locks from write locks. Read locks need not be exclusive: any number of them can be granted. However, if there are any read locks on an object, a write lock cannot be granted and the transaction must wait. Similarly, if there is a write lock on an object then any read lock requests or write lock requests must wait.

A way of increasing concurrency even beyond read/write locks is two-version-based concurrency control, also known as multiversion concurrency control. In this case, one transaction writes tentative versions (private versions) while other transactions read existing, previously committed versions of the data. This allows transactions to request read locks even if another transaction has a write lock on the object. When the transaction that has a write lock is ready to commit, it converts its write lock to a commit lock, waits until all transactions that have a read lock for that object complete, and then makes its modified version permanent. During a commit lock, no other transaction can grab any lock on that object. This allows increased concurrency but transactions that write data risk waiting when they attempt to commit and make their data permanent.

Concurrency control techniques that rely on locking force locks to be held while the transaction is using the resource, or, in some cases, until the end of the transaction. This restricts the maximum amount of concurrency that may be achieved in the system. Optimistic concurrency control techniques assume that transactions are more likely to complete than not. As such, it is better to put more effort on rectifying errors from having used data from aborted transactions than to lock access to the data. A fully optimistic system uses no locks and checks for conflicts at commit time. Optimistic concurrency control has three phases of operation:

  1. Working phase. The transaction reads and writes data. A private workspace is often, but not always, used to keep non-committed results isolated.

  2. Validation phase. When the transaction is ready to commit, a check is made to see if there is any conflict with other transactions that took place during this time. For example, if some transaction A modified data and committed but transaction B read data before A modified that data, then transaction B cannot be allowed to commit because that would violate serializability.

  3. Update phase. Tentative changes are made permanent and the transaction commits.

Timestamp ordering allows concurrency by keeping track of two timestamps per object: the timestamp of the last committed that read the object and the timestamp of the last committed transaction that wrote the object. If a transaction wants to write an object, it compares its own timestamp with the object’s write timestamp. If the object’s timestamp is older then we have good ordering and the transaction can proceed. Otherwise the transaction aborts and is restarted.

Distributed Deadlock

Go here for lecture notes

Goal: Determine if a transactions will wait for resources in such a manner as to create an indefinite wait, called deadlock. Find ways to ensure that this will not happen.

A deadlock occurs when there is a circular dependency on processes holding and requesting resources. The four conditions that must hold are:

  1. mutual exclusion: A resource can be held by at most one process.
  2. hold and wait: Processes that already hold resources can wait for another resource.
  3. non-preemption: A resource, once granted, cannot be taken away.
  4. circular wait: Two or more processes are waiting for resources held by one of the other processes.

Three approaches can be used for managing deadlock in distributed systems.

A centralized deadlock detection approach uses a central coordinator to manage a resource graph of processes and the resources they are using. Each time a process wants to grab or releases a resource, it sends a message to this coordinator (waiting-for or releasing). The coordinator builds a graph of all the processes and the resources they are holding or waiting for. This is called a wait-for graph. If a cycle is detected, in the graph then the coordinator knows a deadlock exists. In some cases, if release and waiting-for messages are received out of order, they can lead the coordinator to believe that there is a deadlock cycle when none really exists. In reality the release message should have been processed first and would cause the deadlock to not happen. This condition is known as false deadlock.

The Chandy-Misra-Haas distributed deadlock detection algorithm has a process send a probe message to a process that is holding a resource prior to waiting for the resource. The receiving process forwards the probe to every process that contains resources it is waiting for. This is called edge chasing. If the original process receives its own probe message then it knows that a dependency cycle, and hence deadlock, will exist if it waits for the resource it wants.

Deadlock prevention approaches require processes to access resources in restricted ways to ensure that a deadlock cannot occur. The approach to implementing this is to make decisions based on the timestamps of each transaction competing for resources.

The wait-die algorithm states that if a younger process is using the resource, then the older process (that wants the resource) waits. If an older process is holding a resource, then the younger process that wants the resource kills itself (that’s ok; transactions are designed to be restartable). This forces the resource utilization graph to be directed from older to younger processes, making cycles impossible.

The wound-wait algorithm ensures that the graph flows from young to old and cycles are again impossible. an old process will preempt (kill) the younger process that holds a resource. If a younger process wants a resource that an older one is using, then it waits until the old process is done.

This requires predicting the precise resources that will be needed, the times they will be needed, and which processes will need them and manage resource allocation or transaction scheduling to ensure that this will not happen. This is generally impossible to predict and hence is not a practical approach.

Network Attached Storage

Go here for lecture notes

Goal: Allow multiple clients to access files from file servers on a network.

To provide the same system call interface for supporting different local file systems as well as remote files, operating systems generally rely on a layer of abstraction that allows different file system-specific interfaces to coexist underneath the common system calls. On most Unix-derived systems (e.g., Linux, BSD, OS X, SunOS), this is known as the VFS layer (Virtual File System).

There are a couple of models for implementing distributed file systems: the download/upload model_ or the remote procedure call_ model. In a stateful file system, the server maintains varying amounts of state about client access to files (e.g., whether a file is open, whether a file has been downloaded, cached blocks, modes of access). In a stateless file system, the server maintains no state about a client’s access to files. The design of a file system will influence the access semantics to files. Sequential semantics are what we commonly expect to see in file systems, where reads return the results of previous writes. Session semantics occur when an application owns the file for the entire access session, writing the contents of the file only upon close, thereby making the updates visible to others after the file is closed, and overwriting any modifications made by others prior to that.


NFS was designed as a stateless, RPC-based model implementing commands such as read bytes, write bytes, link files, create a directory, and remove a file. Since the server does not maintain any state, there is no need for remote open or close procedures: these only establish state on the client. NFS works well in faulty environments: there’s no state to restore if a client or server crashes. To improve performance, a client reads data a block (8 KB by default) at a time and performs read-ahead (fetching future blocks before they are needed). NFS suffers from ambiguous semantics because the server (or other clients) has no idea what blocks the client has cached and the client does not know whether its cached blocks are still valid. NFS uses validation, where the client compares modification times from server requests to the times of data that it cached. However, it does this only if there are file operations to the server. Otherwise, the client simply invalidates the blocks after a few seconds. In NFS’s original stateless design, File locking could not be supported since that would require the server to keep state. It was later added through a separate lock manager that maintained the state of locks.

To facilitate larger deployments, NFS introduced the automounter. It was common to have an environment with many clients, each mounting many remote file systems. In such an environment, if all clients start up at approximately the same time, they can flood the server with mount requests. The automounter mounts remote directories only when they are first accessed. To make keeping track of mount points easier across many machines, automounter maps are configuration files that define what remote directories get mounted. These can be distributed across a set of clients.


AFS was designed as an improvement over NFS to support file sharing on a massive scale. NFS suffered because clients would never cache data for a long time (not knowing if it would become obsolete) and had to frequently contact the server. AFS introduced the use of a partition on a client’s disk to cache large amounts of data for a long time: whole file caching and long-term caching. It supports a file download-upload model. The entire file is downloaded on first access (whole file download) and uploaded back to the server after a close only if it was modified. Because of this behavior, AFS provides session semantics: the last one to close a modified file wins and other changes (earlier closes) are lost.

During file access, the client need never bother the server: it already has the file. When a client first downloads a file, the server makes a callback promise: it maintains a list of each client that has downloaded a copy of a certain file. Whenever it gets an update for that file, the server goes through the list and sends a callback to each client that may have a cached copy so that it can be invalidated on the client. The next time the client opens that file, it will download it from the server. Files under AFS are shared in units called volumes. A volume is just a directory (with its subdirectories and files) on a file server that is assigned a unique ID among the cell of machines (remember cells from DCE RPC?). If an administrator decides to move the volume to another server, the old server can issue a referral to the new server. This allows the client to remain unaware of resource movement.


Coda was built on top of AFS and focused on two things: supporting replicated servers and disconnected operation. To support replicated storage, AFS’s concept of a volume was expanded into that of a Volume Storage Group (VSG). Given that some volumes may be inaccessible at a particular point in time, the Accessible Volume Storage Group (AVSG) refers to the subset of the VSG that the client can currently access. Before a client accesses a file, it first looks up the replicated volume ID of the file to get the list of servers containing replicated volumes and the respective local volume IDs. While it can read files from any available server, it first checks the versions from all of them to ensure that one or more servers don’t have out-of-date files. If the client discovers that a server has an old version of a file, it initiates a resolution process by sending a message to that server, telling it to update its old versions. When a client closes a file, if there were any modifications, the changes are written out to all available replicated volumes.

If no servers are available for access, the client goes into disconnected operation mode. In this mode, no attempt is made to contact the server and any file updates are logged instead in a client modification log (CML). Upon connection, the client plays back the log to send updated files to the servers and receive invalidations. If conflicts arise (e.g., the file may have been modified on the server while the client was disconnected) user intervention may be required.

Because there’s a chance that users may need to access files that are not yet in the local cache, Coda supports hoarding, which is a term for user-directed caching. It provides a user interface that allows a user to look over what is already in the cache and bring additional files into the cache if needed.

DFS (AFS version 3)

AFS evolved over the years. The most significant evolutionary version is version 3, which was adopted as the recommended distributed file system in the Distributed Computing Environment (DCE) where it is named DFS (Distributed File System).

The primary design goal of this system was to avoid the unpredictable lost data problems of session semantics if multiple clients are modifying the same file. The concept of tokens was introduced. A token is permission given by the server to the client to perform certain operations on a file and cache a file’s data. The system has four classes of tokens: open, data, status, and lock tokens. An open token must be obtained to have permission to open a file. A read data token must be obtained for a byte range of a file to have permission to access that part of the file. Similarly, a write data token is needed to write the file. Status tokens tell the client that it may be able to cache file attributes. These tokens give the server control over who is doing what to a file. Tokens are granted and revoked by the server. For example, if one client needs to write to a file then any outstanding read and write data tokens that were issued to any clients for that byte range get revoked: those clients are now denied access until they get new tokens.


Microsoft’s Server Message Block protocol was designed as a connection-oriented, stateful file system with a priority on file consistency and support of locking rather than client caching and performance. While it does not use remote procedure calls, its access principle is the same: requests (message blocks) are functional messages, providing file access commands such as open, create, rename, read, write, and close.

With the advent of Windows NT 4.0 and an increasing need to provide improved performance via caching, Microsoft introduced the concept of opportunistic locks (oplocks) into the operating system. This is a modified form of DFS’s tokens. An oplock tells the client how it may cache data. It allows clients to cache information without worrying about changes to the file at the server. At any time, a client’s oplock may be revoked or changed by the server. The mechanism has been extended since Windows 7. There are currently eight oplocks (do not memorize thede but have an understanding of what they do):

  1. A level 1 oplock (exclusive oplock) provides the client with exclusive access to the file (nobody else is reading or writing it), so it can cache lock information, file attributes, and perform read-aheads and write-behinds.

  2. A level 2 oplock (shared oplock) is granted in cases where multiple processes may read a file and no processes are writing to it.

  3. A batch oplock is also exclusive and allows a client to keep a file open on the server even if the local process using it closed the file. This optimizes cases where a process repeatedly opens and closes tehe same files (e.g., batch script execution).

  4. A filter oplock is exclusive and allows applications that hold the oplock to be preempted whenever other processes or clients need to access the file.

  5. A read oplock (R) is a shared oplock that is essentially the same as a level 2 oplock. It supports read caching.

  6. A read-handle oplock (RH) allows multiple readers and keeps the file open on the server even if the client process closes it. It is similar to the batch oplock but is shared and does not support file modifications. It supports read caching and handle caching.

  7. A read-write oplock (RW) gives a client exclusive access to the file and supports read and write caching. It is essentially the same the the level 1, or exclusive, oplock.

  8. A read-write-handle oplock (RWH) enables a client to keep a file open on the server even if the client process closes it. It is exclusive and similar to the batch oplock.

The last four oplocks have been added since Windows 7 and are somewhat redundant with earlier mechanisms.

Oplocks may be revoked (broken) by the server. For example, if Process A has a read-write oplock, it can cache all read and write operations, knowing that no other client is modifying or reading that file. If another client, Process B, opens the file for reading, a conflict is detected. Process B is suspended and Process A is informed of the oplock break. This gives Process A a chance to send any cached changes to the server before Process B resumes execution.

SMB 2 and beyond

The SMB protocol was known to be chatty. Common tasks often required several round-trip messages. It was originally designed for LANs and did not perform optimally either on wide area networks (WANs) or on today’s high-speed LANs (1–100 Gbps). The SMB protocol was dramatically cleaned up with the introduction of the Microsoft Vista operating system (SMB 2), with minor changes added in Windows 7 (SMB 2.1) and even more in Windows 8 (SMB 3). Apple has dropped its proprietary AFP protocol in favor of SMB 2 in OS X 10.10 (Mavericks). We will focus our discussion on the significant changes introduced in SMB 2.

SMB 2 added six key changes to its design:

Reduced complexity
The set of SMB commands went from over 100 commands down to 19.
Pipelining is the ability to send additional commands before the response to a prior command is received. Traditionally, SMB (and any RPC-based system) would have to wait for one command to complete before sending an additional command. The goal of pipelining is to keep more data in flight and use more of the available network bandwidth.

To give the server control over getting an overly high rate of client requests, credit-based flow control is used. With credit-based flow control, the server creates a small number of credits and later increases this number as needed. The server sends these credits to each client. The client needs credits to send a message to the server. Each time a message is sent, it decrements its credit balance. This allows server to control the rate of messages from any client and avoid buffer overflow.

Note that TCP uses congestion control, which results in data loss and wild oscillations in traffic intensity (TCP keeps increasing its transmission window size until packet loss occurs; then it cuts the value of the buffer in half and starts increasing again).

Compounding is similar to pipelining but now allows multiple commands to be sent in one message. It avoids the need to optimize the system by creating commands that combine common sets of operations. Instead, one can send an arbitrary set of commands in one request. For instance, instead of the old SMB RENAME command, the following set of commands are sent: CREATE (create new file or open existing); SET_INFO; CLOSE. Compounding reduces network time because multiple requests can be placed within one message.
Larger read/write sizes
Fast networks can handle larger packet sizes and hence transfer larger read and write buffers more efficiently.
Improved caching
SMB 2 improved its ability to cache folder and file properties. This avoids messages to the server to retrieve these properties.
Durable handles
If there was a temporary network disconnection, An SMB client would lose its connection to the server and have to reestablish all connections and remount all file systems and reopen all files. With SMB 2, the connection has to be reestablished but all handles to open files will remain valid.

NFS version 4

While NFS version 3 is still widely used, NFS version 4 introduced significant changes and is a departure from the classic stateless design of NFS. The server is now stateful and is able to control client-side cache coherence better, allowing clients to cache data for a longer time. Servers can grant clients the ability to do specific actions on a file to enable more aggressive client caching. This is similar to SMB’s oplocks. Callbacks will notify a client when file or directory contents have changed. Because of the stateful server, NFS now acquired open and close operations.

Like SMB 2, NFS now supports compound RPC, where multiple operations can be grouped together into a single request. Sending one message instead of a series of messages reduces overall round-trip time significantly.

NFSv4 also added strong authentication and encryption and support file system replication and migration. This includes a mechanism of sending referrals similar to that used by AFS.


Go here for lecture notes

Goal: Create a highly-available centralized lease manager and file system for small files that can serve as a name server and configuration repository.

Chubby is a highly available and persistent distributed lock service and file system created by Google and used in various projects and frameworks, including Bigtable and MapReduce. Its purpose is to manage locks for resources and store configuration information for various distributed services throughout the Google cluster environment. The key design goal of Chubby was high availability and high reliability.

By default, the service runs on five computers as five active replicas. This grouping is called a Chubby cell. One of these replicas is elected as the master to serve client requests. Only the master will serve client requests; the other machines in the cell are for fault tolerance in case the master dies. The only request they will answer from clients is to tell them who the master is.

A majority of the replicas in the cell must be running for the service to work. Paxos is used as a leader election algorithm and is used to keep the replicas consistent (i.e., make sure that all updates take place in the same order on all replicas). Typically, there will be one Chubby cell per datacenter.

In addition to providing locks, Chubby is designed for managing relatively small amounts of data: items such as system configuration and state information for various services. Chubby provides clients with a namespace of files & directories. Every file or directory can be treated as a lock file and every file may, of course, be used to store data. The name of a lock is the name of its file: its hierarchical pathname. The interface to Chubby is not that of a native file system. There is no kernel module and client software communicates with Chubby via an API that sends remote procedure calls to the Chubby master.

File operations are somewhat different from those offered by conventional file systems. Files can be read and written only in their entirety: there are no seek or byte-range read and write operations. When a file is opened by the client, it is downloaded and a lease for that file is established. In this way, Chubby keeps track of which clients have cached copies of a file. All writes from a client must be sent to the Chubby master (we have a write-through cache). Chubby then sends invalidations to all clients that have cached copies.

Locks are advisory and can be either exclusive (one writer) or not (multiple readers). A client can send a request to acquire a lock for a file, release it, and also assign and check sequence numbers for a lock. Clients can also subscribe to receive events for an open file. These events include notification of file modification, the creation of directories or files under an open directory, and lock acquisition.

Chubby is designed primarily for managing coarse-grained locks. Fine-grained locks are locks that are generally used for a small object, such as a row in a table of a database. They are generally held held for a short duration, seconds or less. Coarse-grained locks typically control larger structures, such as an entire table or an entire database. They might be held for hours or days. They are acquired rarely compared to fine-grained locks. Hence, a server that is designed for coarse-grained locks can handle more clients.

Even though Chubby uses the term “coarse-grained”, it doesn’t exactly fit a “pure” coarse/fine grained lock model, but that model does not necessarily work well in real systems. In theory, the concept of a coarse-grained lock is to grant a process a lock for a large pool of objects of a particular class and then have that process be the lock manager for those objects. This is essentially what Chubby does but Chubby doesn’t keep track of all the objects. Instead, the top (coarse) level is a set of services, such as Bigtable tables, a GFS file system, or Pregel frameworks. Chubby allows them to ensure there is a single master and to lock any critical resources that might be shared among these applications. Those applications then handle the fine-grained aspect of giving out locks for data blocks, table cells, or synchronizing communication barriers.

Because Chubby does not hold huge amounts of data but may serve thousands of clients, all Chubby data is cached in memory. For fault tolerance, all data is written through to the disk when modified, and is propagated to replicas via the Paxos consensus algorithm to ensure consistent ordering. The entire database is backed up to the Google File System (GFS) every few hours to ensure that critical configuration information is saved even if all replicas die.

Chubby’s fault-tolerant locking makes it a good service for leader election. If a group of processes wants to elect a leader, they can each request a lock on the same Chubby file. Whoever gets the lock is the leader (for the lease period of the lock).

Parallel file systems

Go here for lecture notes

Goal: Store huge files (terabyte and larger) in a file system that is distributed across many (thousands) of machines.

Conventional file systems lay out an entire file system on one logical storage device (a disk or NAND flash memory). This means that the metadata and data for all files and directories is located in this one device. In some cases, the device might be a collection of disks (e.g., a RAID drive or several disks connected by a volume manager) but, to the operating system, it appears as one device – one collection of blocks. This places a restriction on each file system that it cannot span one machine. Conventional distributed file systems, used for network attached storage (NAS), basically use a file system access protocol but interact with one or more servers running these conventional file systems.

With parallel file systems, the metadata and data of files are separated onto different entities. The metadata of a file is the set of attributes about a file: its name, size, access permissions, time stamps, etc. The data of a file is, well, the file’s data. Metadata does not take up much space. It’s a small set of attributes and each attribute is generally only a few bytes in size. Data, however, can be huge: terabytes or more for large files.

The idea of a parallel file system is to have a system – or systems – manage the metadata. As part of the metadata for each file, the system will identify the systems and logical blocks holding the file’s data. By doing this, we gain several benefits:

  1. A file’s data is no longer limited to the capacity of any one system and can be spread out across many systems.

  2. Because data blocks are distributed among multiple systems, access is effectively load balanced across them. Moreover, multiple systems can process different file or different parts of a file with read operations taking place on different systems. This provides scalable bandwidth, comparable to striping in storage arrays.

  3. Data blocks can be replicated on multiple systems, providing fault tolerance and the potential for greater read bandwidth since processes can read from any of the replicated data blocks.

This design approach is the basis for the Google File System (GFS), the Hadoop Distributed File System (HDFS, essentially a clone of GFS), and distributed file systems used in supercomputing clusters, such as Luster and GlusterFS. We will not look at these last two but the concepts are similar.

Google File System (GFS)

The Google File System is designed to support large data-intensive applications (the kind of algorithms Google uses for search and other services). The system is designed for an environment where there are many thousands of storage servers, some of which are expected to be down at any given point in time. The data that is managed comprises of billions of objects and many petabytes of data. GFS is not designed to be a general-purpose file system and is optimized for large files (rather than lots of small files), streaming reads (more common than writes), and atomic appends (which may be done concurrently by multiple clients).

Each GFS cluster contains one master file server. This is a faster and more reliable machine that manages file system metadata, such as names and attributes of files and directories. None of the actual file data resides on this system. Instead, it contains a mapping of the file contents to the chunks that hold the data. Chunks are fixed-size, 64 MB blocks and are stored in chunkservers. The chunkservers are less reliable than the master and are replicated so that each chunk is stored on typically three three separate chunkservers.

Clients contact the master to look up files. The master gives them a chunkserver name and chunk handles for the files requested.

To write to a file, the master grants a chunk lease to one of the replicated chunks. The chunkserver holding this is the primary replica chunkserver for that chunk. Writing is a two-stage process. First, the client writes to one replica. That replica then forwards the data to another replica chunk server, and so on until all replicas for that chunk receive the data. This relieves load from the client in that the client has to only write one copy. It also doesn’t put any one computer within GFS in the position of having to send out N copies of the data.

Once all replicas acknowledge receipt of the data, the second stage, writing, takes place. The client sends a write request to the primary chunkserver, identifying the data that was sent. The primary assigns a sequence to the write operations that take place on the data and sends write requests to all the replicas so that they will write data in the same sequence-number order. Note that the data flow (data transfer) from client is chained: it goes to the primary replica, then to secondary replica, then another secondary replica, etc. The control flow (write commands) goes from the primary directly to all of the secondaries (but is a much, much smaller message).

Hadoop Distributed File System

Apache’s Hadoop Distributed File System (HDFS) is incredibly similar to GFS and is designed with essentially the same goals. The key distinction is that it does not support modifying a file once it is created, but it does support appends. This avoids the need to manage leases or locks for the file.

Like GFS’s master, HDFS uses a separate server, called a NameNode, that is responsible for keeping track of the filesystem namespace and the location of replicated file blocks. File data is broken into 128 MB blocks (default, but configurable per file) and is stored on DataNodes (GFS’s chunkservers). Each block (GFS chunk) is replicated on multiple DataNodes (the default is three, as with GFS). As with GFS, all file system metadata, names and block maps, is stored in memory on the DataNode for performance. File writes from clients are pipelined through each of the replica DataNodes that will hold the data.

HDFS uses rack-aware logic for determining which computers should host replicas for a block. The first replica is targeted to a computer in the same rack as the requesting client (to minimize latency). Second and additional replicas come from other racks in the data center for fault tolerance in case the first rack fails. The process for writing a block is essentially the same as in GFS. Data writes are pipelined to get data onto the replicas and then an update command is sent by the primary replica to all the others to write the block (this is essentially a commit operation, as it makes the data permanent).

Dropbox: Cloud-based file synchronization

Go here for lecture notes

Goal: Provide an Internet service that synchronizes part of a user’s file system to remote servers. Any changes are propagated back to any devices that a user uses for synchronization. In this manner, multiple user computers can keep their data synchronized.

Dropbox is one of the first of a set of popular services that provide “cloud-based file storage” – a service that allows you to store data on remote servers and access it from anywhere. Dropbox does this in a manner that is largely transparent to the user. The user designates a directory (aka folder) that will remain in sync with the server. Any local changes will be propagated to the server and any changes from the server will be propagated to the local computer. Multiple computers can thus keep data in sync, with changes on one computer being sent to dropbox servers and then synchronized on other computers.

The Dropbox service started in 2008 and grew rapidly in popularity and, hence, the amount of users and data. It serves as a good case study in scaling a system. Currently (2012), Dropbox handles over a 100 million users synchronizing a billion files each day. One way that Dropbox differs from other data access services such as Twitter, Facebook, Reddit, and others is that those services have an extremely high read to write ratio. Any content that is created is usually read many, many times. With Dropbox, in contrast, the read to write ratio is close to 1. Because the primary use case is file synchronization, remote data is rarely accessed except to synchronize it with other machines. As such, Dropbox has to deal with an inordinately high number of uploads.

Dropbox began life as one server, using a mySQL database to keep track of users and their files. As the server ran out of disk space, all file data was moved over to use Amazon’s S3 service (Simple Storage Service; a web services interface to store and retrieve objects where each object is identified with a unique key). Dropbox ran a database server that stored all the metadata (information about the file) and a server that provided a web portal and interacted with client software. At this time, each client ran a process that would look through a directory to identify changed files and send them to dropbox. In addition, the program polled the server periodically to see if there are any changes that need to be downloaded.

Tens of thousands of servers all polling a server to see if anything changed generates considerable load, so the next design change was to add a notification server. Instead of having a client ask periodically, the notification server sends a message telling the client that there are changes. Since clients may be behind firewalls and it may not be possible for a notification server to connect to them, the notification server relies on having the client establish a TCP connection to it. The single server that handled all requests was also split in two: a metadata server handled everything related to information about users and files while a blockserver, hosted at Amazon, handled everything having to do with reading and writing file data.

As the number of users grew even larger, a two-level hierarchy of notification servers was added since each server could manage only around one million connections. The metadata server, block server, and database were also replicated and load balanced.

Distributed Lookup Services (Distributed Hash Tables)

Go here for lecture notes

Goal: Create a highly-scalable, completely decentralized key-value store

The purpose of a distributed lookup service is to find the computer that has data that corresponds to a key that you have. For example, the key can be the name of the song and the computer is one that is hosting the MP3 file, or the key can be your customer ID and the corresponding data is the contents of your shopping cart.

The most straightforward approach is to use a central coordinator to store all the keys and their mapping to computers. You ask the server to look up a key and it gives you the computer containing the content. This is the Napster implementation, which was one of the first peer-to-peer file sharing services. A coordinator served as the web service that kept track of who was hosting which music files for download but the files themselves were hosted by the community of users. The Google File System, GFS, also uses a central coordinator although the corresponding data for a key (file) is distributed among multiple chunkservers.

Another approach is to “flood” the network with queries. What happens here is that your computer knows of a few peer computers, not the entire collection. It sends the request to these computers. Those computers, in turn, know of other computers. When a computer needs to look up a key, it forwards the request to all of its peer nodes. If a peer does not have the content, it repeats the process and forwards the request to its peers (and they forward the request to their peers …). A time to live (TTL) value in the request is decremented with each hop and the request is no longer forwarded if the hop count drops below zero. If one of the peers has the content, then it responds to whoever sent it the request. That node, in turn sends the response back to its requestor and the process continues back until the originator of the query gets the response. This is called back propagation. This approach was used by the Gnutella file sharing system.

Flooding uses what is known as an overlay network. An overlay network is a logical network that is built on top of another network. Typically, computers on the network have knowledge of only some of the other computers on the network and will have to use these “neighbor” nodes to route traffic to more distant nodes.

Finally, the distributed hash table (DHT) approach is a set of solutions that is based on hashing the search key to a number that is then used to find the node responsible for items that hash to that number. A key difference between the DHT approach and the centralized or flooding approaches is that the hash of the key determines which node is responsible for holding information relating to the key.

Consistent hashing

Conventional hash functions require most keys to be remapped if the size of the hash table changes: that is, keys will usually hash to a different value when the size of the table changes. For a distributed hash, this will be particularly problematic. It would mean that if we remove or add a node to a group of computers managing a DHT, a very large percentage of data will have to migrate onto different systems. A consistent hash is a hashing technique where most keys will not need to be reamapped if the number of slots in the table changes. On average, only k/n keys will need to be remapped for a system where k is the number of keys and n is the number of slots in the table.

CAN: Content-Addressable Network

CAN implements a logical x-y grid (although it can be applied to an arbitrary number of dimensions). A key is hashed by two hashing functions, one for each dimension (e.g., an x-hash and a y-hash). The result of these hashes identifies an x, y point on the grid. Each node is responsible for managing all keys that are located within a rectangle on the grid; that is, a node will be responsible for all keys whose x-hash falls in some range between xa and xb and whose y-hash falls in some range between ya and yb. This rectangle is known as a zone. If a node is contacted with a query for a key, it will hash the key and determine if it is present within its zone. If it falls within the node’s zone, the node can satisfy the request. If the hashed values are out of range, the node will forward the request to one of four neighbors (north, east, south, or west), which will then invoke the same logic to either process the query of forward it onto other nodes. The average route for a system with n nodes is O(sqrt(n)) hops.

CAN scales because each zone can be split into two, either horizontally or vertically, and the process can be repeated over and over as needed. For each split, some of the keys will have to move to the new node. For fault tolerance, key, value data can be replicated on one or more neighboring nodes and each node will know not just its neighbors but its neighbors’ neighbors. If a node is not reachable, the request will be sent to a neighboring node. Even though we discussed CAN in two dimensions, it can be implemented in an arbitrary number of dimensions. For d dimensions, each node needs to keep track of 2d neighbors.


Chord constructs a logical ring representing all possible hash values (bucket positions). Note that for hash functions such as a 160-bit SHA–1 hash, this is an insanely huge value of approximately 1.46 x 1048 slots. Each node in the system is assigned a position in the huge logical ring by hashing its IP address. Because the vast majority of bucket positions will be empty, (key, value) data is stored either at the node to which the key hashes (if, by some rare chance, the key hashes to the same value that the node’s IP address hashed) or on a successor node, the next node that would be encountered as the ring is traversed clockwise. For a simple example, let us suppose that we have a 4-bit hash (0..15) and nodes occupying positions 2 and 7. If a key hashes to 4, the successor node is 7 and hence the computer at node 7 will be responsible for storing all data for keys that hash to 4. It is also responsible for storing all data to keys that hash to 3, 5, 6, and 7.

If a node only knows of its clockwise neighbor node, then any query that a node cannot handle will be forwarded to a neighboring node. This results in an unremarkable O(n) lookup time for a system with n nodes. An alternate approach is to have each node keep a list of all the other nodes in the group. This way, any node will be able to find out out which node is responsible for the data on a key simply by hashing the key and traversing the list to find the first node ≥ the hash of the key. This gives us an impressive O(1) performance at the cost of having to maintain a full table of all the nodes in the system on each node. A compromise approach to have a bounded table size is to use finger tables. A finger table is a partial list of nodes with each element in the table identifying a node that is a power of two away from the current node. Element 0 of the table is the next node (20 = 1 away), element 1 of the table is the node after that (21 = 2 away), element 2 of the table four nodes removed (22), element 3 of the table eight nodes removed (23), and so on. With finger tables, O(log n) nodes need to be contacted to find the owner of a key.

Distributed hash table case study: Amazon Dynamo

Go here for lecture notes

Goal: Create a practical implementation of a key-value store that is highly scalable, completely decentralized, replicated, and efficient.

Dynamo is a highly available key-value store created internally within Amazon. Many services within Amazon, such as best seller lists, shopping carts, user preferences, session management, etc., neeed only primary key access to data. A multi-table relational database system would be overkill and would limit scalability and availability. With Dynamo, an application can configure its instance of Dynamo for desired availability (# replicas) and consistency.

Dynamo supports two operations: get(key) and put(key, data, context). The data is an arbitrary bunch of bytes (typically less than 1 MB) and is always identified by a unique key. The (key, data) values are distributed over a large set of servers and replicated for availability. The context is a value that is returned with a get operation and then sent back with a future put operation. The context is meaningless to the user program and is used like a cookie. Internally, it contains versioning information.

The system is decentralized: there is no need for a central coordinator to oversee operations. Every node has equivalent responsibilities and the system can grow dynamically by adding new nodes.

Partitioning: scalability

Since massive scalability is a key design aspect of Dynamo, data storage is distributed among many nodes. Dynamo relies on consistent hashing to identify which node will be responsible for a specific key. With normal hashing, a change in the number of slots in the table (e.g., number of nodes since we are hashing to find a node) will result in having to remap all keys. With consistent hashing, this is not the case and only K/n keys need to be remapped, where K is the number of keys and n is the number of slots (nodes).

Nodes are arranged in a logical ring. Each node is assigned a random “token” (a number) that represents its position in the logical ring. It is responsible for all keys that hash to any number between the node’s number and its predecessor’s number. When a key is hashed, a node will traverse a logical ring of node values to find the first node with a position greater than or equal to that hashed result. This tells it which node is responsible for handling that key. Since nodes are not added that frequently, every node can store a cached copy of this ring (the size is the list of N hash values, where N is the number of nodes). Moreover, to avoid the extra hop resulting because a client sent a request to the wrong Dynamo node, a client may also get a copy of this table. The system has been described as a zero-hop distributed hash table (DHT).

Adding or removing nodes affects only the node’s immediate neighbor in the ring. If a node is added, the successor node will give up some range of values to the new node. If a node is removed, the key, value data it manages will have to be moved over to the successor. In reality, a physical node is assigned to manage multiple points in the ring (multiple ranges of hash values). Thes nodes in the algorithm that we just discussed are really virtual nodes. A real machine, or physical node, will be responsible for managing multiple virtual nodes. The advantage of doing this is that we can attain balanced load distribution. If a machine becomes unavailable, the load is evenly distributed among the available nodes. If a new machine is added, each virtual node within it will take a set of values from other virtual nodes, effectively taking on a small amount of load from a set of other machines instead of from just the neighboring system (successor node). Moreover, if a more powerful machine is added, it can be assigned a larger number of virtual nodes so that it bears a greater percentage of the query workload.

Replication: availability

Dynamo is designed as an always writable data store. Data can be replicated on N hosts, where the value N is configurable per instance of Dynamo. The node that is responsible for storing the key-value is assigned the role of coordinator for that key and is in charge of replication for that key. The coordinator copies (key, value) data onto N–1 successor nodes clockwise in the ring. Copying is asynchronous (in the background) and Dynamo provides an eventually consistent model. This replication technique is called optimistic replication, which means that replicas are not guaranteed to be identical at all times. If a client cannot contact the coordinator, it sends the request to a node holding a replica, which will then periodically try to contact the coordinator.

Because data is replicated and the system does not provide ACID guarantees, it is possible that replicas may end up holding different versions. Dynamo favors application-based conflict resolution since the application is aware of the meaning of the data associated with the key and can act upon it intelligently. For example, the application may choose to merge two versions of a shopping cart. If the application does not want to bother with this, the system falls back on a last write wins strategy. Dynamo uses vector clocks to identify versions of stored data and to distinguish between versions that have been modified concurrently from versions that are causally related (a later one is just an update of an earlier one). For example, one can identify whether there are two versions of a shopping cart that have been updated independently or whether one version is just a newer modification of an older shopping card. A vector clock value is a set of (node, counter) pairs. This set of values constitutes the version of the data. It is returned as the “context” with a get operation and updated and stored when that context is passed as a parameter to a put operation.

Because Dynamo is completely decentralized and does not rely on coordinator (unlike GFS, for example), each node serves three functions:

  1. Managing get and put requests. A node may act as a coordinator responsible for a particular key or may forward the request to the node that has that responsibility.

  2. Keeping track of membership and detecting failures. A node will detect if other nodes are out of service and keep track of the list of nodes in the system and their associated hash ranges.

  3. Local persistant storage. Each node is responsible for being either the primary or replica store for keys that hash to a certain range of values. These (key,value) pairs are stored within that node using a variety of mechanisms depending on application needs. These include the Berkeley Database Transactional Data Store, MySQL (for large objects), or an in-memory buffer with persistent backing store (for highest performance).


Go here for lecture notes

Goal: Create a massively parallel software framework to make it easy to program classes of problems that use big data that can be parsed into (key, value) pairs. Each unique key is then processed with a list of all values that were found for it.

MapReduce is a framework for parallel computing. Programmers get a simple API and do not have to deal with issues of parallelization, remote execution, data distribution, load balancing, or fault tolerance. The framework makes it easy for one to use thousands of processors to process huge amounts of data (e.g., terabytes and petabytes). It is designed for problems that lend themselves to a map-reduce technique, which we will discuss. From a user’s perspective, there are two basic operations in MapReduce: Map and Reduce.

The Map function reads a stream of data and parses it into intermediate (key, value) pairs. When that is complete, the Reduce function is called once for each unique key that was generated by Map and is given that key along with a list of all values that were generated for that key as parameters. The keys are presented in sorted order.

The MapReduce client library creates a master process and many worker processes on a large set of machines. The master serves as a coordinator and is responsible for assigning roles to workers, keeping track of progress, and detecting failed workers. The set of inputs is broken up into chunks called shards. The master assigns map tasks to idle workers and gives each of them a unique shard to work on. The map worker invokes a user’s map function, which reads and parses the data in the shard and emits intermediate (key, value) results.

The intermediate (key, value) data is partitioned based on the key according to a partitioning function that determines which of R reduce workers will work on that key and its associated data. The default function is simply hash(key) mod R that distributes keys uniformly among R reduce workers but a user can replace it with a custom function. These partitioned results are stored in intermediate files on the map worker, sorted by key. All map workers must use the same partitioning function since the same set of keys must go to the same reduce workers. Map workers work in parallel and there is no need for any one to communicate with another.

After the mapping phase completes and all (key, value) data is generated, we need to get all values that share the same key to a single reduce worker. The process of moving the data to reduce workers is called shuffling.

When all map workers inform the master that they are complete, the master dispatches the reduce workers. Each reduce worker contacts all the map worker nodes to get the set of (key, value) data that was targeted for them. The combined data for each key is then merged to create a single sorted list of values for each key and the user reduce function gets called once for each unique key. The user’s reduce function is passed the key and the list of all values associated with that key. The reduce function writes its output to an output file, which the client can read once the MapReduce operation is complete.

The master periodically pings each worker for liveness. If no response is received within a time limit, then the master reschedules and restarts that worker’s task onto another worker.


Go here for lecture notes

Goal: Build a massively scalable, eventually consistent table of data that is indexed by a row key, contains a potentially arbitrary and huge number of columns, and can be distributed among tens of thousands of computers.

Bigtable is a distributed storage system developed at Google that is structured as a large table: one that may be petabytes in size and distributed among tens of thousands of machines. It is designed and used for storing items such as billions of URLs, with many versions per page; hundreds of terabytes of satellite image data; hundreds of millions of users; and performing thousands of queries a second.

To the user, an instance of Bigtable appears as a large spreadsheet: a table that is indexed by a row key, column name, and timestamp. If an application does not specify a timestamp, it will get the latest version. Alternatively, it can specify a timestamp and get the latest version that is earlier than or equal to that timestamp. All operations on rows are atomic.

Columns in the table are organized into column families. A column family is a related group of columns (e.g., “outgoing links”). Each column family has a unique name and contains within it a list of named columns. A Bigtable instance will typically have a small number of column families (perhaps a few hundred at most) but each column family may have a huge number (perhaps millions) of columns within it. Bigtable tables are generally sparse. If a cell (a column in a row) is empty, it will not use up any space. Column families are configured as part of the table and are common to all rows. Columns within a column family may be specific to a row. An example of columns within a column family is a list of all the websites you visited (i.e., many thousands of columns). The list of columns used between one row and another may be wildly different. Each cell may contain multiple versions of data. Timestamped versioning is configured on a per-column-family basis.

A Bigtable table starts off as a single table. As rows are added, the table is always kept sorted by row key. As the table grows, it may split along row boundaries into sub-tables, called tablets. Tablet servers are responsible for serving data for tablets. If a tablet gets too many queries, Bigtable can balance the workload by splitting the tablet and moving one of the new tablets to a different server.

Bigtable comprises a client library (linked with the user’s code), a master server that coordinates activity, and many tablet servers. Each tablet server is responsible for managing multiple tablets. Depending on the size of the tablets, it may manage from tens to thousands of tablets. Tablet servers can be added or removed dynamically. Tablet data is stored within GFS (Google File System) and tablet servers run on the same machines as GFS chunkservers. A tablet server handles read/write requests for its tablets and splits tablets when they grow large.

The master assigns tablets to tablet servers, balances tablet server load, and handles schema changes (table and column family creations). It tries to run the tablet server on the same GFS chunkserver that holds data for that tablet. The master is also responsible for garbage collection of files in GFS and managing schema changes (table and column family creation).

In Bigtable, Chubby is used to ensure there is only one active master, store the bootstrap location of Bigtable data, discover tablet servers, store Bigtable schema information, and store access control lists.

Bigtable can be configured for replication to multiple Bigtable clusters in different data centers to ensure availability. Data propagation is asynchronous and results in an eventually consistent model.


Go here for lecture notes

Goal: Design a huge-scale worldwide database that provides ACID semantics, read-free locking, and external consistency.


Brewer’s CAP theorem led to the popularity of an eventual consistency model rather than relying on transactional ACID semantics.

In this model, writes propagate through the system so that all replicated copies of data will eventually be consistent. Prior to the completion of all updates, processes may access old versions of the data if they are reading from a replica that has not been updated yet.

With ACID semantics, this would not happen since the transaction would grab a lock on all the replicas prior to updating them. Eventual consistency is the trade-off in designing a system that is both highly available and can survive partitioning.

The presence of network partitions is, in reality, a rare event. By using an eventual consistency model, we choose to give up on “C” (consistency) in order to gain availability on the slim chance that the network becomes partitioned. Given that partitions are rare, it is not unreasonable to enforce a strong consistency model (which requires mutual exclusion via locks). Partitioning will rarely affect the entire system but only a subset of computers. As such, it is possible to use a majority consensus algorithm, such as Paxos, to continue operations as long as the majority of replicas are functioning.

Experience with eventual consistency has taught us that it places a greater burden on the programmer. With an eventual consistency model, it is now up to the programmer to reconcile the possibility that some data that is being accessed may be stale while other data might be current. Bigtable, for example, was difficult to use in applications that required strong consistency. Custom code had to be written to coordinate locks for any changes that spanned multiple rows.

With Spanner, Google has built a transactional (ACID) database that spans many systems and widely distributed data centers. Spanner provides the user with a large-scale database that comprises multiple tables, with each table containing multiple rows and columns. The model is semi-relational: each table is restricted to having a single primary key. Unlike Bigtable, transactions can span multiple rows. Also unlike Bigtable, which provided eventually consistent replication, Spanner offers synchronous replication, ACID semantics, and lock-free reads.

Data storage

Spanner gives the user the the ability to manage multiple tables, each with rows and columns. Internally, the data is stored as a large keyspace that is sharded across multiple servers. Like Bigtable, each shard stores a group of consecutive of rows, called a tablet. This sharding is invisible to applications.

Tablets are replicated synchronously using Paxos. One of the replicas is elected to be a leader and runs a transaction manager. Any transactions that span multiple shards use the two-phase commit protocol. Replication is performed within a transaction and all replicas remain locked until replication is complete, ensuring a consistent view of the database.

Applications can specify geographic data placement and amount of replication:

  • proximity of data to users (impacts read latency)

  • proximity of replicas to each other (impacts write latency)

  • amount of replicaton (impacts availability and read performance)

Spanner was designed for a global scale and a database will span multiple data centers around the globe. In a data center, spanservers store tablets. A zonemaster periodically rebalances tablets across servers to balance load. The zonemaster does not participate in transactions.


Spanner provides transactions with ACID semantics. Transactions are serialized to satisfy the “I” (isolated) property and to create the illusion that one transaction happens after another. Strict two-phase locking is used to accomplish this.

Once a transaction has acquired all the locks it needs, it does its work and then picks a commit timestamp. Two-phase locking can reduce overall perfomance because other transactions may need to wait for locks to be released before they can access resources. Spanner uses separate read and write locks but even these can often block another transaction. A read lock will cause any writers to block and a write lock will cause any other writers or readers to block.

To provide lock-free reads, Spanner implements multiversion concurrency control by keeping old versions of data. A transaction reads data from a snapshot, an earlier point in time, without getting a lock. This is particularly useful for long-running transactions that read that many rows of the database. Any other ongoing transactions that modify that data will not affect the data that the long-running transaction reads since those will be later versions.

External consistency

Serialization (isolation, the I in ACID) simply requires that transactions behave as if they executed in some serial order. Spanner implements a stronger consistency model, external consistency, which means that the order of transactions reflects their true time order. Specifically, if a transaction T1 commits before a transaction T2 starts, based on physical (also called “wall clock”) time, then the serial ordering of commits should reflect that and T1 must get a smaller timestamp than T2. Spanner does this by using physical timestamps.

It is not possible to get a completely accurate real-world timestamp. Even if we had an accurate time source, there would be synchronization delays that would add a level of uncertainty. The problem is compounded by having the database span data centers around the globe. Spanner tries to minimize the uncertainty in getting a timestamp and make it explicit.

Each datacenter is equipped with one or more highly accurate time servers, called time masters (a combination of a GPS receivers and atomic clocks is used). The time master ensures that the uncertainty of a timestamp can be strongly bounded regardless of where a server resides. Each spanserver has access to a TrueTime API that provides a time interval for the current time, ranging from up to The earliest time is guaranteed to be in the past and the latest is guaranteed to be a timestamp in the future when the function was called. These values would typically be only milliseconds apart. Spanner can now use these timestamps to ensure that transactions satisfy the demands of external consistency.

Implementing external consistency

The key to providing external consistency is to ensure that any data committed by the transaction will not be visible until after the transaction’s timestamp. This means that even other systems that have a different clock should still see a wall clock time that is later than the transaction’s timestamp. To do this, spanner waits out any uncertainty. Before a transaction commits, it acquires a commit timestamp:

t =

This is the latest possible value of the true time across all servers in the system. It then makes sure that no locks will be released until that time is definitely in the past. This means waiting until the earliest possible current time on any system is greater than the transaction timestamp: > t

This wait is called a commit wait and ensures that any newer transaction that grabs any of the same resources will definitely get a later timestamp.

Note that there is no issue if multiple transactions have the same timestamp. Timestamps are strictly used for versioning to ensure that we provide a consistent view of the database while supporting lock-free reads. Consider the case of a transaction that needs to read hundreds of millions of rows of data. Many transactions may modify the database since you start your work but the view of the database will be consistent since all data read will be no later than a specified timestamp.


By making timestamp uncertainty explicit, Spanner could implement a commit wait operation that can wait out the uncertainty and provide external consistency along with full ACID semantics. By storing multiple versions of data, Spanner provides lock-free reads.

Spanner’s design was a conscious decision to not sacrifice the strong ACID semantics of a database. Programming without ACID requires a lot of extra thinking about the side effects of eventual consistency and careful programming if one wants to avoid it. Complex code, such as the two-phase commit protocol, is implemented within Spanner so programmers do not have to reinvent it.

Bulk Synchronous Parallel & Pregel

Go here for lecture notes

Goal: Create a software framework for fault tolerant, deadlock-free parallel processing. Then, adapt that to create an software framework makes it easy to operate on graphs on a massive scale.

Bulk Synchronous Parallel (BSP)


Bulk Synchronous Parallel (BSP) is a programming model and computation framework for parallel computing. Computation is divided into a sequence of supersteps. In each superstep, a collection of processes executes concurrently and creates messages that are sent to other processes. The superstep ends when all the computation in the superstep is complete and all messages are sent. A barrier synchronization at the end of the superstep ensures that all messages have been transmitted (but not yet delivered to the processes). The next superstep begins with the delivery of all those messages to the processes, who then execute their superstep and send messages that will be delivered at the start of the next superstep. This process continues until all processors vote to halt.

Note that no process is permitted to send and receive messages from with another process during a superstep. Any sent messages will be delivered only at the start of the next superstep. This restriction ensures deadlock-free execution.

A popular implementation of the BSP framework is Apache Hama.


What’s a graph?

A graph is a set of vertices connected by edges. Edges may be directed from one vertex to another or bidirectional. In computing, a vertex is represented by an object and a directed edge is a link to another object.

Graphs are all around us

They represent computer networks, social groups, roads, disease outbreaks, phone call connections, Twitter followers, Facebook friends, web page links, etc. Some of these graphs have an enormous scale. The world wide web has billions of pages and Facebook has around a billion users.

What’s wrong with MapReduce?

Nothing is wrong with MapReduce. It’s great for many problems. However, many graph traversal problems, when written for MapReduce, end up having to take multiple iterations of MapReduce, with the output of one iteration feeding the input of another. This is not only slow, but it is also inefficient as data has to be written to files at the end of every map and reduce operation and moved between machines. The entire state of the graph needs to be transmitted in each stage, which requires a lot more communication overhead than Pregel, where the vertices and edges remain on the machine that performs the computation. Moreover, MapReduce is not the most direct way of thinking about and logically solving many problems.

Introducing Pregel

Pregel is a software framework created by Google to make it easy to work on huge graphs (e.g., ones with billions of vertices) that span many machines (e.g., tens of thousands). Like MapReduce, the framework relieves the programmer from having to deal with assigning tasks to processors, monitoring jobs, handling failures, and managing communications.

The Pregel framework allows you to write “vertex-centric” code. That is, the same user code, a compute() function, is run concurrently on each vertex of the graph. Each instance of this function keeps track of information, can iterate over outgoing edges (each of which has a value), and can send messages to the vertices connected to those edges or to any other vertices it may know about (e.g., having received a vertex ID via a message).

When a function does not have any more work to do, it votes to halt. This puts the corresponding vertex in an inactive state. When all vertices are in an inactive state, the framework terminates. However, if a vertex’s compute function sends a message to an inactive vertex, that vertex will be made active at the next superstep.

Pregel is an adaptation of the Bulk Synchronous Parallel (BSP) model that is specifically designed with graph processing in mind. Like BSP, Pregel executes in supersteps. Each superstep consists of computation followed by message sending. All messages are synchronized with a barrier, which marks the end of the superstep. At the next superstep, the messages from the previous superstep are delivered to, and available at, the compute function. The downside of using BSP directly for graph processing is that significant additional work would be needed to define, propagate, and maintain the topology (connections) of a graph and map vertices to compute nodes.

Advanced APIs

Since there is overhead in sending messages to vertices, particularly when they are on other machines, Pregel supports the optional use of combiners. A combiner is a user-defined function that is applied to a bunch of messages all targeted to the same vertex. The Combine method processes the values and creates a single input to that vertex. For example, if the goal is to take the sum of a set of values or to choose data from the highest-numbered vertex, the combiner can merge several messages into one before they are transmitted over the network. This does not alter the compute function since it still has to be prepared to receive multiple messages from multiple sources.

To manage global state, such as overall statistics, total edges of a graph, global flags, or minium or maximum values of a vertex, Pregel allows a user to define an aggregator. An aggregator comines received values into one value and makes that value available to all vertices at the next superstep.

Pregel design

Pregel uses a master-slave architecture. Many copies of the program are started on a cluster of machines. One copy becomes the master and is responsible for coordinating activity rather than processing the graph. Others are workers. The master registers itself with a name server (Chubby). Each worker process contacts the name server to find the master. The master assigns one or more sets of vertices to each worker. By default, the assignment is based on the hash of the vertex ID, so neighboring vertices will not necessarily be assigned to the same worker.

The master assigns chunks of input, which is usually resident in GFS or Bigtable, to each worker. Each input item is a set of vertices and its edges. Workers read this input and create either local messages for the vertices they manage or, if the input record is for a remote vertex, send the message to the worker that owns that vertex. All vertices are initially marked as active. The master then asks each worker to perform a superstep. The worker will run concurrent threads, each of which executes a compute function on its vertex. The compute function consumes input, runs its algorithm, and generates zero or more messages to other vertices. Workers send these messages asynchronously but they will not be delivered to their target functions until the next superstep starts. When a worker is done with its processing for one superstep, it informs the master. It also tells the master how many of the vertices it manages will be in the active state in the next superstep. The master waits for all workers to complete before starting the next superstep. This cycle continues until there are no more vertices in the active state.

Fault tolerance

Pregel uses checkpointing for fault tolerance. The master tells each worker to checkpoint itself every N supersteps. Checkpointing means that each vertex has to save its entire state in stable storage. This state will include vertex values, edge values, incoming messages, and possibly any other state that the algorithm needs to track. A master will periodically send ping messages to workers to see if they are alive. If a master does not hear from a worker within a certain window of time, it assumes that the worker has failed. In this case, the master reassigns the set of vertices to other workers and tells all workers to restart their computation from the superstep at the most recent checkpoint.

A popular implementation of Pregel is Apache Giraph, which has been used by Facebook to analyze its social graph.


Go here for lecture notes

Goal: Create a general-purpose high-performance framework for big-data processing.

MapReduce was a powerful framework for parallel computation but it forced a rigid model onto the programmer. Quite often, a computation had to be implemented as a sequence of MapReduce operations. Every map and every reduce operation has to run to completion and write its results to a file before the next sequence of operations can start. Spark creates a highly-flexible framework that lets programmers define their job as a sequence of tranformations and actions on data.

The main client application is called the driver program (or just driver) and is linked with a Spark library. When run, the library creates a SparkContext, which connects to a cluster manager that allocates available worker nodes to run the job. Each worker node runs an executor that contacts the driver for tasks to run. Each executor runs as a process inside a Java Virtual Machine (JVM).

The driver goes through the program, which consists of a sequence of transformations and actions as well as the source data. It creates a directed graph of tasks, identifying how data flows from one transformation to another and ultimately to an action. These tasks are then sent to the executors as jar files for execution. Tasks operate on data and each task is either a transformation or action.


Data in Spark is a collection of Resilient Distributed Datasets (RDDs). RDDs can be created in three ways:

  1. They can be data that is a file or set of files in HDFS, key-value data from an Amazon S3 server (similar to Dynamo), HBase data (Hadoop’s version of Bigtable), or the result of a SQL or Cassandra database query.

  2. They can be streaming data obtained using the Spark Streaming connector. As an example, this could be a stream of data from a set of remote sensors.

  3. An RDD can be the output of a transformation function. This is the way one transformation creates data that another transformation will use. To optimize performance, the RDD created by a transfomation is cached in memory (overflowing to disk if necessary).

RDDs have several key properties:

  • They are immutable. A task can read an RDD and create a new one but cannot modify an existing RDD.

  • They are typed (structured). An RDD has a structure that a task can parse.

  • They are partitioned. Parts of an RDD may be sent to different servers. The default partitioning function is to send a row of data to the server corresponding to hash(key) mod servercount.

  • They are ordered. An RDD contains elements that can be sorted. For example, a list of key-value pairs can be sorted by key. This property is optional.

Transformations and actions

Spark allows two types of operations on RDDs: transformations and actions. Transformations read an RDD and create a new RDD. Example transformations are map, filter, groupByKey, and reduceByKey. Transformations are evaluated lazily, which means they are computed only when some other task needs the RDD that they generate. At that point, the driver schedules the task for execution.

Actions are operations that evaluate and return a value to the driver program. When an action is requested on an RDD object, the necessary transformations are computed and the result is returned to the driver. Example actions are reduce, grab samples, and write to file.

Fault tolerance

For each RDD, the driver tracks the sequence of transformations used to create it. That means an RDD knows which RDDs it is dependent on and which tasks needed to create each one. If any RDD is lost (e.g., a task that creates one died), the driver can ask the task that generated it to recreate it. The driver maintains the entire dependency graph, so this recreation may end up being a chain of transformation tasks going back to the original data.

Content delivery networks

Go here for lecture notes

Goal: Provide a highly-scalable infrastructure for caching and serving content from multiple sources.

A content delivery network (CDN) is a set of servers that are usually placed at various points at the edges of the Internet, at various ISPs, to cache content and distribute it to users.

There are several traditional approaches to making a site more scalable and available:

Proxy servers
Organizations can pass web requests through caching proxies. This can help a small set of users but you’re out of luck if you are not in that organization.
Clustering within a datacenter with a load balancer
Multiple machines within a datacenter can be load-balanced. However, they all fail if the datacenter loses power or internet connectivity.
Machines can be connected with links to multiple networks served by multiple ISPs to guard against ISP failure. However, protocols that drive dynamic routing of IP packets (BGP) are often not quick enough to find new routes, resulting in service disruption.
Mirroring at multiple sites
The data can be served at multiple sites, with each machine’s content synchronized with the others. However, synchronization can be difficult.

All these solutions require additional capital costs. You’re building the capability to handle excess capacity and improved availability even if the traffic never comes and the faults never happen.

By serving content that is replicated on a collection of servers, traffic from the main (master) server is reduced. Because some of the caching servers are likely to be closer to the requesting users, network latency is reduced. Because there are multiple servers, traffic is distributed among them. Because all of the servers are unlikely to be down at the same time, availability is increased. Hence, a CDN can provide highly increased performance, scalability, and availability for content.


We will focus on one CDN: Akamai. The company evolved from MIT research that was focused on “inventing a better way to deliver Internet content.” A key issue was the flash crowd problem: what if your web site becomes really popular all of a sudden? Chances are, your servers and/or ISP will be saturated and a vast number of people will not be able to access your content. This became known as the slashdot effect.

In late 2016, Akamai ran on over 216,000 servers on over 1,500 networks in over 120 countries. It serves between 15 and 30 percent of all web traffic.

Akamai tries to serve clients from nearest, available servers that are likely to have requested content. According to the company’s statistics, 85% percent of the world’s Internet users are within a single network hop of an Akamai CDN server.

To access a web site, the user’s computer first looks up a domain name via DNS. A mapping system locates the caching server that can serve the content. Akamai deploys custom dynamic DNS servers and customers who use Akamai’s services register their domains to use those servers. They use the requestor’s address to find the nearest edge server that is likely to hold the cached content for the requested site.

When an Akamai DNS server gets a request to resolve a host name, it chooses the IP address to return based on:

  • domain name being requested

  • server health

  • server load

  • user location

  • network status

  • load balancing

Akamai can also perform load shedding on specific content servers; if servers get too loaded, the DNS server will not respond with those addresses.

The next step in accessing a site is to send the request to the edge server that was provided by the DNS lookup. That edge server may already have the content and be able to serve it directly. Otherwise, it will need to contact the origin server (the server at the company that hosts the content) via its transport system.

To do this efficiently, Akamai manages an overlay network: the collection of its thousands of servers and statistics about their availability and connectivity. Akamai generates its own map of overall IP network topology based on BGP (Border Gateway Protocol) and traceroute data from various points on the network.

Content servers report their load to a monitoring application. The monitoring application publishes load reports to a local Akamai DNS server, which then determines which IP addresses to return when resolving names.

A CDN, serving as a caching overlay, provides three distinct benefits:

  1. Caching: static content can be served from caches, thus reducing the load on origin servers.

  2. Routing: by measuring latency, packet loss, and bandwidth throughout its collection of servers, The CDN can find the best route to an origin server, even if that requires forwarding the request through several of its servers instead of relying on IP routers to make the decision.

  3. Security. Because all requests go to the CDN, it absorbs any Distributed Denial-of-Service attacks (DDoS) rather than overwhelming the origin server. Moreover, any penetration attacks target the machines in the CDN rather than the origin servers.


Go here for lecture notes

Goal: Combine computers together to create high performing and/or highly reliable systems that provide users with a single system image.

Clustering is the aggregation of multiple independent computers to work together and provide a single system that offers increased reliability and/or performance. It is a realization of the single system image that we discussed at the start of the semester. Clusters are generally off-the-shelf computers that are connected to a local area network that allows them to communicate with other computers in the cluster. A cluster may be a collection of tens of thousands (or more) computers (e.g., google cluster) or just a backup computer to take over for a failed web server or database.

There are four classes of cluster architectures:

Also known as high-performance computing, or HPC. The goal of an HPC cluster is to create a computing environment that resembles that of a supercomputer.
High Availability (HA)
The goal in this cluster it to ensure maximum availability by providing redundant systems for failover.
Load Balancing
A load balancing cluster distributes requests among a collection of computers. In doing so, it addresses both scalability and high availability.
Storage clustering is a way to ensure that systems can all access the same storage. It is also a way to make vast amounts of storage available to a computer without having to put it inside the computer (where it will not be available if the computer fails).

The boundaries between these cluster types are often fuzzy and many clusters will use elements of several of these cluster types. For example, any cluster may employ a storage cluster. High availability is important in supercomputing clusters since the likelihood of any one computer failing increases with an increasing number of computers. There is no such thing as a “standard” cluster.

Cluster components

A cluster needs to keep track of its cluster membership: which machines are members of the cluster. Related to this are the configuration and service management components. The configuration system runs on each node (computer) in the cluster and manages the setup of each machine while the service management component identifies which nodes in the cluster perform which roles (e.g., standby, active, running specific applications).

A quorum is the number of nodes in a cluster that have to be alive for the cluster to function. Typically, a majority is required. This provides a simple way of avoiding split-brain due to network partitioning where one group of computers cannot talk to another and two instances of the cluster may be created. With a majority quorum, a minority of systems will never create their own cluster.

A cluster interconnect is the network that allows computers in a cluster to communicate with each other. In most cases, this is just an Ethernet local area network. For performance, bandwidth and latency are considerations. Communicating outside of a rack incurs longer cable runs and the overhead of an extra switching stage. Communicating outside of a data center incurs even longer latency. For maximum performance, we would like computers that communicate frequently to be close together physically. However, for maximum availability, we would like them to be distant. If a rack switch or an entire data center loses power, it would be good to have a working replica elsewhere. For high performance applications within a local area, a dedicated network is often used as a cluster interconnect. This is known as a System Area Network (SAN). A high-performance SAN will provide low latency, highly reliable, switched communication between computers. By using a SAN, the software overhead of having to run the TCP/IP stack, with its requisite fragmentation, buffer management, timers, acknowledgements, and retransmissions, is largely eliminated. Remote DMA (RDMA) allows data to be copied directly to the memory of another processor. SANs are often used for HPC clusters, with SAN/RDMA communication incorporated into the Message Passing Interface (MPI) library, which is commonly used in high performance computing applications. Examples of SAN interconnects are Infiniband, Myrinet, and 10 Gbps ethernet with Data Center Bridging. They are generally used to connect a relatively small number of computers together.

A heartbeat network is the mechanism that is used to determine whether computers in the cluster are alive or dead. A simple heartbeat network exchanges messages between computers to ensure that they are alive and capable of responding. Since a local area network may go down, one or more secondary networks are often used as dedicated heartbeat networks in order to distinguish failed computers from failed networks. Asynchronous networks, such as IP, make the detection of a failed computer problematic: one is never certain whether a computer failed to send a message or whether the message is delayed beyond a timeout value.

Storage clusters

Storage in a clustered computer system can be provided in a variety of ways. Distributed file systems, such as NFS, SMB, or AFS can be used. These provide file-level remote access operations over a network.

A Storage Area Network (SAN, not to be confused with a System Area Network) is a dedicated network for connecting computers to dedicated disk systems (storage arrays). Common SAN interconnect technologies include iSCSI, which uses the SCSI protocol over the Ethernet, and Fibre Channel. Computers access this remote storage at the block level (read a specific block, write a specific block), just like they would access local storage. With a SAN, however, access to the same storage can be shared among multiple computers. This environment is called shared disk. A distributed lock manager, or DLM, manages mutual exclusion by controlling access to key resources on the shared disk so that, for example, two computers will not try to write to the same disk block at the same time. A clustered file system is a file system that is built on top of a shared disk. Unlike a distributed file system (NFS, SMB, et al.), which uses remote access at a file level, each computer’s operating system implements a full file system and makes requests at the block level. Examples of such file systems include the Oracle Cluster File System for Linux (OCFS2), Red Hat’s Global File System (GFS2), and Microsoft’s Cluster Shared Volumes (CSV). The DLM is used to ensure that critical shared file system data structures, such as bitmaps of free blocks, inode structures, and file lock tables, are accessed exclusively and caching is coherent. It operates at the level of the implementation of a file system rather than high-level file systems services as in distributed file systems. As such, it differs from something like the NFS lock daemon, which kept track of file locks requested by applications rather than block-level locks needed to keep a file system coherent.

A shared nothing cluster architecture is one where each system is independent and there is no single point of contention in the system, such as competing for access to a shared disk. Because there is no contention, there is no need for a DLM. In this environment, any data that is resident on a system’s disk can only be obtained by sending a request to the computer that owns the disk. If the computer dies, the data is generally unavailable but may be replicated on other nodes. An alternative design that uses a SAN can allow disk access to be switched to another computer but ensure that only one computer accesses the file system at any time.

To make disks themselves highly available, RAID (redundant array of independent disks) is often employed. RAID 1 is disk mirroring. Anything that is written to one disk gets written to a secondary disk. If one fails then you still have the other. RAID 5 and RAID 6 stripes the data across several disks and also adds in error correcting codes so that it data could be reconstructed from the available segments if one would die (e.g., parity to allow recovering data lost if one disk fails).

High-Performance Computing (HPC)

High-performance clusters (HPC) are generally custom efforts but there are a number of components that are common across many implementations. HPCs are designed for traditional supercomputing applications that focus on a large amount of computation on large data sets. These applications are designed to be partitioned into multiple communicating processes. The Message Passing Interface (MPI) is a popular programming interface for sending and receiving messages that handles point-to-point and group communication and provides support for barrier-based synchronization. It is sometimes used together with the Parallel Virtual Machine (PVM), a layer of software that provides an interface for creating tasks, managing global task IDs, and managing groups of tasks on arbitrary collections of processors. PVM is in many ways similar to MPI but designed to be more dynamic and support heterogenous environments. However, its performance was not up to the levels of MPI and its popularity is waning. Beowulf and Rocks Cluster are examples of HPC clusters based on Linux. Microsoft offers high performance clustering via the Microsoft HPC Pack. There are many other HPC systems as well. The common thread among them all is that they provide a front-end server for scheduling jobs and monitoring processes and offer an MPI library for programming.

Batch Processing: Single-Queue Work Distribution

Single queue work distribution is a form of high performance computing that does not rely on communication between computing nodes. Where traditional HPC applications usually involve large-scale array processing and a high level of cooperation among processing elements, the work distribution approach is used for applications such as render farms for computer animation, where a central coordinator (dispatcher) sends job requests to a collection of computers. When a system completes a job (e.g., “render frame #4,178”), the dispatcher will send it the next job (e.g., “now render frame #12,724”). The dispatcher will have the ability to list jobs, delete jobs, dispatch jobs, and get notified when a job is complete. The worker nodes have no need to communicate with each other.

Load Balancing

Web-services load balancing is a somewhat trivial but very highly used technique for distributing the load of many network requests among a collection of computers, each of which is capable of processing the request. Load balancing serves three important functions:

  1. Load balancing. It enables scalability by distributing requests among multiple computers.

  2. High availability (failover). If a computer is dead, the requests will be distributed among the remaining live computers.

  3. Planned outage management. If a computer needs to be taken out of service temporarily (for example, to upgrade software or replace hardware), requests will be distributed among the remaining live computers.

The simplest form of load balancing is to have all requests go to a single computer that then returns an HTTP REDIRECT error. This is part of the HTTP protocol and will lead the client to re-issue the request to the computer specified by the REDIRECT error.

Another, and the most popular approach, is to use a load-balancing router to map incoming requests to one of several multiple back-end computers.

For load balancing across data centers, DNS-based load balancing may be used where a DNS query returns IP addresses of machines at different data centers for domain name queries.

High Availability

High-availability clusters strive to provide a high level of system uptime by taking into account the fact that computers may fail. When this happens, applications running on those computers will resume on other computers that are still running. This is called failover.

Low-level software to support high-availability clustering includes facilities to access shared disks and support for IP address takeover, which enables a computer to listen on multiple IP addresses so that IP packets that were sent to a failed machine can reach the backup system instead.

Mid-layer software includes distributed elections to pick a coordinator, propagation of status information, and figuring out which systems and applications are alive. Higher-layer software includes the ability to restart applications, let a user assign applications to computer, and let a user see what’s going on in the system as a whole.

An active/passive configuration is one where one or more backup (passive) systems are waiting to step in for a system that died. An active/active configuration allows multiple systems to handle requests. Requests may be load balanced across all active systems and no failover is needed; the dead system is simply not sent any requests.

Failover can be implemented in several ways:

Cold failover
This is an application restart — the application is started afresh from the beginning. An example is starting up a web server on a backup computer because the primary web server died. There is no state transfer.
Warm failover
Here, the application is checkpointed periodically. It can then be restarted from from the last checkpoint. Many cluster libraries provide the ability for a process to checkpoint itself (save its memory image). Pregel is an example of a software framework that relies on periodic checkpointing so that a graph computation does not have to restart from the beginning.
Hot failover
Here, a replica application is always kept synchronized with the active application on another computer. An example of this is a replicated state machine. Chubby servers, for example, implement hot failover: if the Chubby master fails, any other machine in the Chubby cluster can step in.

Cascading failover refers to the ability of an application to fail over even after it already has failed over in the past. Multi-directional failover refers to the ability to restart applications from a failed system on multiple available systems instead of a specific computer that is designated for use as a standby system.

An annoying malfunction is a byzantine failure. In this case, the failed process or computer continues to communicate but communicates with faulty data. Related to this is the problem of fail-restart behavior, where a process may restart but not realize that the data it is working with is obsolete (e.g., a transaction coordinator might restart and not realize that the transaction has already been aborted). Fencing is the use of various techniques to isolate a node from the rest of the cluster. Power fencing shuts off power to the node to ensure that the node does not restart. SAN fencing disables the node from accessing shared storage, avoiding possible file system corruption. Other fencing techniques may block network messages from the node or remove processes from a replication group (as done in virtual synchrony).


Go here for lecture notes

Goal: Use symmetric cryptography, public key cryptography, random numbers, and hash functions to enable secure communication, authenticated messages, and key exchange.

Cryptography deals with encrypting plaintext using a cipher, also known as an encryption algorithm, to create ciphertext, which is unintelligible to anyone unless they can decrypt the message.

A restricted cipher is one where the workings of the cipher must be kept secret. There is no reliance on a key and the secrecy of the cipher is crucial to the value of the algorithm. This has obvious flaws (people in the know leaking the secret or coming up with a poor algorithm that can easily be reverse engineered). For any serious encryption, we use well-known and well-tested non-secret algorithms that rely on secret keys.

A symmetric encryption algorithm uses the same secret key for encryption and decryption.

A public key algorithm uses a pair of keys: data encrypted with the first key can be decrypted only with the second key and vice versa. One of these keys is typically kept private (known only to the creator) and is known as the private key. The corresponding key is generally made visible to others and is known as the public key. Anything encrypted with the private key can only be decrypted with the public key. This is the basis for digital signatures because the encryption can only be performed by the key’s owner. Anything that is encrypted with a public key can be encrypted only with the corresponding private key. This is the basis for authentication and covert communication because the decryption can only be performed by the recipient, who is the only one who has the corresponding private key.

A one-way function is one that can be computed relatively easily in one direction but there is no known way of computing the inverse function. One-way functions are crucial in a number of cryptographic algorithms, including digital signatures, Diffie-Hellman key exchange, and RSA public key cryptography. For Diffie-Hellman and RSA keys, they ensure that someone cannot generate the corresponding private key when presented with a public key. A particularly useful form of a one-way function is the hash function. This is a one-way function whose output is always a fixed number of bits for any input. Cryptographic hash functions generally produce longer results than those used for hash tables. Common lengths are 224, 256, 384, or 512 bits. For good cryptographic hash functions (e.g., SHA–1, SHA–2, SHA–3), it is highly unlikely that two messages will ever hash to the same value. It is also extremely difficult to construct text that hashes to a specific value, and it is extremely difficult to modify the plaintext without changing its resultant hash. The hash function is the basis for message authentication codes and digital signatures. Note that when we talk about cryptography and mention phrases such as “extremely difficult”, we mean “impossible for all practical purposes,” not that “you can do it if you spend an entire week working on the problem.”

Secure communication

To communicate securely using a symmetric cipher, both parties need to have a shared secret key. Alice will encode a message to Bob using the key and Bob will use the same key to decode the message. If Alice wants to communicate with Charles, she and Charles will also need a secret key. The fact that every pair of entities will need a secret key leads to a phenomenon known as key explosion. Overall, in a system with n users, there will be O(n2) keys.

The biggest problem with symmetric cryptography is dealing with key distribution: how can Alice and Bob share a key so they can communicate securely? The Diffie-Hellman exponential key exchange algorithm allows us to do this. Each party generates a private “key” and a public “key” (these are not encryption keys; they’re just numbers — Diffie-Hellman does not implement public key cryptography — it is unfortunate that the term was used to describe these numbers). It uses a one-way function abmod c in a way that allows Alice to compute a common key using her private key and Bob’s public key. Bob can compute the same common key in the same way by using his private key and Alice’s public key. They can then communicate securely by using the common key with a symmetric cipher.

Using true public key cryptography, such as RSA, if Alice encrypts a message with Bob’s public key, Bob will be the only one who can decrypt it since doing so will require Bob’s private key. Likewise, Bob can encrypt messages with Alice’s public key, knowing that only Alice will be able to decrypt them with her private key.

Session keys

A session key is a random key that is created for encrypting and decrypting data for just one communication session. It is useful because if the key is ever compromised, no lasting information is obtained: future communication sessions will use different keys. With the Diffie-Hellman algorithm. for example, Alice would typically use the common key to encrypt a randomsession key so she can pass it to Bob securely (only Bob can decode it). Then, Alice and Bob will encrypt their messages with the session key.

A hybrid cryptosystem uses public key cryptography to send a session key securely. The originator generates a random session key and encrypts it with the recipient’s public key. The recipient decrypts the message with the corresponding private key to extract the session key. After that, symmetric cryptography is used for communication, with messages encrypted with the session key. This has the advantages of higher performance (public key cryptography is much, much slower than symmetric cryptography), ease of communicating with multiple parties (just encrypt the session key with the public keys of each of the recipients), and allows the bulk of data to be encrypted with session keys instead of the hardly-ever-changing public keys.

Message Authentication Codes and Digital Signatures

A hash of a message can act as a form of a checksum for the message: if the message is modified, it will hash to a different value. If an intruder modifies the messagee, they will have to rehash it and update the corresponding hash value.

Both message authentication codes and digital signatures are a bunch of bytes that are associated with a message to allow the recipient to check whether the message has been damaged or modified. The message itself does not have to be encrypted and the authentication code or signature is separate from the message.

A message authentication code (MAC) is a hash of a message encrypted with a symmetric key. A message can be sent unencrypted along with the MAC. Anyone can see the message but an intruder will not be able to modify it without knowing the key needed to encrypt a new hash of the message.

A digital signature is simply the hash of a message encrypted with the creator’s (signer’s) private key. Anyone who has the message signer’s public key can decrypt the hash and thus validate the hash against the message. Other parties cannot recreate the signature. Note that, with a MAC, the recipient or anyone in possession of the shared key can create the same MAC. With a digital signature, the signature can only be created by the owner of the private key. Even though others can generate the same hash for the message, they do not have the signer’s private key to encrypt that hash.


Go here for lecture notes

Goal: Create protocols for authenticating users, establishing secure communication sessions, authorizing services, and passing identities.

The three A’s of security are:

The process of binding an identity to the user. Note the distinction between authentication and identification. Identification is simply the process of asking you to identify yourself (for example, ask for a login name). Authentication is the process of proving that the identification is correct.
Given an identity, making a decision on what access the user is permitted. Authentication is responsible for access control.

Accounting Logging system activity so that any breaches can be identified (intrusion detection) or a post facto analysis can be performed.

A fourth item, not in the “standard list,” is auditing: inspecting the software and system configuration for security flaws.

The three factors of authentication are: something you have (such as a key or a card), something you know (such as a password or PIN), and something you are (biometrics). Combining these into a multi-factor authentication scheme can increase security against the chance that any one of the factors is compromised. Multi-factor authentication must use two or more of these factors. Using two passwords, for example, is not sufficient.

Password Authentication Protocol (PAP)

The classic authentication method is the use of reusable passwords. This is known as the password authentication protocol, or PAP. The system asks you to identify yourself (login name) and then enter a password. If the password matches that which is associated with the login name on the system then you’re authenticated.

One problem with the protocol is that if someone gets hold of the password file on the system, then they have all the passwords. The common way to thwart this is to store hashes of passwords instead of the passwords themselves. This takes advantage of the one-way property of the hash. To authenticate a user, check if hash(password) = stored_hashed_password. If someone got hold of the password file, they’re still stuck since they won’t be able to reconstruct the original password from the hash. They’ll have to resort to an exhaustive search or a dictionary attack to search for a password that hashes to the value in the file.

A dictionary attack is an optimization of the search that tests common passwords, including dictionary words and common letter-number substitutions. An intruder does not need to perform a search for each password to find a matching hash. Instead, the results of an exhaustive or dictionary search can be stored and searched to find a corresponding hash in a password file. These are called precomputed hashes. To guard against this, a password is concatenated with a bunch of extra random characters, called salt. These characters make the password substantially longer and a table of precomputed hashes insanely huge and hence not practical. The salt is not a secret – it is stored in plaintext in the password file in order to validate a user’s password. Its only function is to make using precomputed hashes impractical and ensure that even identical passwords do generate the same hashed results.

The other problem with reusable passwords is that if a network is insecure, an eavesdropper may sniff the password from the network. A potential intruder may also simply observe the user typing a password. To thwart this, we can turn to one-time passwords. If someone sees you type a password or gets it from the network stream, it won’t matter because that password will be useless for future logins.

There are three forms of one-time passwords:

  1. Sequence-based. Each password is a function of the previous password. S/Key is an example of this.

  2. Challenge-based. A password is a function of a challenge provided by the server. CHAP is an example of this.

  3. Time-based. Each password is a function of the time. SecurID is an example of this.

S/Key Authentication

S/Key authentication allows the use of one-time passwords by generating a list via one-way functions. The list is created such that password n is generated as f(password[n–1]), where f is a one-way function. The list of passwords is used backwards. Given a password password[p], it is impossible for an observer to compute the next valid password because a one-way function f makes it improbably difficult to compute the inverse function, f–1(password[p]), to get the next valid password, password[p–1].

CHAP Authentication

The Challenge-Handshake Authentication Protocol (CHAP) is an authentication protocol that allows a server to authenticate a user without sending a password over the network.

Both the client and server share a secret (such as a password). A server creates a random bunch of bits (called a nonce) and sends it to the client (user) that wants to authenticate. This is the challenge.

The client identifies itself and sends a response that is the hash of the shared secret combined with the challenge. The server has the same data and can generate its own hash of the same challenge and secret. If the hash matches the one received from the client, the server is convinced that the client knows the shared secret and is therefore legitimate.

An intruder that sees this hash cannot extract the original data. An intruder that sees the challenge cannot create a suitable hashed response without knowing the secret. Note that this technique requires passwords to be accessible at the server and the security rests on the password file remaining secure.


RSA’s SecureID is a two-factor authentication system that generates one-time passwords for response to a user login prompt. It relies on a user password (Personal ID Number, PIN) and a token device (an authenticator card or fob). The token generates a new number every 30 seconds. The number is a function of a seed that is unique for each card and the time of day. To authenticate to a server, you send a concatenation of your PIN and the number from the token in lieu of a password. A legitimate remote system will have your PIN as well as the token seed and will be able to compute the same value to validate your password. An intruder would not know your PIN or the token’s seed and will never see it on the network.

Public key authentication

A nonce is a random bunch of bits that is generated on the fly and usually used to present to the other party as a challenge for them to prove that they are capable of encrypting something with a specific key that they possess. The use of a nonce is central to public key authentication. If I send you a nonce and you encrypt it with your private key and give me the results, I can decrypt that message using your public key. If the decryption matches the original nonce, this will convince me that only you could have encrypted the message since only you possess your private key.

Kerberos authentication

Kerberos is a trusted third party authentication, authorization, and key exchange protocol using symmetric cryptography. When you want to access a service, you first need to ask Kerberos. If access is authorized, you get two messages. One is encrypted with your secret key and contains the session key for your communication with the service. The other message is encrypted with the service’s secret key. You cannot read or decode this second message. It is known as a ticket or sealed envelope. It contains the same session key that you received but is encrypted for the service. When the service decrypts it, it knows that the message must have been generated by an entity that knows its secret key: Kerberos. Now that it has the session key, the service can communicate with you securely by encrypting all traffic with that key.

Since your secret key is needed to decrypt every service request you make of Kerberos, you’ll end up typing your password each time you want to access a service. Storing the key in a persistant file is not a good idea. Kerberos handles this by splitting itself into two components that run the same protocol: the authentication server (AS) and the ticket granting server (TGS). The authentication server handles the initial user request and provides a session key to access the TGS. This session key can be cached for the user’s login session and allows the user to send requests to the TGS without re-entering a password. The TGS is the part of Kerberos that handles requests for services. It also returns two messages to the user: a session key for the desired service and a ticket that must be provided to that service.

Digital certificates

While public keys simplify authentication (just decrypt this with my public key and you know that I was the only one who could have encrypted it), identity binding of the public key must be preserved for you to know that you really have my public key instead of someone else’s. X.509 digital certificates provide a way to do this. A certificate is a data structure that contains user information and the user’s public key. This data structure also contains a signature of the certification authority. The signature is created by taking a hash of the rest of the data in the structure and encrypting it with the private key of the certification authority. The certification authority (CA) is responsible for setting policies of how they validate the identity of the person who presents the public key for encapsulation in a certificate.

Transport Layer Security (Secure Sockets Layer)

Secure Sockets Layer (SSL, also known as TLS — Transport Layer Security) is a layer of software designed to provide authentication and secure communication over the abstraction of a sockets interface. It makes it easy to add a secure transport onto insecure TCP socket based protocols (e.g., HTTP and FTP). SSL uses a hybrid cryptosystem and relies on public keys for authentication. If both the sender and receiver have X.509 digital certificates, SSL can validate them and use nonce-based public key authentication to validate that each party has the corresponding private key. In some cases, it may validate the server only. If the server does not have a certificate, SSL will then use a public key simply to allow a symmetric session key to be passed securely from client to server. The client generates a session key and encrypts it with the server’s public key. This ensures that only the server will be able to decode the message and get the session key. After that, communication takes place using a symmetric algorithm and the client-generated session key.

Service Authorization via OAuth

Go here for lecture notes

Goal: Enable users to provide limited permissions for one service to access another service.

Suppose that you want an app (or some network service) to access your Google calendar. One way to do this is to provide the app with the ID and password of your Google account. Unfortunately, this gives the app full control of the entire account and is something you may not feel comfortable granting.

OAuth is designed to allow users to control what data or services one service can access from another service.

For example, if you want a photo printing service to access photos from your flickr account, you don’t want to provide it with your flickr username and password for unrestricted access. Instead, OAuth allows you to specify what access you allow the photo printing service to have to flicker and for how long. Token credentials (a bunch of bits as far as your app or website is concerned) are used in place of the resource owner’s username and password for gaining access to the service. These token credentials include a token identifier and a secret.

There are three entities involved in OAuth:

  1. User: the user and the user’s browser.

  2. Client (application): this is the service that the user is accessing. For example, the photo printing service

  3. Authorization Server, acting as the Service Provider (server): this is the service that the consumer needs to access. For example, to get the photos

We’ll use the photo printing and the photo serving service as an example.

  1. Alice wants to order some prints and logs into Moo knows about flickr and allows her to select select flickr as the source of her photos. When Moo built its service, its developers registered the service with flickr and obtained OAuth client credentials (client ID and secret).

  2. When Alice selects flickr, needs to contact for an authorization code, which is a temporary credential. The application,, creates a request that contains a scope: a list of requested services that it needs from flickr (for example, get photos from your account). The application then redirects Alice to to the flicker OAuth page (that could be a separate server at flickr) with a directive to redirect her back to Moo when she’s done. The request contains the app’s ID, the app’s secret, and the scope.

  3. At the flicker OAuth page, she authenticates (using login/password or perhaps via OpenID, which may cause another level of redirection) and is presented with a description of what is requesting to do (e.g., access to download photos for the next 10 minutes). She can approve or reject the request.

  4. When Alice approves the request, she is redirected back to The redirect contains the authorization code.

  5. Moo now contacts directly and exchanges the authorization code for an access token. Authorization codes are used to obtain a user’s approval. Access tokens are used to access resources on the provider (server); that is, to call APIs. Moo now sends API requests to flicker containing the access token to flickr to download the requested photos.

Distributed Authentication via OpenID Connect

Go here for lecture notes

Goal: Allow services to use a third-party user authentication service to authenticate a user.

OpenID Connect was created to solve the problem of alleviating a user from managing multiple identities (logins and passwords) at many different services (web sites).

It is not an authentication protocol. Rather, it’s a technique for a service to redirect authentication to a specific OpenID Connect authentication server and have that server be responsible for authentication.

OpenID Connect is an identity layer on top of the OAuth 2.0 protocol. It uses the basic protocol of OAuth and the same communication mechanisms: HTTPS with JSON messages.

There are three entities involved:

  1. Client: this is the application that the user is accessing. It is often a web site but may be a dedicated app. (In OAuth, we think of this as the consumer).

  2. User: this is the user’s browser. In the case of a dedicated app, the app will take this role.

  3. Authorization Server, acting as the Identity Provider. This is the server that is responsible for managing your ID and authenticating you. It is called the Authorization Server because OpenID Connect is built on top of OAuth 2.0, a service authorization framework. For OAuth, we this of this as a Service Provider instead of Identity Provider.

OpenID Connect allows the client to answer the question, what is the identity of the person using this browser or app?

The protocol contacts an authorization server to get an access token and then uses that access token at the client.

Here’s the basic flow:

  1. Before OpenID Connect is used, a client may pre-register its service with an Authorization Server. This can allow a server administrator to control restrictions on policies for accessing user information. It also allows a client and server to agree upon a shared secret that will be used to sign future messages. If this is not done, public key cryptography can be used.

  2. The user is presented with a request to log in. The client may choose to support the use of multiple identity providers. In this case, the user identifies the identity provider in the user name. For example, The protocol then knows that the authorization server is at Alternatively, the provider may force the use of a specific provider (e.g., Google). The client redirects the user to the authorization server. This is done via an HTTP REDIRECT message.

  3. The redirect results in the client sending an Authentication Request to the authorization server. This is an OAuth message with a scope (access request) that requests "openid’. The scope can also include other identifying data, such as user profile, email, postal address, and phone number.

  4. The authorization server authenticates the user using whatever protocol it wants to use.

  5. After authenticating the user, the authorization server requests the users permission for all requests listed in the scope. For example, SomeApp requests permission for: signing you in, your profile, your email address, your address, your phone number. This is the same as any OAuth request for services.

  6. If the user approves, the authorization server sends a redirect to switch the user back to the client. This redirect message contains an authorization code created by the authorization server that is now given to the client. The authorization code looks like a large bunch of random text and is of no use to the user.

  7. The client now sends a token request directly to the authorization server. The request includes the authorization code that it received. All traffic is encrypted via HTTPS to guard agains eavesdroppers. If the client pre-registered, the request will contain a pre-established secret so that the authorization server can validate the client.

  8. The server returns an access token and an ID token to the client. Note that the user is not involved in this flow.

The ID token asserts the user’s identity. It is a JSON object that:

  • identifies the Identity Provider (authorization server)
  • has an issue and expiration date
  • may contain additional details about the user or service
  • is digitally signed by the identity provider using either the provider’s private key or a shared secred that was set up during registration.

By getting this ID token, the client knows that, as far as the authorization server is concerned, the user has successfully authenticated. At this point, the client does not need to contact the authorization server anymore.

The access token is the same access token that OAuth provides clients so that they can request services. It has an expiration date associated with it and may be passed to the authorization server to request access to detailed information about the user or get other protected resources. What the client can request is limited by the scope of the Authentication Request in step 2.

OpenID and OAuth were separate protocols until 2014. They were merged such that OpenID Connect is a special mode of OAuth that requests authentication and provides the client with an ID token. However, the purpose of OAuth and OpennID are fundamentally different. OpenID is designed to allow a third party to manage user authentication. That is, it provides single sign-on: allowing a user to use the same login and a third party (OpenID) authentication service for many web sites. OAuth, on the other hand, is designed for service authorization. It is up to the remote service that you are accessing to decide if and how to authenticate you before allowing you to authorize access to that service.