1. 程式人生 > >Spark函式詳解系列之RDD基本轉換+例項

Spark函式詳解系列之RDD基本轉換+例項

 RDD:彈性分散式資料集,是一種特殊集合 ‚ 支援多種來源 ‚ 有容錯機制 ‚ 可以被快取 ‚ 支援並行操作,一個RDD代表一個分割槽裡的資料集

  RDD有兩種操作運算元:

         Transformation(轉換):Transformation屬於延遲計算,當一個RDD轉換成另一個RDD時並沒有立即進行轉換,僅僅是記住了資料集的邏輯操作

         Ation(執行):觸發Spark作業的執行,真正觸發轉換運算元的計算

 

本系列主要講解Spark中常用的函式操作:

         1.RDD基本轉換

         2.鍵-值RDD轉換

         3.Action操作篇

本節所講函式

1.map(func)

2.flatMap(func)

3.mapPartitions(func)

4.mapPartitionsWithIndex(func)

5.simple(withReplacement,fraction,seed)

6.union(ortherDataset)

7.intersection(otherDataset)

8.distinct([numTasks])

9.cartesian(otherDataset)

10.coalesce(numPartitions,shuffle)

11.repartition(numPartition)

12.glom()

13.randomSplit(weight:Array[Double],seed)

 

基礎轉換操作:

 

1.map(func):資料集中的每個元素經過使用者自定義的函式轉換形成一個新的RDD,新的RDD叫MappedRDD

(例1)

1

2

3

4

5

6

7

8

9

10

object Map {

  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local").setAppName("map")

    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 10)  //建立RDD

    val map = rdd.map(_*2)             //對RDD中的每個元素都乘於2

    map.foreach(x => print(x+" "))

    sc.stop()

  }

}

輸出:

2 4 6 8 10 12 14 16 18 20

(RDD依賴圖:紅色塊表示一個RDD區,黑色塊表示該分割槽集合,下同)

 

2.flatMap(func):與map類似,但每個元素輸入項都可以被對映到0個或多個的輸出項,最終將結果”扁平化“後輸出

(例2)

1

2

3

4

//...省略sc

   val rdd = sc.parallelize(1 to 5)

   val fm = rdd.flatMap(x => (1 to x)).collect()

   fm.foreach( x => print(x + " "))

輸出:

1 1 2 1 2 3 1 2 3 4 1 2 3 4 5

如果是map函式其輸出如下:

Range(1) Range(1, 2) Range(1, 2, 3) Range(1, 2, 3, 4) Range(1, 2, 3, 4, 5)

 (RDD依賴圖)

 

3.mapPartitions(func):類似與map,map作用於每個分割槽的每個元素,但mapPartitions作用於每個分割槽工

func的型別:Iterator[T] => Iterator[U]

假設有N個元素,有M個分割槽,那麼map的函式的將被呼叫N次,而mapPartitions被呼叫M次,當在對映的過程中不斷的建立物件時就可以使用mapPartitions比map的效率要高很多,比如當向資料庫寫入資料時,如果使用map就需要為每個元素建立connection物件,但使用mapPartitions的話就需要為每個分割槽建立connetcion物件

(例3):輸出有女性的名字:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

object MapPartitions {

//定義函式

  def partitionsFun(/*index : Int,*/iter : Iterator[(String,String)]) : Iterator[String] = {

    var woman = List[String]()

    while (iter.hasNext){

      val next = iter.next()

      next match {

        case (_,"female") => woman = /*"["+index+"]"+*/next._1 :: woman

        case _ =>

      }

    }

    return  woman.iterator

  }

 

  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local").setAppName("mappartitions")

    val sc = new SparkContext(conf)

    val l = List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female"))

    val rdd = sc.parallelize(l,2)

    val mp = rdd.mapPartitions(partitionsFun)

    /*val mp = rdd.mapPartitionsWithIndex(partitionsFun)*/

    mp.collect.foreach(x => (print(x +" ")))   //將分割槽中的元素轉換成Aarray再輸出

  }

}

輸出:

kpop lucy

其實這個效果可以用一條語句完成

1

val mp = rdd.mapPartitions(x => x.filter(_._2 == "female")).map(x => x._1) 

之所以不那麼做是為了演示函式的定義

  (RDD依賴圖)

 

4.mapPartitionsWithIndex(func):與mapPartitions類似,不同的時函式多了個分割槽索引的引數

func型別:(Int, Iterator[T]) => Iterator[U]

(例4):將例3橙色的註釋部分去掉即是

輸出:(帶了分割槽索引)

[0]kpop [1]lucy

 

5.sample(withReplacement,fraction,seed):以指定的隨機種子隨機抽樣出數量為fraction的資料,withReplacement表示是抽出的資料是否放回,true為有放回的抽樣,false為無放回的抽樣

(例5):從RDD中隨機且有放回的抽出50%的資料,隨機種子值為3(即可能以1 2 3的其中一個起始值)

1

2

3

4

5

//省略

    val rdd = sc.parallelize(1 to 10)

    val sample1 = rdd.sample(true,0.5,3)

    sample1.collect.foreach(x => print(x + " "))

    sc.stop

 

