1. 程式人生 > >Spark 動態資源分配(Dynamic Resource Allocation) 解析

Spark 動態資源分配(Dynamic Resource Allocation) 解析

Spark 預設採用的是資源預分配的方式。這其實也和按需做資源分配的理念是有衝突的。這篇文章會詳細介紹Spark 動態資源分配原理。

前言

最近在使用Spark Streaming程式時,發現如下幾個問題:

  1. 高峰和低峰Spark Streaming每個週期要處理的資料量相差三倍以上,預分配資源會導致低峰的時候資源的大量浪費。
  2. Spark Streaming 跑的數量多了後,資源佔用相當可觀。

所以便有了要開發一套針對Spark Streaming 動態資源調整的想法。我在文章最後一個章節給出了一個可能的設計方案。不過要做這件事情,首先我們需要了解現有的Spark 已經實現的 Dynamic Resource Allocation 機制,以及為什麼它無法滿足現有的需求。

入口

在SparkContext 中可以看到這一行:


_executorAllocationManager =
      if (dynamicAllocationEnabled) {
        Some(new ExecutorAllocationManager(this, listenerBus, _conf))
      } else {
        None
      }

通過spark.dynamicAllocation.enabled引數開啟後就會啟動ExecutorAllocationManager。

這裡有我第一個吐槽的點,這麼直接new出來,好歹也做個配置,方便第三方開發個新的元件可以整合進去。但是Spark很多地方都是這麼搞的,完全沒有原來Java社群的風格。

動態調整資源面臨的問題

我們先看看,動態資源調整需要解決哪幾個問題:

  1. Cache問題。如果需要移除的Executor含有RDD cache該如何辦?
  2. Shuffle問題。 如果需要移除的Executor包含了Shuffle Write先關資料該怎麼辦?
  3. 新增和刪除之後都需要告知DAGSchedule進行相關資訊更新。

Cache去掉了重算即可。為了防止資料抖動,預設包含有Cache的Executor是不會被刪除的,因為預設的Idle時間設定的非常大:

private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(  
"spark.dynamicAllocation.cachedExecutorIdleTimeout"
, s"${Integer.MAX_VALUE}s")

你可以自己設定從而去掉這個限制。

而對於Shuffle,則需要和Yarn整合,需要配置yarn.nodemanager.aux-services。具體配置方式,大家可以Google。這樣Spark Executor就不用儲存Shuffle狀態了。

觸發條件

新增Worker的觸發條件是:

  • 有Stage正在執行,並且預估需要的Executors > 現有的

刪除Woker的觸發條件是:

  • 一定時間內(預設60s)沒有task執行的Executor

我們看到觸發條件還是比較簡單的。這種簡單就意味著使用者需要根據實際場景,調整各個時間引數,比如到底多久沒有執行task的Executor才需要刪除。

預設檢測時間是100ms:

private val intervalMillis: Long = 100

如何實現Container的新增和釋放

只有ApplicationMaster才能夠向Yarn釋出這些動作。而真正的中控是org.apache.spark.ExecutorAllocationManager,所以他們之間需要建立一個通訊機制。對應的方式是在ApplicationMaster有一個private class AMEndpoint(類,比如刪除釋放容器的動作在裡就有:

  case KillExecutors(executorIds) =>
        logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
        Option(allocator) match {
          case Some(a) => executorIds.foreach(a.killExecutor)
          case None => logWarning("Container allocator is not ready to kill executors yet.")
        }
        context.reply(true)

ExecutorAllocationManager則是引用YarnSchedulerBackend例項,該例項持有ApplicationMaster的 RPC引用

private var amEndpoint: Option[RpcEndpointRef] 

如何獲取排程資訊

要觸發上面描述的操作,就需要任務的排程資訊。這個是通過ExecutorAllocationListener extends SparkListener來完成的。具體是在 ExecutorAllocationMaster的start函式裡,會將該Listener例項新增到SparkContext裡的listenerBus裡,從而實現對DAGSchecude等模組的監聽。機制可以參看這篇文章 Spark ListenerBus 和 MetricsSystem 體系分析

根據上面的分析,我們至少要知道如下三個資訊:

  1. Executor上是否為空,如果為空,就可以標記為Idle.只要超過一定的時間,就可以刪除掉這個Executor.
  2. 正在跑的Task有多少
  3. 等待排程的Task有多少

這裡是以Stage為區分的。分別以三個變數來表示:

private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]

