1. 程式人生 > 程式設計 >Python程序間通訊 multiProcessing Queue佇列實現詳解

Python程序間通訊 multiProcessing Queue佇列實現詳解

一、程序間通訊

IPC(Inter-Process Communication)

IPC機制:實現程序之間通訊

管道:pipe 基於共享的記憶體空間

佇列:pipe+鎖的概念--->queue

二、佇列(Queue)

2.1 概念-----multiProcess.Queue

建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。

Queue([maxsize])建立共享的程序佇列。

引數 :maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。

底層佇列使用管道和鎖定實現。

2.2 Queue方法使用

2.2.1 q.get的使用:

是從佇列裡面取值並且把佇列面的取出來的值刪掉,沒有引數的情況下就是是預設一直等著取值

就算是佇列裡面沒有可取的值的時候,程式也不會結束,就會卡在哪裡,一直等著

from multiprocessing import Queue
q = Queue() # 生成一個佇列物件
# put方法是往佇列裡面放值
q.put('Cecilia陳')
q.put('xuchen')
q.put('喜陳')

# get方法是從佇列裡面取值
print(q.get())
print(q.get())
print(q.get())

q.put(5)
q.put(6)
print(q.get())

Cecilia陳

xuchen

喜陳

5

2.2.2 Queue(引數) +引數的使用:

Queue加引數以後,引數是數值

引數實幾就表示例項化的這個Queue佇列可以放幾個值

當佇列已經滿的時候,再放值,程式會阻塞,但不會結束

from multiprocessing import Queue
q = Queue(3)
q.put('Cecilia陳')
q.put('xuchen')
q.put('喜陳')
print(q.full()) # 判斷佇列是否滿了 返回的是True/False
q.put(2) # 當佇列已經滿的時候,再放值,程式會阻塞,但不會結束

True 佇列已經滿了

2.2.3 q.put(引數1,引數2,引數3,引數4):

q.put(self,obj,block=True,timeout=None)

self :put就相當於是Queue裡的一個方法,這個時候q.put就相當於是佇列物件q來呼叫物件的繫結方法,這個引數可以省略即可

obj:是我們需要往佇列裡面放的值

block=True :佇列如果滿了的話,再往佇列裡放值的話會等待,程式不會結束

timeout=None:是再block這個引數的基礎上的,當block的值為真的時候,timeout是用來等待多少秒,如果再這個時間裡,佇列一直是滿的,那麼程式就會報錯並結束(Queue.Full異常)

from multiprocessing import Queue
q = Queue(3)
q.put('zhao',timeout=2)
q.put('zhao',timeout=5) # 此時程式將對等待5秒以後報錯了

2.2.4 q.get(引數1,引數2,引數3,引數4):

q.get(self,timeout=None)

self :get就相當於是Queue裡的一個方法,這個時候q.get就相當於是佇列物件q來呼叫物件的繫結方法,這個引數可以省略即可

block=True :從佇列q物件裡面取值,如果娶不到值的話,程式不會結束

timeout=None:是再block這個引數的基礎上的,當block的值為真的時候,timeout是用來等待多少秒,如果再這個時間裡,get取不到佇列裡面的值的話,那麼程式就會報錯並結束(queue.Empty異常)

from multiprocessing import Queue
q = Queue()
q.put('Cecilia陳')
print(q.get())
q.get(block=True,timeout=2) # 此時程式會等待2秒後,報錯了,佇列裡面沒有值了

2.2.5 block=False:

如果block的值是False的話,那麼put方法再佇列是滿的情況下,不會等待阻塞,程式直接報錯(Queue.Full異常)結束

如果block的值是False的話,那麼get方法再佇列裡面沒有值的情況下,再去取的時候,不會等待阻塞,程式直接報錯(queue.Empty異常)結束

1.put()的block=False

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.full())
q.put('xichen',block=False) # 佇列已經滿了,我不等待了,直接報錯

2.get()的block=Flase

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.get())
print(q.get())
print(q.get(block=False)) # 佇列已經沒有值了,我不等待了,直接報錯

2.2.6 put_nowait()/get_nowait()

1.put_nowait() 相當於bolok=False,佇列滿的時候,再放值的時候,程式不等待,不阻塞,直接報錯

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.full())

q.put_nowait('xichen') # 程式不等待,不阻塞,直接報錯

2.get_nowait() 相當於bolok=False,當佇列裡沒有值的時候,再取值的時候,程式不等待,不阻塞,程式直接報錯

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.get())
print(q.get())
print(q.full())
q.get_nowait()# 再取值的時候,程式不等待,不阻塞,程式直接報錯