6.union(ortherDataset):將兩個RDD中的資料集進行合併,最終返回兩個RDD的並集,若RDD中存在相同的元素也不會去重

1

2

3

4

5

6

//省略sc

   val rdd1 = sc.parallelize(1 to 3)

   val rdd2 = sc.parallelize(3 to 5)

   val unionRDD = rdd1.union(rdd2)

   unionRDD.collect.foreach(x => print(x + " "))

   sc.stop 

輸出:

1 2 3 3 4 5

  

7.intersection(otherDataset):返回兩個RDD的交集

1

2

3

4

5

6

//省略sc

val rdd1 = sc.parallelize(1 to 3)

val rdd2 = sc.parallelize(3 to 5)

val unionRDD = rdd1.intersection(rdd2)

unionRDD.collect.foreach(x => print(x + " "))

sc.stop 

輸出:

3 4

 

8.distinct([numTasks]):對RDD中的元素進行去重

1

2

3

4

5

//省略sc

val list = List(1,1,2,5,2,9,6,1)

val distinctRDD = sc.parallelize(list)

val unionRDD = distinctRDD.distinct()

unionRDD.collect.foreach(x => print(x + " "))  

輸出:

1 6 9 5 2

 

9.cartesian(otherDataset):對兩個RDD中的所有元素進行笛卡爾積操作

1

2

3

4

5

//省略

val rdd1 = sc.parallelize(1 to 3)

val rdd2 = sc.parallelize(2 to 5)

val cartesianRDD = rdd1.cartesian(rdd2)

cartesianRDD.foreach(x => println(x + " ")) 

輸出:

複製程式碼

(1,2)
(1,3)
(1,4)
(1,5)
(2,2)
(2,3)
(2,4)
(2,5)
(3,2)
(3,3)
(3,4)
(3,5)

複製程式碼

 (RDD依賴圖)

 

 

10.coalesce(numPartitions,shuffle):對RDD的分割槽進行重新分割槽,shuffle預設值為false,當shuffle=false時,不能增加分割槽數

目,但不會報錯,只是分割槽個數還是原來的

(例9:)shuffle=false

1

2

3

4

//省略 

val rdd = sc.parallelize(1 to 16,4)

val coalesceRDD = rdd.coalesce(3//當suffle的值為false時,不能增加分割槽數(即分割槽數不能從5->7)

println("重新分割槽後的分割槽個數:"+coalesceRDD.partitions.size) 

輸出:

重新分割槽後的分割槽個數:3
//分割槽後的資料集
List(1, 2, 3, 4)
List(5, 6, 7, 8)
List(9, 10, 11, 12, 13, 14, 15, 16) 

 

(例9.1:)shuffle=true

1

2

3

4

5

//...省略

val rdd = sc.parallelize(1 to 16,4)

val coalesceRDD = rdd.coalesce(7,true)

println("重新分割槽後的分割槽個數:"+coalesceRDD.partitions.size)

println("RDD依賴關係:"+coalesceRDD.toDebugString)  

輸出:

複製程式碼

重新分割槽後的分割槽個數:5
RDD依賴關係:(5) MapPartitionsRDD[4] at coalesce at Coalesce.scala:14 []
| CoalescedRDD[3] at coalesce at Coalesce.scala:14 []
| ShuffledRDD[2] at coalesce at Coalesce.scala:14 []
+-(4) MapPartitionsRDD[1] at coalesce at Coalesce.scala:14 []
| ParallelCollectionRDD[0] at parallelize at Coalesce.scala:13 []
//分割槽後的資料集
List(10, 13)
List(1, 5, 11, 14)
List(2, 6, 12, 15)
List(3, 7, 16)
List(4, 8, 9) 

複製程式碼

 (RDD依賴圖:coalesce(3,flase))

 

 

 (RDD依賴圖:coalesce(3,true))

 

 

11.repartition(numPartition):是函式coalesce(numPartition,true)的實現,效果和例9.1的coalesce(numPartition,true)的一樣

 

 

12.glom():將RDD的每個分割槽中的型別為T的元素轉換換陣列Array[T]

 

1

2

3

4

5

//省略

val rdd = sc.parallelize(1 to 16,4)

val glomRDD = rdd.glom() //RDD[Array[T]]

glomRDD.foreach(rdd => println(rdd.getClass.getSimpleName))

sc.stop 

輸出:

int[] //說明RDD中的元素被轉換成陣列Array[Int]

 

 

13.randomSplit(weight:Array[Double],seed):根據weight權重值將一個RDD劃分成多個RDD,權重越高劃分得到的元素較多的機率就越大

1

2

3

4

5

6

7

//省略sc

val rdd = sc.parallelize(1 to 10)

val randomSplitRDD = rdd.randomSplit(Array(1.0,2.0,7.0))

randomSplitRDD(0).foreach(x => print(x +" "))

randomSplitRDD(1).foreach(x => print(x +" "))

randomSplitRDD(2).foreach(x => print(x +" "))

sc.stop 

輸出:

2 4
3 8 9
1 5 6 7 10

 

 以上例子原始碼地址:https://github.com/Mobin-F/SparkExample/tree/master/src/main/scala/com/mobin/SparkRDDFun/TransFormation/KVRDD