
HBase: The Definitive Guide Study Notes (5): Integrating HBase with MapReduce

Add the dependencies:

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.4.9</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.4.9</version>
        </dependency>

student.txt data:

1,Sam,18
2,Tom,16
3,Jetty,25
4,LiLei,56

Create the table in HBase:

create 'student','cf1'
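
If you prefer to create the table from code instead of the shell, a minimal sketch using the HBase Admin API might look like the following (the class name CreateStudentTable is just for illustration, and the ZooKeeper quorum is assumed to match the cluster used below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateStudentTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Assumed quorum; same value as in the jobs below
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // Table "student" with a single column family "cf1"
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("student"));
            desc.addFamily(new HColumnDescriptor("cf1"));
            if (!admin.tableExists(desc.getTableName())) {
                admin.createTable(desc);
            }
        }
    }
}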

1. A MapReduce job that reads data from a file and writes it into an HBase table

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HadoopConnectTest extends Configured implements Tool {
    public static class Mapper1 extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] values = value.toString().split(",");
            String id = values[0];
            String name = values[1];
            String age = values[2];
            // Build a Put for this row and add the name and age columns
            Put put = new Put(id.getBytes());
            put.addColumn("cf1".getBytes(), "name".getBytes(), name.getBytes());
            put.addColumn("cf1".getBytes(), "age".getBytes(), age.getBytes());
            if (!put.isEmpty()) {
                // The output key is the name of the target table
                ImmutableBytesWritable ib = new ImmutableBytesWritable("student".getBytes());
                context.write(ib, put);
            }
        }
    }

    // HDFS address
    private static final String HDFS = "hdfs://192.168.30.141:9000";
    // Input file path
    private static final String INPATH = HDFS + "/student.txt";

    @Override
    public int run(String[] args) throws Exception {
        // The Configuration object holds all of the job's settings
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase");

        Job job = Job.getInstance(conf, "HFile bulk load test");
        job.setJarByClass(HadoopConnectTest.class);
        job.setMapperClass(Mapper1.class);
        // TableMapReduceUtil is a helper class provided by HBase; it wires up everything
        // a MapReduce job needs in order to write into an HBase table.
        // Output table is "student"; no reducer class is used.
        TableMapReduceUtil.initTableReducerJob("student", null, job);
        // The map output goes straight to HBase, so there is no reduce phase:
        // the reducer class stays null and the number of reduce tasks is 0.
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(INPATH));
        return (job.waitForCompletion(true) ? 0 : -1);
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner passes the command-line arguments and configuration through to run()
        int status = ToolRunner.run(new HadoopConnectTest(), args);
        System.exit(status);
    }
}

Run the jar:

./hadoop jar /usr/local/hbase.jar

Verify:
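
One way to check the import is to run scan 'student' in the HBase shell. A small client-side scan does the same thing; the sketch below is only an illustration (the class name ScanStudent is an assumption, and the quorum should match your cluster):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanStudent {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("student"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            // Print every cell as "rowkey column=value", mirroring scan 'student'
            for (Result result : scanner) {
                for (Cell cell : result.rawCells()) {
                    System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + " "
                            + Bytes.toString(CellUtil.cloneQualifier(cell)) + "="
                            + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }
}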

2. A MapReduce job that reads data from an HBase table and writes it to a file

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

public class HBaseMapper extends TableMapper<Text, Text> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Each Result holds the cells of one row; the scan below only selects cf1:name,
        // so every cell's value is the student's name
        for (Cell cell : value.rawCells()) {
            String row = new String(CellUtil.cloneRow(cell));
            String name = new String(CellUtil.cloneValue(cell));
            context.write(new Text(row), new Text(name));
        }
    }
}

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HBaseJob {
    public static final String tableName = "student";
    public static final String outputFilePath = "hdfs://hadoop1:9000/output";

    public static Configuration conf = HBaseConfiguration.create();

    static {
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase");
        conf.set("hbase.master", "hadoop1:60000");
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // Only scan the cf1:name column
        Scan scan = new Scan();
        scan.addColumn("cf1".getBytes(), "name".getBytes());

        Job job = Job.getInstance(conf, "hbase_word_count");
        job.setJarByClass(HBaseJob.class);

        // TableMapReduceUtil sets the job up to read from the "student" table with the
        // scan above; HBaseMapper emits Text keys (row key) and Text values (name)
        TableMapReduceUtil.initTableMapperJob(
                "student",
                scan,
                HBaseMapper.class,
                Text.class,
                Text.class,
                job);

        // Write the map output to HDFS as plain text
        FileOutputFormat.setOutputPath(job, new Path(outputFilePath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
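
Since the scan only selects cf1:name, the job writes one tab-separated row-key/name pair per line under /output (the default TextOutputFormat). Given the four sample rows above, the part file should contain roughly:

1	Sam
2	Tom
3	Jetty
4	LiLei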