
Hadoop Basics (29): Data Cleansing (ETL) (2) - Complex Parsing Version

Data Cleansing Case Study - Complex Parsing Version

1. Requirements

Identify and split the individual fields in a Web access log, and drop any record that does not conform to the expected format. Output the filtered data according to these cleansing rules.

1) Input data

2) Expected output data

All output records are valid.
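Since the input and expected-output screenshots are not reproduced here, the line below is a hypothetical sample in the common nginx "combined" access-log format that this parser assumes. A record is kept only if it splits into more than 11 space-separated fields and its status code is below 400.

194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"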

2. Implementation Code

1) Define a bean to hold the individual fields of a log record

package com.atguigu.mapreduce.log;

public class LogBean {
    private String remote_addr;      // client IP address
    private String remote_user;      // client user name; "-" means it was not recorded
    private String time_local;       // access time and time zone
    private String request;          // requested URL and HTTP protocol
    private String status;           // request status; 200 means success
    private String body_bytes_sent;  // size of the response body sent to the client
    private String http_referer;     // page the request was linked from
    private String http_user_agent;  // information about the client browser
    private boolean valid = true;    // whether this record is valid

    public String getRemote_addr() { return remote_addr; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }

    public String getRemote_user() { return remote_user; }
    public void setRemote_user(String remote_user) { this.remote_user = remote_user; }

    public String getTime_local() { return time_local; }
    public void setTime_local(String time_local) { this.time_local = time_local; }

    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }

    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    public String getBody_bytes_sent() { return body_bytes_sent; }
    public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }

    public String getHttp_referer() { return http_referer; }
    public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }

    public String getHttp_user_agent() { return http_user_agent; }
    public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }

    public boolean isValid() { return valid; }
    public void setValid(boolean valid) { this.valid = valid; }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.remote_addr);
        sb.append("\001").append(this.remote_user);
        sb.append("\001").append(this.time_local);
        sb.append("\001").append(this.request);
        sb.append("\001").append(this.status);
        sb.append("\001").append(this.body_bytes_sent);
        sb.append("\001").append(this.http_referer);
        sb.append("\001").append(this.http_user_agent);
        return sb.toString();
    }
}
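As a quick local sanity check (not part of the original job), the hypothetical snippet below fills a LogBean with made-up values and prints it. The \001 (Ctrl-A) separator used in toString() is Hive's default field delimiter, so the cleaned output can later be loaded straight into a Hive table.

package com.atguigu.mapreduce.log;

// Hypothetical local test, not part of the MapReduce job.
public class LogBeanTest {
    public static void main(String[] args) {
        LogBean bean = new LogBean();
        bean.setRemote_addr("194.237.142.21");
        bean.setRemote_user("-");
        bean.setTime_local("18/Sep/2013:06:49:18");
        bean.setRequest("/wp-content/uploads/2013/07/rstudio-git3.png");
        bean.setStatus("304");
        bean.setBody_bytes_sent("0");
        bean.setHttp_referer("\"-\"");
        bean.setHttp_user_agent("\"Mozilla/4.0 (compatible;)\"");

        // Prints one line whose fields are separated by the invisible \001
        // character, starting with the valid flag:
        // true^A194.237.142.21^A-^A18/Sep/2013:06:49:18^A...
        System.out.println(bean.toString());
    }
}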

2) Write the LogMapper

package com.atguigu.mapreduce.log;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
    Text k = new Text();
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 Get one line
        String line = value.toString();
        
        // 2 Parse the line and check whether it is a valid record
        LogBean bean = parseLog(line);
        
        if (!bean.isValid()) {
            return;
        }
        
        k.set(bean.toString());
        
        // 3 Write the output
        context.write(k, NullWritable.get());
    }

    // Parse a log line into a LogBean
    private LogBean parseLog(String line) {

        LogBean logBean = new LogBean();
        
        // 1 Split the line on spaces
        String[] fields = line.split(" ");
        
        if (fields.length > 11) {

            // 2 Populate the bean
            logBean.setRemote_addr(fields[0]);
            logBean.setRemote_user(fields[1]);
            logBean.setTime_local(fields[3].substring(1));
            logBean.setRequest(fields[6]);
            logBean.setStatus(fields[8]);
            logBean.setBody_bytes_sent(fields[9]);
            logBean.setHttp_referer(fields[10]);
            
            if (fields.length > 12) {
                logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
            }else {
                logBean.setHttp_user_agent(fields[11]);
            }
            
            // Status codes of 400 and above indicate HTTP errors
            if (Integer.parseInt(logBean.getStatus()) >= 400) {
                logBean.setValid(false);
            }
        }else {
            logBean.setValid(false);
        }
        
        return logBean;
    }
}
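The index choices in parseLog (fields[0] for the IP, fields[3] for the time, fields[6] for the URL, fields[8] for the status, and so on) follow from splitting a combined-format log line on single spaces. The standalone sketch below, using the same hypothetical sample line as above, prints each index next to its value so the mapping is easy to verify; it is not part of the original job.

package com.atguigu.mapreduce.log;

// Hypothetical helper for inspecting how an access-log line splits on spaces.
public class FieldIndexDemo {
    public static void main(String[] args) {
        String line = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "
                + "\"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" "
                + "304 0 \"-\" \"Mozilla/4.0 (compatible;)\"";

        String[] fields = line.split(" ");
        // Expected mapping for this sample:
        //  0 -> IP address           3 -> "[time" (leading '[' stripped later)
        //  6 -> requested URL        8 -> status code
        //  9 -> body bytes sent     10 -> referer
        // 11.. -> user agent (may span several fields)
        for (int i = 0; i < fields.length; i++) {
            System.out.println(i + " -> " + fields[i]);
        }
    }
}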

3) Write the LogDriver

package com.atguigu.mapreduce.log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {
    public static void main(String[] args) throws Exception {
        
        // 1 Get job information
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Set the jar
        job.setJarByClass(LogDriver.class);

        // 3 Associate the mapper
        job.setMapperClass(LogMapper.class);

        // 4 Set the final output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 5 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 Submit the job
        job.waitForCompletion(true);
    }
}
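To run the job on a cluster, package the classes into a jar and submit it with hadoop jar, passing the input directory and the output directory as the two program arguments. The jar name and HDFS paths below are made up, and the output directory must not exist before the job starts:

hadoop jar log-etl.jar com.atguigu.mapreduce.log.LogDriver /user/atguigu/weblog/input /user/atguigu/weblog/output

Note that the driver never sets a reducer class, so the mapper output simply passes through the default reducer. Since no reduce-side work is needed here, calling job.setNumReduceTasks(0) would make this a map-only job and skip the shuffle entirely.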