1. 程式人生 > >MapReduce的兩表join一般操作

MapReduce的兩表join一般操作

案例:(部門員工兩表的join查詢)

原始資料

員工表(emp):

 empno ename  job      mgr  hiredate   sal  comm deptno loc
 7499  allen  salesman 7698 1981-02-20 1600 300  30
 7782  clark  manager  7639 1981-06-09 2450      10
 7654  martin salesman 7698 1981-03-22 1250 1400 30     boston
 7900  james  clerk    7698 1981-03-20 950       30
 7788  scott  analyst  7566 1981-09-01 3000 100  20 

部門表(dep):

deptno dname     loc
 30     sales     chicago
 20     research  dallas
 10     accounting newyork

實現的功能類似於:select e.empno,e.ename,d.deptno,d.dname from emp e join dep d on e.deptno=d.deptno;
用join on後面的欄位作為key
一對多關係

解析:

最後輸出的結果包含兩張表共四個屬性(員工id,員工姓名,部門id(外來鍵),部門名稱)

我們可以將部門id作為map傳值的key,將四個屬性構造一個JavaBean作為map傳值的value,其中自定義的JavaBean中除了包含四個屬性外,還應有區分是員工表還是部門表的欄位flag.

1.JavaBean

/**
 * Record type shared by both tables of the emp/dep join.
 *
 * <p>A single instance holds either a department row or an employee row; the
 * {@code flag} field tells which one (0 = department, 1 = employee). Fields that
 * do not apply to the row keep their default value of a single space.
 *
 * <p>The class implements {@code WritableComparable} so that it can be
 * serialized between the map and reduce phases. Its ordering is: department
 * number, then flag (department rows before employee rows), then employee
 * number, employee name and department name. {@link #equals(Object)} and
 * {@link #hashCode()} are consistent with that ordering.
 */
public class Bean implements WritableComparable<Bean> {
	// Attributes selected from the two tables.
	private String empno = " ";
	private String empname = " ";
	private String depno = " ";
	private String depname = " ";
	private int flag = 0; // 0: department row, 1: employee row

	/** Creates an empty record; needed so instances can be filled by {@link #readFields}. */
	public Bean() {
	}

	/**
	 * Creates a fully populated record.
	 *
	 * @param empno   employee number
	 * @param empname employee name
	 * @param depno   department number (the join column)
	 * @param depname department name
	 * @param flag    0 for a department row, 1 for an employee row
	 */
	public Bean(String empno, String empname, String depno, String depname,
			int flag) {
		this.empno = empno;
		this.empname = empname;
		this.depno = depno;
		this.depname = depname;
		this.flag = flag;
	}

	/**
	 * Copy constructor.
	 *
	 * @param bean the record whose field values are copied
	 */
	public Bean(Bean bean) {
		this.empno = bean.getEmpno();
		this.empname = bean.getEmpname();
		this.depno = bean.getDepno();
		this.depname = bean.getDepname();
		this.flag = bean.getFlag();
	}

	/**
	 * Serializes the four string fields followed by the flag.
	 *
	 * <p>The string fields must be non-null: {@code DataOutput.writeUTF} throws
	 * {@code NullPointerException} for a null argument.
	 *
	 * @param out destination stream
	 * @throws IOException if writing fails
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(empno);
		out.writeUTF(empname);
		out.writeUTF(depno);
		out.writeUTF(depname);
		out.writeInt(flag);
	}

	/**
	 * Restores the fields in exactly the order {@link #write} emitted them.
	 *
	 * @param in source stream
	 * @throws IOException if reading fails
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.empno = in.readUTF();
		this.empname = in.readUTF();
		this.depno = in.readUTF();
		this.depname = in.readUTF();
		this.flag = in.readInt();
	}

	/** Returns the output line format of the join; the flag is deliberately not included. */
	@Override
	public String toString() {
		return "empno=" + empno + ", empname=" + empname + ", depno=" + depno
				+ ", depname=" + depname;
	}

