Hadoop chained processing (ChainMapper / ChainReducer)

Scenario:

When processing text on a Hadoop cluster, chained mappers are useful when we want to reuse an existing mapper while flexibly adding or removing pieces of business logic.
For the text below, we are only interested in the words or sentences that contain "房價" (house price), we want to reuse the existing word-count mapper logic (word -> [word, 1]), and we also want to drop words that occur five times or fewer. All of this can be done in a single job.
石家莊房價真高啊
石家莊房價真高啊
石家莊房價真高啊
石家莊房價真高啊
石家莊房價還可以吧
石家莊房價還可以吧
石家莊房價還可以吧
石家莊房價還可以吧
石家莊房價太特麼高了
石家莊房價太特麼高了
石家莊房價太特麼高了
石家莊房價太特麼高了
石家莊房價太特麼高了
石家莊房價太特麼高了
石家莊房價太特麼高了
石家莊房價太特麼高了
北京房價便宜
北京房價便宜
北京房價便宜
北京房價便宜
北京房價便宜
北京房價便宜
北京房價便宜
北京房價便宜
北京房價便宜
hello tom
hello tom
hello tom
hello tom
hello tom
hello tom
hello tom
hello tom
hello tom2
hello tom2
hello tom2

Mapper 1: word -> [word, 1]

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Word count mapper.
 * First map stage of the chain: tokenizes each line and emits (word, 1).
 */
public class WcMapper1 extends Mapper<LongWritable,Text,Text,IntWritable>{

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text keyOut = new Text();
        IntWritable valueOut = new IntWritable();
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while(tokenizer.hasMoreTokens()){
            keyOut.set(tokenizer.nextToken());
            valueOut.set(1);
            context.write(keyOut,valueOut);
        }
    }
}
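
Note that StringTokenizer splits on whitespace only, so each of the Chinese sample lines (which contain no spaces) is emitted as a single "word", while "hello tom" splits into two tokens. A quick standalone check of that behavior (plain Java, no Hadoop needed; the TokenizerCheck class is only for illustration):

import java.util.StringTokenizer;

public class TokenizerCheck {
    public static void main(String[] args) {
        //the Chinese line has no whitespace, so it stays one token; "hello tom" yields two
        for (String line : new String[]{"石家莊房價真高啊", "hello tom"}) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                System.out.println(line + " -> " + tokenizer.nextToken());
            }
        }
    }
}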

Mapper 2: keep only the words that contain "房價"

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Word count mapper.
 * Second map stage of the chain: passes through only keys that contain "房價".
 */
public class WcMapper2 extends Mapper<Text,IntWritable,Text,IntWritable>{

    @Override
    protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
        if(key.toString().indexOf("房價") != -1){
            context.write(key,value);
        }
    }
}

Reducer: sum the counts

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * word count reducer
 */
public class WcReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        Iterator<IntWritable> iterator = values.iterator();
        int count = 0;
        while (iterator.hasNext()){
            count += iterator.next().get();
        }
        context.write(key,new IntWritable(count));
    }
}

Reducer-side mapper: drop results whose count is five or less

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper chained after the reducer: drops any word whose total count is 5 or less.
 */
public class WcReducerMapper1 extends Mapper<Text,IntWritable, Text, IntWritable> {
    @Override
    protected void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
        if(value.get() > 5){
            context.write(key,value);
        }
    }
}

Driver class (main): WcApp

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WcApp {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(conf);

        //set the job's attributes
        job.setJobName("WcChainApp");                   //job name
        job.setJarByClass(WcApp.class);                 //class used to locate the jar
        job.setInputFormatClass(TextInputFormat.class); //input format

        //add the input path
        FileInputFormat.addInputPath(job,new Path("/home/hadoop/chain/fangjia.txt"));
        //set the output path (the directory must not already exist)
        FileOutputFormat.setOutputPath(job,new Path("/home/hadoop/chain/out"));

        //add WcMapper1 to the mapper chain
        ChainMapper.addMapper(job,WcMapper1.class, LongWritable.class,Text.class,Text.class,IntWritable.class, conf);
        //add WcMapper2 to the mapper chain (its input types are WcMapper1's output types)
        ChainMapper.addMapper(job,WcMapper2.class, Text.class, IntWritable.class,Text.class,IntWritable.class, conf);

        //set the reducer of the chain
        ChainReducer.setReducer(job,WcReducer.class,Text.class,IntWritable.class,Text.class,IntWritable.class,conf);
        //add WcReducerMapper1 after the reducer in the chain
        ChainReducer.addMapper(job,WcReducerMapper1.class, Text.class, IntWritable.class,Text.class,IntWritable.class, conf);

        job.setNumReduceTasks(3);                       //number of reduce tasks
        job.waitForCompletion(true);
    }
}
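
One practical note: FileOutputFormat refuses to run if the output directory already exists, so when re-running locally it helps to delete it first. A minimal sketch (not part of the original driver; the CleanOutputDir class name is only for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CleanOutputDir {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        //delete the previous output directory, recursively, if it exists
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path("/home/hadoop/chain/out"), true);
    }
}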

Execution results

//cat part-r-00001
石家莊房價太特麼高了	8
//cat part-r-00002
北京房價便宜	9
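
With setNumReduceTasks(3), the surviving keys are spread across three part-r-* files by the default HashPartitioner, which computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. A small sketch to check locally which partition a given key lands in (assuming the default partitioner; the PartitionCheck class is only for illustration):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PartitionCheck {
    public static void main(String[] args) {
        HashPartitioner<Text, IntWritable> partitioner = new HashPartitioner<>();
        int numReduceTasks = 3;
        for (String word : new String[]{"石家莊房價太特麼高了", "北京房價便宜"}) {
            //getPartition hashes the Text key and takes it modulo the number of reducers
            int partition = partitioner.getPartition(new Text(word), new IntWritable(1), numReduceTasks);
            System.out.println(word + " -> part-r-0000" + partition);
        }
    }
}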

Usage pattern

If you open the source of ChainMapper, the official javadoc says:

 * <p>
 * Using the ChainMapper and the ChainReducer classes is possible to compose
//in regex-like notation, the next line says: the map phase needs one or more mappers, and the reducer can be followed by zero or more mappers
 * Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>. And
 * immediate benefit of this pattern is a dramatic reduction in disk IO.
 * </p>
 * <p>
 * IMPORTANT: There is no need to specify the output key/value classes for the
 * ChainMapper, this is done by the addMapper for the last mapper in the chain.
 * </p>
 * ChainMapper usage pattern:
 * <p>
 * 
 * <pre>
 * ...
 * Job = new Job(conf);
 *
 * Configuration mapAConf = new Configuration(false);
 * ...
 * ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
 *   Text.class, Text.class, true, mapAConf);
 *
 * Configuration mapBConf = new Configuration(false);
 * ...
//the second mapper's input types are the first mapper's output types; each mapper can have its own Configuration
 * ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
 *   LongWritable.class, Text.class, false, mapBConf);
 *
 * ...
 *
 * job.waitForComplettion(true);
 * ...
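
Applied to this job, the per-mapper Configuration idea from the javadoc might look like the sketch below. It is a fragment meant to slot into WcApp.main in place of the addMapper/setReducer calls above (untested; the mapper1Conf/mapper2Conf names are only illustrative, and the new mapreduce API's addMapper takes a Configuration argument instead of the byValue flag shown in the old-style javadoc example):

        //sketch only: same classes as above, but each chained mapper gets its own Configuration
        Configuration mapper1Conf = new Configuration(false);
        ChainMapper.addMapper(job, WcMapper1.class, LongWritable.class, Text.class,
                Text.class, IntWritable.class, mapper1Conf);

        Configuration mapper2Conf = new Configuration(false);
        ChainMapper.addMapper(job, WcMapper2.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, mapper2Conf);

        ChainReducer.setReducer(job, WcReducer.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, new Configuration(false));
        ChainReducer.addMapper(job, WcReducerMapper1.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, new Configuration(false));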