47. Spark Streaming Overview
SPARK STREAMING
Spark Streaming
Analyzes continual streams of data
Common example: processing log data from a website or server
Data is aggregated and analyzed at some given interval
Can take data from a network port, Amazon Kinesis, HDFS, Kafka, Flume, and others
"Checkpointing" stores state to disk periodically for fault tolerance
A "Dstream" object breaks up the stream into distinct RDD's
Simple example:
val stream = new StreamingContext(conf, Seconds(1))
val lines = stream.socketTextStream("localhost", 8888)
val errors = lines.filter(_.contains("error"))
errors.print()
This listens to log data sent into port 8888, one second at a time, and prints out the error lines.
You may need to kick off the job explicitly
stream.start()
stream.awaitTermination()
Remember your RDDs only contain one little chunk of incoming data.
"Windowed operations" can combine results from multiple batches over some sliding time window
See window(), reduceByWindow(), reduceByKeyAndWindow()
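For instance, a minimal sketch (the window and slide durations are just illustrative, reusing the "errors" DStream from the simple example above) that counts error lines over the last 30 seconds, updated every 5 seconds:
// Count error lines over a 30-second window, sliding every 5 seconds
val errorCounts = errors.map(_ => 1).reduceByWindow(_ + _, Seconds(30), Seconds(5))
errorCounts.print()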
updateStateByKey()
Lets you maintain state across many batches as time goes by
For example, running counts of some event
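As a minimal sketch, assuming a DStream of (key, 1) pairs named pairs and an illustrative checkpoint path, a running count per key might look like:
// updateStateByKey requires checkpointing to be enabled first
ssc.checkpoint("/tmp/checkpoint")  // path is just an example

// newValues holds this batch's values; runningCount is the state so far
def updateCount(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
  Some(runningCount.getOrElse(0) + newValues.sum)

val runningCounts = pairs.updateStateByKey[Int](updateCount _)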
Let's Stream
We'll run a Spark Streaming script that monitors live Tweets from Twitter, and keeps track of the most popular hashtags as Tweets are received!
Need A Twitter Developer Account.
Specifically we need to get an API key and access token
This allows us to stream Twitter data in real time!
Do this at the Twitter developer website
Create A twitter.txt File In Your Workspace
On each line, specify a name and your own consumer key & access token information
For example (substitute in your own keys & tokens!)
consumerKey AXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXSQ
consumerSecret 9EwXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
accessToken 384953089-3DUtu13XXXXXXXXXXXXXXXXXXXX
accessTokenSecret 7XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
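The script can then read this file and hand the credentials to twitter4j via system properties. A minimal sketch (the helper name setupTwitter is just a convention, not part of any API):
import scala.io.Source

// Read twitter.txt and set the twitter4j OAuth system properties
// that TwitterUtils expects
def setupTwitter(): Unit = {
  for (line <- Source.fromFile("twitter.txt").getLines) {
    val fields = line.split(" ")
    if (fields.length == 2) {
      System.setProperty("twitter4j.oauth." + fields(0), fields(1))
    }
  }
}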
Step One
Get a Twitter stream and extract just the messages themselves
val tweets = TwitterUtils.createStream(ssc, None)
val statuses = tweets.map(status => status.getText())
Vote for #BoatyMcBoatFace!
Vote for "I Like Big Boats and I Cannot Lie!"
No! Vote for #WhatIceberg ?
Are you crazy? #BoatyMcBoatFace all the way.
...
Step Two
Create a new DStream that has every individual word as its own entry.
We use flatMap() for this
val tweetwords = statuses.flatMap(tweetText => tweetText.split(" "))
Vote
for
#BoatyMcBoatFace
Vote
For
I
...
Step Three
Eliminate anything that's not a hashtag
We use the filter() function for this
val hashtags = tweetwords.filter(word => word.startsWith("#"))
#BoatyMcBoatFace
#WhatIceberg
#BoatyMcBoatFace
...
Step Four
Convert our DStream of hashtags to key/value pairs
This is so we can count them up with a reduce function
val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))
(#BoatyMcBoatFace, 1)
(#WhatIceberg, 1)
(#BoatyMcBoatFace, 1)
...
Step Five
Count up the results over a sliding window
A reduce operation adds up all of the values for a given key
Here, a key is a unique hashtag, and each value is 1
So by adding up all the "1"s associated with each instance of a hashtag, we get the count of that hashtag
reduceByKeyAndWindow performs this reduce operation over a given window and slide interval
val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow((x,y) => x + y, (x,y) => x - y, Seconds(300), Seconds(1))
(#BoatyMcBoatFace, 2)
(#WhatIceberg, 1)
Besides the function that combines counts, we also pass in an inverse function that subtracts counts as old data leaves the window, along with a window length of 5 minutes and a slide interval of 1 second
So every second, it updates the results over the past five minutes
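Note that the inverse-function form of reduceByKeyAndWindow requires a checkpoint directory to be set beforehand, for example:
// Required for the inverse-reduce form of reduceByKeyAndWindow
ssc.checkpoint("/tmp/checkpoint")  // directory is just an example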
Step Six
Sort and output the results.
The counts are in the second element of each tuple, so we sort by the ._2 element
val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, false))
sortedResults.print()
(#BoatyMcBoatFace, 2)
(#WhatIceberg, 1)
Let's See It In Action
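Putting it all together, a sketch of the complete script (the object name, master setting, and checkpoint path are assumptions, and setupTwitter is the helper sketched earlier):
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import scala.io.Source

object PopularHashtags {

  // Load credentials from twitter.txt into twitter4j system properties
  def setupTwitter(): Unit = {
    for (line <- Source.fromFile("twitter.txt").getLines) {
      val fields = line.split(" ")
      if (fields.length == 2) {
        System.setProperty("twitter4j.oauth." + fields(0), fields(1))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    setupTwitter()

    // One-second batches, running locally on all cores
    val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(1))
    ssc.checkpoint("/tmp/checkpoint")  // required by the inverse reduce below

    // Steps One through Six from above
    val tweets = TwitterUtils.createStream(ssc, None)
    val statuses = tweets.map(status => status.getText())
    val tweetwords = statuses.flatMap(tweetText => tweetText.split(" "))
    val hashtags = tweetwords.filter(word => word.startsWith("#"))
    val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))
    val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow(
      (x, y) => x + y, (x, y) => x - y, Seconds(300), Seconds(1))
    val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, false))
    sortedResults.print()

    ssc.start()
    ssc.awaitTermination()
  }
}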