1. 程式人生 > >Hadoop 相關知識點(一)

Hadoop 相關知識點(一)

作業提交流程(MR執行過程)

Mapreduce2.x
這裡寫圖片描述

Client:用來提交作業

ResourceManager:協調叢集上的計算資源的分配

NodeManager:負責啟動和監控叢集上的計算容器(container)

ApplicationMaster:協調執行MapReduce任務,他和應用程式任務執行在container中,這些congtainer有RM分配並且由NM進行管理

主要過程分析:
【作業的提交】
1. Job的submit()方法建立一個內部的Jobsubmiter例項,並且呼叫它的submitJobInternal()方法。(圖中的第一步

/**
   * Submit the job to the cluster and return immediately.
   * @throws
IOException */
public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); connect(); final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new
PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { return submitter.submitJobInternal(Job.this, cluster); } }); state = JobState.RUNNING; LOG.info("The url to track the job: "
+ getTrackingURL()); }

2.提交作業以後,waitForCompletion()每秒輪詢作業的進度,如果發現自上次報告有所改變,便把進度報告提交到控制檯

 /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

3.JobSubmiter實現的作業提交流程:
首先,會向RM請求一個新的應用ID,用以MapReduce的作業ID(圖中的步驟2),
接著檢查作業的輸出說明(例如:如果作業沒有指定輸出目錄或者輸出目錄已經存在,作業人就不會提交,錯誤就會拋回給MapReduce)。
再接著,就是計算作業的輸入分片。如果分片無法計算,例如輸入分片不存在的話,作業就不會提交,錯誤就會拋回給MapReduce。
然後,講作業所需要的資源(作業JAR檔案,配置檔案,計算所得的輸入分片)複製到一個以作業ID命名的共享檔案系統中(HDFS)。(對應步驟3
再然後,呼叫資源的submitApplication()方法提交作業(步驟4

protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime,
      String user) throws YarnException {
       //獲得作業ID
    ApplicationId applicationId = submissionContext.getApplicationId();
       //構建一個app並放入applicationACLS 
    RMAppImpl application =
        createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
    ApplicationId appId = submissionContext.getApplicationId();

    if (UserGroupInformation.isSecurityEnabled()) {
      try {
        this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
            parseCredentials(submissionContext),
            submissionContext.getCancelTokensWhenComplete(),
            application.getUser());
      } catch (Exception e) {
        LOG.warn("Unable to parse credentials.", e);
        // Sending APP_REJECTED is fine, since we assume that the
        // RMApp is in NEW state and thus we haven't yet informed the
        // scheduler about the existence of the application

        assert application.getState() == RMAppState.NEW;
        this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppEvent(applicationId,
              RMAppEventType.APP_REJECTED, e.getMessage()));
        throw RPCUtil.getRemoteException(e);
      }
    } else {
      // Dispatcher is not yet started at this time, so these START events
      // enqueued should be guaranteed to be first processed when dispatcher
      // gets started.
       //觸發app啟動事件
      this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));
    }
  }

【作業的初始化】
4.RM收到了呼叫它的submitApplication()訊息後,就會將請求傳遞給YARN排程器,排程器分配一個容器,然後資源管理器在節點管理器的管理下在容器中啟動 application Master的程序步驟5a、5b),MapReduce的application Master是一個Java應用程式,它的主類是MRAppMaster。它將接受來自任務的進度和完成報告(步驟6),接下來,他將會接受來自共享檔案系統的jar檔案和計算好的分片資訊(步驟7) , 然後對每一個分片建立一個map任務物件以及由mapreduce.job.recuces(通過作業的 setNumReduceTasks()方法設定)確定多個reduce任務物件。任務ID在此時分配。

application Master 必須確定如何構成MapReduce 的各個任務。如果作業很小,就選擇和自己在同一個JVM上執行任務,與在同一個節點上執行任務相比,application Master判斷在新的容器中分配和執行任務的開銷執行他們的開銷時,這樣的任務稱作為uberized,或者作為uber任務執行(小作業–少於10個map任務且只有一個reducer且輸出大小小於一個HDFS塊的作業)。

【作業的分配】
5、如果作業不適合作為uber任務執行,這個時候application Master就會為改作業的所有map任務和reduce任務向資源管理器請求資源容器步驟8),這個請求也為指定了記憶體需求和CPU數。

【任務的執行】
6、一旦資源管理器的排程器分配了一個特定節點上的容器,application Master就會通過與節點管理器的通訊來啟動容器(步驟9a 、9b),就是相當於啟動了任務,這個任務是由主類為YarnChild的一個java應用程式執行。在執行任務之前,要先將資源本地化,包括作業的配置,jar和所有來自分散式快取的檔案(10)。最後執行map任務或者reduce任務(11)。任務完成後,MRAppMaster程序會向ResourceManager 登出本次任務,代表任務完成,Yarn可以回收本次分配的全部資源,MRAppMaster程序也會結束。

hdfs的基本原理

簡介: Hadoop分散式檔案系統(HDFS)被設計成適合執行在通用硬體(commodity hardware)上的分散式檔案系統。HDFS體系結構中有兩類節點,一類是NameNode,又叫”元資料節點”;另一類是DataNode,又叫”資料節點”。這兩類節點分別承擔Master和Worker具體任務的執行節點。總的設計思想:分而治之——將大檔案、大批量檔案,分散式存放在大量獨立的伺服器上,以便於採取分而治之的方式對海量資料進行運算分析。

原理:
1 分散式檔案系統,它所管理的檔案是被切塊儲存在若干臺datanode伺服器上。

2 hdfs提供了一個統一的目錄樹來定位hdfs中的檔案,客戶端訪問檔案時只要指定目錄樹的路徑即可,不用關心檔案的具體物理位置。

3 每一個檔案的每一個切塊,在hdfs叢集中都可以儲存多個備份(預設3份),在hdfs-site.xml中,dfs.replication的value的數量就是備份的數量。(副本放置:首先第一個放在執行客戶端的節點上,其次第二個放在與第一個不同且隨機另外選擇的機架中的一個節點、 第三個放在與第二個副本同一機架且是隨機另外選擇的節點上)

4 hdfs中有一個關鍵程序服務程序:namenode,它維護了一個hdfs的目錄樹及hdfs目錄結構與檔案真實儲存位置的對映關係(元資料).而datanode服務程序專門負責接收和管理"檔案塊"-block,預設大小為128M(可配置),(dfs.blocksize),(老版本的hadoop的預設block是64M的)。

hadoop的shuffle過程:

這裡寫圖片描述

shuffle的過程

簡單的概括:map()輸出結果->記憶體(環形緩衝區,當記憶體大小達到指定數值,如80%,開始溢寫到本地磁碟)
溢寫之前,進行了分割槽partition操作,分割槽的目的在於資料的reduce指向,分割槽後進行二次排序,第一次是對partitions進行排序,第二次對各個partition中的資料進行排序,之後如果設定了combine,就會執行類似reduce的合併操作,還可以再進行壓縮,因為reduce在拷貝檔案時消耗的資源與檔案大小成正比
記憶體在達到一定比例時,開始溢寫到磁碟上
當檔案資料達到一定大小時,本地磁碟上會有很多溢寫檔案,需要再進行合併merge成一個檔案
reduce拷貝copy這些檔案,然後進行歸併排序(再次merge),合併為一個檔案作為reduce的輸入資料

Job Tracker:是Map-reduce框架的中心,他需要與叢集中的機器定時通訊heartbeat,需要管理哪些程式應該跑在哪些機器上,需要管理所有job失敗、重啟等操作。
TaskTracker是Map-Reduce叢集中每臺機器都有的一個部分,他做的事情主要是監視自己所在機器的資源情況。

PS:Hadoop的shuffle過程就是從map端輸出到reduce端輸入之間的過程,這一段應該是Hadoop中最核心的部分,因為涉及到Hadoop中最珍貴的網路資源,所以shuffle過程中會有很多可以調節的引數,也有很多策略可以研究,這方面可以看看大神董西成的相關文章或他寫的MapReduce相關書籍。

Shuffle過程淺析

2.1 Map端

  (1)在map端首先接觸的是InputSplit,在InputSplit中含有DataNode中的資料,每一個InputSplit都會分配一個Mapper任務,Mapper任務結束後產生<K2,V2>的輸出,這些輸出先存放在快取, 每個map有一個環形記憶體緩衝區,用於儲存任務的輸出。預設大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spil l.percent),一個後臺執行緒就把內容寫到(spill)Linux本地磁碟中的指定目錄(mapred.local.dir)下的新建的一個溢位 寫檔案。

總結:map過程的輸出是寫入本地磁碟而不是HDFS,但是一開始資料並不是直接寫入磁碟而是緩衝在記憶體中,快取的好處就是減少磁碟I/O的開銷,提高合併和排序的速度。又因為預設的記憶體緩衝大小是100M(當然這個是可以配置的),所以在編寫map函式的時候要儘量減少記憶體的使用,為shuffle過程預留更多的記憶體,因為該過程是最耗時的過程。

  (2)寫磁碟前,要進行partition、sort和combine等操作。通過分割槽,將不同型別的資料分開處理,之後對不同分割槽的資料進行 排序,如果有Combiner,還要對排序後的資料進行combine。等最後記錄寫完,將全部溢位檔案合併為一個分割槽且排序的檔案。

  (3)最後將磁碟中的資料送到Reduce中。

補充:在寫磁碟的時候採用壓縮的方式將map的輸出結果進行壓縮是一個減少網路開銷很有效的方法!關於如何使用壓縮,在本文第三部分會有介紹。

2.2 Reduce端

  (1)Copy階段:Reducer通過Http方式得到輸出檔案的分割槽。

  reduce端可能從n個map的結果中獲取資料,而這些map的執行速度不盡相同,當其中一個map執行結束時,reduce就會從 JobTracker中獲取該資訊。map執行結束後TaskTracker會得到訊息,進而將訊息彙報給JobTracker,reduce定時從 JobTracker獲取該資訊,reduce端預設有5個數據複製執行緒從map端複製資料。

  (2)Merge階段:如果形成多個磁碟檔案會進行合併

  從map端複製來的資料首先寫到reduce端的快取中,同樣快取佔用到達一定閾值後會將資料寫到磁碟中,(如果指定combiner,則在合併期間執行它,會降低寫入磁碟的資料量)然後後臺執行緒會將他們合併成更大的、排好序的檔案。複製完所有的map輸出後,reduce端進入排序階段(更恰當的說法是進入合併階段,因為排序是在map端進行的),這個階段將會合並map端輸出,維持其排序順序。
  (3)Reducer的引數:最後將合併後的結果作為輸入傳入Reduce程式任務中。

總結:當Reducer的輸入檔案確定後,整個Shuffle操作才最終結束。之後就是Reducer的執行了,最後Reducer會把結果存到HDFS上。