Flink Basics (2): Quick Start
阿新 • Published: 2020-08-03
1 Setting up the Maven project FlinkTutorial
1.1 pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.flink</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Declares binding to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- This plugin packages the job into a fat jar with all dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

1.2 Add the Scala framework and the scala source folder (in the IDE, add Scala framework support to the project and mark src/main/scala as a source root)
2 Batch WordCount
src/main/scala/com.atguigu.wc/WordCount.scala

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // Create the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read data from a file
    val inputPath = "D:\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hello.txt"
    val inputDS: DataSet[String] = env.readTextFile(inputPath)

    // Split each line into words, group by word, then aggregate with sum
    val wordCountDS: AggregateDataSet[(String, Int)] = inputDS
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // Print the result
    wordCountDS.print()
  }
}

Note: Flink programs can be written in either Java or Scala; this course uses Scala. Many classes exist in both a java and a scala package — when importing, make sure to use the scala ones (e.g. org.apache.flink.api.scala._).
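To see what each transformation in the pipeline produces, the same flatMap / map / groupBy / sum chain can be sketched on plain Scala collections, with no Flink dependency. This is only an illustration of the semantics (the object name and sample input are made up for the example); Flink's DataSet operators are lazy and distributed, unlike eager collections:

```scala
object CollectionsWordCount {
  def main(args: Array[String]): Unit = {
    // Sample input lines (hypothetical, standing in for hello.txt)
    val lines = Seq("hello world", "hello flink")

    val counts: Map[String, Int] = lines
      .flatMap(_.split(" "))                               // split lines into words
      .map((_, 1))                                         // pair each word with a count of 1
      .groupBy(_._1)                                       // group the pairs by word
      .map { case (w, pairs) => (w, pairs.map(_._2).sum) } // sum the 1s per word

    counts.toSeq.sorted.foreach(println) // (flink,1) (hello,2) (world,1)
  }
}
```

The `groupBy(0).sum(1)` in the Flink job refers to tuple fields by position (field 0 is the word, field 1 is the count), which this collection sketch expresses with `_._1` and `_._2` instead.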
3 Streaming WordCount
src/main/scala/com.atguigu.wc/StreamWordCount.scala

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Get host and port from the command-line arguments
    val params: ParameterTool = ParameterTool.fromArgs(args)
    val host: String = params.get("host")
    val port: Int = params.getInt("port")

    // Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Receive the socket text stream
    val textDstream: DataStream[String] = env.socketTextStream(host, port)

    // flatMap and map need these implicit conversions in scope
    import org.apache.flink.api.scala._
    val dataStream: DataStream[(String, Int)] = textDstream
      .flatMap(_.split("\\s"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    dataStream.print().setParallelism(1)

    // Start the executor and run the job
    env.execute("Socket stream word count")
  }
}

Test: on a Linux machine, use the netcat command to send text to the socket, then run StreamWordCount with the matching program arguments (e.g. --host <hostname> --port 7777).
nc -lk 7777
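Unlike the batch job, which prints one final count per word, the streaming `keyBy(0).sum(1)` emits an updated running count every time a record arrives. A plain-Scala sketch of that behavior (no Flink required; the object name and sample input are made up for the example):

```scala
object RunningCounts {
  def main(args: Array[String]): Unit = {
    // Words as they might arrive over the socket, one record at a time
    val arriving = Seq("hello", "world", "hello")

    // Carry the per-word counts as state; emit the updated count for each record
    val emissions = arriving
      .scanLeft(Map.empty[String, Int]) { (state, w) =>
        state.updated(w, state.getOrElse(w, 0) + 1)
      }
      .drop(1)                // drop the empty initial state
      .zip(arriving)          // pair each state with the record that produced it
      .map { case (state, w) => (w, state(w)) }

    emissions.foreach(println) // (hello,1) (world,1) (hello,2)
  }
}
```

Note how "hello" appears twice in the output with counts 1 and then 2: this is exactly what the streaming job prints as lines arrive from netcat, whereas the batch job would print (hello,2) once.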