1. 程式人生 > >SparkStreaming與kafka通過直連方式讀取資料

SparkStreaming與kafka通過直連方式讀取資料

1、Spark-Streaming的receive的方式和直連方式有什麼區別:
Receive接收固定時間間隔的資料(放在記憶體中),達到固定的時間才進行處理,效率低並且容易丟失資料(Kafka高階API),自動維護偏移量
Direct直連方式,相當於直接連線到Kafka的分割槽上,相當於Kafka底層API,效率很高,需要自己維護偏移量,讀一條處理一條(把指定的時間間隔當做一個批次)。
2、直接連到kafka的分割槽上讀取,一個RDD的分割槽對應一個kafka的分割槽,一個分割槽會生成一個Task,這個Task不會消失,會一直盯著這個分割槽,不停的讀取資料。
3、在用Reciver方式,消費消費者時,不用指定broker,在直連的方式,需要指定broker,因為這種方式相當於直接練到Kafka的分割槽中,需要broker
4、zookeeper的作用,zookeeper中記錄的是,以組名和topic名作為唯一標識,不同的組可以讀取同一topic中的資料,記偏移量是從前面記錄

package day01.Dirctor

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.
apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Duration, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object DrictorDemoV4 { def main(args: Array[String]): Unit = { val group = "groupTT"//指定組名 val conf = new SparkConf
().setAppName("kafkaWC").setMaster("local[2]") val sc = new SparkContext(conf) //建立SparkStreaming,並設定時間間隔 val ssc = new StreamingContext(conf,Duration(5000)) //指定消費的topic名字 val topic = "tt2" //指定kafka的broker地址,Streaming的Task直連到kafka的分割槽上,用底層的API,效率更高 val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092" //指定zk地址,後期更新消費的偏移量時使用(以後可以Redis、MySQL) val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181" //建立DStream時使用topic名字的集合,SparkStreaming可以同時消費多少topic val topics:Set[String] = Set(topic) //建立一個ZKGroupTopicDirs物件,其實就是指定往zk中寫入資料的目錄,用於儲存偏移量 val topicDirs = new ZKGroupTopicDirs(group,topic) //獲取zookeeper中的路徑"groupTT/offsets/tt01" val zkTopicPath:String = s"${topicDirs.consumerOffsetDir}" //準備kafka引數 val kafkaParams = Map( "metadata.broker.list"->brokerList, "group.id"->group, "auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString ) //zookeeper的host和ip,建立一個Client,用於更新偏移量 //是zookeeper的一個客戶端,可以從zk中讀取,偏移量的資料,並跟新偏移量 val zkClient: ZkClient = new ZkClient(zkQuorum) //查詢該路徑下是否位元組點(預設有位元組點為我們自己儲存不同Partition生成的) // /consumers/組名/offsets/topic名/分割槽名/偏移量, 可以zkClient.sh 插詢 val children = zkClient.countChildren(zkTopicPath) //建立一個InputDStream, 要是var,因為不去定是不是以前讀過,要先判斷,再賦值 //key 是 kafka的Key,預設不設定是null,value是讀取的內容 var kafkaStream:InputDStream [(String,String)] = null //如果zookeeper中儲存offset,我們會利用這個Offset作為kafkaStream的讀取位置 var fromOffsets:Map[TopicAndPartition,Long] = Map() //如果儲存過Offset,以前讀取過 if(children >0){ for (i<- 0 until children){ //zkClient根據檔案位置讀取偏移量( /consumers/組名/offsets/topic名/分割槽名/偏移量,) val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/${i}") val tp: TopicAndPartition = TopicAndPartition(topic,i) //將不同 Partition對應的Offset增加到fromOffset中( // fromOffsets += (tp-> partitionOffset.toLong) //這個會將kafka的訊息進行transform,最終的kafka的資料會變成kafka的key,message)這樣的Tuple val messageHandler = (mam:MessageAndMetadata[String,String])=>(mam.key(),mam.message()) //通過KafkaUtils建立直連的DStream,fromOffset引數的作用是按照之間計算好的偏移量繼續讀取 //[String,String,StringDecoder,StringDecoder,(String,String)] // key value key的解碼, value的解碼 kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)]( ssc,kafkaParams,fromOffsets,messageHandler ) } }else{ //從頭開始讀,之前沒有讀取過 kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics) } var offsetRanges = Array[OffsetRange]() //從kafka中讀取訊息,DStream的Transform方法,可以將當前批次RDD取出來來 //該transform方法計算獲取當前批次RDD,然後將RDD的偏移量取出來,然後將RDD返回DStream val transformed: DStream[(String, String)] = kafkaStream.transform(rdd => { //得到該rdd對應的kafka的訊息的offset //該RDD是個kafkaRDD,可以獲取偏移量的範圍 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }) val messages: DStream[String] = transformed.map((_._2)) //一次迭代DStream中的RDD messages.foreachRDD(rdd=>{ //對RDD進行操作,觸發Action rdd.foreachPartition(partition=>{ partition.foreach(x=>{ println(x) }) }) }) for(off <- offsetRanges){ //獲取zk 中記錄偏移量的目錄, // /consumers/組名/offsets/topic名/分割槽名 val zkPath = s"${topicDirs.consumerOffsetDir}/${off.partition}" //更新偏移量 ZkUtils.updatePersistentPath(zkClient,zkPath,off.untilOffset.toString) } ssc.start() ssc.awaitTermination() } }