Python RabbitMQ實現簡單的程序間通訊示例
RabbitMQ 訊息佇列
PY
threading Queue
程序Queue 父程序與子程序,或同一父程序下的多個子程序進行互動
缺點:兩個不同Python檔案不能通過上面兩個Queue進行互動
Erlang
RabbitMQ 是基於 Erlang 語言實現的一種訊息中介軟體(message broker)
Windows 中需要先安裝 Erlang 才能使用 RabbitMQ
rabbitmq-server start
安裝 Python module
pip install pika
or
easy_install pika
or
原始碼
RabbitMQ 的 AMQP 預設埠為 5672;管理介面(management plugin)的預設埠為 15672
檢視當前時刻的佇列數
rabbitmqctl.bat list_queues
exchange
在定義的時候就是有型別的,決定到底哪些queue符合條件,可以接受訊息
direct:通過routingkey和exchange決定唯一的queue可以接受訊息
topic: 所有符合routingkey(此時可以是一個表示式)的routingkey所bind的queue都可以接受訊息
表示式符號說明:
# 代表零個或多個單詞,* 代表恰好一個單詞(單詞之間以 . 分隔)
RPC
remote procedure call 雙向傳輸,指令<-------->指令執行結果
實現方法: 建立兩個佇列,一個佇列收指令,一個佇列傳送執行結果
用rabbitmq實現簡單的生產者消費者模型
1) rabbit_producer.py
# Author : Xuefeng
"""Publish one persistent "Hello world!" message to the durable queue "hello".

Written against the pika 1.x API.
"""
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
try:
    # Open a channel on the connection.
    channel = connection.channel()

    # Declare the queue named "hello". durable=True asks the broker to keep
    # the queue definition across a broker restart.
    channel.queue_declare(queue="hello", durable=True)

    # A message is never sent directly to a queue; it always goes through an
    # exchange. The empty string selects the default exchange, which routes
    # to the queue whose name equals routing_key.
    channel.basic_publish(
        exchange="",
        routing_key="hello",
        body="Hello world!",
        properties=pika.BasicProperties(
            delivery_mode=2,  # mark the message as persistent
        ),
    )
    print("[x] sent 'Hello world!'")
finally:
    # Always release the connection, even if declaring/publishing failed.
    connection.close()
2) rabbit_consumer.py
# Author : Xuefeng
"""Consume messages from the durable queue "hello", acknowledging each one.

Written against the pika 1.x API.
"""
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel on the connection.
channel = connection.channel()
channel.queue_declare(queue="hello", durable=True)


def callback(ch, method, properties, body):
    """Handle one received message and acknowledge it.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; supplies ``delivery_tag`` for the ack
    :param properties: message properties set by the publisher
    :param body: message payload
    :return: None
    """
    print("------>", ch, properties)
    print("[x] Recieved %r" % body)
    # Acknowledge manually so an unprocessed message can be redelivered to
    # another consumer if this one stops unexpectedly.
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Hand this consumer at most one unacknowledged message at a time, so work is
# distributed according to how fast each consumer actually processes.
channel.basic_qos(prefetch_count=1)

# auto_ack=False is required here: the callback acks explicitly, and acking a
# message that was already auto-acknowledged is a channel-level error.
channel.basic_consume(
    queue="hello",
    on_message_callback=callback,
    auto_ack=False,
)

print("[*] Waiting for messages. To Exit press CTRL+C")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the documented way to exit; stop cleanly instead of
    # dumping a traceback.
    channel.stop_consuming()
finally:
    connection.close()
用rabbitmq中的fanout模式實現廣播模式
1) fanout_rabbit_publish.py
# Author : Xuefeng
"""Broadcast one message through the fanout exchange "logs".

Broadcast mode: the producer sends a single message and every consumer that
currently has a queue bound to the exchange receives it.

Written against the pika 1.x API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
try:
    # Open a channel on the connection.
    channel = connection.channel()
    channel.exchange_declare(exchange="logs", exchange_type="fanout")

    # Message text comes from the command line, with a fixed fallback.
    message = ' '.join(sys.argv[1:]) or "info:Hello world!"

    # A fanout exchange ignores the routing key, so it is left empty.
    channel.basic_publish(
        exchange="logs",
        routing_key="",
        body=message,
    )
    print("[x] Send %r" % message)
finally:
    connection.close()
2) fanout_rabbit_consumer.py
# Author : Xuefeng
"""Receive every message broadcast through the fanout exchange "logs".

Written against the pika 1.x API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel on the connection.
channel = connection.channel()

# Declare the exchange here as well so the consumer can be started before any
# publisher has created it; re-declaring with the same type is harmless.
channel.exchange_declare(exchange="logs", exchange_type="fanout")

# exclusive=True with an empty name: the broker generates a random queue name
# and the queue belongs to this connection only.
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

channel.queue_bind(exchange="logs", queue=queue_name)


# NOTE(review): the callback signature was garbled in the original text; it is
# restored to the four-argument form pika passes to on_message_callback.
def callback(ch, method, properties, body):
    """Print the received payload and acknowledge the delivery.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; supplies ``delivery_tag`` for the ack
    :param properties: message properties set by the publisher
    :param body: message payload
    """
    print("[x] Recieved %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


# auto_ack=False because the callback acknowledges explicitly.
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=False,
)

print("[*] Waiting for messages. To Exit press CTRL+C")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
finally:
    connection.close()
用rabbitmq中的direct模式實現訊息過濾模式
1) direct_rabbit_publisher.py
# Author : Xuefeng
"""Publish one message to the direct exchange "direct_logs".

Message-filtering mode: the severity given on the command line is used as the
routing key, and only queues bound with that exact key receive the message.

Written against the pika 1.x API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
try:
    # Open a channel on the connection.
    channel = connection.channel()
    channel.exchange_declare(exchange="direct_logs", exchange_type="direct")

    # argv[1] is the severity (default "info"); the rest is the message text.
    severity = sys.argv[1] if len(sys.argv) > 1 else "info"
    message = ' '.join(sys.argv[2:]) or "info:Hello world!"

    channel.basic_publish(
        exchange="direct_logs",
        routing_key=severity,
        body=message,
    )
    print("[x] Send %r:%r" % (severity, message))
finally:
    connection.close()
2) direct_rabbit_consumer.py
# Author : Xuefeng
"""Receive messages from "direct_logs" for the severities given on argv.

Written against the pika 1.x API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel on the connection.
channel = connection.channel()
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")

# exclusive=True with an empty name: the broker generates a random queue name
# and the queue belongs to this connection only.
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# One binding per requested severity; the queue then receives every message
# whose routing key equals any of them.
for severity in severities:
    channel.queue_bind(
        exchange="direct_logs",
        queue=queue_name,
        routing_key=severity,
    )


# NOTE(review): the callback body and the basic_consume call were lost in the
# original text. They are reconstructed here to print the payload and ack it;
# confirm against the author's intended output format.
def callback(ch, method, properties, body):
    """Print the received payload and acknowledge the delivery.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; supplies ``delivery_tag`` for the ack
    :param properties: message properties set by the publisher
    :param body: message payload
    """
    print("[x] Recieved %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


# auto_ack=False because the callback acknowledges explicitly.
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=False,
)

print("[*] Waiting for messages. To Exit press CTRL+C")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
finally:
    connection.close()
用rabbitmq中的topic模式實現細緻訊息過濾模式
1) topic_rabbit_publisher.py
# Author : Xuefeng
"""Publish one message to the topic exchange "topic_logs".

Fine-grained filtering mode: the producer sends a message with a routing key,
and consumers choose what to receive by binding patterns such as ``*.info``.

Written against the pika 1.x API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
try:
    # Open a channel on the connection.
    channel = connection.channel()
    channel.exchange_declare(exchange="topic_logs", exchange_type="topic")

    # argv[1] is used as the routing key of the published message
    # (default "info"); the rest of argv is the message text.
    binding_key = sys.argv[1] if len(sys.argv) > 1 else "info"
    message = ' '.join(sys.argv[2:]) or "info:Hello world!"

    channel.basic_publish(
        exchange="topic_logs",
        routing_key=binding_key,
        body=message,
    )
    print("[x] Send %r:%r" % (binding_key, message))
finally:
    connection.close()
2) topic_rabbit_consumer.py
# Author : Xuefeng
"""Receive messages from "topic_logs" matching the binding keys given on argv.

Written against the pika 1.x API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel on the connection.
channel = connection.channel()
channel.exchange_declare(exchange="topic_logs", exchange_type="topic")

