1. 程式人生 > >【轉載】Apache Spark Jobs 性能調優(一)

【轉載】Apache Spark Jobs 性能調優(一)

功能 dso brush 數據結構 nsf 必須 char dal 開始

當你開始編寫 Apache Spark 代碼或者瀏覽公開的 API 的時候,你會遇到各種各樣術語,比如 transformation,action,RDD 等等。 了解到這些是編寫 Spark 代碼的基礎。 同樣,當你任務開始失敗或者你需要透過web界面去了解自己的應用為何如此費時的時候,你需要去了解一些新的名詞: job, stage, task。對於這些新術語的理解有助於編寫良好 Spark 代碼。這裏的良好主要指更快的 Spark 程序。對於 Spark 底層的執行模型的了解對於寫出效率更高的 Spark 程序非常有幫助。


Spark 是如何執行程序的


一個 Spark 應用包括一個 driver 進程和若幹個分布在集群的各個節點上的 executor 進程。


driver 主要負責調度一些高層次的任務流(flow of work)。exectuor 負責執行這些任務,這些任務以 task 的形式存在, 同時存儲用戶設置需要caching的數據。 task 和所有的 executor 的生命周期為整個程序的運行過程(如果使用了dynamic resource allocation 時可能不是這樣的)。如何調度這些進程是通過集群管理應用完成的(比如YARN,Mesos,Spark Standalone),但是任何一個 Spark 程序都會包含一個 driver 和多個 executor 進程。

技術分享



在執行層次結構的最上方是一系列 Job。調用一個Spark內部的 action 會產生一個 Spark job 來完成它。 為了確定這些job實際的內容,Spark 檢查 RDD 的DAG再計算出執行 plan 。這個 plan 以最遠端的 RDD 為起點(最遠端指的是對外沒有依賴的 RDD 或者 數據已經緩存下來的 RDD),產生結果 RDD 的 action 為結束 。

執行的 plan 由一系列 stage 組成,stage 是 job 的 transformation 的組合,stage 對應於一系列 task, task 指的對於不同的數據集執行的相同代碼。每個 stage 包含不需要 shuffle 數據的 transformation 的序列。


什麽決定數據是否需要 shuffle ?RDD 包含固定數目的 partition, 每個 partiton 包含若幹的 record。對於那些通過 narrow tansformation(比如map

filter)返回的 RDD,一個 partition 中的 record 只需要從父 RDD 對應的 partition 中的 record 計算得到。每個對象只依賴於父 RDD 的一個對象。有些操作(比如coalesce)可能導致一個 task 處理多個輸入 partition ,但是這種 transformation 仍然被認為是 narrow 的,因為用於計算的多個輸入 record 始終是來自有限個數的 partition。


然而 Spark 也支持需要 wide 依賴的 transformation,比如 groupByKeyreduceByKey。在這種依賴中,計算得到一個 partition 中的數據需要從父 RDD 中的多個 partition 中讀取數據。所有擁有相同 key 的元組最終會被聚合到同一個 partition 中,被同一個 stage 處理。為了完成這種操作, Spark需要對數據進行 shuffle,意味著數據需要在集群內傳遞,最終生成由新的 partition 集合組成的新的 stage。


舉例,以下的代碼中,只有一個 action 以及 從一個文本串下來的一系列 RDD, 這些代碼就只有一個 stage,因為沒有哪個操作需要從不同的 partition 裏面讀取數據。

<span style="font-family:Microsoft YaHei;font-size:14px;color:#333333;">sc.textFile("someFile.txt").
  map(mapFunc).
  flatMap(flatMapFunc).
  filter(filterFunc).
  count()</span>

  

跟上面的代碼不同,下面一段代碼需要統計總共出現超過1000次的字母,

<span style="font-family:Microsoft YaHei;font-size:14px;color:#333333;">val tokenized = sc.textFile(args(0)).flatMap(_.split(‘ ‘))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).
  reduceByKey(_ + _)
charCounts.collect()</span>

  


這段代碼可以分成三個 stage。recudeByKey 操作是各 stage 之間的分界,因為計算 recudeByKey 的輸出需要按照可以重新分配 partition。


