1. 程式人生 > 實用技巧 >Flink 從 0 到 1 學習之(3) Data Source 介紹

Flink 從 0 到 1 學習之(3) Data Source 介紹

前言

Data Sources 是什麼呢?就字面意思其實就可以知道:資料來源。

Flink 做為一款流式計算框架,它可用來做批處理,即處理靜態的資料集、歷史的資料集;也可以用來做流處理,即實時的處理些實時資料流,實時的產生資料流結果,只要資料來源源不斷的過來,Flink 就能夠一直計算下去,這個 Data Sources 就是資料的來源地。

Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 來為你的程式新增資料來源。

Flink 已經提供了若干實現好了的 source functions,當然你也可以通過實現 SourceFunction 來自定義非並行的 source 或者實現 ParallelSourceFunction 介面或者擴充套件 RichParallelSourceFunction 來自定義並行的 source,

StreamExecutionEnvironment 中可以使用以下幾個已實現的 stream sources,

總的來說可以分為下面幾大類:

基於集合

1、fromCollection(Collection) - 從 Java 的 Java.util.Collection 建立資料流。集合中的所有元素型別必須相同。

2、fromCollection(Iterator, Class) - 從一個迭代器中建立資料流。Class 指定了該迭代器返回元素的型別。

3、fromElements(T …) - 從給定的物件序列中建立資料流。所有物件型別必須相同。

1

2
3
4
5
6
7
8
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> input = env.fromElements(
new Event(1, "barfoo", 1.0),
new Event(2, "start", 2.0),
new Event(3, "foobar", 3.0),
...
);

4、fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中建立並行資料流。Class 指定了該迭代器返回元素的型別。

5、generateSequence(from, to) - 建立一個生成指定區間範圍內的數字序列的並行資料流。

基於檔案

1、readTextFile(path) - 讀取文字檔案,即符合 TextInputFormat 規範的檔案,並將其作為字串返回。

1
2
3
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");

2、readFile(fileInputFormat, path) - 根據指定的檔案輸入格式讀取檔案(一次)。

3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是上面兩個方法內部呼叫的方法。它根據給定的 fileInputFormat 和讀取路徑讀取檔案。根據提供的 watchType,這個 source 可以定期(每隔 interval 毫秒)監測給定路徑的新資料(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次路徑對應檔案的資料並退出(FileProcessingMode.PROCESS_ONCE)。你可以通過 pathFilter 進一步排除掉需要處理的檔案。

1
2
3
4
5
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);

實現:

在具體實現上,Flink 把檔案讀取過程分為兩個子任務,即目錄監控和資料讀取。每個子任務都由單獨的實體實現。目錄監控由單個非並行(並行度為1)的任務執行,而資料讀取由並行執行的多個任務執行。後者的並行性等於作業的並行性。單個目錄監控任務的作用是掃描目錄(根據 watchType 定期掃描或僅掃描一次),查詢要處理的檔案並把檔案分割成切分片(splits),然後將這些切分片分配給下游 reader。reader 負責讀取資料。每個切分片只能由一個 reader 讀取,但一個 reader 可以逐個讀取多個切分片。

重要注意:

如果 watchType 設定為 FileProcessingMode.PROCESS_CONTINUOUSLY,則當檔案被修改時,其內容將被重新處理。這會打破“exactly-once”語義,因為在檔案末尾附加資料將導致其所有內容被重新處理。

如果 watchType 設定為 FileProcessingMode.PROCESS_ONCE,則 source 僅掃描路徑一次然後退出,而不等待 reader 完成檔案內容的讀取。當然 reader 會繼續閱讀,直到讀取所有的檔案內容。關閉 source 後就不會再有檢查點。這可能導致節點故障後的恢復速度較慢,因為該作業將從最後一個檢查點恢復讀取。

基於 Socket:

socketTextStream(String hostname, int port) - 從 socket 讀取。元素可以用分隔符切分。

1
2
3
4
5
6
7
8
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999) // 監聽 localhost 的 9999 埠過來的資料
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);

自定義:

