1. 程式人生 > 程式設計 >Hadoop原始碼篇 --- DataNode的初始化與註冊流程

Hadoop原始碼篇 --- DataNode的初始化與註冊流程

前言

因為大家讀原始碼的方式都各有千秋,這裡的閱讀過程並不代表最佳實踐,只是一個自身閱讀過程的再現而已。所以如果有一些遺漏的重點,也可以在留言處替我指出。先前也有很多小夥伴提出了一些我的錯誤或者改進的地方,這裡表示衷心的感謝。

那按照先前的套路,我們提出兩個任務,整篇就為了完成驗證下面的任務而進行:

1、DataNode初始化:我們平時搭建叢集時,通過jps命令時可以看到DataNode的服務的,所以DataNode就應該是RPC的服務端

2、DataNode的註冊:HDFS是一個主從架構,NameNode是主節點而DataNode是從節點,所以DataNode啟動時是需要和NameNode進行註冊的

3、心跳機制:從節點需要傳送心跳讓主節點得知從節點的存活

4、NameNode是如何管理元資料的,HA高可用方案的實現原理

(補充)Hadoop HA高可用方案原理

先把一個簡單的給搞定吧,因為直接上原始碼真的挺犯困的

在Hadoop1.x的時候,我們只有一個NameNode和DataNode,NameNode負責管理元資料,DataNode用於儲存資料,為了保證資料安全,每個副本會存在三個備份,每個block佔據64M的大小

這樣NameNode就會存在單點故障的問題,所以這時候hadoop的團隊就開始解決這個問題了,因為NameNode是管理著叢集的元資料的,這是一個有狀態的服務,這就說明瞭它是不能隨便停止服務的,那既然問題其實是算只有一個NameNode不好乾活的鍋,那我們增加NameNode數量不就好了?可是平白無故地增加一個NameNode,它倆如何能夠保證元資料的一致性呢?

解決這個單點的問題,首先就是這兩個NameNode如何保證元資料的一致,此時解決方案有三個

① 使用linux的共享儲存目錄

這個做法其實就是讓我們的叢集元資料不存放於Namenode中

而是放在一個共享的儲存單元中,這是apache官方當時推薦的做法,可是大家都沒吃它的這一套。現在肯定是沒人再這麼玩的了

② 使用zookeeper叢集

這也很簡單,就是我們的一個NameNode去往zookeeper叢集中寫元資料,然後另外一個就去zookeeper裡面去讀,有部分公司確實也是在採用這個方案

③ cloudera QJM

第三種方案其實是由cloudera這家公司所提出的,它通過一個Journalnode叢集來完成,這個東西和zookeeper其實差不太多,實現邏輯也基本雷同

而且它本身也是一個叢集,健康依據為過半節點存活即可,比如我圖中的3臺掛掉一臺,那就是2/3>0.5,那此時判斷這個叢集是健康狀態。

有可能你會問,為什麼你就這樣草率地決定了第一個NameNode是寫元資料的第二個是同步元資料的呢?因為它們兩個還有各自的狀態問題

此時active狀態下的NameNode負責往叢集寫,而standby的從叢集讀

當然現在我們的問題解決了嗎,其實並沒有,如果NameNode(active)在某天凌晨突然宕機,那我豈不是凌晨就要開啟電腦,連上公司的服務,使用命令強行把standby狀態下的NameNode設定為active叢集才恢復正常工作了?

那我們現在如何保證NameNode狀態的自動切換呢?

④ 實現NameNode狀態的切換

此時我們又引入了老朋友zookeeper,真是哪都有它,在zookeeper中建立一個鎖的目錄,然後NameNode啟動的時候都會過去搶佔鎖,兩個NameNode誰先搶到,誰就是active狀態。

而且每一個NameNode上還有一個ZKFC的服務,持續監聽NameNode的健康狀態,如果active NameNode出現問題,ZKFC將會報告給zookeeper,然後zookeeper會將鎖分配給standby的NameNode上。使其自動切換為active狀態

一、DataNode的初始化(原始碼片段基於Hadoop2.7.0)

我們直接找到DataNode的main方法處,它和NameNode差不多,進行一個引數的判斷之後,不滿足就退出。除了這麼一句,就還剩這麼一個secureMain(args,null)了,那核心程式碼就是這一個

點進去發現了一個creataDataNode,所以我就最喜歡這種命名這麼直接的,那就點進去creataDataNode吧

把註釋先複製一下,貼上到百度

那我們一看,例項化的英文不就是instantiate嘛,所以不用想太多,點進去就是了,下面的那個if也很簡單,因為當這個例項化成功,DataNode不為null,那就把這個DataNode啟動起來唄,就是這麼簡單而已,這個啟動其實就是就是一堆執行緒

