SparkStreaming整合kafka入門
阿新 • • 發佈:2018-12-27
package kafka import com.typesafe.config.ConfigFactory import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.{Seconds, StreamingContext} /** * spark Streaming 與kafka 0.10x版本的整合 */ object SSCDriectKafka010 { def main(args: Array[String]): Unit = { val config = ConfigFactory.load() val conf = new SparkConf().setAppName("kafka-streaming").setMaster("local[*]") //批處理時間3秒 val ssc = new StreamingContext(conf,Seconds(3)) //設定消費者組id val groupId = "day_02" /** * 構建kafka連線引數 * latest,earliest,none * * earliest * //當各分割槽下有已提交的offset時,從提交的offset開始消費,無提交的offset時,從頭開始消費 * latest * 當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料 * none * topic各分割槽都存在已提交的offset時,從offset後開始消費,只要有一個分割槽不存在已提交的offset,則丟擲異常 */ val kafkaParams: Map[String, Object] = Map[String, Object]( "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> groupId, "auto.offset.reset" -> "earliest", //"auto.commit.interval.ms"-> "1000",設定為1秒提交一次offset,預設是5秒 "enable.auto.commit" -> (false: java.lang.Boolean) //是否自動遞交偏移量 ) //指定主題 val topics = Array("user") /** *指定kafka資料來源 * locationStrategy位置策略 * 包含了兩個引數PreferBrokers,PreferConsistent * 如果說kafka的broker節點跟spark的executor節點不在同一臺機器的話,name就使用PreferConsistent * * 那麼在企業中多數情況下,kafka的broker和executor是不會在一臺伺服器的,但是對於多數 * 中小企業來說會部署到一臺 * 設定位置策略的原因是,會以最優化的策略進行讀取資料 * 如果兩者在同一臺伺服器的話,讀寫資料效能會非常高,不需要走網路傳輸 * PreferConsistent,將來kafka拉取的資料會盡量的將資料平均分散到所有的executor節點上 */ val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams) //傳入兩個引數,一個是主題,一個是配置的引數集 ) //遍歷RDD stream.foreachRDD(rdd=>{ val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // Array[OffsetRange]裡面有幾個offsetRange,有幾個分割槽就有幾個OffsetRange for (o <- offsetRanges){ println(s"topic=${o.topic},partittion=${o.partition},fromoffset=${o.fromOffset},endoffset=${o.untilOffset}") } //主動發起遞交偏移量 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }) ssc.start() ssc.awaitTermination() } }