保存Key/Value对的RDD叫做Pair RDD。
1.创建Pair RDD:
1.1 创建Pair RDD的方式:
很多数据格式在导入RDD时,会直接生成Pair RDD。我们也可以使用map()来将之前讲到的普通RDD转化为Pair RDD。
1.2 Pair RDD转化实例:
下面例子中,把原始RDD,修改成首单词做Key,整行做Value的Pair RDD。
Java中没有tuple类型,所以使用scala的scala.Tuple2类来创建tuple。创建tuple: new Tuple2(elem1,elem2) ; 访问tuple的元素: 使用._1()和._2()方法来访问。
/** * 将普通的基本RDD转化成一个Pair RDD,业务逻辑: 将每一行的首单词作为Key,整个句子作为Value 返回Key/Value PairRDD。 * @param JavaRDD* @return JavaPairRDD */ public JavaPairRDD firstWordKeyRdd(JavaRDD input){ JavaPairRDD pair_rdd = input.mapToPair( new PairFunction (){ @Override public Tuple2 call(String arg0) throws Exception { // TODO Auto-generated method stub return new Tuple2 (arg0.split(" ")[0],arg0); } } ); return pair_rdd; }
2.Pair RDD的转化操作:
2.1 Pair RDD常见的转化操作列表:
基础RDD使用的转化操作也可以在Pair RDD中使用。因为Pair RDD中使用tuple,所以需要传递操作tuple的函数给Pair RDD.
下表列出Pair RDD常用的转化操作(事例RDD内容:{(1, 2), (3, 4), (3, 6)})
函数名 | 作用 | 调用例子 | 返回结果 |
reduceByKey(func) | Combine values with the same key. | rdd.reduceByKey((x, y) => x + y) | {(1,2),(3,10)} |
groupByKey() | Group values with the same key. | rdd.groupByKey() | {(1,[2]),(3,[4,6])} |
combineByKey(createCombiner,mergeValue, mergeCombiners,partitioner) | Combine values with the same key using a different result type. | ||
mapValues(func) | Apply a function to each value of a pair RDD without changing the key. | rdd.mapValues(x =>x+1) | {(1,3),(3,5),(3,7)} |
flatMapValues(func) | Apply a function that returns an iterator to each value of a pair RDD, and for each element returned, produce a key/value entry with the old key. Often used for tokenization. | rdd.flatMapValues(x=> (x to 5) | {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)} |
keys() | Return an RDD of just the keys. | rdd.keys() | {1, 3, 3} |
values() | Return an RDD of just the values. | rdd.values() | {2, 4, 6} |
sortByKey() | Return an RDD sorted by the key. | rdd.sortByKey() | {(1,2),(3,4),(3,6)} |
下表列举2个RDD之间的转化操作(rdd = {(1, 2), (3, 4), (3, 6)} other = {(3,9)}):
函数名 | 作用 | 调用例子 | 返回结果 |
subtractByKey | Remove elements with a key present in the other RDD. | rdd.subtractByKey(other) | {(1, 2)} |
join | Perform an inner join between two RDDs. | rdd.join(other) | {(3, (4, 9)),(3, (6, 9))} |
rightOuterJoin | Perform a join between two RDDs where the key must be present in the first RDD. | rdd.rightOuterJoin(other) | {(3,(Some(4),9)), (3,(Some(6),9))} |
leftOuterJoin | Perform a join between two RDDs where the key must be present in the other RDD. | rdd.leftOuterJoin(other) | {(1,(2,None)),(3,(4,Some(9))),(3,(6,Some(9)))} |
cogroup | Group data from both RDDs sharing the same key. | rdd.cogroup(other) | {(1,([2],[])),(3,([4, 6],[9]))} |
2.2 Pair RDD筛选操作:
Pair RDD也还是RDD,所以之前介绍的操作(例如filter)也同样适用于PairRDD。下面程序,筛选长度大于20的行:
/** * PairRDD筛选长度大于20的行。 * @param JavaPairRDD* @return JavaPairRDD */ public JavaPairRDD filterMoreThanTwentyLines (JavaPairRDD input){ JavaPairRDD filter_rdd = input.filter( new Function ,Boolean>(){ @Override public Boolean call(Tuple2 arg0) throws Exception { // TODO Auto-generated method stub return (arg0._2.length()>20); } } ); return filter_rdd; }
2.3 聚合操作:
Pair RDD提供下面方法:
1. reduceByKey()方法:可以分别归约每个键对应的数据;
2. join()方法:可以把两个RDD中键相同的元素组合在一起,合并为一个RDD。
一、创建Pair RDD:
当需要将一个普通RDD转化成一个Pair RDD时,可以使用map()函数来实现。
程序4-1: 使用第一个单词作为键建出一个Pair RDD:
val text1 = sc.textFile("file:///spark/spark/README.md")
val pair1 = text1.map(x=>( x.split(" ")(0),x))
println(pair1.collect().mkString(" "))
程序4-2: 对Pair RDD的第2个元素筛选:
val text2 = sc.textFile("file:///spark/spark/README.md")
val pair_base2 = text2.map(x=>( x.split(" ")(0),x))
val pair2 = pair_base2 .filter{case(key,value)=>value.length<20}
println(pair2.collect().mkString(" "))
对于二元组数据,有时我们只想访问Pair RDD的值的部分,这时操作二元组很麻烦。可以使用mapValues(func)函数,单操作value,不操作key,功能类似于map{case(x,y):(x,func(y))}
程序4-3: 计算每个键对应的平均值:
val text3 = sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))
val text3_final = text3.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
println(text3_final.collect().mkString(" "))
其中,mapValues(x=>(x,1))得到的输出结果是:"panda",(0,1) ;"pink",(3,1) ;"pirate",(3,1) ;"panda",(1,1) ;"pink",(4,1) ;
下一步,reduceByKey((x,y)=>(x._1+y._1,x._2+y._2));自动合并同一个key的数据。例如对于panda,(0,1) ,(1,1) => (0+1 , 1+1) {解释:(0,1)的第一个数据加上(1,1)的第一个数据作为第一个数据,(0,1)的第二个数据加上(1,1)的第二个数据作为第二个数据}也就是 (1,2)。
程序4-3的输出结果类似于: (pink,(7,2)) (pirate,(3,1)) (panda,(1,2)) ;将每个key的总和得到、并将每个key出现的次数得到。
程序4-4: 实现单词计数:
val text4 = sc.textFile("file:///spark/spark/README.md")
val words = text4.flatMap(x=>x.split(" "))
程序4-5: 使用combineByKey()来实现计算每个key的平均值:
val input = sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))
val result = input.combineByKey( (v) => (v,1),
(acc:(Int,Int),v) => (acc._1+v,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._1+acc2._1,acc1._2+acc2._2)
