Flink 系列(三)—— Flink Data Source
一、內建 Data Source
Flink Data Source 用於定義 Flink 程式的資料來源,Flink 官方提供了多種資料獲取方法,用於幫助開發者簡單快速地構建輸入流,具體如下:
1.1 基於檔案構建
1. readTextFile(path):按照 TextInputFormat 格式讀取文字檔案,並將其內容以字串的形式返回。示例如下:
env.readTextFile(filePath).print();
複製程式碼
2. readFile(fileInputFormat,path) :按照指定格式讀取檔案。
3. readFile(inputFormat,filePath,watchType,interval,typeInformation)
- inputFormat:資料流的輸入格式。
- filePath:檔案路徑,可以是本地檔案系統上的路徑,也可以是 HDFS 上的檔案路徑。
-
watchType:讀取方式,它有兩個可選值,分別是
FileProcessingMode.PROCESS_ONCE
和FileProcessingMode.PROCESS_CONTINUOUSLY
:前者表示對指定路徑上的資料只讀取一次,然後退出;後者表示對路徑進行定期地掃描和讀取。需要注意的是如果 watchType 被設定為PROCESS_CONTINUOUSLY
,那麼當檔案被修改時,其所有的內容 (包含原有的內容和新增的內容) 都將被重新處理,因此這會打破 Flink 的 exactly-once - interval:定期掃描的時間間隔。
- typeInformation:輸入流中元素的型別。
使用示例如下:
final String filePath = "D:\\log4j.properties";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readFile(new TextInputFormat(new Path(filePath)),FileProcessingMode.PROCESS_ONCE,1,BasicTypeInfo.STRING_TYPE_INFO).print();
env.execute();
複製程式碼
1.2 基於集合構建
1. fromCollection(Collection):基於集合構建,集合中的所有元素必須是同一型別。示例如下:
env.fromCollection(Arrays.asList(1,2,3,4,5)).print();
複製程式碼
2. fromElements(T ...): 基於元素構建,所有元素必須是同一型別。示例如下:
env.fromElements(1,5).print();
複製程式碼
3. generateSequence(from,to):基於給定的序列區間進行構建。示例如下:
env.generateSequence(0,100);
複製程式碼
4. fromCollection(Iterator,Class):基於迭代器進行構建。第一個引數用於定義迭代器,第二個引數用於定義輸出元素的型別。使用示例如下:
env.fromCollection(new CustomIterator(),BasicTypeInfo.INT_TYPE_INFO).print();
複製程式碼
其中 CustomIterator 為自定義的迭代器,這裡以產生 1 到 100 區間內的資料為例,原始碼如下。需要注意的是自定義迭代器除了要實現 Iterator 介面外,還必須要實現序列化介面 Serializable ,否則會丟擲序列化失敗的異常:
import java.io.Serializable;
import java.util.Iterator;
public class CustomIterator implements Iterator<Integer>,Serializable {
private Integer i = 0;
@Override
public boolean hasNext() {
return i < 100;
}
@Override
public Integer next() {
i++;
return i;
}
}
複製程式碼
5. fromParallelCollection(SplittableIterator,Class):方法接收兩個引數,第二個引數用於定義輸出元素的型別,第一個引數 SplittableIterator 是迭代器的抽象基類,它用於將原始迭代器的值拆分到多個不相交的迭代器中。
1.3 基於 Socket 構建
Flink 提供了 socketTextStream 方法用於構建基於 Socket 的資料流,socketTextStream 方法有以下四個主要引數:
- hostname:主機名;
- port:埠號,設定為 0 時,表示埠號自動分配;
- delimiter:用於分隔每條記錄的分隔符;
- maxRetry:當 Socket 臨時關閉時,程式的最大重試間隔,單位為秒。設定為 0 時表示不進行重試;設定為負值則表示一直重試。示例如下:
env.socketTextStream("192.168.0.229",9999,"\n",3).print();
複製程式碼
二、自定義 Data Source
2.1 SourceFunction
除了內建的資料來源外,使用者還可以使用 addSource
方法來新增自定義的資料來源。自定義的資料來源必須要實現 SourceFunction 介面,這裡以產生 [0,1000) 區間內的資料為例,程式碼如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SourceFunction<Long>() {
private long count = 0L;
private volatile boolean isRunning = true;
public void run(SourceContext<Long> ctx) {
while (isRunning && count < 1000) {
// 通過collect將輸入傳送出去
ctx.collect(count);
count++;
}
}
public void cancel() {
isRunning = false;
}
}).print();
env.execute();
複製程式碼
2.2 ParallelSourceFunction 和 RichParallelSourceFunction
上面通過 SourceFunction 實現的資料來源是不具有並行度的,即不支援在得到的 DataStream 上呼叫 setParallelism(n)
方法,此時會丟擲如下的異常:
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
複製程式碼
如果你想要實現具有並行度的輸入流,則需要實現 ParallelSourceFunction 或 RichParallelSourceFunction 介面,其與 SourceFunction 的關係如下圖:
ParallelSourceFunction 直接繼承自 ParallelSourceFunction,具有並行度的功能。RichParallelSourceFunction 則繼承自 AbstractRichFunction,同時實現了 ParallelSourceFunction 介面,所以其除了具有並行度的功能外,還提供了額外的與生命週期相關的方法,如 open() ,closen() 。三、Streaming Connectors
3.1 內建聯結器
除了自定義資料來源外, Flink 還內建了多種聯結器,用於滿足大多數的資料收集場景。當前內建聯結器的支援情況如下:
- Apache Kafka (支援 source 和 sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
除了上述的聯結器外,你還可以通過 Apache Bahir 的聯結器擴充套件 Flink。Apache Bahir 旨在為分散式資料分析系統 (如 Spark,Flink) 等提供功能上的擴充套件,當前其支援的與 Flink 相關的聯結器如下:
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
- Netty (source)
隨著 Flink 的不斷髮展,可以預見到其會支援越來越多型別的聯結器,關於聯結器的後續發展情況,可以檢視其官方檔案:Streaming Connectors 。在所有 DataSource 聯結器中,使用的廣泛的就是 Kafka,所以這裡我們以其為例,來介紹 Connectors 的整合步驟。
3.2 整合 Kakfa
1. 匯入依賴
整合 Kafka 時,一定要注意所使用的 Kafka 的版本,不同版本間所需的 Maven 依賴和開發時所呼叫的類均不相同,具體如下:
Maven 依賴 | Flink 版本 | Consumer and Producer 類的名稱 | Kafka 版本 |
---|---|---|---|
flink-connector-kafka-0.8_2.11 | 1.0.0 + | FlinkKafkaConsumer08 FlinkKafkaProducer08 |
0.8.x |
flink-connector-kafka-0.9_2.11 | 1.0.0 + | FlinkKafkaConsumer09 FlinkKafkaProducer09 |
0.9.x |
flink-connector-kafka-0.10_2.11 | 1.2.0 + | FlinkKafkaConsumer010 FlinkKafkaProducer010 |
0.10.x |
flink-connector-kafka-0.11_2.11 | 1.4.0 + | FlinkKafkaConsumer011 FlinkKafkaProducer011 |
0.11.x |
flink-connector-kafka_2.11 | 1.7.0 + | FlinkKafkaConsumer FlinkKafkaProducer |
>= 1.0.0 |
這裡我使用的 Kafka 版本為 kafka_2.12-2.2.0,新增的依賴如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.0</version>
</dependency>
複製程式碼
2. 程式碼開發
這裡以最簡單的場景為例,接收 Kafka 上的資料並列印,程式碼如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 指定Kafka的連線位置
properties.setProperty("bootstrap.servers","hadoop001:9092");
// 指定監聽的主題,並定義Kafka位元組訊息到Flink物件之間的轉換規則
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic",new SimpleStringSchema(),properties));
stream.print();
env.execute("Flink Streaming");
複製程式碼
3.3 整合測試
1. 啟動 Kakfa
Kafka 的執行依賴於 zookeeper,需要預先啟動,可以啟動 Kafka 內建的 zookeeper,也可以啟動自己安裝的:
# zookeeper啟動命令
bin/zkServer.sh start
# 內建zookeeper啟動命令
bin/zookeeper-server-start.sh config/zookeeper.properties
複製程式碼
啟動單節點 kafka 用於測試:
# bin/kafka-server-start.sh config/server.properties
複製程式碼
2. 建立 Topic
# 建立用於測試主題
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 \
--partitions 1 \
--topic flink-stream-in-topic
# 檢視所有主題
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
複製程式碼
3. 啟動 Producer
這裡 啟動一個 Kafka 生產者,用於傳送測試資料:
bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic flink-stream-in-topic
複製程式碼
4. 測試結果
在 Producer 上輸入任意測試資料,之後觀察程式控制臺的輸出:
程式控制臺的輸出如下: 可以看到已經成功接收並打印出相關的資料。參考資料
- data-sources:ci.apache.org/projects/fl…
- Streaming Connectors:ci.apache.org/projects/fl…
- Apache Kafka Connector: ci.apache.org/projects/fl…
更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南