雲星資料---[Apache Flink in Action Series (Premium Edition)]: Flink Streaming API in Detail and Programming in Practice 002 - Flink stream-based wordcount example 002

III. Socket-based wordcount

1. Sending data

1. Command to send data
    nc -lk 9999
2. Data to send
    good good study
    day day up

2. Processing the data

2.1 The program


package code.book.stream.socketwc

//0. Import the necessary classes
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWC {
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Define the data source
    val text = env.socketTextStream("qingcheng11", 9999)
    //3. Perform the wordcount computation
    val counts = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
    //4. Print the results
    counts.print()
    //5. Trigger the computation
    env.execute("Window Stream WordCount")
  }
}

2.2 Execution result
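
Illustratively, if both sample lines arrive within the same 5-second window, each distinct word is emitted once with its count, resembling the output below; the exact "n>" subtask prefixes that print adds depend on the job's parallelism:

    (good,2)
    (study,1)
    (day,2)
    (up,1)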

IV. Kafka-based wordcount

1. Preparing data

1.1 Start the Kafka cluster

ssh root@qingcheng11 "${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &"
ssh root@qingcheng12 "${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &"
ssh root@qingcheng13 "${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &"

1.2 Send data

1. Command to send data
${KAFKA_HOME}/bin/kafka-console-producer.sh \
    --topic food \
    --broker-list qingcheng11:9092,qingcheng12:9092,qingcheng13:9092

2. Data to send
spark hadoop flink
flink spark storm
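
If the food topic does not already exist, it can be created first. A sketch using the Kafka 0.9-era topic tool; the ZooKeeper addresses and the partition/replication counts are illustrative assumptions:

${KAFKA_HOME}/bin/kafka-topics.sh --create \
    --zookeeper qingcheng11:2181,qingcheng12:2181,qingcheng13:2181 \
    --replication-factor 3 --partitions 3 --topic food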

2. Processing the data

2.1 Add the Maven dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>1.1.3</version>
</dependency>
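
The connector pulls in the Kafka client, but the program also needs the core streaming Scala API on the classpath. If it is not already declared, a matching dependency would look like this (same Flink version and Scala suffix as above):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.10</artifactId>
    <version>1.1.3</version>
</dependency>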

2.2 The program

package code.book.stream.streamwc

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.log4j.{Level, Logger}


object FlinkKafkaStreamingWC {
  def main(args: Array[String]) {
    //1. Turn off logging to reduce unnecessary log output
    Logger.getLogger("org").setLevel(Level.OFF)

    //2. Specify the Kafka stream connection details
    val zkCluster="qingcheng11,qingcheng12,qingcheng13:2181"
    val kafkaCluster="qingcheng11:9092,qingcheng12:9092,qingcheng13:9092"
    val kafkaTopicName = "food"
    //3. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //4. Create the Kafka data stream
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaCluster)
    properties.setProperty("zookeeper.connect",zkCluster )
    properties.setProperty("group.id", kafkaTopicName)
    val kafka09=new FlinkKafkaConsumer09[String](kafkaTopicName,new SimpleStringSchema(),properties)
    val text = env.addSource(kafka09).setParallelism(4)

    //5. Perform the wordcount computation
    val counts = text.flatMap(_.toLowerCase.split("\\W+")).map((_, 1)).keyBy(0).sum(1)
    counts.print()
    //6. Trigger the computation
    env.execute("flink-kafka-wordcount")
  }
}
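
The program runs with default settings. A commonly paired addition, sketched below and not part of the original example, is enabling checkpointing so that FlinkKafkaConsumer09 commits its offsets as part of Flink's fault-tolerance checkpoints; the 5000 ms interval is an illustrative value:

import org.apache.flink.streaming.api.scala._

//enable checkpointing so Kafka offsets are committed with each checkpoint
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000)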

2.3 Execution result
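
Illustratively, because the unwindowed running sum emits an updated count for every arriving word, the two sample lines would produce output resembling the lines below (the order within each key is fixed, the interleaving across keys is not):

    (spark,1)
    (hadoop,1)
    (flink,1)
    (flink,2)
    (spark,2)
    (storm,1)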