1. 程式人生 > >《Spark 官方文件》Spark作業排程

《Spark 官方文件》Spark作業排程

概覽

Spark有好幾種計算資源排程的方式。首先,回憶一下叢集模式概覽(cluster mode overview)中每個Spark應用(包含一個SparkContext例項)中運行了一些其獨佔的執行器(executor)程序。叢集管理器提供了Spark應用之間的資源排程(scheduling across applications)。其次,在各個Spark應用內部,各個執行緒可能併發地通過action運算元提交多個Spark作業(job)。如果你的應用服務於網路請求,那這種情況是很常見的。在Spark應用內部(對應同一個SparkContext)各個作業之間,Spark預設FIFO排程,同時也可以支援公平排程(

fair scheduler)。

Spark應用之間的資源排程

如果在叢集上執行,每個Spark應用都會獲得一批獨佔的執行器JVM,來執行其任務並存儲資料。如果有多個使用者共享叢集,那麼會有很多資源分配相關的選項,如何設定還取決於具體的叢集管理器。

對Spark所支援的各個叢集管理器而言,最簡單的資源分配,就是對資源靜態劃分。這種方式就意味著,每個Spark應用都是設定一個最大可用資源總量,並且該應用在整個生命週期內都會佔住這些資源。這種方式在Spark獨立部署(standalone)和YARN排程,以及Mesos粗粒度模式(coarse-grained Mesos mode)下都可用。

  • Standalone mode:
     預設情況下,Spark應用在獨立部署的叢集中都會以FIFO(first-in-first-out)模式順序提交執行,並且每個Spark應用都會佔用叢集中所有可用節點。不過你可以通過設定spark.cores.max或者spark.deploy.defaultCores 來限制單個應用所佔用的節點個數。最後,除了可以控制對CPU的使用數量之外,還可以通過 spark.executor.memory 來控制各個應用的記憶體佔用量。
  • Mesos: 在Mesos中要使用靜態劃分的話,需要將 spark.mesos.coarse 設為true,同樣,你也需要設定 spark.cores.max 來控制各個應用的CPU總數,以及 spark.executor.memory 來控制各個應用的記憶體佔用。
  • YARN: 在YARN中需要使用 –num-executors 選項來控制Spark應用在叢集中分配的執行器的個數,對於單個執行器(executor)所佔用的資源,可以使用 –executor-memory 和 –executor-cores 來控制。

Mesos上另一種可用的方式是動態共享CPU。在這種模式下,每個Spark應用的記憶體佔用仍然是固定且獨佔的(仍由spark.executor.memory決定),但是如果該Spark應用沒有在某個機器上執行任務的話,那麼其他應用可以佔用該機器上的CPU。這種模式對叢集中有大量不是很活躍應用的場景非常有效,例如:叢集中有很多不同使用者的Spark shell session。但這種模式不適用於低延遲的場景,因為當Spark應用需要使用CPU的時候,可能需要等待一段時間才能取得CPU的使用權。要使用這種模式,只需要在 mesos:// URL 上設定 spark.mesos.coarse 屬性為flase即可。

注意,目前還沒有任何一種資源分配模式能支援跨Spark應用的記憶體共享。如果你需要跨Spark應用共享記憶體,我們建議你用單獨用一個server來計算和保留同一個RDD查詢的結果,這樣就能在多個請求(request)之間共享同一個RDD的資料。在未來的釋出版本中,一些記憶體儲存系統(如:Tachyon)或許能夠提供這種跨Spark應用共享RDD的能力。

動態資源分配

Spark還提供了一種基於負載來動態調節Spark應用資源佔用的機制。這意味著,你的應用會在資源空閒的時候將其釋放給叢集,而後續用到的時候再重新申請。這一特性在多個應用共享Spark叢集資源的情況下特別有用。

注意,這個特性預設是禁用的,但是在所有的粗粒度叢集管理器上都是可用的,如:獨立部署模式(standalone mode),YARN模式(YARN mode)以及Mesos粗粒度模式(Mesos coarse-grained mode)。

配置和部署

要使用這一特性有兩個前提條件。首先,你的應用必須設定 spark.dynamicAllocation.enabled 為 true。其次,你必須在每個節點上啟動一個外部混洗服務(external shuffle service),並在你的應用中將 spark.shuffle.service.enabled 設為true。外部混洗服務的目的就是為了在刪除執行器的時候,能夠保留其輸出的混洗檔案(本文後續有更詳細的描述)。啟用外部混洗的方式在各個叢集管理器上各不相同:

在Spark獨立部署的叢集中,你只需要在worker啟動前設定 spark.shuffle.server.enabled 為true即可。

