Flink Basics (2): Quick Start

1 Set up the Maven project FlinkTutorial

1.1 The pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.flink</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles Scala source code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bind to Maven's compile and test-compile phases -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

1.2 Add Scala framework support and a scala source folder

2 Batch WordCount

src/main/scala/com.atguigu.wc/WordCount.scala
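package com.atguigu.wc

// Scala DataSet API plus the implicit TypeInformation that flatMap and map need
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // Create the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Read the input data from a file
    val inputPath = "D:\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hello.txt"
    val inputDS: DataSet[String] = env.readTextFile(inputPath)
    // Split each line into words, group by word with groupBy, then aggregate with sum
    val wordCountDS: AggregateDataSet[(String, Int)] =
      inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    // Print the result; for a DataSet, print() also triggers execution
    wordCountDS.print()
  }
}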

Note: Flink programs can be written in either Java or Scala; this course mainly uses Scala. When a class is available in both a Java package and a Scala package, make sure to import the Scala one.
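The batch pipeline can be traced without a cluster. The following is a minimal sketch in plain Scala collections, not Flink code: the object BatchWordCountSketch and its wordCount function are hypothetical names, and a local Seq stands in for the lines read from hello.txt. Each step is annotated with the Flink operator it imitates.

```scala
// Plain Scala sketch of the batch WordCount pipeline; no Flink dependency is needed.
object BatchWordCountSketch {

  // Hypothetical helper: mirrors flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(" ").toSeq)                   // flatMap: one line -> many words
      .map(word => (word, 1))                                    // map: pair each word with a count of 1
      .groupBy(_._1)                                             // groupBy(0): group tuples by the word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum(1): add up the counts per word

  def main(args: Array[String]): Unit = {
    // A local Seq stands in for the lines of hello.txt (assumed sample data)
    val lines = Seq("hello world", "hello flink", "hello scala")
    // Sort by word so the printed order is deterministic
    wordCount(lines).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Because groupBy collects all tuples for a word before sum runs, each word appears exactly once in the result, which matches the one-line-per-word output of the batch job.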

3 Streaming WordCount

src/main/scala/com.atguigu.wc/StreamWordCount.scala
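package com.atguigu.wc

import org.apache.flink.api.java.utils.ParameterTool
// Scala DataStream API (StreamExecutionEnvironment, DataStream) plus the
// implicit TypeInformation that flatMap and map need
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Read parameters from the command line, e.g. --host localhost --port 7777
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val host: String = params.get("host")
    val port: Int = params.getInt("port")
    // Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive a text stream from the socket
    val textDstream: DataStream[String] = env.socketTextStream(host, port)
    // Split on whitespace, drop empty tokens, pair each word with 1, key by word, keep a running sum
    val dataStream: DataStream[(String, Int)] =
      textDstream.flatMap(_.split("\\s")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1)
    // Print from a single sink subtask
    dataStream.print().setParallelism(1)
    // Submit and run the job; nothing executes until execute() is called
    env.execute("Socket stream word count")
  }
}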
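
Test: on a Linux system, send test data with netcat. Start the listener first, then run StreamWordCount with the program arguments --host <hostname> --port 7777 and type lines of words into the nc session: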
nc -lk 7777
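Unlike the batch job, keyBy(0).sum(1) on a DataStream is a rolling aggregation: every incoming word produces an updated (word, count) record rather than one final total. The sketch below models that behaviour in plain Scala, assuming elements arrive one at a time in order and using a mutable Map as a stand-in for Flink's per-key state; RunningWordCountSketch and runningCounts are hypothetical names, not Flink APIs.

```scala
import scala.collection.mutable

// Plain Scala sketch of the rolling keyBy(0).sum(1) in StreamWordCount; not Flink code.
object RunningWordCountSketch {

  // Hypothetical helper: returns one (word, runningCount) record per incoming word, in arrival order.
  def runningCounts(lines: Seq[String]): Seq[(String, Int)] = {
    // Stand-in for Flink's per-key state: the current sum for each word seen so far
    val state = mutable.Map.empty[String, Int]
    for {
      line <- lines
      word <- line.split("\\s").toSeq if word.nonEmpty // flatMap(_.split("\\s")).filter(_.nonEmpty)
    } yield {
      val updated = state.getOrElse(word, 0) + 1 // map((_, 1)), then add 1 to this word's sum
      state(word) = updated                      // keep the new sum for the next record
      (word, updated)                            // sum(1) emits the updated tuple downstream
    }
  }

  def main(args: Array[String]): Unit = {
    // Each string stands in for one line typed into the nc session (assumed sample input)
    runningCounts(Seq("hello world", "hello flink")).foreach(println)
  }
}
```

For the two input lines in main, the printed records are (hello,1), (world,1), (hello,2), (flink,1): the second "hello" triggers a new record with the updated count instead of replacing the earlier one.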