Exam 3 Study Guide

The one-hour study guide for exam 3

Paul Krzyzanowski

April 2020

Disclaimer: This study guide attempts to touch upon the most important topics that may be covered on the exam but does not claim to necessarily cover everything that one needs to know for the exam. Finally, don't take the one hour time window in the title literally.

Last update: Sat Apr 29 20:44:25 EDT 2023


Goal: Build a massively scalable, eventually consistent table of data that is indexed by a row key, contains a potentially arbitrary and huge number of columns, and can be distributed among tens of thousands of computers.

Bigtable is a distributed storage system developed at Google that is structured as a large table: one that may be petabytes in size and distributed among tens of thousands of machines. It is designed and used for storing items such as billions of web pages indexed by URL, with many versions per page; hundreds of terabytes of satellite image data; or statistics and preferences of hundreds of millions of users. It is also designed to be able to handle thousands of queries a second.

To the user, an instance of Bigtable appears as a large spreadsheet: a table that is indexed by a row key, column name, and timestamp. Each cell can store multiple timestamped versions of data. If an application does not specify a timestamp, it will get the latest version. Alternatively, it can specify a timestamp and get the latest version that is no newer than that timestamp. All operations on rows are atomic but only on one row at a time. That is, a client cannot grab a lock for a set of rows in order to make changes.

Columns in the table are organized into column families. A column family is a related group of columns (e.g., “outgoing links” could be a column family). Each column family has a unique name and contains within it a list of named columns. A Bigtable instance will typically have a small number of column families (perhaps a few hundred at most) but each column family may have a huge number (perhaps millions) of columns within it. Bigtable tables are generally sparse. If a cell (a column in a row) is empty, it will not use up any space. Column families are configured as part of the table and are common to all rows. Columns within a column family may be specific to a row and can be added dynamically. An example of columns within a column family is a list of all the websites you visited (i.e., many thousands of columns, with a new one added when you visit a new website). The list of columns used between one row and another may be wildly different (which is what makes tables typically sparse). Each cell may contain multiple versions of data. Timestamped versioning is configured on a per-column-family basis.

A Bigtable table starts off as a single table. As rows are added, the table is always kept sorted by row key. As the table grows, it may split along row boundaries into sub-tables, called tablets. Tablet servers are responsible for serving data for tablets. If a tablet gets too many queries, Bigtable can balance the workload by splitting the tablet and moving one of the new tablets to a different server.

Bigtable comprises a client library (linked with the user’s code), a master server that coordinates activity, and many tablet servers. Each tablet server is responsible for managing multiple tablets. Depending on the size of the tablets, it may manage from tens to thousands of tablets. Tablet servers can be added or removed dynamically. Tablet data is stored within GFS (Google File System) and tablet servers run on the same machines as GFS chunkservers. A tablet server handles read/write requests for its tablets and splits tablets when they grow large.

New data to Bigtable is stored on the tablet server in a memory structure called a memtable. Once the size of the memtable gets larger than some threshold value, it is frozen and converted to an SSTable. The SSTable (sorted string table) is the file format used to store Bigtable data, managing the mapping of keys to their values.

The master assigns tablets to tablet servers, balances tablet server load, and handles schema changes (table and column family creations). It tries to run the tablet server on the same GFS chunkserver that holds data for that tablet. The master is also responsible for garbage collection of files in GFS and managing schema changes (table and column family creation).

In Bigtable, Chubby is used to ensure there is only one active master, store the bootstrap location of Bigtable data, discover tablet servers, store Bigtable schema information, and store access control lists. Chubby stores the location of the root tablet, which is the top level of the three-layer-deep balanced tree. The top two layers of the tree are metadata tablets, which do not contain table data but only information to locate the tablet that is responsible for a particular key.

Bigtable relies on GFS for protecting its data from loss but can be configured for replication to multiple Bigtable clusters in different data centers to ensure availability. Data propagation is asynchronous and results in an eventually consistent model.

Cassandra NoSQL database

Goal: Create a distributed NoSQL database system designed for high availability, fault tolerance, and efficient storage and retrieval of large volumes of data across many commodity servers.

Cassandra is a distributed NoSQL database system that is designed to handle large volumes of data across many commodity servers while providing high availability, fault tolerance, and low latency. Cassandra’s architecture is based on the concept of a wide-column data store and distributed hash tables.

In a wide-column database (also known as a column-family or columnar database), data is organized into tables with columns that can vary in number and type. This allows for flexible schema design and efficient storage and retrieval of data. Cassandra uses the Cassandra Query Language (CQL) to interact with data stored in tables.

Cassandra’s architecture is based on a peer-to-peer distributed system that uses a ring-based architecture to distribute data across nodes in a cluster. Each node in the cluster is responsible for storing a portion of the data, and nodes work together to provide a highly available and fault-tolerant system.