在Mesos粗粒度模式下,你需要在各個節點上執行 ${SPARK_HOME}/sbin/start-mesos-shuffle-service.sh 並設定 spark.shuffle.service.enabled 為true 即可。例如,你可以用Marathon來啟用這一功能。

在YARN模式下,混洗服務需要按以下步驟在各個NodeManager上啟動:

  1. 首先按照YARN profile 構建Spark。如果你已經有打好包的Spark,可以忽略這一步。
  2. 找到 spark-<version>-yarn-shuffle.jar。如果你是自定義編譯,其位置應該在 ${SPARK_HOME}/network/yarn/target/scala-<version>,否則應該可以在 lib 目錄下找到這個jar包。
  3. 將該jar包新增到NodeManager的classpath路徑中。
  4. 配置各個節點上的yarn-site.xml,將 spark_shuffle 新增到 yarn.nodemanager.aux-services 中,然後將 yarn.nodemanager.aux-services.spark_shuffle.class 設為 org.apache.spark.network.yarn.YarnShuffleService,並將 spark.shuffle.service.enabled 設為 true。
  5. 最後重啟各節點上的NodeManager。

所有相關的配置都是可選的,並且都在 spark.dynamicAllocation.* 和 spark.shuffle.service.* 名稱空間下。更詳細請參考:configurations page

資源分配策略

總體上來說,Spark應該在執行器空閒時將其關閉,而在後續要用時再次申請。因為沒有一個固定的方法,可以預測一個執行器在後續是否馬上回被分配去執行任務,或者一個新分配的執行器實際上是空閒的,所以我們需要一些試探性的方法,來決定是否申請或移除一個執行器。

請求策略

一個啟用了動態分配的Spark應用會在有等待任務需要排程的時候,申請額外的執行器。這種情況下,必定意味著已有的執行器已經不足以同時執行所有未完成的任務。

Spark會分輪次來申請執行器。實際的資源申請,會在任務掛起 spark.dynamicAllocation.schedulerBacklogTimeout 秒後首次觸發,其後如果等待佇列中仍有掛起的任務,則每過 spark.dynamicAlloction.sustainedSchedulerBacklogTimeout 秒觸發一次資源申請。另外,每一輪所申請的執行器個數以指數形式增長。例如,一個Spark應用可能在首輪申請1個執行器,後續的輪次申請個數可能是2個、4個、8個… … 。

採用指數級增長策略的原因有兩個:第一,對於任何一個Spark應用如果只是需要多申請少數幾個執行器的話,那麼必須非常謹慎地啟動資源申請,這和TCP慢啟動有些類似;第二,如果一旦Spark應用確實需要申請很多個執行器的話,那麼可以確保其所需的計算資源及時地增長。

移除策略

移除執行器的策略就簡單多了。Spark應用會在某個執行器空閒超過 spark.dynamicAllocation.executorIdleTimeout 秒後將其刪除。在絕大多數情況下,執行器的移除條件和申請條件都是互斥的,也就是說,執行器在有待執行任務掛起時,不應該空閒。

優雅地關閉執行器

非動態分配模式下,執行器可能的退出原因有執行失敗或者相關Spark應用已經退出。不管是那種原因,執行器的所有狀態都已經不再需要,可以丟棄掉。但在動態分配的情形下,執行器有可能在Spark應用執行期間被移除。這時候,如果Spark應用嘗試去訪問該執行器儲存的狀態,就必須重算這一部分資料。因此,Spark需要一種機制,能夠優雅地關閉執行器,同時還保留其狀態資料。

這種需求對於混洗操作尤其重要。混洗過程中,Spark執行器首先將map輸出寫到本地磁碟,同時執行器本身又是一個檔案伺服器,這樣其他執行器就能夠通過該執行器獲得對應的map結果資料。一旦有某些任務執行時間過長,動態分配有可能在混洗結束前移除任務異常的執行器,而這些被移除的執行器對應的資料將會被重新計算,但這些重算其實是不必要的。

要解決這一問題,就需要用到一個外部混洗服務(external shuffle service),該服務在Spark 1.2引入。該服務在每個節點上都會啟動一個不依賴於任何Spark應用或執行器的獨立程序。一旦該服務啟用,Spark執行器不再從各個執行器上獲取shuffle檔案,轉而從這個service獲取。這意味著,任何執行器輸出的混洗狀態資料都可能存留時間比對應的執行器程序還長。

