Kafka(八)Python生產者和消費者API使用
阿新 • • 發佈:2018-07-22
time() subscript value orm exit __name__ 股票代碼 sum ros 單線程生產者
#!/usr/bin/env python # -*- coding: utf-8 -*- import random import sys from kafka import KafkaProducer from kafka.client import log import time import json __metaclass__ = type class Producer: def __init__(self, KafkaServer='127.0.0.1', KafkaPort='9092', ClientId="Procucer01", Topic='Test'): """ 用於設置生產者配置信息,這些配置項可以從源碼中找到,下面為必要參數。 :param KafkaServer: kafka服務器IP :param KafkaPort: kafka工作端口 :param ClientId: 生產者名稱 :param Topic: 主題 """ self._bootstrap_server = '{host}:{port}'.format(host=KafkaServer, port=KafkaPort) self._topic = Topic self._clientId = ClientId """ 初始化一個生產者實例,生產者是線程安全的,多個線程共享一個生產者實例效率比每個線程都使用一個生產者實例要高 acks: 消費者只能消費被提交的,而只有消息在所有副本中都有了才算提交,生產者發送了消息是否要等待所有副本都同步了該消息呢?這個值就是控制這個的。默認是1,表示只要該分區的Leader副本成功寫入日誌就返回。 0表示生產者無需等待,發送完就返回;all是所有副本都寫入該消息才返回。 all可靠性最高但是效率最低,0效率最高但是可靠性最低,所以一般用1。 retries: 表示請求重試次數,默認是0,上面的acks配置請求完成的標準,如果請求失敗,生產者將會自動重試,如果配置為0則不重試。但是如果重試則有可能發生重復發送消息。 key_serializer: 鍵的序列化器,默認不設置,采用字節碼 value_serializer: 值得序列化器,默認不設置,采用字節碼,因為可以發送單一字符,也可以發送鍵值型消息 """ try: self._producer = KafkaProducer(bootstrap_servers=self._bootstrap_server, client_id=self._clientId, acks=1, value_serializer=lambda m: json.dumps(m).encode('utf-8')) except Exception as err: print err.message def _TIMESTAMP(self): t = time.time() return int((round(t * 1000))) # 時間戳轉換為普通時間 def getNormalTime(self, temp_timeStamp, timeSize=10): timeStamp = temp_timeStamp if timeSize == 13: timeStamp = int(temp_timeStamp / 1000) timeArray = time.localtime(timeStamp) otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray) return otherStyleTime # 發送成功的回調函數 def _on_send_success(self, record_metadata): print "Topic: %s Partition: %d Offset: %s" % (record_metadata.topic, record_metadata.partition, record_metadata.offset) # 發送失敗的回調函數 def _on_send_error(self, excp): log.error('I am an errback', exc_info=excp) def sendMsg(self, msg, partition=None): """ 發送消息 :param msg: 消息 :param partition: 分區也可以不指定 :return: """ if not msg: print "消息不能為空。" return None # 發送的消息必須是序列化後的,或者是字節 message = json.dumps(msg, encoding='utf-8', ensure_ascii=False) try: TIMESTAMP = self._TIMESTAMP() # 發送數據,異步方式,調用之後立即返回,因為這裏其實是發送到緩沖區,所以你可以多次調用,然後一起flush出去。 self._producer.send(self._topic, partition=partition, key=self._clientId, value=message, timestamp_ms=TIMESTAMP).add_callback(self._on_send_success).add_errback(self._on_send_error) # 下面的 flush是阻塞的,只有flush才會真正通過網絡把緩沖區的數據發送到對端,如果不調用flush,則等到時間或者緩沖區滿了就會發送。 self._producer.flush() print self.getNormalTime(TIMESTAMP, timeSize=13) + " send msg: " + message except Exception as err: print err def main(): p = Producer(KafkaServer="172.16.48.171", KafkaPort="9092", Topic='AAA') for i in range(10): time.sleep(1) closePrice = random.randint(1, 500) msg = { "股票代碼": 60000 + i, "昨日收盤價": closePrice, "今日開盤價": 0, "今日收盤價": 0, } p.sendMsg(msg) if __name__ == "__main__": try: main() finally: sys.exit()
消費者
#!/usr/bin/env python # -*- coding: utf-8 -*- import sys from kafka import KafkaConsumer import json __metaclass__ = type class Consumer: def __init__(self, KafkaServer='127.0.0.1', KafkaPort='9092', GroupID='TestGroup', ClientId="Test", Topic='Test'): """ 用於設置消費者配置信息,這些配置項可以從源碼中找到,下面為必要參數。 :param KafkaServer: kafka服務器IP :param KafkaPort: kafka工作端口 :param GroupID: 消費者組ID :param ClientId: 消費者名稱 :param Topic: 主題 """ self._bootstrap_server = '{host}:{port}'.format(host=KafkaServer, port=KafkaPort) self._groupId = GroupID self._topic = Topic self._clientId = ClientId def consumeMsg(self): try: """ 初始化一個消費者實例,消費者不是線程安全的,所以建議一個線程實現一個消費者,而不是一個消費者讓多個線程共享 下面這些是可選參數,可以在初始化KafkaConsumer實例的時候傳遞進去 enable_auto_commit 是否自動提交,默認是true auto_commit_interval_ms 自動提交間隔毫秒數 """ consumer = KafkaConsumer(self._topic, bootstrap_servers=self._bootstrap_server, group_id=self._groupId, client_id=self._clientId, enable_auto_commit=True, auto_commit_interval_ms=5000, value_deserializer=lambda m: json.loads(m.decode('utf-8'))) """ 這裏不需要顯示的調用訂閱函數,在初始化KafkaConsumer對象的時候已經指定了主題,如果主題字段不為空則會自動調用訂閱函數,至於 這個線程消費哪個分區則是自動分配的。如果你希望手動指定分區則就需要使用 assign() 函數,並且在初始的時候不輸入主題。 """ # consumer.subscribe(self._topicList) # 返回一個集合 print "當前消費的分區為:", consumer.partitions_for_topic(self._topic) print "當前訂閱的主題為:", consumer.subscription() while True: for msg in consumer: if msg: print "Topic: %s Partition: %d Offset: %s Key: %s Message: %s " % (msg.topic, msg.partition, msg.offset, msg.key, msg.value) except Exception as err: print err def main(): try: c = Consumer(KafkaServer='172.16.48.171', Topic='AAA') c.consumeMsg() except Exception as err: print err.message if __name__ == "__main__": try: main() finally: sys.exit()
執行效果
Kafka(八)Python生產者和消費者API使用