kafka-python 獲取topic lag值方式
阿新 • • 發佈:2020-01-09
說真,這個問題看上去很簡單,但“得益”與kafka-python神奇的文件,真的不算簡單,反正我是搜了半天還看了半天原始碼。
直接上程式碼吧
from kafka import SimpleClient,KafkaConsumer from kafka.common import OffsetRequestPayload,TopicPartition def get_topic_offset(brokers,topic): """ 獲取一個topic的offset值的和 """ client = SimpleClient(brokers) partitions = client.topic_partitions[topic] offset_requests = [OffsetRequestPayload(topic,p,-1,1) for p in partitions.keys()] offsets_responses = client.send_offset_request(offset_requests) return sum([r.offsets[0] for r in offsets_responses]) def get_group_offset(brokers,group_id,topic): """ 獲取一個topic特定group已經消費的offset值的和 """ consumer = KafkaConsumer(bootstrap_servers=brokers,group_id=group_id,) pts = [TopicPartition(topic=topic,partition=i) for i in consumer.partitions_for_topic(topic)] result = consumer._coordinator.fetch_committed_offsets(pts) return sum([r.offset for r in result.values()]) if __name__ == '__main__': topic_offset = get_topic_offset("brokers","topic") group_offset = get_group_offset("brokers","group_id","topic") lag = topic_offset - group_offset
以上這篇kafka-python 獲取topic lag值方式就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。