SparkStreaming(5):例項-處理socket源資料
阿新 • • 發佈:2018-11-08
1.實現功能:
SparkStreaming處理socket源的資料,並進行wordcount的統計。
2.scala程式碼
package Spark import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * spark Streaming 處理socket資料 * * 使用nc測試nc -lk 6789 */ object NetworkWordCount { def main(args: Array[String]): Unit = { val sparkConf=new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") /*** * 建立StreamingContext需要sparkConf和batch interval */ val ssc=new StreamingContext(sparkConf,Seconds(5)) val lines=ssc.socketTextStream("bigdata.ibeifeng.com",6789) val result= lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) result.print() ssc.start() ssc.awaitTermination() } }
3.測試
nc -lk 6789