JStorm與Storm源碼分析(三)--Scheduler,調度器
阿新 • • 發佈:2017-07-28
系統 負責 bad 二維碼 sting storm return prepare end
Scheduler作為Storm的調度器,負責為Topology分配可用資源。
Storm提供了IScheduler接口,用戶可以通過實現該接口來自定義Scheduler。
其定義如下:
public interface IScheduler { //接收當前Nimbus的Storm配置作為參數,進行一些初始化工作 void prepare(Map conf); /** * 真正進行任務分配的方法,在Nimbus進行任務分配的時候會調用該方法. * 參數為topologies、cluster:前者含有當前集群中所有Topology的靜態信息, * cluster包含了Topology的運行態信息,比如用戶自定義調度邏輯時所需要的所有資源、 * Supervisor信息、當前可用的所有slot * 以及任物分配情況等,根據topologies和cluster信息,就可以進行調度分配任務了 */ void schedule(Topologies topologies, Cluster cluster); }
真正選擇哪個調度器來對Topology進行分配的方法是mk-assignments。
mk-assignments方法定義與解釋如下:
;;參數:stormConf和接口INimbus的實現類實例 (defn mk-scheduler [conf inimbus] ;;調用inimbus中的getForcedScheduler方法,並將返回值賦給臨時變量forced-scheduler (let [forced-scheduler (.getForcedScheduler inimbus) scheduler (cond ;;若調用的getForcedScheduler方法,返回的是非null的IScheduler,則返回該IScheduler實例 forced-scheduler (do (log-message "Using forced scheduler from INimbus " (class forced-scheduler)) forced-scheduler) ;;如果用戶實現了自定義的IScheduler,並且在storm.yaml中有配置, ;;則返回用戶自定義的IScheduler. (conf STORM-SCHEDULER) (do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER)) (-> (conf STORM-SCHEDULER) new-instance)) ;;如果上述都不滿足則返回默認的DefaultScheduler :else (do (log-message "Using default scheduler") (DefaultScheduler.)))] (.prepare scheduler conf) scheduler ))
從上述代碼可以看出,如果調用inimbus中的getForcedScheduler方法,且返回的是非null的IScheduler,則返回該IScheduler實例;如果用戶實現了自定義的IScheduler,並且在storm.yaml中有配置,則返回用戶自定義的IScheduler;如果兩者都沒有實現,則采用默認調度器DefaultScheduler進行任務的分配。現在我們只關心DefaultScheduler。
DefaultScheduler的定義與解釋如下:
;;DefaultScheduler是Storm默認的調度器,如果用戶沒有指定自己實現的調度器, ;;Storm就會使用該調度器進行Topology的任務分配。 ;;DefaultScheduler實現了IScheduler接口 (ns backtype.storm.scheduler.DefaultScheduler (:use [backtype.storm util config]) (:require [backtype.storm.scheduler.EvenScheduler :as EvenScheduler]) (:import [backtype.storm.scheduler IScheduler Topologies Cluster TopologyDetails WorkerSlot SchedulerAssignment EvenScheduler ExecutorDetails]) (:gen-class :implements [backtype.storm.scheduler.IScheduler]))
;;default-schedule方法主要是計算當前集群中所有可供分配的slot資源, ;;並判斷當前已經分配給該Topology的slot資源是否需要重新分配, ;;利用這些信息,對新提交的Topology進行資源分配 (defn default-schedule [^Topologies topologies ^Cluster cluster] ;;調用cluster的needsSchedulingTopologies方法獲取所需要進行任務調度的Topology集合 ;;needsSchedulingTopologies方法定義如fn1所示. (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)] ;;這部分代碼塊的作用是對每一個需要進行任務調度的Topology進行處理 (doseq [^TopologyDetails topology needs-scheduling-topologies ;;通過調用getId獲取topology-id :let [topology-id (.getId topology) ;;調用cluster的getAvailableSlots方法獲取當前集群中所有可用的slot資源, ;;並將其轉換為<node,port>集合賦給available-slots變量. ;;getAvailableSlots方法定義如下fn2所示 available-slots (->> (.getAvailableSlots cluster) (map #(vector (.getNodeId %) (.getPort %)))) ;;調用getExecutors獲取Topology的所有Executor信息, ;;並將其轉換為<start-task-id,end-task-id>集合 all-executors (->> topology .getExecutors (map #(vector (.getStartTask %) (.getEndTask %))) set) ;;調用EvenScheduler的get-alive-assigned-node+port->executors方法 ;;計算當前Topology已經分配的任務信息,以<[node,port],executors>信息保存到alive-assigned變量中 alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id) ;; alive-executors (->> alive-assigned vals (apply concat) set) ;;調用slots-can-reassign方法對alive-assigned的slot信息進行判斷, ;;選出其中可被重新分配的slot集合並保存到can-reassign-slots. ;;slots-can-reassign方法定義如fn3所示: can-reassign-slots (slots-can-reassign cluster (keys alive-assigned)) ;;計算當前Topology所能使用的全部slot數目,它取以下兩個量中較小的值作為total-slots-to-use total-slots-to-use (min (.getNumWorkers topology) (+ (count can-reassign-slots) (count available-slots))) ;;用於判斷如果total-slots-to-use的數目大於當前已經分配的slot數目, ;;或者正在運行的executors數目不等於所有的executors數 ;;則調用bad-slots方法計算所有可被釋放的slot. ;;bad-slots方法的具體定義如fn4所示. bad-slots (if (or (> total-slots-to-use (count alive-assigned)) (not= alive-executors all-executors)) (bad-slots alive-assigned (count all-executors) total-slots-to-use) [])]] ;;調用cluster的freeSlots方法釋放前面計算出來的bad-slots (.freeSlots cluster bad-slots) ;;調用EvenScheduler的schedule-topologies-evenly方法將系統中的資源均勻分配給Topology (EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))
fn1:
/** * 獲取所有需要調度的topology,並以集合的形式返回 */ public List<TopologyDetails> needsSchedulingTopologies(Topologies topologies) { List<TopologyDetails> ret = new ArrayList<TopologyDetails>(); for (TopologyDetails topology : topologies.getTopologies()) { if (needsScheduling(topology)) { ret.add(topology); } } return ret; }
fn2:
//根據supervisor信息獲取所有可用的slot資源,並封裝在WorkerSlot中,以集合的形式返回 public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) { Set<Integer> ports = this.getAvailablePorts(supervisor); List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size()); for (Integer port : ports) { slots.add(new WorkerSlot(supervisor.getId(), port)); } return slots; }
fn3:
;;該方法將對傳入的slots資源進行過濾,選出其中仍然可以繼續使用的slot,組成新的集合 ;;過濾方法:先判斷slot的node信息是否存在於集群的黑名單裏, ;;如果不在則繼續判斷slot的port信息是否在於node相對應的Supervisor的所有可用端口列表中 ;;如果在,則表示該slot可以繼續使用 (defn slots-can-reassign [^Cluster cluster slots] (->> slots (filter (fn [[node port]] (if-not (.isBlackListed cluster node) (if-let [supervisor (.getSupervisorById cluster node)] (.contains (.getAllPorts supervisor) (int port)) ))))))
fn4:
;;該方法用於計算一個Topology已經分配的資源中哪些是不再需要的 ;;existing-slots:已經分配出去的資源(分配給Topology),它是一個<[node,port],executors>集合 ;;num-executors:Topology的所有Executor(包括已分配和未分配的) ;;num-workers:Topology可使用的全部slot數目 (defn- bad-slots [existing-slots num-executors num-workers] ;;判斷num-workers是否為0。如果是,意味著當前沒有可供該Topology使用的slot,這時返回一個空集合 (if (= 0 num-workers) ‘() ;;定義distribution集合和keepers集合,distribution集合通過調用integer-divided方法生成 ;;實際所做的事是將num-executors均勻地分配到num-workers中. ;;keepers集合為一個空集合 (let [distribution (atom (integer-divided num-executors num-workers)) keepers (atom {})] ;;對於傳入的existing=slots中的每一項,計算其對象的executor-count, ;;然後以該executor-count作為鍵從前面計算的distribution集合中獲取值.如果獲取的值大於0, ;;則意味著存在至少一個Worker上有executor-count個Executor的分配,並且,這個分配信息便繼續維持,不更新。 ;;這時,會將<[node,port],executors>信息放入keepers中,同時將distribution中該executor-count的對應值減一. (doseq [[node+port executor-list] existing-slots :let [executor-count (count executor-list)]] (when (pos? (get @distribution executor-count 0)) (swap! keepers assoc node+port executor-list) (swap! distribution update-in [executor-count] dec) )) ;;從existing-slots中移除keepers中記錄的需要繼續維持的分配情況.如果移除完之後還存在slot信息, ;;表明這些slot可以被釋放掉,將其轉換為WorkerSlot對象集合並返回. (->> @keepers keys (apply dissoc existing-slots) keys (map (fn [[node port]] (WorkerSlot. node port)))))))
註:學習李明等老師Storm源碼分析和陳敏敏等老師Storm技術內幕與大數據實踐的筆記整理。
歡迎關註下面二維碼進行技術交流:
JStorm與Storm源碼分析(三)--Scheduler,調度器