名字已經很清楚了。值得說的是stageIdToTaskIndices,其實就是stageId 對應的正在執行的task id 集合。

那麼怎麼計算出等待排程的task數量呢?計算方法如下:

stageIdToNumTasks(stageId) - stageIdToTaskIndices(stageId).size

這些都是動態更新變化的,因為有了監聽器,所以任務那邊有啥變化,這邊都會得到通知。

定時掃描器

有了上面的鋪墊,我們現在進入核心方法:

private def schedule(): Unit = synchronized {
    val now = clock.getTimeMillis

    updateAndSyncNumExecutorsTarget(now)

    removeTimes.retain { case (executorId, expireTime) =>
      val expired = now >= expireTime
      if (expired) {
        initializing = false
        removeExecutor(executorId)
      }
      !expired
    }
  }

該方法會每隔100ms被排程一次。你可以理解為一個監控執行緒。

Executor判定為空閒的機制

只要有一個task結束,就會判定有哪些Executor已經沒有任務了。然後會被加入待移除列表。在放到removeTimes的時候,會把當前時間now + executorIdleTimeoutS * 1000 作為時間戳儲存起來。當排程程序掃描這個到Executor時,會判定時間是不是到了,到了的話就執行實際的remove動作。在這個期間,一旦有task再啟動,並且正好執行在這個Executor上,則又會從removeTimes列表中被移除。 那麼這個Executor就不會被真實的刪除了。

Executor 需要增加的情況

首先,系統會根據下面的公式計算出實際需要的Executors數目:

private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
  }

接著每個計算週期到了之後,會和當前已經有的Executors數:numExecutorsTarget 進行比較。

  1. 如果發現 maxNumExecutorsNeeded < numExecutorsTarget 則會發出取消還有沒有執行的Container申請。並且重置每次申請的容器數為1,也就是numExecutorsToAdd=1

  2. 否則如果發現當前時間now >= addTime(addTime 每次會增加一個sustainedSchedulerBacklogTimeoutS ,避免申請容器過於頻繁),則會進行新容器的申請,如果是第一次,則增加一個(numExecutorsToAdd),如果是第二次則增加2個以此按倍數類推。直到maxNumExecutorsNeeded <= numExecutorsTarget ,然後就會重置numExecutorsToAdd。

所以我們會發現,我們並不是一次性就申請足夠的資源,而是每隔sustainedSchedulerBacklogTimeoutS次時間,按[1,2,4,8]這種節奏去申請資源的。因為在某個sustainedSchedulerBacklogTimeoutS期間,可能已經有很多工完成了,其實不需要那麼多資源了。而按倍數上升的原因是,防止為了申請到足夠的資源時間花費過長。這是一種權衡。

DRA評價

我們發現,DRA(Dynamic Resource Allocation)涉及到的點還是很多的,雖然邏輯比較簡單,但是和任務排程密切相關,是一個非常動態的過程。這個設計本身也是面向一個通用的排程方式。

我個人建議如果採用了DRA,可以注意如下幾點:

  1. 設定一個合理的minExecutors-maxExecutors值
  2. 將Executor對應的cpuCore 最好設定為<=3 ,避免Executor數目下降時,等不及新申請到資源,已有的Executor就因為任務過重而導致叢集掛掉。
  3. 如果程式中有shuffle,例如(reduce*,groupBy*),建議設定一個合理的並行數,避免殺掉過多的Executors。
  4. 對於每個Stage持續時間很短的應用,其實不適合這套機制。這樣會頻繁增加和殺掉Executors,造成系統顛簸。而Yarn對資源的申請處理速度並不快。

Spark Streaming該使用什麼機制動態調整資源

現有的DRA機制其實適合長時的批處理過程中,每個Stage需要的資源量不一樣,並且耗時都比較長。Spark Streaming 可以理解為迴圈的微批處理。而DRA是在每次微批處理起作用,可能還沒等DRA反應過來,這個週期就已經過了。

