1. 程式人生 > 實用技巧 >rabbitMQ和pika模組

rabbitMQ和pika模組

目錄

rabbitMQ

rabbit角色

  • management
  • policymaker
  • monitoring
  • administrator

新增使用者並分配角色

rabbitmqctl add_user name pass

rabbitmqctl set_user_tags name administrator

操作可用web控制檯進行

繫結使用者到vhost並開放連線許可權

rabbitmqctl set_permissions -p vhost名字 user名字 允許所有的連線

rabbitmqctl set_permissions -p qpm qpm ".*" ".*" ".*"

操作可用web控制檯進行

pika模組

python中想要使用rabbitMQ可以利用pika進行操作。

單個生產者對應單個消費者

生產者:

import pika

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.x.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

# 宣告queue
channel.queue_declare(queue='hello')

channel.basic_publish(exchange="",
                      routing_key="hello",
                      body="hello world")

print("send message!")

connection.close()

消費者:

import pika

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.x.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

# 消費者再次宣告queue
"""
生產者已經聲明瞭queue,為何消費者還要再次宣告?
假如生產者還沒建立queue,那消費者這邊就無法去queue中等待接收訊息,為了防止出現問題,生產者未建立queue的情況下,消費者先建立queue,並去等待訊息。
"""
channel.queue_declare(queue='hello')


# 回撥函式
def callback(ch, method, properties, body):
    # ch是通道的例項
    print(ch)
    # 生產者傳送訊息攜帶的引數
    print(method)
    print(properties)
    # 訊息的內容
    print(body)
    # 如果不加的話,消費者會hang住,監聽到有值就呼叫callback,沒值就等待監聽訊息
    # channel.close()


# 從hello的queue中拿資料,然後呼叫回撥函式
channel.basic_consume(queue="hello",
                      on_message_callback=callback,
                      auto_ack=True)


print("waiting for messages")

channel.start_consuming()

單個生產者對應多個消費者

一個生產者有多個消費者的情況下,rabbitMQ預設的機制是公平輪流分發給消費者,按照上面的程式碼開啟兩個消費者和一個消費者,消費者的訊息會輪流的分發給兩個消費者。

rabbitMQ消費端的確認和拒絕(安全性)

假如消費者接收到訊息中任務,在執行的過程中掛掉(斷開連線)了,那麼該任務就相當於丟失(沒有獲得結果,訊息也被取出來了,生產者也以為該訊息被消費了),因此為了保證訊息的安全性,我們需要做到當消費者沒有執行完任務的時候,該訊息依然存在,會發送給另一個消費者,或者重連後的消費者。

# 回撥函式
def callback(ch, method, properties, body):
    print("received message")
    time.sleep(20)
    print(body)

在上面的程式碼中,為消費者增加一個time.sleep,啟動消費者程式碼,再啟動生產者程式碼,在消費者程式碼睡眠的時間段,終止消費者程式碼,然後再重啟消費者程式碼,消費者端已經無法再次獲取到該訊息了(訊息已經被前一個消費者給取出來了,但任務沒執行完,就掛掉了,相當於該訊息丟失)。

在收到訊息後,訊息將立即保留在佇列中。為防止其他使用者再次處理訊息,Amazon SQS 設定了可見性超時,這是 Amazon SQS 防止其他使用者接收和處理訊息的時間段。訊息的預設可見性超時為 30 秒。最小值為 0 秒。最大值為 12 小時。

可見性超時從 Amazon SQS 返回訊息時開始。在這段時間裡,使用者可以處理和刪除訊息。但是,如果使用者在刪除訊息之前失敗,並且您的系統沒有在可見性超時結束之前對該訊息呼叫 DeleteMessage 操作,則其他使用者將可以看到該訊息並且再次接收該訊息。如果某條訊息只能被接收一次,則您的使用者應在可見性超時期間內刪除該訊息。

這個訊息被取走,會有一個可見性超時記錄,這個訊息實際還在佇列但是會隱藏,但是你處理完成,返回 DeleteMessage ,證明訊息完成了,到了時間設定卻沒返回 DeleteMessage,這個訊息會重新被獲取。

​ ——aws SQS

如何實現訊息的安全性?

rabbitMQ中引入了ack機制,消費者在訂閱佇列的時候可以指定autoAck引數。

當autoAck等於True時,rabbitMQ會自動把傳送出去的訊息設定為確認(直接刪除),而不管消費者是否真正的消費到了這些資料。

當autoAck等於False時,rabbitMQ會等待消費者顯式地回覆確認訊號才從移除訊息(實際上時先打上刪除標記,收到確認,刪除標記),因此消費者可以有足夠的時間來處理訊息,而不用擔心消費過程中異常退出而導致訊息丟失,因為rabbitMQ會一直持有訊息,知道消費者呼叫basic.ack為止。

unacked為1,代表著有一個訊息沒有被消費

因此,在python中,利用pika模組操作,只需要在消費者新增兩個引數:

  • auto_ack=False:關閉自動確認
  • ch.basic_ack(delivery_tag=method.delivery_tag):確認訊息
  • ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False):拒絕訊息
# 生產者程式碼

import pika

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

# 宣告queue
channel.queue_declare(queue='hello')

channel.basic_publish(exchange="",
                      routing_key="hello",
                      body="hello world",
                      # properties=pika.BasicProperties(
                      #     delivery_mode=2,  # 訊息的持久化
                      # )
                      )

print("send message!")

connection.close()
# 消費者程式碼

import pika
import time

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

# 消費者再次宣告queue
"""
生產者已經聲明瞭queue,為何消費者還要再次宣告?
假如生產者還沒建立queue,那消費者這邊就無法去queue中等待接收訊息,為了防止出現問題,生產者未建立queue的情況下,消費者先建立queue,並去等待訊息。
"""
channel.queue_declare(queue='hello')


# 回撥函式
def callback(ch, method, properties, body):
    print(body)
    time.sleep(10)
    #  delivery_tag 是在 channel 中的一個訊息計數, 每次訊息提取行為都對應一個數字
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print("任務執行完成!")
    """
    消費者在接收到訊息之後,還可以拒絕訊息,我們只需要呼叫basic_reject就可以,如下:
    ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
    """
    # 如果不加的話,消費者會hang住,監聽到有值就呼叫callback,沒值就等待監聽訊息
    # channel.close()


# 從hello的queue中拿資料,然後呼叫回撥函式
channel.basic_consume(queue="hello",
                      on_message_callback=callback,
                      auto_ack=False
                      )


print("waiting for messages.....")

channel.start_consuming()

rabbitMQ生產端的確認和拒絕(安全性)

如果在傳送方發出訊息後,如果exchange寫錯了,或者沒有任何佇列繫結我們傳送的exchange,那麼在這時候傳送方對此渾然不知,因此引入了confirm(確認)模式。

生產者將通道設定成confirm模式,一旦通道進入confirm模式,所有在該通道上面釋出的訊息都會被指派一個唯一的ID(從1開始),一旦訊息被投遞到所有匹配的佇列之後,rabbit就會發送一個確認(Basic.Ack)給生產者(包含訊息的唯一ID),這樣就使生產者知道訊息已經確證到達目的地了。

如果訊息和佇列是持久化的,那麼確認訊息會在訊息寫入磁碟之後發出。RabbitMQ回傳給生產者的確認訊息中的deliveryTag包含了確認訊息的序號,此外RabbitMQ也可以設定channel.basicAck方法中的multiple引數,表示到這個序號之前的所有訊息都已經得到了處理,如下圖所示:

通過呼叫channel.confirm_delivery()就可以將通道設定為comfirm模式。

import pika

user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()

channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)