三、程式碼例項

3.1 單看佇列的存取資料用法

這個例子還沒有加入程序通訊,只是先來看看佇列為我們提供的方法,以及這些方法的使用和現象。

'''
multiprocessing模組支援程序間通訊的兩種主要形式:管道和佇列
都是基於訊息傳遞實現的,但是佇列介面
'''

from multiprocessing import Queue
q=Queue(3)

#put,get,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)  # 如果佇列已經滿了,程式就會停在這裡,等待資料被別人取走,再將資料放入佇列。
      # 如果佇列中的資料一直不被取走,程式就會永遠停在這裡。
try:
  q.put_nowait(3) # 可以使用put_nowait,如果佇列滿了不會阻塞,但是會因為佇列滿了而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去,但是會丟掉這個訊息。
  print('佇列已經滿了')

# 因此,我們再放入資料之前,可以先看一下佇列的狀態,如果已經滿了,就不繼續put了。
print(q.full()) #滿了
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一樣,如果佇列已經空了,那麼繼續取就會出現阻塞。
try:
  q.get_nowait(3) # 可以使用get_nowait,如果佇列滿了不會阻塞,但是會因為沒取到值而報錯。
except: # 因此我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去。
  print('佇列已經空了')

print(q.empty()) #空了

3.2 子程序向父程序傳送資料

這是一個queue的簡單應用,使用佇列q物件呼叫get函式來取得佇列中最先進入的資料。

from multiprocessing import Process,Queue
def f(q,name,age):
  q.put(name,age) #呼叫主函式中p程序傳遞過來的程序引數 put函式為向佇列中新增一條資料。
if __name__ == '__main__':
  q = Queue() #建立一個Queue物件
  p = Process(target=f,args=(q,'Cecilia陳',18)) #建立一個程序
  p.start()
  print(q.get())
  p.join()

['Cecilia陳',18]

四、生產者消費者模型

生產者: 生產資料的任務

消費者: 處理資料的任務

生產者--佇列(盆)-->消費者

生產者可以不停的生產,達到了自己最大的生產效率,消費者可以不停的消費,也達到了自己最大的消費效率.

生產者消費者模型大大提高了生產者生產的效率和消費者消費的效率.

補充: queue不適合傳大檔案,通產傳一些訊息.

在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題。該模式通過平衡生產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度。

4.1 為什麼要使用生產者和消費者模型

線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

4.2 什麼是生產者消費者模型

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。

4.3 基於Queue佇列實現的生產者消費者模型

from multiprocessing import Queue,Process
# 生產者
def producer(q,food):
  for i in range(3):
    print(f'{name}生產了{food}{i}')
    res = f'{food}{i}'
    q.put(res)
# 消費者
def consumer(q,name):
  while True:
    res = q.get(timeout=5)
    print(f'{name}吃了{res}')
if __name__ == '__main__':
  q = Queue() # 為的是讓生產者和消費者使用同一個佇列,使用同一個佇列進行通訊
  p1 = Process(target=producer,'巧克力'))
  c1 = Process(target=consumer,'Tom'))
  p1.start()
  c1.start()

此時的問題是主程序永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死迴圈中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往佇列中再發一個結束訊號,這樣消費者在接收到結束訊號後就可以break出死迴圈。

4.4 改良版----生產者消費者模型

注意:結束訊號None,不一定要由生產者發,主程序裡同樣可以發,但主程序需要等生產者結束後才應該傳送該訊號

from multiprocessing import Queue,Process
def producer(q,food):
  for i in range(3):
    print(f'{name}生產了{food}{i}')
    res = f'{food}{i}'
    q.put(res)
  q.put(None) # 當生產者結束生產的的時候,我們再佇列的最後再做一個表示,告訴消費者,生產者已經不生產了,讓消費者不要再去佇列裡拿東西了
def consumer(q,name):
  while True:
    res = q.get(timeout=5)
    if res == None:break # 判斷佇列拿出的是不是生產者放的結束生產的標識,如果是則不取,直接退出,結束程式
    print(f'{name}吃了{res}')
if __name__ == '__main__':
  q = Queue() # 為的是讓生產者和消費者使用同一個佇列,使用同一個佇列進行通訊
  p1 = Process(target=producer,'Tom'))
  p1.start()
  c1.start()

4.5 主程序在生產者生產結束以後,傳送結束訊號

