1. 程式人生 > 程式設計 >Spark記憶體模型介紹及Spark應用記憶體優化踩坑記錄

Spark記憶體模型介紹及Spark應用記憶體優化踩坑記錄

Spark作為一個基於記憶體的分散式計算引擎,其記憶體管理模組在整個系統中扮演著非常重要的角色。理解Spark記憶體管理的基本原理,有助於更好的開發Spark應用程式和進行效能調優。同時,有效率的記憶體使用是Spark應用高效效能表現的關鍵所在,不合理的記憶體使用就會導致Spark效能表現的很糟糕。

在執行Spark的應用程式時,Spark叢集會啟動Driver和Executor兩種JVM程式。
Driver程式主要負責:
(1)建立Spark上下文;
(2)提交Spark作業(Job)並將Job轉化為計算任務(Task)交給Executor計算;
(3)協調各個Executor程式間任務排程。

Executor程式主要負責:
(1)在工作節點上執行具體的計算任務(Task),並角計算結果返回給Driver;
(2)為需要持久化的RDD提供儲存功能。

由於Driver的記憶體管理比較簡單,和一般的JVM程式區別不大,所以本文重點分析Executor的記憶體管理。所以,本文提到的記憶體管理都是指Executor的記憶體管理。

Executor記憶體

Executor程式作為一個JVM程式,其記憶體管理建立在JVM的記憶體管理之上,整個大致包含兩種方式:堆內記憶體和堆外記憶體,大致如下圖1所示:

圖1.Executor記憶體結構

圖1.Executor記憶體結構

堆內記憶體

指的是JVM堆記憶體大小,在Spark應用程式啟動時通過spark.executor.memory引數配置。Executor內執行的併發任務共享JVM堆內記憶體。堆內記憶體大致可以分為如圖2所示以下4個部分(以統一記憶體管理機制為例):

圖2.JVM堆內記憶體結構

圖2.JVM堆內記憶體結構
  1. Storage記憶體:主要用於儲存Spark的cache資料,例如RDD的cache,Broadcast變數,Unroll資料等。需要注意的是,unrolled的資料如果記憶體不夠,會儲存在driver端。
  2. Execution記憶體:用於儲存Spark task執行過程中需要的物件,如Shuffle、Join、Sort、Aggregation等計算過程中的臨時資料。
  3. User記憶體:分配Spark Memory剩餘的記憶體,使用者可以根據需要使用。可以儲存RDD transformations需要的資料結構。
  4. Reserved記憶體:這部分記憶體是預留給系統使用,是固定不變的。在1.6.0預設為300MB(RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024),不過在2.4.4版本中已經看不到這個引數了。

堆外記憶體

這裡Off-heap Memory從概念上可以分為兩個:

Off-heap Memory(*)

對應圖1的Executor JVM內的off-heap memory(*),主要用於JVM自身,字串,NIO Buffer等開銷,可以通過spark.executor.memoryOverhead引數進行配置,大小一般設為executorMemory * 0.10,with minimum of 384。

Off-heap Memory(**)

對應圖1的(**): 為了進一步優化記憶體的使用以及提高Shuffle時的排序的效率,Spark引入了堆外(Off-heap)記憶體,直接在工作節點的系統記憶體中開闢的空間,儲存經過序列化的二進位制資料。Spark可以直接作業系統堆外記憶體,減少了不必要的記憶體開銷,以及頻繁的 GC 掃描和回收,提升了處理效能。堆外記憶體可以被精確地申請和釋放,而且序列化的資料佔用的空間可以被精確計算,所以相比堆內記憶體來說降低了管理的難度,也降低了誤差。

在預設情況下堆外記憶體並不啟用,可通過配置 spark.memory.offHeap.enabled 引數啟用,並由spark.memory.offHeap.size 引數設定堆外空間的大小。除了沒有 other 空間,堆外記憶體與堆內記憶體的劃分方式相同如下圖3所示(以統一記憶體管理機制為例),所有執行中的併發任務共享儲存記憶體和執行記憶體。

圖3.Executor堆外記憶體結構

圖3.Executor堆外記憶體結構

統一記憶體管理機制

Spark 1.6 之後引入的統一記憶體管理機制,與靜態記憶體管理的區別在於儲存記憶體和執行記憶體共享同一塊空間,可以動態佔用對方的空閒區域,如圖 2 和圖 3 所示,其中最重要的優化在於動態佔用機制,其規則如下:

  • 設定基本的Storage記憶體和Execution內區域(spark.storage.storageFraction 引數),該設定確定了雙方各自擁有的空間的範圍
  • 雙方的空間都不足時,則儲存到硬碟;若己方空間不足而對方空餘時,可借用對方的空間;(儲存空間不足是指不足以放下一個完整的 Block)
  • Execution記憶體的空間被對方佔用後,可讓對方將佔用的部分轉存到硬碟如圖4所示,然後"歸還"借用的空間
  • Storage記憶體的空間被對方佔用後,無法讓對方"歸還",多餘的Storage記憶體被轉存到硬碟如圖5所示,因為需要考慮 Shuffle 過程中的很多因素,實現起來較為複雜

圖4.Execution記憶體的空間被佔用

圖5.Storage記憶體的空間被佔用

Spark應用記憶體優化

case 1

某線上Spark應用,executor memory配置如下:
spark.executor.memory = 124G
spark.executor.memoryOverhead = 10G
結果在一些大資料量情況下,記憶體超出報錯如下圖6:

圖6.Spark應用OOM錯誤

這個問題從報錯的提示就比較好理解,提高spark.yarn.executor.memoryOverhead的記憶體就ok了,最終在我們這個應用上提高至20G成功跑過了任務。但是筆者調研發現,在有些別人Spark應用下,不管怎麼提升這個值都沒用。附上Stack Overflow討論 exceeding-memory-limits

case 2

同上某線上Spark應用(Spark版本1.5.1),執行時Storage記憶體如下圖7所示。 經過程式碼走讀發現,該業務實現中,對於同一份血統(Lineage)的RDD資料,每次完成一次Action計算後都會cache計算結果的RDD資料。就導致了同一份血統的RDD資料多次持續遞進cache到記憶體的情況。由於Spark 1.6 之後才引入的統一記憶體管理機制,在某些資料量較大的任務下,就會導致記憶體溢位。 解決方法是在每一次cache計算結果的RDD資料後就手動釋放(unpresis)之前同血統的RDD資料。優化後,執行時Storage記憶體如下圖8所示。可以看到Storage記憶體佔用降低了近45%.

圖7.unpresis優化前storage記憶體佔用

圖8.unpresis優化後storage記憶體佔用

以上為筆者總結的Spark記憶體模型介紹以及個人工作中的記憶體調優經驗,由於個人水平有限,如果有錯誤的地方,還望讀者指正。

參考資料

(1)www.ibm.com/developerwo…
(2)stackoverflow.com/questions/4…
(3)spark.apache.org/docs/latest…
(4)www.jianshu.com/p/10e91ace3…
(5)www.slideshare.net/databricks/…