1. 程式人生 > 實用技巧 >Mapreduce之分割槽與自定義計數器

Mapreduce之分割槽與自定義計數器

Mepreduce分割槽

分割槽概述

在 MapReduce 中, 通過我們指定分割槽, 會將同一個分割槽的資料傳送到同一個 Reduce 當中進行處理
例如: 為了資料的統計, 可以把一批類似的資料傳送到同一個 Reduce 當中, 在同一個 Reduce 當
中統計相同型別的資料, 就可以實現類似的資料分割槽和統計等
其實就是相同型別的資料, 有共性的資料, 送到一起去處理
Reduce 當中預設的分割槽只有一個

實戰:

下面我們將一個csv檔案進行分開成兩個檔案,根據他的第六個欄位為開獎數進行區分

分割槽步驟:

Step 1. 定義 Mapper
這個 Mapper 程式不做任何邏輯, 也不對 Key-Value 做任何改變, 只是接收資料, 然後往下發送

package cn.itcast.partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
K1:行偏移量 LongWritable
V1:行文字資料 Text

K2:行文字資料 Text
V2:佔位符 NUllWritable
 
*/ public class PartitionMapper extends Mapper<LongWritable, Text,Text, NullWritable> { //map方法將K1和V1轉為K2和V2 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //方式1:自定義計數器 Counter counter = context.getCounter("MY_COUNTER", "partition_counter");
//每執行一次,計數器的變數加1 counter.increment(1L); context.write(value,NullWritable.get()); } }

Step 2. 自定義 Partitioner
主要的邏輯就在這裡, 這也是這個案例的意義, 通過 Partitioner 將資料分發給不同的 Reducer

package cn.itcast.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner<Text, NullWritable> {

    /*
    1.定義分割槽規則
    2.返回對應的分割槽編號
     */
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        //1.拆分行文字資料(K2),獲取中獎欄位的值
        String s = text.toString().split("\t")[5];
        //2.判斷中獎欄位的值和15的關係,然後返回對應的分割槽編號
        if(Integer.parseInt(s)>15)
            return 1;
        else
            return 0;
    }
}

Step 3. 定義 Reducer 邏輯
這個 Reducer 也不做任何處理, 將資料原封不動的輸出即可

package cn.itcast.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
K2:  一行的文字資料
V2:  NullWritable

K3:  Text
V3: NullWritable
 */
public class PartitionerReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
    public static enum Counter{
        MY_INPUT_RECOREDS,MY_INPUT_BYTES
    }


    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.getCounter(Counter.MY_INPUT_RECOREDS).increment(1L);
        context.write(key,NullWritable.get());
    }
}

Step 4. 主類中設定分割槽類和ReduceTask個數

package cn.itcast.partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        //1.建立job任務物件
        Job job = Job.getInstance(super.getConf(), "partition_mapreduce");
        //job.setJarByClass(JobMain.class);
        //2.對job任務進行配置(八個步驟)
        //第一步:設定輸入類和輸入引數
        job.setInputFormatClass(TextInputFormat.class);
        //TextInputFormat.addInputPath(job,new Path("hdfs://hadoop101:8020/input"));
        TextInputFormat.addInputPath(job,new Path("file:///E:\\input"));

        //第二部:設定mapper類和型別
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        //第三:指定分割槽類
        job.setPartitionerClass(MyPartitioner.class);
        // 第四,五,六步
        //第七步:指定Reducer類和資料型別
        job.setReducerClass(PartitionerReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //設定ReduceTask的個數
        job.setNumReduceTasks(2);


        //第八步:指定輸出類和輸出路徑
        job.setOutputFormatClass(TextOutputFormat.class);
        //TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop101:8020/out/partition_out"));
        TextOutputFormat.setOutputPath(job,new Path("file:///E:\\out\\partition_out3"));
    //3.等待任務結束
        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        //啟動job任務
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }

}

結果:會產生兩個檔案

第一個檔案裡的開獎數都是小於等於15

第二個檔案裡的開獎數都是大於15的

MapReduce 中的計數器

計數器是收集作業統計資訊的有效手段之一,用於質量控制或應用級統計。計數器還可輔助
診斷系統故障。如果需要將日誌資訊傳輸到 map 或 reduce 任務, 更好的方法通常是看能否
用一個計數器值來記錄某一特定事件的發生。對於大型分散式作業而言,使用計數器更為方
便。除了因為獲取計數器值比輸出日誌更方便,還有根據計數器值統計特定事件的發生次數
要比分析一堆日誌檔案容易得多。


hadoop內建計數器列表

自定義計數器方法

第一種方式定義計數器,通過context上下文物件可以獲取我們的計數器,進行記錄 通過
context上下文物件,在map端使用計數器進行統計

public class PartitionMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
//map方法將K1和V1轉為K2和V2
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //方式1:自定義計數器
        Counter counter = context.getCounter("MY_COUNTER", "partition_counter");
        //每執行一次,計數器的變數加1
        counter.increment(1L);
        context.write(value,NullWritable.get());
    }
}

第二種方式
通過enum列舉型別來定義計數器 統計reduce端資料的輸入的key有多少個

public class PartitionerReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
    public static enum Counter{
        MY_INPUT_RECOREDS,MY_INPUT_BYTES
    }


    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.getCounter(Counter.MY_INPUT_RECOREDS).increment(1L);
        context.write(key,NullWritable.get());
    }
}

輸出顯示: