1. 程式人生 > >大資料學習之路95-SparkStreaming寫WordCount

大資料學習之路95-SparkStreaming寫WordCount

程式如下:

package com.test.sparkStreaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkStreamingWordCount {
  def main(args: Array[String]): Unit = {
    //這裡至少有兩個執行緒,一個執行緒拉取資料,另一個執行緒負責計算
     val conf: SparkConf = new SparkConf().setAppName("SparkStreamingWordCount").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    //建立一個SparkStreamingContext
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(2))
    //建立DStream(通過一個TCP埠拉取資料建立DStream)
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("marshal",8888)
    //對DStream進行操作
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_,1))
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

我們先啟動netcat:

nc -lk 8888

然後執行程式:

但是這樣的程式有一個缺點,就是它每換一行就會重新統計

接下來我們寫一個可以更新狀態的SparkStreamingWordCount

我們需要在原來的程式中修改reduceByKey為 updateStateByKey,而這個函式需要三個引數:

分別為一個函式,一個分割槽器,一個Boolean值

這裡的分割槽器我們使用spark的HashPartitioner,這個分割槽器我們需要傳一個分割槽數量的引數進去。

這裡我們傳一個預設的並行度進去,他原來有幾個分割槽我們就傳幾個分割槽。接下來我們還需要傳一個true,這個true的作用就是讓他記住以前的分割槽。

關鍵就是這個函式如何寫:

既然是函式就有輸入有返回。我們愛看一下他的輸入是什麼?

我們可以看到這裡傳入的是一個迭代器:

這個迭代器中的三個引數:

第一個:key即單詞

第二個:代表當前批次,該單詞出現的次數

第三個引數代表初始值或者是以前累加過的值。

寫完這些其實就差不多了,但是還會報錯如下:

那麼他為什麼會要求我們設定一個檢查點呢?

為了將計算的結果儲存起來,選擇checkpoint是一個非常好的事。

我們的Streaming程式是分散式的。那麼我們的checkpoint也應該寫到分散式檔案系統中,不能寫到本地目錄。

由於我們這裡的執行模式是local模式,所以我們這裡可以將checkpoint設到本機。如果我們想用分散式的話,應該把hdfs的配置檔案放到reource目錄下,這樣他就會去找hdfs。

碼如下:

package com.test.sparkStreaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object SparkStreamingWordCount2 {
  val updateFunc = (it : Iterator[(String, Seq[Int], Option[Int])]) => {
           it.map(t => (t._1,t._2.sum+t._3.getOrElse(0)))
  }
  def main(args: Array[String]): Unit = {
    //這裡至少有兩個執行緒,一個執行緒拉取資料,另一個執行緒負責計算
     val conf: SparkConf = new SparkConf().setAppName("SparkStreamingWordCount").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    //建立一個SparkStreamingContext
    val ssc: StreamingContext = new StreamingContext(sc,Seconds(2))
    //如果想要更新歷史狀態(累加),要設定checkpoint
    ssc.checkpoint("./ckp")
    //建立DStream(通過一個TCP埠拉取資料建立DStream)
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("marshal",8888)
    //對DStream進行操作
    val words: DStream[String] = lines.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_,1))
    val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

結果如下:

其實我們這裡還可以以模式匹配的方式來寫

updateFunc函式
val updateFunc = (it : Iterator[(String, Seq[Int], Option[Int])]) => {
      it.map{case (w,s,o) => (w,s.sum + o.getOrElse(0))}
    }

這種感覺更好一點。