Spark Streaming需要一個從全域性一天24小時來考慮。每個排程週期的processing time可能更適合作為增減Executors的標準。同時如果發生delay的話,則可以擴大資源申請的速度。並且,因為是週期性的,釋放和新增動作只會發生在一個新的週期的開始,所以他並不會面臨現有 DRA的問題,譬如需要通過額外的方式儲存Shuffle 狀態等。 所以實現起來更加容易。我們可能需要同時監聽StreamingContext的一些資訊。

具體而言:

每個週期檢查上個週期的處理時間 ,設為 preProcessingTime,週期為duration, 一般而言,我們的Spark Streaming程式都會讓preProcessingTime < duration。否則會發生delay。

如果 preProcessingTime > 0.8 * duration,則一次性將資源申請到maxExecutors。

如果preProcessingTime < duration,則應該刪除的Worker為

    removeExecutorNum =  currentExecutors * ((duration -preProcessingTime)/duration - 0.2)

其中0.2 為預留的worker數。如果removeExecutorNum如果<=0 則不進行任何操作。

假設duration =10s, preProcessingTime= 5s, currentExecutors=100,則我們理論上認為 只要保留50%的資源即可。
但是為了防止延時,我們其實額外保留一些20%資源。也就意味著我們刪除30個Executor。 我們並不會一次性將資源都釋放掉。假設我們增加一個新的引數spark.streaming.release.num.duration=5,這個引數意味著我們需要花費5個週期釋放掉這30個Executor的資源。也就是當前這個週期,我們要釋放掉 6個Executor。

接著到下一個週期,重複上面的計算。 直到計算結果 <=0 為止。

相關推薦

Spark 動態資源分配(Dynamic Resource Allocation) 解析

Spark 預設採用的是資源預分配的方式。這其實也和按需做資源分配的理念是有衝突的。這篇文章會詳細介紹Spark 動態資源分配原理。 前言 最近在使用Spark Streaming程式時,發現如下幾個問題: 高峰和低峰Spark Stream

利用動態資源分配優化Spark應用資源利用率

背景 在某地市開展專案的時候,發現數據採集,資料探索,預處理,資料統計,訓練預測都需要很多資源,現場資源不夠用。 目前該專案的資源3臺舊的伺服器,每臺的資源 記憶體為128G,cores 為24 (core可暫時忽略,以下僅考慮記憶體即可) 。 案例分析 我們先對任務分別分析,然後分類。 資料採集基於DC,接

Spark如何進行動態資源分配

一、操作場景 對於Spark應用來說,資源是影響Spark應用執行效率的一個重要因素。當一個長期執行的服務,若分配給它多個Executor,可是卻沒有任何任務分配給它,而此時有其他的應用卻資源緊張,這就造成了很大的資源浪費和資源不合理的排程。 動態資源排程就是為了解決這種場景,根據當前應用任務的負載情況,實時

一種服務可持續的網路功能延服務功能鏈遷移的動態資源分配方法

寫在前面,本文為 Autonomic resource arbitration and service-continuable network function migration along service function chain 學習筆記,為對NFV感

spark提交任務以及資源分配問題

使用spark-submit命令提交Spark應用(注意引數的順序) spark-submit --master spark://hadoop01:7077 --class cn.edu360.spa

藍橋杯 演算法訓練 ALGO-116 最大的算式 動態規劃 資源分配型別(最大乘積)

演算法訓練 最大的算式 時間限制:1.0s 記憶體限制:256.0MB 問題描述   題目很簡單,給出N個數字,不改變它們的相對位置,在中間加入K個乘號和N-K-1個加號,(括號隨便加)使最終結果儘量大。因為乘號和加號一共就是N-1個了,所以恰好每兩個相鄰數字之間都有一個符號。例如:

spark中的動態executor分配

動態分配executor的例項初始化部分 如果spark.executor.instances配置項設定為0或者沒有設定,這個預設情況下是一個未設定的值,yarn的執行模式時,這個配置通過--num-executors來得到. 同時spark.dynamicAlloc

