1. 程式人生 > 其它 >mapreduce例項程式碼詳解(一行一行的註釋)

mapreduce例項程式碼詳解(一行一行的註釋)

技術標籤:javamapreduce

mapreduce的相關概念,以及執行原理網上都有很多,建議先大致掌握一下mapreduce的基礎工作方式再來看程式碼。

初開始學mapreduce看那一堆程式碼的時候很是鬱悶,現在把我對每一行程式碼的理解寫下來,希望對你們有一點幫助。

那麼第一個例項,就按慣例來寫詞彙統計好了。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.
IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.
mapreduce.lib.output.FileOutputFormat; public class word{ public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{ protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String l = value.toString(); context.write
(new Text(l),new IntWritable(1)); } } public static class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable>{ protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{ int sum=0; for(IntWritable i : values){ sum+=i.get(); } context.write(key,new IntWritable(sum)); } } public static void main(String[] args) throws InterruptedException,IOException,ClassNotFoundException{ Configuration conf=new Configuration(); Job job=Job.getInstance(conf,"top"); job.setJobName("top"); job.setJarByClass(word.class); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(MyReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.waitForCompletion(true); } }

這是完整程式碼,作用是對檔案中的詞彙進行統計,統計不同的詞彙分別有多少個。比如說檔案內容是I love hadoop I love mapreduce。就會輸出I 2, love 2這樣的統計。下面是程式碼解析。

初開始先匯入jar包
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
上面這三個包基本上每個mapreduce程式都要匯入,像IOException包就是用來處理輸入輸出流中的異常問題

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
這三個包就是你在下面的程式碼中用到的資料型別
一般有Text,LongWritable,IntWritable,NullWritable等
Text:文字資訊,字串型別String
LongWritable:偏移量Long,表示該行在檔案中的位置(不能籠統的當作行號使用)
IntWritable:int型別,用於整數等數字
NullWritable:NullWritable是Writable的一個特殊類,實現方法是空方法,只充當佔位符,在一些地方會用到,但是本例項中沒有使用

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
以上五個jar包是寫mapreduce程式必帶的,每次寫程式之前都要匯入。

public class word{
	public static class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
	    定義map類,這裡是固定格式public static class 類名 extends Mapper<>{
	    Mapper後面的括號中跟的是資料型別,也就是我們在匯入jar包階段提到的。前兩個資料型別是讀取資料時的格式,
	通常情況下是<每行行首的偏移量,每一行的內容>,也就是<LongWritable,Text>
	    後兩個資料型別輸出的格式,也就是map階段得到的鍵值對<key,value>的對應的資料型別。
	這裡我打算在檔案中,每一行只輸入一個單詞,這樣map階段輸出的鍵值對就類似<love,1>
	    至於為什麼會輸出這樣的結果,往下看程式碼
		
		protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
		這裡基本也是固定格式
		//protected void map(資料型別 key,資料型別 value,Context context) throws IOException,InterruptedException{
		key和value前面的資料型別,就是上一句程式碼Mapper後面<>裡面的前兩個資料型別
		上面這兩句話就相當於一個大體的map類框架,現在要想實現mapreduce程式目標就要開始往裡面填充內容了
			
			String line = value.toString();
			定義line為value的String物件值
			value其實之前我說的,檔案裡每一行有一個單詞,此時value裡面裝的就是讀取到的單詞,將其轉換為String型別,然後放到line裡面
			
			context.write(new Text(line),new IntWritable(1));
			格式為context.write(a,b);
			輸出以a為key,b為value的資料,這是Mapper輸出的結果
			這句話就解釋了為什麼Mapper階段輸出的結果是<love,1>形式,line裝的是love的String型別,
		因為我們一開始設定的輸出型別(Mapper的後兩個)是Text,IntWritable,
			所以這裡要把line套上一個New Text()轉換為Text資料型別,那個數字1同理,套上一個New IntWritable()
			
		}
	}
	
	public static class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
	現在是在定義一個Reduce類,Reducer後面跟的資料型別,前兩個就是Mapper輸出的資料型別,與其保持一致。
	後兩個是Reduce端要輸出的資料型別。
		
		protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
		key前面的資料格式就是從Mapper端接收到的資料格式
		Iterable是遍歷,排序的意思,要注意這裡是values而不是value,
		至於為什麼要加個s,mapreduce的工作原理一類的文章中應該會提到
		
			int sum=0;
			對詞彙進行統計,sum就是針對一個單詞的次數設定的變數,這個單詞出現多少次,sum的值就有多少
			reduce是按key來從map任務那裡領取資料的,我們之前是把一個單詞作為一個key,
			一把鑰匙開一把鎖,所以一個reduce端裡面只會有一個單詞,不用擔心sum把其他單詞的次數也算進去
			
			for(IntWritable i : values){
				sum+=i.get();
			}
			for迴圈,舉個例子
			for(String s:args)是String定義一個變數s,然後從陣列args[]中,每迴圈一次取出一個值賦給s。
			這裡是每次迴圈,從values中取出一個值放入i,然後sum每次加上i裡面的值。
			
			context.write(key,new IntWritable(sum));
			與Mapper端同理,輸出結果,這裡的結果就是最終結果,也就是mapreduce程式處理後結果形式。
		}
	}
	
	public static void main(String[] args) throws InterruptedException,IOException,ClassNotFoundException{   //main方法
		
		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf,"top");
		job.setJobName("top");
		job.setJarByClass(word.class);
        //上面四句話是設定類資訊,方便hadoop從jar檔案中找到

		job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReduce.class);
        //設定執行Map與Reduce的類,也就是你之前設定的類名
        
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		//設定Map端輸出的資料型別,這兩句話也可以不寫
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		//設定輸出資料型別,這兩句話必須寫
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));
		//新增輸入輸出路徑
		
		job.waitForCompletion(true);
		像主方法裡面的這麼多句話,基本每次寫mapreduce程式都要用到,
		可以複製貼上然後改一改資料型別和類名什麼的就能用。
		但是要注意的是,我這裡給出的語句是在linux環境下執行eclipse的程式碼,在windows底下用這樣的程式碼是不行的。
	}
}

結果類似於
在這裡插入圖片描述