channel.queue_declare(queue='hello', durable=True)
channel.queue_bind(exchange='testexchange', queue='hello')

channel.confirm_delivery()

for i in range(10):
    channel.basic_publish(exchange='testexchange123', routing_key='hello', body='Hello World!{}'.format(i),
                          mandatory=True,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 訊息持久化

                          ))
    print(" [x] Sent 'Hello World!{}'".format(i))
connection.close()

現在,如果我們寫錯了交換機,在傳送的時候就會報錯:pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no exchange 'testexchange123' in vhost '/'")。 而mandatory=True引數來確認發出的訊息是否有queue接收, 並且所有queue都成功接收,如果沒有繫結佇列在交換機上的話,在傳送的時候就會報錯:pika.exceptions.UnroutableError: 1 unroutable message(s) returned

傳送方確認機制最大的好處在於它是非同步的,一旦釋出一條訊息,生產者應用程式就可以在等通道返回確認的同時繼續傳送下一條訊息,當訊息最終得到確認之後,生產者應用程式便可以通過回撥方法來處理該確認訊息,如果rabbitmq因為自身內部錯誤導致訊息丟失,就會發送一條nack(basic.nack)命令,生產者應用程式同樣可以在回撥方法中處理該nack命令。

rabbitMQ的持久化

在RabbitMQ中,如果遇到RabbitMQ服務停止或者掛掉,那麼我們的訊息將會出現丟失的情況,為了在RabbitMQ服務重啟的情況下,不丟失訊息,我們可以將Exchange(交換機)、Queue(佇列)與Message(訊息)都設定為可持久化的(durable)。這樣的話,能夠保證絕大部分的訊息不會被丟失,但是還有有一些小概率會發生訊息丟失的情況。

簡而言之就是,防止一些沒有被消費者取走的訊息,在rabbitMQ宕機重啟之後,這些訊息還能恢復。

佇列持久化

讓佇列持久化,在宣告queue的時候,加上durable=True

# 宣告queue
channel.queue_declare(queue='hello', durable=True)

需要注意,生產者宣告的佇列和消費者宣告的佇列是否持久化要保持一致!否錯會報錯!

佇列是持久化的佇列,不能被修改為不持久化,同理,不持久化的佇列同樣不能修改為持久化。

只能刪除不持久化的佇列,重新建立持久化的佇列。

try:
    # 宣告queue
    channel.queue_declare(queue='hello', durable=True)  # 如果佇列是不持久化的此處會報錯
except Exception as e:
    channel = connection.channel()
    channel.queue_delete(queue='hello')  # 刪除不持久化的佇列
    channel.queue_declare(queue='hello', durable=True)  # 重新宣告持久化的佇列

訊息持久化

讓訊息持久化,就在傳送訊息的地方增加持久化屬性:delivery_mode=2

channel.basic_publish(exchange="",
                      routing_key="hello",
                      body="hello world",
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 訊息的持久化
                      )
                      )

需要注意:訊息持久化的基礎是建立於佇列的持久化,如果佇列都不持久化,那即使訊息被設定成了持久化,rabbitMQ重啟之後,佇列肯定會沒有,那訊息也會丟失。

# 生產者程式碼

import pika

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()


channel.queue_declare(queue='hello', durable=True)

channel.basic_publish(exchange="",
                      routing_key="hello",
                      body="hello world",
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 訊息的持久化
                      )
                      )

print("send message!")

connection.close()
# 消費者程式碼

import pika
import time

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

# 消費者再次宣告queue
channel.queue_declare(queue='hello', durable=True)


# 回撥函式
def callback(ch, method, properties, body):
    print(body)
    print(method.delivery_tag)
    # time.sleep(10)
    #  delivery_tag 是在 channel 中的一個訊息計數, 每次訊息提取行為都對應一個數字
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print("任務執行完成!")
    """
    消費者在接收到訊息之後,還可以拒絕訊息,我們只需要呼叫basic_reject就可以,如下:
    ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
    """
    # 如果不加的話,消費者會hang住,監聽到有值就呼叫callback,沒值就等待監聽訊息
    # channel.close()


