30. [Activity] Running the Similar Movies Script using Spark's Cluster Manager


  • Import MovieSimilarities.scala from the resource folder into the SparkScala folder in the Eclipse Scala IDE

  • Open MovieSimilarities.scala and take a look at it

Looking At The Code

  • Start with the main method, where execution begins

      // Inside main: build the movie ID -> name lookup first
      println("\nLoading movie names...")
      val nameDict = loadMovieNames()

      // Defined outside main; needs: import scala.io.{Codec, Source}
      /** Load up a Map of movie IDs to movie names. */
      def loadMovieNames() : Map[Int, String] = {
        // Handle character encoding issues:
        implicit val codec = Codec("UTF-8")

        // Create a Map of Ints to Strings, and populate it from u.item.
        var movieNames:Map[Int, String] = Map()
        val lines = Source.fromFile("../ml-100k/u.item").getLines()
        for (line <- lines) {
          val fields = line.split('|')
          if (fields.length > 1) {
            movieNames += (fields(0).toInt -> fields(1))
          }
        }

        return movieNames
      }
  • The loadMovieNames() method reads u.item and builds a map whose keys are movie IDs and whose values are movie names
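
  • To see the parsing step in isolation, here is a minimal sketch that builds the same kind of map from a few in-memory lines instead of reading u.item. The object name MovieNameParsing, the helper parseMovieNames, and the sample lines are hypothetical; only the split on '|' and the ID-to-name mapping come from the code above.

```scala
object MovieNameParsing {
  // Hypothetical helper: builds movieID -> movieName from pipe-delimited lines,
  // mirroring the loop in loadMovieNames() but without touching the file system.
  def parseMovieNames(lines: Iterator[String]): Map[Int, String] =
    lines
      .map(_.split('|'))
      .filter(_.length > 1)                          // skip blank or malformed lines
      .map(fields => fields(0).toInt -> fields(1))   // (movieID, movieName)
      .toMap

  def main(args: Array[String]): Unit = {
    // Made-up sample lines in an "id|title|..." layout
    val sample = Iterator(
      "50|Star Wars (1977)|extra fields",
      "",
      "1|Toy Story (1995)|extra fields"
    )
    parseMovieNames(sample).foreach { case (id, name) => println(s"$id -> $name") }
  }
}
```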

    val data = sc.textFile("../ml-100k/u.data")

    // Map ratings to key / value pairs: user ID => movie ID, rating
    val ratings = data.map(l => l.split("\t")).map(l => (l(0).toInt, (l(1).toInt, l(2).toDouble)))

    // Emit every movie rated together by the same user.
    // Self-join to find every combination.
    val joinedRatings = ratings.join(ratings)   

    // At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))

    // Filter out duplicate pairs
    val uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)
  • Reads u.data and maps each line to a key/value pair, with the user ID as the key and a (movie ID, rating) tuple as the value

  • The self-join pairs up every two movies rated by the same user, giving an RDD of userID => ((movieID, rating), (movieID, rating))
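
  • The effect of the self-join can be sketched on a tiny in-memory dataset using plain Scala collections rather than Spark. The object SelfJoinSketch, the helper selfJoin, and the sample ratings are all hypothetical; this only illustrates the shape of the result, not how Spark executes the join.

```scala
object SelfJoinSketch {
  type MovieRating = (Int, Double)   // (movieID, rating)

  // Hypothetical stand-in for ratings.join(ratings): for each user, pair every
  // rating with every rating by that same user (including itself).
  def selfJoin(ratings: Seq[(Int, MovieRating)]): Seq[(Int, (MovieRating, MovieRating))] =
    for {
      (user1, r1) <- ratings
      (user2, r2) <- ratings
      if user1 == user2
    } yield (user1, (r1, r2))

  def main(args: Array[String]): Unit = {
    // Made-up data: user 1 rated two movies, user 2 rated one
    val ratings = Seq((1, (50, 5.0)), (1, (172, 4.0)), (2, (50, 3.0)))
    // User 1 yields 2 x 2 = 4 combinations, user 2 yields 1
    selfJoin(ratings).foreach(println)
  }
}
```

  • Note how user 1's two ratings already produce four combinations, including each movie paired with itself and both orderings of the same two movies.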

    def filterDuplicates(userRatings:UserRatingPair):Boolean = {
        val movieRating1 = userRatings._2._1
        val movieRating2 = userRatings._2._2

        val movie1 = movieRating1._1
        val movie2 = movieRating2._1

        return movie1 < movie2
    }
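  • The filterDuplicates function looks at the two movie IDs in each joined pair and keeps the pair only when movie1 < movie2. This drops pairs of a movie with itself and keeps just one of the two mirrored orderings, (A, B) and (B, A). Note that it compares movie IDs, not ratings.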
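
  • A small sketch of the filter applied to the four joined pairs of one user. The type aliases MovieRating and UserRatingPair are assumptions here, since this section uses UserRatingPair without showing its definition; the object name and sample values are made up.

```scala
object FilterDuplicatesSketch {
  // Assumed type aliases (not shown in this section)
  type MovieRating = (Int, Double)
  type UserRatingPair = (Int, (MovieRating, MovieRating))

  // Same test as filterDuplicates: keep a pair only if movie1's ID is lower.
  def filterDuplicates(userRatings: UserRatingPair): Boolean = {
    val (movie1, _) = userRatings._2._1
    val (movie2, _) = userRatings._2._2
    movie1 < movie2
  }

  def main(args: Array[String]): Unit = {
    // The four joined pairs for one user who rated movies 50 and 172
    val joined: Seq[UserRatingPair] = Seq(
      (1, ((50, 5.0), (50, 5.0))),     // movie paired with itself: dropped
      (1, ((50, 5.0), (172, 4.0))),    // kept
      (1, ((172, 4.0), (50, 5.0))),    // mirror of the kept pair: dropped
      (1, ((172, 4.0), (172, 4.0)))    // movie paired with itself: dropped
    )
    joined.filter(filterDuplicates).foreach(println)
  }
}
```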

    // Now key by (movie1, movie2) pairs.
    val moviePairs = uniqueJoinedRatings.map(makePairs)

    // We now have (movie1, movie2) => (rating1, rating2)
    // Now collect all ratings for each movie pair and compute similarity
    val moviePairRatings = moviePairs.groupByKey()

    // We now have (movie1, movie2) => (rating1, rating2), (rating1, rating2) ...
    // Can now compute similarities.
    val moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()
  • The makePairs function re-keys each record so that the key is the (movie1, movie2) pair and the value is (rating1, rating2)

  • The groupByKey function groups together all the rating pairs associated with a given movie pair
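
  • The body of makePairs is not shown in this section, so the version below is only an assumed shape inferred from the comments in the code above. The grouping step uses plain Scala collections as a stand-in for groupByKey; the object name, helper names, and sample data are hypothetical.

```scala
object MakePairsSketch {
  // Assumed type aliases (not shown in this section)
  type MovieRating = (Int, Double)
  type UserRatingPair = (Int, (MovieRating, MovieRating))

  // Assumed shape of makePairs: drop the user ID and re-key by the movie pair.
  def makePairs(userRatings: UserRatingPair): ((Int, Int), (Double, Double)) = {
    val ((movie1, rating1), (movie2, rating2)) = userRatings._2
    ((movie1, movie2), (rating1, rating2))
  }

  // Plain-collections stand-in for groupByKey: gather all rating pairs per movie pair.
  def groupRatings(pairs: Seq[((Int, Int), (Double, Double))]): Map[(Int, Int), Seq[(Double, Double)]] =
    pairs.groupBy(_._1).map { case (key, kvs) => key -> kvs.map(_._2) }

  def main(args: Array[String]): Unit = {
    // Made-up data: two users who both rated movies 50 and 172
    val joined: Seq[UserRatingPair] = Seq(
      (1, ((50, 5.0), (172, 4.0))),
      (2, ((50, 3.0), (172, 2.0)))
    )
    // Both users contribute one rating pair to the (50, 172) key
    groupRatings(joined.map(makePairs)).foreach(println)
  }
}
```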

    type RatingPair = (Double, Double)
    type RatingPairs = Iterable[RatingPair]

    def computeCosineSimilarity(ratingPairs:RatingPairs): (Double, Int) = {
        var numPairs:Int = 0
        var sum_xx:Double = 0.0
        var sum_yy:Double = 0.0
        var sum_xy:Double = 0.0

        for (pair <- ratingPairs) {
            val ratingX = pair._1
            val ratingY = pair._2

            sum_xx += ratingX * ratingX
            sum_yy += ratingY * ratingY
            sum_xy += ratingX * ratingY
            numPairs += 1
        }

        val numerator:Double = sum_xy
        val denominator = sqrt(sum_xx) * sqrt(sum_yy)

        var score:Double = 0.0
        if (denominator != 0) {
            score = numerator / denominator
        }

        return (score, numPairs)
    }
  • The computeCosineSimilarity function computes a similarity score for each movie pair, along with the number of rating pairs it was computed from

  • This is just one way of measuring how similar the ratings for the two movies in a pair are to one another
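
  • The score is sum(x*y) / (sqrt(sum(x*x)) * sqrt(sum(y*y))), where x and y are the two ratings in each pair. A worked sketch on made-up rating pairs follows; the object CosineSketch and the helper cosine are hypothetical names, and this is a functional restatement of the same arithmetic rather than the course's own code.

```scala
object CosineSketch {
  // Returns (score, numPairs), like computeCosineSimilarity.
  def cosine(ratingPairs: Iterable[(Double, Double)]): (Double, Int) = {
    val sumXX = ratingPairs.map { case (x, _) => x * x }.sum
    val sumYY = ratingPairs.map { case (_, y) => y * y }.sum
    val sumXY = ratingPairs.map { case (x, y) => x * y }.sum
    val denominator = math.sqrt(sumXX) * math.sqrt(sumYY)
    // Guard against division by zero, as the original does
    val score = if (denominator != 0) sumXY / denominator else 0.0
    (score, ratingPairs.size)
  }

  def main(args: Array[String]): Unit = {
    // Proportional ratings (y = 2x): the score is approximately 1.0
    println(cosine(Seq((1.0, 2.0), (2.0, 4.0))))
    // sum_xy = 10, sum_xx = sum_yy = 26, so the score is about 10 / 26 = 0.38
    println(cosine(Seq((5.0, 1.0), (1.0, 5.0))))
  }
}
```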

    //Save the results if desired
    //val sorted = moviePairSimilarities.sortByKey()

    // Extract similarities for the movie we care about that are "good".

    if (args.length > 0) {
        val scoreThreshold = 0.97
        val coOccurenceThreshold = 50.0

        val movieID:Int = args(0).toInt

        // Filter for movies with this sim that are "good" as defined by
        // our quality thresholds above     

        val filteredResults = moviePairSimilarities.filter( x => {
                val pair = x._1
                val sim = x._2
                (pair._1 == movieID || pair._2 == movieID) && sim._1 > scoreThreshold && sim._2 > coOccurenceThreshold
            }
        )

        // Sort by quality score.
        val results = filteredResults.map( x => (x._2, x._1)).sortByKey(false).take(10)

        println("\nTop 10 similar movies for " + nameDict(movieID))
        for (result <- results) {
            val sim = result._1
            val pair = result._2
            // Display the similarity result that isn't the movie we're looking at
            var similarMovieID = pair._1
            if (similarMovieID == movieID) {
                similarMovieID = pair._2
            }
            println(nameDict(similarMovieID) + "\tscore: " + sim._1 + "\tstrength: " + sim._2)
        }
    }
  • We call .cache() on moviePairSimilarities so the computed RDD is kept around and is not recomputed if we use it more than once

  • We keep only the pairs that contain the requested movieID and whose similarity score and co-occurrence count both exceed the thresholds we specified (0.97 and 50)

  • We flip each result around so the (score, strength) tuple becomes the key, sort in descending order, and take the top 10 most similar movies

  • For each result, we check whether the first ID in the pair is the movieID we asked for; if it is, we take the other ID, and then display that movie's name with its score and strength

  • In short, for a given movie pair we are finding out whether the two movies tend to receive similar ratings from the same users
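
  • The filter, sort, and take steps can be sketched with plain Scala collections. The object TopSimilarSketch, the helper topSimilar, and the sample similarities are hypothetical. One simplification to be aware of: this sketch sorts by score only, whereas sortByKey(false) in the code above sorts on the whole (score, strength) tuple.

```scala
object TopSimilarSketch {
  type Similarity = (Double, Int)           // (score, co-occurrence count)
  type PairSim = ((Int, Int), Similarity)   // ((movie1, movie2), similarity)

  // Hypothetical helper mirroring the filter / sort descending / take(n) steps.
  def topSimilar(sims: Seq[PairSim], movieID: Int, scoreThreshold: Double,
                 coOccurrenceThreshold: Double, n: Int): Seq[(Int, Similarity)] =
    sims
      .filter { case ((m1, m2), (score, count)) =>
        (m1 == movieID || m2 == movieID) && score > scoreThreshold && count > coOccurrenceThreshold
      }
      .sortWith((a, b) => a._2._1 > b._2._1)   // highest score first
      .take(n)
      // Report the ID in the pair that is not the movie we asked about
      .map { case ((m1, m2), sim) => (if (m1 == movieID) m2 else m1, sim) }

  def main(args: Array[String]): Unit = {
    // Made-up similarities
    val sims: Seq[PairSim] = Seq(
      ((50, 172), (0.99, 345)),
      ((50, 181), (0.985, 480)),
      ((1, 50), (0.98, 10)),      // too few co-occurrences
      ((2, 3), (0.999, 500)),     // does not involve movie 50
      ((50, 7), (0.90, 200))      // score too low
    )
    topSimilar(sims, 50, 0.97, 50.0, 10).foreach(println)
  }
}
```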

To Run The Code And Pass In An Argument

  • We pass the movieID we want to look up as a command-line argument to the spark-submit command

  • Right click the package and click export

  • Select Java, and JAR File

  • We can leave the settings at their defaults

  • We can export the JAR to our SparkScalaCourse Folder with the File name: MovieSims.jar

  • Press Finish to export MovieSims.jar to the destination folder

  • Open cmd as Administrator and cd to the folder containing MovieSims.jar

      cd C:\SparkScala\SparkScalaCourse
  • Next, run spark-submit with the JAR and pass in the movieID 50, which is the ID of Star Wars

      spark-submit --class com.sundogsoftware.spark.MovieSimilarities MovieSims.jar 50
  • If cmd reports that the spark-submit command is not found, open Control Panel and navigate to Environment Variables. Under System variables, add SPARK_HOME pointing to your Spark installation folder, then add %SPARK_HOME%\bin to Path so that spark-submit can be found.

  • On my Windows setup the Spark job runs, but it also logs an exception.

  • This exception shows up when running a Spark job on Windows with the simulated Hadoop environment

      ERROR ShutdownHookManager: Exception while deleting Spark temp dir:
    Top 10 similar movies for Star Wars (1977)
    Empire Strikes Back, The (1980) score: 0.9895522078385338       strength: 345
    Return of the Jedi (1983)       score: 0.9857230861253026       strength: 480
    Raiders of the Lost Ark (1981)  score: 0.981760098872619        strength: 380
    20,000 Leagues Under the Sea (1954)     score: 0.9789385605497993       strength: 68
    12 Angry Men (1957)     score: 0.9776576120448436       strength: 109
    Close Shave, A (1995)   score: 0.9775948291054827       strength: 92
    African Queen, The (1951)       score: 0.9764692222674887       strength: 138
    Sting, The (1973)       score: 0.9751512937740359       strength: 204
    Wrong Trousers, The (1993)      score: 0.9748681355460885       strength: 103
    Wallace & Gromit: The Best of Aardman Animation (1996)  score: 0.9741816128302572       strength: 58
  • You should see this output in cmd: the top 10 movies most similar to Star Wars (1977)

