
The Path of Spark Mastery (Advanced): Spark from Beginner to Expert, Section 16: Spark Streaming and Kafka

Author: 周志湖 (Zhou Zhihu)

Contents

  1. Spark Streaming and Kafka WordCount example (1)
  2. Spark Streaming and Kafka WordCount example (2)

1. Spark Streaming and Kafka WordCount Example (1)

  1. Start the Kafka cluster (run the broker on each of the three nodes)
root@sparkslave02:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties 
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties 
root@sparkmaster:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties

Start a console producer to send messages to the Kafka cluster:

root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
  2. Write the following program
// Producer classes, used by KafkaWordCountProducer in example (2)
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    // Reduce log noise. This replaces StreamingExamples.setStreamingLogLevels(),
    // a helper that only exists inside the Spark examples module.
    Logger.getRootLogger.setLevel(Level.WARN)

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[4]")
    // 2-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Checkpointing is required by reduceByKeyAndWindow with an inverse function
    ssc.checkpoint("checkpoint")

    // topic -> number of consumer threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Create a ReceiverInputDStream and keep only the message value
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    // Count words over a 10-minute window that slides every 2 seconds, using 2 partitions
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
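Because the inverse function `_ - _` is supplied, Spark does not recount the whole 10-minute window on every 2-second slide: it adds the counts of the batch entering the window and subtracts those of the batch leaving it, which is also why `ssc.checkpoint` is required. The plain-Scala sketch below (no Spark; `WindowSketch` and its methods are hypothetical names) mimics that update on in-memory maps, so it illustrates the idea rather than Spark's actual implementation.

```scala
// Plain-Scala sketch (no Spark) of the incremental update behind
// reduceByKeyAndWindow(_ + _, _ - _, window, slide):
// new window = old window + entering batch - leaving batch.
object WindowSketch {
  type Counts = Map[String, Long]

  // Count the words of one batch, like words.map(x => (x, 1L)).reduceByKey(_ + _)
  def countBatch(words: Seq[String]): Counts =
    words.groupBy(identity).map { case (w, ws) => (w, ws.size.toLong) }

  // Combine two count maps key by key with the given function
  def merge(a: Counts, b: Counts)(f: (Long, Long) => Long): Counts =
    (a.keySet ++ b.keySet).map(k => k -> f(a.getOrElse(k, 0L), b.getOrElse(k, 0L))).toMap

  // One slide step: add the entering batch (_ + _) and subtract the leaving one (_ - _).
  // Keys whose count drops to 0 are kept, as happens in Spark when no filter function is given.
  def slide(window: Counts, entering: Counts, leaving: Counts): Counts =
    merge(merge(window, entering)(_ + _), leaving)(_ - _)

  def main(args: Array[String]): Unit = {
    val b1 = countBatch(Seq("Spark"))
    val b2 = countBatch(Seq("Spark", "TEST"))
    val b3 = countBatch(Seq("TEST", "Spark", "Streaming"))
    // Window covering b1 and b2; then slide forward so b3 enters and b1 leaves
    val w12 = merge(b1, b2)(_ + _)
    val w23 = slide(w12, b3, b1)
    println(w23.toSeq.sortBy(_._1))
  }
}
```

The result of `slide` equals a full recount of the batches still in the window, which is the property that lets Spark avoid rescanning ten minutes of data every two seconds.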

Configure the program arguments:
[Figure: run configuration]
The arguments are:

sparkmaster:2181  test-consumer-group kafkatopictest 1

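sparkmaster:2181: the ZooKeeper address (zkQuorum)
test-consumer-group: the consumer group name. The author keeps it identical to group.id in $KAFKA_HOME/config/consumer.properties; KafkaUtils.createStream itself takes the group id from this argument
kafkatopictest: the topic name (several topics can be given as a comma-separated list)
1: the number of consumer threads used for each topic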
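The four arguments are consumed positionally; the topic list is split on commas and every topic gets the same thread count. A plain-Scala sketch of that parsing step, in which the `KafkaArgs` case class and `parse` method are hypothetical stand-ins for the separate vals in the program:

```scala
// Plain-Scala sketch of how KafkaWordCount turns its four arguments into the
// topic -> thread-count map passed to KafkaUtils.createStream.
// KafkaArgs and parse are hypothetical helpers for illustration only.
object ArgsSketch {
  final case class KafkaArgs(zkQuorum: String, group: String, topicMap: Map[String, Int])

  def parse(args: Array[String]): KafkaArgs = {
    require(args.length >= 4,
      "Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
    val zkQuorum = args(0)
    val group = args(1)
    val topics = args(2)
    val numThreads = args(3).toInt
    // Every topic in the comma-separated list gets the same number of consumer threads
    val topicMap = topics.split(",").map(t => (t, numThreads)).toMap
    KafkaArgs(zkQuorum, group, topicMap)
  }

  def main(args: Array[String]): Unit =
    println(parse(Array("sparkmaster:2181", "test-consumer-group", "kafkatopictest", "1")))
}
```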

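After starting KafkaWordCount, type the following lines into the producer console (the WARN line is printed by the producer script itself and is not input):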

root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
[2015-11-04 03:25:39,666] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
Spark
Spark TEST
TEST Spark Streaming

[Figure: producer console]

The output is shown below:
[Figure: KafkaWordCount output]
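The counts expected for the three lines typed above can be checked without Spark or Kafka by applying the same flatMap/map/reduce steps to an ordinary Scala collection. This is a local sketch that assumes all three lines fall inside the same 10-minute window; `LocalWordCount` is a hypothetical name.

```scala
// The same flatMap / map / reduce steps as KafkaWordCount, applied to an ordinary
// Scala collection instead of a DStream (local sketch: no Spark, no Kafka).
object LocalWordCount {
  def count(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split(" "))                                      // lines.flatMap(_.split(" "))
      .map(word => (word, 1L))                                     // words.map(x => (x, 1L))
      .groupBy(_._1)                                               // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }   // reduce with _ + _

  def main(args: Array[String]): Unit = {
    val input = Seq("Spark", "Spark TEST", "TEST Spark Streaming")
    count(input).toSeq.sortBy(_._1).foreach(println)
  }
}
```

For this input it yields Spark -> 3, TEST -> 2 and Streaming -> 1.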

2. Spark Streaming and Kafka WordCount Example (2)

In the previous example, messages were produced with Kafka's console producer script. This example writes the producer as a program instead.

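import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Sends messages made of random digits (0-9) separated by spaces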
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Producer configuration: broker list plus String serializers for keys and values
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // Create the KafkaProducer
    val producer = new KafkaProducer[String, String](props)

    // Send messagesPerSec messages to the Kafka cluster, sleep one second, and repeat forever
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")

        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }

      Thread.sleep(1000)
    }
  }

}
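To see what the producer sends without a running Kafka cluster, the message-building expression can be pulled out into pure functions. In this sketch `makeMessage` and `makeBatch` are hypothetical helpers, and the seeded `Random` is an assumption added for reproducibility (the original uses the unseeded `scala.util.Random`). Since every word is a single digit 0-9, KafkaWordCount will see at most ten distinct keys.

```scala
import scala.util.Random

// Sketch of the messages KafkaWordCountProducer sends, with the Kafka call removed
// so it runs locally. makeMessage / makeBatch are hypothetical helpers.
object MessageSketch {
  // One message: wordsPerMessage random digits 0-9 separated by spaces, mirroring
  // (1 to wordsPerMessage).map(x => Random.nextInt(10).toString).mkString(" ")
  def makeMessage(rng: Random, wordsPerMessage: Int): String =
    (1 to wordsPerMessage).map(_ => rng.nextInt(10).toString).mkString(" ")

  // The messages produced by one iteration of the while loop (before Thread.sleep)
  def makeBatch(rng: Random, messagesPerSec: Int, wordsPerMessage: Int): Seq[String] =
    (1 to messagesPerSec).map(_ => makeMessage(rng, wordsPerMessage))

  def main(args: Array[String]): Unit =
    makeBatch(new Random(42), messagesPerSec = 5, wordsPerMessage = 8).foreach(println)
}
```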

Set the KafkaWordCountProducer program arguments as follows:

sparkmaster:9092 kafkatopictest 5 8

sparkmaster:9092: the broker list (metadataBrokerList)
kafkatopictest: the topic name
5: the number of messages sent per second
8: the number of words in each message
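With these arguments, the load on KafkaWordCount can be estimated from its 2-second batch interval and 10-minute window. A rough calculation, assuming a steady send rate (the real rate is slightly lower because send time adds to `Thread.sleep(1000)`); `LoadEstimate` is a hypothetical name:

```scala
// Back-of-the-envelope load for 5 messages/s with 8 words each, matched against
// KafkaWordCount's 2-second batches and 10-minute window (assumes a steady rate).
object LoadEstimate {
  // Words produced in the given number of seconds
  def wordsIn(seconds: Int, messagesPerSec: Int, wordsPerMessage: Int): Long =
    seconds.toLong * messagesPerSec * wordsPerMessage

  def main(args: Array[String]): Unit = {
    val perSec = wordsIn(1, 5, 8)         // 40 words per second
    val perBatch = wordsIn(2, 5, 8)       // 2-second batch interval: 80 words
    val perWindow = wordsIn(10 * 60, 5, 8) // 10-minute window: 24000 words
    // Only 10 distinct words (digits 0-9) exist, so each averages perWindow / 10
    println(s"perSec=$perSec perBatch=$perBatch perWindow=$perWindow perDigit~${perWindow / 10}")
  }
}
```

So once the producer has been running for more than ten minutes, the counts printed for the ten digits should add up to roughly 24000, about 2400 per digit on average.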

Start KafkaWordCountProducer first, then run KafkaWordCount. The output is shown below:
[Figure: KafkaWordCount output]