這裏還有一個更加復雜的 transfromation 圖,包含一個有多路依賴的 join transformation。

技術分享

粉紅色的框框展示了運行時使用的 stage 圖 。技術分享 運行到每個 stage 的邊界時,數據在父 stage 中按照 task 寫到磁盤上,而在子 stage 中通過網絡按照 task 去讀取數據。這些操作會導致很重的網絡以及磁盤的I/O,所以 stage 的邊界是非常占資源的,在編寫 Spark 程序的時候需要盡量避免的。父 stage 中 partition 個數與子 stage 的 partition 個數可能不同,所以那些產生 stage 邊界的 transformation 常常需要接受一個 numPartition 的參數來覺得子 stage 中的數據將被切分為多少個 partition。


正如在調試 MapReduce 是選擇 reducor 的個數是一項非常重要的參數,調整在 stage 邊屆時的 partition 個數經常可以很大程度上影響程序的執行效率。我們會在後面的章節中討論如何調整這些值。

選擇正確的 Operator


當需要使用 Spark 完成某項功能時,程序員需要從不同的 action 和 transformation 中選擇不同的方案以獲得相同的結果。但是不同的方案,最後執行的效率可能有雲泥之別。回避常見的陷阱選擇正確的方案可以使得最後的表現有巨大的不同。一些規則和深入的理解可以幫助你做出更好的選擇。


在最新的 Spark5097 文檔中開始穩定 SchemaRDD(也就是 Spark 1.3 開始支持的DataFrame),這將為使用 Spark 核心API的程序員打開 Spark的 Catalyst optimizer,允許 Spark 在使用 Operator 時做出更加高級的選擇。當 SchemaRDD 穩定之後,某些決定將不需要用戶去考慮了。


選擇 Operator 方案的主要目標是減少 shuffle 的次數以及被 shuffle 的文件的大小。因為 shuffle 是最耗資源的操作,所以有 shuffle 的數據都需要寫到磁盤並且通過網絡傳遞。repartitionjoincogroup,以及任何*By 或者 *ByKey 的 transformation 都需要 shuffle 數據。不是所有這些 Operator 都是平等的,但是有些常見的性能陷阱是需要註意的。

  • 當進行聯合的規約操作時,避免使用 groupByKey。舉個例子,rdd.groupByKey().mapValues(_ .sum) 與 rdd.reduceByKey(_ + _) 執行的結果是一樣的,但是前者需要把全部的數據通過網絡傳遞一遍,而後者只需要根據每個 key 局部的 partition 累積結果,在 shuffle 的之後把局部的累積值相加後得到結果。
  • 當輸入和輸入的類型不一致時,避免使用 reduceByKey。舉個例子,我們需要實現為每一個key查找所有不相同的 string。一個方法是利用map把每個元素的轉換成一個 Set,再使用 reduceByKey 將這些 Set 合並起來
<span style="font-family:Microsoft YaHei;font-size:14px;color:#333333;">rdd.map(kv => (kv._1, new Set[String]() + kv._2))
    .reduceByKey(_ ++ _)</span>

  


這段代碼生成了無數的非必須的對象,因為每個需要為每個 record 新建一個 Set。這裏使用 aggregateByKey 更加適合,因為這個操作是在map 階段做聚合。
<span style="font-family:Microsoft YaHei;font-size:14px;color:#333333;">val zero = new collection.mutable.Set[String]()
rdd.aggregateByKey(zero)(
    (set, v) => set += v,
    (set1, set2) => set1 ++= set2)</span>

  

  • 避免 flatMap-join-groupBy 的模式。當有兩個已經按照key分組的數據集,你希望將兩個數據集合並,並且保持分組,這種情況可以使用 cogroup。這樣可以避免對group進行打包解包的開銷。

什麽時候不發生 Shuffle


當然了解在哪些 transformation 上不會發生 shuffle 也是非常重要的。當前一個 transformation 已經用相同的 patitioner 把數據分 patition 了,Spark知道如何避免 shuffle。參考一下代碼:
<span style="font-family:Microsoft YaHei;font-size:14px;color:#333333;">rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)</span>

  