Cassandra uses a Chord-style distributed hash table (DHT) to manage the distribution of data across the cluster. A DHT is a distributed data structure that maps keys to values, allowing nodes to efficiently locate data stored on other nodes in the cluster. Cassandra’s DHT is called the partitioner, and it uses consistent hashing to evenly distribute data across nodes in the cluster. This ensures that each node is responsible for a balanced and manageable portion of the data.

A row within a table is indexed by a primary key. The primary key in Cassandra may be composed of a combination of a partition key and one or more clustering keys (columns). The partition key determines which partition the data belongs to. This value is hashed and used by the DHT to locate the node responsible for that partition.

A clustering key is used to define the sequence of data within a partition. The clustering key is crucial for sorting and organizing data inside a partition, making it easier to retrieve and query data efficiently. When multiple clustering columns are used, they are combined in the order they are defined, creating a hierarchical structure to sort the data.

Cassandra’s replication feature provides fault tolerance by replicating data across multiple nodes in the cluster. Each data item in Cassandra is replicated across a configurable number of nodes, known as the replication factor. This ensures that if a node fails, the data can still be accessed from other nodes in the cluster.

Cassandra also provides tunable consistency, which allows developers to balance data consistency and availability according to their needs. Consistency can be tuned on a per-query basis, allowing developers to prioritize consistency for critical data while relaxing consistency requirements for less critical data.

Cassandra’s architecture also includes a distributed architecture for reads and writes. When a write request is sent to Cassandra, it is first written to a commit log on the node that received the request. The data is then written to an in-memory data structure called a memtable, which is periodically flushed to disk as a new SSTable file. Reads in Cassandra are typically served from the memtable, with the SSTable files being used to back up the data.


Goal: Design a huge-scale worldwide database that provides ACID semantics, read-free locking, and external consistency.


Brewer’s CAP theorem led to the popularity of an eventual consistency model rather than relying on transactional ACID semantics.

In this model, writes propagate through the system so that all replicated copies of data will eventually be consistent. Prior to the completion of all updates, processes may access old versions of the data if they are reading from a replica that has not been updated yet.

With ACID semantics, this would not happen since the transaction would grab a lock on all the replicas prior to updating them. Eventual consistency is the trade-off in designing a system that is both highly available and can survive partitioning.

The presence of network partitions is, in many environments, a rare event. By using an eventual consistency model, we choose to give up on “C” (consistency) in order to gain availability on the slim chance that the network becomes partitioned.

Given that partitions are rare in places such as Google data centers, which uses redundant networks both within and outside each center, it is not unreasonable to enforce a strong consistency model (which requires mutual exclusion via locks).

Partitioning will rarely affect the entire system but only a subset of computers. As such, it is possible to use a majority consensus algorithm, such as Paxos, to continue operations as long as the majority of replicas are functioning. Spanner takes advantage of an environment where partitions are rare and is willing to be partially unavailable in that rare event when a network becomes partitioned.

Experience with eventual consistency has taught us that it places a greater burden on the programmer. With an eventual consistency model, it is now up to the programmer to reconcile the possibility that some data that is being accessed may be stale while other data might be current. Bigtable, for example, was difficult to use in applications that required strong consistency. Custom code had to be written to coordinate locks for any changes that spanned multiple rows.

With Spanner, Google built a transactional (ACID) database that spans many systems and widely distributed data centers. Spanner provides the user with a large-scale database that comprises multiple tables, with each table containing multiple rows and columns. The model is semi-relational: each table is restricted to having a single primary key. Unlike Bigtable, transactions can span multiple rows and across multiple tables. Also unlike Bigtable, which provided eventually consistent replication, Spanner offers synchronous replication, ACID semantics, and lock-free reads.

Data storage

Spanner gives the user the the ability to manage multiple tables, each with rows and columns. Internally, the data is stored as a large keyspace that is sharded across multiple servers. Like Bigtable, each shard stores a group of consecutive of rows, called a tablet. This sharding is invisible to applications.

Tablets are replicated synchronously using Paxos. One of the replicas is elected to be a leader and runs a transaction manager. Any transactions that span multiple shards use the two-phase commit protocol. Replication is performed within a transaction and all replicas remain locked until replication is complete, ensuring a consistent view of the database.

Applications can specify geographic data placement and the amount of replication:

  • Proximity of data to users (impacts read latency)

  • Proximity of replicas to each other (impacts write latency)

  • Amount of replication (impacts availability and read performance)

Spanner was designed for a global scale and a database will span multiple data centers around the globe. In a data center, spanservers store tablets. A zonemaster periodically rebalances tablets across servers to balance load. The zonemaster does not participate in transactions.


Spanner provides transactions with ACID semantics. Transactions are serialized to satisfy the “I” (isolated) property and to create the illusion that one transaction happens after another. Strict two-phase locking is used to accomplish this.

