1. 程式人生 > >SparkStreaming整合kafka入門

SparkStreaming整合kafka入門

package kafka

import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * spark Streaming 與kafka 0.10x版本的整合
  */
object SSCDriectKafka010 {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load()
    val conf = new SparkConf().setAppName("kafka-streaming").setMaster("local[*]")
    //批處理時間3秒
    val ssc = new StreamingContext(conf,Seconds(3))

    //設定消費者組id
    val groupId = "day_02"

    /**
      * 構建kafka連線引數
      * latest,earliest,none
      *
      * earliest
      * //當各分割槽下有已提交的offset時,從提交的offset開始消費,無提交的offset時,從頭開始消費
      * latest
      * 當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料
      * none
      * topic各分割槽都存在已提交的offset時,從offset後開始消費,只要有一個分割槽不存在已提交的offset,則丟擲異常
      */
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      //"auto.commit.interval.ms"-> "1000",設定為1秒提交一次offset,預設是5秒
      "enable.auto.commit" -> (false: java.lang.Boolean)  //是否自動遞交偏移量
    )
    //指定主題
    val topics = Array("user")

    /**
      *指定kafka資料來源
      * locationStrategy位置策略
      * 包含了兩個引數PreferBrokers,PreferConsistent
      * 如果說kafka的broker節點跟spark的executor節點不在同一臺機器的話,name就使用PreferConsistent
      *
      * 那麼在企業中多數情況下,kafka的broker和executor是不會在一臺伺服器的,但是對於多數
      * 中小企業來說會部署到一臺
      * 設定位置策略的原因是,會以最優化的策略進行讀取資料
      * 如果兩者在同一臺伺服器的話,讀寫資料效能會非常高,不需要走網路傳輸
      * PreferConsistent,將來kafka拉取的資料會盡量的將資料平均分散到所有的executor節點上
      */
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](topics, kafkaParams)
      //傳入兩個引數,一個是主題,一個是配置的引數集
    )
    //遍歷RDD
    stream.foreachRDD(rdd=>{
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Array[OffsetRange]裡面有幾個offsetRange,有幾個分割槽就有幾個OffsetRange
      for (o <- offsetRanges){
        println(s"topic=${o.topic},partittion=${o.partition},fromoffset=${o.fromOffset},endoffset=${o.untilOffset}")
      }
      //主動發起遞交偏移量
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}