1. 程式人生 > >Python Day34 python並發編程之多進程

Python Day34 python並發編程之多進程

避免 空間 pan threading div rand -s 註意 兩個

一 multiprocessing模塊介紹:

python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。

  multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內

二 Process類的介紹:

Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化得到的對象,表示一個子進程中的任務(尚未啟動)

強調:
1. 需要使用關鍵字的方式來指定參數
2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
~~~~參數介紹:
    1.group參數未使用,值始終為None
    2.target表示調用對象,即子進程要執行的任務
    3.args表示調用對象的位置參數元祖,args=(1,2,‘egon’,‘age’:18)
    4.kwargs表示調用對象的字典,kwargs={‘name’:‘egon’,‘age’:18}
    5.name為子進程的名稱
~~~~方法介紹:
    1.p.start():
啟動進程,並調用該子進程中的p.run()
    2.p.run():進程啟動時運行的方法,正是他去調用target指定的函數,一定要實現該方法我們自定義類的類中一定要實現該方法
    3.p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍程序,使用該方法需要特別小心這種情況。如果p還保存了一個鎖n那麽也將不會被釋放,進而導致死鎖
    4.p.is_alive():如果p仍然運行,返回Ture
    5.p.join([timeout]):主線程等待p終止。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能jion住run開啟的進程

~~~~屬性介紹:

      1.p.daemon:默認值為False,如果設為True,代表p為後臺運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置

      2.p.name:進程的名稱

      3.p.pid:進程的pid

      4.p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)

      5.p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

三 Process類的使用:

註意:在windows中Process()必須放到# if __name__ == ‘__main__‘:下

由於Windows沒有fork,多處理模塊啟動一個新的Python進程並導入調用模塊。 
如果在導入時調用Process(),那麽這將啟動無限繼承的新進程(或直到機器耗盡資源)。 
這是隱藏對Process()內部調用的原,使用if __name__ == “__main __”,這個if語句中的語句將不會在導入時被調用。
1.創建並開啟子進程的兩種方式
#開進程的方法一:
import time
import random
from multiprocessing import Process
def piao(name):
    print(%s piaoing %name)
    time.sleep(random.randrange(1,5))
    print(%s piao end %name)



p1=Process(target=piao,args=(egon,)) #必須加,號
p2=Process(target=piao,args=(alex,))
p3=Process(target=piao,args=(wupeqi,))
p4=Process(target=piao,args=(yuanhao,))

p1.start()
p2.start()
p3.start()
p4.start()
print(主線程)

方法一
#開進程的方法二:
import time
import random
from multiprocessing import Process


class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(%s piaoing %self.name)

        time.sleep(random.randrange(1,5))
        print(%s piao end %self.name)

p1=Piao(egon)
p2=Piao(alex)
p3=Piao(wupeiqi)
p4=Piao(yuanhao)

p1.start() #start會自動調用run
p2.start()
p3.start()
p4.start()
print(主線程)

方法二

2.Process對象的join方法:

from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print(%s is piaoing %self.name)
        time.sleep(random.randrange(1,3))
        print(%s is piao end %self.name)


p=Piao(egon)
p.start()
p.join(0.0001) #等待p停止,等0.0001秒就不再等了
print(開始)

join:主進程等,等待子進程結束
from multiprocessing import Process
import time
import random
def piao(name):
    print(%s is piaoing %name)
    time.sleep(random.randint(1,3))
    print(%s is piao end %name)

p1=Process(target=piao,args=(egon,))
p2=Process(target=piao,args=(alex,))
p3=Process(target=piao,args=(yuanhao,))
p4=Process(target=piao,args=(wupeiqi,))

p1.start()
p2.start()
p3.start()
p4.start()

#有的同學會有疑問:既然join是等待進程結束,那麽我像下面這樣寫,進程不就又變成串行的了嗎?
#當然不是了,必須明確:p.join()是讓誰等?
#很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,

#詳細解析如下:
#進程只要start就會在開始運行了,所以p1-p4.start()時,系統中已經有四個並發的進程了
#而我們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵
#join是讓主線程等,而p1-p4仍然是並發執行的,p1.join的時候,其余p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接通過檢測,無需等待
# 所以4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間
p1.join()
p2.join()
p3.join()
p4.join()

print(主線程)


#上述啟動進程與join進程可以簡寫為
# p_l=[p1,p2,p3,p4]
# 
# for p in p_l:
#     p.start()
# 
# for p in p_l:
#     p.join()

有了join,程序不就是串行了嗎???

四 守護進程:

主進程創建守護進程

  其一:守護進程會在主進程代碼執行結束後就終止

  其二:守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children

註意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print(%s is piaoing %self.name)
        time.sleep(random.randrange(1,3))
        print(%s is piao end %self.name)


p=Piao(egon)
p.daemon=True #一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行
p.start()
print()

五 進程同步(鎖,也叫互斥鎖)

進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,

競爭帶來的結果就是錯亂,如何控制,就是加鎖處理

part1:多個進程共享同一打印終端

#並發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
from multiprocessing import Process
import os,time
def work():
    print(%s is running %os.getpid())
    time.sleep(2)
    print(%s is done %os.getpid())

if __name__ == __main__:
    for i in range(3):
        p=Process(target=work)
        p.start()

並發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
#由並發變成了串行,犧牲了運行效率,但避免了競爭
from multiprocessing import Process,Lock
import os,time
def work(lock):
    lock.acquire()
    print(%s is running %os.getpid())
    time.sleep(2)
    print(%s is done %os.getpid())
    lock.release()
if __name__ == __main__:
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,))
        p.start()

加鎖:由並發變成了串行,犧牲了運行效率,但避免了競爭

part2:多個進程共享同一文件

文件當數據庫,模擬搶票

#文件db的內容為:{"count":1}
#註意一定要用雙引號,不然json無法識別
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open(db.txt))
    print(\033[43m剩余票數%s\033[0m %dic[count])

def get():
    dic=json.load(open(db.txt))
    time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic[count] >0:
        dic[count]-=1
        time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open(db.txt,w))
        print(\033[43m購票成功\033[0m)

def task(lock):
    search()
    get()
if __name__ == __main__:
    lock=Lock()
    for i in range(100): #模擬並發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()

並發運行,效率高,但競爭寫同一文件,數據寫入錯亂
#文件db的內容為:{"count":1}
#註意一定要用雙引號,不然json無法識別
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open(db.txt))
    print(\033[43m剩余票數%s\033[0m %dic[count])

def get():
    dic=json.load(open(db.txt))
    time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic[count] >0:
        dic[count]-=1
        time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open(db.txt,w))
        print(\033[43m購票成功\033[0m)

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()
if __name__ == __main__:
    lock=Lock()
    for i in range(100): #模擬並發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()

加鎖:購票行為由並發變成了串行,犧牲了運行效率,但保證了數據安全

 

總結:

加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.需要自己加鎖處理

因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。

隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。

六 隊列:

進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

 創建隊列的類(底層就是以管道和鎖定的方式實現)

Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。 
參數介紹:
maxsize是隊列中允許最大項數,省略則無大小限制。
方法介紹:
    

  q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.

q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣

~~~ 應用:

‘‘‘
multiprocessing模塊支持進程間通信的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,但是隊列接口
‘‘‘

from multiprocessing import Process,Queue
import time
q=Queue(3)


#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了






Python Day34 python並發編程之多進程