Bulk Synchronous Parallel and Pregel
Large-scale graph processing
Paul Krzyzanowski
November 15, 2022
Goal: Create a software framework for fault-tolerant, deadlock-free parallel processing. Then, adapt that to create a software framework that makes it easy to operate on graphs on a massive scale.
Bulk Synchronous Parallel (BSP)
Bulk Synchronous Parallel (BSP) is a parallel computing model used for designing and implementing parallel algorithms. It was introduced by Leslie Valiant as a bridging model to provide a more structured approach to parallel computing.
Computation is divided into a sequence of supersteps. In each superstep, a set of processes, all running the same code, executes concurrently and creates messages that are sent to other processes. The superstep ends when all the computation in the superstep is complete and all messages have been sent. A barrier synchronization at the end of the superstep ensures that all messages have been transmitted (but not yet delivered to the processes). The next superstep begins with the delivery of all those messages to the processes, which then run their computation and send messages that will be delivered at the start of the following superstep. This process continues until all processes vote to halt.
Note that a process cannot send a message and receive a response within the same superstep: any messages sent during a superstep are delivered only at the start of the next one. This restriction ensures deadlock-free execution, since no process can ever block waiting for a message that has yet to be sent.
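To make this concrete, below is a minimal, single-machine sketch of the BSP execution loop in Java. The BspProcess interface, Message record, and runner are hypothetical illustrations of the model, not the API of any particular BSP library; a real implementation would run the processes concurrently across machines.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical process abstraction: one superstep consumes the messages
// delivered at the start of the superstep and returns messages to be
// delivered at the start of the next superstep.
interface BspProcess {
    List<Message> superstep(List<Message> incoming);
    boolean done();   // true when this process votes to halt
}

record Message(int destination, Object payload) {}

class BspRunner {
    static void run(List<BspProcess> procs) {
        int n = procs.size();
        List<List<Message>> inboxes = new ArrayList<>();
        for (int i = 0; i < n; i++) inboxes.add(new ArrayList<>());

        boolean active = true;
        while (active) {
            List<List<Message>> nextInboxes = new ArrayList<>();
            for (int i = 0; i < n; i++) nextInboxes.add(new ArrayList<>());

            // Computation phase: each process handles its delivered
            // messages. Outgoing messages are buffered, not delivered.
            for (int i = 0; i < n; i++) {
                for (Message m : procs.get(i).superstep(inboxes.get(i)))
                    nextInboxes.get(m.destination()).add(m);
            }

            // Barrier: the superstep ends only after every process has
            // finished; buffered messages become the next superstep's input.
            inboxes = nextInboxes;
            active = procs.stream().anyMatch(p -> !p.done());
        }
    }
}
```

Because message delivery is deferred to the barrier, no process can ever wait on another mid-superstep, which is exactly the property that rules out deadlock.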
Advantages of BSP
Structured Parallel Programming: BSP offers a structured way to develop parallel algorithms. It divides the computation into a series of supersteps, each consisting of a computation phase followed by a communication phase, and then a barrier synchronization. This structure makes it easier to design and understand parallel algorithms.
Ease of Reasoning and Predictability: The clear structure of BSP (computation, communication, synchronization) simplifies reasoning about the performance and behavior of parallel algorithms. Programmers can more easily predict and control the performance of their applications. Because all communication takes place between supersteps, there is no possibility of deadlock, which might otherwise occur if one process refuses to send a message because it is busy waiting to receive one from another.
Scalability: BSP is designed to scale well with the number of processors. Its model can be effectively used on a wide range of parallel architectures, from small multi-core processors to large distributed systems.
Performance Modeling: The BSP model allows for the development of accurate performance models. These models help in predicting the execution time of parallel algorithms, enabling better optimization and resource allocation.
Handling Non-Uniform Workloads: The synchronization step in BSP can help in managing workloads that are not uniformly distributed among processors. This synchronization ensures that all processors are in sync, which can help in balancing the load and improving overall efficiency.
Flexibility in Communication: BSP abstracts the communication layer, allowing programmers to focus on algorithm development without worrying about the underlying communication details. This abstraction also makes BSP algorithms portable across different parallel architectures.
Reduced Overhead: By enforcing synchronization only at the end of each superstep, BSP can potentially reduce the overhead associated with frequent synchronization in other parallel programming models.
Applications of Bulk Synchronous Parallel (BSP)
Bulk Synchronous Parallel (BSP) has been successfully adapted to solve a variety of problems across different domains. Some of these applications include:
Scientific Computing: Used in large-scale computations in physics, chemistry, and biology, where parallel processing can significantly reduce computation time.
Graph Processing: Efficient for processing large-scale graph data, common in social network analysis, web graph processing, and bioinformatics.
Data Processing and Analysis: Employed in big data processing and analytics, including data mining, machine learning algorithms, and statistical data analysis.
Numerical Algorithms: Adapted for parallelizing a wide range of numerical algorithms, including linear algebra operations, matrix computations, and numerical solutions of differential equations.
Image and Signal Processing: Useful in medical imaging or signal processing for parallel processing of large images or signal datasets.
Financial Modeling: Applied to simulations and models in finance, such as risk assessment models, Monte Carlo simulations, and option pricing models.
Climate and Weather Modeling: Suitable for climate and weather simulation models that require processing large volumes of data.
Parallel Sorting and Searching Algorithms: Used to implement parallel versions of sorting and searching algorithms.
Distributed Machine Learning: Used for training models in a distributed manner, especially in parallelizable algorithms like neural networks or clustering algorithms.
BSP’s structured approach to parallel computation makes it a versatile tool for these and other computationally intensive tasks across various domains. A popular implementation of the BSP framework is Apache Hama.
Pregel
What’s a graph?
A graph is a set of vertices connected by edges. An edge may be directed from one vertex to another or may be bidirectional. In software, a vertex is typically represented by an object and a directed edge by a reference to another object.
Graphs are all around us
They represent computer networks, social groups, roads, disease outbreaks, phone call connections, Twitter followers, Facebook friends, web page links, etc. Some of these graphs have an enormous scale. The World Wide Web has billions of pages and Facebook has over two billion users.
What’s wrong with MapReduce?
Nothing is wrong with MapReduce. It’s great for many problems. However, network analysis problems often require iterative algorithms (like graph traversal algorithms) to analyze relationships and network structures. Implementing these solutions with MapReduce requires running multiple iterations of MapReduce, with the output of one iteration feeding the input of the next. This is not only slow but also inefficient: data must be written to files at the end of every map and reduce phase and moved between machines, and the entire state of the graph must be transmitted in each stage.

Some data, such as social networks, is inherently graph-structured, meaning the data elements are interconnected and related. MapReduce is better suited to linear, non-relational data processing; it cannot efficiently process the complex, interconnected graph structures prevalent in social networks. Moreover, implementing graph algorithms in MapReduce can be complex and non-intuitive, since one has to recast problems in MapReduce’s key-value mindset.
Introducing Pregel
Pregel is a software framework created by Google to make it easy to work with huge graphs (e.g., ones with billions of vertices) that span many machines (e.g., tens of thousands). Like MapReduce, the framework relieves the programmer from having to deal with assigning tasks to processors, monitoring jobs, handling failures, and managing communications.
The Pregel framework allows you to write “vertex-centric” code. That is, the same user code, a compute() function, is run concurrently on each vertex of the graph. Each instance of this function:
- receives zero or more messages from other vertices
- keeps track of information in a value associated with its vertex
- can iterate over outgoing edges (links to vertices, where each edge has a value)
- and can send messages to the vertices connected to those edges or to any other vertices it may know about (e.g., having received a vertex ID via a message).
When a function does not have any more work to do, it votes to halt. This puts the corresponding vertex in an inactive state. When all vertices are in an inactive state, the framework terminates. However, if a vertex’s compute function sends a message to an inactive vertex, that vertex will be made active at the next superstep.
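As an illustration, here is a sketch of a vertex-centric compute() function for the classic example from the Pregel paper: propagating the maximum value through a graph. The Vertex base class and its methods below are hypothetical Java stand-ins loosely modeled on the paper’s C++ API, not the actual Google or Giraph interfaces.

```java
// A sketch of a compute() function that propagates the maximum value in
// the graph. The Vertex class here is a hypothetical stand-in for the
// framework-supplied base class.
abstract class Vertex {
    abstract long getValue();
    abstract void setValue(long v);
    abstract Iterable<Long> outEdges();          // IDs of target vertices
    abstract void sendMessageTo(long vertexId, long value);
    abstract void voteToHalt();
    abstract long superstep();                   // current superstep number

    void compute(Iterable<Long> messages) {
        // Take the largest value among our own value and all messages
        // delivered at the start of this superstep.
        long max = getValue();
        for (long m : messages) max = Math.max(max, m);

        // If we learned a larger value (or this is the first superstep),
        // adopt it and tell our neighbors; otherwise go inactive.
        if (max > getValue() || superstep() == 0) {
            setValue(max);
            for (long target : outEdges()) sendMessageTo(target, max);
        } else {
            voteToHalt();
        }
    }
}
```

Each vertex adopts the largest value it has seen and forwards it along its outgoing edges; once no vertex learns anything new, every vertex votes to halt and the framework terminates.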
Pregel is an adaptation of the Bulk Synchronous Parallel (BSP) model specifically designed with graph processing in mind. Like BSP, Pregel executes in supersteps. Each superstep consists of computation followed by message sending. All messages are synchronized with a barrier, which marks the end of the superstep. At the next superstep, the messages from the previous superstep are delivered to, and available at, the compute function. The downside of using BSP directly for graph processing is that significant additional work would be needed to define, propagate, and maintain the topology (connections) of a graph and map vertices to compute nodes.
Advanced APIs
Since there is overhead in sending messages to vertices, particularly when they are on other machines, Pregel supports the optional use of combiners. A combiner is a user-defined function that is applied to a set of messages all targeted to the same vertex. The Combine method processes the values and creates a single input to that vertex. For example, if the goal is to take the sum of a set of values or to choose data from the highest-numbered vertex, the combiner can merge several messages into one before they are transmitted over the network. This does not alter the compute function, which must still be prepared to receive multiple messages from multiple sources.
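A sum combiner might look like the sketch below. The Combiner interface is a hypothetical stand-in for the framework-supplied base class, not Pregel’s or Giraph’s actual API.

```java
// Hypothetical combiner interface: collapse several messages bound for
// the same vertex into a single message before they cross the network.
interface Combiner<M> {
    M combine(Iterable<M> messages);
}

class SumCombiner implements Combiner<Long> {
    @Override
    public Long combine(Iterable<Long> messages) {
        long sum = 0;
        for (long m : messages) sum += m;   // many messages become one
        return sum;
    }
}
```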
To manage global state, such as overall statistics, total edges of a graph, global flags, or minimum or maximum values of a vertex, Pregel allows a user to define an aggregator. An aggregator combines received values into one value and makes that value available to all vertices at the next superstep.
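Similarly, a minimum-value aggregator could be sketched as follows, again with a hypothetical interface: each vertex may contribute a value during a superstep, the framework reduces all contributions to a single global value, and that value is visible to every vertex in the next superstep.

```java
// Hypothetical aggregator interface: an identity element plus an
// associative, commutative reduction over contributed values.
interface Aggregator<A> {
    A initial();                   // identity element for the reduction
    A aggregate(A left, A right);  // combine two partial results
}

class MinAggregator implements Aggregator<Long> {
    public Long initial() { return Long.MAX_VALUE; }
    public Long aggregate(Long left, Long right) {
        return Math.min(left, right);
    }
}
```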
Pregel design
Pregel uses a master-worker architecture. Many copies of the program are started on a cluster of machines. One copy becomes the master and is responsible for coordinating activity rather than processing the graph. The others are workers. The master registers itself with a name server (Chubby). Each worker process contacts the name server to find the master.
The Pregel framework divides the graph into partitions. Each partition contains a set of vertices and their outgoing edges. A vertex is assigned to a partition based on its ID. By default, this is simply a hash function: hash(vertex_ID) mod N, where N is the number of partitions.
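Expressed in code, the default assignment is essentially a one-liner (the names here are illustrative):

```java
class Partitioner {
    // Default vertex-to-partition assignment: hash the vertex ID, then
    // take it modulo the number of partitions.
    static int partitionFor(long vertexId, int numPartitions) {
        // floorMod avoids a negative result when the hash is negative.
        return Math.floorMod(Long.hashCode(vertexId), numPartitions);
    }
}
```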
The master assigns one or more partitions to each worker. Each partition will run in a separate thread. Assigning multiple partitions per worker enables better use of the CPU and better load balancing. More powerful machines can be assigned more partitions.
The master assigns chunks of input, which is usually resident in GFS or Bigtable (a large structured table), to each worker. Each input record describes a vertex and its edges. Workers read this input and either create local messages for the vertices they manage or, if an input record refers to a vertex owned by another worker, send the message to that worker.
All vertices are initially marked as active. The master then asks each worker to perform a superstep. To do this, the worker will run concurrent threads, one for each partition. Each thread loops through all the active vertices in that partition, calling the Compute() method for the vertex. The compute function consumes input, runs its algorithm, and generates zero or more messages to other vertices. Workers send these messages asynchronously but they will not be delivered to their target functions until the next superstep starts. When a worker is done with its processing for one superstep, it informs the master. It also tells the master how many of the vertices it manages will be in the active state in the next superstep. The master waits for all workers to complete before starting the next superstep. This cycle continues until there are no more vertices in the active state.
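Putting this coordination together, the master’s control loop looks roughly like the following sketch. The Worker interface and method names are hypothetical; in the real system these calls would be RPCs, and each worker runs its partitions in concurrent threads.

```java
import java.util.List;

// Hypothetical worker abstraction as seen from the master.
interface Worker {
    void startSuperstep();    // kick off per-partition compute threads
    long awaitCompletion();   // block until done; return the number of
                              // vertices active in the next superstep
}

class Master {
    void run(List<Worker> workers) {
        long activeVertices;
        do {
            // Ask every worker to run one superstep over its partitions.
            for (Worker w : workers) w.startSuperstep();

            // Barrier: wait until all workers finish. Each reports how
            // many of its vertices will be active next superstep.
            activeVertices = 0;
            for (Worker w : workers) activeVertices += w.awaitCompletion();
        } while (activeVertices > 0);
    }
}
```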
Note that the programmer does not need to be aware of any of this – the framework takes care of starting the workers and tracking their execution. Programmers only need to know that their Compute() method will be invoked once for each vertex at each superstep.
Fault tolerance
Pregel uses checkpointing for fault tolerance. The master tells each worker to checkpoint itself every N supersteps. Checkpointing means that each vertex must save its entire state in stable storage at the start of the superstep. This state includes vertex values, edge values, incoming messages, and possibly any other state that the algorithm needs to track. The master periodically sends ping messages to workers to see if they are alive. If the master does not hear from a worker within a certain window of time, it assumes that the worker has failed. In this case, the master reassigns that worker’s partitions to other workers and tells all workers to restart their computation from the superstep at the most recent checkpoint.
Pregel in the open source world
A popular implementation of Pregel is Apache Giraph. It was initially created at Yahoo before being donated to the Apache project. Both LinkedIn and Facebook have been heavy users of the software and have made significant contributions to its implementation.
Giraph has been used by Facebook to analyze its graph of social connections among its user community. Facebook improved the platform to allow Giraph to handle over a billion users and as many as a trillion social connections (edges).
The input data for Giraph is usually stored in HBase (the Apache project version of Bigtable), HDFS (the Apache project version of GFS), or Hive. Hive is a layer of software on top of HDFS or HBase that adds a metastore, which applies a table structure to large sets of unstructured data. It makes accessing HDFS or HBase data comparable to accessing a database that contains multiple tables and columns. A user can make SQL-like queries on this data, which the Hive driver translates into MapReduce jobs.