# exclusive=True with an empty name: the broker generates a random queue name
# and the queue belongs to this connection only.
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# Bind the private queue once per pattern. The queue must be named
# explicitly, otherwise the binding does not target the queue declared above.
for binding_key in binding_keys:
    channel.queue_bind(
        exchange="topic_logs",
        queue=queue_name,
        routing_key=binding_key,
    )


# NOTE(review): the callback signature was garbled in the original text; it is
# restored to the four-argument form pika passes to on_message_callback.
def callback(ch, method, properties, body):
    """Print the received payload and acknowledge the delivery.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; supplies ``delivery_tag`` for the ack
    :param properties: message properties set by the publisher
    :param body: message payload
    """
    print("[x] Recieved %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Consume from the queue that was just bound (not a fixed "hello" queue), and
# use auto_ack=False because the callback acknowledges explicitly.
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=False,
)

print("[*] Waiting for messages. To Exit press CTRL+C")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
finally:
    connection.close()
用rabbitmq實現rpc操作
1) Rpc_rabbit_client.py
# Author : Xuefeng
"""RPC client: ask the server on "rpc_queue" for a Fibonacci number.

Written against the pika 1.x API.
"""
import pika
import time
import uuid


class FibonacciRpcClient(object):
    """Send a request to "rpc_queue" and wait for the matching reply."""

    def __init__(self):
        """Connect, create a private reply queue and start consuming from it."""
        # Initialised up front so on_response never reads an unset attribute.
        self.response = None
        self.corr_id = None

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters("localhost"))
        self.channel = self.connection.channel()

        # Broker-named exclusive queue that receives the command results.
        result = self.channel.queue_declare(queue="", exclusive=True)
        self.callback_queue = result.method.queue

        # on_response is invoked for every message arriving on that queue.
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True,
        )

    def on_response(self, ch, method, props, body):
        """Store *body* as the response if it answers the pending request.

        :param ch: channel the reply was delivered on (unused)
        :param method: delivery metadata (unused)
        :param props: message properties; ``correlation_id`` is compared
            with the id of the outstanding request
        :param body: reply payload
        """
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        """Request fib(n) from the server and block until the reply arrives.

        :param n: argument sent to the server as ``str(n)``
        :return: the reply payload converted with ``int``
        """
        self.response = None
        # A random id lets on_response match the result to this request.
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange="",
            routing_key="rpc_queue",
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n),
        )
        while self.response is None:
            # Non-blocking counterpart of start_consuming: handles whatever
            # has arrived and returns whether or not there was a message.
            self.connection.process_data_events()
            print("no message...")
            time.sleep(0.5)
        return int(self.response)


fibonacci_rcp = FibonacciRpcClient()
print("[x] Requesting fib(30)")
response = fibonacci_rcp.call(30)
print("[x] Rec %r" % response)
2) Rpc_rabbit_server.py
# Author : Xuefeng
"""RPC server: answer Fibonacci requests arriving on "rpc_queue".

Written against the pika 1.x API.
"""
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
# Open a channel on the connection.
channel = connection.channel()
channel.queue_declare(queue="rpc_queue")


def fib(n):
    """Return the n-th Fibonacci number (fib(0) == 0, fib(1) == 1).

    Computed iteratively so a request such as fib(30) costs n steps instead
    of an exponential number of recursive calls.

    :param n: non-negative integer index
    :raises ValueError: if n is negative
    """
    if n < 0:
        raise ValueError("fib() is not defined for negative n: %r" % n)
    current, following = 0, 1
    for _ in range(n):
        current, following = following, current + following
    return current


def on_request(ch, method, props, body):
    """Compute fib for one request and publish the result to the caller.

    :param ch: channel the request was delivered on
    :param method: delivery metadata; supplies ``delivery_tag`` for the ack
    :param props: request properties; ``reply_to`` names the caller's reply
        queue and ``correlation_id`` is echoed back so the caller can match
        the result to its request
    :param body: request payload, parsed with ``int``
    """
    n = int(body)
    print("[.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(
        exchange="",
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id),
        # Send the computed result, not the request payload.
        body=str(response),
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Take one request at a time so several servers can share the load.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue="rpc_queue", on_message_callback=on_request)

print("[x] Awaiting RPC requests")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
finally:
    connection.close()
到此這篇關於Python RabbitMQ實現簡單的程序間通訊示例的文章就介紹到這了,更多相關Python RabbitMQ程序間通訊內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!