大資料(十二):自定義OutputFormat與ReduceJoin合併(資料傾斜)
一、OutputFormat介面
OutputFormat是MapReduce輸出的基類,所有實現MapReduce輸出都實現了OutputFormat介面。
1.文字輸出TextOutPutFormat
預設的輸出格式是TextOutputFormat,它把每條記錄寫為文字行。他的鍵和值可以是任意型別,會通過toString()方法吧他們轉換為在字串。
2.SequenceFileOutputFormat
SequenceFileOutputFormat將它的輸出寫為一個順序檔案。如果輸出需要作為後續MapReduce任務的輸出,這便是一種很好的輸出格式,因為它的格式緊湊,很容易被壓縮。
3.自定義OutputFormat
二、自定義OutputFormat
為了實現控制最終檔案的輸出路徑,可以自定義OutputFormat
在一個MapReduce程式中更具資料的不同輸出兩類結果到不同目錄,這種靈活的輸出需求就需要通過自定義outputformat來實現。
1.自定義OutputFormat步驟
-
自定義一個類繼承FileOutputFormat
-
改寫recordwriter,重寫輸出資料的方法write()
三、.過濾文字內容及自定義檔案輸出路徑(自定義OutputFormat)
1.需求
過濾輸入的log日誌中是否包含.com
-
以com結尾的網站輸出到d:/com.log裡
-
不以com結尾的網站輸出到d:/other.log裡
2.輸入資料
一個名叫log.txt的日誌檔案裡包含多條url
3.自定義OutputFormat
public class FilterOutputFormat extends FileOutputFormat<Text,NullWritable>{ @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new FilterRecordWriter(taskAttemptContext); } }
4.自定義RecordWriter
public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
private Configuration configuration;
private FSDataOutputStream comFs = null;
private FSDataOutputStream otherFs = null;
public FilterRecordWriter() {
}
public FilterRecordWriter(TaskAttemptContext job){
configuration = job.getConfiguration();
//獲取檔案系統
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.get(configuration);
//建立兩個輸出流
comFs = fileSystem.create(new Path("d:/com.log"));
otherFs = fileSystem.create(new Path("d:/other.log"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
//判斷資料是否包含com
if (text.toString().contains("com")){
comFs.write(text.getBytes());
}else {
otherFs.write(text.getBytes());
}
}
@Override
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
//關閉流
if (comFs !=null){
comFs.close();
}
if (otherFs !=null){
otherFs.close();
}
}
}
5.編寫Mapper程式碼
public class FilterMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//獲取一行資料
String line = value.toString();
//設定key
k.set(line);
//輸出
context.write(k,NullWritable.get());
}
}
6.編寫Reducer程式碼
public class FilterReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
String k = key.toString() + "\r\n";
context.write(new Text(k),NullWritable.get());
}
}
7.編寫Driver
public class FilterDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//獲取配置資訊
Configuration conf=new Configuration();
Job job = Job.getInstance(conf);
//設定jar包載入路徑
job.setJarByClass(FilterDriver.class);
//載入map/reduce類
job.setMapperClass(FilterMapper.class);
job.setReducerClass(FilterReducer.class);
//設定OutFormat
job.setOutputFormatClass(FilterOutputFormat.class);
//設定map輸出資料key和value型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//設定最終輸出資料key和value型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//設定輸入資料和輸出資料路徑
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}
}
四、ReduceJoin
1.原理
Map端的主要工作:為來自不同表(檔案)的key/value對打標籤以區別不同來源的記錄。然後用連線欄位作為key,其餘部分和新加的標誌作為value,最後進行輸出。
Reduce端的工作:在reduce端以連線欄位作為key的分組已經完成,只需要在每一個分組當中將那些來源不同檔案的記錄分開,最後進行合併就可以了。
2.缺點
這種方法的缺點比較明顯會造成shuffle階段出現大量的資料傳輸,效率低下。
五、MapReduce中多表合併案例(資料傾斜)
1.需求
訂單資料表t_order(檔名為order.txt)
id |
pid |
amount |
1001 |
01 |
1 |
1002 |
02 |
2 |
1003 |
03 |
3 |
商品資訊表t_product(檔名為pd.txt)
pid |
pname |
01 |
小米 |
02 |
華為 |
03 |
格力 |
將商品資訊表中資料根據商品pid合併到訂單資料表中
最終結果:
id |
pid |
amount |
1001 |
小米 |
1 |
1002 |
華為 |
2 |
1003 |
格力 |
3 |
2.程式分析
-
mapper中處理邏輯
-
獲取輸入檔案型別
-
獲取輸入資料
-
不同檔案分別處理
-
封裝bean物件輸出
-
- 預設對產品id排序
-
reduce方法快取訂單資料集合和資料表然後再合併
3.編寫Bean程式碼
public class TableBean implements Writable {
/**
* 訂單id
*/
private String orderId;
/**
* 產品id
*/
private String pid;
/**
* 產品數量
*/
private int amount;
/**
* 產品名稱
*/
private String pName;
/**
* 標記是訂單表(0)還是產品表(1)
*/
private String flag;
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(orderId);
dataOutput.writeUTF(pid);
dataOutput.writeInt(amount);
dataOutput.writeUTF(pName);
dataOutput.writeUTF(flag);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.orderId = dataInput.readUTF();
this.pid = dataInput.readUTF();
this.amount = dataInput.readInt();
this.pName = dataInput.readUTF();
this.flag = dataInput.readUTF();
}
@Override
public String toString() {
return orderId + "/t" + pName + "/t" + amount;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getpName() {
return pName;
}
public void setpName(String pName) {
this.pName = pName;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
}
4.編寫Mapper程式碼
public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
TableBean v = new TableBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//區分兩張表
FileSplit split = (FileSplit) context.getInputSplit();
String name = split.getPath().getName();
//獲取一行資料
String line = value.toString();
//切割資料
String[] fields = line.split("\t");
if (name.startsWith("order")) {
//訂單表
v.setOrderId(fields[0]);
v.setPid(fields[1]);
v.setAmount(Integer.parseInt(fields[2]));
v.setpName("");
v.setFlag("0");
k.set(fields[1]);
} else {
//產品表
v.setOrderId("");
v.setPid(fields[0]);
v.setAmount(0);
v.setpName(fields[1]);
v.setFlag("1");
k.set(fields[0]);
}
context.write(k, v);
}
}
5.編寫Reducer程式碼
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
//準備集合
List<TableBean> orderBeans = new ArrayList<>();
TableBean pdBean = new TableBean();
//資料拷貝
for (TableBean value : values) {
if ("0".equals(value.getFlag())) {
//訂單表
TableBean tableBean = new TableBean();
try {
BeanUtils.copyProperties(tableBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
orderBeans.add(tableBean);
} else {
//產品表
try {
BeanUtils.copyProperties(pdBean, value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
//拼接表
for (TableBean orderBean : orderBeans) {
orderBean.setpName(pdBean.getpName());
//輸出
context.write(orderBean, NullWritable.get());
}
}
}
6.編寫Driver程式碼
public class TableDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//獲取配置資訊,或者job物件例項
Configuration entries = new Configuration();
Job job = Job.getInstance(entries);
//指定程式的jar包所在位置
job.setJarByClass(TableDriver.class);
//指定jbo要是的mapper和Reducer
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
//指定mapper的輸出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
//指定最終輸出
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
//指定job輸入原始檔案的目錄和輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//執行
boolean flag = job.waitForCompletion(true);
System.exit(flag ? 0 : 1);
}
}