除了混洗檔案之外,執行器也會在磁碟或者記憶體中快取數。一旦執行器被移除,其快取資料將無法訪問。這個問題目前還沒有解決。或許在未來的版本中,可能會採用外部混洗服務類似的方法,將快取資料儲存在堆外儲存中以解決這一問題。

Spark應用內部的資源排程

在指定的Spark應用內部(對應同一SparkContext例項),多個執行緒可能併發地提交Spark作業(job)。在本節中,作業(job)是指,由Spark action運算元(如:collect)觸發的一系列計算任務的集合。Spark排程器是完全執行緒安全的,並且能夠支援Spark應用同時處理多個請求(比如:來自不同使用者的查詢)。

預設,Spark應用內部使用FIFO排程策略。每個作業被劃分為多個階段(stage)(例如:map階段和reduce階段),第一個作業在其啟動後會優先獲取所有的可用資源,然後是第二個作業再申請,再第三個……。如果前面的作業沒有把叢集資源佔滿,則後續的作業可以立即啟動執行,否則,後提交的作業會有明顯的延遲等待。

不過從Spark 0.8開始,Spark也能支援各個作業間的公平(Fair)排程。公平排程時,Spark以輪詢的方式給每個作業分配資源,因此所有的作業獲得的資源大體上是平均分配。這意味著,即使有大作業在執行,小的作業再提交也能立即獲得計算資源而不是等待前面的作業結束,大大減少了延遲時間。這種模式特別適合於多使用者配置。

要啟用公平排程器,只需設定一下 SparkContext中spark.scheduler.mode 屬性為 FAIR即可:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平排程資源池

公平排程器還可以支援將作業分組放入資源池(pool),然後給每個資源池配置不同的選項(如:權重)。這樣你就可以給一些比較重要的作業建立一個“高優先順序”資源池,或者你也可以把每個使用者的作業分到一組,這樣一來就是各個使用者平均分享叢集資源,而不是各個作業平分叢集資源。Spark公平排程的實現方式基本都是模仿 Hadoop Fair Scheduler 來實現的。

預設情況下,新提交的作業都會進入到預設資源池中,不過作業對應於哪個資源池,可以在提交作業的執行緒中用SparkContext.setLocalProperty 設定 spark.scheduler.pool 屬性。示例程式碼如下:

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

一旦設好了區域性屬性,所有該執行緒所提交的作業(即:在該執行緒中呼叫action運算元,如:RDD.save/count/collect 等)都會使用這個資源池。這個設定是以執行緒為單位儲存的,你很容易實現用同一執行緒來提交同一使用者的所有作業到同一個資源池中。同樣,如果需要清除資源池設定,只需在對應執行緒中呼叫如下程式碼:

sc.setLocalProperty("spark.scheduler.pool", null)

資源池預設行為

預設地,各個資源池之間平分整個叢集的資源(包括default資源池),但在資源池內部,預設情況下,作業是FIFO順序執行的。舉例來說,如果你為每個使用者建立了一個資源池,那麼久意味著各個使用者之間共享整個叢集的資源,但每個使用者自己提交的作業是按順序執行的,而不會出現後提交的作業搶佔前面作業的資源。

配置資源池屬性

資源池的屬性需要通過配置檔案來指定。每個資源池都支援以下3個屬性:

  • schedulingMode:可以是FIFO或FAIR,控制資源池內部的作業是如何排程的。
  • weight:控制資源池相對其他資源池,可以分配到資源的比例。預設所有資源池的weight都是1。如果你將某個資源池的weight設為2,那麼該資源池中的資源將是其他池子的2倍。如果將weight設得很高,如1000,可以實現資源池之間的排程優先順序 – 也就是說,weight=1000的資源池總能立即啟動其對應的作業。
  • minShare:除了整體weight之外,每個資源池還能指定一個最小資源分配值(CPU個數),管理員可能會需要這個設定。公平排程器總是會嘗試優先滿足所有活躍(active)資源池的最小資源分配值,然後再根據各個池子的weight來分配剩下的資源。因此,minShare屬效能夠確保每個資源池都能至少獲得一定量的叢集資源。minShare的預設值是0。

資源池屬性是一個XML檔案,可以基於 conf/fairscheduler.xml.template 修改,然後在 SparkConf 的 spark.scheduler.allocation.file 屬性指定檔案路徑:

conf.set("spark.scheduler.allocation.file", "/path/to/file")

資源池XML配置檔案格式如下,其中每個池子對應一個<pool>元素,每個資源池可以有其獨立的配置:

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

完整的例子可以參考 conf/fairscheduler.xml.template。注意,沒有在配置檔案中配置的資源池都會使用預設配置(schedulingMode:FIFO,weight:1,minShare:0)。