因為沒有 partitioner 傳遞給 reduceByKey,所以系統使用默認的 partitioner,所以 rdd1 和 rdd2 都會使用 hash 進行分 partition。代碼中的兩個reduceByKey 會發生兩次 shuffle 。如果 RDD 包含相同個數的 partition, join 的時候將不會發生額外的 shuffle。因為這裏的 RDD 使用相同的 hash 方式進行 partition,所以全部 RDD 中同一個 partition 中的 key的集合都是相同的。因此,rdd3中一個 partiton 的輸出只依賴rdd2和rdd1的同一個對應的 partition,所以第三次 shuffle 是不必要的。


舉個例子說,當 someRdd 有4個 partition, someOtherRdd 有兩個 partition,兩個 reduceByKey 都使用3個 partiton,所有的 task 會按照如下的方式執行: 技術分享 如果 rdd1 和 rdd2 在reduceByKey 時使用不同的 partitioner 或者使用相同的 partitioner 但是 partition 的個數不同的情況,那麽只用一個 RDD (partiton 數更少的那個)需要重新 shuffle。


相同的 tansformation,相同的輸入,不同的 partition 個數: 技術分享 當兩個數據集需要join 的時候,避免 shuffle 的一個方法是使用broadcast variables。如果一個數據集小到能夠塞進一個 executor 的內存中,那麽它就可以在 driver 中寫入到一個 hash table中,然後 broadcast 到所有的 executor 中。然後 map transformation 可以引用這個 hash table 作查詢。

什麽情況下 Shuffle 越多越好


盡可能減少 shuffle 的準則也有例外的場合。如果額外的 shuffle 能夠增加並發那麽這也能夠提高性能。比如當你的數據保存在幾個沒有切分過的大文件中時,那麽使用 InputFormat 產生分 partition 可能會導致每個 partiton 中聚集了大量的 record,如果 partition 不夠,導致沒有啟動足夠的並發。在這種情況下,我們需要在數據載入之後使用repartiton (會導致shuffle)提高 partiton 的個數,這樣能夠充分使用集群的CPU。


另外一種例外情況是在使用 recude 或者 aggregate action 聚集數據到 driver 時,如果數據把很多 partititon 個數的數據,單進程執行的 driver merge 所有 partition 的輸出時很容易成為計算的瓶頸。為了緩解 driver 的計算壓力,可以使用reduceByKey 或者 aggregateByKey 執行分布式的 aggregate 操作把數據分布到更少的 partition 上。每個 partition 中的數據並行的進行 merge,再把 merge 的結果發個 driver 以進行最後一輪 aggregation。查看treeReducetreeAggregate 查看如何這麽使用的例子。


這個技巧在已經按照 Key 聚集的數據集上格外有效,比如當一個應用是需要統計一個語料庫中每個單詞出現的次數,並且把結果輸出到一個map中。一個實現的方式是使用aggregation,在每個 partition 中本地計算一個 map,然後在 driver 中把各個 partition 中計算的 map merge 起來。另一種方式是通過aggregateByKey 把 merge 的操作分布到各個 partiton 中計算,然後在簡單地通過 collectAsMap 把結果輸出到 driver 中。


二次排序


還有一個重要的技能是了解接口 repartitionAndSortWithinPartitions transformation。這是一個聽起來很晦澀的 transformation,但是卻能涵蓋各種奇怪情況下的排序,這個 transformation 把排序推遲到 shuffle 操作中,這使大量的數據有效的輸出,排序操作可以和其他操作合並。


舉例說,Apache Hive on Spark 在join的實現中,使用了這個 transformation 。而且這個操作在secondary sort 模式中扮演著至關重要的角色。secondary sort 模式是指用戶期望數據按照 key 分組,並且希望按照特定的順序遍歷 value。使用repartitionAndSortWithinPartitions 再加上一部分用戶的額外的工作可以實現 secondary sort。


結論


現在你應該對完成一個高效的 Spark 程序所需的所有基本要素有了很好的了解。在 Part II 中將詳細介紹資源調用、並發以及數據結構相關的調試。 【轉載自:http://blog.csdn.net/u012102306/article/details/51700491】

【轉載】Apache Spark Jobs 性能調優(一)