46. [Activity] Using DataFrames with MLLib
MLLIB WITH DATASETS
Using DataSets With MLLib Is Actually Preferred
But's not always practical. Not everything is a SQ problem...
Use spark.ml instead of spark.mllib for the preferred DataSet-based API (instead of RDDs)
Performs better
Will interoperate better with Spark Streaming, Spark SQL, etc.
Available in Spark 2.0.0+
API's are different
Let's look An Example
We'll do our linear regresson example using DataSets this time.
Activity
Import LinearRegressionDataFrame.scala from sourcefolder into SparkScalaCourse in Spark-Eclipse IDE
Open LinearRegressionDataFrame.scala and look at the code
Looking At the Code
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.types._
import org.apache.spark.ml.linalg.Vectors
Now we are importing all these seperate spark ml packages that are different from the MLLib package that we use before
// Use new SparkSession interface in Spark 2.0
val spark = SparkSession
.builder
.appName("LinearRegressionDF")
.master("local[*]")
.config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
.getOrCreate()
Now instead of using a spark context, we're going to use a Spark session.
This is basically the API that we use in Spark 2.0 for doing DataSet stuff and Spark SQL stuff.
// Load up our page speed / amount spent data in the format required by MLLib
// (which is label, vector of features)
// In machine learning lingo, "label" is just the value you're trying to predict, and
// "feature" is the data you are given to make a prediction with. So in this example
// the "labels" are the first column of our data, and "features" are the second column.
// You can have more than one "feature" which is why a vector is required.
val inputLines = spark.sparkContext.textFile("../regression.txt")
val data = inputLines.map(_.split(",")).map(x => (x(0).toDouble, Vectors.dense(x(1).toDouble)))
We are parsing and delimiting the lines by , commas
The _ underscore is basically a wildcard so sort of a shortcut instead of saying X => x.split(), you can just say underscore and it means the same exact thing.
So it means that each individual input coming into your map function that's represented by the underscore character
Remember a DataFrame is a DataSet.
// Convert this RDD to a DataFrame
import spark.implicits._
val colNames = Seq("label", "features")
val df = data.toDF(colNames: _*)
Here, we are actually giving names to those columns
So in a DataFrame, you want to have names associate with those columns sowe can actually do SQL querys on them and refer to them by name.
So let's do something a little bit more fancier just to make life a little bit more interesting if you're not familiar with machine learning you get a little free lesson here.
So one way you can evaluate the performance of a machine learningmodel is technique called Train test
and the idea is that you have set of data where you have a set of known results.
So in this case,we know the actual order amount and page speed for a set of data.
What we're going to do is to split the data into half randomly
A half is used for building our model and the other half is reserved for testing that model
Since the model had no knowledge of this other data when you are creating it, it's a good way to actually test how effective this model is at predicting data that it hasn't seen before.
// Note, there are lots of cases where you can avoid going from an RDD to a DataFrame.
// Perhaps you're importing data from a real database. Or you are using structured streaming
// to get your data.
// Let's split our data into training data and testing data
val trainTest = df.randomSplit(Array(0.5, 0.5))
val trainingDF = trainTest(0)
val testDF = trainTest(1)
// Now create our linear regression model
val lir = new LinearRegression()
.setRegParam(0.3) // regularization
.setElasticNetParam(0.8) // elastic net mixing
.setMaxIter(100) // max iterations
.setTol(1E-6) // convergence tolerance
Split the data 50 50 randomly into trainingDF and testDF
Create a linear regression model
// Train the model using our training data
val model = lir.fit(trainingDF)
// Now see if we can predict values in our test data.
// Generate predictions using our linear regression model for all features in our
// test dataframe:
val fullPredictions = model.transform(testDF).cache()
// This basically adds a "prediction" column to our testDF dataframe.
// Extract the predictions and the "known" correct labels.
val predictionAndLabel = fullPredictions.select("prediction", "label").rdd.map(x => (x.getDouble(0), x.getDouble(1)))
// Print out the predicted and actual values for each point
for (prediction <- predictionAndLabel) {
println(prediction)
}
// Stop the session
spark.stop()
Train the model using trainingDF and predict the values as fullPredictions
Extract the predictions and the "known" correct labels
Print it out and see if it is accurate
Stop the spark session once you have finished running spark job
The results are pretty similiar and this tells us our model is actually pretty good
Last updated