Flink 入門案例二(參考官方文件)
阿新 • • 發佈:2021-01-11
技術標籤:flink
Flink 入門案例
具有一定實際意義的流處理程式。
結合信用卡欺詐驗證場景,實現的具體Demo。
- 定義程式資料流
package com.sanxiau;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
* 定義了程式的資料流
*
* Skeleton code for the datastream walkthrough
*/
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
// 設定執行環境。 任務執行環境用於定義任務的屬性、建立資料來源以及最終啟動任務的執行。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 建立資料來源 官方案例中存在資料
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions" );
// 對事件分割槽 & 欺詐檢測
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId) //keyBy對流進行分割槽,保證同一個task處理同一個的key的所有資料
.process(new FraudDetectorTwo()) //對流綁定了一個操作,這個操作將會對流上的每一個訊息呼叫所定義好的函式。
.name("fraud-detector");
// 輸出結果
alerts //sink 會將 DataStream 寫出到外部系統
.addSink(new AlertSink()) // AlertSink 使用 INFO 的日誌級別列印每一個 Alert 的資料記錄,而不是將其寫入持久儲存,以便你可以方便地檢視結果。
.name("send-alerts");
// 執行作業 Flink 程式是懶載入的,並且只有在完全搭建好之後,才能夠釋出到叢集上執行。
// 呼叫execute 時給任務傳遞一個任務名引數,就可以開始執行任務。
env.execute("Fraud Detection");
}
}
- 資料具體處理邏輯
package com.sanxiau;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
/**
* 對於每筆交易,欺詐檢測器都會檢查該帳戶的標記狀態
*/
public class FraudDetectorTwo extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
/**
* 小額 交易
*/
private static final double SMALL_AMOUNT = 1.00;
/**
* 大額 交易
*/
private static final double LARGE_AMOUNT = 500.00;
/**
* 定時器時間
*/
private static final long ONE_MINUTE = 60 * 1000;
/**
* 狀態型別是 ValueState,這是一種能夠為被其封裝的變數新增容錯能力的型別
*
* ValueState 是一個包裝類,類似於 Java 標準庫裡邊的 AtomicReference 和 AtomicLong
* update 用於更新狀態
* value 用於獲取狀態值
* lear 用於清空狀態。
*
* ValueState 的作用域始終限於當前的 key,ValueState 是允許空值的
* ValueState<Boolean> 實際上有 3 種狀態
* unset (null),true,和 false
*/
private transient ValueState<Boolean> flagState;
/**
* 定時器狀態
*/
private transient ValueState<Long> timerState;
/**
*
* 初始化 flag 和 timerState 狀態
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
"timer-state",
Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}
/**
*
* 具體 流資料 邏輯處理
*
* @param transaction
* @param context
* @param collector
* @throws Exception
*/
@Override
public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
// Get the current state for the current key
Boolean lastTransactionWasSmall = flagState.value();
// Check if the flag is set
// 最後一筆交易狀態 不為null,且數額大於 LARGE_AMOUNT
if (lastTransactionWasSmall != null) {
if (transaction.getAmount() > LARGE_AMOUNT) {
//Output an alert downstream
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
// Clean up our state
// 標記狀態被重置時,刪除定時器
cleanUp(context);
}
// 交易數額小於 SMALL_AMOUNT 設定定時器,進行欺詐驗證檢測
if (transaction.getAmount() < SMALL_AMOUNT) {
// set the flag to true
flagState.update(true);
// 標記狀態設定為true時,設定當前時間一分鐘後觸發的定時器。
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
}
/**
*
* 當定時器被觸發時,重置標記狀態。
*
* @param timestamp
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
// remove flag after 1 minute
timerState.clear();
flagState.clear();
}
/**
* 標記狀態被重置時,刪除定時器
*
* @param ctx
* @throws Exception
*/
private void cleanUp(Context ctx) throws Exception {
// delete timer
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
// clean up all state
timerState.clear();
flagState.clear();
}
}
- 執行結果