[Source Code] How Flink Commits the Kafka Consumer Group Offset

When Flink consumes Kafka data, there are three modes of committing the consumer group offset:

  • 1. Checkpointing enabled: offsets are committed when a checkpoint completes
  • 2. Checkpointing enabled, commit on checkpoint disabled: consumer group offsets are never committed
  • 3. Checkpointing disabled: relies on the Kafka client's periodic auto-commit

The interesting case is, of course, checkpointing enabled: how exactly does the consumer group offset get committed?

Here is a simple Flink job: read from one Kafka topic and write to another.

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// enable checkpoint
val stateBackend = new FsStateBackend("file:///out/checkpoint")
env.setStateBackend(stateBackend)
env.enableCheckpointing(1 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE)

val prop = Common.getProp
// prop.setProperty("enable.auto.commit", "true")
// prop.setProperty("auto.commit.interval.ms", "15000")

val kafkaSource = new FlinkKafkaConsumer[String]("kafka_offset", new SimpleStringSchema(), prop)
// kafkaSource.setCommitOffsetsOnCheckpoints(false)
val kafkaProducer = new FlinkKafkaProducer[String]("kafka_offset_out", new SimpleStringSchema(), prop)
// kafkaProducer.setWriteTimestampToKafka(true)

env.addSource(kafkaSource)
  .setParallelism(1)
  .map(node => { node.toString + ",flinkx" })
  .addSink(kafkaProducer)

// execute job
env.execute("KafkaToKafka")

## 1 Checkpointing enabled

With checkpointing enabled, the default commit mode for consumer group offsets is ON_CHECKPOINTS.

The commit behaviour, offsetCommitMode, is determined when FlinkKafkaConsumerBase.open() is called:

@Override
public void open(Configuration configuration) throws Exception {
  // determine the offset commit mode
  this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
      getIsAutoCommitEnabled(),
      enableCommitOnCheckpoints,  // default value: true
      ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

The code of OffsetCommitModes.fromConfiguration:

public static OffsetCommitMode fromConfiguration(
      boolean enableAutoCommit,
      boolean enableCommitOnCheckpoint,
      boolean enableCheckpointing) {

  if (enableCheckpointing) {
    // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
    return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
  } else {
    // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
    return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
  }
}
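
To make the branch logic concrete, here is a quick sketch of the input combinations and the mode each yields (a sketch, not part of the walkthrough; it assumes the Flink 1.11 connector, where OffsetCommitModes lives in org.apache.flink.streaming.connectors.kafka.config):

import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes

// (enableAutoCommit, enableCommitOnCheckpoint, enableCheckpointing) -> resulting mode
OffsetCommitModes.fromConfiguration(true,  true,  true)   // ON_CHECKPOINTS
OffsetCommitModes.fromConfiguration(true,  false, true)   // DISABLED
OffsetCommitModes.fromConfiguration(true,  false, false)  // KAFKA_PERIODIC
OffsetCommitModes.fromConfiguration(false, false, false)  // DISABLED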

When a checkpoint completes, Flink calls the notifyCheckpointComplete method of every operator in turn; for the Kafka source this lands in FlinkKafkaConsumerBase.notifyCheckpointComplete.

Note: FlinkKafkaConsumerBase is the parent class of FlinkKafkaConsumer.

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
  ....

  if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
    // only one commit operation must be in progress
    ...

    try {
      // look up the position of this checkpoint id in the pending offsets list
      final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
      if (posInMap == -1) {
        LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}",
          getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
        return;
      }
      // take the offsets at that position; removing the entry clears it from the pending list
      @SuppressWarnings("unchecked")
      Map<KafkaTopicPartition, Long> offsets =
        (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
      
      ....

      // commit the offsets by calling KafkaFetcher's commitInternalOffsetsToKafka
      fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
    
    ....

This ends up calling AbstractFetcher.commitInternalOffsetsToKafka:

public final void commitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception {
  // Ignore sentinels. They might appear here if snapshot has started before actual offsets values
  // replaced sentinels
  doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), commitCallback);
}

protected abstract void doCommitInternalOffsetsToKafka(
    Map<KafkaTopicPartition, Long> offsets,
    @Nonnull KafkaCommitCallback commitCallback) throws Exception;

The implementation of AbstractFetcher.doCommitInternalOffsetsToKafka is KafkaFetcher.doCommitInternalOffsetsToKafka.

It builds the Map<TopicPartition, OffsetAndMetadata> offsetsToCommit to send to Kafka from the Map<KafkaTopicPartition, Long> offsets.

Note: offset + 1 is the position of the next record to consume. For example, if the last processed record in a partition has offset 41, Flink's own state stores 41, while 42 is committed to Kafka as the consumer group offset.

@Override
protected void doCommitInternalOffsetsToKafka(
  Map<KafkaTopicPartition, Long> offsets,
  @Nonnull KafkaCommitCallback commitCallback) throws Exception {

  @SuppressWarnings("unchecked")
  List<KafkaTopicPartitionState<T, TopicPartition>> partitions = subscribedPartitionStates();

  Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(partitions.size());

  for (KafkaTopicPartitionState<T, TopicPartition> partition : partitions) {
    Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
    if (lastProcessedOffset != null) {
      checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");

      // committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
      // This does not affect Flink's checkpoints/saved state.
      long offsetToCommit = lastProcessedOffset + 1;

      offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
      partition.setCommittedOffset(offsetToCommit);
    }
  }

  // record the work to be committed by the main consumer thread and make sure the consumer notices that
  consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
}

It then calls KafkaConsumerThread.setOffsetsToCommit, which stores the offsets to be committed in the consumer thread's nextOffsetsToCommit field, where the next iteration of the consume loop picks them up:

void setOffsetsToCommit(
      Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
      @Nonnull KafkaCommitCallback commitCallback) {

    // hand the pending offsetsToCommit to the Kafka consumer thread via nextOffsetsToCommit
    // a non-null previous value means the last batch was never picked up and committed
    // record the work to be committed by the main consumer thread and make sure the consumer notices that
    if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {
      log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
          "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
          "This does not compromise Flink's checkpoint integrity.");
    }

    // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
    handover.wakeupProducer();

    synchronized (consumerReassignmentLock) {
      if (consumer != null) {
        consumer.wakeup();
      } else {
        // the consumer is currently isolated for partition reassignment;
        // set this flag so that the wakeup state is restored once the reassignment is complete
        hasBufferedWakeup = true;
      }
    }
  }
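
The handoff between the two threads is nothing more than an AtomicReference. Below is a minimal sketch of the getAndSet pattern, where slot is a hypothetical stand-in for nextOffsetsToCommit with a simplified value type:

import java.util.concurrent.atomic.AtomicReference

// hypothetical stand-in for nextOffsetsToCommit
val slot = new AtomicReference[Map[String, Long]]()

// checkpoint thread: publish new work; getAndSet returns the previous value,
// so non-null means the consumer thread never picked up the last batch
if (slot.getAndSet(Map("kafka_offset-0" -> 4173L)) != null)
  println("previous offsets were never committed; they are superseded")

// consumer thread: take the pending work and clear the slot in one atomic step
val work = slot.getAndSet(null)
if (work != null) {
  // the real code calls consumer.commitAsync(...) here
  println(s"committing $work")
}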

That brings us to the Kafka consumer thread, KafkaConsumerThread.run: this is both where Kafka records are polled and where the consumer group offsets are committed.

@Override
  public void run() {
    ...

      this.consumer = getConsumer(kafkaProperties);

    ....
      // main fetch loop: keeps polling records from kafka
      while (running) {
        // this is where the offsets are committed
        // check if there is something to commit
        if (!commitInProgress) {

          // nextOffsetsToCommit is the slot filled by setOffsetsToCommit above

          // get and reset the work-to-be committed, so we don't repeatedly commit the same
          final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
              nextOffsetsToCommit.getAndSet(null);

          // if commitOffsetsAndCallback is non-null, asynchronously commit the offsets to kafka
          if (commitOffsetsAndCallback != null) {
            log.debug("Sending async offset commit request to Kafka broker");

            // also record that a commit is already in progress
            // the order here matters! first set the flag, then send the commit command.
            commitInProgress = true;
            consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
          }
        }

       ... 
        // get the next batch of records, unless we did not manage to hand the old batch over
        if (records == null) {
          try {
            records = consumer.poll(pollTimeout);
          }
          catch (WakeupException we) {
            continue;
          }
        }

        ...
      }

At this point we have seen how Flink's offsets end up committed to Kafka.


## 2 Checkpointing enabled, commit on checkpoint disabled

Committing on checkpoints is the default behaviour when checkpointing is enabled. Now let's look at turning that commit off, starting with disabling commitOnCheckpoints:

val kafkaSource = new FlinkKafkaConsumer[String]("kafka_offset", new SimpleStringSchema(), Common.getProp)
kafkaSource.setCommitOffsetsOnCheckpoints(false)

The corresponding method:

public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
  // enableCommitOnCheckpoints defaults to true
  this.enableCommitOnCheckpoints = commitOnCheckpoints;
  return this;
}

Warning: if checkpointing is enabled but CommitOffsetsOnCheckpoints is disabled, the consumer group offset is never committed to Kafka. In other words, the consumer group offset never changes.

As shown below, CURRENT-OFFSET never moves:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
kafka_offset    0          4172            4691            519             -               -               -
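
(The output above comes from Kafka's kafka-consumer-groups.sh tool; assuming a broker on localhost:9092 and a consumer group named flink-test, the command looks like:)

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group flink-test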

Official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration

## 3 Checkpointing disabled

With checkpointing disabled, the Flink Kafka Consumer relies on the periodic auto-commit feature of the internally used Kafka client. To enable or disable it, set enable.auto.commit and auto.commit.interval.ms to appropriate values in the provided Properties:

prop.setProperty("enable.auto.commit", "true")
prop.setProperty("auto.commit.interval.ms", "15000")
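
Putting it together, a minimal sketch of the job from the beginning with checkpointing left out, so the commit mode falls back to KAFKA_PERIODIC (it reuses the Common.getProp helper from the first example):

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// note: no env.enableCheckpointing(...) call

val prop = Common.getProp
prop.setProperty("enable.auto.commit", "true")       // let the kafka client commit offsets
prop.setProperty("auto.commit.interval.ms", "15000") // every 15 seconds

val kafkaSource = new FlinkKafkaConsumer[String]("kafka_offset", new SimpleStringSchema(), prop)
val kafkaProducer = new FlinkKafkaProducer[String]("kafka_offset_out", new SimpleStringSchema(), prop)

env.addSource(kafkaSource)
  .map(node => node.toString + ",flinkx")
  .addSink(kafkaProducer)
env.execute("KafkaToKafkaAutoCommit")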

From here on the commit happens entirely inside the Kafka client, which is beyond the scope of this post, so we will skip it.

O(∩_∩)O哈哈~
