Python Day34 python並發編程之多進程
一 multiprocessing模塊介紹:
python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。
multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。
需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內
二 Process類的介紹:
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動) 強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
~~~~參數介紹:
1.group參數未使用,值始終為None
2.target表示調用對象,即子進程要執行的任務
3.args表示調用對象的位置參數元祖,args=(1,2,‘egon’,‘age’:18)
4.kwargs表示調用對象的字典,kwargs={‘name’:‘egon’,‘age’:18}5.name為子進程的名稱
~~~~方法介紹:
1.p.start():啟動進程,並調用該子進程中的p.run()
2.p.run():進程啟動時運行的方法,正是他去調用target指定的函數,一定要實現該方法我們自定義類的類中一定要實現該方法
3.p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍程序,使用該方法需要特別小心這種情況。如果p還保存了一個鎖n那麽也將不會被釋放,進而導致死鎖
4.p.is_alive():如果p仍然運行,返回Ture
5.p.join([timeout]):主線程等待p終止。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能jion住run開啟的進程
~~~~屬性介紹:
1.p.daemon:默認值為False,如果設為True,代表p為後臺運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置
2.p.name:進程的名稱
3.p.pid:進程的pid
4.p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
5.p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
三 Process類的使用:
註意:在windows中Process()必須放到# if __name__ == ‘__main__‘:下
由於Windows沒有fork,多處理模塊啟動一個新的Python進程並導入調用模塊。 如果在導入時調用Process(),那麽這將啟動無限繼承的新進程(或直到機器耗盡資源)。 這是隱藏對Process()內部調用的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在導入時被調用。1.創建並開啟子進程的兩種方式
#開進程的方法一: import time import random from multiprocessing import Process def piao(name): print(‘%s piaoing‘ %name) time.sleep(random.randrange(1,5)) print(‘%s piao end‘ %name) p1=Process(target=piao,args=(‘egon‘,)) #必須加,號 p2=Process(target=piao,args=(‘alex‘,)) p3=Process(target=piao,args=(‘wupeqi‘,)) p4=Process(target=piao,args=(‘yuanhao‘,)) p1.start() p2.start() p3.start() p4.start() print(‘主線程‘) 方法一
#開進程的方法二: import time import random from multiprocessing import Process class Piao(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(‘%s piaoing‘ %self.name) time.sleep(random.randrange(1,5)) print(‘%s piao end‘ %self.name) p1=Piao(‘egon‘) p2=Piao(‘alex‘) p3=Piao(‘wupeiqi‘) p4=Piao(‘yuanhao‘) p1.start() #start會自動調用run p2.start() p3.start() p4.start() print(‘主線程‘) 方法二
2.Process對象的join方法:
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print(‘%s is piaoing‘ %self.name) time.sleep(random.randrange(1,3)) print(‘%s is piao end‘ %self.name) p=Piao(‘egon‘) p.start() p.join(0.0001) #等待p停止,等0.0001秒就不再等了 print(‘開始‘) join:主進程等,等待子進程結束
from multiprocessing import Process import time import random def piao(name): print(‘%s is piaoing‘ %name) time.sleep(random.randint(1,3)) print(‘%s is piao end‘ %name) p1=Process(target=piao,args=(‘egon‘,)) p2=Process(target=piao,args=(‘alex‘,)) p3=Process(target=piao,args=(‘yuanhao‘,)) p4=Process(target=piao,args=(‘wupeiqi‘,)) p1.start() p2.start() p3.start() p4.start() #有的同學會有疑問:既然join是等待進程結束,那麽我像下面這樣寫,進程不就又變成串行的了嗎? #當然不是了,必須明確:p.join()是讓誰等? #很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p, #詳細解析如下: #進程只要start就會在開始運行了,所以p1-p4.start()時,系統中已經有四個並發的進程了 #而我們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵 #join是讓主線程等,而p1-p4仍然是並發執行的,p1.join的時候,其余p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接通過檢測,無需等待 # 所以4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間 p1.join() p2.join() p3.join() p4.join() print(‘主線程‘) #上述啟動進程與join進程可以簡寫為 # p_l=[p1,p2,p3,p4] # # for p in p_l: # p.start() # # for p in p_l: # p.join() 有了join,程序不就是串行了嗎???
四 守護進程:
主進程創建守護進程
其一:守護進程會在主進程代碼執行結束後就終止
其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
註意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print(‘%s is piaoing‘ %self.name) time.sleep(random.randrange(1,3)) print(‘%s is piao end‘ %self.name) p=Piao(‘egon‘) p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行 p.start() print(‘主‘)
五 進程同步(鎖,也叫互斥鎖)
進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,
競爭帶來的結果就是錯亂,如何控制,就是加鎖處理
part1:多個進程共享同一打印終端
#並發運行,效率高,但競爭同一打印終端,帶來了打印錯亂 from multiprocessing import Process import os,time def work(): print(‘%s is running‘ %os.getpid()) time.sleep(2) print(‘%s is done‘ %os.getpid()) if __name__ == ‘__main__‘: for i in range(3): p=Process(target=work) p.start() 並發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
#由並發變成了串行,犧牲了運行效率,但避免了競爭 from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() print(‘%s is running‘ %os.getpid()) time.sleep(2) print(‘%s is done‘ %os.getpid()) lock.release() if __name__ == ‘__main__‘: lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start() 加鎖:由並發變成了串行,犧牲了運行效率,但避免了競爭
part2:多個進程共享同一文件
文件當數據庫,模擬搶票
#文件db的內容為:{"count":1} #註意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open(‘db.txt‘)) print(‘\033[43m剩余票數%s\033[0m‘ %dic[‘count‘]) def get(): dic=json.load(open(‘db.txt‘)) time.sleep(0.1) #模擬讀數據的網絡延遲 if dic[‘count‘] >0: dic[‘count‘]-=1 time.sleep(0.2) #模擬寫數據的網絡延遲 json.dump(dic,open(‘db.txt‘,‘w‘)) print(‘\033[43m購票成功\033[0m‘) def task(lock): search() get() if __name__ == ‘__main__‘: lock=Lock() for i in range(100): #模擬並發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start() 並發運行,效率高,但競爭寫同一文件,數據寫入錯亂
#文件db的內容為:{"count":1} #註意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open(‘db.txt‘)) print(‘\033[43m剩余票數%s\033[0m‘ %dic[‘count‘]) def get(): dic=json.load(open(‘db.txt‘)) time.sleep(0.1) #模擬讀數據的網絡延遲 if dic[‘count‘] >0: dic[‘count‘]-=1 time.sleep(0.2) #模擬寫數據的網絡延遲 json.dump(dic,open(‘db.txt‘,‘w‘)) print(‘\033[43m購票成功\033[0m‘) def task(lock): search() lock.acquire() get() lock.release() if __name__ == ‘__main__‘: lock=Lock() for i in range(100): #模擬並發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start() 加鎖:購票行為由並發變成了串行,犧牲了運行效率,但保證了數據安全
總結:
加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。
六 隊列:
進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
創建隊列的類(底層就是以管道和鎖定的方式實現):
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。
參數介紹:
maxsize是隊列中允許最大項數,省略則無大小限制。
方法介紹:
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
~~~ 應用:
‘‘‘ multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列 都是基於消息傳遞實現的,但是隊列接口 ‘‘‘ from multiprocessing import Process,Queue import time q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) print(q.full()) #滿了 print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了
Python Day34 python並發編程之多進程