Transactions can be distributed among multiple systems (which may span multiple datacenters). Each transaction is assigned a globally-meaningful timestamp and these timestamps reflect the serialization order of the transactions. Once a transaction has acquired all the locks it needs, it does its work and then picks a commit timestamp. Two-phase locking can reduce overall perfomance because other transactions may need to wait for locks to be released before they can access resources. Spanner uses separate read and write locks to achieve greater concurrency but even these can often block another transaction. A read lock will cause any writers to block and a write lock will cause any other writers or readers to block.

Spanner also offers lock-free reads. For this, spanner implements a form of multiversion concurrency control by storing multiple timestamped versions of data. A transaction can read data from a snapshot, an earlier point in time, without getting a lock. This is particularly useful for long-running transactions that read that many rows of the database. Any other ongoing transactions that modify that data will not affect the data that the long-running transaction reads since those will be later versions.

External consistency

Serialization (isolation, the I in ACID) simply requires that transactions behave as if they executed in some serial order. Spanner implements a stronger consistency model, external consistency, which means that the order of transactions reflects their true time order. Specifically, if a transaction T1 commits before a transaction T2 starts, based on physical (also called “wall clock”) time, then the serial ordering of commits should reflect that and T1 must get a smaller timestamp than T2. Spanner does this by using physical timestamps.

It is not possible to get a completely accurate real-world timestamp. Even if we had an accurate time source, there would be synchronization delays that would add a level of uncertainty. The problem is compounded by having the database span data centers around the globe. Spanner tries to minimize the uncertainty in getting a timestamp and make it explicit.

Every datacenter where spanner is used is equipped with one or more highly accurate time servers, called time masters (a combination of a GPS receivers and atomic clocks is used). The time master ensures that the uncertainty of a timestamp can be strongly bounded regardless of where a server resides. Each spanserver has access to a TrueTime API that provides a narrow time interval that contains the current time, ranging from TT.now().earliest up to TT.now().latest. The earliest time is guaranteed to be in the past and the latest is guaranteed to be a timestamp in the future when the function was called. These values would typically be only milliseconds apart. Spanner can now use these timestamps to ensure that transactions satisfy the demands of external consistency.

Implementing external consistency

The key to providing external consistency is to ensure that any data committed by the transaction will not be visible until after the transaction’s timestamp. This means that even other systems that have a different clock should still see a wall clock time that is later than the transaction’s timestamp. To do this, spanner waits out any uncertainty. Before a transaction commits, it acquires a commit timestamp:

t = TT.now().latest

This is the latest possible value of the true time across all servers in the system. It then makes sure that no locks will be released until that time is definitely in the past. This means waiting until the earliest possible current time on any system is greater than the transaction timestamp:

TT.now().earliest > t

This wait is called a commit wait and ensures that any newer transaction that grabs any of the same resources will definitely get a later timestamp.

Note that there is no issue if multiple transactions have the same timestamp. Timestamps are strictly used for versioning to ensure that we provide a consistent view of the database while supporting lock-free reads. Consider the case of a transaction that needs to read hundreds of millions of rows of data. Many transactions may modify the database since you start your work but the view of the database will be consistent since all data read will be no later than a specified timestamp.


By making timestamp uncertainty explicit, Spanner could implement a commit wait operation that can wait out the inaccuracy of a timestamp and provide external consistency along with full ACID semantics. Coupled with storing multiple versions of data, Spanner provides lock-free reads.

Spanner’s design was a conscious decision to not sacrifice the strong ACID semantics of a database. Programming without ACID requires a lot of extra thinking about the side effects of eventual consistency and careful programming if one wants to avoid it. Strict two-phase locking and a two-phase commit protocol are implemented within Spanner so programmers do not have to reinvent it.


Goal: Create a massively parallel software framework to make it easy to program classes of problems that use big data that can be parsed into (key, value) pairs. Each unique key is then processed with a list of all values that were found for it.

MapReduce is a framework for parallel computing. Programmers get a simple API and do not have to deal with issues of parallelization, remote execution, data distribution, load balancing, or fault tolerance. The framework makes it easy for one to use thousands of processors to process huge amounts of data (e.g., terabytes and petabytes). It is designed for problems that lend themselves to a map-reduce technique, which we will discuss. From a user’s perspective, there are two basic operations in MapReduce: Map and Reduce.

The Map function reads data and parses it into intermediate (key, value) pairs. When that is complete and all (key, value) sets are generated, the Reduce function is called once for each unique key that was generated by Map and is given that key along with a list of all values that were generated for that key as parameters. The keys are presented in sorted order.

The MapReduce client library creates a master process and many worker processes on the available, presumably large, set of machines. The master serves as a coordinator and is responsible for assigning roles to workers, keeping track of progress, and detecting failed workers. The set of inputs is broken up into chunks called shards. The master assigns map tasks to idle workers and gives each of them a unique shard to work on. The map worker invokes a user’s map function, which reads and parses the data in the shard and emits intermediate (key, value) results.

