1. 程式人生 > >大數據筆記(二十九)——RDD簡介、特性及常用算子

大數據筆記(二十九)——RDD簡介、特性及常用算子

contex mce true UC 步驟 rac rep enc 測試

1、什麽是RDD? 最核心
(*)彈性分布式數據集,Resilent distributed DataSet
(*)Spark中數據的基本抽象
(*)結合源碼,查看RDD的概念

RDD屬性
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
一組分區,把數據分成了的不同的分區,每個分區可能運行在不同的worker
技術分享圖片


* - A function for computing each split
一個函數,用於計算每個分區中的數據


RDD的函數(算子)
(1)Transformation
(2)Action

* - A list of dependencies on other RDDs
RDD之間依賴關系:(1)窄依賴 (2)寬依賴
根據依賴的關系,來劃分任務的Stage(階段)

* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

如何創建一個RDD?
有兩種方式
(1)使用sc.parallelize方法

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)

(2)通過使用外部的數據源創建RDD:比如:HDFS

val rdd2 = sc.textFile("hdfs://bigdata11:9000/input/data.txt")
val rdd2 = sc.textFile("/root/temp/input/data.txt")

2、Transformation算子:不會觸發計算、延時加載(lazy值)

RDD API網址:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD

常見的RDD算子:

map(func):該操作是對原來的RDD進行操作後,返回一個新的RDD
filter: 過濾操作、返回一個新的RDD
flatMap:類似map
mapPartitions:對每個分區進行操作
mapPartitionsWithIndex: 對每個分區進行操作,帶分區的下標
union 並集
intersection 交集
distinct 去重
groupByKey: 都是按照Key進行分組
reduceByKey: 都是按照Key進行分組、會有一個本地操作(相當於:Combiner操作)


3、Action算子:會觸發計算

collect: 觸發計算、打印屏幕上。以數組形式返回
count: 求個數
first: 第一個元素(take(1))
take(n)
saveAsTextFile: 會轉成String的形式,會調用toString()方法
foreach: 在RDD的每個元素上進行某個操作


4、RDD的緩存機制:默認在內存中
(*)提高效率
(*)默認:緩存在Memory中
(*)調用:方法:persist或者cache

def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()

(*)緩存的位置:StorageLevel定義的

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

(*)示例:

測試數據:Oracle數據庫的訂單變 sales表(大概92萬)
步驟
(1)從HDFS讀入數據

val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales")

(2)計算

rdd1.count ---> Action,這一次沒有緩存
rdd1.cache ---> 緩存數據,但是不會觸發計算,cache是一個Transformation
rdd1.count ----> 觸發計算,將結果緩存
rdd1.count ----> ???會從哪裏得到數據

通過UI進行監控:

技術分享圖片

IDEA功能鍵:ctrl + n 查找類
ctl+alt+shit+N 在類中找方法

5、RDD的容錯機制:checkpoint檢查點
(1)復習檢查點:HDFS中,合並元信息
Oracle中,會以最高優先級喚醒數據庫寫進程(DBWn),來內存中的臟數據---> 數據文件
(2)RDD的檢查點:容錯機制,輔助Lineage(血統)---> 整個計算的過程
如果lineage越長,出錯的概率就越大。出錯之後,從最近一次的檢查點開始運行。

兩種類型

(1)本地目錄 : 需要將spark-shell運行在本地模式上

技術分享圖片


(2)HDFS目錄: 需要將spark-shell運行在集群模式上

scala> sc.setCheckpointDir("hdfs://bigdata11:9000/spark/checkpoint")

scala> val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://bigdata11:9000/input/sales MapPartitionsRDD[41] at textFile at <console>:24

scala> rdd1.checkpoint

scala> rdd1.count

源碼中對於檢查點的說明:

/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/    

RDD的檢查點多久發出一次,是手動發出的嗎?
1、不是手動
2、每個RDD計算完成後

查看源碼