addSource - 新增一個新的 source function。例如,你可以 addSource(new FlinkKafkaConsumer011<>(…)) 以從 Apache Kafka 讀取資料

說下上面幾種的特點吧

1、基於集合:有界資料集,更偏向於本地測試用

2、基於檔案:適合監聽檔案修改並讀取其內容

3、基於 Socket:監聽主機的 host port,從 Socket 中獲取資料

4、自定義 addSource:大多數的場景資料都是無界的,會源源不斷的過來。比如去消費 Kafka 某個 topic 上的資料,這時候就需要用到這個 addSource,可能因為用的比較多的原因吧,Flink 直接提供了 FlinkKafkaConsumer011 等類可供你直接使用。你可以去看看 FlinkKafkaConsumerBase 這個基礎類,它是 Flink Kafka 消費的最根本的類。

1
2
3
4
5
6
7
8
9
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<KafkaEvent> input = env
.addSource(
new FlinkKafkaConsumer011<>(
parameterTool.getRequired("input-topic"), //從引數中獲取傳進來的 topic
new KafkaEventSchema(),
parameterTool.getProperties())
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()));

Flink 目前支援如下圖裡面常見的 Source:

如果你想自己自定義自己的 Source 呢?

那麼你就需要去了解一下 SourceFunction 介面了,它是所有 stream source 的根介面,它繼承自一個標記介面(空介面)Function。

SourceFunction 定義了兩個介面方法:

1、run : 啟動一個 source,即對接一個外部資料來源然後 emit 元素形成 stream(大部分情況下會通過在該方法裡執行一個 while 迴圈的形式來產生 stream)。

2、cancel : 取消一個 source,也即將 run 中的迴圈 emit 元素的行為終止。

正常情況下,一個 SourceFunction 實現這兩個介面方法就可以了。其實這兩個介面方法也固定了一種實現模板。

比如,實現一個 XXXSourceFunction,那麼大致的模板是這樣的:(直接拿 FLink 原始碼的例項給你看看)

最後

本文主要講了下 Flink 的常見 Source 有哪些並且簡單的提了下如何自定義 Source。

關注我

轉載請務必註明原創地址為:http://www.54tianzhisheng.cn/2018/10/28/flink-sources/

微信公眾號:zhisheng

另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號(zhisheng)了,你可以回覆關鍵字:Flink 即可無條件獲取到。另外也可以加我微信 你可以加我的微信:yuanblog_tzs,探討技術!

更多私密資料請加入知識星球!

專欄介紹

掃碼下面專欄二維碼可以訂閱該專欄

首發地址:http://www.54tianzhisheng.cn/2019/11/15/flink-in-action/

專欄地址:https://gitbook.cn/gitchat/column/5dad4a20669f843a1a37cb4f

Github 程式碼倉庫

https://github.com/zhisheng17/flink-learning/

以後這個專案的所有程式碼都將放在這個倉庫裡,包含了自己學習 flink 的一些 demo 和部落格

部落格

1、Flink 從0到1學習 —— Apache Flink 介紹

2、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建執行簡單程式入門

3、Flink 從0到1學習 —— Flink 配置檔案詳解

4、Flink 從0到1學習 —— Data Source 介紹

5、Flink 從0到1學習 —— 如何自定義 Data Source ?

6、Flink 從0到1學習 —— Data Sink 介紹

7、Flink 從0到1學習 —— 如何自定義 Data Sink ?

8、Flink 從0到1學習 —— Flink Data transformation(轉換)

9、Flink 從0到1學習 —— 介紹 Flink 中的 Stream Windows

10、Flink 從0到1學習 —— Flink 中的幾種 Time 詳解

11、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 ElasticSearch

12、Flink 從0到1學習 —— Flink 專案如何執行?

13、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Kafka

14、Flink 從0到1學習 —— Flink JobManager 高可用性配置

15、Flink 從0到1學習 —— Flink parallelism 和 Slot 介紹

16、Flink 從0到1學習 —— Flink 讀取 Kafka 資料批量寫入到 MySQL

17、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 RabbitMQ

18、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 HBase

19、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 HDFS

20、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Redis

