1. 程式人生 > >Hadoop學習筆記之初識MapReduce以及WordCount例項分析

Hadoop學習筆記之初識MapReduce以及WordCount例項分析

MapReduce簡介

MapReduce是什麼?

MapReduce是一種程式設計模型,用於大規模資料集的分散式運算。

Mapreduce基本原理

1、MapReduce通俗解釋

圖書館要清點圖書數量,有10個書架,管理員為了加快統計速度,找來了10個同學,每個同學負責統計一個書架的圖書數量。

張同學統計 書架1

王同學統計 書架2

劉同學統計 書架3

……

過了一會兒,10個同學陸續到管理員這彙報自己的統計數字,管理員把各個數字加起來,就得到了圖書總數。

這個過程就可以理解為MapReduce的工作過程。

2、MapReduce中有兩個核心操作

(1)map

管理員分配哪個同學統計哪個書架,每個同學都進行相同的“統計”操作,這個過程就是map。

(2)reduce

每個同學的結果進行彙總,這個過程是reduce。

Mapreduce工作過程

MapReduce計算模型的執行機制

各個角色實體

1.程式執行時過程設計到的一個角色實體
1.1. Client:編寫mapreduce程式,配置作業,提交作業的客戶端 ;
1.2. ResourceManager:叢集中的資源分配管理 ;
1.3. NodeManager:啟動和監管各自節點上的計算資源 ;
1.4. ApplicationMaster:每個程式對應一個AM,負責程式的任務排程,本身也是執行在NM的Container中 ;
1.5. HDFS:分散式檔案系統,儲存作業的資料、配置資訊等等。

2.客戶端提交Job
2.1. 客戶端編寫好Job後,呼叫Job例項的Submit()或者waitForCompletion()方法提交作業;
2.2. 客戶端向ResourceManager請求分配一個Application ID,客戶端會對程式的輸出、輸入路徑進行檢查,如果沒有問題,進行作業輸入分片的計算。

3.Job提交到ResourceManager
3.1. 將作業執行所需要的資源拷貝到HDFS中(jar包、配置檔案和計算出來的輸入分片資訊等);
3.2. 呼叫ResourceManager的submitApplication方法將作業提交到ResourceManager。

4.給作業分配ApplicationMaster
4.1. ResourceManager收到submitApplication方法的呼叫之後會命令一個NodeManager啟動一個Container ;
4.2. 在該NodeManager的Container上啟動管理該作業的ApplicationMaster程序。

5. ApplicationMaster初始化作業
5.1. ApplicationMaster對作業進行初始化操作;
5.2. ApplicationMaster從HDFS中獲得輸入分片資訊(map、reduce任務數)

6.任務分配
6.1. ApplicationMaster為其每個map和reduce任務向RM請求計算資源;
6.2. map任務優先於reduce任,map資料優先考慮本地化的資料。

7.任務執行,在 Container 上啟動任務(通過YarnChild程序來執行),執行map/reduce任務。

時間先後順序

1.輸入分片(input split)
每個輸入分片會讓一個map任務來處理,預設情況下,以HDFS的一個塊的大小(預設為128M,可以設定)為一個分片。map輸出的結果會暫且放在一個環形記憶體緩衝區中(預設mapreduce.task.io.sort.mb=100M),當該緩衝區快要溢位時(預設mapreduce.map.sort.spill.percent=0.8),會在本地檔案系統中建立一個溢位檔案,將該緩衝區中的資料寫入這個檔案;

2.map階段:由我們自己編寫,最後呼叫 context.write(…);

3.partition分割槽階段
3.1. 在map中呼叫 context.write(k2,v2)方法輸出,該方法會立刻呼叫 Partitioner類對資料進行分割槽,一個分割槽對應一個 reduce task。
3.2. 預設的分割槽實現類是 HashPartitioner ,根據k2的雜湊值 % numReduceTasks,可能出現“資料傾斜”現象。
3.3. 可以自定義 partition ,呼叫 job.setPartitioner(…)自己定義分割槽函式。

