1. 程式人生 > >Flink on Yarn部署

Flink on Yarn部署

環境資訊:

Hadoop版本:2.6.0

Flink版本:1.1.2

快速部署Flink on Yarn環境:

比如啟動一個有4個TaskManager(每個節點都有4GB堆記憶體)的Yarn會話:

1.      下載Flink的軟體包,如flink-1.1.2-bin-hadoop26-scala_2.11.tgz(因為我的Hadoop叢集版本為2.6.0,Flink的Binary包是包含Yarn客戶端的)

2.      解壓縮

su - hadoop

tar -zxvf flink-1.1.2-bin-hadoop26-scala_2.11.tgz

ln -s flink-1.1.2 flink

3.      啟動Flink Yarn Session

cd flink

bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -s 32

對引數說明:

-n,--container 指YARN container分配的個數(即TaskManagers的個數)

-jm,--jobManagerMemory 指JobManager Containe的記憶體大小,單位為MB

-tm,--taskManagerMemory 指每個TaskManagerContainer的記憶體大小,單位為MB

-s 指每個TaskManager的slot個數。

執行上面命令來分配 4個 TaskManager,每個都擁有 4GB 的記憶體和 32 個 slot,同時會請求啟動 5 個容器,因為對於 ApplicationMaster 和 JobManager 還需要一個額外的容器。

注:

A.     Flink的JobManager和TaskManager的記憶體大小不要小於YARNContainer的最小值(yarn.scheduler.minimum-allocation-mb,預設值為1024MB)。

B.     請注意客戶端需要提前設定環境變數 YARN_CONF_DIR 或 HADOOP_CONF_DIR,用來讀取 YARN 和 HDFS 配置。

啟動的日誌中會連線Yarn的ResourceManager,如下:

org.apache.flink.yarn.YarnClusterDescriptor-     Using values:

org.apache.flink.yarn.YarnClusterDescriptor-        TaskManager count = 4

org.apache.flink.yarn.YarnClusterDescriptor-        JobManager memory = 1024

org.apache.flink.yarn.YarnClusterDescriptor-        TaskManager memory = 4096

org.apache.hadoop.yarn.client.RMProxy       - Connecting to ResourceManager at /192.168.1.128:9080

   這裡省略將Flink的配置檔案和Jar等上傳到

hdfs://gpmaster:9000/user/hadoop/.flink/application_1474521395841_0004/目錄下的過程

org.apache.flink.yarn.YarnClusterDescriptor - Submitting application master application_1474521395841_0004

org.apache.hadoop.yarn.client.api.impl.YarnClientImpl -Submitted application application_1474521395841_0004

org.apache.flink.yarn.YarnClusterDescriptor- Waiting for the cluster to be allocated

org.apache.flink.yarn.YarnClusterDescriptor- Deploying cluster, current state ACCEPTED

org.apache.flink.yarn.YarnClusterDescriptor - YARN application has been deployed successfully.

Flink JobManageris now running on 192.168.1.128:17642

JobManager Web Interface: http://gpmaster:8088/proxy/application_1474521395841_0004/

org.apache.flink.yarn.YarnClusterClient- Starting client actor system.

akka.event.slf4j.Slf4jLogger- Slf4jLogger started

Remoting - Starting remoting

