大資料學習之路95-SparkStreaming寫WordCount
阿新 • • 發佈:2018-11-09
程式如下:
package com.test.sparkStreaming import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object SparkStreamingWordCount { def main(args: Array[String]): Unit = { //這裡至少有兩個執行緒,一個執行緒拉取資料,另一個執行緒負責計算 val conf: SparkConf = new SparkConf().setAppName("SparkStreamingWordCount").setMaster("local[2]") val sc: SparkContext = new SparkContext(conf) //建立一個SparkStreamingContext val ssc: StreamingContext = new StreamingContext(sc,Seconds(2)) //建立DStream(通過一個TCP埠拉取資料建立DStream) val lines: ReceiverInputDStream[String] = ssc.socketTextStream("marshal",8888) //對DStream進行操作 val words: DStream[String] = lines.flatMap(_.split(" ")) val wordAndOne: DStream[(String, Int)] = words.map((_,1)) val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) result.print() ssc.start() ssc.awaitTermination() } }
我們先啟動netcat:
nc -lk 8888
然後執行程式:
但是這樣的程式有一個缺點,就是它每換一行就會重新統計
接下來我們寫一個可以更新狀態的SparkStreamingWordCount
我們需要在原來的程式中修改reduceByKey為 updateStateByKey,而這個函式需要三個引數:
分別為一個函式,一個分割槽器,一個Boolean值
這裡的分割槽器我們使用spark的HashPartitioner,這個分割槽器我們需要傳一個分割槽數量的引數進去。
這裡我們傳一個預設的並行度進去,他原來有幾個分割槽我們就傳幾個分割槽。接下來我們還需要傳一個true,這個true的作用就是讓他記住以前的分割槽。
關鍵就是這個函式如何寫:
既然是函式就有輸入有返回。我們愛看一下他的輸入是什麼?
我們可以看到這裡傳入的是一個迭代器:
這個迭代器中的三個引數:
第一個:key即單詞
第二個:代表當前批次,該單詞出現的次數
第三個引數代表初始值或者是以前累加過的值。
寫完這些其實就差不多了,但是還會報錯如下:
那麼他為什麼會要求我們設定一個檢查點呢?
為了將計算的結果儲存起來,選擇checkpoint是一個非常好的事。
我們的Streaming程式是分散式的。那麼我們的checkpoint也應該寫到分散式檔案系統中,不能寫到本地目錄。
由於我們這裡的執行模式是local模式,所以我們這裡可以將checkpoint設到本機。如果我們想用分散式的話,應該把hdfs的配置檔案放到reource目錄下,這樣他就會去找hdfs。
碼如下:
package com.test.sparkStreaming
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object SparkStreamingWordCount2 {
val updateFunc = (it : Iterator[(String, Seq[Int], Option[Int])]) => {
it.map(t => (t._1,t._2.sum+t._3.getOrElse(0)))
}
def main(args: Array[String]): Unit = {
//這裡至少有兩個執行緒,一個執行緒拉取資料,另一個執行緒負責計算
val conf: SparkConf = new SparkConf().setAppName("SparkStreamingWordCount").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
//建立一個SparkStreamingContext
val ssc: StreamingContext = new StreamingContext(sc,Seconds(2))
//如果想要更新歷史狀態(累加),要設定checkpoint
ssc.checkpoint("./ckp")
//建立DStream(通過一個TCP埠拉取資料建立DStream)
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("marshal",8888)
//對DStream進行操作
val words: DStream[String] = lines.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
結果如下:
其實我們這裡還可以以模式匹配的方式來寫
updateFunc函式
val updateFunc = (it : Iterator[(String, Seq[Int], Option[Int])]) => {
it.map{case (w,s,o) => (w,s.sum + o.getOrElse(0))}
}
這種感覺更好一點。