白話Hadoop入門-WordCount詳細講解(2)
前一篇部落格講述瞭如何進行Hadoop壞境的搭建,以及第一個傳輸檔案程式的編寫,通過第一個檔案可能大概對Hadoop有一個瞭解了,但是Hadoop的精髓在於mapreduce,下面我們就來看看如何編寫Hadoop的第一個“hello world”程式--也就是WordCount程式。
有很多的部落格講述Wordcount是什麼,但是沒有對裡面的程式碼進行詳細講解,導致很多的入門者卡在了這塊。
1、MapReduce工作流程
mapreduce是如何讓工作的,以Wordcount為例,首先mapreduce分為兩個階段,分別是map階段和reduce階段,其中比如我們的data檔案是這樣的,一共三行。
hadoop is a good tool
python is a good language
python is a bad language
map階段:主要為分別讀取檔案的每一行,然後進行單詞的劃分,形成<key,value>對,eg: <hadoop,1>,<is,1>........等等這樣的形式,然後進行傳送,reduce進行接收,自至於傳送的細節,這都是Hadoop封裝好的。
reduce階段:因為當我們有一個較大的檔案時,會啟動多個map節點,因此reduce會接到多個map發過來的資料,並且自動將相同的key進行整合,所以reduce接到的就是這樣形式的<key,iter<value,value,...>, eg: <hadoop,1>,<is,{1,1,1}>,<python,{1,1}>....這樣形式。因此reduce程式只需要做的就是講這些iter裡面的value進行累加。
所以很簡單的邏輯,下面是實現的具體過程和程式碼。
2、總體架構
總體分別三個檔案一個map.class(map邏輯處理程式) 一個reduce.class(reduce邏輯處理程式) 一個runner.class(老大,調配前面兩個程式,並進行job的提交)
提前準備配置:右鍵工程在property裡面的java build path裡面新增我們需要的jar包,否則後邊無法進行,並且最好講配置好的core-site.xml hdfs-site.xml檔案新增進行,以供conf進行載入。
3、map邏輯處理程式
package wyk_firsthadoop; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import io.netty.util.internal.StringUtil; //其中LongWritable和Text對應著long和string,這裡就是Hadoop定義的一種序列化的型別,方便在網路間//進行傳輸 //繼承來自Hadoop的mapper類,四個引數分別為keyin.valuein,keyout,valueout.這裡僅僅是定義其型別 public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ @Override //map接受的引數,key value,和一個配置引數 protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //將Text轉到string格式 String line=value.toString(); //line就是data中的一行,接下來進行切分 String[] words=StringUtil.split(line,' '); //context.write將我們的統計資訊傳送出去,reduce進行接收,還是鍵值對的形式 for(String word : words) { //new Text(word)將string再次轉換為Text格式 context.write(new Text(word), new LongWritable(1)); } } }
4、reduce邏輯處理程式
package wyk_firsthadoop;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//還是繼承來自Hadoop的reducer類,其中四個引數分別為keyin,valuein,keyout,valueout.
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
//這裡的引數分別為value就是前面說的是一個迭代器
protected void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
//宣告一個計數器
long count =0;
//進行累加
for(LongWritable value:values) {
count+=value.get(); //get方法是進行資料型別的轉換to long
}
//繼續呼叫context.write,進行傳送
context.write(key, new LongWritable(count));
}
}
5、總體調配程式
mapreduce的提交需要以job形式來提交,前面定義好的map reduce的處理邏輯,那麼如何執行這兩個程式呢?怎麼才能及交給Hadoop?這裡就要統一進行調配,形成job提交,並執行。
package wyk_firsthadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
public class runner {
public static void main(String[] args) throws IOException, Exception, InterruptedException {
// mapreduce的提交需要以job形式來提交,前面定義好的mapreduce的處理邏輯,這裡
//就要統一進行調配,形成job提交,病執行
Configuration conf=new Configuration();
//建立job例項
Job job=Job.getInstance(conf);
//設定生成的jar包 所對應的檔案
job.setJarByClass(runner.class);
//設定哪一個是map處理程式,哪一個是reduce程式
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
//設定輸出資料的格式,這裡分別設定了key value的格式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//設定輸入資料的路徑,檔案位於HDFS的根目錄下
FileInputFormat.setInputPaths(job, new Path("/test_data.txt"));
//設定輸出檔案的路徑,注意這裡一定要是不存在的資料夾,不然會報錯-已存在
FileOutputFormat.setOutputPath(job, new Path("/wyk_out"));
//提交job,並執行
job.waitForCompletion(true);
}
}
6、NEXT
(1)將上述的程式所在的工程打成一個jar包。
(2)自己寫一個簡單的data檔案,並上傳至HDFS,命令:hadoop fs -put xxxx.txt / 放置根目錄下
(3)執行haoop jar worcount_wyk.jar wyk_firsthadoop.runner
(4)檢視輸出結果 hadoop fs -cat /wyk_out/part-5-0000 大家視情況而定,可能檔名不一樣
輸出結果類似:
hadoop 1
python 2
is 3
tool 1
.....