1. 程式人生 > >Hadoop MapReduce資料處理過程以及更多示例

Hadoop MapReduce資料處理過程以及更多示例

上一篇文章介紹了Hadoop的單機配置以及一個簡單的MapReduce示例,今天看看MapReduce處理資料的流程是怎樣的。建議閱讀本文前,最好能看一下上一篇文章的程式碼。


上圖以上一篇文章的MapReduce示例為例,展示了單機配置下MapReduce的處理流程,由於單機情況下更容易理解處理流程,所以這篇文章以單機處理為例,實際上,分散式配置時,也是這樣的流程,只是在每個環節的資料形式有所不同,後面的文章會進行介紹。先來看一下上面的流程。

1.input

該階段很簡單,就是將輸入檔案中的內容按行分割為key和value的形式。看一下上篇文章中的map方法的引數

protected void map(LongWritable key, Text value, Context context)
這裡的key和value,就對應input階段生成的key和value。

2.Map

Map階段所做的工作就是我們map方法所做的事情,這是由我們自己定義的,主要對input實現細分和過濾,保留我們需要的資料,以key和value的形式,通過context輸出。之前例子中,我們在map方法中取出了每一個單詞的首字母,以首字母作為key,數值1作為value進行輸出。

3.Shuffle

注意上圖中Map輸出中所示的B,有兩行,也就是說,Map階段的輸出,會有key相同的記錄。而且,輸出是無序的。在Shuffle階段,會對map的輸出按照key進行合併和排序,然後作為reduce的輸入使用。如果使用自定義的類物件作為key,該類可以實現WritableComparable介面,定義自己的排序方式。

4.Reduce

Reduce的過程完成reduce方法所做的事情。我們的示例中將所屬key下的1進行了疊加,從而計算出一個字母作為首字母出現的次數。最後結果以key和value的方式輸出。

以上就是MapReduce的處理過程,最後說明一下,我們在派生Mapper類和Reducer類時,都分別傳入了四個型別引數,他們分別對應Map和Reduce階段的輸入key和value型別以及輸出key和value型別。對於基本型別,Hadoop提供了可序列化類,如示例中的LongWritable(對應Long),Text(對應String)等,最好使用這些類,以方便Hadoop在分散式情況下對資料進行處理。

瞭解了處理流程,這裡再給出兩個簡單示例,方便大家理解。先看第一個示例,我們將英文文章中出現的單詞進行排序,使用的輸入仍然是上一篇文章中的text。程式碼如下:

package com.yjp.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordSort {

	// 執行Map
	private static class WordSortMapper 
		extends Mapper<LongWritable, Text, Text, Text> {
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();  
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
            	String word = token.nextToken();
            	Pattern p = Pattern.compile("[A-Za-z]+");
                Matcher m = p.matcher(word);
                if (m.find()) {
                	context.write(new Text(m.group(0)), new Text());
                }
            }
		}
	}
	
	// 執行Reduce
	private static class WordSortReducer 
		extends Reducer<Text, Text, Text, Text> {
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context) 
				throws IOException, InterruptedException {
			context.write(key, new Text());
		}
	}
	
	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: WordSort <inout path> <output path>");
			System.exit(-1);
		}
		
		// 設定類資訊,方便hadoop從JAR檔案中找到
		Job job = Job.getInstance();
		job.setJarByClass(WordSort.class);
		job.setJobName("Word Sort");
		
		// 新增輸入輸出路徑
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 設定執行Map和Reduce的類
		job.setMapperClass(WordSortMapper.class);
		job.setReducerClass(WordSortReducer.class);
		
		//設定輸出資料型別
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
這個例子很簡單,上面的流程我們分析過,執行完Shuffle實際上已經完成了歸併和排序,所以,我們這個示例,Reduce過程很簡單,直接將傳遞進來的輸入進行輸出即可。

第二個示例,我們統計一段文字中出現的字母及其數目。瞭解了流程,這就簡單多了,我們只要基於上一篇文章的示例,修改它的map方法即可

@Override
protected void map(LongWritable key, Text value, Context context)
		throws IOException, InterruptedException {
	String line = value.toString();  
    StringTokenizer token = new StringTokenizer(line);
    
    while (token.hasMoreTokens()) {
    	String word = token.nextToken();
    	Pattern p = Pattern.compile("[A-Za-z]+");
        Matcher m = p.matcher(word);
        if (m.find()) {
        	word = m.group(0);
        	for (int i = 0; i < word.length(); i++) {
        		context.write(new Text(word.charAt(i) + ""), one);
        	}
        }
    }
}
程式碼已上傳直github.