Big Data HBase (5) --- Exporting HBase Table Files to HDFS, HBase Bulk Load, and Importing MySQL Data into HBase via MapReduce
阿新 · Published 2018-12-22
I. Exporting an HBase table to HDFS
--------------------------------------------------------------------------
1. Copy the HBase jars and metrics-core-xxx.jar into the Hadoop classpath:
    $>cd /soft/hbase/lib
    $>ls | grep hbase | cp `xargs` /soft/hadoop/share/hadoop/common/lib
    $>ls | grep metric | cp `xargs` /soft/hadoop/share/hadoop/common/lib
2. Run the export MR job shipped in hbase-server-VERSION.jar to dump the table's data to HDFS (the Export tool writes the table's rows as Hadoop SequenceFiles). This is the table's data as stored by HBase:
    $> cd /soft/hbase/lib
    $> hadoop jar hbase-server-1.2.6.jar export call:calllogs /data/HbaseTableDataout

II. HBase Bulk Load: migrating a large table A into an empty table B (even across namespaces)
-------------------------------------------------------------------------
1. How it works:
    B is an empty table with the same schema as A.
    A's data files on HDFS are copied directly into B, so B ends up with the same files as A.
2. Copy the HBase jars and metrics-core-xxx.jar into the Hadoop classpath (same as step I.1):
    $>cd /soft/hbase/lib
    $>ls | grep hbase | cp `xargs` /soft/hadoop/share/hadoop/common/lib
    $>ls | grep metric | cp `xargs` /soft/hadoop/share/hadoop/common/lib
3. Create table B in the hbase shell:
    $hbase> create 'ns1:mytable1', 'f1', 'f2'
4. Use HBase's completebulkload tool to load A's HFiles into ns1:mytable1 (a verification sketch follows the pom.xml below):
    $> cd /soft/hbase/lib
    $> hadoop jar hbase-server-1.2.6.jar completebulkload /hbase/data/call/calllogs/4471f0b068b2b425fdec957d25d4ab02 ns1:mytable1
    [/hbase/data/call/calllogs/4471f0b068b2b425fdec957d25d4ab02 is table A's region directory under HBase's data path on HDFS]

III. Importing MySQL data into an HBase table with MapReduce
-------------------------------------------------------------
1. Add the dependencies to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>TestHbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-common</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-auth -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>2.7.3</version>
        </dependency>
    </dependencies>
</project>
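Before moving on, it is worth sanity-checking the bulk load from part II. The sketch below is mine, not part of the original post: it assumes the ZooKeeper quorum host s100 used elsewhere in this walkthrough and the ns1:mytable1 table created in step II.3, and simply counts the rows it can scan.

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class VerifyBulkLoad {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "s100");    // assumption: quorum host from this walkthrough
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("ns1:mytable1"))) {
            Scan scan = new Scan();
            scan.setCaching(500);                      // fetch rows from the region server in batches
            long rows = 0;
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result ignored : scanner) {
                    rows++;
                }
            }
            System.out.println("ns1:mytable1 rows = " + rows);
        }
    }
}

If the count matches the source table A, the region files were picked up correctly.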
2. The main class (MyApp)
package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

/**
 * Main entry point: reads rows from MySQL and writes them into ns1:customers.
 */
public class MyApp {
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // Target HBase table and the cluster key (host:port:znode) of its quorum
            job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "ns1:customers");
            job.getConfiguration().set(TableOutputFormat.QUORUM_ADDRESS, "s100:2181:/hbase");

            // Job settings
            job.setJobName("MySQLToHBase");
            job.setJarByClass(MyApp.class);

            // Configure the JDBC connection
            DBConfiguration.configureDB(job.getConfiguration(),
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://192.168.43.1:3306/mydata",
                    "mysql",
                    "mysql");

            // Configure the input source: the row query that feeds the mapper
            // and the count query used for splitting; this call also sets
            // DBInputFormat as the job's input format.
            DBInputFormat.setInput(job, MyDBWritable.class,
                    "select id,name,age from myhbase",
                    "select count(*) from myhbase");

            // Output goes to HBase
            job.setOutputFormatClass(TableOutputFormat.class);

            // Map-only job: the mapper emits Puts directly, no reducer needed
            job.setMapperClass(MyMapper.class);
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Put.class);

            // Submit the job and block until it finishes
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
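As an aside, HBase also ships TableMapReduceUtil, which can replace the two manual TableOutputFormat settings above and additionally ships the HBase jars with the job. Below is a hedged alternative wiring; the class name MyAppAlt is mine, and it assumes hbase-site.xml (with the quorum) is on the classpath.

package hbase;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

public class MyAppAlt {
    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() picks up hbase-site.xml from the classpath,
        // so the quorum does not have to be set by hand.
        Job job = Job.getInstance(HBaseConfiguration.create());
        job.setJobName("MySQLToHBaseAlt");
        job.setJarByClass(MyAppAlt.class);

        // Same JDBC source as MyApp above
        DBConfiguration.configureDB(job.getConfiguration(),
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.43.1:3306/mydata", "mysql", "mysql");
        DBInputFormat.setInput(job, MyDBWritable.class,
                "select id,name,age from myhbase",
                "select count(*) from myhbase");

        // initTableReducerJob sets TableOutputFormat, the target table, and
        // adds the HBase dependency jars; a null reducer keeps the job map-only.
        TableMapReduceUtil.initTableReducerJob("ns1:customers", null, job);
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}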
3. The custom Writable (MyDBWritable)
package hbase;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
public class MyDBWritable implements Writable,DBWritable {
private int id = 0;
private String name;
private int age;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
//DB serialization: only needed when writing fields back to MySQL.
//This job only reads from MySQL, so the method is intentionally left empty.
public void write(PreparedStatement ppst) throws SQLException {
}
//DB deserialization: populate the fields from columns 1-3 (id, name, age) of the query's ResultSet
public void readFields(ResultSet rs) throws SQLException {
id = rs.getInt(1);
name = rs.getString(2);
age = rs.getInt(3);
}
//Hadoop serialization/deserialization of the fields
public void write(DataOutput out) throws IOException {
out.writeInt(id);
out.writeUTF(name);
out.writeInt(age);
}
public void readFields(DataInput in) throws IOException {
id = in.readInt();
name = in.readUTF();
age = in.readInt();
}
}
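Since junit is already on the pom's dependency list, the Writable half of MyDBWritable can be exercised with a quick round-trip test. A minimal sketch; the test class name and sample values are mine, not the original post's:

package hbase;

import static org.junit.Assert.assertEquals;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.junit.Test;

public class MyDBWritableTest {
    @Test
    public void roundTrip() throws Exception {
        MyDBWritable in = new MyDBWritable();
        in.setId(1);
        in.setName("tom");
        in.setAge(20);

        // Serialize with write(DataOutput) ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // ... and read back with readFields(DataInput)
        MyDBWritable out = new MyDBWritable();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        assertEquals(1, out.getId());
        assertEquals("tom", out.getName());
        assertEquals(20, out.getAge());
    }
}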
4. The Mapper class (MyMapper)
package hbase;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper: turns each MySQL row into an HBase Put, keyed by the row's id.
 */
public class MyMapper extends Mapper<LongWritable, MyDBWritable, NullWritable,Put> {
@Override
protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
int id = value.getId();
String name = value.getName();
int age = value.getAge();
Put put = new Put(Bytes.toBytes(id));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(id));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(name));
put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(age));
context.write(NullWritable.get(), put);
}
}
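Once the job completes, a single row can be read back to confirm the import. The sketch below is mine: it assumes the quorum host s100 and that a row with id 1 exists in the source table, and it looks the row up with the same Bytes.toBytes(int) encoding that MyMapper used for the row key.

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckImport {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "s100");   // assumed quorum host
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("ns1:customers"))) {
            // Row keys were written as Bytes.toBytes(int id); id=1 is an assumed sample row
            Result r = table.get(new Get(Bytes.toBytes(1)));
            if (r.isEmpty()) {
                System.out.println("row not found");
            } else {
                System.out.println("id   = " + Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id"))));
                System.out.println("name = " + Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"))));
                System.out.println("age  = " + Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age"))));
            }
        }
    }
}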