4.combiner合併階段:將屬於同一個reduce處理的輸出結果進行合併操作
4.1. 是可選的;
4.2. 目的有三個:1.減少Key-Value對;2.減少網路傳輸;3.減少Reduce的處理。
4.3. combiner操作是有風險的,使用它的原則是combiner的輸入不會影響到reduce計算的最終輸入,例如:如果計算只是求總數,最大值,最小值可以             使用combiner,但是做平均值計算使用combiner的話,最終的reduce計算結果就會出錯。

5.shuffle階段:即Map和Reduce中間的這個過程
5.1. 首先 map 在做輸出時候會在記憶體裡開啟一個環形記憶體緩衝區,專門用來做輸出,同時map還會啟動一個守護執行緒;
5.2. 如緩衝區的記憶體達到了閾值的80%,守護執行緒就會把內容寫到磁碟上,這個過程叫spill,另外的20%記憶體可以繼續寫入要寫進磁碟的資料;
5.3. 寫入磁碟和寫入記憶體操作是互不干擾的,如果快取區被撐滿了,那麼map就會阻塞寫入記憶體的操作,讓寫入磁碟操作完成後再繼續執行寫入記憶體操作;
5.4. 寫入磁碟時會有個排序操作,如果定義了combiner函式,那麼排序前還會執行combiner操作;
5.5. 每次spill操作也就是寫入磁碟操作時候就會寫一個溢位檔案,也就是說在做map輸出有幾次spill就會產生多少個溢位檔案,等map輸出全部做完後,map會合並這些輸出檔案,這個過程裡還會有一個Partitioner操作(如上)
5.6. 最後 reduce 就是合併map輸出檔案,Partitioner會找到對應的map輸出檔案,然後進行復制操作,複製操作時reduce會開啟幾個複製執行緒,這些執行緒默認個數是5個(可修改),這個複製過程和map寫入磁碟過程類似,也有閾值和記憶體大小,閾值一樣可以在配置檔案裡配置,而記憶體大小是直接使用reduce的tasktracker的記憶體大小,複製時候reduce還會進行排序操作和合並檔案操作,這些操作完了就會進行reduce計算了。

6.reduce階段:由我們自己編寫,最終結果儲存在hdfs上的。

Mapreduce例項—WordCount單詞統計

Hadoop中的WordCount原始碼解析

自定義WordCount

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


 class WordCountMap extends Mapper<Object, Text, Text, IntWritable>{
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
//得到每一行資料
        String line = value.toString();
//通過空格分割,對這一行資料進行拆分 
        String [] words = line.split(" ");
 //迴圈遍歷 輸出
        for(String word:words){
          context.write(new Text(word), new IntWritable(1));
      }

    }
 }

  class WordCountReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
     Integer count = 0;
     for(IntWritable value: values ){
         count +=value.get();
         }
     context.write(key, new IntWritable(count));
     }
  }

  public class WordCount  extends Mapper<Object, Text, Text, IntWritable>{
  public static void main(String[] args) throws Exception {
   //建立配置物件
      Configuration conf = new Configuration();
    //建立job物件
      Job job = new Job(conf, "word count");
    //設定執行的job類
      job.setJarByClass(WordCount.class);
    //設定mapper類
      job.setMapperClass(WordCountMap.class);
      //設定reducer類
    job.setReducerClass(WordCountReducer.class);

    //設定map 輸出的key value
    job.setMapOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    //設定reduce 輸出的key value
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    //設定輸入輸出的路徑
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    //提交job
    boolean b = job.waitForCompletion(true);
    if(!b){
    System.out.println("wordcount task fail!");
    }

  }
}

執行時出現的錯誤以及解決方案

  1. 做mapreduce計算時候,輸出一般是一個資料夾,而且該資料夾是不能存在,我在出面試題時候提到了這個問題,而且這個檢查做的很早,當我們提交job時候就會進行,mapreduce之所以這麼設計是保證資料可靠性,如果輸出目錄存在reduce就搞不清楚你到底是要追加還是覆蓋,不管是追加和覆蓋操作都會有可能導致最終結果出問題,mapreduce是做海量資料計算,一個生產計算的成本很高,例如一個job完全執行完可能要幾個小時,因此一切影響錯誤的情況mapreduce是零容忍的。

2.eclipse中編譯的jdk與叢集上的jdk版本不同會報錯, 解決方案: 工程上右鍵->properties->java Bulid Path-> Libraries ->移除原來的Jar System ->add Library ->新增與叢集相同的jdk

參考: