【雲星資料---Apache Flink實戰系列(精品版)】:Flink流處理API詳解與程式設計實戰002-Flink基於流的wordcount示例002
阿新 • • 發佈:2019-01-28
三、基於socket的wordcount
1.傳送資料
1.傳送資料命令
nc -lk 9999
2.傳送資料內容
good good study
day day up
2.處理資料
2.1執行程式
package code.book.stream.socketwc
//0.引用必要的元素
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Streaming word count over text lines read from a socket.
 *
 * Each line is lower-cased and split on runs of non-word characters; empty
 * tokens are dropped, and the remaining words are counted per 5-second
 * time window.
 */
object SocketWC {
  def main(args: Array[String]): Unit = {
    // 1. Obtain the stream execution environment.
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Source: text lines from the socket at qingcheng11:9999.
    val lines = streamEnv.socketTextStream("qingcheng11", 9999)

    // 3. Tokenize every line into non-empty, lower-cased words.
    val words = lines.flatMap { line =>
      line.toLowerCase.split("\\W+").filter(token => token.nonEmpty)
    }

    // Pair each word with 1, key by the word (tuple field 0) and sum the
    // counts (tuple field 1) within each 5-second window.
    val wordCounts = words
      .map(word => (word, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    // 4. Print the windowed counts.
    wordCounts.print()

    // Nothing runs until the job is submitted.
    streamEnv.execute("Window Stream WordCount")
  }
}
2.2執行效果
四、基於kafka的wordcount
1.準備資料
1.1啟動kafka叢集
# Start a Kafka broker on each of the three nodes over ssh, in the background
# with all output discarded.
# The trailing backslash joins the script path and its config-file argument into
# a single remote command; without it the newline inside the quotes would split
# them into two separate commands.
# NOTE(review): ${KAFKA_HOME} is inside double quotes, so it is expanded by the
# local shell before ssh runs -- confirm the path is the same on the remote hosts.
ssh root@qingcheng11 "${KAFKA_HOME}/bin/kafka-server-start.sh \
${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &"
ssh root@qingcheng12 "${KAFKA_HOME}/bin/kafka-server-start.sh \
${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &"
ssh root@qingcheng13 "${KAFKA_HOME}/bin/kafka-server-start.sh \
${KAFKA_HOME}/config/server.properties > /dev/null 2>&1 &"
1.2傳送資料
1.傳送資料的命令
# Start a console producer that publishes each typed line to topic "food".
# Backslashes continue the command so the options are passed to the producer
# instead of being run as separate commands.
${KAFKA_HOME}/bin/kafka-console-producer.sh \
  --topic food \
  --broker-list qingcheng11:9092,qingcheng12:9092,qingcheng13:9092
2.傳送資料的內容
spark hadoop flink
flink spark storm
2.處理資料
2.1新增maven依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>1.1.3</version>
</dependency>
2.2執行程式
package code.book.stream.streamwc
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.log4j.{Level, Logger}
/**
 * Streaming word count over string messages consumed from the Kafka topic
 * "food".
 *
 * Each message is lower-cased and split on runs of non-word characters; empty
 * tokens are dropped, and a running count per word is printed.
 */
object FlinkKafkaStreamingWC {
  def main(args: Array[String]): Unit = {
    // 1. Turn off logging for everything under the "org" logger to cut noise.
    Logger.getLogger("org").setLevel(Level.OFF)

    // 2. Connection settings for the Kafka stream.
    val zkCluster = "qingcheng11,qingcheng12,qingcheng13:2181"
    val kafkaCluster = "qingcheng11:9092,qingcheng12:9092,qingcheng13:9092"
    val kafkaTopicName = "food"

    // 3. Obtain the stream execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 4. Build the Kafka source.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaCluster)
    properties.setProperty("zookeeper.connect", zkCluster)
    // The topic name is reused as the consumer group id.
    properties.setProperty("group.id", kafkaTopicName)
    val kafka09 =
      new FlinkKafkaConsumer09[String](kafkaTopicName, new SimpleStringSchema(), properties)
    val text = env.addSource(kafka09).setParallelism(4)

    // 5. Tokenize and count. split() yields an empty first token for a blank
    //    message or one that starts with a non-word character; filter those
    //    out so "" is never counted as a word.
    val counts = text
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    counts.print()

    // 6. Submit the job; nothing runs until execute() is called.
    env.execute("flink-kafka-wordcunt")
  }
}