Mapreduce之分割槽與自定義計數器
Mepreduce分割槽
分割槽概述
在 MapReduce 中, 通過我們指定分割槽, 會將同一個分割槽的資料傳送到同一個 Reduce 當中進行處理
例如: 為了資料的統計, 可以把一批類似的資料傳送到同一個 Reduce 當中, 在同一個 Reduce 當
中統計相同型別的資料, 就可以實現類似的資料分割槽和統計等
其實就是相同型別的資料, 有共性的資料, 送到一起去處理
Reduce 當中預設的分割槽只有一個
實戰:
下面我們將一個csv檔案進行分開成兩個檔案,根據他的第六個欄位為開獎數進行區分
分割槽步驟:
Step 1. 定義 Mapper
這個 Mapper 程式不做任何邏輯, 也不對 Key-Value 做任何改變, 只是接收資料, 然後往下發送
package cn.itcast.partition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
K1:行偏移量 LongWritable
V1:行文字資料 Text
K2:行文字資料 Text
V2:佔位符 NUllWritable
*/
public class PartitionMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
//map方法將K1和V1轉為K2和V2
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//方式1:自定義計數器
Counter counter = context.getCounter("MY_COUNTER", "partition_counter");
//每執行一次,計數器的變數加1
counter.increment(1L);
context.write(value,NullWritable.get());
}
}
Step 2. 自定義 Partitioner
主要的邏輯就在這裡, 這也是這個案例的意義, 通過 Partitioner 將資料分發給不同的 Reducer
package cn.itcast.partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartitioner extends Partitioner<Text, NullWritable> {
/*
1.定義分割槽規則
2.返回對應的分割槽編號
*/
@Override
public int getPartition(Text text, NullWritable nullWritable, int i) {
//1.拆分行文字資料(K2),獲取中獎欄位的值
String s = text.toString().split("\t")[5];
//2.判斷中獎欄位的值和15的關係,然後返回對應的分割槽編號
if(Integer.parseInt(s)>15)
return 1;
else
return 0;
}
}
Step 3. 定義 Reducer 邏輯
這個 Reducer 也不做任何處理, 將資料原封不動的輸出即可
package cn.itcast.partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/*
K2: 一行的文字資料
V2: NullWritable
K3: Text
V3: NullWritable
*/
public class PartitionerReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
public static enum Counter{
MY_INPUT_RECOREDS,MY_INPUT_BYTES
}
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.getCounter(Counter.MY_INPUT_RECOREDS).increment(1L);
context.write(key,NullWritable.get());
}
}
Step 4. 主類中設定分割槽類和ReduceTask個數
package cn.itcast.partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] strings) throws Exception {
//1.建立job任務物件
Job job = Job.getInstance(super.getConf(), "partition_mapreduce");
//job.setJarByClass(JobMain.class);
//2.對job任務進行配置(八個步驟)
//第一步:設定輸入類和輸入引數
job.setInputFormatClass(TextInputFormat.class);
//TextInputFormat.addInputPath(job,new Path("hdfs://hadoop101:8020/input"));
TextInputFormat.addInputPath(job,new Path("file:///E:\\input"));
//第二部:設定mapper類和型別
job.setMapperClass(PartitionMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//第三:指定分割槽類
job.setPartitionerClass(MyPartitioner.class);
// 第四,五,六步
//第七步:指定Reducer類和資料型別
job.setReducerClass(PartitionerReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//設定ReduceTask的個數
job.setNumReduceTasks(2);
//第八步:指定輸出類和輸出路徑
job.setOutputFormatClass(TextOutputFormat.class);
//TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop101:8020/out/partition_out"));
TextOutputFormat.setOutputPath(job,new Path("file:///E:\\out\\partition_out3"));
//3.等待任務結束
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
//啟動job任務
Configuration configuration = new Configuration();
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}
結果:會產生兩個檔案
第一個檔案裡的開獎數都是小於等於15
第二個檔案裡的開獎數都是大於15的
MapReduce 中的計數器
計數器是收集作業統計資訊的有效手段之一,用於質量控制或應用級統計。計數器還可輔助
診斷系統故障。如果需要將日誌資訊傳輸到 map 或 reduce 任務, 更好的方法通常是看能否
用一個計數器值來記錄某一特定事件的發生。對於大型分散式作業而言,使用計數器更為方
便。除了因為獲取計數器值比輸出日誌更方便,還有根據計數器值統計特定事件的發生次數
要比分析一堆日誌檔案容易得多。
hadoop內建計數器列表
自定義計數器方法
第一種方式定義計數器,通過context上下文物件可以獲取我們的計數器,進行記錄 通過
context上下文物件,在map端使用計數器進行統計
public class PartitionMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
//map方法將K1和V1轉為K2和V2
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//方式1:自定義計數器
Counter counter = context.getCounter("MY_COUNTER", "partition_counter");
//每執行一次,計數器的變數加1
counter.increment(1L);
context.write(value,NullWritable.get());
}
}
第二種方式
通過enum列舉型別來定義計數器 統計reduce端資料的輸入的key有多少個
public class PartitionerReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
public static enum Counter{
MY_INPUT_RECOREDS,MY_INPUT_BYTES
}
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.getCounter(Counter.MY_INPUT_RECOREDS).increment(1L);
context.write(key,NullWritable.get());
}
}
輸出顯示: