1. 程式人生 > 實用技巧 >Flink流處理程式執行流程原始碼分析

Flink流處理程式執行流程原始碼分析

首先,上程式碼,從最簡單的例子開始

第一步:編寫流處理的小例子

  需求:接收來自Kafka中sensor-temperature主題下的溫度感測器資料,按「感測器+日期」分組,計算每個5秒滾動視窗內的平均溫度(為了方便在本地執行,下面的程式碼以記憶體中的集合來模擬Kafka資料來源)

  程式碼:TemperatureAnalysis.java

package com.mengyao.flink;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.accumulators.AverageAccumulator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.windowing.time.Time; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.Duration;
import java.util.*; /** * @ClassName TemperatureAnalysis * @Description * @Created by MengYao * @Date 2020/11/18 10:14 * @Version V1.0 */ public class TemperatureAnalysis { // 作業名稱 private static final String JOB_NAME = TemperatureAnalysis.class.getSimpleName(); // 解析字串時間 private static final ThreadLocal<SimpleDateFormat> FMT = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); // 輸入的溫度資料 private static final List<String> DATA = Arrays.asList( "T1,2020-01-30 19:00:00,22", "T1,2020-01-30 19:00:01,25", "T1,2020-01-30 19:00:03,28", "T1,2020-01-30 19:00:06,26", "T1,2020-01-30 19:00:05,27", "T1,2020-01-30 19:00:12,31" ); public static void main(String[] args) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() // 每隔5秒進行一次CheckPoint,事件傳遞語義為恰好一次 .enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // 使用事件時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); try { // 從集合中建立資料來源 env.addSource(new FromElementsFunction<>(Types.STRING.createSerializer(env.getConfig()), DATA), Types.STRING) .map(line -> TemperatureBean.of(line))// 將字串時間轉換為Temperature型別的POJO .filter(Objects::nonNull)// 排除空資料 .assignTimestampsAndWatermarks(WatermarkStrategy.<TemperatureBean>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((TemperatureBean event, long timestamp) -> event.getTs(FMT.get())))// 提取Temperature型別事件的事件時間欄位(cdt)作為註冊水位線的時間戳 .keyBy(TemperatureBean::getKey)// 根據Key(ID_yyyy-MM-dd)分組,按照每個溫度感測器每天的所有溫度資料放到一個分割槽中 .timeWindow(Time.seconds(5))// 5秒的滾動視窗 .aggregate(new AggregateFunction<TemperatureBean, Acc, String>() {// 對每個視窗的溫度資料進行聚合運算(平均) @Override public Acc createAccumulator() { return new Acc(); } @Override public Acc add(TemperatureBean value, Acc acc) { acc.setKey(value.getKey()); acc.add(value.getTemperature()); return acc; } @Override public String getResult(Acc acc) { return acc.toString(); } @Override public Acc merge(Acc a, Acc b) { b.add(a.getLocalValue()); return b; } }) 
.addSink(new PrintSinkFunction<>());// 從控制檯中列印 env.execute(JOB_NAME);// 執行作業 } catch (Exception e) { e.printStackTrace(); } } static class TemperatureBean { private String sensorId; private String cdt; private double temperature; public static TemperatureBean of(String line) { return of(",", line); } public static TemperatureBean of(String delimiter, String line) { if (StringUtils.isNotEmpty(line)) { String[] values = line.split(delimiter, 3); return new TemperatureBean(values[0], values[1], Double.parseDouble(values[2])); } return null; } public TemperatureBean() {} public TemperatureBean(String sensorId, String cdt, double temperature) { this.sensorId=sensorId; this.cdt = cdt; this.temperature = temperature; } public String getKey() { return getSensorId()+"_"+getDay(); } public long getTs(SimpleDateFormat sdf) { try { return sdf.parse(cdt).getTime(); } catch (ParseException e) { e.printStackTrace(); } return -1; } public String getDay() { return cdt.substring(0,10); } public String getSensorId() { return sensorId; } public void setSensorId(String sensorId) { this.sensorId = sensorId; } public String getCdt() { return cdt; } public void setCdt(String cdt) { this.cdt = cdt; } public double getTemperature() { return temperature; } public void setTemperature(double temperature) { this.temperature = temperature; } @Override public String toString() { return String.join(", ", sensorId, cdt, Double.toString(temperature)); } } static class Acc extends AverageAccumulator { private String key; public void setKey(String key) { this.key = key; } @Override public String toString() { return String.join(",", key, Double.toString(super.getLocalValue())); } } }
展開這段程式碼

第二步:執行程式,檢視local模式下,作業是如何執行的