# 從hello的queue中拿資料,然後呼叫回撥函式
channel.basic_consume(queue="hello",
                      on_message_callback=callback,
                      auto_ack=False
                      )


print("waiting for messages.....")

channel.start_consuming()

rabbitMQ的釋出和訂閱

上面的列子中,我們傳送訊息和接收訊息都是一對一,訊息是在一個queue中傳輸的,但有的時候,我們想達到讓一個訊息被所有的queue收到,類似廣播的效果,這時候就要用到exchange了。

廣播在一直播放,當開啟錄音機的時候可以收到,關掉錄音機的時候就收不到,是一種實時的獲取方式,且沒有所謂的歷史記錄。

rabbitMQ的釋出和訂閱,就類似廣播,當消費者沒有開始接收訊息的時候,rabbitMQ的釋出/訂閱模式不會保留訊息,等待消費者上線接收訊息。

Exchange在定義的時候是有型別的,以決定到底哪些queue符號條件,可以接收訊息

  • fanout:所有bind到此exchange的queue都可以接收訊息——廣播
  • direct:通過routingKey和exchange決定那一個組的queue可以接收訊息——組播
  • topic:所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息——根據特徵自定義
表示式符號說明:#代表一個或多個字元,*代表任何字元

例:#.a會匹配a.a, aa.a, aaa.a等
*.a會匹配a.a, b.a, c.a等

注:使用RoutingKey為#, Exchange Type為topic的時候相當於使用fanout
  • headers:通過headers來決定把訊息發給哪些queue——通過訊息頭來定義,一般不用

廣播

假如我們linux上的日誌檔案需要傳送給所有的人,那麼就可以使用廣播

生產者需要宣告一個exchange和exchange的型別,並使用exchange傳送訊息

# 生產者程式碼

import pika

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

# 宣告一個exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)

# 傳送配置為exchange
channel.basic_publish(exchange="logs",
                      routing_key="",
                      body="exchange message",
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 訊息的持久化
                      )
                      )

print("send message!")

connection.close()

消費者同樣需要宣告一個exchange,並將queue繫結到exchange。

# 消費者程式碼

import pika

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)

# 消費者再次宣告queue
result = channel.queue_declare('', exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除

queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print("waiting for logs......")


def callback(ch, method, properties, body):
    print(body)


channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()

組播

假如日誌不僅要發給所有人,還需要分組,分為錯誤日誌,正常日誌,告警日誌等組,這裡就需要用到分組了。

生產者需要將exchange的型別改為direct

# 生產者程式碼

import pika
import sys

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()


channel.exchange_declare(exchange='direct_logs', exchange_type='direct', durable=True)

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'  # 嚴重程度,級別

message = ''.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange="direct_logs",
                      routing_key=severity,
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 訊息的持久化
                      )
                      )

print("send message!")

connection.close()
# 消費者程式碼

import pika
import sys

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct', durable=True)

# 消費者再次宣告queue
result = channel.queue_declare('', exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除

queue_name = result.method.queue

severities = sys.argv[1:]

if not severities:
    sys.stderr.write('Usage: %s [info] [warning] [error]\n' % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

print("waiting for logs......")


def callback(ch, method, properties, body):
    print(method.routing_key)
    print(body)


channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)

channel.start_consuming()

自定義組播

topic,通過在routing_key中自定義匹配符,形成表示式,進行組播。
啟動命令:
python sender.py mysql.error err happend


# 生產者
import pika
import sys

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' # 嚴重程度,級別

message = ''.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange="topic_logs",
  routing_key=routing_key,
  body=message)

print("send message!")

connection.close()