The intermediate (key, value) data is partitioned based on the key according to a partitioning function that determines which of R reduce workers will work on that key and its associated data. The default function is simply hash(key) mod R that [usually] distributes keys uniformly among R reduce workers but a user can replace it with a custom function. These partitioned results are stored in intermediate files on the map worker, grouped by key. All map workers must use the same partitioning function since the same set of keys must go to the same reduce workers. Map workers work in parallel and there is no need for any one to communicate with another. When all map workers are finished processing their inputs and generated their (key, value) data, they inform the master that they are complete.

We now need to get all values that share the same key to a single reduce worker. The process of moving the data to reduce workers and aggregating it by keys is called shuffling.

The master dispatches reduce workers. Each reduce worker contacts all the map worker nodes with remote procedure calls to get the set of (key, value) data that was targeted for them. The combined data for each key is then merged to create a single sorted list of values for each key. The combined process of getting the data and sorting it my key is called shuffle and sort.

The user’s reduce function gets called once for each unique key. The reduce function is passed the key and the list of all values associated with that key. This reduce function writes its output to an output file, which the client can read once the MapReduce operation is complete.

The master periodically pings each worker for liveness. If no response is received within a time limit, then the master reschedules and restarts that worker’s task onto another worker.

Bulk Synchronous Parallel & Pregel

Goal: Create a software framework for fault tolerant, deadlock-free parallel processing. Then, adapt that to create an software framework makes it easy to operate on graphs on a massive scale.

Bulk Synchronous Parallel (BSP)


Bulk Synchronous Parallel (BSP) is a programming model and computation framework for parallel computing. Computation is divided into a sequence of supersteps. In each superstep, a collection of processes executes concurrently and creates messages that are sent to other processes. The superstep ends when all the computation in the superstep is complete and all messages are sent. A barrier synchronization at the end of the superstep ensures that all messages have been transmitted (but not yet delivered to the processes). The next superstep begins with the delivery of all those messages to the processes, who then execute their superstep and send messages that will be delivered at the start of the next superstep. This process continues until all processors vote to halt.

Note that no process is permitted to send and receive messages from with another process during a superstep. Any sent messages will be delivered only at the start of the next superstep. This restriction ensures deadlock-free execution.

A popular implementation of the BSP framework is Apache Hama.


What’s a graph?

A graph is a set of vertices connected by edges. Edges may be directed from one vertex to another or bidirectional. In computing, a vertex is represented by an object and a directed edge is a link to another object.

Graphs are all around us

They represent computer networks, social groups, roads, disease outbreaks, phone call connections, Twitter followers, Facebook friends, web page links, etc. Some of these graphs have an enormous scale. The world wide web has billions of pages and Facebook has around a billion users.

What’s wrong with MapReduce?

Nothing is wrong with MapReduce. It’s great for many problems. However, many graph traversal problems, when written for MapReduce, end up having to take multiple iterations of MapReduce, with the output of one iteration feeding the input of another. This is not only slow, but it is also inefficient as data has to be written to files at the end of every map and reduce operation and moved between machines. The entire state of the graph needs to be transmitted in each stage, which requires a lot more communication overhead than Pregel, where the vertices and edges remain on the machine that performs the computation. Moreover, MapReduce is not the most direct way of thinking about and logically solving many problems.

Introducing Pregel

Pregel is a software framework created by Google to make it easy to work on huge graphs (e.g., ones with billions of vertices) that span many machines (e.g., tens of thousands). Like MapReduce, the framework relieves the programmer from having to deal with assigning tasks to processors, monitoring jobs, handling failures, and managing communications.

The Pregel framework allows you to write “vertex-centric” code. That is, the same user code, a compute() function, is run concurrently on each vertex of the graph. Each instance of this function keeps track of information, can iterate over outgoing edges (each of which has a value), and can send messages to the vertices connected to those edges or to any other vertices it may know about (e.g., having received a vertex ID via a message).

When a function does not have any more work to do, it votes to halt. This puts the corresponding vertex in an inactive state. When all vertices are in an inactive state, the framework terminates. However, if a vertex’s compute function sends a message to an inactive vertex, that vertex will be made active at the next superstep.

Pregel is an adaptation of the Bulk Synchronous Parallel (BSP) model that is specifically designed with graph processing in mind. Like BSP, Pregel executes in supersteps. Each superstep consists of computation followed by message sending. All messages are synchronized with a barrier, which marks the end of the superstep. At the next superstep, the messages from the previous superstep are delivered to, and available at, the compute function. The downside of using BSP directly for graph processing is that significant additional work would be needed to define, propagate, and maintain the topology (connections) of a graph and map vertices to compute nodes.

