
Sending Offsets to Zookeeper When Using the Spark + Kafka Direct Approach

Apache Spark 1.3.0 introduced the Direct API, which uses Kafka's low-level consumer API to read data from the Kafka cluster and keeps track of the offsets inside Spark Streaming itself. This achieves zero data loss and is more efficient than the Receiver-based approach. However, because Spark Streaming maintains the Kafka read offsets itself and never sends the consumed offsets to Zookeeper, offset-based Kafka monitoring tools (for example Kafka Web Console and KafkaOffsetMonitor) stop working. This article addresses exactly that problem, so that our Spark Streaming program automatically updates the Kafka offsets in Zookeeper every time it receives data.

From the Spark documentation we know that the Kafka offset information Spark maintains internally is stored in the offsetRanges field of the HasOffsetRanges trait, and we can retrieve it inside the Spark Streaming program:
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
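
Each element of offsetsList is an OffsetRange that records, for one partition, the topic, the partition number and the half-open interval [fromOffset, untilOffset) read in the batch. As a quick sketch (not part of the original code), the ranges can simply be printed:

offsetsList.foreach { o =>
  // one line per Kafka partition consumed in this batch
  println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
}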

This gives us the consumption information for every partition. We only need to iterate over offsetsList and send the information to Zookeeper to update the Kafka consumer offsets. The complete code snippet is as follows:

// Create the direct stream; Spark Streaming tracks the Kafka offsets itself
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

messages.foreachRDD(rdd => {
  // Offset ranges (one per Kafka partition) consumed in this batch
  val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val kc = new KafkaCluster(kafkaParams)
  for (offsets <- offsetsList) {
    val topicAndPartition = TopicAndPartition("test-topic", offsets.partition)
    // args(0) is the consumer group id; commit the end offset of this batch to Zookeeper
    val o = kc.setConsumerOffsets(args(0), Map((topicAndPartition, offsets.untilOffset)))
    if (o.isLeft) {
      println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
})
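
As a design note, the snippet above hard-codes the topic name and creates a new KafkaCluster for every batch. A slightly more general sketch (my own variation, not from the original article; groupId stands for the consumer group that the monitoring tool watches) takes the topic from each OffsetRange and commits all partitions in one call:

messages.foreachRDD { rdd =>
  val kc = new KafkaCluster(kafkaParams)
  // Build a single TopicAndPartition -> untilOffset map for the whole batch
  val toCommit = rdd.asInstanceOf[HasOffsetRanges].offsetRanges.map { o =>
    TopicAndPartition(o.topic, o.partition) -> o.untilOffset
  }.toMap
  kc.setConsumerOffsets(groupId, toCommit) match {
    case Left(errs) => println(s"Error updating the offsets in Zookeeper: $errs")
    case Right(_)   => // all partitions were committed successfully
  }
}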


KafkaCluster is a utility class for operations that connect to the Kafka cluster. We can set the offset of each partition of a Kafka topic with Map((topicAndPartition, offsets.untilOffset)) and then call KafkaCluster's setConsumerOffsets method to update the information stored in Zookeeper. Once the Kafka offsets are updated this way, tools such as KafkaOffsetMonitor can monitor the consumption of the corresponding topics. The figure below shows what KafkaOffsetMonitor reports:

[Figure: KafkaOffsetMonitor showing the consumption status of the topic's partitions]

As the figure shows, KafkaOffsetMonitor can now track the consumption of the relevant Kafka partitions, which is very important for monitoring the whole Spark Streaming application, since it lets us check at any time how fast Spark is reading. The complete code of the KafkaCluster utility class is as follows:

package org.apache.spark.streaming.kafka
 
import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
// SimpleConsumerConfig is defined in KafkaCluster's companion object (not shown in this excerpt);
// it parses kafkaParams (e.g. metadata.broker.list) into the seed brokers and consumer settings
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig
 
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal
 
// Utility class for committing consumer offsets to the Kafka cluster via the SimpleConsumer API;
// kafkaParams is the same parameter map passed to KafkaUtils.createDirectStream
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  type Err = ArrayBuffer[Throwable]
  @transient private var _config: SimpleConsumerConfig = null
 
  // Lazily build the consumer configuration from kafkaParams; _config is @transient, so it is rebuilt after deserialization
  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }
 
  // Commit the given offsets for consumer group `groupId`; with the Kafka 0.8.x offset-commit API
  // the brokers store these commits in Zookeeper, which is what the monitoring tools read
  def setConsumerOffsets(groupId: String,
                         offsets: Map[TopicAndPartition, Long]
                          ): Either[Err, Map[TopicAndPartition, Short]] = {
    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
      kv._1 -> OffsetMetadataAndError(kv._2)
    })
  }
 
  // Send an OffsetCommitRequest to the brokers and collect per-partition results;
  // returns Right(result) only if every partition was committed successfully
  def setConsumerOffsetMetadata(groupId: String,
                                metadata: Map[TopicAndPartition, OffsetMetadataAndError]
                                 ): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }
 
  // Try each broker in turn, closing the SimpleConsumer afterwards; non-fatal
  // failures are collected in `errs` so the next broker can be tried
  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
                         (fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }
 
  // Open a SimpleConsumer connection to the given broker
  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}
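
For completeness, here is a minimal usage sketch of the class above (the broker address, group id, topic and offset are hypothetical placeholders; kafkaParams typically must contain metadata.broker.list so that SimpleConsumerConfig can find the seed brokers):

import kafka.common.TopicAndPartition

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val kc = new KafkaCluster(kafkaParams)

// Commit offset 12345 for partition 0 of "test-topic" under the consumer group "my-group"
kc.setConsumerOffsets("my-group", Map(TopicAndPartition("test-topic", 0) -> 12345L)) match {
  case Left(errs)   => println(s"Commit failed: $errs")
  case Right(stats) => println(s"Committed: $stats")
}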