Hadoop知識總結
------------恢復內容開始------------
Hadoop知識點什麼是HadoopHadoop和Spark差異Hadoop常見版本,有哪些特點,一般是如何進行選擇Hadoop常用埠號搭建Hadoop叢集的流程Hadoop中需要哪些配置檔案,其作用是什麼?HDFS讀寫流程MapReduce的Shuffle過程,Hadoop優化方案基於MapReduce做Hadoop的優化Yarn的job提交流程Yarn預設的排程器,分類,以及它們之間的區別Hadoop的引數優化Hadoop宕機Hadoop資料傾斜問題NameNode和secondarynameNode工作機制
什麼是Hadoop
Hadoop是一個能夠對大量資料進行分散式處理的軟體框架。以一種可靠、高效、可伸縮的方式進行資料處理。主要包括三部分內容:Hdfs,MapReduce,Yarn
從2.7.3版本開始block size的預設大小為128M,之前版本的預設值是64M。對於新檔案切分的大小,單位byte。每一個節點都要指定,包括客戶端。
原理:檔案塊越大,定址時間越短,但磁碟傳輸時間越長;檔案塊越小,定址時間越長,但磁碟傳輸時間越短。
Hadoop和Spark差異
Hadoop常見版本,有哪些特點,一般是如何進行選擇
Hadoop1.0
Hadoop2.0即為克服Hadoop1.0中的不足,提出了以下關鍵特性:
-
Yarn:它是Hadoop2.0引入的一個全新的通用資源管理系統,完全代替了Hadoop1.0中的JobTracker。在MRv1 中的 JobTracker 資源管理和作業跟蹤的功能被抽象為 ResourceManager 和 AppMaster 兩個元件。Yarn 還支援多種應用程式和框架,提供統一的資源排程和管理功能
-
NameNode 單點故障得以解決:Hadoop2.2.0 同時解決了 NameNode 單點故障問題和記憶體受限問題,並提供 NFS,QJM 和 Zookeeper 三種可選的共享儲存系統
-
HDFS 快照:指 HDFS(或子系統)在某一時刻的只讀映象,該只讀映象對於防止資料誤刪、丟失等是非常重要的。例如,管理員可定時為重要檔案或目錄做快照,當發生了資料誤刪或者丟失的現象時,管理員可以將這個資料快照作為恢復資料的依據
-
支援Windows 作業系統:Hadoop 2.2.0 版本的一個重大改進就是開始支援 Windows 作業系統
-
Append:新版本的 Hadoop 引入了對檔案的追加操作
同時,新版本的Hadoop對於HDFS做了兩個非常重要的「增強」,分別是支援異構的儲存層次和通過資料節點為儲存在HDFS中的資料提供記憶體緩衝功能
相比於Hadoop2.0,Hadoop3.0 是直接基於 JDK1.8 釋出的一個新版本,同時,Hadoop3.0引入了一些重要的功能和特性
-
HDFS可擦除編碼:這項技術使HDFS在不降低可靠性的前提下節省了很大一部分儲存空間
-
多NameNode支援:在Hadoop3.0中,新增了對多NameNode的支援。當然,處於Active狀態的NameNode例項必須只有一個。也就是說,從Hadoop3.0開始,在同一個叢集中,支援一個 ActiveNameNode 和 多個 StandbyNameNode 的部署方式。
-
MR Native Task優化
-
Yarn基於cgroup 的記憶體和磁碟 I/O 隔離
-
Yarn container resizing
Hadoop常用埠號
dfs.namenode.http-address:50070
dfs.datanode.http-address:50075
SecondaryNameNode:50090
dfs.datanode.address:50010
fs.defaultFS:8020 或者9000
yarn.resourcemanager.webapp.address:8088
歷史伺服器web訪問埠:19888
埠名稱 | Hadoop2.x | Hadoop3.x |
---|---|---|
NameNode 內部通訊埠 | 8020 / 9000 | 8020 / 9000/9820 |
NameNode HTTP UI | 50070 | 9870 |
MapReduce 檢視執行任務埠 | 8088 | 8088 |
jobhistoryserver歷史伺服器通訊埠 | 19888 | 19888 |
datanode HTTP UI | 50075 | |
SecondaryNameNode web端 | 50090 |
搭建Hadoop叢集的流程
1)準備一臺虛擬機器樣機(配置好IP地址,主機名,對映,關閉防火牆,jdk安裝。jdk安裝需要配置環境變數,JAVA_HOME/bin目錄下所有命令全域性生效)
2)克隆兩臺虛擬機器。(設定ssh免密登入,配置對映,修改主機名,和IP地址)
3)安裝Hadoop解壓到指定目錄檔案下
4)Hadoop檔案配置
a)core-site.xml配置HDFS中namenode的地址,指定Hadoop執行時產生檔案的儲存目錄
b)hdfs-site.xml配置hdfs副本數量
c)yarn-site.xml配置reducer獲取資料的方式,指定yarn的resourceManager的地址
d)mapred-env.sh配置MR執行在yarn上,配置歷史伺服器端地址,歷史伺服器web端地址
e)slaves配置其他虛擬機器的主機名
f)hadoop-env.sh : Hadoop 環境配置檔案 export JAVA_HOME=/usr/local/soft/jdk1.8.0_171
5)分發Hadoop的安裝包(scp -r hadoop node1:pwd
)
6)格式化hdfs(hdfs namenode -format)
7)啟動叢集(start-all.sh,因為我們是把resourcemanager和2nn放在master一臺上進行啟動)
Hadoop中需要哪些配置檔案,其作用是什麼?
1)core-site.xml:
(1)fs.defaultFS:hdfs://cluster1(域名),這裡的值指的是預設的HDFS路徑 。
(2)hadoop.tmp.dir:/export/data/hadoop_tmp,這裡的路徑預設是NameNode、DataNode、secondaryNamenode等存放資料的公共目錄。使用者也可以自己單獨指定這三類節點的目錄。
(3)ha.zookeeper.quorum:hadoop101:2181,hadoop102:2181,hadoop103:2181,這裡是ZooKeeper叢集的地址和埠。注意,數量一定是奇數,且不少於三個節點 。
2)hadoop-env.sh: 只需設定jdk的安裝路徑,如:export JAVA_HOME=/usr/local/jdk。
3)hdfs-site.xml:
(1) dfs.replication:他決定著系統裡面的檔案塊的資料備份個數,預設為3個。
(2) dfs.data.dir:datanode節點儲存在檔案系統的目錄 。
(3) dfs.name.dir:是namenode節點儲存hadoop檔案系統資訊的本地系統路徑 。
4)mapred-site.xml:
mapreduce.framework.name: yarn指定mr執行在yarn上
HDFS讀寫流程
1)客戶端通過Distributed FileSystem模組向NameNode請求上傳檔案,NameNode檢查目標檔案是否已存在,父目錄是否存在。
2)NameNode返回是否可以上傳。
3)客戶端請求第一個 Block上傳到哪幾個DataNode伺服器上。
4)NameNode返回3個DataNode節點,分別為dn1、dn2、dn3。
5)客戶端通過FSDataOutputStream模組請求dn1上傳資料,dn1收到請求會繼續呼叫dn2,然後dn2呼叫dn3,將這個通訊管道建立完成。
6)dn1、dn2、dn3逐級應答客戶端。
7)客戶端開始往dn1上傳第一個Block(先從磁碟讀取資料放到一個本地記憶體快取),以Packet為單位,dn1收到一個Packet就會傳給dn2,dn2傳給dn3;dn1每傳一個packet會放入一個應答佇列等待應答。
8)當一個Block傳輸完成之後,客戶端再次請求NameNode上傳第二個Block的伺服器。(重複執行3-7步)。
1)客戶端通過Distributed FileSystem向NameNode請求下載檔案,NameNode通過查詢元資料,找到檔案塊所在的DataNode地址。
2)挑選一臺DataNode(就近原則,然後隨機)伺服器,請求讀取資料。
3)DataNode開始傳輸資料給客戶端(從磁盤裡面讀取資料輸入流,以Packet為單位來做校驗)。
4)客戶端以Packet為單位接收,先在本地快取,然後寫入目標檔案。
MapReduce的Shuffle過程,Hadoop優化方案
MapReduce資料讀取並寫入HDFS流程實際上是有10步
Shuffle階段
1、Map方法之後Reduce方法之前這段處理過程叫「Shuffle」
2、Map方法之後,資料首先進入到分割槽方法,把資料標記好分割槽,然後把資料傳送到環形緩衝區;環形緩衝區預設大小100m,環形緩衝區達到80%時,進行溢寫;溢寫前對資料進行排序,排序按照對key的索引進行字典順序排序,排序的手段「快排」;溢寫產生大量溢寫檔案,需要對溢寫檔案進行「歸併排序」;對溢寫的檔案也可以進行Combiner操作,前提是彙總操作,求平均值不行。最後將檔案按照分割槽儲存到磁碟,等待Reduce端拉取。
3、每個Reduce拉取Map端對應分割槽的資料。拉取資料後先儲存到記憶體中,記憶體不夠了,再儲存到磁碟。拉取完所有資料後,採用歸併排序將記憶體和磁碟中的資料都進行排序。在進入Reduce方法前,可以對資料進行分組操作。
Shuffle優化
1)Map階段
(1)增大環形緩衝區大小。由100m擴大到200m
(2)增大環形緩衝區溢寫的比例。由80%擴大到90%
(3)減少對溢寫檔案的merge次數。(10個檔案,一次20個merge)
(4)不影響實際業務的前提下,採用Combiner提前合併,減少 I/O
2)Reduce階段
(1)合理設定Map和Reduce數:兩個都不能設定太少,也不能設定太多。太少,會導致Task等待,延長處理時間;太多,會導致 Map、Reduce任務間競爭資源,造成處理超時等錯誤。
(2)設定Map、Reduce共存:調整slowstart.completedmaps引數,使Map執行到一定程度後,Reduce也開始執行,減少Reduce的等待時間。
(3)規避使用Reduce,因為Reduce在用於連線資料集的時候將會產生大量的網路消耗。
(4)增加每個Reduce去Map中拿資料的並行數
(5)叢集效能可以的前提下,增大Reduce端儲存資料記憶體的大小。
3)IO傳輸
採用資料壓縮的方式,減少網路IO的的時間。安裝Snappy和LZOP壓縮編碼器。
壓縮:
(1)map輸入端主要考慮資料量大小和切片,支援切片的有Bzip2、LZO。注意:LZO要想支援切片必須建立索引;
(2)map輸出端主要考慮速度,速度快的snappy、LZO;
(3)reduce輸出端主要看具體需求,例如作為下一個mr輸入需要考慮切片,永久儲存考慮壓縮率比較大的gzip。
4)整體
(1)NodeManager預設記憶體8G,需要根據伺服器實際配置靈活調整,例如128G記憶體,配置為100G記憶體左右,yarn.nodemanager.resource.memory-mb。
(2)單任務預設記憶體8G,需要根據該任務的資料量靈活調整,例如128m資料,配置1G記憶體,yarn.scheduler.maximum-allocation-mb。
(3)mapreduce.map.memory.mb :控制分配給MapTask記憶體上限,如果超過會kill掉程序(報:Container is running beyond physical memory limits. Current usage:565MB of512MB physical memory used;Killing Container)。預設記憶體大小為1G,如果資料量是128m,正常不需要調整記憶體;如果資料量大於128m,可以增加MapTask記憶體,最大可以增加到4-5g。
(4)mapreduce.reduce.memory.mb:控制分配給ReduceTask記憶體上限。預設記憶體大小為1G,如果資料量是128m,正常不需要調整記憶體;如果資料量大於128m,可以增加ReduceTask記憶體大小為4-5g。
(5)mapreduce.map.java.opts:控制MapTask堆記憶體大小。(如果記憶體不夠,報:java.lang.OutOfMemoryError)
(6)mapreduce.reduce.java.opts:控制ReduceTask堆記憶體大小。(如果記憶體不夠,報:java.lang.OutOfMemoryError)
(7)可以增加MapTask的CPU核數,增加ReduceTask的CPU核數
(8)增加每個Container的CPU核數和記憶體大小
(9)在hdfs-site.xml檔案中配置多目錄
(10)NameNode有一個工作執行緒池,用來處理不同DataNode的併發心跳以及客戶端併發的元資料操作。dfs.namenode.handler.count=20 * log2(Cluster Size),比如叢集規模為10臺時,此引數設定為60。
基於MapReduce做Hadoop的優化
1)HDFS小檔案影響
-
影響NameNode的壽命,因為檔案元資料儲存在NameNode的記憶體中
-
影響計算引擎的任務數量,比如每個小的檔案都會生成一個Map任務
2)資料輸入小檔案處理
-
合併小檔案:對小檔案進行歸檔(Har)、自定義Inputformat將小檔案儲存成SequenceFile檔案。
-
採用ConbinFileInputFormat來作為輸入,解決輸入端大量小檔案場景
-
對於大量小檔案Job,可以開啟JVM重用會減少45%執行時間
3)Map階段
-
增大環形緩衝區大小。由100m擴大到200m
-
增大環形緩衝區溢寫的比例。由80%擴大到90%
-
減少對溢寫檔案的merge次數。(10個檔案,一次20個merge)
-
不影響實際業務的前提下,採用Combiner提前合併,減少 I/O
4)Reduce階段
-
合理設定Map和Reduce數:兩個都不能設定太少,也不能設定太多。太少,會導致Task等待,延長處理時間;太多,會導致 Map、Reduce任務間競爭資源,造成處理超時等錯誤。
-
設定Map、Reduce共存:調整
slowstart.completedmaps
引數,使Map執行到一定程度後,Reduce也開始執行,減少Reduce的等待時間 -
規避使用Reduce,因為Reduce在用於連線資料集的時候將會產生大量的網路消耗。
-
增加每個Reduce去Map中拿資料的並行數
-
叢集效能可以的前提下,增大Reduce端儲存資料記憶體的大小
5)IO傳輸
-
採用資料壓縮的方式,減少網路IO的的時間
-
使用SequenceFile二進位制檔案
6)整體
-
MapTask預設記憶體大小為1G,可以增加MapTask記憶體大小為4
-
ReduceTask預設記憶體大小為1G,可以增加ReduceTask記憶體大小為4-5g
-
可以增加MapTask的cpu核數,增加ReduceTask的CPU核數
-
增加每個Container的CPU核數和記憶體大小
-
調整每個Map Task和Reduce Task最大重試次數
7)壓縮
詳細檢視Hadoop3.x的mapreduce壓縮P58
Yarn的job提交流程
job提交過程詳解
(1)作業提交
1)Clinet呼叫job.waitForCompletion方法,向整個叢集提交MapReduce作業**
2)Clinet向RM申請一個作業id**
3)RM給Clinet返回該job資源的提交路徑和作業id**
4)Client 提交 jar 包、切片資訊和配置檔案到指定的資源提交路徑。**
5)Client 提交完資源後,向 RM 申請執行 MrAppMaster。
(2)作業初始化
6)當 RM 收到 Client 的請求後,將該 job 新增到容量排程器
7)某一個空閒的 NM 領取到該 Job。
8)該 NM 建立 Container,併產生 MRAppmaster。
9)下載 Client 提交的資源到本地。
(3)任務分配
10)MrAppMaster 向 RM 申請執行多個 MapTask 任務資源。
11)RM 將執行 MapTask 任務分配給另外兩個 NodeManager,另兩個 NodeManager
分別領取任務並建立容器。
(4)任務執行
12)MR 向兩個接收到任務的 NodeManager 傳送程式啟動指令碼,這兩個NodeManager 分別啟動 MapTask,MapTask 對資料分割槽排序。
13)MrAppMaster等待所有MapTask執行完畢後,向RM申請容器,執行ReduceTask。
14)ReduceTask 向 MapTask 獲取相應分割槽的資料
15)程式執行完畢後,MR 會向 RM 申請登出自己。
(5)進度和狀態更新
YARN 中的任務將其進度和狀態(包括 counter)返回給應用管理器, 客戶端每秒(通過mapreduce.client.progressmonitor.pollinterval 設定)嚮應用管理器請求進度更新, 展示給使用者。
(6)作業完成
除了嚮應用管理器請求作業進度外, 客戶端每 5 秒都會通過呼叫 waitForCompletion()來檢查作業是否完成。時間間隔可以通過 mapreduce.client.completion.pollinterval 來設定。作業完成之後, 應用管理器和 Container 會清理工作狀態。作業的資訊會被作業歷史伺服器儲存以備之後使用者核查。
其中簡略版對應的步驟分別如下:
1、client向RM提交應用程式,其中包括啟動該應用的ApplicationMaster的必須資訊,例如ApplicationMaster程式、啟動ApplicationMaster的命令、使用者程式等
2、ResourceManager啟動一個container用於執行ApplicationMaster
3、啟動中的ApplicationMaster向ResourceManager註冊自己,啟動成功後與RM保持心跳
4、ApplicationMaster向ResourceManager傳送請求,申請相應數目的container
5、申請成功的container,由ApplicationMaster進行初始化。container的啟動資訊初始化後,AM與對應的NodeManager通訊,要求NM啟動container
6、NM啟動container
7、container執行期間,ApplicationMaster對container進行監控。container通過RPC協議向對應的AM彙報自己的進度和狀態等資訊
8、應用執行結束後,ApplicationMaster向ResourceManager登出自己,並允許屬於它的container被收回
Yarn預設的排程器,分類,以及它們之間的區別
1)Hadoop排程器主要分為三類:
-
FIFO Scheduler:先進先出排程器:優先提交的,優先執行,後面提交的等待【生產環境不會使用】
-
Capacity Scheduler:容量排程器:允許看建立多個任務對列,多個任務對列可以同時執行。但是一個佇列內部還是先進先出。【Hadoop2.7.2預設的排程器】
-
Fair Scheduler:公平排程器:第一個程式在啟動時可以佔用其他佇列的資源(100%佔用),當其他佇列有任務提交時,佔用資源的佇列需要將資源還給該任務。還資源的時候,效率比較慢。【CDH版本的yarn排程器預設】
Hadoop的引數優化
-
在hdfs-site.xml檔案中配置多目錄,最好提前配置好,否則更改目錄需要重新啟動叢集
-
NameNode有一個工作執行緒池,用來處理不同DataNode的併發心跳以及客戶端併發的元資料操作
-
編輯日誌儲存路徑dfs.namenode.edits.dir設定與映象檔案儲存路徑dfs.namenode.name.dir儘量分開,達到最低寫入延遲
-
伺服器節點上YARN可使用的實體記憶體總量,預設是8192(MB),注意,如果你的節點記憶體資源不夠8GB,則需要調減小這個值,而YARN不會智慧的探測節點的實體記憶體總量
-
單個任務可申請的最多實體記憶體量,預設是8192(MB
Hadoop宕機
如果MR造成系統宕機。此時要控制Yarn同時執行的任務數,和每個任務申請的最大記憶體。調整引數:yarn.scheduler.maximum-allocation-mb(單個任務可申請的最多實體記憶體量,預設是8192MB)。
如果寫入檔案過量造成NameNode宕機。那麼調高Kafka的儲存大小,控制從Kafka到HDFS的寫入速度。高峰期的時候用Kafka進行快取,高峰期過去資料同步會自動跟上。
Hadoop資料傾斜問題
1)提前在map進行combine,減少傳輸的資料量
在Mapper加上combiner相當於提前進行reduce,即把一個Mapper中的相同key進行了聚合,減少shuffle過程中傳輸的資料量,以及Reducer端的計算量。
如果導致資料傾斜的key 大量分佈在不同的mapper的時候,這種方法就不是很有效了
2)資料傾斜的key 大量分佈在不同的mapper
在這種情況,大致有如下幾種方法:
-
「區域性聚合加全域性聚合」
第一次在map階段對那些導致了資料傾斜的key 加上1到n的隨機字首,這樣本來相同的key 也會被分到多個Reducer 中進行區域性聚合,數量就會大大降低。
第二次mapreduce,去掉key的隨機字首,進行全域性聚合。
「思想」:二次mr,第一次將key隨機雜湊到不同 reducer 進行處理達到負載均衡目的。第二次再根據去掉key的隨機字首,按原key進行reduce處理。
這個方法進行兩次mapreduce,效能稍差
「增加Reducer,提升並行度」
JobConf.setNumReduceTasks(int)
-
「實現自定義分割槽」
根據資料分佈情況,自定義雜湊函式,將key均勻分配到不同Reducer
NameNode和secondarynameNode工作機制
思考:NameNode中的元資料是儲存在哪裡的?
首先,我們做個假設,如果儲存在NameNode節點的磁碟中,因為經常需要進行隨機訪問,還有響應客戶請求,必然是效率過低。因此,元資料需要存放在記憶體中。但如果只存在記憶體中,一旦斷電,元資料丟失,整個叢集就無法工作了。因此產生在磁碟中備份元資料的FsImage。
這樣又會帶來新的問題,當在記憶體中的元資料更新時,如果同時更新FsImage,就會導致效率過低,但如果不更新,就會發生一致性問題,一旦NameNode節點斷電,就會產生資料丟失。因此,引入Edits檔案(只進行追加操作,效率很高)。每當元資料有更新或者新增元資料時,修改記憶體中的元資料並追加到Edits中。這樣,一旦NameNode節點斷電,可以通過FsImage和Edits的合併,合成元資料。
但是,如果長時間新增資料到Edits中,會導致該檔案資料過大,效率降低,而且一旦斷電,恢復元資料需要的時間過長。因此,需要定期進行FsImage和Edits的合併,如果這個操作由NameNode節點完成,又會效率過低。因此,引入一個新的節點SecondaryNamenode,專門用於FsImage和Edits的合併。
-
第一階段:NameNode啟動
(1)第一次啟動NameNode格式化後,建立Fsimage和Edits檔案。如果不是第一次啟動,直接載入編輯日誌和映象檔案到記憶體。
(2)客戶端對元資料進行增刪改的請求。
(3)NameNode記錄操作日誌,更新滾動日誌。
(4)NameNode在記憶體中對資料進行增刪改。
-
第二階段:Secondary NameNode工作
(1)Secondary NameNode詢問NameNode是否需要CheckPoint。直接帶回NameNode是否檢查結果。
(2)Secondary NameNode請求執行CheckPoint。
(3)NameNode滾動正在寫的Edits日誌。
(4)將滾動前的編輯日誌和映象檔案拷貝到Secondary NameNode。
(5)Secondary NameNode載入編輯日誌和映象檔案到記憶體,併合並。
(6)生成新的映象檔案fsimage.chkpoint。
(7)拷貝fsimage.chkpoint到NameNode。
(8)NameNode將fsimage.chkpoint重新命名成fsimage。
------------恢復內容結束------------
**------------恢復內容開始------------**------------恢復內容開始------------
Hadoop知識點什麼是HadoopHadoop和Spark差異Hadoop常見版本,有哪些特點,一般是如何進行選擇Hadoop常用埠號搭建Hadoop叢集的流程Hadoop中需要哪些配置檔案,其作用是什麼?HDFS讀寫流程MapReduce的Shuffle過程,Hadoop優化方案基於MapReduce做Hadoop的優化Yarn的job提交流程Yarn預設的排程器,分類,以及它們之間的區別Hadoop的引數優化Hadoop宕機Hadoop資料傾斜問題NameNode和secondarynameNode工作機制
什麼是Hadoop
Hadoop是一個能夠對大量資料進行分散式處理的軟體框架。以一種可靠、高效、可伸縮的方式進行資料處理。主要包括三部分內容:Hdfs,MapReduce,Yarn
從2.7.3版本開始block size的預設大小為128M,之前版本的預設值是64M。對於新檔案切分的大小,單位byte。每一個節點都要指定,包括客戶端。
原理:檔案塊越大,定址時間越短,但磁碟傳輸時間越長;檔案塊越小,定址時間越長,但磁碟傳輸時間越短。
Hadoop和Spark差異
Hadoop常見版本,有哪些特點,一般是如何進行選擇
Hadoop1.0
Hadoop2.0即為克服Hadoop1.0中的不足,提出了以下關鍵特性:
-
Yarn:它是Hadoop2.0引入的一個全新的通用資源管理系統,完全代替了Hadoop1.0中的JobTracker。在MRv1 中的 JobTracker 資源管理和作業跟蹤的功能被抽象為 ResourceManager 和 AppMaster 兩個元件。Yarn 還支援多種應用程式和框架,提供統一的資源排程和管理功能
-
NameNode 單點故障得以解決:Hadoop2.2.0 同時解決了 NameNode 單點故障問題和記憶體受限問題,並提供 NFS,QJM 和 Zookeeper 三種可選的共享儲存系統
-
HDFS 快照:指 HDFS(或子系統)在某一時刻的只讀映象,該只讀映象對於防止資料誤刪、丟失等是非常重要的。例如,管理員可定時為重要檔案或目錄做快照,當發生了資料誤刪或者丟失的現象時,管理員可以將這個資料快照作為恢復資料的依據
-
支援Windows 作業系統:Hadoop 2.2.0 版本的一個重大改進就是開始支援 Windows 作業系統
-
Append:新版本的 Hadoop 引入了對檔案的追加操作
同時,新版本的Hadoop對於HDFS做了兩個非常重要的「增強」,分別是支援異構的儲存層次和通過資料節點為儲存在HDFS中的資料提供記憶體緩衝功能
相比於Hadoop2.0,Hadoop3.0 是直接基於 JDK1.8 釋出的一個新版本,同時,Hadoop3.0引入了一些重要的功能和特性
-
HDFS可擦除編碼:這項技術使HDFS在不降低可靠性的前提下節省了很大一部分儲存空間
-
多NameNode支援:在Hadoop3.0中,新增了對多NameNode的支援。當然,處於Active狀態的NameNode例項必須只有一個。也就是說,從Hadoop3.0開始,在同一個叢集中,支援一個 ActiveNameNode 和 多個 StandbyNameNode 的部署方式。
-
MR Native Task優化
-
Yarn基於cgroup 的記憶體和磁碟 I/O 隔離
-
Yarn container resizing
Hadoop常用埠號
dfs.namenode.http-address:50070
dfs.datanode.http-address:50075
SecondaryNameNode:50090
dfs.datanode.address:50010
fs.defaultFS:8020 或者9000
yarn.resourcemanager.webapp.address:8088
歷史伺服器web訪問埠:19888
埠名稱 | Hadoop2.x | Hadoop3.x |
---|---|---|
NameNode 內部通訊埠 | 8020 / 9000 | 8020 / 9000/9820 |
NameNode HTTP UI | 50070 | 9870 |
MapReduce 檢視執行任務埠 | 8088 | 8088 |
jobhistoryserver歷史伺服器通訊埠 | 19888 | 19888 |
datanode HTTP UI | 50075 | |
SecondaryNameNode web端 | 50090 |
搭建Hadoop叢集的流程
1)準備一臺虛擬機器樣機(配置好IP地址,主機名,對映,關閉防火牆,jdk安裝。jdk安裝需要配置環境變數,JAVA_HOME/bin目錄下所有命令全域性生效)
2)克隆兩臺虛擬機器。(設定ssh免密登入,配置對映,修改主機名,和IP地址)
3)安裝Hadoop解壓到指定目錄檔案下
4)Hadoop檔案配置
a)core-site.xml配置HDFS中namenode的地址,指定Hadoop執行時產生檔案的儲存目錄
b)hdfs-site.xml配置hdfs副本數量
c)yarn-site.xml配置reducer獲取資料的方式,指定yarn的resourceManager的地址
d)mapred-env.sh配置MR執行在yarn上,配置歷史伺服器端地址,歷史伺服器web端地址
e)slaves配置其他虛擬機器的主機名
f)hadoop-env.sh : Hadoop 環境配置檔案 export JAVA_HOME=/usr/local/soft/jdk1.8.0_171
5)分發Hadoop的安裝包(scp -r hadoop node1:pwd
)
6)格式化hdfs(hdfs namenode -format)
7)啟動叢集(start-all.sh,因為我們是把resourcemanager和2nn放在master一臺上進行啟動)
Hadoop中需要哪些配置檔案,其作用是什麼?
1)core-site.xml:
(1)fs.defaultFS:hdfs://cluster1(域名),這裡的值指的是預設的HDFS路徑 。
(2)hadoop.tmp.dir:/export/data/hadoop_tmp,這裡的路徑預設是NameNode、DataNode、secondaryNamenode等存放資料的公共目錄。使用者也可以自己單獨指定這三類節點的目錄。
(3)ha.zookeeper.quorum:hadoop101:2181,hadoop102:2181,hadoop103:2181,這裡是ZooKeeper叢集的地址和埠。注意,數量一定是奇數,且不少於三個節點 。
2)hadoop-env.sh: 只需設定jdk的安裝路徑,如:export JAVA_HOME=/usr/local/jdk。
3)hdfs-site.xml:
(1) dfs.replication:他決定著系統裡面的檔案塊的資料備份個數,預設為3個。
(2) dfs.data.dir:datanode節點儲存在檔案系統的目錄 。
(3) dfs.name.dir:是namenode節點儲存hadoop檔案系統資訊的本地系統路徑 。
4)mapred-site.xml:
mapreduce.framework.name: yarn指定mr執行在yarn上
HDFS讀寫流程
1)客戶端通過Distributed FileSystem模組向NameNode請求上傳檔案,NameNode檢查目標檔案是否已存在,父目錄是否存在。
2)NameNode返回是否可以上傳。
3)客戶端請求第一個 Block上傳到哪幾個DataNode伺服器上。
4)NameNode返回3個DataNode節點,分別為dn1、dn2、dn3。
5)客戶端通過FSDataOutputStream模組請求dn1上傳資料,dn1收到請求會繼續呼叫dn2,然後dn2呼叫dn3,將這個通訊管道建立完成。
6)dn1、dn2、dn3逐級應答客戶端。
7)客戶端開始往dn1上傳第一個Block(先從磁碟讀取資料放到一個本地記憶體快取),以Packet為單位,dn1收到一個Packet就會傳給dn2,dn2傳給dn3;dn1每傳一個packet會放入一個應答佇列等待應答。
8)當一個Block傳輸完成之後,客戶端再次請求NameNode上傳第二個Block的伺服器。(重複執行3-7步)。
1)客戶端通過Distributed FileSystem向NameNode請求下載檔案,NameNode通過查詢元資料,找到檔案塊所在的DataNode地址。
2)挑選一臺DataNode(就近原則,然後隨機)伺服器,請求讀取資料。
3)DataNode開始傳輸資料給客戶端(從磁盤裡面讀取資料輸入流,以Packet為單位來做校驗)。
4)客戶端以Packet為單位接收,先在本地快取,然後寫入目標檔案。
MapReduce的Shuffle過程,Hadoop優化方案
MapReduce資料讀取並寫入HDFS流程實際上是有10步
Shuffle階段
1、Map方法之後Reduce方法之前這段處理過程叫「Shuffle」
2、Map方法之後,資料首先進入到分割槽方法,把資料標記好分割槽,然後把資料傳送到環形緩衝區;環形緩衝區預設大小100m,環形緩衝區達到80%時,進行溢寫;溢寫前對資料進行排序,排序按照對key的索引進行字典順序排序,排序的手段「快排」;溢寫產生大量溢寫檔案,需要對溢寫檔案進行「歸併排序」;對溢寫的檔案也可以進行Combiner操作,前提是彙總操作,求平均值不行。最後將檔案按照分割槽儲存到磁碟,等待Reduce端拉取。
3、每個Reduce拉取Map端對應分割槽的資料。拉取資料後先儲存到記憶體中,記憶體不夠了,再儲存到磁碟。拉取完所有資料後,採用歸併排序將記憶體和磁碟中的資料都進行排序。在進入Reduce方法前,可以對資料進行分組操作。
Shuffle優化
1)Map階段
(1)增大環形緩衝區大小。由100m擴大到200m
(2)增大環形緩衝區溢寫的比例。由80%擴大到90%
(3)減少對溢寫檔案的merge次數。(10個檔案,一次20個merge)
(4)不影響實際業務的前提下,採用Combiner提前合併,減少 I/O
2)Reduce階段
(1)合理設定Map和Reduce數:兩個都不能設定太少,也不能設定太多。太少,會導致Task等待,延長處理時間;太多,會導致 Map、Reduce任務間競爭資源,造成處理超時等錯誤。
(2)設定Map、Reduce共存:調整slowstart.completedmaps引數,使Map執行到一定程度後,Reduce也開始執行,減少Reduce的等待時間。
(3)規避使用Reduce,因為Reduce在用於連線資料集的時候將會產生大量的網路消耗。
(4)增加每個Reduce去Map中拿資料的並行數
(5)叢集效能可以的前提下,增大Reduce端儲存資料記憶體的大小。
3)IO傳輸
採用資料壓縮的方式,減少網路IO的的時間。安裝Snappy和LZOP壓縮編碼器。
壓縮:
(1)map輸入端主要考慮資料量大小和切片,支援切片的有Bzip2、LZO。注意:LZO要想支援切片必須建立索引;
(2)map輸出端主要考慮速度,速度快的snappy、LZO;
(3)reduce輸出端主要看具體需求,例如作為下一個mr輸入需要考慮切片,永久儲存考慮壓縮率比較大的gzip。
4)整體
(1)NodeManager預設記憶體8G,需要根據伺服器實際配置靈活調整,例如128G記憶體,配置為100G記憶體左右,yarn.nodemanager.resource.memory-mb。
(2)單任務預設記憶體8G,需要根據該任務的資料量靈活調整,例如128m資料,配置1G記憶體,yarn.scheduler.maximum-allocation-mb。
(3)mapreduce.map.memory.mb :控制分配給MapTask記憶體上限,如果超過會kill掉程序(報:Container is running beyond physical memory limits. Current usage:565MB of512MB physical memory used;Killing Container)。預設記憶體大小為1G,如果資料量是128m,正常不需要調整記憶體;如果資料量大於128m,可以增加MapTask記憶體,最大可以增加到4-5g。
(4)mapreduce.reduce.memory.mb:控制分配給ReduceTask記憶體上限。預設記憶體大小為1G,如果資料量是128m,正常不需要調整記憶體;如果資料量大於128m,可以增加ReduceTask記憶體大小為4-5g。
(5)mapreduce.map.java.opts:控制MapTask堆記憶體大小。(如果記憶體不夠,報:java.lang.OutOfMemoryError)
(6)mapreduce.reduce.java.opts:控制ReduceTask堆記憶體大小。(如果記憶體不夠,報:java.lang.OutOfMemoryError)
(7)可以增加MapTask的CPU核數,增加ReduceTask的CPU核數
(8)增加每個Container的CPU核數和記憶體大小
(9)在hdfs-site.xml檔案中配置多目錄
(10)NameNode有一個工作執行緒池,用來處理不同DataNode的併發心跳以及客戶端併發的元資料操作。dfs.namenode.handler.count=20 * log2(Cluster Size),比如叢集規模為10臺時,此引數設定為60。
基於MapReduce做Hadoop的優化
1)HDFS小檔案影響
-
影響NameNode的壽命,因為檔案元資料儲存在NameNode的記憶體中
-
影響計算引擎的任務數量,比如每個小的檔案都會生成一個Map任務
2)資料輸入小檔案處理
-
合併小檔案:對小檔案進行歸檔(Har)、自定義Inputformat將小檔案儲存成SequenceFile檔案。
-
採用ConbinFileInputFormat來作為輸入,解決輸入端大量小檔案場景
-
對於大量小檔案Job,可以開啟JVM重用會減少45%執行時間
3)Map階段
-
增大環形緩衝區大小。由100m擴大到200m
-
增大環形緩衝區溢寫的比例。由80%擴大到90%
-
減少對溢寫檔案的merge次數。(10個檔案,一次20個merge)
-
不影響實際業務的前提下,採用Combiner提前合併,減少 I/O
4)Reduce階段
-
合理設定Map和Reduce數:兩個都不能設定太少,也不能設定太多。太少,會導致Task等待,延長處理時間;太多,會導致 Map、Reduce任務間競爭資源,造成處理超時等錯誤。
-
設定Map、Reduce共存:調整
slowstart.completedmaps
引數,使Map執行到一定程度後,Reduce也開始執行,減少Reduce的等待時間 -
規避使用Reduce,因為Reduce在用於連線資料集的時候將會產生大量的網路消耗。
-
增加每個Reduce去Map中拿資料的並行數
-
叢集效能可以的前提下,增大Reduce端儲存資料記憶體的大小
5)IO傳輸
-
採用資料壓縮的方式,減少網路IO的的時間
-
使用SequenceFile二進位制檔案
6)整體
-
MapTask預設記憶體大小為1G,可以增加MapTask記憶體大小為4
-
ReduceTask預設記憶體大小為1G,可以增加ReduceTask記憶體大小為4-5g
-
可以增加MapTask的cpu核數,增加ReduceTask的CPU核數
-
增加每個Container的CPU核數和記憶體大小
-
調整每個Map Task和Reduce Task最大重試次數
7)壓縮
詳細檢視Hadoop3.x的mapreduce壓縮P58
Yarn的job提交流程
job提交過程詳解
(1)作業提交
1)Clinet呼叫job.waitForCompletion方法,向整個叢集提交MapReduce作業**
2)Clinet向RM申請一個作業id**
3)RM給Clinet返回該job資源的提交路徑和作業id**
4)Client 提交 jar 包、切片資訊和配置檔案到指定的資源提交路徑。**
5)Client 提交完資源後,向 RM 申請執行 MrAppMaster。
(2)作業初始化
6)當 RM 收到 Client 的請求後,將該 job 新增到容量排程器
7)某一個空閒的 NM 領取到該 Job。
8)該 NM 建立 Container,併產生 MRAppmaster。
9)下載 Client 提交的資源到本地。
(3)任務分配
10)MrAppMaster 向 RM 申請執行多個 MapTask 任務資源。
11)RM 將執行 MapTask 任務分配給另外兩個 NodeManager,另兩個 NodeManager
分別領取任務並建立容器。
(4)任務執行
12)MR 向兩個接收到任務的 NodeManager 傳送程式啟動指令碼,這兩個NodeManager 分別啟動 MapTask,MapTask 對資料分割槽排序。
13)MrAppMaster等待所有MapTask執行完畢後,向RM申請容器,執行ReduceTask。
14)ReduceTask 向 MapTask 獲取相應分割槽的資料
15)程式執行完畢後,MR 會向 RM 申請登出自己。
(5)進度和狀態更新
YARN 中的任務將其進度和狀態(包括 counter)返回給應用管理器, 客戶端每秒(通過mapreduce.client.progressmonitor.pollinterval 設定)嚮應用管理器請求進度更新, 展示給使用者。
(6)作業完成
除了嚮應用管理器請求作業進度外, 客戶端每 5 秒都會通過呼叫 waitForCompletion()來檢查作業是否完成。時間間隔可以通過 mapreduce.client.completion.pollinterval 來設定。作業完成之後, 應用管理器和 Container 會清理工作狀態。作業的資訊會被作業歷史伺服器儲存以備之後使用者核查。
其中簡略版對應的步驟分別如下:
1、client向RM提交應用程式,其中包括啟動該應用的ApplicationMaster的必須資訊,例如ApplicationMaster程式、啟動ApplicationMaster的命令、使用者程式等
2、ResourceManager啟動一個container用於執行ApplicationMaster
3、啟動中的ApplicationMaster向ResourceManager註冊自己,啟動成功後與RM保持心跳
4、ApplicationMaster向ResourceManager傳送請求,申請相應數目的container
5、申請成功的container,由ApplicationMaster進行初始化。container的啟動資訊初始化後,AM與對應的NodeManager通訊,要求NM啟動container
6、NM啟動container
7、container執行期間,ApplicationMaster對container進行監控。container通過RPC協議向對應的AM彙報自己的進度和狀態等資訊
8、應用執行結束後,ApplicationMaster向ResourceManager登出自己,並允許屬於它的container被收回
Yarn預設的排程器,分類,以及它們之間的區別
1)Hadoop排程器主要分為三類:
-
FIFO Scheduler:先進先出排程器:優先提交的,優先執行,後面提交的等待【生產環境不會使用】
-
Capacity Scheduler:容量排程器:允許看建立多個任務對列,多個任務對列可以同時執行。但是一個佇列內部還是先進先出。【Hadoop2.7.2預設的排程器】
-
Fair Scheduler:公平排程器:第一個程式在啟動時可以佔用其他佇列的資源(100%佔用),當其他佇列有任務提交時,佔用資源的佇列需要將資源還給該任務。還資源的時候,效率比較慢。【CDH版本的yarn排程器預設】
Hadoop的引數優化
-
在hdfs-site.xml檔案中配置多目錄,最好提前配置好,否則更改目錄需要重新啟動叢集
-
NameNode有一個工作執行緒池,用來處理不同DataNode的併發心跳以及客戶端併發的元資料操作
-
編輯日誌儲存路徑dfs.namenode.edits.dir設定與映象檔案儲存路徑dfs.namenode.name.dir儘量分開,達到最低寫入延遲
-
伺服器節點上YARN可使用的實體記憶體總量,預設是8192(MB),注意,如果你的節點記憶體資源不夠8GB,則需要調減小這個值,而YARN不會智慧的探測節點的實體記憶體總量
-
單個任務可申請的最多實體記憶體量,預設是8192(MB
Hadoop宕機
如果MR造成系統宕機。此時要控制Yarn同時執行的任務數,和每個任務申請的最大記憶體。調整引數:yarn.scheduler.maximum-allocation-mb(單個任務可申請的最多實體記憶體量,預設是8192MB)。
如果寫入檔案過量造成NameNode宕機。那麼調高Kafka的儲存大小,控制從Kafka到HDFS的寫入速度。高峰期的時候用Kafka進行快取,高峰期過去資料同步會自動跟上。
Hadoop資料傾斜問題
1)提前在map進行combine,減少傳輸的資料量
在Mapper加上combiner相當於提前進行reduce,即把一個Mapper中的相同key進行了聚合,減少shuffle過程中傳輸的資料量,以及Reducer端的計算量。
如果導致資料傾斜的key 大量分佈在不同的mapper的時候,這種方法就不是很有效了
2)資料傾斜的key 大量分佈在不同的mapper
在這種情況,大致有如下幾種方法:
-
「區域性聚合加全域性聚合」
第一次在map階段對那些導致了資料傾斜的key 加上1到n的隨機字首,這樣本來相同的key 也會被分到多個Reducer 中進行區域性聚合,數量就會大大降低。
第二次mapreduce,去掉key的隨機字首,進行全域性聚合。
「思想」:二次mr,第一次將key隨機雜湊到不同 reducer 進行處理達到負載均衡目的。第二次再根據去掉key的隨機字首,按原key進行reduce處理。
這個方法進行兩次mapreduce,效能稍差
「增加Reducer,提升並行度」
JobConf.setNumReduceTasks(int)
-
「實現自定義分割槽」
根據資料分佈情況,自定義雜湊函式,將key均勻分配到不同Reducer
NameNode和secondarynameNode工作機制
思考:NameNode中的元資料是儲存在哪裡的?
首先,我們做個假設,如果儲存在NameNode節點的磁碟中,因為經常需要進行隨機訪問,還有響應客戶請求,必然是效率過低。因此,元資料需要存放在記憶體中。但如果只存在記憶體中,一旦斷電,元資料丟失,整個叢集就無法工作了。因此產生在磁碟中備份元資料的FsImage。
這樣又會帶來新的問題,當在記憶體中的元資料更新時,如果同時更新FsImage,就會導致效率過低,但如果不更新,就會發生一致性問題,一旦NameNode節點斷電,就會產生資料丟失。因此,引入Edits檔案(只進行追加操作,效率很高)。每當元資料有更新或者新增元資料時,修改記憶體中的元資料並追加到Edits中。這樣,一旦NameNode節點斷電,可以通過FsImage和Edits的合併,合成元資料。
但是,如果長時間新增資料到Edits中,會導致該檔案資料過大,效率降低,而且一旦斷電,恢復元資料需要的時間過長。因此,需要定期進行FsImage和Edits的合併,如果這個操作由NameNode節點完成,又會效率過低。因此,引入一個新的節點SecondaryNamenode,專門用於FsImage和Edits的合併。
-
第一階段:NameNode啟動
(1)第一次啟動NameNode格式化後,建立Fsimage和Edits檔案。如果不是第一次啟動,直接載入編輯日誌和映象檔案到記憶體。
(2)客戶端對元資料進行增刪改的請求。
(3)NameNode記錄操作日誌,更新滾動日誌。
(4)NameNode在記憶體中對資料進行增刪改。
-
第二階段:Secondary NameNode工作
(1)Secondary NameNode詢問NameNode是否需要CheckPoint。直接帶回NameNode是否檢查結果。
(2)Secondary NameNode請求執行CheckPoint。
(3)NameNode滾動正在寫的Edits日誌。
(4)將滾動前的編輯日誌和映象檔案拷貝到Secondary NameNode。
(5)Secondary NameNode載入編輯日誌和映象檔案到記憶體,併合並。
(6)生成新的映象檔案fsimage.chkpoint。
(7)拷貝fsimage.chkpoint到NameNode。
(8)NameNode將fsimage.chkpoint重新命名成fsimage。
------------恢復內容結束------------
**------------恢復內容結束------------**