
Spark Streaming Integration with Kafka

Over the past few days I have been looking into integrating Spark with Kafka: consuming data from Kafka and sending data back to Kafka. Following the official examples, I wrote two small demos and share them here.

  • 1. Adding the Kafka dependency
  • 2. DirectKafkaWordCountDemo code
  • 3. KafkaProducer code
  • 4. Consuming data from the Kafka cluster, processing it, and writing it back to Kafka

The Kafka setup used in this example is a cluster of three brokers and one ZooKeeper node.
The original message body in this example is "message: 我是第 n 條資訊" ("message: I am message number n").
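
For reference, such test messages can be seeded with a plain Kafka producer. Below is a minimal sketch, assuming the topic abel that the consumers later read from; the TestMessageGenerator object and the message count of 100 are made up for this illustration and are not part of the original code.

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Hypothetical helper: fills the "abel" topic with the test messages described above
object TestMessageGenerator {
  def main(args: Array[String]): Unit = {
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    for (n <- 1 to 100) {
      producer.send(new ProducerRecord[String, String]("abel", s"message: 我是第 $n 條資訊"))
    }
    producer.close()
  }
}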

1. Adding the Kafka dependency

To integrate Kafka with Spark you need to pull in the Kafka integration library. Add the following dependency to the pom file:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

2. DirectKafkaWordCountDemo code

This code consumes the original messages "message: 我是第n條資訊" from the Kafka cluster, splits and filters them to strip the "message:" prefix, and outputs "我是第n條資訊".

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by yangyibo on 16/12/1.
  */
object DirectKafkaWordCountDemo {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCountDemo")
    // When running in the IDE, make sure the local master has at least 2 cores, e.g. local[2]
    sparkConf.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    val brokers = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"
    val topics = "abel"
    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" ")).filter(!_.equals("message:"))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
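
Unlike the receiver-based approach, createDirectStream manages offsets itself rather than storing them in ZooKeeper. If you need to know which Kafka offsets each batch covered, they can be read from every RDD of the direct stream. A minimal sketch, not part of the demo above:

import org.apache.spark.streaming.kafka.HasOffsetRanges

messages.foreachRDD { rdd =>
  // Each RDD produced by the direct stream knows the Kafka offset range it covers
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
  }
}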

3. KafkaProducer code

This code sends the strings in the array to the Kafka cluster as messages.

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
  * Created by yangyibo on 16/11/29.
  */
object KafkaProducerDemo {
  def main(args: Array[String]): Unit = {
    // The SparkContext is created but not actually used by this producer demo
    val conf = new SparkConf().setAppName("KafkaProducer").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val array = ArrayBuffer("one", "two", "three")
    kafkaProducer(array)
  }

  def kafkaProducer(args: ArrayBuffer[String]) {
    if (args != null) {
      val brokers = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"
      // Kafka producer properties
      val props = new HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      val topic = "abel2"
      // Send some messages
      for (arg <- args) {
        println(arg + "---------- sent")
        val message = new ProducerRecord[String, String](topic, null, arg)
        producer.send(message)
      }
      // Give the asynchronous sends a moment to finish before closing the producer
      Thread.sleep(500)
      producer.close()
    }
  }
}
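
The Thread.sleep(500) above is only there to give the asynchronous send() calls time to complete before close(). An alternative sketch is to block on the Future that send() returns, so nothing is lost even without the sleep (newer Kafka client versions also provide producer.flush() for the same purpose):

for (arg <- args) {
  val message = new ProducerRecord[String, String](topic, null, arg)
  // send() returns a java.util.concurrent.Future[RecordMetadata]; get() blocks until the broker acknowledges
  producer.send(message).get()
}
producer.close()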

4. Consuming data from the Kafka cluster, processing it, and writing it back to Kafka

This code combines Spark Streaming's Kafka consumption with Kafka production. It consumes the original messages "message: 我是第n條資訊" from the Kafka cluster, processes each one into "message: 我是第n條資訊--read", and sends the processed data to a different topic on the same Kafka cluster.

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf}

/**
  * Created by yangyibo on 16/11/28.
  */
object KafkaWordCountDemo {
  private val brokers = "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092"
  // Kafka producer properties
  private val props = new HashMap[String, Object]()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  private val producer = new KafkaProducer[String, String](this.props)

  def main(args: Array[String]): Unit = {
    run()
  }

  def run(): Unit = {
    val zkQuorum = "192.168.100.48:2181"
    val group = "spark-streaming-test"
    val topics = "abel"
    val numThreads = 1

    val sparkConf = new SparkConf().setAppName("KafkaWordCountDemo")
    sparkConf.setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")


    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)       
    val array = ArrayBuffer[String]()
    lines.foreachRDD(rdd => {
      // Pull each batch back to the driver, tag every message, and forward it to Kafka
      rdd.collect().foreach(x => {
        array += x + "--read"
      })
      kafkaProducerSend(array)
      array.clear()
    })
    ssc.start()
    ssc.awaitTermination()
  }

  def kafkaProducerSend(args: ArrayBuffer[String]) {
    if (args != null) {
      val topic = "abel2"
      // Send some messages
      for (arg <- args) {
        println(arg + "---------- read")
        val message = new ProducerRecord[String, String](topic, null, arg)
        producer.send(message)
      }
      Thread.sleep(500)
    }
  }

}
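
In the code above, each batch is collected back to the driver and sent through the single producer created on the driver, which is fine for a demo but does not scale with the data volume. A common alternative is to send from the executors instead, creating one producer per partition inside foreachPartition. A sketch, reusing the imports and the lines DStream from the code above (not part of the original code):

lines.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Build the producer inside the partition so it is created on the executor,
    // not serialized from the driver
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      "192.168.100.41:9092,192.168.100.42:9092,192.168.100.43:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { line =>
      producer.send(new ProducerRecord[String, String]("abel2", null, line + "--read"))
    }
    producer.close()
  }
}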