	/**
	 * Orders records by department number, then flag, then employee number,
	 * employee name and department name. Strings are compared lexicographically;
	 * a null string sorts before any non-null string.
	 *
	 * @param other the record to compare with
	 * @return a negative value, zero or a positive value as this record is
	 *         less than, equal to or greater than {@code other}
	 */
	@Override
	public int compareTo(Bean other) {
		int result = compareNullable(depno, other.depno);
		if (result != 0) {
			return result;
		}
		// Department rows (flag 0) sort ahead of employee rows (flag 1).
		if (flag != other.flag) {
			return flag < other.flag ? -1 : 1;
		}
		result = compareNullable(empno, other.empno);
		if (result != 0) {
			return result;
		}
		result = compareNullable(empname, other.empname);
		if (result != 0) {
			return result;
		}
		return compareNullable(depname, other.depname);
	}

	/** Two records are equal when all five fields are equal. */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof Bean)) {
			return false;
		}
		return compareTo((Bean) obj) == 0;
	}

	/** Hash code derived from the same five fields that {@link #equals} compares. */
	@Override
	public int hashCode() {
		int hash = 17;
		hash = 31 * hash + (depno == null ? 0 : depno.hashCode());
		hash = 31 * hash + flag;
		hash = 31 * hash + (empno == null ? 0 : empno.hashCode());
		hash = 31 * hash + (empname == null ? 0 : empname.hashCode());
		hash = 31 * hash + (depname == null ? 0 : depname.hashCode());
		return hash;
	}

	/** Null-tolerant string comparison; null sorts first. */
	private static int compareNullable(String left, String right) {
		if (left == null) {
			return right == null ? 0 : -1;
		}
		if (right == null) {
			return 1;
		}
		return left.compareTo(right);
	}

	/** @return the employee number */
	public String getEmpno() {
		return empno;
	}

	/** @param empno the employee number */
	public void setEmpno(String empno) {
		this.empno = empno;
	}

	/** @return the employee name */
	public String getEmpname() {
		return empname;
	}

	/** @param empname the employee name */
	public void setEmpname(String empname) {
		this.empname = empname;
	}

	/** @return the department number (the join column) */
	public String getDepno() {
		return depno;
	}

	/** @param depno the department number (the join column) */
	public void setDepno(String depno) {
		this.depno = depno;
	}

	/** @return the department name */
	public String getDepname() {
		return depname;
	}

	/** @param depname the department name */
	public void setDepname(String depname) {
		this.depname = depname;
	}

	/** @return 0 for a department row, 1 for an employee row */
	public int getFlag() {
		return flag;
	}

	/** @param flag 0 for a department row, 1 for an employee row */
	public void setFlag(int flag) {
		this.flag = flag;
	}

}


2.Map類(map輸出的key為兩表join的欄位deptno,輸出的value為物件Bean;reduce時會將key相同的記錄分到同一組)

/**
 * Mapper for the reduce-side join of the employee and department tables.
 *
 * <p>Both tables are mapped to the same {@code Bean} type. The output key is
 * the department number (the join column), so rows from either table that share
 * a department number carry the same key into the reduce phase.
 *
 * <p>Input lines are tab-separated. A line with exactly three columns is taken
 * as a department row ({@code deptno, dname, loc}); a line with at least eight
 * columns is taken as an employee row whose department number is column 7
 * (zero-based). Any other line, or a line whose department number is not an
 * integer (for example a header row), is skipped and counted in the
 * {@code StaffDepMap/MALFORMED_LINES} counter instead of failing the task.
 */
public class StaffDepMap extends Mapper<LongWritable, Text, IntWritable, Bean> {
	/** Number of columns in a department row. */
	private static final int DEP_COLUMNS = 3;
	/** Zero-based index of the deptno column in an employee row. */
	private static final int EMP_DEPTNO_INDEX = 7;