21、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Cassandra

22、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Flume

23、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 InfluxDB

24、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 RocketMQ

25、Flink 從0到1學習 —— 你上傳的 jar 包藏到哪裡去了

26、Flink 從0到1學習 —— 你的 Flink job 日誌跑到哪裡去了

27、阿里巴巴開源的 Blink 實時計算框架真香

28、Flink 從0到1學習 —— Flink 中如何管理配置?

29、Flink 從0到1學習—— Flink 不可以連續 Split(分流)?

30、Flink 從0到1學習—— 分享四本 Flink 國外的書和二十多篇 Paper 論文

31、Flink 架構、原理與部署測試

32、為什麼說流處理即未來?

33、OPPO 資料中臺之基石:基於 Flink SQL 構建實時資料倉庫

34、流計算框架 Flink 與 Storm 的效能對比

35、Flink狀態管理和容錯機制介紹

36、Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理

37、360深度實踐:Flink與Storm協議級對比

38、如何基於Flink+TensorFlow打造實時智慧異常檢測平臺?只看這一篇就夠了

39、Apache Flink 1.9 重大特性提前解讀

40、Flink 全網最全資源(視訊、部落格、PPT、入門、實戰、原始碼解析、問答等持續更新)

41、Flink 靈魂兩百問,這誰頂得住?

42、Flink 從0到1學習 —— 如何使用 Side Output 來分流?

43、你公司到底需不需要引入實時計算引擎?

44、一文讓你徹底瞭解大資料實時計算引擎 Flink

原始碼解析

1、Flink 原始碼解析 —— 原始碼編譯執行

2、Flink 原始碼解析 —— 專案結構一覽

3、Flink 原始碼解析—— local 模式啟動流程

4、Flink 原始碼解析 —— standalone session 模式啟動流程

5、Flink 原始碼解析 —— Standalone Session Cluster 啟動流程深度分析之 Job Manager 啟動

6、Flink 原始碼解析 —— Standalone Session Cluster 啟動流程深度分析之 Task Manager 啟動

7、Flink 原始碼解析 —— 分析 Batch WordCount 程式的執行過程

8、Flink 原始碼解析 —— 分析 Streaming WordCount 程式的執行過程

9、Flink 原始碼解析 —— 如何獲取 JobGraph?

10、Flink 原始碼解析 —— 如何獲取 StreamGraph?

11、Flink 原始碼解析 —— Flink JobManager 有什麼作用?

12、Flink 原始碼解析 —— Flink TaskManager 有什麼作用?

13、Flink 原始碼解析 —— JobManager 處理 SubmitJob 的過程

14、Flink 原始碼解析 —— TaskManager 處理 SubmitJob 的過程

15、Flink 原始碼解析 —— 深度解析 Flink Checkpoint 機制

16、Flink 原始碼解析 —— 深度解析 Flink 序列化機制

17、Flink 原始碼解析 —— 深度解析 Flink 是如何管理好記憶體的?

18、Flink Metrics 原始碼解析 —— Flink-metrics-core

19、Flink Metrics 原始碼解析 —— Flink-metrics-datadog

20、Flink Metrics 原始碼解析 —— Flink-metrics-dropwizard

21、Flink Metrics 原始碼解析 —— Flink-metrics-graphite

22、Flink Metrics 原始碼解析 —— Flink-metrics-influxdb

23、Flink Metrics 原始碼解析 —— Flink-metrics-jmx

24、Flink Metrics 原始碼解析 —— Flink-metrics-slf4j

25、Flink Metrics 原始碼解析 —— Flink-metrics-statsd

26、Flink Metrics 原始碼解析 —— Flink-metrics-prometheus

26、Flink Annotations 原始碼解析

27、Flink 原始碼解析 —— 如何獲取 ExecutionGraph ?

28、大資料重磅炸彈——實時計算框架 Flink

29、Flink Checkpoint-輕量級分散式快照

30、Flink Clients 原始碼解析

×

純屬好玩

掃碼打賞,你說多少就多少

開啟支付寶掃一掃,即可進行掃碼打賞哦

閱讀全文