看原始碼的時候要帶著目的,不然就會被其他的一些程式碼帶跑,有try看try,還有就是一大串單詞的時候看最後一個單詞判斷它的作用,比如前面的if,Configuration是配置呀,args是引數集啊,這些我們都不關心。我們現在就想知道你怎麼例項化,所以就盯著這個instance這玩意點就是了

同理,前面那些permission許可權,checker檢查器我們也不關心,看到最後是返回一個DataNode即可。那我們就點這個

映入眼簾的是一堆引數的配置,先不管,拉到我們想看的位置,大概到465行左右會有一個try,我們瞧瞧

StartDataNode到底經歷了幾個過程

看到啟動DataNode的程式碼了,所以這個就是我們想看的

① (補充)DataXceiver

補充一下:大致拉到1182行,這個initDataXceiver(conf)是初始化了我們的DataXceiver,這個是幹啥用的呢,點進去

我們知道,NameNode只是負責管理叢集中的元資料資訊的,而真正儲存的資料是儲存在DataNode上的,實際上DataNode就是通過這個DataXceiver的服務來接收上傳上來的資料的

在974行左右有一段設定為後臺執行緒的程式碼,這個意思其實就是這個執行緒和主執行緒共存亡,如果主執行緒結束了,這個執行緒也會隨之停止。

② (補充)startInfoServer

回到StartDataNode的那個位置

相信你一定還有印象,就是在我們的 Hadoop原始碼篇 --- NameNode的啟動流程 1.4.1解析NameNode啟動流程中,也存在類似的一個startHttpServer,當時這個startHttpServer就是綁定了很多servlet,來增強自身的功能。而我們的DataNode它也是一樣,會繫結servlet增強自身

眼尖的小夥伴肯定就看到了一些熟悉的名詞,checksum不就是我們對資料進行完整性校驗所使用的校驗和嘛,這說明獲取校驗和這個功能也是繫結上去的一個servlet

③ 初始化RPC的服務

再次回到StartDataNode的那個位置,看到initIpcServer

initIpcServer是啟動RPC的服務使用的。

這個程式碼我們又很熟悉了,這和 Hadoop原始碼篇 --- NameNode的啟動流程 1.6.2 驗證存在設定引數的過程又是十分類似。而且同樣的建立服務端完成之後,它也綁定了很多的協議,比如DataNode和DataNode之間進行通訊的interDatanodeProtocolXlator,而且同樣這些協議也有一個唯一的versionID

④ 建立BlockPoolManager

blockManager的作用在註釋上已經給出來了,我們點進去refreshNamenodes瞧瞧

看見do,繼續點進去,這個東西有100多行程式碼,需要分點說明,但是僅說明最重要的兩個點

判斷存在著多少份不同的元資料

把註釋給翻譯一下,大致意思就是,它會判斷對於每一個新的nameservices到底是對已有的nameservices的更新還是一個全新的nameservices

我們平時搭建的HDFS叢集是HA高可用架構的,即NameNode分為active和standby兩個,這兩個管理的元資料也是一樣的,所以它們管理的是同一個nameservices(在這裡這麼理解,nameservices就是存放元資料的一個目錄即可)。而對於聯邦來說,每一個聯邦都會管理著一份元資料,兩個聯邦那自然就會存在兩份不一樣的元資料。

遍歷聯邦

針對每一個聯邦建立一個BPOfferService,針對聯邦裡的每個NameNode(其實也就2個)建立一個BPServiceActor

到這裡初始化的步驟就已經走完了,是不是感覺一臉蒙圈,沒事,還有註冊流程

二、DataNode的註冊流程

剛剛的位置結尾有一個startAll(),這就是註冊的主要邏輯,點進去

可以看到這裡它遍歷了聯邦,然後我們點進去start()方法

連起來就是,先遍歷聯邦,再遍歷聯邦裡面的NameNode,然後將DataNode分別註冊進去遍歷出來的NameNode,繼續點start

呼叫執行緒的start方法實際上就是呼叫run方法,這個大家應該還是知道的。

這裡的connectToNNAndHandshake()就是註冊的核心程式碼了,直譯過來呢就是和NameNode進行握手。這裡使用了一個死迴圈來保證這個註冊一定能夠被執行,如果出現異常,設定回睡眠5秒之後再次嘗試。如果執行成功,就break。。反正我就死賴在這裡了,就一定要你執行成功。

2.1 connectToNNAndHandshake()

既然剛剛都不惜使用死迴圈來讓connectToNNAndHandshake()一定要執行成功,那就點進來瞧瞧吧

此時我們的主線目標就是呼叫NameNode的方法將DataNode給註冊進去,其實就是往NameNode儲存這個DataNode的資訊。這裡我們獲取了NameNode的代理,為什麼這塊需要用到代理呢

