Apache Spark 2.0 with Scala
Section 9: Intro to GraphX

50. GraphX, Pregel, and Breadth-First-Search with Pregel.

Analyzing and traversing graphs

Using Spark GraphX

GRAPHX

  • Not that kind of graph...

  • Graphs like our social network of superheroes - graphs in the computer science / network sense.

  • However, it's only useful for specific things.

    • It can measure things like "connectedness", degree distribution, average path length, triangle counts - high level measures of a graph

    • It can count triangles in the graph, and apply the PageRank algorithm to it.

    • It can also join graphs together and transform graphs quickly

    • For things like our "degrees of separation" example, you won't find built-in support. But it does support the Pregel API for traversing a graph...

  • Introduces VertexRDD and EdgeRDD, and the Edge data type

  • Otherwise, GraphX code looks like any other Spark code for the most part

Creating Vertex RDD's

    // Function to extract hero ID -> hero name tuples (or None in case of failure)
    def parseNames(line: String) : Option[(VertexId, String)] = {
        val fields = line.split('\"')
        if (fields.length > 1) {
            val heroID: Long = fields(0).trim().toLong
            if (heroID < 6487) { // ID's above 6486 aren't real characters
                return Some((heroID, fields(1)))
            }
        }

        return None // flatMap will just discard None results, and extract data from Some results.
    }
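
As a quick stand-alone illustration (plain Scala, no Spark - VertexId is just a type alias for Long), here's the same parsing logic run against a made-up sample line in the `id "NAME"` format of marvel-names.txt:

```scala
// Same parsing logic as parseNames, with Long in place of GraphX's VertexId alias.
def parseNamesPlain(line: String): Option[(Long, String)] = {
  val fields = line.split('\"')   // fields(0) is the ID, fields(1) is the quoted name
  if (fields.length > 1) {
    val heroID = fields(0).trim().toLong
    if (heroID < 6487) return Some((heroID, fields(1)))
  }
  None                            // malformed lines and out-of-range IDs are dropped
}

println(parseNamesPlain("5306 \"SPIDER-MAN/PETER PARKER\""))
println(parseNamesPlain("9999 \"NOT A REAL CHARACTER\""))   // None: ID is above 6486
```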

Creating Edge RDD's

    /** Transform an input line from marvel-graph.txt into a List of Edges */
    def makeEdges(line: String) : List[Edge[Int]] = {
        import scala.collection.mutable.ListBuffer
        val edges = new ListBuffer[Edge[Int]]()
        val fields = line.split(" ")
        val origin = fields(0)
        for (x <- 1 until fields.length) {
            // Our attribute field is unused, but in other graphs could
            // be used to keep track of physical distances etc.
            edges += Edge(origin.toLong, fields(x).toLong, 0)
        }

        return edges.toList
    }

Creating A Graph

    // Build up our vertices
    val names = sc.textFile("../marvel-names.txt")
    val verts = names.flatMap(parseNames)

    // Build up our edges
    val lines = sc.textFile("../marvel-graph.txt")
    val edges = lines.flatMap(makeEdges)    

    // Build up our graph, and cache it as we're going to do a bunch of stuff with it.
    val default = "Nobody"
    val graph = Graph(verts, edges, default).cache()

Doing Stuff

  • Top 10 most-connected heroes:

      graph.degrees.join(verts).sortBy(_._2._1, ascending=false).take(10).foreach(println)

Breadth-First Search With Pregel

  • Previously, we implemented breadth-first search in Spark the hard, manual way.

  • Pregel gives us a general framework for iterating over a graph, so we can implement breadth-first search much more simply than before.

How Pregel Works

  • Vertices send messages to other vertices (their neighbors)

  • The graph is processed in iterations called supersteps

  • Each superstep does the following:

    • Messages from the previous iteration are received by each vertex

    • Each vertex runs a program to transform itself

    • Each vertex sends messages to other vertices
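
The superstep loop above can be sketched in plain Scala, using a hypothetical in-memory adjacency map in place of GraphX's distributed structures (the toy graph here is made up):

```scala
// A minimal, single-machine sketch of Pregel-style supersteps running BFS
// from vertex 1 over a hypothetical toy graph.
val neighbors = Map(1L -> List(2L, 3L), 2L -> List(4L), 3L -> List(4L), 4L -> Nil)

var dist = neighbors.keys.map(id => id -> (if (id == 1L) 0.0 else Double.PositiveInfinity)).toMap
var active = Set(1L)  // vertices whose attribute changed in the last superstep

while (active.nonEmpty) {
  // 1. Active vertices send (dstId, distance + 1) messages to their neighbors
  val messages = active.toList.flatMap(id => neighbors(id).map(dst => (dst, dist(id) + 1)))
  // 2. Multiple messages to the same vertex are reduced with math.min
  val reduced = messages.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).min }
  // 3. Each vertex runs its program: keep the new distance only if it's smaller
  active = reduced.collect { case (id, msg) if msg < dist(id) => id }.toSet
  dist = dist ++ active.map(id => id -> reduced(id))
}
// dist now maps each vertex to its BFS distance from vertex 1
```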

BFS: Initialization

    val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)

Sending Messages

    triplet => {
        if (triplet.srcAttr != Double.PositiveInfinity) {
            Iterator((triplet.dstId, triplet.srcAttr + 1))
        } else {
            Iterator.empty
        }
    },

Preserving The Minimum Distance At Each Step

  • Pregel's vertex program keeps the minimum of the distance it already has and the one it receives:

    • (id, attr, msg) => math.min(attr, msg)

  • Its reduce operation preserves the minimum distance if multiple messages are received for the same vertex:

    • (a,b) => math.min(a,b)
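
As a stand-alone sketch (plain Scala, no Spark types; the sample distances and vertex ID are made up), here's how the two callbacks compose when a vertex at infinity receives two messages:

```scala
// Pregel's two "keep the minimum" callbacks, written as plain functions.
val vertexProgram = (id: Long, attr: Double, msg: Double) => math.min(attr, msg)
val mergeMessages = (a: Double, b: Double) => math.min(a, b)

// A vertex currently at infinity receives distances 2 and 3 from two neighbors:
val merged  = mergeMessages(2.0, 3.0)                             // messages reduce to 2.0
val newAttr = vertexProgram(42L, Double.PositiveInfinity, merged) // vertex keeps 2.0
```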

Putting It All Together

    // Initialize each node with a distance of infinity, unless it's our starting point
    val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else Double.PositiveInfinity)

    // Now the Pregel magic
    // Now the Pregel magic
    val bfs = initialGraph.pregel(Double.PositiveInfinity, 10)(
        // Our "vertex program" preserves the shortest distance
        // between an inbound message and its current value.
        // It receives the vertex ID we are operating on,
        // the attribute already stored with the vertex, and
        // the inbound message from this iteration.
        (id, attr, msg) => math.min(attr, msg),

        // Our "send message" function propagates out to all neighbors
        // with the distance incremented by one.
        triplet => {
            if (triplet.srcAttr != Double.PositiveInfinity) {
                Iterator((triplet.dstId, triplet.srcAttr + 1))
            } else {
                Iterator.empty
            }
        },

        // The "reduce" operation preserves the minimum
        // of messages received by a vertex if multiple
        // messages are received by one vertex
        (a, b) => math.min(a, b))

Let's Do It

