1. 程式人生 > >[原始碼分析] 訊息佇列 Kombu 之 啟動過程

[原始碼分析] 訊息佇列 Kombu 之 啟動過程

# [原始碼分析] 訊息佇列 Kombu 之 啟動過程 ## 0x00 摘要 本系列我們介紹訊息佇列 Kombu。Kombu 的定位是一個相容 AMQP 協議的訊息佇列抽象。通過本文,大家可以瞭解 Kombu 是如何啟動,以及如何搭建一個基本的架子。 因為之前有一個綜述,所以大家會發現,一些概念講解文字會同時出現在後續文章和綜述之中。 ## 0x01 示例 下面使用如下程式碼來進行說明。 本示例來自https://liqiang.io/post/kombu-source-code-analysis-part-5系列,特此深表感謝。 ```python def main(arguments): hub = Hub() exchange = Exchange('asynt_exchange') queue = Queue('asynt_queue', exchange, 'asynt_routing_key') def send_message(conn): producer = Producer(conn) producer.publish('hello world', exchange=exchange, routing_key='asynt_routing_key') print('message sent') def on_message(message): print('received: {0!r}'.format(message.body)) message.ack() # hub.stop() # <-- exit after one message conn = Connection('redis://localhost:6379') conn.register_with_event_loop(hub) def p_message(): print(' kombu ') with Consumer(conn, [queue], on_message=on_message): send_message(conn) hub.timer.call_repeatedly(3, p_message) hub.run_forever() if __name__ == '__main__': sys.exit(main(sys.argv[1:])) ``` ## 0x02 啟動 讓我們順著程式流程看看Kombu都做了些什麼,也可以對 Kombu 內部有所瞭解。 本文關注的重點是:Connection,Channel 和 Hub 是如何聯絡在一起的。 ### 2.1 Hub 在程式開始,我們建立了Hub。 Hub的作用是建立訊息Loop,但是此時尚未建立,因此只是一個靜態例項。 ```python hub = Hub() ``` 其定義如下: ```python class Hub: """Event loop object. Arguments: timer (kombu.asynchronous.Timer): Specify custom timer instance. """ def __init__(self, timer=None): self.timer = timer if timer is not None else Timer() self.readers = {} self.writers = {} self.on_tick = set() self.on_close = set() self._ready = set() self._running = False self._loop = None self.consolidate = set() self.consolidate_callback = None self.propagate_errors = () self._create_poller() ``` 因為此時沒有建立loop,所以目前重要的步驟是建立Poll,其Stack如下: ```python _get_poller, eventio.py:321 poll, eventio.py:328 _create_poller, hub.py:113 __init__, hub.py:96 main, testU