Spark(十一)Spark分割槽
一、分割槽的概念
分割槽是RDD內部平行計算的一個計算單元,RDD的資料集在邏輯上被劃分為多個分片,每一個分片稱為分割槽,分割槽的格式決定了平行計算的粒度,而每個分割槽的數值計算都是在一個任務中進行的,因此任務的個數,也是由RDD(準確來說是作業最後一個RDD)的分割槽數決定。
二、為什麼要進行分割槽
資料分割槽,在分散式叢集裡,網路通訊的代價很大,減少網路傳輸可以極大提升效能。mapreduce框架的效能開支主要在io和網路傳輸,io因為要大量讀寫檔案,它是不可避免的,但是網路傳輸是可以避免的,把大檔案壓縮變小檔案,從而減少網路傳輸,但是增加了cpu的計算負載。
Spark裡面io也是不可避免的,但是網路傳輸spark裡面進行了優化:
Spark把rdd進行分割槽(分片),放在叢集上平行計算。同一個rdd分片100個,10個節點,平均一個節點10個分割槽,當進行sum型的計算的時候,先進行每個分割槽的sum,然後把sum值shuffle傳輸到主程式進行全域性sum,所以進行sum型計算對網路傳輸非常小。但對於進行join型的計算的時候,需要把資料本身進行shuffle,網路開銷很大。
spark是如何優化這個問題的呢?
Spark把key-value rdd通過key的hashcode進行分割槽,而且保證相同的key儲存在同一個節點上,這樣對改rdd進行key聚合時,就不需要shuffle過程,我們進行mapreduce計算的時候為什麼要進行shuffle?,就是說mapreduce裡面網路傳輸主要在shuffle階段,shuffle的根本原因是相同的key存在不同的節點上,按key進行聚合的時候不得不進行shuffle
Spark從這個教訓中得到啟發,spark會把key進行分割槽,也就是key的hashcode進行分割槽,相同的key,hashcode肯定是一樣的,所以它進行分割槽的時候100t的資料分成10分,每部分10個t,它能確保相同的key肯定在一個分割槽裡面,而且它能保證儲存的時候相同的key能夠存在同一個節點上。比如一個rdd分成了100份,叢集有10個節點,所以每個節點存10份,每一分稱為每個分割槽,spark能保證相同的key存在同一個節點上,實際上相同的key存在同一個分割槽。
key的分佈不均決定了有的分割槽大有的分割槽小。沒法分割槽保證完全相等,但它會保證在一個接近的範圍。所以mapreduce裡面做的某些工作裡邊,spark就不需要shuffle了,spark解決網路傳輸這塊的根本原理就是這個。
進行join的時候是兩個表,不可能把兩個表都分割槽好,通常情況下是把用的頻繁的大表事先進行分割槽,小表進行關聯它的時候小表進行shuffle過程。
大表不需要shuffle。
需要在工作節點間進行資料混洗的轉換極大地受益於分割槽。這樣的轉換是cogroup,groupWith,join,leftOuterJoin,rightOuterJoin,groupByKey,reduceByKey,combineByKey和lookup。
分割槽是可配置的,只要RDD是基於鍵值對的即可。三、Spark分割槽原則及方法
RDD分割槽的一個分割槽原則:儘可能是得分割槽的個數等於叢集核心數目
無論是本地模式、Standalone模式、YARN模式或Mesos模式,我們都可以通過spark.default.parallelism來配置其預設分割槽個數,若沒有設定該值,則根據不同的叢集環境確定該值
3.1 本地模式
(1)預設方式
以下這種預設方式就一個分割槽
結果
(2)手動設定
設定了幾個分割槽就是幾個分割槽
結果
(3)跟local[n] 有關
n等於幾預設就是幾個分割槽
如果n=* 那麼分割槽個數就等於cpu core的個數
結果
本機電腦檢視cpu core,我的電腦--》右鍵管理--》裝置管理器--》處理器
(4)引數控制
結果
3.2 YARN模式
進入defaultParallelism方法
繼續進入defaultParallelism方法
這個一個trait,其實現類是(Ctrl+h)
進入TaskSchedulerImpl類找到defaultParallelism方法
繼續進入defaultParallelism方法,又是一個trait,看其實現類
Ctrl+h看SchedulerBackend類的實現類
進入CoarseGrainedSchedulerBackend找到defaultParallelism
totalCoreCount.get()是所有executor使用的core總數,和2比較去較大值
如果正常的情況下,那你設定了多少就是多少
四、分割槽器
(1)如果是從HDFS裡面讀取出來的資料,不需要分割槽器。因為HDFS本來就分好區了。
分割槽數我們是可以控制的,但是沒必要有分割槽器。
(2)非key-value RDD分割槽,沒必要設定分割槽器
al testRDD = sc.textFile("C:\\Users\\Administrator\\IdeaProjects\\myspark\\src\\main\\hello.txt") .flatMap(line => line.split(",")) .map(word => (word, 1)).partitionBy(new HashPartitioner(2))
沒必要設定,但是非要設定也行。
(3)Key-value形式的時候,我們就有必要了。
HashPartitioner
val resultRDD = testRDD.reduceByKey(new HashPartitioner(2),(x:Int,y:Int) => x+ y) //如果不設定預設也是HashPartitoiner,分割槽數跟spark.default.parallelism一樣 println(resultRDD.partitioner) println("resultRDD"+resultRDD.getNumPartitions)
RangePartitioner
val resultRDD = testRDD.reduceByKey((x:Int,y:Int) => x+ y) val newresultRDD=resultRDD.partitionBy(new RangePartitioner[String,Int](3,resultRDD)) println(newresultRDD.partitioner) println("newresultRDD"+newresultRDD.getNumPartitions)
注:按照範圍進行分割槽的,如果是字串,那麼就按字典順序的範圍劃分。如果是數字,就按資料自的範圍劃分。
自定義分割槽
需要實現2個方法
class MyPartitoiner(val numParts:Int) extends Partitioner{ override def numPartitions: Int = numParts override def getPartition(key: Any): Int = { val domain = new URL(key.toString).getHost val code = (domain.hashCode % numParts) if (code < 0) { code + numParts } else { code } } } object DomainNamePartitioner { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("word count").setMaster("local") val sc = new SparkContext(conf) val urlRDD = sc.makeRDD(Seq(("http://baidu.com/test", 2), ("http://baidu.com/index", 2), ("http://ali.com", 3), ("http://baidu.com/tmmmm", 4), ("http://baidu.com/test", 4))) //Array[Array[(String, Int)]] // = Array(Array(), // Array((http://baidu.com/index,2), (http://baidu.com/tmmmm,4), // (http://baidu.com/test,4), (http://baidu.com/test,2), (http://ali.com,3))) val hashPartitionedRDD = urlRDD.partitionBy(new HashPartitioner(2)) hashPartitionedRDD.glom().collect() //使用spark-shell --jar的方式將這個partitioner所在的jar包引進去,然後測試下面的程式碼 // spark-shell --master spark://master:7077 --jars spark-rdd-1.0-SNAPSHOT.jar val partitionedRDD = urlRDD.partitionBy(new MyPartitoiner(2)) val array = partitionedRDD.glom().collect() } }
轉自:https://www.cnblogs.com/frankdeng/p/9301688.html