啟動命令:
python receive.py #:會接受所有的生產者傳送的資訊
python receive.py mysql.*:會匹配mysql.開頭的生產者傳送的資訊

# 消費者

import pika
import sys

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 消費者再次宣告queue
result = channel.queue_declare('', exclusive=True)  # 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除

queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:
    sys.stderr.write('Usage: %s [binding_key]\n' % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print("waiting for logs......")

def callback(ch, method, properties, body):
    print(method.routing_key)
    print(body)

channel.basic_consume(queue=queue_name,
  on_message_callback=callback,
  auto_ack=True)

channel.start_consuming()

rabbitMQ的訊息RPC

上面的都是單向的,生產者向消費者傳送訊息,如果生產者和消費者互相通訊,就是RPC(remote producre call)遠端過程呼叫。

客戶端迴圈取值方案:

# server端程式碼


import pika
import subprocess

# 使用者名稱和密碼
credentials = pika.PlainCredentials('qpm', 'cljslrl0620')

# 連線到rabbitMQ伺服器,建立socket通道(virtual_host: 使用者繫結的vhost)
connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm",
))

# 在socket通道之上建立了rabbit協議的通道
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def cmd_exec(cmd):
    try:
        print(cmd)
        res = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if res.returncode == 0:
            try:
                return res.stdout.decode('utf-8')
            except:
                return res.stdout.decode('gbk')
        try:
            return res.stderr.decode('utf-8')
        except:
            return res.stderr.decode('gbk')
    except Exception as e:
        return str(e)

def on_request(ch, method, props, body):
    cmd = body.decode('utf-8')

    response = cmd_exec(cmd)

    ch.basic_publish(exchange='',
  routing_key=props.reply_to,
  properties=pika.BasicProperties(correlation_id=props.correlation_id),
  body=response)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)  # 消費者一次只取一個任務,誰先完成誰繼續取,處理不完別來找我
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print("等待命令....")
channel.start_consuming()

# 客戶端


import pika
import uuid

class FibonacciRpcClient:
    def __init__(self):
        """
  初始化函式的時候就建立管道,接收服務端的任務結果
 """  credentials = pika.PlainCredentials('qpm', 'cljslrl0620')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
    "123.56.162.92", credentials=credentials, virtual_host="qpm"))

        self.channel = self.connection.channel()

        # 建立隨機管道,用於告訴服務端,任務的結果放在這個隨機管道中
  result = self.channel.queue_declare('', exclusive=True)

        self.callback_queue = result.method.queue

        # 從隨機管道中取任務
  self.channel.basic_consume(
            queue=self.callback_queue,
  on_message_callback=self.on_response, # 回撥函式
  auto_ack=True,
  )

    # 收到任務結果的回撥函式
  def on_response(self, ch, method, props, body):
        # 如果客戶端的隨機字串和服務端傳送過來的隨機字串相等,就代表著該結果屬於該任務
  if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, cmd):
        """
  :param cmd:  :return:
 exchange: 交換器
 routing_key: 是管道的名字
 reply_to: 告訴服務端執行完命令把結果丟到哪個管道中
 """  self.response = None
  self.corr_id = str(uuid.uuid4())  # 唯一識別符號, 用於標識服務端的結果和客戶端命令之間的聯絡,防止服務端和客戶端命令和結果不對等
  self.channel.basic_publish(exchange="",
  routing_key='rpc_queue',
  properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
  correlation_id=self.corr_id,
  ),
  body=str(cmd))
        count = 0
  while self.response is None:
            self.connection.process_data_events(time_limit=3)  # 檢查佇列中有沒有新訊息,沒加time_limit代表不會阻塞,加了之後會進入阻塞態
  count += 1
  print(f"check {count}")

        return self.response  # type: bytes

fibonacci_rpc = FibonacciRpcClient()
print("等待接收結果.....")

response = fibonacci_rpc.call("dir")
print("返回的結果是:%r" % response.decode('utf-8'))