
MapReduce -- how long a user stayed at a given location from a given start time -- upgraded version
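The task, restated from the comment block in the code below: each input record has the layout `user ID,location ID,start time,stay duration (minutes)`, and consecutive records for the same user at the same location are merged by keeping the earliest start time and summing the durations. As a quick illustration (the extra rows here are hypothetical, extrapolated from the single sample line in the source), input such as

user_a,location_a,2018-01-01 08:00:00,60
user_a,location_a,2018-01-01 09:00:00,60
user_a,location_b,2018-01-01 10:30:00,30

would be merged into

user_a	location_a	2018-01-01 08:00:00	120
user_a	location_b	2018-01-01 10:30:00	30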

package kaoshi831;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Data meaning: how long a user stayed at a given location from a given start time.
 * Processing logic: merge consecutive records for the same user at the same location.
 * Merge rule: keep the earliest start time and sum the stay durations.
 * Record layout: user ID,location ID,start time,stay duration (minutes)
 * e.g. user_a,location_a,2018-01-01 08:00:00,60
 */
public class MRsort {

    static class MyMapper extends Mapper<LongWritable, Text, SortOwn, Text> {
        private Text mv = new Text();
        private SortOwn so = new SortOwn();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, SortOwn, Text>.Context context)
                throws IOException, InterruptedException {
            // user_a,location_a,2018-01-01 08:00:00,60
            String[] sp = value.toString().split(","); // split each line into its fields
            so.setUlid(sp[0] + "\t" + sp[1]); // pack user ID and location ID into the wrapper class
            so.setTime(sp[2]); // group by (user ID, location ID), sort by start time
            mv.set(sp[3]);
            context.write(so, mv);
        }
    }

    static class MyReducer extends Reducer<SortOwn, Text, Text, IntWritable> {
        private Text outkey = new Text();
        private IntWritable outvalue = new IntWritable();

        @Override
        protected void reduce(SortOwn key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // user_a,location_a,2018-01-01 08:00:00,60
            int sum = 0;
            for (Text v : values) { // values is an iterator, traversed pointer-style
                String sp = v.toString();
                sum += Integer.parseInt(sp);
                System.out.println(key); // debug: the wrapper class sorts start time in descending
                                         // order, so the last key seen holds the earliest time
            }
            String k = key.getUlid() + "\t" + key.getTime();
            outkey.set(k);
            outvalue.set(sum);
            context.write(outkey, outvalue);
        }
    }

    public static void main(String[] args) throws IllegalArgumentException, IOException,
            URISyntaxException, ClassNotFoundException, InterruptedException {
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(kaoshi831.MRsort.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(SortOwn.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // register the custom grouping comparator
        job.setGroupingComparatorClass(MyGroup.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/ksin"));

        // delete the output directory if it already exists
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf);
        Path path = new Path("hdfs://hadoop01:9000/ksout01");
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.waitForCompletion(true);
    }
}
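With the driver and the two helper classes below compiled and packaged, the job is submitted in the usual way (the jar name here is hypothetical); input is read from hdfs://hadoop01:9000/ksin and output written to hdfs://hadoop01:9000/ksout01, which the driver deletes first if it already exists:

hadoop jar mrsort.jar kaoshi831.MRsort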

// Custom wrapper (composite key) class implementing WritableComparable

package kaoshi831;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SortOwn implements WritableComparable<SortOwn>{
    //user_a,location_a,2018-01-01 08:00:00,60
    private String ulid;
    private String time;

    @Override
    public String toString() {
        return ulid + "," + time;
    }
    public SortOwn() {
        // no-arg constructor required so Hadoop can instantiate the key via reflection
    }
    public String getUlid() {
        return ulid;
    }
    public void setUlid(String ulid) {
        this.ulid = ulid;
    }
    public String getTime() {
        return time;
    }
    public void setTime(String time) {
        this.time = time;
    }
    public SortOwn(String ulid, String time) {
        this.ulid = ulid;
        this.time = time;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(ulid);
        out.writeUTF(time);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.ulid=in.readUTF();
        this.time=in.readUTF();
    }
    @Override
    public int compareTo(SortOwn o) {
        int tmp=this.ulid.compareTo(o.ulid);
        if(tmp==0){
            return o.time.compareTo(this.time); // start time in descending order
        }
        return tmp;
    }

}
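To see the ordering this compareTo produces, here is a small local sanity check (SortOwnDemo is a hypothetical helper, not part of the job): keys sort by ulid ascending, and within the same ulid by start time descending, which is exactly why the reducer can take the last key.getTime() of a group as the earliest start time.

package kaoshi831;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SortOwnDemo {
    public static void main(String[] args) {
        List<SortOwn> keys = new ArrayList<>();
        keys.add(new SortOwn("user_a\tlocation_a", "2018-01-01 08:00:00"));
        keys.add(new SortOwn("user_a\tlocation_a", "2018-01-01 09:00:00"));
        keys.add(new SortOwn("user_a\tlocation_b", "2018-01-01 07:00:00"));
        Collections.sort(keys); // uses SortOwn.compareTo
        for (SortOwn k : keys) {
            // prints: location_a 09:00, location_a 08:00, location_b 07:00
            System.out.println(k);
        }
    }
}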

// Custom grouping comparator, extending WritableComparator

package kaoshi831;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroup extends WritableComparator {
    public MyGroup() { // wired up through the constructor
        super(SortOwn.class, true); // use reflection to instantiate the key class being grouped
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // grouping condition: compare only on (user ID, location ID)
        SortOwn sb1 = (SortOwn) a;
        SortOwn sb2 = (SortOwn) b;
        return sb1.getUlid().compareTo(sb2.getUlid());
    }
}
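One caveat worth noting: SortOwn does not override hashCode, so the default HashPartitioner would scatter records of one (user, location) group across reducers if more than one reduce task were configured; the job works as written only because Hadoop defaults to a single reducer. The usual secondary-sort recipe pairs the grouping comparator with a partitioner keyed on the grouping field alone. A minimal sketch (UlidPartitioner is a name introduced here, not in the original code):

package kaoshi831;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes records by (user ID, location ID) only, so every record of a
// group reaches the same reducer when more than one reducer is used.
public class UlidPartitioner extends Partitioner<SortOwn, Text> {
    @Override
    public int getPartition(SortOwn key, Text value, int numPartitions) {
        return (key.getUlid().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be registered in the driver next to the grouping comparator with job.setPartitionerClass(UlidPartitioner.class).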