28. Superhero Degrees of Seperation: Review the code, and run it!

Activity

  • Import DegreesOfSeperation.scala from resource folder into Eclipse-Scala IDE in SparkScala folder

  • Open and take a look at DegreesOfSeperation.scala

  • At the start, we seperate each line in marvel-graph.txt into a BFS node with heroID, default distance and color of gray

Looking at the Code

    val startCharacterID = 5306 // SpiderMan
    val targetCharacterID = 14 // ADAM 3,031 (who?)
  • This is the characters we want to find the seperation between.

    // BFSData contains an array of hero ID connections, the distance, and color.
    var hitCounter:Option[Accumulator[Int]] = None

    //BFSData contains an array of hero ID connections, the distance, and color.
    type BFSData = (Array[Int]), Int, String)

    // A BFSNode has a heroID and the BFSData associated with it.
    type BFS Node = (Int, BFSData)
  • Specifying the BFSData and BFSNode

    def convertToBFS(line: String): BFSNode = {

        // Split up the line into fields
        val fields = line.split("\\s+")

        // Extract this hero ID from the field field
        val heroID = fields(0).toInt

        // Extract subsequent hero ID's into the connections array
        var connections: ArrayBuffer[Int] = ArrayBuffer()
        for (connections <- i to (fields.length - 1)) {
            connections += fields(connection).toInt
        }

        // Default distance and color is 9999 and white
        var color: String = "WHITE"
        var distance: Int = 9999

        // Unless this is the character we'restarting from
        if (heroID == startCharacterID){
            color = "GRAY"
            distance = 0
        }

        return (heroID, (connections.toArray, distance, color))
    }
  • Specifying the hero ID and the connections from the csv data

  • Assigning the default color of WHITE and distance of 9999 to a new BFSNode

  • Unless this is the character we are starting from, the color is changed to GRAY and the distance is changed to 0

    def main(args: Array[String]) {

        // Set the log level to only print errors
        Logger.getLogger("org").setLevel(Level.ERROR)

        // Create a SparkContext using every core of the local machine
        val sc = new SparkContext("Local[*]", "DegreesOfSeperation")

        // Our accumulator, used to signal when we find the target
        // character is in our BFS transversal.
        hitCounter = Some(sc.longAccumulator("Hit Counter"))

            var iterationRdd = createStartingRdd(sc)
            for (iteration <- 1 to 10) {
            println("Running BFS Iteration# " + iteration)

            // Create new vertices as needed to darken or reduce distances in the
            // reduce stage. If we encounter the node we're looking for as a GRAY
            // node, increment our accumulator to signal that we're done.
            val mapped = iterationRdd.flatMap(bfsMap)

            // Note that mapped.count() action here forces the RDD to be evaluated, and
            // that's the only reason our accumulator is actually updated.  
            println("Processing " + mapped.count() + " values.")

            if (hitCounter.isDefined) {
                val hitCount = hitCounter.get.value

                if (hitCount > 0) {
                    println("Hit the target character! From " + hitCount + " different direction(s).")
                    return
                }
            }

            // Reducer combines data for each character ID, preserving the darkest
            // color and shortest path.      
            iterationRdd = mapped.reduceByKey(bfsReduce)
        }
        }
  • We are going to run the main method and iterate through 10 times through the graph to find out who is actually connected to one another

  • It actually goes to the function bfsMap to expand BFS Node into this node and its children

    /** Expands a BFSNode into this node and its children */
      def bfsMap(node:BFSNode): Array[BFSNode] = {

        // Extract data from the BFSNode
        val characterID:Int = node._1
        val data:BFSData = node._2

        val connections:Array[Int] = data._1
        val distance:Int = data._2
        var color:String = data._3

        // This is called from flatMap, so we return an array
        // of potentially many BFSNodes to add to our new RDD
        var results:ArrayBuffer[BFSNode] = ArrayBuffer()

        // Gray nodes are flagged for expansion, and create new
        // gray nodes for each connection
        if (color == "GRAY") {
              for (connection <- connections) {
                val newCharacterID = connection
                val newDistance = distance + 1
                val newColor = "GRAY"

                // Have we stumbled across the character we're looking for?
                // If so increment our accumulator so the driver script knows.
                if (targetCharacterID == connection) {
                      if (hitCounter.isDefined) {
                        hitCounter.get.add(1)
                      }
                }

                // Create our new Gray node for this connection and add it to the results
                val newEntry:BFSNode = (newCharacterID, (Array(), newDistance, newColor))
                results += newEntry
            }

              // Color this node as black, indicating it has been processed already.
              color = "BLACK"
        }

        // Add the original node back in, so its connections can get merged with 
        // the gray nodes in the reducer.
        val thisEntry:BFSNode = (characterID, (connections, distance, color))
        results += thisEntry

        return results.toArray
      }
  • Returns a results of array that contains the many possible BFS nodes that contains the results for the targetCharacterID

  • hitCounter is defined as a global option, so the operation can reference it while calculating if the current node colour is gray.

  • If the current node colour is gray, it means it is the character we are starting from

    /** Combine nodes for the same heroID, preserving the shortest length and darkest color. */
      def bfsReduce(data1:BFSData, data2:BFSData): BFSData = {

        // Extract data that we are combining
        val edges1:Array[Int] = data1._1
        val edges2:Array[Int] = data2._1
        val distance1:Int = data1._2
        val distance2:Int = data2._2
        val color1:String = data1._3
        val color2:String = data2._3

        // Default node values
        var distance:Int = 9999
        var color:String = "WHITE"
        var edges:ArrayBuffer[Int] = ArrayBuffer()

        // See if one is the original node with its connections.
        // If so preserve them.
        if (edges1.length > 0) {
              edges ++= edges1
        }
        if (edges2.length > 0) {
              edges ++= edges2
        }

        // Preserve minimum distance
        if (distance1 < distance) {
              distance = distance1
        }
        if (distance2 < distance) {
              distance = distance2
        }

        // Preserve darkest color
        if (color1 == "WHITE" && (color2 == "GRAY" || color2 == "BLACK")) {
              color = color2
        }
        if (color1 == "GRAY" && color2 == "BLACK") {
              color = color2
        }
        if (color2 == "WHITE" && (color1 == "GRAY" || color1 == "BLACK")) {
              color = color1
        }
        if (color2 == "GRAY" && color1 == "BLACK") {
              color = color1
        }
        return (edges.toArray, distance, color)
      }
  • Maps the nodes and returns the nodes with edges, minimum distance and darkest color

  • Remember for the starting character id we are starting with, the distance is set to 0 in bfsMap function

  • Now go ahead and run the code and you should see the output

The Results

    Running BFS Iteration# 1
    Processing 8330 values.
    Running BFS Iteration# 2
    Processing 220615 values.
    Hit the target character! From 1 different direction(s).
  • So it means that our targetted character ADAM 3,031 is 2 degrees of seperations from Spiderman our starting character

Last updated