# 50. GraphX, Pregel, and Breadth-First-Search with Pregel.

## Analyzing and traversing graphs

Using Spark GraphX

### GRAPHX

* Not that kind of graph...
* Graphs like our social network of superheroes: graphs in the computer science / network sense.
* However, it's only useful for specific things.
  * It can measure things like "connectedness", degree distribution, average path length, and triangle counts: high-level measures of a graph
  * It can apply the PageRank algorithm to a graph.
  * It can also join graphs together and transform graphs quickly
  * For things like our "degrees of separation" example, you won't find built-in support. But it does support the Pregel API for traversing a graph...
* Introduces VertexRDD and EdgeRDD, and the Edge data type
* Otherwise, GraphX code looks like any other Spark code for the most part
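
As a rough illustration of the built-in measures listed above, the sketch below builds a tiny graph and asks GraphX for degrees, triangle counts, and PageRank scores. It assumes Spark 2.x or later with GraphX on the classpath and a local master; the object name `GraphMeasures`, the helper `toyGraph`, and the four-vertex graph are made up for this example and are not part of the course code.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.Graph

object GraphMeasures {
  // Hypothetical toy graph: a triangle 1-2-3 plus vertex 4 attached to 3.
  def toyGraph(sc: SparkContext): Graph[String, Int] = {
    val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (1L, 3L), (3L, 4L)))
    // Every vertex mentioned by an edge gets the default attribute "Nobody".
    Graph.fromEdgeTuples(rawEdges, "Nobody").cache()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GraphMeasures").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val graph = toyGraph(sc)

    // Number of edges touching each vertex
    graph.degrees.collect().sortBy(_._1).foreach(println)
    // Number of triangles each vertex takes part in
    graph.triangleCount().vertices.collect().sortBy(_._1).foreach(println)
    // PageRank scores, iterated until they change by less than the tolerance
    graph.pageRank(0.001).vertices.collect().sortBy(_._1).foreach(println)

    sc.stop()
  }
}
```

Each result is an ordinary RDD of `(VertexId, value)` pairs, which is why the rest of the code can treat it like any other Spark data.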

### Creating Vertex RDDs

```
    // Function to extract hero ID -> hero name tuples (or None in case of failure)
    def parseNames(line: String) : Option[(VertexId, String)] = {
        // Names are wrapped in double quotes, so split on the quote character
        val fields = line.split('\"')
        if (fields.length > 1) {
            val heroID: Long = fields(0).trim().toLong
            if (heroID < 6487) { // IDs above 6486 aren't real characters
                return Some((heroID, fields(1)))
            }
        }

        return None // flatMap will just discard None results, and extract data from Some results.
    }
```
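
To see how `flatMap` drops the `None` results, here is a minimal sketch that runs the same parsing logic over a plain Scala `List` instead of an RDD. The sample lines are invented for illustration, and `VertexId` is declared locally (GraphX defines it as an alias for `Long`) so the snippet runs without Spark.

```scala
object ParseNamesDemo {
  type VertexId = Long // local alias; GraphX's VertexId is also a Long

  def parseNames(line: String): Option[(VertexId, String)] = {
    val fields = line.split('\"')
    if (fields.length > 1) {
      val heroID = fields(0).trim.toLong
      if (heroID < 6487) Some((heroID, fields(1))) else None
    } else None
  }

  // Made-up input lines: two valid, one without quotes, one with an ID that is too high
  val sampleLines = List(
    "1 \"HERO ONE\"",
    "a line without quotes",
    "7000 \"NOT A REAL CHARACTER\"",
    "42 \"HERO FORTY-TWO\""
  )

  // Only the Some results survive the flatMap
  val parsed: List[(VertexId, String)] = sampleLines.flatMap(line => parseNames(line))

  def main(args: Array[String]): Unit = parsed.foreach(println)
}
```

Only the two well-formed lines with IDs below 6487 end up in `parsed`.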

### Creating Edge RDDs

```
    /** Transform an input line from marvel-graph.txt into a List of Edges */
    def makeEdges(line: String) : List[Edge[Int]] = {
        import scala.collection.mutable.ListBuffer
        val edges = new ListBuffer[Edge[Int]]()
        val fields = line.split(" ")
        val origin = fields(0)
        for (x <- 1 to (fields.length - 1)) {
            // Our attribute field is unused, but in other graphs could
            // be used to keep track of physical distances etc.
            edges += Edge(origin.toLong, fields(x).toLong, 0)
        }
        }

        return edges.toList
    }
```
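
The sketch below shows what a single input line turns into. It is a variant of `makeEdges` written with collection operations rather than a `ListBuffer`, and it uses a local stand-in `Edge` case class (same field names as GraphX's `Edge`) so it can run without Spark. The input line is invented for illustration.

```scala
object MakeEdgesDemo {
  // Stand-in for org.apache.spark.graphx.Edge, only so this runs without Spark
  final case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

  def makeEdges(line: String): List[Edge[Int]] = {
    val fields = line.trim.split(" ").toList
    val origin = fields.head.toLong
    // Every remaining ID on the line becomes one edge starting at the origin
    fields.tail.map(dst => Edge(origin, dst.toLong, 0))
  }

  // Hypothetical line: hero 10 appears with heroes 20, 30 and 40
  val edges: List[Edge[Int]] = makeEdges("10 20 30 40")

  def main(args: Array[String]): Unit = edges.foreach(println)
}
```

The first ID on the line is the source of every edge, which is why `flatMap(makeEdges)` over the whole file yields one flat collection of edges.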

### Creating A Graph

```
    // Build up our vertices
    val names = sc.textFile("../marvel-names.txt")
    val verts = names.flatMap(parseNames)

    // Build up our edges
    val lines = sc.textFile("../marvel-graph.txt")
    val edges = lines.flatMap(makeEdges)    

    // Build up our graph, and cache it as we're going to do a bunch of stuff with it.
    val default = "Nobody"
    val graph = Graph(verts, edges, default).cache()
```

### Doing Stuff

* Top 10 most-connected heroes:

  ```
    graph.degrees.join(verts).sortBy(_._2._1, ascending=false).take(10).foreach(println)
  ```

### Breadth-First Search With Pregel

* Before, we implemented BFS in Spark the hard, manual way.
* Pregel gives us a general way to iterate through a graph, so we can implement breadth-first search more simply than we did before.

### How Pregel Works

* Vertices send messages to other vertices (their neighbours)
* The graph is processed in iterations called supersteps
* Each superstep does the following:
  * Messages from the previous iteration are received by each vertex
  * Each vertex runs a program to transform itself
  * Each vertex sends messages to other vertices
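
The superstep loop described above can be imitated with plain Scala collections. This is only a simplified, single-machine sketch and not how GraphX is implemented: the names `PregelSketch` and `run` are made up, every edge is scanned on every superstep, and vertex values are fixed to `Double`. The three function parameters mirror the `vprog`, `sendMsg`, and `mergeMsg` arguments of GraphX's `pregel` operator.

```scala
object PregelSketch {
  type VertexId = Long
  val inf = Double.PositiveInfinity

  def run(
      vertices: Map[VertexId, Double],
      edges: Seq[(VertexId, VertexId)],
      initialMsg: Double,
      maxIterations: Int
  )(
      vprog: (VertexId, Double, Double) => Double,
      sendMsg: (VertexId, Double, VertexId) => Iterator[(VertexId, Double)],
      mergeMsg: (Double, Double) => Double
  ): Map[VertexId, Double] = {
    // Gather the messages produced by the current state, merged per receiver
    def collect(s: Map[VertexId, Double]): Map[VertexId, Double] =
      edges.iterator
        .flatMap { case (src, dst) => sendMsg(src, s(src), dst) }
        .toSeq
        .groupBy(_._1)
        .map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }

    // First, every vertex receives the initial message
    var state = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    var inbox = collect(state)
    var i = 0
    while (inbox.nonEmpty && i < maxIterations) {
      // Vertices with mail run the vertex program; the others keep their value
      state = state.map { case (id, attr) =>
        id -> inbox.get(id).map(m => vprog(id, attr, m)).getOrElse(attr)
      }
      inbox = collect(state) // then everyone sends messages for the next superstep
      i += 1
    }
    state
  }

  // Hypothetical graph: 1 - 2 - 3 linked in both directions, 4 isolated
  val start: Map[VertexId, Double] = Map(1L -> 0.0, 2L -> inf, 3L -> inf, 4L -> inf)
  val links = Seq((1L, 2L), (2L, 1L), (2L, 3L), (3L, 2L))
  val result = run(start, links, inf, 10)(
    (_, attr, msg) => math.min(attr, msg),
    (_, srcAttr, dst) => if (srcAttr != inf) Iterator((dst, srcAttr + 1)) else Iterator.empty,
    (a, b) => math.min(a, b)
  )

  def main(args: Array[String]): Unit = result.toSeq.sortBy(_._1).foreach(println)
}
```

With the minimum-distance functions plugged in, each vertex ends up holding its hop count from vertex 1, and the unreachable vertex 4 stays at infinity.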

### BFS: Initialization

```
    val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)
```

### Sending Messages

```
    triplet => {
        if (triplet.srcAttr != Double.PositiveInfinity) {
            Iterator((triplet.dstId, triplet.srcAttr+1))
        } else {
            Iterator.empty
        }
    },
```

### Preserving The Minimum Distance At Each Step

* Our vertex program keeps the smaller of two distances: the one arriving in the message and the one the vertex already holds:
  * (id, attr, msg) => math.min(attr, msg)
* Our reduce operation keeps the minimum distance when several messages arrive at the same vertex in one superstep:
  * (a,b) => math.min(a,b)
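
A quick way to convince yourself these two functions do the right thing is to call them directly on a few values. The sketch below is plain Scala with made-up numbers and a made-up vertex ID of 7. It also shows why `Double.PositiveInfinity` is a safe initial message: taking the minimum with infinity leaves every vertex unchanged.

```scala
object MinDistanceDemo {
  val vprog: (Long, Double, Double) => Double = (id, attr, msg) => math.min(attr, msg)
  val mergeMsg: (Double, Double) => Double = (a, b) => math.min(a, b)

  // Three messages arrive at one vertex in the same superstep
  val inbox = List(3.0, 2.0, 5.0)
  val merged = inbox.reduce(mergeMsg)

  // A vertex currently at distance 4.0 improves; one already at 1.0 does not
  val improved = vprog(7L, 4.0, merged)
  val unchanged = vprog(7L, 1.0, merged)

  // The initial message of infinity never overwrites an existing value
  val afterInitialMsg = vprog(7L, 4.0, Double.PositiveInfinity)

  def main(args: Array[String]): Unit =
    println(Seq(merged, improved, unchanged, afterInitialMsg))
}
```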

### Putting It All Together

```
    // Initialize each node with a distance of infinity, unless it's our starting point
    val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)

    // Now the Pregel magic
    val bfs = initialGraph.pregel(Double.PositiveInfinity, 10)( 
        // Our "vertex program" preserves the shortest distance
        // between an inbound message and its current value.
        // It receives the vertex ID we are operating on,
        // the attribute already stored with the vertex, and
        // the inbound message from this iteration.
        (id, attr, msg) => math.min(attr, msg), 

        // Our "send message" function propagates out to all neighbors
        // with the distance incremented by one.
        triplet => {
            if (triplet.srcAttr != Double.PositiveInfinity) {
                Iterator((triplet.dstId, triplet.srcAttr+1))
            } else {
                Iterator.empty
            }
        },

        // The "reduce" operation preserves the minimum
        // of messages received by a vertex if multiple
        // messages are received by one vertex
        (a,b) => math.min(a,b) )
```
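
For a version you can run end to end, here is a minimal sketch that applies the same Pregel call to a tiny hand-made graph and joins the resulting distances back to the vertex names. It assumes Spark with GraphX on the classpath and a local master; the object name `TinyBfs`, the helper `distances`, and the four-vertex graph are invented for this example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}

object TinyBfs {
  // Returns name -> distance from `root` on a hypothetical four-vertex graph.
  def distances(sc: SparkContext, root: VertexId): Map[String, Double] = {
    val verts = sc.parallelize(Seq((1L, "A"), (2L, "B"), (3L, "C"), (4L, "D")))
    // Each link is listed in both directions, because the send function
    // below only sends from a triplet's source to its destination.
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 0), Edge(2L, 1L, 0), Edge(2L, 3L, 0), Edge(3L, 2L, 0)))
    val graph = Graph(verts, edges, "Nobody").cache()

    val initialGraph = graph.mapVertices((id, _) =>
      if (id == root) 0.0 else Double.PositiveInfinity)

    val bfs = initialGraph.pregel(Double.PositiveInfinity, 10)(
      (id, attr, msg) => math.min(attr, msg),
      triplet =>
        if (triplet.srcAttr != Double.PositiveInfinity)
          Iterator((triplet.dstId, triplet.srcAttr + 1))
        else Iterator.empty,
      (a, b) => math.min(a, b))

    // Join distances with names so the output is readable
    bfs.vertices.join(verts)
      .map { case (_, (dist, name)) => name -> dist }
      .collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("TinyBfs").setMaster("local[*]"))
    distances(sc, 1L).toSeq.sortBy(_._1).foreach(println)
    sc.stop()
  }
}
```

Vertex D has no edges, so it never receives a finite message and keeps its initial distance of infinity.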

### Let's Do It

