1. 程式人生 > 其它 >Spark(十一)Spark分割槽

Spark(十一)Spark分割槽

一、分割槽的概念

  分割槽是RDD內部平行計算的一個計算單元,RDD的資料集在邏輯上被劃分為多個分片,每一個分片稱為分割槽,分割槽的格式決定了平行計算的粒度,而每個分割槽的數值計算都是在一個任務中進行的,因此任務的個數,也是由RDD(準確來說是作業最後一個RDD)的分割槽數決定。

二、為什麼要進行分割槽

  資料分割槽,在分散式叢集裡,網路通訊的代價很大,減少網路傳輸可以極大提升效能。mapreduce框架的效能開支主要在io和網路傳輸,io因為要大量讀寫檔案,它是不可避免的,但是網路傳輸是可以避免的,把大檔案壓縮變小檔案,從而減少網路傳輸,但是增加了cpu的計算負載。

  Spark裡面io也是不可避免的,但是網路傳輸spark裡面進行了優化:

  Spark把rdd進行分割槽(分片),放在叢集上平行計算。同一個rdd分片100個,10個節點,平均一個節點10個分割槽,當進行sum型的計算的時候,先進行每個分割槽的sum,然後把sum值shuffle傳輸到主程式進行全域性sum,所以進行sum型計算對網路傳輸非常小。但對於進行join型的計算的時候,需要把資料本身進行shuffle,網路開銷很大。

spark是如何優化這個問題的呢?

  Spark把key-value rdd通過key的hashcode進行分割槽,而且保證相同的key儲存在同一個節點上,這樣對改rdd進行key聚合時,就不需要shuffle過程,我們進行mapreduce計算的時候為什麼要進行shuffle?,就是說mapreduce裡面網路傳輸主要在shuffle階段,shuffle的根本原因是相同的key存在不同的節點上,按key進行聚合的時候不得不進行shuffle

。shuffle是非常影響網路的,它要把所有的資料混在一起走網路,然後它才能把相同的key走到一起。進行shuffle是儲存決定的。

  Spark從這個教訓中得到啟發,spark會把key進行分割槽,也就是key的hashcode進行分割槽,相同的key,hashcode肯定是一樣的,所以它進行分割槽的時候100t的資料分成10分,每部分10個t,它能確保相同的key肯定在一個分割槽裡面,而且它能保證儲存的時候相同的key能夠存在同一個節點上。比如一個rdd分成了100份,叢集有10個節點,所以每個節點存10份,每一分稱為每個分割槽,spark能保證相同的key存在同一個節點上,實際上相同的key存在同一個分割槽。

  key的分佈不均決定了有的分割槽大有的分割槽小。沒法分割槽保證完全相等,但它會保證在一個接近的範圍。所以mapreduce裡面做的某些工作裡邊,spark就不需要shuffle了,spark解決網路傳輸這塊的根本原理就是這個。

  進行join的時候是兩個表,不可能把兩個表都分割槽好,通常情況下是把用的頻繁的大表事先進行分割槽,小表進行關聯它的時候小表進行shuffle過程。

  大表不需要shuffle。

  需要在工作節點間進行資料混洗的轉換極大地受益於分割槽。這樣的轉換是cogroup,groupWith,join,leftOuterJoin,rightOuterJoin,groupByKey,reduceByKey,combineByKey和lookup。

  分割槽是可配置的,只要RDD是基於鍵值對的即可

三、Spark分割槽原則及方法

RDD分割槽的一個分割槽原則:儘可能是得分割槽的個數等於叢集核心數目

無論是本地模式、Standalone模式、YARN模式或Mesos模式,我們都可以通過spark.default.parallelism來配置其預設分割槽個數,若沒有設定該值,則根據不同的叢集環境確定該值

3.1 本地模式

(1)預設方式

以下這種預設方式就一個分割槽

結果

(2)手動設定

設定了幾個分割槽就是幾個分割槽

結果

(3)跟local[n] 有關

n等於幾預設就是幾個分割槽

如果n=* 那麼分割槽個數就等於cpu core的個數

結果

本機電腦檢視cpu core,我的電腦--》右鍵管理--》裝置管理器--》處理器

(4)引數控制

結果

3.2 YARN模式

進入defaultParallelism方法

繼續進入defaultParallelism方法

這個一個trait,其實現類是(Ctrl+h)

進入TaskSchedulerImpl類找到defaultParallelism方法

繼續進入defaultParallelism方法,又是一個trait,看其實現類

Ctrl+h看SchedulerBackend類的實現類

進入CoarseGrainedSchedulerBackend找到defaultParallelism

totalCoreCount.get()是所有executor使用的core總數,和2比較去較大值

如果正常的情況下,那你設定了多少就是多少

四、分割槽器

(1)如果是從HDFS裡面讀取出來的資料,不需要分割槽器。因為HDFS本來就分好區了。

   分割槽數我們是可以控制的,但是沒必要有分割槽器。

(2)非key-value RDD分割槽,沒必要設定分割槽器

al testRDD = sc.textFile("C:\\Users\\Administrator\\IdeaProjects\\myspark\\src\\main\\hello.txt")
  .flatMap(line => line.split(","))
  .map(word => (word, 1)).partitionBy(new HashPartitioner(2))
  沒必要設定,但是非要設定也行。

(3)Key-value形式的時候,我們就有必要了。

HashPartitioner

val resultRDD = testRDD.reduceByKey(new HashPartitioner(2),(x:Int,y:Int) => x+ y)
//如果不設定預設也是HashPartitoiner,分割槽數跟spark.default.parallelism一樣
println(resultRDD.partitioner)
println("resultRDD"+resultRDD.getNumPartitions)

RangePartitioner

val resultRDD = testRDD.reduceByKey((x:Int,y:Int) => x+ y)
val newresultRDD=resultRDD.partitionBy(new RangePartitioner[String,Int](3,resultRDD))
println(newresultRDD.partitioner)
println("newresultRDD"+newresultRDD.getNumPartitions)
注:按照範圍進行分割槽的,如果是字串,那麼就按字典順序的範圍劃分。如果是數字,就按資料自的範圍劃分

自定義分割槽

需要實現2個方法

class MyPartitoiner(val numParts:Int) extends  Partitioner{
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    val domain = new URL(key.toString).getHost
    val code = (domain.hashCode % numParts)
    if (code < 0) {
      code + numParts
    } else {
      code
    }
  }
}

object DomainNamePartitioner {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("word count").setMaster("local")

    val sc = new SparkContext(conf)

    val urlRDD = sc.makeRDD(Seq(("http://baidu.com/test", 2),
      ("http://baidu.com/index", 2), ("http://ali.com", 3), ("http://baidu.com/tmmmm", 4),
      ("http://baidu.com/test", 4)))
    //Array[Array[(String, Int)]]
    // = Array(Array(),
    // Array((http://baidu.com/index,2), (http://baidu.com/tmmmm,4),
    // (http://baidu.com/test,4), (http://baidu.com/test,2), (http://ali.com,3)))
    val hashPartitionedRDD = urlRDD.partitionBy(new HashPartitioner(2))
    hashPartitionedRDD.glom().collect()

    //使用spark-shell --jar的方式將這個partitioner所在的jar包引進去,然後測試下面的程式碼
    // spark-shell --master spark://master:7077 --jars spark-rdd-1.0-SNAPSHOT.jar
    val partitionedRDD = urlRDD.partitionBy(new MyPartitoiner(2))
    val array = partitionedRDD.glom().collect()

  }
}

轉自:https://www.cnblogs.com/frankdeng/p/9301688.html