Remoting - Remoting started; listening on addresses :[akka.tcp://[email protected]:18282]

org.apache.flink.yarn.YarnClusterClient - Start application client.

org.apache.flink.yarn.ApplicationClient - Notification about new leader address akka.tcp://[email protected]:17642/user/jobmanagerwith session ID null.

org.apache.flink.yarn.ApplicationClient - Received address of new leader akka.tcp://[email protected]:17642/user/jobmanager with sessionID null.

org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager null.

org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://[email protected]:17642/user/jobmanager.

org.apache.flink.yarn.ApplicationClient - Successfully registered at the ResourceManager using JobManagerActor[akka.tcp://[email protected]:17642/user/jobmanager#-1966299512]

Number of connected TaskManagers changed to 1. Slots available: 32

Number of connected TaskManagers changed to 2. Slots available: 64

Number of connected TaskManagers changed to 4. Slots available: 128

此時的Flink YARN客戶端會一直執行,不會退出。如果你希望放到後臺執行,那麼可以使用-d或--detached引數,即:

bin/yarn-session.sh -d -n 4 -jm 1024 -tm 4096

在這種情況下,Flink YARN 客戶端只會提交 Flink 到叢集中然後關閉自己。注意在這種情況下,不能像上面這樣停止 YARN 會話了,必須手動停止,如下面日誌中提示的內容:

yarn application -kill application_1474521395841_0008

Flink YARN客戶端以detached模式啟動,我們也可以從這種啟動方式的日誌中檢視到如下內容:

org.apache.flink.yarn.cli.FlinkYarnSessionCli - The Flink YARN client has been started in detached mode.In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:

yarn application -kill application_1474521395841_0008

Please also note that the temporary files of the YARN session in hdfs://gpmaster:9000/user/hadoop/.flink/application_1474521395841_0008 will not be removed.

既然Flink on Yarn模式啟動了,下面我們檢視一下相關的程序。

在Hadoop的ResourceManager節點檢視程序:

[[email protected]~]$ jps

9062 YarnTaskManager

8872 FlinkYarnSessionCli

8985 YarnApplicationMasterRunner

9196 Jps

5325 ResourceManager

4990 NameNode

5086 DataNode

5423 NodeManager

在Hadoop的NodeManager節點檢視程序:

[[email protected]~]$ jps

6208 YarnTaskManager

4336 DataNode

6327 Jps

4441 NodeManager

4.      執行Flink的example例項

上面我們已經基於Yarn啟動了Flink,我們來測試一個例子:

[[email protected] flink]$ bin/flink run ./examples/batch/WordCount.jar

返回的結果為:

(action,1)

(after,1)

(against,1)

(and,12)

(arms,1)

(arrows,1)

(awry,1)

(ay,1)

(bare,1)

(be,4)

……

使用 run 操作來提交一個任務到 YARN。客戶端自己就能確定 JobManager 的地址。在遇到罕見的問題時,你可以使用 -m 引數傳入 JobManager 的地址。JobManager 的地址可以在 YARN 控制檯找到。

5.      在YARN上直接執行單個Flink任務

上面介紹了在 Hadoop YARN 環境中啟動一個 Flink 叢集。另外,也可以在 YARN 中啟動只執行單個任務的 Flink。請注意該客戶端需要提供 -yn 引數值(TaskManager 的數量)。

這裡不需要先啟動Flink服務後,再執行Flink程式。

[[email protected] flink]$ bin/flink run -m yarn-cluster -yn2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar

輸出結果為:

(action,1)

(after,1)

(against,1)

(and,12)

(arms,1)

(arrows,1)

(awry,1)

(ay,1)

(bare,1)

(be,4)

……

相關引數說明:

-m,--jobmanager <host:port> 指需要連線的JobManager地址

-yn,--yarncontainer 指分配的YARN container個數(等於TaskManagers個數)

-yjm,--yarnjobManagerMemory 指JobManager Container的記憶體大小,單位為MB

-ytm,--yarntaskManagerMemory 指每個TaskManagerContainer的記憶體大小,單位為MB

可以看到對於yarn-cluster模式來說,這些選項都帶了一個 y 或 yarn (長引數選項)的字首。

注:通過為每個任務設定不同的環境變數 FLINK_CONF_DIR,可以為每個任務使用不同的配置目錄。從 Flink 分發包中複製 conf 目錄,然後修改配置,例如,每個任務不同的日誌設定。

6.      Flink YARN會話

YARN 是一個叢集資源管理框架。它允許在一個叢集之上執行多種分散式應用。Flink 與其他應用程式一同執行在 YARN 之上。如果使用者已經安裝 YARN,就不需要安裝其他東西了。

上面的操作中,我們在YARN叢集中啟動了一個Flink會話,此會話會啟動所有需要的 Flink 服務(JobManager 和 TaskManager),因此可以提交程式到叢集中。注意每個會話都可以執行多個程式。

Flink YARN會話啟動時,系統會使用 conf/flink-config.yaml 中的配置。

Flink on YARN 會覆蓋這些配置引數 jobmanager.rpc.address(因為JobManager 一直被分配在不同的機器上),taskmanager.tmp.dirs(我們使用 YARN 提供了的 tmp 目錄)和 parallelism.default(如果指定了 slot 數目)。

如果你不想通過修改配置檔案的方法來設定配置引數,你可以通過 -D 標記傳入動態屬性。所以你可以這樣傳遞引數,如下:

-Dfs.overwrite-files=true-Dtaskmanager.network.numberOfBuffers=16368

一旦 Flink 在 YARN 叢集中部署了,它會顯示 JobManager 連線的詳細資訊。

要停止 YARN 會話,可以通過結束 unix 程序(使用 CTRL+C)或者通過在客戶端中輸入'stop'。

7.      Flink on YARN的恢復機制

Flink的YARN客戶端可以通過引數來控制容器出現故障情況下的行為,這些引數定義在conf/flink-conf.yaml來設定,當然也可以通過啟動YARN會話時通過-D引數進行設定。

yarn.reallocate-failed:該引數控制了 Flink 是否該重新分配失敗的 TaskManager容器。預設:true。

yarn.maximum-failed-containers:ApplicationMaster能接受最多的失敗容器的數量,直到 YARN 會話失敗。預設:初始請求的 TaskManager 個數(-n)。

yarn.application-attempts:ApplicationMaster(以及它的TaskManager 容器)的嘗試次數。預設值為 1,當 ApplicationMaster 失敗了,整個 YARN 會話也會失敗。可以通過設定更大的值來更改 YARN 重啟ApplicationMaster 的次數。

8.      Flink on YARN內部實現

YARN 客戶端需要訪問 Hadoop 配置,從而連線 YARN 資源管理器和 HDFS。可以使用下面的策略來決定 Hadoop 配置:

  1. 測試 YARN_CONF_DIR, HADOOP_CONF_DIR 或 HADOOP_CONF_PATH 環境變數是否設定了(按該順序測試)。如果它們中有一個被設定了,那麼它們就會用來讀取配置。
  2. 如果上面的策略失敗了(如果正確安裝了 YARN 的話,這不應該會發生),客戶端會使用 HADOOP_HOME 環境變數。如果該變數設定了,客戶端會嘗試訪問 $HADOOP_HOME/etc/hadoop (Hadoop 2) 和 $HADOOP_HOME/conf(Hadoop 1)。

當啟動一個新的 Flink YARN Client會話,客戶端首先會檢查所請求的資源(容器和記憶體)是否可用。之後,它會上傳包含了 Flink 配置和 jar檔案到 HDFS(步驟 1)。

客戶端的下一步是請求(步驟 2)一個 YARN 容器啟動 ApplicationMaster (步驟 3)。因為客戶端將配置和jar 檔案作為容器的資源註冊了,所以執行在特定機器上的 YARN 的 NodeManager 會負責準備容器(例如,下載檔案)。一旦這些完成了,ApplicationMaster (AM) 就啟動了。

JobManager 和 AM 執行在同一個容器中。一旦它們成功地啟動了,AM 知道 JobManager 的地址(它自己)。它會為 TaskManager 生成一個新的 Flink 配置檔案(這樣它們才能連上 JobManager)。該檔案也同樣會上傳到 HDFS。另外,AM 容器同時提供了 Flink 的 Web 介面服務。Flink 用來提供服務的埠是由使用者 + 應用程式 id 作為偏移配置的。這使得使用者能夠並行執行多個 Flink YARN 會話。

之後,AM 開始為 Flink 的 TaskManager 分配容器,這會從 HDFS 下載 jar 檔案和修改過的配置檔案。一旦這些步驟完成了,Flink 就安裝完成並準備接受任務了。

相關推薦

Flink on Yarn部署

環境資訊:Hadoop版本:2.6.0Flink版本:1.1.2快速部署Flink on Yarn環境:比如啟動一個有4個TaskManager(每個節點都有4GB堆記憶體)的Yarn會話:1.      下載Flink的軟體包,如flink-1.1.2-bin-hadoop

flink部署操作-flink on yarn叢集安裝部署

flink叢集安裝部署 yarn叢集模式 Flink入門及實戰-上: Flink入門及實戰-下: 快速開始 在yarn上啟動一個一直執行的flink叢集 在yarn上執行一個flink job flink yarn session 啟動flink s

Flink on Yarn三部曲之二:部署和設定

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; 本文是《

Flink1.6系列之—Flink on yarn流程詳解

端口 準備 -a 根據 images mas info 使用 臨時 本篇我們介紹下,Flink在YARN上運行流程: 當開始一個新的Flink yarn 會話時,客戶端首先檢查所請求的資源(containers和內存)是否可用。如果資源夠用,之後,上傳

Flink on Yarn模式啟動流程分析

cin XML images ont list action -i 多個 信息 此文已由作者嶽猛授權網易雲社區發布。歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。Flink On Yarn 架構Paste_Image.png前提條件首先需要配置YARN_CONF_DIR

Flink on Yarn模式啟動流程原始碼分析

此文已由作者嶽猛授權網易雲社群釋出。 歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。 Flink on yarn的啟動流程可以參見前面的文章 Flink on Yarn啟動流程,下面主要是從原始碼角度看下這個實現,可能有的地方理解有誤,請給予指正,多謝。 --> 1.命令列啟動yarn sessi

Flink on Yarn模式啟動流程源代碼分析

www and *** err wap `` dem 註冊 contex 此文已由作者嶽猛授權網易雲社區發布。歡迎訪問網易雲社區,了解更多網易技術產品運營經驗。Flink on yarn的啟動流程可以參見前面的文章 Flink on Yarn啟動流程,下面主要是從源碼角度看

flink on yarn部分原始碼解析 (FLIP-6 new mode)

我們在https://www.cnblogs.com/dongxiao-yang/p/9403427.html文章裡分析了flink提交single job到yarn叢集上的程式碼,flink在1.5版本後對整個框架的deploy方式重構了全新的流程(參考https://cwiki.apache.org/co

flink開發實戰之 flink on yarn

flink 執行模式 Flink 和spark一樣有三種部署模式,分別是 Local、Standalone Cluster 和 Yarn Cluster。 實戰開發主要使用Yarn Cluster模式,所以本文主要介紹yarn  模式下flink任務的執行和資源分配。 Ya

Flink on yarn的問題:Invalid AMRMToken

目前採用的Flink的版本是1.4.2,執行在yarn上,總是時不時的報錯“Invalid AMRMToken from appattempt”,導致AM掛掉。   簡而言之,就是AM和RM溝通的過程中,突然AM提供的Token不被認可,導致拒絕連線,進而AM掛掉。   後來發現早

Flink-on-yarn

解壓 stat master swd run rec 地址 abi man 介紹 官網下載 https://www.apache.org/dyn/closer.lua/flink/flink-1.6.1/flink-1.6.1-bin-hadoop28-scala_

flink on yarn叢集搭建

前面一篇部落格中已經搭建了flink Standalone的叢集,需要的可以進去看一下,今天主要來說一下flink on yarn 叢集的搭建以及怎麼提交任務.這篇部落格寫的比較詳細,內容較多,希望大家耐心看完,都是乾貨. 版本資訊: flink-1.6.0 zooke

Flink之三 flink on yarn

Flink的執行模式  flink的執行模式有local模式,cluster,yarn等模式;flink叢集層次結構     這一節我們主要一起了解flink on yarn 模式,flink on yarn 有兩種模式:       一:long-running F

Flink on YARN快速入門指南

  Apache Flink是一個高效、分散式、基於Java和Scala(主要是由Java實現)實現的通用大資料分析引擎,它具有分散式 MapReduce一類平臺的高效性、靈活性和擴充套件性以及並行資料庫查詢優化方案,它支援批量和基於流的資料分析,且提供了基於Ja

flink on yarn模式

flink on yarn模式的相關知識點(重要):https://blog.csdn.net/xu470438000/article/details/79576989 在flink on yarn模式中,flink yarn-session的兩種提交方式 兩種提交方式 1

Flink on yarn

一: 配置: 1.配置yarn-site.xml <property><name>yarn.resourcemanager.am.max-attempts</name><value>4</value></property> 2.配置

Hadoop 分散式配置及Spark on yarn部署

配置Hadoop Hadoop的叢集部署模式需要修改Hadoop資料夾中/etc/hadoop/中的配置檔案,更多設定項可見官方說明,這裡只設置了常見的設定項:hadoop-env.sh,yarn-env.sh、core-site.xml、hdfs-site.xml、mapred-site.

最近寫Flink on Yarn程式遇到的一些問題

1.UDF造成的compile 編譯失敗 class GetDay() extends ScalarFunction{ // 這個變數千萬不能定義在這裡,否則除錯沒問題,on yarn執行會編譯出錯

Flink On Yarn 異常排除過程以及根據位元組碼名字獲取jar檔名字

最初學習Flink,寫了一個簡單的wordcount執行一下,發現報錯,異常資訊如下: The program finished with the following exception: java.lang.RuntimeException: Error deployin

storm on yarn 部署

1. 環境介紹 1.1 節點與服務對映關係 ip host 服務 192.168.40.132 master Namenode、NodeManager、DataNode、zookeeper 192.168.40.133 slave1 ResurceMa