Advanced APIs

Since there is overhead in sending messages to vertices, particularly when they are on other machines, Pregel supports the optional use of combiners. A combiner is a user-defined function that is applied to a bunch of messages all targeted to the same vertex. The Combine method processes the values and creates a single input to that vertex. For example, if the goal is to take the sum of a set of values or to choose data from the highest-numbered vertex, the combiner can merge several messages into one before they are transmitted over the network. This does not alter the compute function since it still has to be prepared to receive multiple messages from multiple sources.

To manage global state, such as overall statistics, total edges of a graph, global flags, or minium or maximum values of a vertex, Pregel allows a user to define an aggregator. An aggregator comines received values into one value and makes that value available to all vertices at the next superstep.

Pregel design

Pregel uses a master-slave architecture. Many copies of the program are started on a cluster of machines. One copy becomes the master and is responsible for coordinating activity rather than processing the graph. Others are workers. The master registers itself with a name server (Chubby). Each worker process contacts the name server to find the master. The master assigns one or more sets of vertices to each worker. By default, the assignment is based on the hash of the vertex ID, so neighboring vertices will not necessarily be assigned to the same worker.

The master assigns chunks of input, which is usually resident in GFS or Bigtable, to each worker. Each input item is a set of vertices and its edges. Workers read this input and create either local messages for the vertices they manage or, if the input record is for a remote vertex, send the message to the worker that owns that vertex. All vertices are initially marked as active. The master then asks each worker to perform a superstep. The worker will run concurrent threads, each of which executes a compute function on its vertex. The compute function consumes input, runs its algorithm, and generates zero or more messages to other vertices. Workers send these messages asynchronously but they will not be delivered to their target functions until the next superstep starts. When a worker is done with its processing for one superstep, it informs the master. It also tells the master how many of the vertices it manages will be in the active state in the next superstep. The master waits for all workers to complete before starting the next superstep. This cycle continues until there are no more vertices in the active state.

Fault tolerance

Pregel uses checkpointing for fault tolerance. The master tells each worker to checkpoint itself every N supersteps. Checkpointing means that each vertex has to save its entire state in stable storage. This state will include vertex values, edge values, incoming messages, and possibly any other state that the algorithm needs to track. A master will periodically send ping messages to workers to see if they are alive. If a master does not hear from a worker within a certain window of time, it assumes that the worker has failed. In this case, the master reassigns the set of vertices to other workers and tells all workers to restart their computation from the superstep at the most recent checkpoint.

A popular implementation of Pregel is Apache Giraph, which has been used by Facebook to analyze its social graph.


Goal: Create a general-purpose high-performance framework for big-data processing.

MapReduce was a powerful framework for parallel computation but it forced a rigid model onto the programmer. Quite often, a computation had to be implemented as a sequence of MapReduce operations. Every map and every reduce operation has to run to completion and write its results to a file before the next sequence of operations can start. Spark creates a highly-flexible framework that lets programmers define their job as a sequence of tranformations and actions on data.

The main client application is called the driver program (or just driver) and is linked with a Spark library. When run, the library creates a SparkContext, which connects to a cluster manager that allocates available worker nodes to run the job. Each worker node runs an executor that contacts the driver for tasks to run. Each executor runs as a process inside a Java Virtual Machine (JVM).

The driver goes through the program, which consists of a sequence of transformations and actions as well as the source data. It creates a directed graph of tasks, identifying how data flows from one transformation to another and ultimately to an action. These tasks are then sent to the executors as jar files for execution. Tasks operate on data and each task is either a transformation or action.


Data in Spark is a collection of Resilient Distributed Datasets (RDDs). RDDs can be created in three ways:

  1. They can be data that is a file or set of files in HDFS, key-value data from an Amazon S3 server (similar to Dynamo), HBase data (Hadoop’s version of Bigtable), or the result of a SQL or Cassandra database query.

  2. They can be streaming data obtained using the Spark Streaming connector. As an example, this could be a stream of data from a set of remote sensors.

  3. An RDD can be the output of a transformation function. This is the way one transformation creates data that another transformation will use. To optimize performance, the RDD created by a transfomation is cached in memory (overflowing to disk if necessary).

RDDs have several key properties:

  • They are immutable. A task can read an RDD and create a new one but cannot modify an existing RDD.

  • They are typed (structured). An RDD has a structure that a task can parse.

  • They are partitioned. Parts of an RDD may be sent to different servers. The default partitioning function is to send a row of data to the server corresponding to hash(key) mod servercount.

  • They are ordered. An RDD contains elements that can be sorted. For example, a list of key-value pairs can be sorted by key. This property is optional.

Transformations and actions

