1. 程式人生 > >spark從hbase讀取寫入資料

spark從hbase讀取寫入資料

將RDD寫入hbase 注意點:

依賴:

將lib目錄下的hadoop開頭jar包、hbase開頭jar包新增至classpath

此外還有lib目錄下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少會提示hbase RpcRetryingCaller: Call exception不斷嘗試重連hbase,不報錯)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar

$SPARK_HOME/lib目錄下的 spark-assembly-1.6.1-hadoop2.4.0.jar

不同的package中可能會有相同名稱的類,不要導錯

連線叢集:

spark應用需要連線到zookeeper叢集,然後藉助zookeeper訪問hbase。一般可以通過兩種方式連線到zookeeper:

第一種是將hbase-site.xml檔案加入classpath

第二種是在HBaseConfiguration例項中設定

如果不設定,預設連線的是localhost:2181會報錯:connection refused 

本文使用的是第二種方式。

hbase建立表:

雖然可以在spark應用中建立hbase表,但是不建議這樣做,最好在hbase shell中建立表,spark寫或讀資料

使用saveAsHadoopDataset寫入資料

package com.test   import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD.rddToPairRDDFunctions   object TestHBase {     def main(args: Array[String]): Unit = {     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")     val sc = new SparkContext(sparkConf)       val conf = HBaseConfiguration.create()     //設定zooKeeper叢集地址,也可以通過將hbase-site.xml匯入classpath,但是建議在程式裡這樣設定     conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")     //設定zookeeper連線埠,預設2181     conf.set("hbase.zookeeper.property.clientPort", "2181")       val tablename = "account"          //初始化jobconf,TableOutputFormat必須是org.apache.hadoop.hbase.mapred包下的!     val jobConf = new JobConf(conf)     jobConf.setOutputFormat(classOf[TableOutputFormat])     jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)          val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))         val rdd = indataRDD.map(_.split(',')).map{arr=>{       /*一個Put物件就是一行記錄,在構造方法中指定主鍵        * 所有插入的資料必須用org.apache.hadoop.hbase.util.Bytes.toBytes方法轉換        * Put.add方法接收三個引數:列族,列名,資料        */       val put = new Put(Bytes.toBytes(arr(0).toInt))       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))       //轉化成RDD[(ImmutableBytesWritable,Put)]型別才能呼叫saveAsHadoopDataset       (new ImmutableBytesWritable, put)      }}          rdd.saveAsHadoopDataset(jobConf)          sc.stop()   }   }

使用saveAsNewAPIHadoopDataset寫入資料

package com.test   import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.spark._ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes   object TestHBase3 {     def main(args: Array[String]): Unit = {     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")     val sc = new SparkContext(sparkConf)          val tablename = "account"          sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")     sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)          val job = new Job(sc.hadoopConfiguration)     job.setOutputKeyClass(classOf[ImmutableBytesWritable])     job.setOutputValueClass(classOf[Result])       job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])         val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))     val rdd = indataRDD.map(_.split(',')).map{arr=>{       val put = new Put(Bytes.toBytes(arr(0)))       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))       (new ImmutableBytesWritable, put)      }}          rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())   }   }

從hbase讀取資料轉化成RDD 本例基於官方提供的例子

package com.test   import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName} import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark._ import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.io._   object TestHBase2 {     def main(args: Array[String]): Unit = {     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")     val sc = new SparkContext(sparkConf)          val tablename = "account"     val conf = HBaseConfiguration.create()     //設定zooKeeper叢集地址,也可以通過將hbase-site.xml匯入classpath,但是建議在程式裡這樣設定     conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")     //設定zookeeper連線埠,預設2181     conf.set("hbase.zookeeper.property.clientPort", "2181")     conf.set(TableInputFormat.INPUT_TABLE, tablename)       // 如果表不存在則建立表     val admin = new HBaseAdmin(conf)     if (!admin.isTableAvailable(tablename)) {       val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))       admin.createTable(tableDesc)     }       //讀取資料並轉化成rdd     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],       classOf[org.apache.hadoop.hbase.client.Result])       val count = hBaseRDD.count()     println(count)     hBaseRDD.foreach{case (_,result) =>{       //獲取行鍵       val key = Bytes.toString(result.getRow)       //通過列族和列名獲取列       val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))       val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))       println("Row key:"+key+" Name:"+name+" Age:"+age)     }}       sc.stop()     admin.close()   } }