1. 程式人生 > >大資料-Hadoop生態(13)-MapReduce框架原理--Job提交原始碼和切片原始碼解析

大資料-Hadoop生態(13)-MapReduce框架原理--Job提交原始碼和切片原始碼解析

1.MapReduce的資料流

1) Input -> Mapper階段

輸入源是一個檔案,經過InputFormat之後,到了Mapper就成了K,V對,以上一章的流量案例來說,經過InputFormat之後,變成了手機號為key,這一行資料為value的K,V對,所以這裡我們可以自定義InputFormat,按照具體的業務來實現將檔案轉為K,V對的方式

2) Mapper -> Reducer階段

這一階段叫做shuffle(洗牌)階段,從Mapper出來的資料是無序的K,V對,那到了Reducer階段,就變成了有序了.所以我們可以自定義排序規則

3) Reducer -> Output階段

檔案資料經過之前的種種處理,已經變成了有序的資料,這一階段就是將資料寫入檔案

 

2.資料切片與MapTask並行度決定機制

問題引出

MapTask的並行度決定Map階段的任務處理併發度,進而影響到整個Job的處理速度。

思考:1G的資料,啟動8個MapTask,可以提高叢集的併發處理能力。那麼1K的資料,也啟動8個MapTask,會提高叢集效能嗎?MapTask並行任務是否越多越好呢?哪些因素影響了MapTask並行度?

一個300M的資料,按照blocksize=128M進行儲存,在datanode上分別是0-128m,128m-256m,256m-300m

1) 紅線切片,將資料按照100M進行切片,每個MapTask處理同樣大小的100M資料,看似很公平,datanode1的MapTask處理100M資料,剩下的28m傳輸給datanode2的MapTask,datanode2的MapTask處理28m+本地的72m資料,剩下的56m再傳輸給datanode3的MapTask.這樣就增加了84m的網路傳輸資料.為了減少網路傳輸,yarn有一個本地原則,即block儲存在哪個節點上,就在哪個節點上啟動MapTask

2) 藍線切片,切片大小128m=blocksize,每一個datanode啟動的MapTask處理的資料都是128m,雖然看似處理速度比1)的100m慢一些,但是卻節省了緊張的網路傳輸資源

所以約定俗成的,切片大小=blocksize

 

3.Job提交流程原始碼

確定job的輸出路徑是否有問題

 建立jobid和臨時資料夾

生成job的配置資訊

 

經過這一步寫入後,資料夾裡就有了檔案

最後提交job

整理一下

waitForCompletion()

submit();

// 1建立連線
    connect();    
        
// 1)建立提交Job的代理 new Cluster(getConfiguration()); // (1)判斷是本地yarn還是遠端 initialize(jobTrackAddr, conf); // 2 提交job submitter.submitJobInternal(Job.this, cluster) // 1)建立給叢集提交資料的Stag路徑 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 2)獲取jobid ,並建立Job路徑 JobID jobId = submitClient.getNewJobID(); // 3)拷貝jar包到叢集 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4)計算切片,生成切片規劃檔案 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); // 5)向Stag路徑寫XML配置檔案 writeConf(conf, submitJobFile); conf.writeXml(out); // 6)提交Job,返回提交狀態 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

 

3.FileInputFormat切片機制

上一小章的job提交流程裡,生成的臨時資料夾裡面,有一個job.split檔案,說明切片是在job提交之前就切好了,是本地的Driver類做的

那麼就再追一下切片原始碼

 

最小值minsize=1,最大值maxsize是int的最大值

遍歷input源的資料夾,對每一個檔案進行切片,先判斷檔案是否可切

 

計算切片大小

如,minsize=1,blocksize=32G(33554432),maxsize=9223372036854775807

通過計算,返回的是blocksize.也就是返回的中間值

如果想增加切片大小,那麼就增加minsize的大小到>blocksize,如果想減小切片大小,那麼就減小maxsize大小到<blocksize

bytesRemaining是檔案的大小,根據上面計算到的splitSize來進行切片

404行的SPLIT_SLOP的值是1.1,比的時候是按1.1倍來比,而409行切的時候是按1倍來切

假如blocksize=32M,現在要切一個32.001M的檔案,為了多出來的1k啟動一個MapTask,很浪費不值當~,所以這樣做是為了保證,切出來的片,至少是blocksize大小的10%以上,如果不夠10%,那就交給最後的MapTask哥們來處理好了

最後將切片放到切片list裡

 

總結一下