MapReduce: how long a user stayed at a given location from a given start time
package kaoshi831;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Input semantics: a user stayed at a location for some number of minutes,
 * starting at some time.
 *
 * Processing logic: merge the multiple records of the same user at the same
 * location. Merge rule: keep the earliest start time and sum the durations.
 *
 * Record layout: userID,locationID,startTime,duration(minutes)
 * e.g. user_a,location_a,2018-01-01 08:00:00,60
 */
public class GroupSort {

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text outkey = new Text();
        Text outvalue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] sp = value.toString().split(",");
            outkey.set(sp[0] + "," + sp[1]);   // emit key: userID,locationID
            outvalue.set(sp[2] + "," + sp[3]); // emit value: startTime,duration(minutes)
            context.write(outkey, outvalue);
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        List<String> list = new ArrayList<>();
        Text outvalue = new Text();
        int sum = 0;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // each value looks like: 2018-01-01 08:00:00,60
            for (Text v : values) {
                String[] sp = v.toString().split(",");
                sum += Integer.parseInt(sp[1]); // accumulate the stay duration
                list.add(sp[0]);                // collect the start times
            }
            // "yyyy-MM-dd HH:mm:ss" strings sort correctly as plain text,
            // so the first element after sorting is the earliest start time
            Collections.sort(list);
            outvalue.set(list.get(0) + "\t" + sum);
            context.write(key, outvalue);
            sum = 0;      // reset the accumulators: the same reducer instance
            list.clear(); // is reused for the next key
        }
    }

    public static void main(String[] args)
            throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(GroupSort.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/ksin"));
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf); // HDFS handle
        Path path = new Path("hdfs://hadoop01:9000/ksout001");
        if (fs.exists(path)) {
            fs.delete(path, true); // remove the output path if it already exists
        }
        FileOutputFormat.setOutputPath(job, path);
        job.waitForCompletion(true); // true: print job progress to the console
    }
}
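Because the merge operation (take the earliest start time, add up the minutes) is associative and commutative, a combiner could pre-merge records on the map side and cut shuffle traffic. The reducer above cannot double as the combiner, since it writes a tab between time and duration while the reduce phase expects comma-separated values. Below is a minimal sketch, using the same imports as the job above; MyCombiner is a hypothetical addition, not part of the original code.

    // Hypothetical combiner: pre-merges map output locally so less data
    // crosses the network. It must emit "startTime,duration" (comma, not
    // tab) so its output still parses as reducer input.
    static class MyCombiner extends Reducer<Text, Text, Text, Text> {
        Text outvalue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String earliest = null;
            int sum = 0;
            for (Text v : values) {
                String[] sp = v.toString().split(",");
                sum += Integer.parseInt(sp[1]);
                if (earliest == null || sp[0].compareTo(earliest) < 0) {
                    earliest = sp[0]; // running minimum of the start times
                }
            }
            outvalue.set(earliest + "," + sum);
            context.write(key, outvalue);
        }
    }

It would be registered in main with job.setCombinerClass(MyCombiner.class). Hadoop may run a combiner zero or more times per key, which is safe here because re-merging already merged records does not change the result.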
A List is used here to sort the start times. That is fine for a demo, but not recommended in production: ArrayList is backed by an array, so it can hold at most Integer.MAX_VALUE ((2^31) - 1) elements, and in practice performance drops off noticeably once the data volume reaches about half that capacity. Buffering every value of a key in reducer memory limits how much data the job can process and weighs heavily on performance; see the streaming sketch below.
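Since the job only needs the earliest start time and the total duration, the reducer can instead keep a running minimum and a running sum while streaming the values, never materializing the list. A minimal sketch under the same value-format assumptions follows; StreamingReducer is a hypothetical name, not from the original post.

    // Drop-in replacement for MyReducer that never buffers the values:
    // O(1) memory per key instead of O(number of records).
    static class StreamingReducer extends Reducer<Text, Text, Text, Text> {
        Text outvalue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String earliest = null;
            int sum = 0;
            for (Text v : values) {
                String[] sp = v.toString().split(",");
                sum += Integer.parseInt(sp[1]);
                // "yyyy-MM-dd HH:mm:ss" compares correctly as text, so a
                // running minimum replaces the sort-then-take-first step
                if (earliest == null || sp[0].compareTo(earliest) < 0) {
                    earliest = sp[0];
                }
            }
            outvalue.set(earliest + "\t" + sum);
            context.write(key, outvalue);
        }
    }

Using local variables per reduce() call also removes the need to reset instance fields between keys.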
// Job output
user_a,location_a 2018-01-01 08:00:00 240
user_a,location_b 2018-01-01 10:00:00 60
user_a,location_c 2018-01-01 08:00:00 180
user_b,location_a 2018-01-01 15:00:00 180