50. GraphX, Pregel, and Breadth-First-Search with Pregel.
Analyzing and traversing graphs
Using Spark GraphX
GRAPHX
Not that kind of graph...
Graphs like our social network of superheroes - graphs in the computer science / network sense.
However, it's only useful for specific things.
It can measure things like "connectedness", degree distribution, average path length, and triangle counts - high-level measures of a graph.
It can count triangles in the graph, and apply the PageRank algorithm to it.
It can also join graphs together and transform graphs quickly
For things like our "degrees of separation" example, you won't find built-in support. But it does support the Pregel API for traversing a graph...
Introduces VertexRDD and EdgeRDD, and the Edge data type
Otherwise, GraphX code looks like any other Spark code for the most part
Creating Vertex RDDs
import org.apache.spark.graphx._

// Function to extract hero ID -> hero name tuples (or None in case of failure)
def parseNames(line: String): Option[(VertexId, String)] = {
  // Split on the double-quote character: the ID comes before the quoted name
  val fields = line.split('\"')
  if (fields.length > 1) {
    val heroID: Long = fields(0).trim().toLong
    // ID's above 6486 aren't real characters
    if (heroID < 6487) Some((heroID, fields(1))) else None
  } else {
    None // flatMap will just discard None results, and extract data from Some results.
  }
}
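To see why returning an Option plays well with flatMap, here is a minimal sketch that runs the same parsing logic over an in-memory Scala List instead of an RDD. The sample lines are made up and only assume the `id "name"` layout implied by the split above; `VertexId` is redeclared locally (GraphX defines it as an alias for Long) so the sketch runs without Spark.

```scala
object ParseNamesSketch {
  // GraphX defines VertexId as an alias for Long; redeclared so this runs without Spark.
  type VertexId = Long

  def parseNames(line: String): Option[(VertexId, String)] = {
    val fields = line.split('\"')
    if (fields.length > 1) {
      val heroID = fields(0).trim.toLong
      if (heroID < 6487) Some((heroID, fields(1))) else None
    } else {
      None
    }
  }

  // Hypothetical sample input: two good lines, one blank line, one out-of-range ID.
  val sample: List[String] = List(
    "1 \"HERO A\"",
    "",
    "7000 \"NOT A CHARACTER\"",
    "2 \"HERO B\""
  )

  // flatMap keeps the contents of each Some and silently drops each None.
  val parsed: List[(VertexId, String)] = sample.flatMap(line => parseNames(line))

  def main(args: Array[String]): Unit = parsed.foreach(println)
}
```

The blank line and the out-of-range ID both come back as None, so only the two valid heroes survive the flatMap.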
Creating Edge RDDs
/** Transform an input line from marvel-graph.txt into a List of Edges */
def makeEdges(line: String): List[Edge[Int]] = {
  val fields = line.split(" ")
  // The first field is the origin hero; every remaining field is a hero connected to it
  val origin = fields(0).toLong
  // Our attribute field is unused, but in other graphs could
  // be used to keep track of physical distances etc.
  fields.tail.map(dst => Edge(origin, dst.toLong, 0)).toList
}
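As a quick illustration of what one input line turns into, here is a minimal sketch that runs without Spark. The local `Edge` case class is a stand-in with the same three fields as GraphX's Edge (srcId, dstId, attr), and the input line is a made-up example rather than real data from marvel-graph.txt.

```scala
object MakeEdgesSketch {
  // Stand-in with the same fields as GraphX's Edge, so the sketch runs without Spark.
  final case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

  def makeEdges(line: String): List[Edge[Int]] = {
    val fields = line.split(" ")
    val origin = fields(0).toLong
    // One edge from the origin to every other ID on the line; the attribute is unused (0).
    fields.tail.map(dst => Edge(origin, dst.toLong, 0)).toList
  }

  // Hypothetical input line: hero 10 is connected to heroes 20, 30 and 40.
  val result: List[Edge[Int]] = makeEdges("10 20 30 40")

  def main(args: Array[String]): Unit = result.foreach(println)
}
```

A line with N IDs produces N - 1 edges, all sharing the first ID as their source.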
Creating A Graph
// Build up our vertices
val names = sc.textFile("../marvel-names.txt")
val verts = names.flatMap(parseNames)
// Build up our edges
val lines = sc.textFile("../marvel-graph.txt")
val edges = lines.flatMap(makeEdges)
// Build up our graph, and cache it as we're going to do a bunch of stuff with it.
val default = "Nobody"
val graph = Graph(verts, edges, default).cache()
Doing Stuff
Top 10 most-connected heroes:
graph.degrees.join(verts).sortBy(_._2._1, ascending=false).take(10).foreach(println)
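Conceptually, `graph.degrees` counts the edges touching each vertex in either direction, and the join attaches the hero name before sorting. A rough sketch of the same idea with plain Scala collections (no Spark), on a made-up five-vertex graph with hypothetical names:

```scala
object DegreesSketch {
  // Hypothetical directed edges (src, dst) and vertex names.
  val edges: List[(Long, Long)] =
    List((1L, 2L), (1L, 3L), (1L, 4L), (1L, 5L), (2L, 3L), (2L, 4L))
  val names: Map[Long, String] =
    Map(1L -> "A", 2L -> "B", 3L -> "C", 4L -> "D", 5L -> "E")

  // Each edge adds one to the degree of both of its endpoints.
  val degrees: Map[Long, Int] =
    edges
      .flatMap { case (src, dst) => List(src, dst) }
      .groupBy(identity)
      .map { case (id, occurrences) => (id, occurrences.size) }

  // Sort by degree, highest first, and attach the name of each vertex.
  val top2: List[(String, Int)] =
    degrees.toList
      .sortBy { case (_, degree) => -degree }
      .take(2)
      .map { case (id, degree) => (names(id), degree) }

  def main(args: Array[String]): Unit = top2.foreach(println)
}
```

In the GraphX one-liner the joined record is (id, (degree, name)), which is why the sort key is `_._2._1`.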
Breadth-First Search With Pregel
Before, we implemented BFS in Spark the hard, manual way.
Pregel gives us a general model for iterating through a graph, and we can use it to implement breadth-first search more simply than we did before.
How Pregel Works
Vertices send messages to other vertices (their neighbours)
The graph is processed in iterations called supersteps
Each superstep does the following:
Messages from the previous iteration are received by each vertex
Each vertex runs a program to transform itself
Each vertex sends messages to other vertices
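The superstep loop above can be sketched in plain Scala, with no Spark involved. This is a simplified model and not GraphX's implementation: it evaluates every edge in every superstep, whereas GraphX only looks at edges next to vertices that just received a message. The names `run`, `vprog`, `sendMsg` and `mergeMsg` are chosen for this sketch, the tiny graph is made up, and the send function only emits a message when it would improve the destination's value, so the loop stops by itself once nothing changes.

```scala
object PregelSketch {
  type VertexId = Long

  def run(
      vertices: Map[VertexId, Double],
      edges: List[(VertexId, VertexId)],
      initialMsg: Double,
      maxIterations: Int
  )(
      vprog: (VertexId, Double, Double) => Double,
      sendMsg: (VertexId, Double, VertexId, Double) => Iterator[(VertexId, Double)],
      mergeMsg: (Double, Double) => Double
  ): Map[VertexId, Double] = {
    // Before the first superstep, every vertex receives the initial message.
    var state = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    var iteration = 0
    var done = false
    while (!done && iteration < maxIterations) {
      // Send: each edge may emit messages based on its endpoints' current values.
      val sent = edges.flatMap { case (src, dst) => sendMsg(src, state(src), dst, state(dst)) }
      // Merge: combine all messages addressed to the same vertex into one.
      val inbox = sent.groupBy(_._1).map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }
      // Receive: each vertex that got a message runs the vertex program.
      state = state.map { case (id, attr) =>
        id -> inbox.get(id).map(msg => vprog(id, attr, msg)).getOrElse(attr)
      }
      done = inbox.isEmpty
      iteration += 1
    }
    state
  }

  // Hypothetical graph: 1 -> 2, 1 -> 3, 2 -> 4, 3 -> 4, 4 -> 5; vertex 6 is unreachable.
  val edges: List[(VertexId, VertexId)] = List((1L, 2L), (1L, 3L), (2L, 4L), (3L, 4L), (4L, 5L))
  val root: VertexId = 1L
  val initial: Map[VertexId, Double] =
    (1L to 6L).map(id => id -> (if (id == root) 0.0 else Double.PositiveInfinity)).toMap

  val distances: Map[VertexId, Double] =
    run(initial, edges, Double.PositiveInfinity, 10)(
      (id, attr, msg) => math.min(attr, msg),
      (src, srcAttr, dst, dstAttr) =>
        if (srcAttr + 1 < dstAttr) Iterator((dst, srcAttr + 1)) else Iterator.empty,
      (a, b) => math.min(a, b)
    )

  def main(args: Array[String]): Unit = distances.toList.sortBy(_._1).foreach(println)
}
```

On this graph the loop runs three productive supersteps (distance 1, then 2, then 3) and stops on the fourth, when no edge has anything new to send. Vertex 6 keeps its infinite distance because no message ever reaches it.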
BFS: Initialization
val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)
Sending Messages
triplet => {
if (triplet.srcAttr != Double.PositiveInfinity) {
Iterator((triplet.dstId, triplet.srcAttr+1))
} else {
Iterator.empty
}
},
Preserving The Minimum Distance At Each Step
Pregel's vertex program keeps the smaller of two distances: the one a vertex receives in a message and the one it already holds:
(id, attr, msg) => math.min(attr, msg)
Its reduce operation preserves the minimum distance if multiple messages are received for the same vertex:
(a,b) => math.min(a,b)
Putting It All Together
// Initialize each node with a distance of infinity, unless it's our starting point
val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)
// Now the Pregel magic
val bfs = initialGraph.pregel(Double.PositiveInfinity, 10)(
// Our "vertex program" preserves the shortest distance
// between an inbound message and its current value.
// It receives the vertex ID we are operating on,
// the attribute already stored with the vertex, and
// the inbound message from this iteration.
(id, attr, msg) => math.min(attr, msg),
// Our "send message" function propagates out to all neighbors
// with the distance incremented by one.
triplet => {
if (triplet.srcAttr != Double.PositiveInfinity) {
Iterator((triplet.dstId, triplet.srcAttr+1))
} else {
Iterator.empty
}
},
// The "reduce" operation preserves the minimum
// of messages received by a vertex if multiple
// messages are received by one vertex
(a,b) => math.min(a,b)
)
Let's Do It