1. 程式人生 > 程式設計 >kafka-python 獲取topic lag值方式

kafka-python 獲取topic lag值方式

說真,這個問題看上去很簡單,但“得益”與kafka-python神奇的文件,真的不算簡單,反正我是搜了半天還看了半天原始碼。

直接上程式碼吧

from kafka import SimpleClient,KafkaConsumer
from kafka.common import OffsetRequestPayload,TopicPartition

def get_topic_offset(brokers,topic):
  """
  獲取一個topic的offset值的和
  """
  client = SimpleClient(brokers)
  partitions = client.topic_partitions[topic]
  offset_requests = [OffsetRequestPayload(topic,p,-1,1) for p in partitions.keys()]
  offsets_responses = client.send_offset_request(offset_requests)
  return sum([r.offsets[0] for r in offsets_responses])


def get_group_offset(brokers,group_id,topic):
  """
  獲取一個topic特定group已經消費的offset值的和
  """
  consumer = KafkaConsumer(bootstrap_servers=brokers,group_id=group_id,)
  pts = [TopicPartition(topic=topic,partition=i) for i in
      consumer.partitions_for_topic(topic)]
  result = consumer._coordinator.fetch_committed_offsets(pts)
  return sum([r.offset for r in result.values()])


if __name__ == '__main__':
  topic_offset = get_topic_offset("brokers","topic")
  group_offset = get_group_offset("brokers","group_id","topic")
  lag = topic_offset - group_offset

以上這篇kafka-python 獲取topic lag值方式就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。