	/**
	 * Parses one input line and emits it keyed by department number.
	 *
	 * @param key     byte offset of the line in the input (unused)
	 * @param value   one tab-separated line from either table
	 * @param context used to emit the (deptno, Bean) pair and to bump the counter
	 * @throws IOException          if emitting the record fails
	 * @throws InterruptedException if the task is interrupted while emitting
	 */
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, IntWritable, Bean>.Context context)
			throws IOException, InterruptedException {
		String[] str = value.toString().split("\t");

		Bean record = new Bean();
		String deptnoField;
		// Decide which table the line belongs to from its column count.
		if (str.length == DEP_COLUMNS) {
			// Department row.
			deptnoField = str[0].trim();
			record.setDepname(str[1]);
			record.setFlag(0);
		} else if (str.length > EMP_DEPTNO_INDEX) {
			// Employee row.
			deptnoField = str[EMP_DEPTNO_INDEX].trim();
			record.setEmpno(str[0]);
			record.setEmpname(str[1]);
			record.setFlag(1);
		} else {
			// Blank or truncated line: there is no deptno column to join on.
			context.getCounter("StaffDepMap", "MALFORMED_LINES").increment(1);
			return;
		}

		int deptno;
		try {
			deptno = Integer.parseInt(deptnoField);
		} catch (NumberFormatException e) {
			// Header rows and rows with an empty or non-numeric deptno end up here.
			context.getCounter("StaffDepMap", "MALFORMED_LINES").increment(1);
			return;
		}

		record.setDepno(deptnoField);
		context.write(new IntWritable(deptno), record);
	}
}

3.Reduce類(輸入的key即是兩個表共有的deptno,values為該deptno對應的部門記錄和員工記錄)

/**
 * Reducer that completes the emp/dep join for one department number.
 *
 * <p>The values for a key are a mix of department rows (flag 0) and employee
 * rows (flag 1). Each employee row is written out with the department name
 * filled in. If no department row arrived for the key, the employees are not
 * written (inner-join semantics) and are counted in the
 * {@code StaffDepRedu/EMPLOYEES_WITHOUT_DEPARTMENT} counter.
 */
public class StaffDepRedu extends
		Reducer<IntWritable, Bean, NullWritable, Text> {
	/**
	 * Joins the employee rows of one department number with its department row.
	 *
	 * @param key     the department number shared by all {@code values}
	 * @param values  department and employee rows for this department number
	 * @param context used to emit one text line per joined employee
	 * @throws IOException          if emitting a record fails
	 * @throws InterruptedException if the task is interrupted while emitting
	 */
	@Override
	protected void reduce(IntWritable key, Iterable<Bean> values,
			Reducer<IntWritable, Bean, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		Bean dep = null;
		List<Bean> emps = new ArrayList<Bean>();
		for (Bean bean : values) {
			// Copy every value that is kept: Hadoop reuses the same Bean
			// instance across iterations, so storing the reference itself
			// would leave every entry pointing at the last record read.
			if (bean.getFlag() == 0) {
				// Department row; if several arrive, the last one wins.
				dep = new Bean(bean);
			} else {
				// Employee row.
				emps.add(new Bean(bean));
			}
		}

		if (dep == null) {
			// No matching department: an inner join produces no output here.
			if (!emps.isEmpty()) {
				context.getCounter("StaffDepRedu", "EMPLOYEES_WITHOUT_DEPARTMENT")
						.increment(emps.size());
			}
			return;
		}

		// Fill in the department name on each employee and emit it.
		for (Bean emp : emps) {
			emp.setDepname(dep.getDepname());
			context.write(NullWritable.get(), new Text(emp.toString()));
		}
	}
}

4.job類

/**
 * Driver that configures and submits the emp/dep join job.
 *
 * <p>Usage: {@code StaffDepMain <input path> <output path>}. The process exits
 * with status 0 when the job succeeds, 1 when it fails, and 2 when the
 * arguments are missing.
 */
public class StaffDepMain {
	/**
	 * Builds the job from {@code StaffDepMap} and {@code StaffDepRedu} and waits
	 * for it to finish.
	 *
	 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
	 * @throws Exception if the job cannot be configured, submitted or monitored
	 */
	public static void main(String[] args) throws Exception {
		if (args.length < 2) {
			System.err.println("Usage: StaffDepMain <input path> <output path>");
			System.exit(2);
		}

		Configuration conf = new Configuration();
		// Job.getInstance replaces the deprecated Job(Configuration) constructor.
		Job job = Job.getInstance(conf);
		job.setJarByClass(StaffDepMain.class);

		job.setMapperClass(StaffDepMap.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Bean.class);

		job.setReducerClass(StaffDepRedu.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Report job failure to the caller through the exit status.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}