1. 程式人生 > >MapReduce之--某個使用者在某個位置從某個時刻開始停留了多長時間

MapReduce之--某個使用者在某個位置從某個時刻開始停留了多長時間

package kaoshi831;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 
 *
	資料意義:某個使用者在某個位置從某個時刻開始停留了多長時間
	處理邏輯:
	對同一個使用者,在同一個位置,連續的多條記錄進行合併
	合併原則:開始時間取最早的,停留時長加和
	使用者ID,位置ID,開始時間,停留時長(分鐘)
 *	user_a,location_a,2018-01-01 08:00:00,60
 */
public class GroupSort {
	 
	static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
		Text outkey = new Text();
		Text outvalue = new Text();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String[] sp = value.toString().split(",");
//			String sb = sp[2].substring(11, 13);
			outkey.set(sp[0]+","+sp[1]);		//key發使用者ID,位置ID
			outvalue.set(sp[2]+","+sp[3]);		//value發開始時間,停留時長(分鐘)
			context.write(outkey, outvalue);
		}
	}
	static class MyReducer extends Reducer<Text, Text, Text, Text>{
		List<String> list = new ArrayList<>();
		Text outvalue = new Text();
		int sum=0;
		@Override
		protected void reduce(Text key,
				Iterable<Text> values, 
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {

			//user_a,location_a,2018-01-01 08:00:00,60
			for(Text v:values){
				String[] sp = v.toString().split(",");
				sum+=Integer.parseInt(sp[1]);
				System.out.println(sp[0]);
				list.add(sp[0]);		//新增到list集合中
			}
			Collections.sort(list);		//對list集合排序(2018-01-01 08:00:00字串也可以排的處理)
			outvalue.set(list.get(0)+"\t"+sum);
			context.write(key, outvalue);
			sum=0;
			list.clear();		//為了保險清理一下快取
		}
	}

	public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf);
		
		job.setJarByClass(kaoshi831.GroupSort.class);
		
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/ksin"));
		
		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf);//建立一個hdfs的檔案系統
		Path path=new Path("hdfs://hadoop01:9000/ksout001");
		if(fs.exists(path)){			//對所在路徑下的檔案清除
			fs.delete(path, true);
		}
		FileOutputFormat.setOutputPath(job,path);
		
		job.waitForCompletion(true);	//列印日誌

	}

}

使用了list集合做排序

在實際的生產中不建議使用,ArrayList底層是陣列

Integer.MAX_VALUE((2^31)-1)個元素。

當資料量達到50%時效能就會明顯下降

,處理資料有限,對效能影響很大

//執行結果
user_a,location_a	2018-01-01 08:00:00	240
user_a,location_b	2018-01-01 10:00:00	60
user_a,location_c	2018-01-01 08:00:00	180
user_b,location_a	2018-01-01 15:00:00	180