/**
* Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
* has completed (therefore the RDD has been materialized and potentially stored in memory).
* doCheckpoint() is called recursively on the parent RDDs.
*/
private[spark] def doCheckpoint(): Unit = {


6、RDD的依賴關系、劃分Spark任務的Stage(階段)
(*)窄依賴(Narrow Dependencies):每一個父RDD的分區最多被子RDD的一個分區使用
比方:獨生子女

  舉例:map,filter,union

(*)寬依賴(Wide Dependencies):多個子RDD的分區會依賴同一個父RDD的分區
比方:超生

  舉例:groupByKey

  比如分區1和分區2都有10號部門的員工,那麽在統計10號部門(key)的員工時需要依賴分區1和分區2,它們屬於不同的父RDD的分區。
技術分享圖片

根據寬依賴和窄依賴的標準,我們可以劃分任務的Stage(階段)


7、RDD算子的基礎例子

1、創建一個RDD(數字)
    val rdd1 = sc.parallelize(List(5,6,1,2,10,4,12,20,100,30))
    
    每個元素*2,然後排序
    val rdd2 = rdd1.map(_*2).sortBy(x=>x,true)
完整 val rdd2
= rdd1.map((x:Int)=>x*2) 過濾出大於10的元素 val rdd3 = rdd2.filter(_>10) rdd3.collect 2、創建一個RDD(字符) val rdd1 = sc.parallelize(Array("a b c","d e f","h i j")) val rdd2 = rdd1.flatMap(_.split(‘ ‘)) rdd2.collect 3、集合運算、去重 val rdd1 = sc.parallelize(List(5,6,7,8,1,2)) val rdd2 = sc.parallelize(List(1,2,3,4)) val rdd3 = rdd1.union(rdd2) rdd3.distinct.collect val rdd4 = rdd1.intersection(rdd2)
技術分享圖片
4、分組
    val rdd1 = sc.parallelize(List(("Tom",1000),("Jerry",3000),("Mary",2000)))
    val rdd2 = sc.parallelize(List(("Jerry",500),("Tom",3000),("Mike",2000)))
    
    並集
    val rdd3 = rdd1 union rdd2
    scala> val rdd4 = rdd3.groupByKey
    rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[27] at groupByKey at <console>:30

    scala> rdd4.collect
    res8: Array[(String, Iterable[Int])] = Array((Tom,CompactBuffer(1000, 3000)),
                                                 (Jerry,CompactBuffer(3000, 500)), 
                                                 (Mike,CompactBuffer(2000)), 
                                                 (Mary,CompactBuffer(2000)))

技術分享圖片

六、Spark RDD的高級算子
1、mapPartitionsWithIndex: 對RDD中的每個分區進行操作,帶有分區號
定義:def mapPartitionsWithIndex[U](f: (Int, Iterator[T])=>Iterator[U], preservesPartitioning: Boolean = false)
(implicit arg0: ClassTag[U]): RDD[U]
參數說明:
f: (Int, Iterator[T])=>Iterator[U]
(*)Int: 分區號
(*)Iterator[T]: 該分區中的每個元素
(*)返回值:Iterator[U]

Demo:
(1)創建一個RDD:

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)

(2)創建一個函數,作為f的值

def func1(index:Int,iter:Iterator[Int]):Iterator[String] ={
iter.toList.map(x=>"[PartID:" + index +",value="+x+"]").iterator
}

(3)調用

rdd1.mapPartitionsWithIndex(func1).collect

(4)結果:

Array([PartID:0,value=1], [PartID:0,value=2], [PartID:0,value=3], [PartID:0,value=4], 
[PartID:1,value=5], [PartID:1,value=6], [PartID:1,value=7], [PartID:1,value=8], [PartID:1,value=9])    


2、aggregate:聚合操作
定義:def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
作用:先對局部進行操作,再對全局進行操作
技術分享圖片


舉例:

val rdd1 = sc.parallelize(List(1,2,3,4,5),2)

(1)求每個分區最大值的和
先查看每個分區中的元素:

rdd1.mapPartitionsWithIndex(func1).collect

rdd1.aggregate(0)(math.max(_,_),_+_)

(2)改一下:

rdd1.aggregate(0)(_+_,_+_) ====> 15 兩個分區求和並相加
rdd1.aggregate(10)(math.max(_,_),_+_) ===> 30 初始值是10,每個分區裏有10,初始值10+分區一10+分區二10 = 30

(3)一個字符串的例子
技術分享圖片

技術分享圖片

3、aggregateByKey

(1)類似aggregate,也是先對局部,再對全局
(2)區別:aggregateByKey操作<key,value>
(3)測試數據:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

每個分區中的元素(key,value)

def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}    


[partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)],
[partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)]

(4)把每個籠子中,每種動物最多的個數進行求和

pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect

技術分享圖片


4、coalesce和repartition

(*)都是將RDD中的分區進行重分區
(*)區別:coalesce 默認:不會進行shuffle(false)
        repartition 會進行shuffle
                    
(*)舉例:
    創建一個RDD
    val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
             
    進行重分區
     val rdd5 = rdd4.repartition(3)
     val rdd6 = rdd4.coalesce(3,false)  ---> 分區的長度: 2
     val rdd6 = rdd4.coalesce(3,true)  ---> 分區的長度: 2

5、其他高級算子:參考文檔
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

大數據筆記(二十九)——RDD簡介、特性及常用算子