Spark allows two types of operations on RDDs: transformations and actions. Transformations read an RDD and create a new RDD. Example transformations are map, filter, groupByKey, and reduceByKey. Transformations are evaluated lazily, which means they are computed only when some other task needs the RDD that they generate. At that point, the driver schedules the task for execution.

Actions are operations that evaluate and return a value to the driver program. When an action is requested on an RDD object, the necessary transformations are computed and the result is returned to the driver. Example actions are reduce, grab samples, and write to file.

Fault tolerance

For each RDD, the driver tracks the sequence of transformations used to create it. That means an RDD knows which RDDs it is dependent on and which tasks needed to create each one. If any RDD is lost (e.g., a task that creates one died), the driver can ask the task that generated it to recreate it. The driver maintains the entire dependency graph, so this recreation may end up being a chain of transformation tasks going back to the original data.

Content delivery networks

Goal: Provide a highly-scalable infrastructure for caching and serving content from multiple sources.

A content delivery network (CDN) is a set of servers that are usually placed at various points at the edges of the Internet, at various ISPs, to cache content and distribute it to users.

There are several traditional approaches to making a site more scalable and available:

Proxy servers
Organizations can pass web requests through caching proxies. This can help a small set of users but you’re out of luck if you are not in that organization.
Clustering within a datacenter with a load balancer*
Multiple machines within a datacenter can be load-balanced. However, they all fail if the datacenter loses power or internet connectivity.
Machines can be connected with links to multiple networks served by multiple ISPs to guard against ISP failure. However, protocols that drive dynamic routing of IP packets (BGP) are often not quick enough to find new routes, resulting in service disruption.
Mirroring at multiple sites
The data can be served at multiple sites, with each machine’s content synchronized with the others. However, synchronization can be difficult.

All these solutions require additional capital costs. You’re building the capability to handle excess capacity and improved availability even if the traffic never comes and the faults never happen.

By serving content that is replicated on a collection of servers, traffic from the main (master) server is reduced. Because some of the caching servers are likely to be closer to the requesting users, network latency is reduced. Because there are multiple servers, traffic is distributed among them. Because all of the servers are unlikely to be down at the same time, availability is increased. Hence, a CDN can provide highly increased performance, scalability, and availability for content.


We will focus on one CDN: Akamai. The company evolved from MIT research that was focused on “inventing a better way to deliver Internet content.” A key issue was the flash crowd problem: what if your web site becomes really popular all of a sudden? Chances are, your servers and/or ISP will be saturated and a vast number of people will not be able to access your content. This became known as the slashdot effect.

In late 2022, Akamai ran on 345,000 servers in over 1,300 networks around the world. Around 2015, Akamai served between 15 and 30 percent of all web traffic. Even as several other large-scale content delivery networks emerged and companies such as Google, Amazon, and Apple built their own distribution infrastructure, Akamai still served a peak rate of 250 Tbps of traffic in 2022.

Akamai tries to serve clients from nearest, available servers that are likely to have requested content. According to the company’s statistics, 85% percent of the world’s Internet users are within a single network hop of an Akamai CDN server.

To access a web site, the user’s computer first looks up a domain name via DNS. A mapping system locates the caching server that can serve the content. Akamai deploys custom dynamic DNS servers. Customers who use Akamai’s services register their domains to have their names resolved by these servers. An Akamai DNS server uses the requestor’s address to find the nearest edge server that is likely to hold the cached content for the requested site.

When an Akamai DNS server gets a request to resolve a host name, it chooses the IP address to return based on:

  • domain name being requested

  • server health

  • server load

  • user location

  • network status

  • load balancing

Akamai can also perform load shedding on specific content servers; if servers get too loaded, the DNS server will not respond with those addresses.

Now the user can set up a connection to an Akamai caching server in a cluster of servers in a region at the network edge close to the user and send requests for content. If the content isn’t there, the server broadcasts a query to the other edge servers in that region. If the content isn’t cached at any servers within the region, then the server contacts a parent server. The parent server is also a caching server operated by Akamai but not located at the edge. The goal is to avoid accessing the origin server (the server at the company that hosts the content) unless necessary. If the content isn’t at the parent, the parent queries the other parent servers before giving up and forwarding the request to the origin server via its transport system.

To send data efficiently, Akamai manages an overlay network: the collection of its thousands of servers and statistics about their availability and connectivity. Akamai generates its own map of overall IP network topology based on BGP (Border Gateway Protocol) and traceroute data from various points on the network. It factors in latency and quality of service into its decisions and may choose to route requests to hop through its own servers via always-on TCP links rather than rely on Internet routing, which usually prioritizes the number of hops and cost over speed and quality.

Content servers report their load to a monitoring application. The monitoring application publishes load reports to a local Akamai DNS server, which then determines which IP addresses to return when resolving names.

CDN benefits

