RDDs基本操作、RDDs特性、KeyValue對RDDs
摘要:RDD是Spark中極為重要的數據抽象,這裏總結RDD的概念,基本操作Transformation(轉換)與Action,RDDs的特性,KeyValue對RDDs的Transformation(轉換)。
1.RDDs是什麽
Resilient distributed datasets(彈性分布式數據集) 。RDDs並行的分布在整個集群中,是Spark分發數據和計算的基礎抽象類,一個RDD是一個不可改變的分布式集合對象,Spark中,所有的計算都是通過RDDs的創建,轉換操作完成的,一個RDD內部由許多partitions(分片)組成。
分片:每個分片包括一部分數據,partitions可在集群不同節點上計算;分片是Spark並行處理的單元,Spark順序的,並行的處理分片。
2.RDDs的創建
(1) 把一個存在的集合傳給SparkContext的parallelize()方法,測試用
val rdd=sc.parallelize(Array(1,2,3,4),4)
第一個參數:待並行化處理的集合;第二個參數:分片個數
(2) 加載外部數據集
val rddText=sc.textFile("hellospark.txt")
3.RDD基本操作之Transformation(轉換)
從之前的RDD構建一個新的RDD,像map()和filter()
(1)逐元素Transformation
map(): 接收函數,把函數應用到RDD的每一個元素,返回新RDD。
val lines=sc.parallelize(Array("hello","spark","hello","world","!")) val lines2=lines.map(word=>(word,1)) lines2.foreach(println) //結果: (hello,1) (spark,1) (hello,1) (world,1) (!,1)
filter(): 接收函數,返回只包含滿足filter()函數的元素的新RDD。
val lines=sc.parallelize(Array("hello","spark","hello","world","!")) val lines3=lines.filter(word=>word.contains("hello")) lines3.foreach(println) //結果: hello hello
flatMap(): 對每個輸入元素,輸出多個輸出元素。flat壓扁的意思,將RDD中元素壓扁後返回一個新的RDD。
val inputs=sc.textFile("/home/lucy/hellospark.txt") val lines=inputs.flatMap(line=>line.split(" ")) lines.foreach(println) //結果 hello spark hello world hello ! //文件內容/home/lucy/hellospark.txt hello spark hello world hello !
(2)集合運算
RDDs支持數學集合的計算,例如並集,交集計算
val rdd1=sc.parallelize(Array("red","red","blue","black","white")) val rdd2=sc.parallelize(Array("red","grey","yellow")) //去重: val rdd_distinct=rdd1.distinct() //去重結果: white blue red black //並集: val rdd_union=rdd1.union(rdd2) //並集結果: red blue black white red grey yellow //交集: val rdd_inter=rdd1.intersection(rdd2) //交集結果: red //包含: val rdd_sub=rdd1.subtract(rdd2) //包含結果: blue white black
4.RDD基本操作之Action
在RDD上計算出來一個結果。把結果返回給driver program或保存在文件系統,count(),save。
函數名 功能 例子 結果
collect() 返回RDD的所有元素 rdd.collect() {1,2,3,3}
count() 計數 rdd.count() 4
countByValue() 返回一個map表示唯一元素出現的個數 rdd.countByValue() {(1,1),(2,1),(3,2)}
take(num) 返回幾個元素 rdd.take(2) {1,2}
top(num) 返回前幾個元素 rdd.top(2) {3,3}
takeOrdered 返回基於提供的排序算法的前幾個元素 rdd.takeOrdered(2)(myOrdering) {3,3}
(num)(ordering)
takeSample 取樣例 rdd.takeSample(false,1) 不確定
(withReplacement,num,[seed])
reduce(func) 合並RDD中元素 rdd.reduce((x,y)=>x+y) 9
fold(zero)(func) 與reduce()相似提供zero value rdd.fold(0)((x,y)=>x+y) 9
foreach(func) 對RDD的每個元素作用函數,什麽也不返回 rdd.foreach(func) 無
5.RDDs的特性
1.血統關系圖:
Spark維護著RDDs之間的依賴關系和創建關系,叫做血統關系圖,Spark使用血統關系圖來計算每個RDD的需求和恢復丟失的數據
2.延遲計算
Spark對RDDs的計算是,他們第一次使用action操作的時候。Spark內部記錄metadata表明transformations操作已經被響應了。加載數據也是延遲計算,數據只有 在必要的時候,才會被加載進去。
3.RDD.persist():
默認每次在RDDs上面進行action操作時,Spark都重新計算RDDs。如果想重復利用一個RDD,可以使用RDD.persist()。upersist()方法從緩存中移除。
6.KeyValue對RDDs
創建KeyValue對RDDs:
val rdd3=sc.parallelize(Array((1,2),(3,4),(3,6)))
KeyValue對RDDs的Transformation(轉換):
(1)reduceByKey(func) 把相同key的結合
val rdd4=rdd3.reduceByKey((x,y)=>x+y)
//結果
(1,2)
(3,10)
(2)groupByKey 把相同的key的values分組
val rdd5=rdd3.groupByKey()
//結果
(1,CompactBuffer(2))
(3,CompactBuffer(4, 6))
(3)mapValues() 函數作用於pairRDD的每個元素,key不變
val rdd6=rdd3.mapValues(x=>x+1)
//結果
(1,3)
(3,5)
(3,7)
(4)keys/values
rdd3.keys.foreach(println)
1
3
3
rdd3.values.foreach(println)
2
4
6
(5)sortByKey
val rdd7=rdd3.sortByKey()
//結果
(1,2)
(3,4)
(3,6)
(6)combineByKey(): (createCombiner,mergeValue,mergeCombiners,partitioner)
最常用的基於key的聚合函數,返回的類型可以與輸入類型不一樣。許多基於key的聚合函數都用到了它,像groupByKey()
原理:遍歷partition中的元素,元素的key,要麽之前見過的,要麽不是。如果是新元素,使用我們提供的createCombiner()函數,如果是這個partition中已經存在的key,就會使用mergeValue()函數,合計每個partition的結果的時候,使用mergeCombiner()函數
例子:求平均值
val score=sc.parallelize(Array(("Tom",80.0),("Tom",90.0),("Tom",85.0),("Ben",85.0),("Ben",92.0),("Ben",90.0))) val score2=score.combineByKey(score=>(1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2)) //結果 (Ben,(3,267.0)) (Tom,(3,255.0))
val average=score2.map{case(name,(num,score))=>(name,score/num)} //結果 (Ben,89.0) (Tom,85.0)
RDDs基本操作、RDDs特性、KeyValue對RDDs