1. 程式人生 > >SparkStreaming(5):例項-處理socket源資料

SparkStreaming(5):例項-處理socket源資料

1.實現功能:

SparkStreaming處理socket源的資料,並進行wordcount的統計。

 

2.scala程式碼

package Spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * spark Streaming 處理socket資料
  *
  * 使用nc測試nc -lk 6789
  */
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf=new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    /***
      * 建立StreamingContext需要sparkConf和batch interval
      */
    val ssc=new StreamingContext(sparkConf,Seconds(5))

    val lines=ssc.socketTextStream("bigdata.ibeifeng.com",6789)

    val result= lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    result.print()


    ssc.start()
    ssc.awaitTermination()
  }
}

3.測試

nc -lk 6789