
Using MapReduce to deduplicate two types of IPs and write them to different output directories

We have a dataset containing two types of IPs, distinguished by a flag in each record. The task is to deduplicate the records and store the two types of IPs in separate output files; records of a third type are discarded.
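For reference, here is a hypothetical input record (the field values are made up; only the field names decrypted_data, time, and remote_addr come from the code below, and the exact time format is an assumption):

{"time": "2019-01-01 12:30:45", "remote_addr": "10.0.0.1", "decrypted_data": "...userAgent..."}

Each line is a JSON object; whether decrypted_data contains "userAgent" or "success" determines the record's type, and the date part of time together with the IP forms the deduplication key.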

Key points:

1. Custom partitioning: extend the Partitioner class and override the getPartition() method;
2. Multi-path output: how to use the MultipleOutputs class;

The MapReduce program is as follows.

Maven dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.2144</groupId>
    <artifactId>dataclean</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.5</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.5</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.5</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.6.5</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.jodd</groupId>
            <artifactId>jodd-core</artifactId>
            <version>4.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20171018</version>
        </dependency>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.56</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
    <build>
        <!-- jar file name -->
        <finalName>hm_distinct_ip</finalName>
        <plugins>
            <!-- packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <manifestEntries>
                                        <Main-Class>com.js.dataclean.distinctip.DistIPDriver</Main-Class>
                                    </manifestEntries>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
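Assuming the pom above, mvn clean package builds the shaded jar as target/hm_distinct_ip.jar (the finalName setting), with DistIPDriver registered as the main class by the ManifestResourceTransformer.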

Mapper class

package com.js.dataclean.distinctip;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.json.JSONObject;

import java.io.IOException;

public class DistIPMapper extends Mapper<LongWritable,Text,Text,NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String str = value.toString();
        JSONObject jsonObject = new JSONObject(str);
        // decrypted_data tells us which type the record belongs to
        String decrypted_data = jsonObject.getString("decrypted_data");
        // keep only the date part of the time field
        String time = jsonObject.getString("time").split(" ")[0];
        String ip = jsonObject.getString("remote_addr");
        String flag = "";
        if(decrypted_data.contains("userAgent")){
            flag = "userAgent";
        }

        if(decrypted_data.contains("success")){
            flag = "success";
        }
        // use "date \t flag \t ip" as the key so identical records collapse in the shuffle
        context.write(new Text(time + "\t" + flag + "\t" + ip),NullWritable.get());
    }
}
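Given the hypothetical record shown earlier, this mapper would emit the key 2019-01-01\tuserAgent\t10.0.0.1 with a NullWritable value; because the entire record becomes the key, duplicate date/flag/IP combinations arrive at the reducer as a single group.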

Reducer class

package com.js.dataclean.distinctip;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import java.io.IOException;

public class DistIPReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
    private MultipleOutputs<Text,NullWritable> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        String[] arr = key.toString().split("\t");
        String time = arr[0];
        String flag = arr[1];
        String ip = arr[2];
        // base output path for MultipleOutputs: <date>/<flag>/part, relative to the job output directory
        String name = time + "/" + flag + "/part";
        mos.write(new Text(ip),NullWritable.get(),name);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
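MultipleOutputs.write(key, value, baseOutputPath) writes relative to the job's output directory, so this reducer produces files such as <output>/2019-01-01/success/part-r-00000 and <output>/2019-01-01/userAgent/part-r-00001 (the dates and file suffixes are illustrative). No addNamedOutput() call is needed in the driver for this three-argument form of write().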

Custom partitioner class

package com.js.dataclean.distinctip;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class DistIPPartition extends Partitioner<Text,NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int numPartitions) {
        // partition 2 is the default: the third type of data (neither "success" nor "userAgent")
        int flag = 2;
        if (text.toString().contains("success")){
            flag = 0;
        }

        if(text.toString().contains("userAgent")){
            flag = 1;
        }

        return flag;
    }
}
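As a quick sanity check of the routing logic, a minimal JUnit 4 sketch could look like this (the test class name and sample keys are made up; junit 4.12 is already declared in the pom above):

package com.js.dataclean.distinctip;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;

public class DistIPPartitionTest {

    @Test
    public void routesKeysToExpectedPartitions() {
        DistIPPartition partitioner = new DistIPPartition();
        // keys have the form "date \t flag \t ip", as emitted by the mapper
        Assert.assertEquals(0, partitioner.getPartition(new Text("2019-01-01\tsuccess\t10.0.0.1"), NullWritable.get(), 3));
        Assert.assertEquals(1, partitioner.getPartition(new Text("2019-01-01\tuserAgent\t10.0.0.2"), NullWritable.get(), 3));
        // anything else falls into partition 2, the partition whose data is discarded
        Assert.assertEquals(2, partitioner.getPartition(new Text("2019-01-01\t\t10.0.0.3"), NullWritable.get(), 3));
    }
}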

Driver class

package com.js.dataclean.distinctip;

import com.js.dataclean.utils.HdfsUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class DistIPDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        if(args.length != 2){
            System.err.println("Usage: DistIPDriver <input path> <output path>");
            System.exit(1);
        }
        int etc = ToolRunner.run(new Configuration(),new DistIPDriver(),args);
        System.exit(etc);
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(DistIPDriver.class);

        job.setMapperClass(DistIPMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(DistIPReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // set the partitioner and the matching number of reduce tasks (one per partition)
        job.setPartitionerClass(DistIPPartition.class);
        job.setNumReduceTasks(3);

        // input and output paths
        String inpath = strings[0];
        String output = strings[1];


        // merge small input files so one map task handles multiple files
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMinInputSplitSize(job, 67108864L);  // each map processes at least 64MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 134217728L); // and at most 128MB

        // scan the input path recursively
        FileInputFormat.setInputDirRecursive(job,true);
        FileInputFormat.setInputPaths(job,inpath);

        // delete the output directory if it already exists (HdfsUtil is a project utility class, not shown here)
        if(HdfsUtil.existsFiles(conf,output)){
            HdfsUtil.deleteFolder(conf,output);
        }

        // output path
        FileOutputFormat.setOutputPath(job,new Path(output));
        // LazyOutputFormat avoids creating empty default part files, since all output goes through MultipleOutputs
        LazyOutputFormat.setOutputFormatClass(job,TextOutputFormat.class);

        return job.waitForCompletion(true)?0:1;

    }
}
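Assuming the shaded jar built earlier, the job can be submitted roughly like this (the HDFS paths are placeholders):

hadoop jar target/hm_distinct_ip.jar /data/ip_logs /data/ip_distinct

The deduplicated IPs then appear under /data/ip_distinct/<date>/success/ and /data/ip_distinct/<date>/userAgent/.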