使用這個方法的話,是很low的,有幾個消費者就要在主程序中向佇列中put幾個結束訊號

from multiprocessing import Queue,Process
import time,random

def producer(q,food):
  for i in range(3):
    print(f'{name}生產了{food}{i}')
    time.sleep((random.randint(1,3)))
    res = f'{food}{i}'
    q.put(res)
  # q.put(None) # 當生產者結束生產的的時候,我們再佇列的最後再做一個表示,告訴消費者,生產者已經不生產了,讓消費者不要再去佇列裡拿東西了



def consumer(q,name):
  while True:
    res = q.get(timeout=5)
    if res == None:break # 判斷佇列拿出的是不是生產者放的結束生產的標識,如果是則不取,直接退出,結束程式
    time.sleep((random.randint(1,3)))
    print(f'{name}吃了{res}')

if __name__ == '__main__':
  q = Queue() # 為的是讓生產者和消費者使用同一個佇列,使用同一個佇列進行通訊
  # 多個生產者程序
  p1 = Process(target=producer,'巧克力'))
  p2 = Process(target=producer,'xichen','冰激凌'))
  p3 = Process(target=producer,'喜陳','可樂'))
  # 多個消費者程序
  c1 = Process(target=consumer,'Tom'))
  c2 = Process(target=consumer,'jack'))


  # 告訴作業系統啟動生產者程序
  p1.start()
  p2.start()
  p3.start()

  # 告訴作業系統啟動消費者程序
  c1.start()
  c2.start()

  p1.join()
  p2.join()
  p3.join()

  q.put(None) # 幾個消費者put幾次
  q.put(None)

五、JoinableQueue方法

建立可連線的共享程序佇列。這就像是一個Queue物件,但佇列允許專案的使用者通知生產者專案已經被成功處理。通知程序是使用共享的訊號和條件變數來實現的。

5.1 方法介紹

JoinableQueue的例項p除了與Queue物件相同的方法之外,還具有以下方法:

q.task_done():使用者使用此方法發出訊號,表示q.get()返回的專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除的專案數量,將引發ValueError異常。

q.join():生產者將使用此方法進行阻塞,直到佇列中所有專案均被處理。阻塞將持續到為佇列中的每個專案均呼叫q.task_done()方法為止。

5.2 joinableQueue佇列實現生產者消費者模型

from multiprocessing import Queue,Process,JoinableQueue
import time,food):
  for i in range(3):
    print(f'{name}生產了{food}{i}')
    # time.sleep((random.randint(1,3)))
    res = f'{food}{i}'
    q.put(res)
  # q.put(None) # 當生產者結束生產的的時候,我們再佇列的最後再做一個表示,告訴消費者,生產者已經不生產了,讓消費者不要再去佇列裡拿東西了
  q.join()


def consumer(q,name):
  while True:
    res = q.get(timeout=5)
    # if res == None:break # 判斷佇列拿出的是不是生產者放的結束生產的標識,如果是則不取,直接退出,結束程式
    # time.sleep((random.randint(1,3)))
    print(f'{name}吃了{res}')
    q.task_done()#向q.join()傳送一次訊號,證明一個數據已經被取走了


if __name__ == '__main__':
  q = JoinableQueue() # 為的是讓生產者和消費者使用同一個佇列,使用同一個佇列進行通訊
  # 多個生產者程序
  p1 = Process(target=producer,'jack'))


  # 告訴作業系統啟動生產者程序
  p1.start()
  p2.start()
  p3.start()

  # 把生產者設為守護程序
  c1.daemon = True
  c2.daemon = True
  # 告訴作業系統啟動消費者程序
  c1.start()
  c2.start()

  p1.join()
  p2.join()
  p3.join() # 等待生產者生產完畢

  print('主程序')

  ### 分析
  # 生產者生產完畢--這是主程序最後一行程式碼結束--q.join()消費者已經取乾淨了,沒有存在的意義了
  # 這是主程序最後一行程式碼結束,消費者已經取乾淨了,沒有存在的意義了.守護程序的概念.

5.3 測試joinableQueue

from multiprocessing import Process,Queue,JoinableQueue
q = JoinableQueue()
q.put('zhao') # 放佇列裡一個任務
q.put('qian')
print(q.get())
q.task_done() # 完成了一次任務
print(q.get())
q.task_done() # 完成了一次任務
q.join() #計數器不為0的時候 阻塞等待計數器為0後通過

# 想象成一個計數器 :put +1  task_done -1

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。