# 30. \[Activity] Running the Similar Movies Script using Spark's Cluster Manager

## Activity

* Import MovieSimilarities.scala from the resources folder into the SparkScala folder in the Eclipse Scala IDE
* Open MovieSimilarities.scala and take a look at it

## Looking At The Code

* The main method starts by loading the movie names with the loadMovieNames() helper

  ```
    // Imports this excerpt relies on (at the top of the file)
    import java.nio.charset.CodingErrorAction
    import scala.io.{Codec, Source}

    // In main:
    println("\nLoading movie names...")
    val nameDict = loadMovieNames()

    /** Load up a Map of movie IDs to movie names. */
    def loadMovieNames() : Map[Int, String] = {

        // Handle character encoding issues:
        implicit val codec = Codec("UTF-8")
        codec.onMalformedInput(CodingErrorAction.REPLACE)
        codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

        // Create a Map of Ints to Strings, and populate it from u.item.
        var movieNames:Map[Int, String] = Map()

        val lines = Source.fromFile("../ml-100k/u.item").getLines()
        for (line <- lines) {
            // Fields are pipe-delimited: movie ID first, then the title
            val fields = line.split('|')
            if (fields.length > 1) {
                movieNames += (fields(0).toInt -> fields(1))
            }
        }
        return movieNames
    }
  ```
* The method loadMovieNames() reads u.item and builds a Map whose keys are movie IDs and whose values are movie names
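
To make the shape of that Map concrete, here is a minimal sketch that parses a few pipe-delimited lines held in memory instead of reading u.item. The sample lines and the names `LoadNamesSketch`, `sampleLines`, and `parseMovieNames` are made up for illustration; only the `movieID|title|...` layout and the title of movie 50 come from this page. It builds the Map without a `var`, which is a style choice and not what the course code does.

```scala
object LoadNamesSketch {
  // Hypothetical sample lines in the u.item layout: movieID|title|other fields...
  val sampleLines: Seq[String] = Seq(
    "50|Star Wars (1977)|01-Jan-1977",
    "172|Empire Strikes Back, The (1980)|01-Jan-1980",
    "malformed line without a delimiter"
  )

  /** Build a movieID -> title map, skipping lines with fewer than two fields. */
  def parseMovieNames(lines: Iterator[String]): Map[Int, String] =
    lines
      .map(_.split('|'))
      .collect { case fields if fields.length > 1 => fields(0).toInt -> fields(1) }
      .toMap

  val names: Map[Int, String] = parseMovieNames(sampleLines.iterator)

  def main(args: Array[String]): Unit =
    names.toSeq.sortBy(_._1).foreach { case (id, title) => println(s"$id -> $title") }
}
```

The malformed third line is skipped by the same `fields.length > 1` check the course code uses.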

```
    val data = sc.textFile("../ml-100k/u.data")

    // Map ratings to key / value pairs: user ID => movie ID, rating
    val ratings = data.map(l => l.split("\t")).map(l => (l(0).toInt, (l(1).toInt, l(2).toDouble)))

    // Emit every movie rated together by the same user.
    // Self-join to find every combination.
    val joinedRatings = ratings.join(ratings)   

    // At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))

    // Filter out duplicate pairs
    val uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)
```

* Reads u.data and maps each line to a key/value pair, with the user ID as the key and a tuple of (movie ID, rating) as the value
* Self-joins the RDD so that every combination of two movies rated by the same user ends up in one record
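
The effect of the self-join is easier to see on a tiny data set. The sketch below imitates `ratings.join(ratings)` with plain Scala collections, so it runs without Spark; the ratings and the name `SelfJoinSketch` are hypothetical.

```scala
object SelfJoinSketch {
  type MovieRating = (Int, Double)

  // Hypothetical ratings: (userID, (movieID, rating))
  val ratings: Seq[(Int, MovieRating)] = Seq(
    (1, (50, 5.0)),
    (1, (172, 4.0)),
    (2, (50, 3.0))
  )

  // Local stand-in for ratings.join(ratings):
  // pair up every two records that share the same user ID.
  val joined: Seq[(Int, (MovieRating, MovieRating))] =
    for {
      (userA, left)  <- ratings
      (userB, right) <- ratings
      if userA == userB
    } yield (userA, (left, right))

  def main(args: Array[String]): Unit = joined.foreach(println)
}
```

User 1 rated two movies, so the join produces four records for that user: each movie paired with itself, plus the pair in both orders. User 2 rated one movie and contributes a single self-pair. These redundant records are what the duplicate filter removes next.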

```
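    // Type aliases matching the RDD layout: userID => ((movieID, rating), (movieID, rating))
    type MovieRating = (Int, Double)
    type UserRatingPair = (Int, (MovieRating, MovieRating))

    def filterDuplicates(userRatings:UserRatingPair):Boolean = {
        val movieRating1 = userRatings._2._1
        val movieRating2 = userRatings._2._2

        val movie1 = movieRating1._1
        val movie2 = movieRating2._1

        // Keep the pair only when the first movie ID is the smaller one
        return movie1 < movie2
    }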
```

* The filterDuplicates function compares the two movie IDs (not the ratings) in each joined record and keeps the record only when movie1 < movie2. This drops records that pair a movie with itself and keeps just one of the two orderings of every movie pair
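
A small sketch of the filter at work, assuming one user who rated movies 50 and 172. The sample records and the name `FilterDuplicatesSketch` are hypothetical, and the predicate is rewritten with pattern matching, but the test is the same `movie1 < movie2`.

```scala
object FilterDuplicatesSketch {
  type MovieRating = (Int, Double)
  type UserRatingPair = (Int, (MovieRating, MovieRating))

  // Same predicate as filterDuplicates, written without `return`.
  def filterDuplicates(userRatings: UserRatingPair): Boolean = {
    val ((movie1, _), (movie2, _)) = userRatings._2
    movie1 < movie2
  }

  // Hypothetical self-join output for one user who rated movies 50 and 172.
  val joined: Seq[UserRatingPair] = Seq(
    (1, ((50, 5.0), (50, 5.0))),    // movie paired with itself: dropped
    (1, ((50, 5.0), (172, 4.0))),   // 50 < 172: kept
    (1, ((172, 4.0), (50, 5.0))),   // mirror image of the kept pair: dropped
    (1, ((172, 4.0), (172, 4.0)))   // movie paired with itself: dropped
  )

  val unique: Seq[UserRatingPair] = joined.filter(filterDuplicates)

  def main(args: Array[String]): Unit = unique.foreach(println)
}
```

Of the four joined records, only the one with the lower movie ID first survives, regardless of which rating is higher.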

```
    // Now key by (movie1, movie2) pairs.
    val moviePairs = uniqueJoinedRatings.map(makePairs)

    // We now have (movie1, movie2) => (rating1, rating2)
    // Now collect all ratings for each movie pair and compute similarity
    val moviePairRatings = moviePairs.groupByKey()

    // We now have (movie1, movie2) => (rating1, rating2), (rating1, rating2) ...
    // Can now compute similarities.
    val moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()
```

* The makePairs function re-keys each record by (movie1, movie2), with (rating1, rating2) as the value
* The groupByKey function then groups together all the rating pairs associated with a given movie pair
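
The body of makePairs is not shown on this page, so the version below is an assumption based on the comments in the code above: it drops the user ID and re-keys by the movie pair. The grouping is imitated with plain collections so the sketch runs without Spark; the sample records and the name `MakePairsSketch` are hypothetical.

```scala
object MakePairsSketch {
  type MovieRating = (Int, Double)
  type UserRatingPair = (Int, (MovieRating, MovieRating))
  type RatingPair = (Double, Double)

  // Assumed shape of makePairs: drop the user ID, key by (movie1, movie2).
  def makePairs(userRatings: UserRatingPair): ((Int, Int), RatingPair) = {
    val ((movie1, rating1), (movie2, rating2)) = userRatings._2
    ((movie1, movie2), (rating1, rating2))
  }

  // Hypothetical de-duplicated join output from two users.
  val uniqueJoined: Seq[UserRatingPair] = Seq(
    (1, ((50, 5.0), (172, 4.0))),
    (2, ((50, 3.0), (172, 4.0))),
    (2, ((50, 3.0), (181, 2.0)))
  )

  // Local stand-in for groupByKey(): collect all rating pairs per movie pair.
  val grouped: Map[(Int, Int), Seq[RatingPair]] =
    uniqueJoined
      .map(makePairs)
      .groupBy(_._1)
      .map { case (moviePair, records) => moviePair -> records.map(_._2) }

  def main(args: Array[String]): Unit = grouped.foreach(println)
}
```

Movie pair (50, 172) was rated by both users, so it collects two rating pairs; (50, 181) collects one.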

```
    type RatingPair = (Double, Double)
    type RatingPairs = Iterable[RatingPair]

    def computeCosineSimilarity(ratingPairs:RatingPairs): (Double, Int) = {
        var numPairs:Int = 0
        var sum_xx:Double = 0.0
        var sum_yy:Double = 0.0
        var sum_xy:Double = 0.0

        for (pair <- ratingPairs) {
            val ratingX = pair._1
            val ratingY = pair._2

            sum_xx += ratingX * ratingX
            sum_yy += ratingY * ratingY
            sum_xy += ratingX * ratingY
            numPairs += 1
        }

        val numerator:Double = sum_xy
        val denominator = sqrt(sum_xx) * sqrt(sum_yy)

        var score:Double = 0.0
        if (denominator != 0) {
            score = numerator / denominator
        }

        return (score, numPairs)
    }
```

* The computeCosineSimilarity function computes a similarity score for each movie pair, along with the number of rating pairs it was based on
* Cosine similarity is just one way of measuring how similar the ratings for the two movies in a pair are to one another
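
A worked example may help. The score is the sum of x*y over all rating pairs, divided by sqrt(sum of x*x) times sqrt(sum of y*y). The sketch below computes the same quantity with sums over the collection instead of mutable counters; the ratings and the names `CosineSketch` and `cosine` are hypothetical.

```scala
import scala.math.sqrt

object CosineSketch {
  type RatingPair = (Double, Double)

  // Same computation as computeCosineSimilarity, using sums instead of vars.
  def cosine(ratingPairs: Iterable[RatingPair]): (Double, Int) = {
    val sumXX = ratingPairs.map { case (x, _) => x * x }.sum
    val sumYY = ratingPairs.map { case (_, y) => y * y }.sum
    val sumXY = ratingPairs.map { case (x, y) => x * y }.sum

    val denominator = sqrt(sumXX) * sqrt(sumYY)
    val score = if (denominator != 0) sumXY / denominator else 0.0
    (score, ratingPairs.size)
  }

  // Two users who gave both movies identical ratings.
  val agree: (Double, Int) = cosine(Seq((5.0, 5.0), (4.0, 4.0)))

  // Two users who disagree: sums are 25, 25 and 24, so the score is 24 / (5 * 5).
  val disagree: (Double, Int) = cosine(Seq((3.0, 4.0), (4.0, 3.0)))

  def main(args: Array[String]): Unit = {
    println(s"agree:    $agree")
    println(s"disagree: $disagree")
  }
}
```

Because every rating is positive, even the disagreeing pair scores about 0.96, which suggests why the score threshold used further down is set as high as 0.97.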

```
    //Save the results if desired
    //val sorted = moviePairSimilarities.sortByKey()
    //sorted.saveAsTextFile("movie-sims")

    // Extract similarities for the movie we care about that are "good".

    if (args.length > 0) {
        val scoreThreshold = 0.97
        val coOccurenceThreshold = 50.0

        val movieID:Int = args(0).toInt

        // Filter for movies with this sim that are "good" as defined by
        // our quality thresholds above     

        val filteredResults = moviePairSimilarities.filter( x =>
            {
                val pair = x._1
                val sim = x._2
                (pair._1 == movieID || pair._2 == movieID) && sim._1 > scoreThreshold && sim._2 > coOccurenceThreshold
            }
        )

        // Sort by quality score.
        val results = filteredResults.map( x => (x._2, x._1)).sortByKey(false).take(10)

        println("\nTop 10 similar movies for " + nameDict(movieID))
        for (result <- results) {
            val sim = result._1
            val pair = result._2
            // Display the similarity result that isn't the movie we're looking at
            var similarMovieID = pair._1
            if (similarMovieID == movieID) {
                similarMovieID = pair._2
            }
            println(nameDict(similarMovieID) + "\tscore: " + sim._1 + "\tstrength: " + sim._2)
        }
    }
```

* We call .cache() on moviePairSimilarities so the RDD is kept around and does not have to be recomputed if it is used more than once
* We keep only the pairs that contain the requested movie ID and that exceed both thresholds we specified: a similarity score above 0.97 and more than 50 co-ratings
* We flip each result around to (similarity, movie pair), sort in descending order, and take the first 10 to get the movies with the highest similarity scores
* Each remaining pair contains the movie we asked about, so we check which of the two IDs is the requested movieID and display the name, score and strength of the other one
* In short, we are finding the movies whose ratings are most similar to the ratings of the given movie
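
The filter, flip, sort and take steps can be tried on a handful of records without Spark. In this sketch the similarity records and the name `TopSimilarSketch` are hypothetical; the thresholds are the ones from the code above. One simplification: it sorts by score only, whereas sortByKey(false) in the course code sorts on the whole (score, strength) tuple.

```scala
object TopSimilarSketch {
  type Similarity = (Double, Int) // (score, number of co-ratings)

  // Hypothetical records: (movie1, movie2) -> (score, strength)
  val sims: Seq[((Int, Int), Similarity)] = Seq(
    ((50, 172), (0.99, 345)),
    ((50, 181), (0.985, 480)),
    ((50, 999), (0.995, 12)),  // high score, but too few co-ratings
    ((12, 50), (0.975, 60)),   // requested movie is the second ID
    ((1, 2), (0.99, 400))      // does not involve the requested movie
  )

  val movieID = 50
  val scoreThreshold = 0.97
  val coOccurrenceThreshold = 50.0

  // Result: (similarity, ID of the *other* movie in the pair), best score first.
  val top: Seq[(Similarity, Int)] = sims
    .filter { case ((m1, m2), (score, strength)) =>
      (m1 == movieID || m2 == movieID) &&
        score > scoreThreshold && strength > coOccurrenceThreshold
    }
    .map { case ((m1, m2), sim) => (sim, if (m1 == movieID) m2 else m1) }
    .sortWith((a, b) => a._1._1 > b._1._1)
    .take(10)

  def main(args: Array[String]): Unit =
    top.foreach { case ((score, strength), other) =>
      println(s"movie $other\tscore: $score\tstrength: $strength")
    }
}
```

The pair with the highest raw score is rejected because only 12 users rated both movies, which is the point of the co-occurrence threshold.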

## To Run The Code And Pass In An Argument

* We need to pass the movie ID we want to look up as a command-line argument to the spark-submit command
* Right-click the package and click Export
* Select Java, then JAR File
* The default settings are fine
* Export the JAR to the SparkScalaCourse folder with the file name MovieSims.jar
* Press Finish to export MovieSims.jar to the destination folder
* Open cmd as Administrator and cd to the folder containing MovieSims.jar

  ```
    cd C:\SparkScala\SparkScalaCourse
  ```
* Next, use spark-submit to run the JAR, passing in movie ID 50, which is the ID for Star Wars

  ```
    spark-submit --class com.sundogsoftware.spark.MovieSimilarities MovieSims.jar 50
  ```
* If cmd reports that the spark-submit command is not found, open Control Panel and navigate to Environment Variables. Under System variables, add SPARK\_HOME as a variable pointing to your Spark installation folder, and add %SPARK\_HOME%\bin to the Path variable so that spark-submit can be found.
* On my Windows setup the Spark job runs, but it reports an exception.
* This exception shows up when a Spark job runs on Windows with the simulated Hadoop environment

  ```
    ERROR ShutdownHookManager: Exception while deleting Spark temp dir:
  ```

```
    Top 10 similar movies for Star Wars (1977)
    Empire Strikes Back, The (1980) score: 0.9895522078385338       strength: 345
    Return of the Jedi (1983)       score: 0.9857230861253026       strength: 480
    Raiders of the Lost Ark (1981)  score: 0.981760098872619        strength: 380
    20,000 Leagues Under the Sea (1954)     score: 0.9789385605497993       strength: 68
    12 Angry Men (1957)     score: 0.9776576120448436       strength: 109
    Close Shave, A (1995)   score: 0.9775948291054827       strength: 92
    African Queen, The (1951)       score: 0.9764692222674887       strength: 138
    Sting, The (1973)       score: 0.9751512937740359       strength: 204
    Wrong Trousers, The (1993)      score: 0.9748681355460885       strength: 103
    Wallace & Gromit: The Best of Aardman Animation (1996)  score: 0.9741816128302572       strength: 58
```

* You should see this output in cmd, listing the top 10 movies most similar to Star Wars (1977)

