Python高階程式設計之訊息佇列(Queue)與程序池(Pool)例項詳解
阿新 • • 發佈:2020-01-09
本文例項講述了Python高階程式設計之訊息佇列(Queue)與程序池(Pool)。分享給大家供大家參考,具體如下:
Queue訊息佇列
1.建立
import multiprocessing queue = multiprocessing.Queue(佇列長度)
2.方法
方法 | 描述 |
---|---|
put | 變數名.put(資料),放入資料(如佇列已滿,則程式進入阻塞狀態,等待佇列取出後再放入) |
put_nowait | 變數名.put_nowati(資料),放入資料(如佇列已滿,則不等待佇列資訊取出後再放入,直接報錯) |
get | 變數名.get(資料),取出資料(如佇列為空,則程式進入阻塞狀態,等待佇列防如資料後再取出) |
get_nowait | 變數名.get_nowait(資料),取出資料(如佇列為空,則不等待佇列放入資訊後取出資料,直接報錯),放入資料後立馬判斷是否為空有時為True,原因是放入值和判斷同時進行 |
qsize | 變數名.qsize(),訊息數量 |
empty | 變數名.empty()(返回值為True或False),判斷是否為空 |
full | 變數名.full()(返回值為True或False),判斷是否為滿 |
3.程序通訊
因為程序間不共享全域性變數,所以使用Queue進行資料通訊,可以在父程序中建立兩個字程序,一個往Queue裡寫資料,一個從Queue裡取出資料。
例:
import multiprocessing import time def write_queue(queue): # 迴圈寫入資料 for i in range(10): if queue.full(): print("佇列已滿!") break # 向佇列中放入訊息 queue.put(i) print(i) time.sleep(0.5) def read_queue(queue): # 迴圈讀取佇列訊息 while True: # 佇列為空,停止讀取 if queue.empty(): print("---佇列已空---") break # 讀取訊息並輸出 result = queue.get() print(result) if __name__ == '__main__': # 建立訊息佇列 queue = multiprocessing.Queue(3) # 建立子程序 p1 = multiprocessing.Process(target=write_queue,args=(queue,)) p1.start() # 等待p1寫資料程序執行結束後,再往下執行 p1.join() p1 = multiprocessing.Process(target=read_queue,)) p1.start()
執行結果:
Pool程序池
初始化Pool時,可以指定一個最大程序數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務。
1.建立
import multiprocessing pool = multiprocessing.Pool(最大程序數)
2.方法
方法 | 描述 |
---|---|
apply() | 以同步方式新增程序 |
apply_async() | 以非同步方式新增程序 |
close() | 關閉Pool,使其不接受新任務(還可以使用) |
terminate() | 不管任務是否完成,立即終止 |
join() | 主程序阻塞,等待子程序的退出,必須在close和terminate後使用 |
3.程序池內通訊
建立程序池內Queue訊息佇列通訊
import multiprocessing Queue:queue = multiprocessing.Manager().Queue()
例:
import multiprocessing import time
寫入資料的方法
def write_data(queue): # for迴圈 向訊息佇列中寫入值 for i in range(5): # 新增訊息 queue.put(i) print(i) time.sleep(0.2) print("佇列已滿~")
建立讀取資料的方法
def read_data(queue): # 迴圈讀取資料 while True: # 判斷佇列是否為空 if queue.qsize() == 0: print("佇列為空~") break # 從佇列中讀取資料 result = queue.get() print(result) if __name__ == '__main__': # 建立程序池 pool = multiprocessing.Pool(2) # 建立程序池佇列 queue = multiprocessing.Manager().Queue() # 在程序池中的程序間進行通訊 # 使用執行緒池同步的方式,先寫後讀 # pool.apply(write_data,(queue,)) # pool.apply(read_data,)) # apply_async() 返回ApplyResult 物件 result = pool.apply_async(write_data,)) # ApplyResult物件的wait() 方法,表示後續程序必須等待當前程序執行完再繼續 result.wait() pool.apply_async(read_data,)) pool.close() # 非同步後,主執行緒不再等待子程序執行結束,再結束 # join() 後,表示主執行緒會等待子程序執行結束後,再結束 pool.join()
執行結果:
4.案例(資料夾copy器)
程式碼:
# 匯入模組 import os import multiprocessing # 拷貝檔案函式 def copy_dir(file_name,source_dir,desk_dir): # 要拷貝的檔案路徑 source_path = source_dir+'/'+file_name # 目標路徑 desk_path = desk_dir+'/'+file_name # 獲取檔案大小 file_size = os.path.getsize(source_path) # 記錄拷貝次數 i = 0 # 以二進位制度讀方式開啟原檔案 with open(source_path,"rb") as source_file: # 以二進位制寫入方式建立並開啟目標檔案 with open(desk_path,"wb") as desk_file: # 迴圈寫入 while True: # 讀取1024位元組 file_data = source_file.read(1024) # 如果讀到的不為空,則將讀到的寫入目標檔案 if file_data: desk_file.write(file_data) # 讀取次數+1 i += 1 # 拷貝百分比進度等於拷貝次數*1024*100/檔案大小 n = i*102400/file_size if n >= 100: n = 100 print(file_name,"拷貝進度%.2f%%" % n) else: print(file_name,"拷貝成功") break if __name__ == '__main__': # 要拷貝的資料夾 source_dir = 'test' # 要拷貝到的路徑 desk_dir = 'C:/Users/Administrator/Desktop/'+source_dir # 存在資料夾則不建立 try: os.mkdir(desk_dir) except: print("目標資料夾已存在,未建立") # 獲取資料夾內檔案目錄,存到列表裡 file_list = os.listdir(source_dir) print(file_list) # 建立程序池,最多同時執行3個子程序 pool = multiprocessing.Pool(3) for file_name in file_list: # 非同步方式新增到程序池內 pool.apply_async(copy_dir,args=(file_name,desk_dir)) # 關閉程序池(停止新增,已新增的還可執行) pool.close() # 讓主程序阻塞,等待子程序結束 pool.join()
執行結果:
更多關於Python相關內容感興趣的讀者可檢視本站專題:《Python程序與執行緒操作技巧總結》、《Python資料結構與演算法教程》、《Python函式使用技巧總結》、《Python字串操作技巧彙總》、《Python入門與進階經典教程》、《Python+MySQL資料庫程式設計入門教程》及《Python常見資料庫操作技巧彙總》
希望本文所述對大家Python程式設計有所幫助。