A CDN, serving as a caching overlay, provides five distinct benefits:

  1. Caching: static content can be served from caches, thus reducing the load on origin servers.

  2. Routing: by measuring latency, packet loss, and bandwidth throughout its collection of servers, The CDN can find the best route to an origin server, even if that requires forwarding the request through several of its servers instead of relying on IP routers to make the decision.

  3. Security: Because all requests go to the CDN, which can handle a high capacity of requests, it absorbs any Distributed Denial-of-Service attacks (DDoS) rather than overwhelming the origin server. Moreover, any penetration attacks target the machines in the CDN rather than the origin servers. Companies (or a group within a company) that runs the CDN service is singularly focused on the IT infrastructure of the service and is likely to have the expertise to ensure that systems are properly maintained, updated, and are running the best security software. These aspects may be neglected by companies that need to focus on other services.

  4. Analytics: CDNs monitor virtually every aspect of their operation, providing their customers with detailed reports on the quality of service, network latency, and information on where the clients are located.

  5. Cost: CDNs cost money, which is a disadvantage. However, CDNs are an elastic service, which is one that can adapt to changing workloads by using more or fewer computing and storage resources. CDNs provide the ability to scale instantly to surges of demand practically anywhere in the world. They absorb all the traffic so you don’t have to scale up.


Video is the most bandwidth-intensiver service on the Internet and has been a huge source of growth for CDNs. As an example, Netflix operates a global CDN called OpenConnect, which contains up to 80 percent of its entire media catalog. Stored (e.g., on-demand) video is, in some ways, just like any other content that can be distributed and cached. The difference is that, for use cases that require video streaming rather than downloading (i.e., video on demand services), the video stream may be reprocessed, or transcoded, to lower bitrates to support smaller devices or lower-bandwidth connections.

Live video cannot be cached. We have seen the benefits of the fanout that CDNs offer by having many servers at the edge; a video stream can be replicated onto multiple edge servers so it doesn’t have to be sourced from the origin. CDNs can also help with progressive downloads, which is there the user can start watching content while it is still being downloaded.

Today, HTTP Live Streaming, or HLS, is the most popular protocol for streaming video. It allows the use of standard HTTP to deliver content and uses a technique called Adaptive Bitrate Coding (ABR) to deliver video. ABR support playback on various devices in different formats & bitrates.
The CDN takes on the responsibility of taking the video stream and converting it into to a sequence of chunks. Each chunk represents between 2 and 10 seconds of video. The CDN then encodes each chunk at various bitrates, with can affect the quality & resolution of that chunk of video. For content delivery, the HLS protocol uses feedback from the user’s client to select the optimal chunk. It revises this constantly throughout playback since network conditions can change.

Kafka publish-subscribe messaging

Goal: Create a high-performance, highly-scalable, reliable message delivery framework.

Kafka messaging is a distributed, fault-tolerant, and high-throughput messaging system that was developed by LinkedIn. It is an open-source project and is widely used in various industries to process and handle large volumes of data in real-time. Kafka is designed to handle data in motion, allowing it to be used in stream processing applications.

Kafka’s architecture is based on the publish-subscribe model. Producers publish messages to Kafka topics, and consumers subscribe to those topics to receive the messages. Kafka brokers act as intermediaries between producers and consumers, storing and distributing messages across the cluster. Brokers can run on a single node or across multiple nodes in a cluster, providing high availability and fault tolerance.

Messages are associated with a category, called a topic. A topic is a way to separate messages based on their content. Each message in Kafka is written to a specific topic, and each topic consists of one or more partitions.

A partition is an ordered, immutable sequence of messages that can be stored on a different Kafka broker in a Kafka cluster (a collection of Kafka servers). The number of partitions for a topic is determined when the topic is created, and can be increased or decreased later.

Partitions allow Kafka to scale horizontally by distributing the data across multiple brokers. Each partition is also replicated across multiple brokers, providing fault tolerance and ensuring that the data is available even if a broker fails. Kafka uses a partitioning scheme to determine which partition a message belongs to based on the key or a custom partitioner. Partitions can also be used to parallelize processing, allowing multiple consumers to process messages from the same topic simultaneously.

Kafka’s replication feature provides fault tolerance by replicating each partition across multiple brokers. When a broker fails, its partitions can be automatically reassigned to other brokers in the cluster, ensuring that the data remains available and that the cluster continues to function normally.

Kafka can be used in queuing or publish-subscribe models. In the queuing model, messages are sent to a single consumer in a first-in-first-out (FIFO) order. In this model, consumers compete for messages in a topic, and each message is only consumed by one consumer. In the publish-subscribe model, messages are broadcast to all subscribed consumers. In this model, each message is consumed by all subscribed consumers.

Kafka’s publish-subscribe model allows fors flexibility in processing data. Consumers can subscribe to multiple topics, allowing them to process related data from multiple sources. Additionally, Kafka’s ability to partition data allows for parallel processing, allowing multiple consumers to process data from the same topic simultaneously.

