MapReduce -- How Long a User Stayed at a Location from a Given Start Time (Upgraded Version)
阿新 · Published 2018-12-24
package kaoshi831;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MRsort {
    /**
     * Meaning of the data: how long a user stayed at a given location,
     * starting from a given moment.
     * Processing logic: merge consecutive records for the same user at the
     * same location.
     * Merge rule: take the earliest start time and sum up the durations.
     * Fields: userId,locationId,startTime,duration(minutes)
     * e.g. user_a,location_a,2018-01-01 08:00:00,60
     */
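    // Illustrative merge (hypothetical records, not from the original post):
    //   input : user_a,location_a,2018-01-01 08:00:00,60
    //           user_a,location_a,2018-01-01 09:00:00,60
    //   output: user_a  location_a  2018-01-01 08:00:00  120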
    static class MyMapper extends Mapper<LongWritable, Text, SortOwn, Text> {
        private Text mv = new Text();
        private SortOwn so = new SortOwn();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, SortOwn, Text>.Context context)
                throws IOException, InterruptedException {
            // user_a,location_a,2018-01-01 08:00:00,60
            String[] sp = value.toString().split(",");  // split one input line into its fields
            so.setUlid(sp[0] + "\t" + sp[1]);           // userId + locationId: the grouping part of the key
            so.setTime(sp[2]);                          // start time: the sorting part of the key
            mv.set(sp[3]);                              // duration (minutes) becomes the map output value
            context.write(so, mv);
        }
    }
    static class MyReducer extends Reducer<SortOwn, Text, Text, IntWritable> {
        private Text outkey = new Text();
        private IntWritable outvalue = new IntWritable();

        @Override
        protected void reduce(SortOwn key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // user_a,location_a,2018-01-01 08:00:00,60
            int sum = 0;
            for (Text v : values) {          // values is an iterator; the key is re-filled as it advances
                sum += Integer.parseInt(v.toString());
                System.out.println(key);     // debug: start times are sorted in descending order,
            }                                // so after the loop key.getTime() is the earliest one
            String k = key.getUlid() + "\t" + key.getTime();
            outkey.set(k);
            outvalue.set(sum);
            context.write(outkey, outvalue);
        }
    }
    public static void main(String[] args) throws IllegalArgumentException, IOException,
            URISyntaxException, ClassNotFoundException, InterruptedException {
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(kaoshi831.MRsort.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(SortOwn.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // install the custom grouping comparator
        job.setGroupingComparatorClass(MyGroup.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/ksin"));
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf);
        Path path = new Path("hdfs://hadoop01:9000/ksout01");
        if (fs.exists(path)) {  // delete a stale output directory so the job can be rerun
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);
        job.waitForCompletion(true);
    }
}
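One caveat: SortOwn does not override hashCode(), so with more than one reduce task the default HashPartitioner may scatter records that share a user/location across reducers. Below is a minimal sketch of a partitioner that keeps each group together; the class name UlidPartitioner and the call job.setPartitionerClass(UlidPartitioner.class) are additions for illustration, not part of the original post.

package kaoshi831;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
// Hypothetical partitioner: routes on the user/location part of the key only,
// mirroring the grouping rule in MyGroup.
public class UlidPartitioner extends Partitioner<SortOwn, Text> {
    @Override
    public int getPartition(SortOwn key, Text value, int numPartitions) {
        // mask the sign bit so the result is a valid partition index
        return (key.getUlid().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

With the single default reduce task the job works as-is; the partitioner only matters once job.setNumReduceTasks(n) is raised above 1.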
// Custom composite key: implements WritableComparable
package kaoshi831;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class SortOwn implements WritableComparable<SortOwn> {
    // user_a,location_a,2018-01-01 08:00:00,60
    private String ulid;  // userId + "\t" + locationId
    private String time;  // start time, "yyyy-MM-dd HH:mm:ss"

    public SortOwn() {
    }

    public SortOwn(String ulid, String time) {
        this.ulid = ulid;
        this.time = time;
    }

    @Override
    public String toString() {
        return ulid + "," + time;
    }

    public String getUlid() {
        return ulid;
    }

    public void setUlid(String ulid) {
        this.ulid = ulid;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(ulid);
        out.writeUTF(time);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.ulid = in.readUTF();
        this.time = in.readUTF();
    }
    @Override
    public int compareTo(SortOwn o) {
        int tmp = this.ulid.compareTo(o.ulid);
        if (tmp == 0) {
            // same user/location: compare start times in reverse (descending) order;
            // plain string comparison works because "yyyy-MM-dd HH:mm:ss" sorts lexicographically
            return o.time.compareTo(this.time);
        }
        return tmp;
    }
}
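For intuition, a quick check of the order compareTo produces (the two keys below are illustrative values, not from the original post):

// Both keys share the ulid "user_a\tlocation_a", so the time branch decides:
SortOwn a = new SortOwn("user_a\tlocation_a", "2018-01-01 09:00:00");
SortOwn b = new SortOwn("user_a\tlocation_a", "2018-01-01 08:00:00");
System.out.println(a.compareTo(b) < 0);  // true: the later start time sorts first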
// Custom grouping comparator: extends WritableComparator
package kaoshi831;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroup extends WritableComparator {
    public MyGroup() {
        // register the key class with the parent so it can create
        // SortOwn instances reflectively when comparing
        super(SortOwn.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // grouping condition: compare only userId + locationId; start time is
        // ignored, so all records of one user at one place share a reduce() call
        SortOwn sb1 = (SortOwn) a;
        SortOwn sb2 = (SortOwn) b;
        return sb1.getUlid().compareTo(sb2.getUlid());
    }
}
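Putting the pieces together, one group flows through the reduce side like this (a hypothetical trace; the three records are assumed, not from the original post):

// Map output for one group, sorted by SortOwn.compareTo (time descending):
//   (user_a\tlocation_a, 2018-01-01 10:00:00) -> 30
//   (user_a\tlocation_a, 2018-01-01 09:00:00) -> 60
//   (user_a\tlocation_a, 2018-01-01 08:00:00) -> 60
// MyGroup compares only ulid, so all three values enter a single reduce() call.
// Hadoop re-fills the key object as the values iterator advances, so after the
// loop the key holds the last -- i.e. earliest -- start time.
// Emitted: user_a  location_a  2018-01-01 08:00:00  150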