Flink 從 0 到 1 學習之(6)如何自定義 Data Sink ?
前言
前篇文章 介紹了 Flink Data Sink,也介紹了 Flink 自帶的 Sink,那麼如何自定義自己的 Sink 呢?這篇文章將寫一個 demo 教大家將從 Kafka Source 的資料 Sink 到 MySQL 中去。
準備工作
我們先來看下 Flink 從 Kafka topic 中獲取資料的 demo,首先你需要安裝好了 FLink 和 Kafka 。
執行啟動 Flink、Zookepeer、Kafka,
好了,都啟動了!
資料庫建表
1 |
DROP TABLE IF EXISTS `student`; |
實體類
Student.java
1 |
package com.zhisheng.flink.model; |
工具類
工具類往 kafka topic student 傳送資料
1 |
import com.alibaba.fastjson.JSON; |
SinkToMySQL
該類就是 Sink Function,繼承了 RichSinkFunction ,然後重寫了裡面的方法。在 invoke 方法中將資料插入到 MySQL 中。
1 |
package com.zhisheng.flink.sink; |
Flink 程式
這裡的 source 是從 kafka 讀取資料的,然後 Flink 從 Kafka 讀取到資料(JSON)後用阿里 fastjson 來解析成 student 物件,然後在 addSink 中使用我們建立的 SinkToMySQL,這樣就可以把資料儲存到 MySQL 了。
1 |
package com.zhisheng.flink; |
結果
執行 Flink 程式,然後再執行 KafkaUtils2.java 工具類,這樣就可以了。
如果資料插入成功了,那麼我們檢視下我們的資料庫:
資料庫中已經插入了 100 條我們從 Kafka 傳送的資料了。證明我們的 SinkToMySQL 起作用了。是不是很簡單?
專案結構
怕大家不知道我的專案結構,這裡發個截圖看下:
最後
本文主要利用一個 demo,告訴大家如何自定義 Sink Function,將從 Kafka 的資料 Sink 到 MySQL 中,如果你專案中有其他的資料來源,你也可以換成對應的 Source,也有可能你的 Sink 是到其他的地方或者其他不同的方式,那麼依舊是這個套路:繼承 RichSinkFunction 抽象類,重寫 invoke 方法。