Kafka’s queuing model is useful when you need to ensure that each message is only consumed by one consumer. For example, a queue might be used to process orders in an e-commerce system, ensuring that each order is only processed once. Kafka’s queuing model can also be used for tasks like load balancing, where tasks are distributed among multiple consumers.

memory caching

Goal: Create a simple key-value store for memory-based caching, used store things such as frequently-accessed database queries or web pages generated from semi-dynamic content.


Memcached is a popular open-source distributed memory caching system that is used to speed up dynamic web applications by storing frequently used data in memory. It is designed to reduce the load on servers such as dataabses and application servers by caching frequently accessed data in memory, which allows for faster access times and improved application performance. Memcached is widely used by many high-traffic websites, including Facebook, Twitter, and Wikipedia.

Memcached uses a client-server architecture, where the client sends requests to a memcached server to store or retrieve data. Client libraries are available in many programming languages, including PHP, Python, Ruby, and Java. Memcached servers can be deployed across multiple machines to handle large amounts of data, providing scalability and high availability. However, it is up to client-side logic to decide which server to contact.

The architecture of Memcached is based on a key-value storage model, where data is stored as key-value pairs in memory. The keys are unique identifiers for the data. Memcached does not support data types. Data associated with a key is stored as a string. When a client sends a request to store or retrieve data, it includes the key that identifies the data.

As the amount of data stored in the cache increases, it becomes necessary to evict some data from the cache to make room for new data. Memcached uses a process called cache eviction to remove data from its cache and employs a least recently used (LRU) algorithm for cache eviction. This algorithm works by tracking the last time a given item in the cache was accessed. When the cache reaches its memory limit and needs to evict some data, it identifies the least recently used item and removes it from the cache to make room for new data.

Memcached servers are designed to be stateless, which means that each request is handled independently and the server does not maintain any session state. This allows for easy scalability and high availability, as requests can be load balanced across multiple servers without any issues. Additionally, the servers do not have any knowledge of the data stored on other servers, which allows for easy addition or removal of servers without any impact on the overall system.


Redis is another popular open-source, in-memory data store that is used for fast, scalable, and flexible data storage and retrieval. It is often used as a cache, message broker, and database. Redis stands for Remote Dictionary Server and is designed to be fast and efficient by storing data in memory instead of on disk, which allows for fast access times.

Redis is a key-value data store, which means that data is stored in a map of key-value pairs. Redis supports a wide variety of data structures, including strings, lists, sets, hashes, and sorted sets. It provides a number of data manipulation commands to perform operations on these data structures, such as set, get, push, pop, and increment.

Redis supports advanced features such as transactions, publish-subscribe messaging, and Lua scripting. Transactions allow multiple commands to be executed as a single atomic operation, ensuring data consistency. Publish-subscribe (pub/sub) messaging allows for asynchronous communication between clients, while Lua scripting allows for complex data processing and manipulation on the server.

Redis can be used in a variety of ways, including as a cache for frequently accessed data, as a simple message broker for communication between different parts of an application, and as a simple database for storing and retrieving data. It can be used as a standalone server or as part of a cluster, providing scalability and high availability.

When multiple Redis servers are deployed to form a Redis cluster, two main types of partitioning are supported: range-based partitioning and hash-based partitioning. Partitioning simply refers to having the client figure out which of several servers is responsible for storing data about a specific key.

Range-based partitioning involves partitioning the keyspace into multiple ranges based on the key values. Each Redis instance is responsible for a specific range of keys. The client is responsible for determining which Redis instance is responsible for a given key and sending requests to that instance. This strategy works well when the keyspace can be naturally partitioned into ranges, such as when the keys are sorted or have a natural order.

Hash-based partitioning involves partitioning the keyspace using a hash function to assign keys to Redis instances. Each Redis instance is responsible for a subset of the keys based on the hash value of the key. The client is responsible for calculating the hash value of the key and sending requests to the appropriate Redis instance. This strategy works well when the keys do not have a natural order or when the keyspace is not evenly distributed.

Redis provides several different cache eviction policies, which can be configured on a per-key basis. These policies include:

  1. LRU (Least Recently Used): Redis keeps track of the last time each key was accessed, and evicts the least recently used keys when it runs out of memory.

  2. LFU (Least Frequently Used): Redis keeps track of the frequency of access to each key, and evicts the least frequently used keys when it runs out of memory.

  3. Random: Redis selects random keys to evict when it runs out of memory.

  4. TTL (Time To Live): Redis allows keys to be set with an expiration time, and automatically removes them from the cache when they expire.

Redis provides client libraries for many programming languages, including Python, Java, Ruby, and JavaScript. It also has a command-line interface and a web-based management interface that allows for easy monitoring and management of the Redis server.

Last modified April 22, 2023.
recycled pixels