13. Key /Value RDD's, and the Average Friends by Age example
KEY /VALUE RDD'S
And the "friends by age" example
RDD'S Can Hold Key /Value Pairs
For example: number of friends by age
Key is age, value is number of friends
Instead of just a list of ages or a list of # of friends, we can store (age, # friends) etc...
Creating A Key /Value RDD
Nothing special in Scala, really
Just map pairs of data into the RDD using tuples. For example.
totalsByAge = rdd.map(x => (x, 1))
Voila, you now have a key /value RDD.
OK to have tuples or other objects as values as well.
Spark Can Do Special Stuff With Key /Value Data
reduceByKey(): combine values with the same key using some function. rdd.reduceByKey((x, y) => x + y) adds them up.
groupByKey(): Group values with the same key
sortByKey(): Sort RDD by key values
keys(), values() - Create an RDD of just the keys, or just the values
You Can Do SQL-Style Joins On Two Key /Value-RDD's
join, rightOuterJoin, leftOuterJoin, cogroup, subtractByKey
We 'll look at an example of this later.
Mapping Just The Values Of A Key /Value RDD?
With Key /Value data, use mapValues() and flatMapValues() if your transformation doesn't affect the keys.
It's more efficient
Friends By Age Example
Input Data: ID, name, age, number of friends
0, Will, 33, 385
1, Jean-Luc, 33, 2
2, Hugh, 55, 22
3, Deanna, 40, 465
4, Quark, 68, 21
Parsing (Mapping) The Input Data
def parseLine(line: String) = {
val fields = line.split(",")
val age = fields(2).toInt
val numFriends = fields(3).toInt
(age, numFriends)
}
val lines = sc.textFile("../fakefriends.csv")
val rdd = lines.map(parseLine)
Output is key/value pairs of (age, numFriends):
33, 385
33, 2
55, 221
40, 465
...
Count Up Sum Of Friends And Number Of Entries Per Age
val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2))
rdd.mapValues(x => (x, 1))
(33, 385) => (33, (385, 1))
(33, 2) => (33, (2, 1))
(55, 221) => (55, (221, 1))
reduceByKey((x, y) => (x, 1 + y, 1, 2 + x, 2))
Adds up all values for all unique key!
(33, (387, 2))
Compute Averages
val averagesByAge = totalsByAge.mapValues(x => x._1/ x._2)
(33, (387, 2)) => (33, 193.5)
Collect And Display The Results
val results = averagesByAge.collect()
results.sorted.foreach(println)
Last updated