1. 程式人生 > >剖析HDFS的檔案讀寫

剖析HDFS的檔案讀寫

客戶端通過對DistributedFileSystem物件呼叫create()來新建檔案(步驟1)。DistributedFileSystem對namenode建立一個RPC呼叫,在檔案系統的名稱空間中新建一個檔案,此時該檔案中還沒有相應的資料塊(步驟2)。namenode執行各種不同的檢查以確保這個檔案不存在以及客戶端有新建該檔案的許可權。如果這些檢查均通過,namenode就會為建立新檔案記錄一條記錄,如果檢查未通過,會導致檔案建立失敗並向客戶端丟擲一個IOException。DistributedFileSystem向客戶端返回一個FSDataOutputStream物件,由此客戶端可以開始寫入資料。就像讀取事件一樣,FSDataOutputStream封裝一個DFSOutputStream物件,該物件負責處理datanode和namenode之間的通訊。

   
      圖-客戶端將資料寫入HDFS

在客戶端寫入資料時(步驟3), DFSOutputStream將它分成一個個的資料包,並寫入內部佇列,稱為“資料佇列”(data queue)。DataStreamer處理資料佇列,它的責任是挑選出適合儲存資料複本的一組datanode,並據此來要求namenode分配新的資料塊。這一組 datanode構成一個管線——我們假設複本數為3,所以管線中有3個節點。DataStreamer將資料包流式傳輸到管線中第1個datanode,該datanode儲存資料包並將它傳送到管線中的第2個datanode。同樣,第2個datanode儲存該資料包並且傳送給管線中的第3個(也是最後一個)datanode (步驟4

)。
DFSOutputStream也維護著一個內部資料包佇列來等待datanode的收到確認回執,稱為“確認佇列”(ack queue)。收到管道中所有datanode 確認資訊後,該資料包才會從確認佇列刪除(步驟5)。

     如果任何datanode在資料寫入期間發生故障,則執行以下操作(對寫入資料的客戶端是透明的)。首先關閉管線,確認把佇列中的所有資料包都添加回資料佇列的最前端,以確保故障節點下游的datanode不會漏掉任何一個數據包。為儲存在另一正常datanode的當前資料塊指定一個新的標識,並將該標識傳送給namenode, 以便故障datanode在恢復後可以刪除儲存的部分資料塊。從管線中刪除故障datanode,基於兩個正常datanode 構建一條新管線。餘下的資料塊寫入管線中正常的datanode。namenode 注意到塊複本量不足時,會在另一個節點上建立一個新的複本。後續的資料塊繼續正常接受處理。

     在一個塊被寫入期間可能會有多個datanode同時發生故障,但非常少見。只要寫入了dfs.namenode.replication.min的複本數(預設為1),寫操作就會成功,並且這個塊可以在叢集中非同步複製,直到達到其目標複本數(dfs.replication的預設值為3)。

      客戶端完成資料的寫入後,對資料流呼叫close()方法(步驟6)。該操作將剩餘的所有資料包寫人datanode 管線,並在聯絡到namenode告知其檔案寫人完成之前,等待確認(步驟7)。namenode已經知道檔案由哪些塊組成(因為Datastreamer請求分配資料塊),所以它在返回成功前只需要等待資料塊進行最小量的複製。

寫流程簡化步驟:

1、 客戶端向NameNode發出寫檔案請求。
2、檢查是否已存在檔案、檢查許可權。若通過檢查,直接先將操作寫入EditLog,並返回輸出流物件。 
(注:WAL,write ahead log,先寫Log,再寫記憶體,因為EditLog記錄的是最新的HDFS客戶端執行所有的寫操作。如果後續真實寫操作失敗了,由於在真實寫操作之前,操作就被寫入EditLog中了,故EditLog中仍會有記錄,我們不用擔心後續client讀不到相應的資料塊,因為在第5步中DataNode收到塊後會有一返回確認資訊,若沒寫成功,傳送端沒收到確認資訊,會一直重試,直到成功)
3、client端按128MB的塊切分檔案。
4、client開始往datanode上傳第一個block(先從磁碟讀取資料放到一個本地記憶體快取),以pipeline( 管道) 的形式將packet寫入,並以packet為單位(一個packet為64kb),當然在寫入的時候datanode會進行資料校驗,它並不是通過一個packet進行一次校驗而是以chunk為單位進行校驗(512byte),第一臺datanode收到一個packet就會傳給第二臺,第二臺傳給第三臺;第一臺每傳一個packet會放入一個應答佇列等待應答。
(注:並不是寫好一個塊或一整個檔案後才向後分發)
5、最後一個datanode成功儲存之後會返回一個ack packet( 確認佇列) , 在pipeline裡傳遞至客戶端, 在客戶端的開發庫內部維護著”ack queue”, 成功收到datanode返回的ack packet後會從”ack queue”移除相應的packet。
6、如果傳輸過程中, 有某個datanode出現了故障, 那麼當前的pipeline會被關閉, 出現故障的datanode會從當前的pipeline中移除, 剩餘的block會繼續剩下的datanode中繼續以pipeline的形式傳輸, 同時Namenode會分配一個新的datanode, 保持replications設定的數量。當一個block傳輸完成之後,client再次請求namenode上傳第二個block的伺服器。
7、寫完資料,關閉輸輸出流。
8、傳送完成訊號給NameNode。 
(注:傳送完成訊號的時機取決於叢集是強一致性還是最終一致性,強一致性則需要所有DataNode寫完後才向NameNode彙報。最終一致性則其中任意一個DataNode寫完後就能單獨向NameNode彙報,HDFS一般情況下都是強調強一致性)

 讀流程的簡化步驟:

1、使用HDFS提供的客戶端Client, 向遠端的Namenode發起RPC請求。
2、Namenode會視情況返回檔案的部分或者全部block列表, 對於每個block, Namenode都會返回有該block拷貝的DataNode地址。
3、客戶端Client會選取離客戶端最近的DataNode來讀取block; 如果客戶端本身就是DataNode, 那麼將從本地直接獲取資料。
4、讀取完當前block的資料後, 關閉當前的DataNode連結, 併為讀取下一個block尋找最佳的DataNode。
5、當讀完列表block後, 且檔案讀取還沒有結束, 客戶端會繼續向Namenode獲取下一批的block列表。
6、讀取完一個block都會進行checksum驗證, 如果讀取datanode時出現錯誤, 客戶端會通知Namenode, 然後再從下一個擁有該block拷貝的datanode繼續讀。