Spark Streaming資源動態申請和動態控制消費速率原理剖析

為什麼需要動態? a) Spark預設情況下粗粒度的,先分配好資源再計算。對於Spark Streaming而言有高峰值和低峰值,但是他們需要的資源是不一樣的,如果按照高峰值的角度的話,就會有大量的

資源分配問題(動態規劃)

//問題描述:資源分配問題   //某廠根據計劃安排,擬將n臺相同的裝置分配給m個車間,各車間獲得這種裝置後,可以為國家提供盈利Ci j(i臺裝置提供給j號車間將得到的利潤,1≤i≤n,1≤j≤m) 。 //問如何分配,才使國家得到最大的盈利?其中Cij為(0,1000)

Spark Master 如何分配叢集資源

         本文以Spark 1.6 原始碼為例,解讀Spark Master 如何分配叢集資源。每次Master receive到Worker傳送Register worker 訊息請求、Client 傳送Register driver 請求、和 Register

【演算法筆記】資源分配動態規劃

1.HLOJ#411機器分配 要求資源分配的最大值,我們可以用二維陣列f[i][j]來表示前i個公司得到j臺機器後所得到的最大盈利值。 方程是: f[i][j]=max(f[i][j],f[i-1

YARN任務監控介面Aggregate Resource Allocation指標解析

在YARN的原生任務監控介面中,我們經常能看到Aggregate Resource Allocation這個指標(圖中高亮選中部分

2-3-配置DHCP服務器實現動態地址分配

客戶端 -name sci oom 動態分配 工作站 request請求 負責 evel 學習一個服務的過程: 1、 此服務的概述:名字,功能,特點,端口號 2、 安裝 3、 配置文件的位置 4、 服務啟動關閉腳本,查看端口 5、 此服務的使用方法 6、 修

yarn架構——本質上是在做解耦 將資源分配和應用程序狀態監控兩個功能職責分離為RM和AM

沒有 占用 業界 imageview 技術分享 其他 而是 基本 mapreduce Hadoop YARN架構解讀 原Mapreduce架構 原理架構圖如下: 圖 1.Hadoop 原 MapReduce 架構 原 MapReduce 程序的流程:首先用戶程

深入淺出Mesos(四):Mesos的資源分配

http software hrp 分享 例如 自定義模塊 工作原理 ces 根據 http://www.infoq.com/cn/articles/analyse-mesos-part-04 【編者按】Mesos是Apache下的開源分布式資源管理框架,它被稱為是分布式系

CDH組件目錄主機資源分配端口

postgres 程序代碼 -c lib 程序 agent gre etc ima 目錄:/var/log/cloudera-scm-installer : 安裝日誌目錄。/var/log/* : 相關日誌文件(相關服務的及CM的)。/usr/share/cmf/ : 程序

5.Resource註解解析

autowire ext 類型 名稱 配置文件 XML 使用場景 文件 cat Resource有兩種使用場景 1.Resource 當Resource後面沒帶參數的時候是根據它所註釋的屬性名稱到applicationContext.xml文件中查找是否有bean的id與之

linux進程cpu資源分配命令nice,renice,taskset

gpo comm local 簡寫 bsp launch 名詞 cif ffi 進程cpu資源分配就是指進程的優先權(priority)。優先權高的進程有優先執行權利。配置進程優先權對多任務環境的linux很有用,可以改善系統性能。還可以把進程運行到指定的CPU上,這樣一來

Spark篇】---Spark資源調度源碼分析與應用

部分 app post 類名 inf master 執行過程 efault spark 一、前述 Spark中資源調度是一個非常核心的模塊,尤其對於我們提交參數來說,需要具體到某些配置,所以提交配置的參數於源碼一一對應,掌握此節對於Spark在任務執行過程中的資源分配會更上

動態規劃(dynamic programming)

program 選擇 因此 移動 開始 解決 特征 尋找 ima 1、動態規劃是通過組合字問題的解而解決整個問題的。 2、它與分治法的區別:     分治法是將問題分解為一些獨立的子問題,遞歸的求解各個子問題,然後合並子問題的解而得到源問題的解。     而動態規劃適合用於