HBase: The Definitive Guide Study Notes (5): Integrating HBase with MapReduce
Add the dependencies:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.4.9</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.9.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.4.9</version>
</dependency>
Contents of student.txt:
1,Sam,18
2,Tom,16
3,Jetty,25
4,LiLei,56
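The first job reads /student.txt from HDFS, so upload the file there first. A minimal sketch, assuming student.txt sits in the current local directory and the command is run from Hadoop's bin directory:
./hadoop fs -put student.txt /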
Create the HBase table with a single column family, cf1:
create 'student','cf1'
1. A MapReduce job that reads data from a file and writes it into an HBase table
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HadoopConnectTest extends Configured implements Tool {

    public static class Mapper1 extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line has the form id,name,age
            String[] values = value.toString().split(",");
            String id = values[0];
            String name = values[1];
            String age = values[2];
            // Build a Put keyed on the student id and add both columns
            Put put = new Put(id.getBytes());
            put.addColumn("cf1".getBytes(), "name".getBytes(), name.getBytes());
            put.addColumn("cf1".getBytes(), "age".getBytes(), age.getBytes());
            if (!put.isEmpty()) {
                // The output key names the target table
                ImmutableBytesWritable ib = new ImmutableBytesWritable("student".getBytes());
                context.write(ib, put);
            }
        }
    }

    // HDFS base URI
    private static final String HDFS = "hdfs://192.168.30.141:9000";
    // Input file path
    private static final String INPATH = HDFS + "/student.txt";

    @Override
    public int run(String[] args) throws Exception {
        // Configuration object that carries the job settings
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase");
        Job job = Job.getInstance(conf, "HFile bulk load test");
        job.setJarByClass(HadoopConnectTest.class);
        job.setMapperClass(Mapper1.class);
        // TableMapReduceUtil is an HBase utility class that wires up all the
        // configuration a MapReduce job needs to write into HBase. Here the
        // target table is "student" and no reducer class is used.
        TableMapReduceUtil.initTableReducerJob("student", null, job);
        // The map output is written to HBase directly, so disable the reduce phase
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(INPATH));
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic Hadoop options and invokes run(String[])
        int status = ToolRunner.run(new HadoopConnectTest(), args);
        System.exit(status);
    }
}
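Before submitting, package the class into a jar. A sketch assuming a standard Maven project with the dependencies above (the artifact name is hypothetical; substitute your project's actual jar):
mvn clean package
# artifact name is hypothetical; copy your project's actual jar
cp target/hbase-mr-demo.jar /usr/local/hbase.jar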
Run the jar (if the jar's manifest does not declare a main class, append the fully qualified driver class name, e.g. HadoopConnectTest):
./hadoop jar /usr/local/hbase.jar
Verify:
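In the HBase shell, scanning the table should now show the four rows. Illustrative output, with timestamps elided:
hbase(main):001:0> scan 'student'
ROW    COLUMN+CELL
 1     column=cf1:age, timestamp=..., value=18
 1     column=cf1:name, timestamp=..., value=Sam
 2     column=cf1:age, timestamp=..., value=16
 2     column=cf1:name, timestamp=..., value=Tom
 3     column=cf1:age, timestamp=..., value=25
 3     column=cf1:name, timestamp=..., value=Jetty
 4     column=cf1:age, timestamp=..., value=56
 4     column=cf1:name, timestamp=..., value=LiLei
4 row(s)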
2. A MapReduce job that reads data from an HBase table and writes it to a file
import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

public class HBaseMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // One Result per row; the scan in the driver selects only cf1:name,
        // so each cell's value here is a student name
        for (Cell cell : value.rawCells()) {
            String row = new String(CellUtil.cloneRow(cell));
            String name = new String(CellUtil.cloneValue(cell));
            context.write(new Text(row), new Text(name));
        }
    }
}
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HBaseJob {
    public static final String tableName = "student";
    public static final String outputFilePath = "hdfs://hadoop1:9000/output";
    public static Configuration conf = HBaseConfiguration.create();

    static {
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase");
        conf.set("hbase.master", "hadoop1:60000");
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // Scan only the cf1:name column of each row
        Scan scan = new Scan();
        scan.addColumn("cf1".getBytes(), "name".getBytes());

        Job job = Job.getInstance(conf, "hbase_word_count");
        job.setJarByClass(HBaseJob.class);
        // Wire up the scan and the mapper that reads from the "student" table
        TableMapReduceUtil.initTableMapperJob(
                tableName,
                scan,
                HBaseMapper.class,
                Text.class,
                Text.class,
                job);
        // No reducer class is set, so the identity reducer passes the
        // Text/Text pairs straight through to the output file
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(outputFilePath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
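Verify: the job runs a single identity reduce task by default, so the rows land in one output file under /output. Expected contents, assuming the paths above:
./hadoop fs -cat /output/part-r-00000
1	Sam
2	Tom
3	Jetty
4	LiLei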