1. 程式人生 > 其它 >Flink 入門案例二(參考官方文件)

Flink 入門案例二(參考官方文件)

技術標籤:flink

Flink 入門案例

具有一定實際意義的流處理程式。
結合信用卡欺詐驗證場景,實現的具體Demo。

  • 定義程式資料流

package com.sanxiau;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import
org.apache.flink.walkthrough.common.entity.Alert; import org.apache.flink.walkthrough.common.entity.Transaction; import org.apache.flink.walkthrough.common.source.TransactionSource; /** * 定義了程式的資料流 * * Skeleton code for the datastream walkthrough */ public class FraudDetectionJob { public static
void main(String[] args) throws Exception { // 設定執行環境。 任務執行環境用於定義任務的屬性、建立資料來源以及最終啟動任務的執行。 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 建立資料來源 官方案例中存在資料 DataStream<Transaction> transactions = env .addSource(new TransactionSource()) .name("transactions"
); // 對事件分割槽 & 欺詐檢測 DataStream<Alert> alerts = transactions .keyBy(Transaction::getAccountId) //keyBy對流進行分割槽,保證同一個task處理同一個的key的所有資料 .process(new FraudDetectorTwo()) //對流綁定了一個操作,這個操作將會對流上的每一個訊息呼叫所定義好的函式。 .name("fraud-detector"); // 輸出結果 alerts //sink 會將 DataStream 寫出到外部系統 .addSink(new AlertSink()) // AlertSink 使用 INFO 的日誌級別列印每一個 Alert 的資料記錄,而不是將其寫入持久儲存,以便你可以方便地檢視結果。 .name("send-alerts"); // 執行作業 Flink 程式是懶載入的,並且只有在完全搭建好之後,才能夠釋出到叢集上執行。 // 呼叫execute 時給任務傳遞一個任務名引數,就可以開始執行任務。 env.execute("Fraud Detection"); } }
  • 資料具體處理邏輯
package com.sanxiau;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;


/**
 * 對於每筆交易,欺詐檢測器都會檢查該帳戶的標記狀態
 */
public class FraudDetectorTwo  extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    /**
     * 小額 交易
     */
    private static final double SMALL_AMOUNT = 1.00;
    /**
     * 大額 交易
     */
    private static final double LARGE_AMOUNT = 500.00;
    /**
     *  定時器時間
     */
    private static final long ONE_MINUTE = 60 * 1000;


    /**
     *  狀態型別是 ValueState,這是一種能夠為被其封裝的變數新增容錯能力的型別
     *
     *  ValueState 是一個包裝類,類似於 Java 標準庫裡邊的 AtomicReference 和 AtomicLong
     *      update 用於更新狀態
     *      value 用於獲取狀態值
     *      lear 用於清空狀態。
     *
     *  ValueState 的作用域始終限於當前的 key,ValueState 是允許空值的
     *      ValueState<Boolean> 實際上有 3 種狀態
     *          unset (null),true,和 false
     */
    private transient ValueState<Boolean> flagState;

    /**
     *  定時器狀態
     */
    private transient ValueState<Long> timerState;

	/**
     *
     * 初始化 flag 和 timerState 狀態
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

	/**
     * 
     * 具體 流資料 邏輯處理
     * 
     * @param transaction 
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement(Transaction transaction, Context context, Collector<Alert> collector) throws Exception {
        // Get the current state for the current key
        Boolean lastTransactionWasSmall = flagState.value();

        // Check if the flag is set
        // 最後一筆交易狀態 不為null,且數額大於 LARGE_AMOUNT
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                //Output an alert downstream
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);
            }
            //  Clean up our state
            //  標記狀態被重置時,刪除定時器
            cleanUp(context);
        }

        // 交易數額小於 SMALL_AMOUNT 設定定時器,進行欺詐驗證檢測
        if (transaction.getAmount() < SMALL_AMOUNT) {
            // set the flag to true
            flagState.update(true);
            // 標記狀態設定為true時,設定當前時間一分鐘後觸發的定時器。
            long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
            context.timerService().registerProcessingTimeTimer(timer);

            timerState.update(timer);
        }
    }

	
	/**
     * 
     * 當定時器被觸發時,重置標記狀態。
     * 
     * @param timestamp 
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
        // remove flag after 1 minute
        timerState.clear();
        flagState.clear();
    }


    /**
     * 標記狀態被重置時,刪除定時器
     *
     * @param ctx
     * @throws Exception
     */
    private void cleanUp(Context ctx) throws Exception {
        // delete timer
        Long timer = timerState.value();
        ctx.timerService().deleteProcessingTimeTimer(timer);

        // clean up all state
        timerState.clear();
        flagState.clear();
    }
}

  • 執行結果

執行結果