1. 程式人生 > >大資料(十二):自定義OutputFormat與ReduceJoin合併(資料傾斜)

大資料(十二):自定義OutputFormat與ReduceJoin合併(資料傾斜)

一、OutputFormat介面

        OutputFormat是MapReduce輸出的基類,所有實現MapReduce輸出都實現了OutputFormat介面。

1.文字輸出TextOutPutFormat

        預設的輸出格式是TextOutputFormat,它把每條記錄寫為文字行。他的鍵和值可以是任意型別,會通過toString()方法吧他們轉換為在字串。

2.SequenceFileOutputFormat

        SequenceFileOutputFormat將它的輸出寫為一個順序檔案。如果輸出需要作為後續MapReduce任務的輸出,這便是一種很好的輸出格式,因為它的格式緊湊,很容易被壓縮。

3.自定義OutputFormat

 

二、自定義OutputFormat

        為了實現控制最終檔案的輸出路徑,可以自定義OutputFormat

        在一個MapReduce程式中更具資料的不同輸出兩類結果到不同目錄,這種靈活的輸出需求就需要通過自定義outputformat來實現。 

1.自定義OutputFormat步驟

  1. 自定義一個類繼承FileOutputFormat

  2. 改寫recordwriter,重寫輸出資料的方法write()

 

三、.過濾文字內容及自定義檔案輸出路徑(自定義OutputFormat)

1.需求

過濾輸入的log日誌中是否包含.com

  1. 以com結尾的網站輸出到d:/com.log裡

  2. 不以com結尾的網站輸出到d:/other.log裡

2.輸入資料

一個名叫log.txt的日誌檔案裡包含多條url

3.自定義OutputFormat

public class FilterOutputFormat extends FileOutputFormat<Text,NullWritable>{
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new FilterRecordWriter(taskAttemptContext);
    }
}

4.自定義RecordWriter

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
    private Configuration configuration;
    private FSDataOutputStream comFs = null;
    private FSDataOutputStream otherFs = null;
    public FilterRecordWriter() {
    }

    public FilterRecordWriter(TaskAttemptContext job){
        configuration = job.getConfiguration();
        //獲取檔案系統
        FileSystem fileSystem = null;
        try {
        fileSystem = FileSystem.get(configuration);
        //建立兩個輸出流
        comFs = fileSystem.create(new Path("d:/com.log"));
        otherFs = fileSystem.create(new Path("d:/other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        //判斷資料是否包含com
        if (text.toString().contains("com")){
            comFs.write(text.getBytes());
        }else {
            otherFs.write(text.getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        //關閉流
        if (comFs !=null){
            comFs.close();
        }
        if (otherFs !=null){
            otherFs.close();
        }
    }
}

5.編寫Mapper程式碼

public class FilterMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
    Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //獲取一行資料
        String line = value.toString();
        //設定key
        k.set(line);
        //輸出
        context.write(k,NullWritable.get());
    }
}

6.編寫Reducer程式碼

public class FilterReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        String k = key.toString() + "\r\n";
        context.write(new Text(k),NullWritable.get());
    }
}

7.編寫Driver

public class FilterDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //獲取配置資訊
        Configuration conf=new Configuration();
        Job job = Job.getInstance(conf);
        //設定jar包載入路徑
        job.setJarByClass(FilterDriver.class);
        //載入map/reduce類
        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReducer.class);
        //設定OutFormat
        job.setOutputFormatClass(FilterOutputFormat.class);
        //設定map輸出資料key和value型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        //設定最終輸出資料key和value型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //設定輸入資料和輸出資料路徑
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //提交
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}

 

四、ReduceJoin

1.原理

        Map端的主要工作:為來自不同表(檔案)的key/value對打標籤以區別不同來源的記錄。然後用連線欄位作為key,其餘部分和新加的標誌作為value,最後進行輸出。

        Reduce端的工作:在reduce端以連線欄位作為key的分組已經完成,只需要在每一個分組當中將那些來源不同檔案的記錄分開,最後進行合併就可以了。

2.缺點

        這種方法的缺點比較明顯會造成shuffle階段出現大量的資料傳輸,效率低下。

 

五、MapReduce中多表合併案例(資料傾斜)

1.需求

訂單資料表t_order(檔名為order.txt)

id

pid

amount

1001

01

1

1002

02

2

1003

03

3

商品資訊表t_product(檔名為pd.txt)

pid

pname

01

小米

02

華為

03

格力

將商品資訊表中資料根據商品pid合併到訂單資料表中

最終結果:

id

pid

amount

1001

小米

1

1002

華為

2

1003

格力

3

2.程式分析

  1. mapper中處理邏輯

    1. 獲取輸入檔案型別

    2. 獲取輸入資料

    3. 不同檔案分別處理

    4. 封裝bean物件輸出

  2. 預設對產品id排序
  3. reduce方法快取訂單資料集合和資料表然後再合併

3.編寫Bean程式碼

public class TableBean implements Writable {
    /**
    * 訂單id
    */
    private String orderId;
    /**
    * 產品id
    */
    private String pid;
    /**
    * 產品數量
    */
    private int amount;
    /**
    * 產品名稱
    */
    private String pName;
    /**
    * 標記是訂單表(0)還是產品表(1)
    */
    private String flag;

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(amount);
        dataOutput.writeUTF(pName);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.pid = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.pName = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return orderId + "/t" + pName + "/t" + amount;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getpName() {
        return pName;
    }

    public void setpName(String pName) {
        this.pName = pName;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }
}

4.編寫Mapper程式碼

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
    TableBean v = new TableBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //區分兩張表
        FileSplit split = (FileSplit) context.getInputSplit();
        String name = split.getPath().getName();

        //獲取一行資料
        String line = value.toString();
        //切割資料
        String[] fields = line.split("\t");
        if (name.startsWith("order")) {
            //訂單表
            v.setOrderId(fields[0]);
            v.setPid(fields[1]);
            v.setAmount(Integer.parseInt(fields[2]));
            v.setpName("");
            v.setFlag("0");

            k.set(fields[1]);
        } else {
            //產品表
            v.setOrderId("");
            v.setPid(fields[0]);
            v.setAmount(0);
            v.setpName(fields[1]);
            v.setFlag("1");
            k.set(fields[0]);
        }
        context.write(k, v);
    }
}

5.編寫Reducer程式碼

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        //準備集合
        List<TableBean> orderBeans = new ArrayList<>();
       TableBean pdBean = new TableBean();

        //資料拷貝
        for (TableBean value : values) {
            if ("0".equals(value.getFlag())) {
                //訂單表
                TableBean tableBean = new TableBean();
                try {
                    BeanUtils.copyProperties(tableBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(tableBean);
            } else {
                //產品表
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        //拼接表
        for (TableBean orderBean : orderBeans) {
            orderBean.setpName(pdBean.getpName());
            //輸出
            context.write(orderBean, NullWritable.get());
        }
    }
}

6.編寫Driver程式碼

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //獲取配置資訊,或者job物件例項
        Configuration entries = new Configuration();
        Job job = Job.getInstance(entries);
        //指定程式的jar包所在位置
        job.setJarByClass(TableDriver.class);
        //指定jbo要是的mapper和Reducer
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);
        //指定mapper的輸出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        //指定最終輸出
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job輸入原始檔案的目錄和輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //執行
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}