代理物件角色內部含有對真實物件的引用,從而可以操作真實物件,同時代理物件提供與真實物件相同的介面以便在任何時刻都能代替真實物件。同時,代理物件可以在執行真實物件操作時,附加其他的操作,相當於對真實物件進行封裝。

所以此時我們獲取到了一個bpNameNode的代理物件,然後通過register()往上面進行註冊

2.2 DataNode的註冊資訊createRegistration

一直往下點可以看到DatanodeRegistration類的屬性欄位,有興趣的可以去了解一下

而且我們也可以發現,在這個過程中,我們的主機名,埠號···等資訊都已經一併傳送過去了

獲取完DataNode資訊後的registerDatanode

回到register()

因為向NameNode進行註冊的程式碼也是十分重要,所以使用了和剛剛一樣的死迴圈套路,保證中間的註冊過程一定被執行,成功就break,異常就延遲一秒後重試。

此時我們看到通過bpNameNode這個物件呼叫了一個registerDatanode方法,這裡明顯是呼叫了NamenodeRpcServer裡的同名方法的。不信我們點進去,開啟NamenodeRpcServer然後ctrl+f這個registerDatanode即可

這裡繼續點進去

有try看try,這裡明顯是DataNodeManager來處理關於DataNode的問題的,繼續點進去register

裡面程式碼比較長,我們一直拉到大約995行處,可以看到一個addDataNode

addDatanode(nodeDescr)

引數nodeDescr就是剛剛createRegistration中提到的封裝好的一個DataNode的註冊資訊

註冊資訊的方式其實就是往這些資料結構中去填,比如這個什麼datanodeMap,點進去一看

這明顯就是第一個引數為這個DataNode的唯一標識,第二個引數DataNode的description描述,就肯定是這DataNode的註冊資訊,把這些資訊分別儲存在不同的資料結構中

addDatanode(nodeDescr)

這裡的引數nodeDescr和上面是一樣的,都是DataNode的註冊資訊

好處就是以後遍歷心跳的資訊的話就直接遍歷DatanodeDescriptor這個資料結構而不用再先遍歷DataNode然後再一個個取它們的心跳資訊出來

總得來說,註冊其實就是往各個資料結構存放資訊,寫入完成後,註冊就完成了

三、開始傳送心跳

跳轉回 2.1 connectToNNAndHandshake() 的那裡往下看,也就是BPServiceActor類的大概890行。如果connectToNNAndHandshake成功,就break掉,開始傳送心跳的步驟

這個迴圈就真的是一個死迴圈了,連break都沒有,出現異常也只會重試,所以我們點進去看看

DataNode和NameNode的心跳間隔

在if (startTime - lastHeartbeat >= dnConf.heartBeatInterval)是判斷如果當前時間-最後一次心跳的時間大於dnConf.heartBeatInterval,就執行,所以我們看看這個heartBeatInterval到底是多長,點進去

看一下這個 DFS_HEARTBEAT_INTERVAL_DEFAULT 預設值為3

所以以後咱們就知道了,DataNode和NameNode保持心跳其實是3秒一次的

之後就是 HeartbeatResponse resp = sendHeartBeat() 這句,它這個 HeartbeatResponse 就是NameNode給DataNode下發的指令,點進去sendHeartBeat()

發現其實心跳時會連帶這把DataNode的一些資訊也附帶過去,比如說第一個reports是報告的意思,點進去看看

你看,其實就是記憶體容量,記憶體使用情況···等等這些資訊

傳送心跳 sendHeartbeat

通過NameNode的代理物件bpNamenode呼叫傳送心跳,那其實就是直接呼叫NameNode的sendHeartbeat方法,我們直接來到NamenodeRpcServer找到同名方法

那些引數也一併帶進來了,我們繼續點

這裡的cmds就是NameNode希望DataNode進行操作的指令,它會作為一個引數封裝成一個HeartbeatResponse物件返回給DataNode

我們嘗試著看看處理心跳的方法

處理心跳 handleHeartbeat

看到這裡其實都有種一直在看重複程式碼的感覺,不過讀原始碼就是那麼糾結的過程

第一個getDatanode是根據每一個DataNode獨一無二的id號來取出對應的DataNode

updateHeartbeat這個方法一直往下點可以點到這個更新心跳狀態的這個方法

這裡因為我們的DataNode一直是在工作的,它必須保持自己的狀態更新,然後非常重要的就是它必須修改上一次的心跳時間,因為我們剛剛也看過了,我們是通過當前時間-上一次心跳時間=3s的話,就會再次傳送心跳,所以這一步非常重要。而且我們也需要用到心跳這項指標來判斷這個DataNode節點是否存活,因為如果超過了一定的時間(這個值是自己配置的)未建立心跳,NameNode是會判定這個節點掛掉然後它再讓其他的block複製多一份資料的。

那到這裡其實DataNode就差不多了。

finally