Hadoop學習筆記之初識MapReduce以及WordCount例項分析
阿新 • • 發佈:2019-01-30
MapReduce簡介
MapReduce是什麼?
MapReduce是一種程式設計模型,用於大規模資料集的分散式運算。
Mapreduce基本原理
1、MapReduce通俗解釋
圖書館要清點圖書數量,有10個書架,管理員為了加快統計速度,找來了10個同學,每個同學負責統計一個書架的圖書數量。
張同學統計 書架1
王同學統計 書架2
劉同學統計 書架3
……
過了一會兒,10個同學陸續到管理員這彙報自己的統計數字,管理員把各個數字加起來,就得到了圖書總數。
這個過程就可以理解為MapReduce的工作過程。
2、MapReduce中有兩個核心操作
(1)map
管理員分配哪個同學統計哪個書架,每個同學都進行相同的“統計”操作,這個過程就是map。
(2)reduce
每個同學的結果進行彙總,這個過程是reduce。
Mapreduce工作過程
各個角色實體
1.程式執行時過程設計到的一個角色實體 1.1. Client:編寫mapreduce程式,配置作業,提交作業的客戶端 ; 1.2. ResourceManager:叢集中的資源分配管理 ; 1.3. NodeManager:啟動和監管各自節點上的計算資源 ; 1.4. ApplicationMaster:每個程式對應一個AM,負責程式的任務排程,本身也是執行在NM的Container中 ; 1.5. HDFS:分散式檔案系統,儲存作業的資料、配置資訊等等。 2.客戶端提交Job 2.1. 客戶端編寫好Job後,呼叫Job例項的Submit()或者waitForCompletion()方法提交作業; 2.2. 客戶端向ResourceManager請求分配一個Application ID,客戶端會對程式的輸出、輸入路徑進行檢查,如果沒有問題,進行作業輸入分片的計算。 3.Job提交到ResourceManager 3.1. 將作業執行所需要的資源拷貝到HDFS中(jar包、配置檔案和計算出來的輸入分片資訊等); 3.2. 呼叫ResourceManager的submitApplication方法將作業提交到ResourceManager。 4.給作業分配ApplicationMaster 4.1. ResourceManager收到submitApplication方法的呼叫之後會命令一個NodeManager啟動一個Container ; 4.2. 在該NodeManager的Container上啟動管理該作業的ApplicationMaster程序。 5. ApplicationMaster初始化作業 5.1. ApplicationMaster對作業進行初始化操作; 5.2. ApplicationMaster從HDFS中獲得輸入分片資訊(map、reduce任務數) 6.任務分配 6.1. ApplicationMaster為其每個map和reduce任務向RM請求計算資源; 6.2. map任務優先於reduce任,map資料優先考慮本地化的資料。 7.任務執行,在 Container 上啟動任務(通過YarnChild程序來執行),執行map/reduce任務。
時間先後順序
1.輸入分片(input split) 每個輸入分片會讓一個map任務來處理,預設情況下,以HDFS的一個塊的大小(預設為128M,可以設定)為一個分片。map輸出的結果會暫且放在一個環形記憶體緩衝區中(預設mapreduce.task.io.sort.mb=100M),當該緩衝區快要溢位時(預設mapreduce.map.sort.spill.percent=0.8),會在本地檔案系統中建立一個溢位檔案,將該緩衝區中的資料寫入這個檔案; 2.map階段:由我們自己編寫,最後呼叫 context.write(…); 3.partition分割槽階段 3.1. 在map中呼叫 context.write(k2,v2)方法輸出,該方法會立刻呼叫 Partitioner類對資料進行分割槽,一個分割槽對應一個 reduce task。 3.2. 預設的分割槽實現類是 HashPartitioner ,根據k2的雜湊值 % numReduceTasks,可能出現“資料傾斜”現象。 3.3. 可以自定義 partition ,呼叫 job.setPartitioner(…)自己定義分割槽函式。 4.combiner合併階段:將屬於同一個reduce處理的輸出結果進行合併操作 4.1. 是可選的; 4.2. 目的有三個:1.減少Key-Value對;2.減少網路傳輸;3.減少Reduce的處理。 4.3. combiner操作是有風險的,使用它的原則是combiner的輸入不會影響到reduce計算的最終輸入,例如:如果計算只是求總數,最大值,最小值可以 使用combiner,但是做平均值計算使用combiner的話,最終的reduce計算結果就會出錯。 5.shuffle階段:即Map和Reduce中間的這個過程 5.1. 首先 map 在做輸出時候會在記憶體裡開啟一個環形記憶體緩衝區,專門用來做輸出,同時map還會啟動一個守護執行緒; 5.2. 如緩衝區的記憶體達到了閾值的80%,守護執行緒就會把內容寫到磁碟上,這個過程叫spill,另外的20%記憶體可以繼續寫入要寫進磁碟的資料; 5.3. 寫入磁碟和寫入記憶體操作是互不干擾的,如果快取區被撐滿了,那麼map就會阻塞寫入記憶體的操作,讓寫入磁碟操作完成後再繼續執行寫入記憶體操作; 5.4. 寫入磁碟時會有個排序操作,如果定義了combiner函式,那麼排序前還會執行combiner操作; 5.5. 每次spill操作也就是寫入磁碟操作時候就會寫一個溢位檔案,也就是說在做map輸出有幾次spill就會產生多少個溢位檔案,等map輸出全部做完後,map會合並這些輸出檔案,這個過程裡還會有一個Partitioner操作(如上) 5.6. 最後 reduce 就是合併map輸出檔案,Partitioner會找到對應的map輸出檔案,然後進行復制操作,複製操作時reduce會開啟幾個複製執行緒,這些執行緒默認個數是5個(可修改),這個複製過程和map寫入磁碟過程類似,也有閾值和記憶體大小,閾值一樣可以在配置檔案裡配置,而記憶體大小是直接使用reduce的tasktracker的記憶體大小,複製時候reduce還會進行排序操作和合並檔案操作,這些操作完了就會進行reduce計算了。 6.reduce階段:由我們自己編寫,最終結果儲存在hdfs上的。
Mapreduce例項—WordCount單詞統計
Hadoop中的WordCount原始碼解析
自定義WordCount
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
class WordCountMap extends Mapper<Object, Text, Text, IntWritable>{
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
//得到每一行資料
String line = value.toString();
//通過空格分割,對這一行資料進行拆分
String [] words = line.split(" ");
//迴圈遍歷 輸出
for(String word:words){
context.write(new Text(word), new IntWritable(1));
}
}
}
class WordCountReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
Integer count = 0;
for(IntWritable value: values ){
count +=value.get();
}
context.write(key, new IntWritable(count));
}
}
public class WordCount extends Mapper<Object, Text, Text, IntWritable>{
public static void main(String[] args) throws Exception {
//建立配置物件
Configuration conf = new Configuration();
//建立job物件
Job job = new Job(conf, "word count");
//設定執行的job類
job.setJarByClass(WordCount.class);
//設定mapper類
job.setMapperClass(WordCountMap.class);
//設定reducer類
job.setReducerClass(WordCountReducer.class);
//設定map 輸出的key value
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//設定reduce 輸出的key value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//設定輸入輸出的路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交job
boolean b = job.waitForCompletion(true);
if(!b){
System.out.println("wordcount task fail!");
}
}
}
執行時出現的錯誤以及解決方案
- 做mapreduce計算時候,輸出一般是一個資料夾,而且該資料夾是不能存在,我在出面試題時候提到了這個問題,而且這個檢查做的很早,當我們提交job時候就會進行,mapreduce之所以這麼設計是保證資料可靠性,如果輸出目錄存在reduce就搞不清楚你到底是要追加還是覆蓋,不管是追加和覆蓋操作都會有可能導致最終結果出問題,mapreduce是做海量資料計算,一個生產計算的成本很高,例如一個job完全執行完可能要幾個小時,因此一切影響錯誤的情況mapreduce是零容忍的。
2.eclipse中編譯的jdk與叢集上的jdk版本不同會報錯, 解決方案: 工程上右鍵->properties->java Bulid Path-> Libraries ->移除原來的Jar System ->add Library ->新增與叢集相同的jdk