守護程序,互斥鎖,IPC,佇列,生產者與消費者模型
一.守護程序
守護程序表示一個程序b 守護另一個程序a 當被守護的程序結束後,那麼守護程序b也跟著結束了
應用場景:之所以開子程序,是為了幫助主程序完成某個任務,然而,如果主程序認為自己的事情一旦做完了就沒有必要使用子程序了,就可以將子程序設定為守護程序
例如:在執行qq的過程,開啟一個程序,用於下載檔案,然而檔案還沒有下載完畢,qq就退出了,下載任務也應該跟隨qq的退出而結束.
from multiprocessing import Process
import time
def task():
print('妃子的一生')
time.sleep(3)
print('妃子88了')
if __name__ == '__main__':
fz = Process(target=task)
fz.daemon = True # bool型別 將子程序作為主程序的守護程序,注意:必須在開啟子程序前設定!
fz.start()
print('皇帝登基!')
print('當了三年皇帝!')
time.sleep(1)
print('皇帝駕崩了!')
當多個程序共享一個數據時,可能會造成資料錯亂
1.使用join來讓這些程序序列,但是這樣就造成了無法併發,並且程序執行的順序就固定了
2.使用鎖 將需要共享的資料增加鎖 其他的程序在訪問資料時,就必須等待當前程序使用完畢
不使用鎖:
多個任務在共享一個數據時
序列效率低 但是不會出問題
併發效率高 但是資料可能錯亂
使用鎖:
from multiprocessing import Process,Lock
import time,random
def task1(lock):
lock.acquire() #是一個阻塞函式,會等到別的子程序釋放鎖才能繼續執行
print('name1:henry')
time.sleep(random.randint(1,2))
print('age1:29')
time.sleep(random.randint(1, 2))
print('sex1:man')
lock.release()
def task2(lock):
lock.acquire()
print('name2:wendy')
time.sleep(random.randint(1, 2))
print('age2:24')
time.sleep(random.randint(1, 2))
print('sex2:woman')
lock.release()
def task3(lock):
lock.acquire()
print('name3:irene')
time.sleep(random.randint(1, 2))
print('age3:27')
time.sleep(random.randint(1, 2))
print('sex3:woman')
lock.release()
if __name__ == '__main__':
lock = Lock()
p1 = Process(target=task1,args=(lock,))
p1.start()
p2 = Process(target=task2,args=(lock,))
p2.start()
p3 = Process(target=task3,args=(lock,))
p3.start()
1.join中順序是固定的,不公平
2.join是完全的序列,而鎖可以使部分程式碼序列 其他程式碼還是併發
互斥鎖的使用場景_搶票
from multiprocessing import Process,Lock
import json,time,random
def check_ticket(user):
time.sleep(random.randint(1,3))
with open('ticket.json',mode='rt',encoding='utf-8') as f:
dic = json.load(f)
print('使用者%s,您好!還剩%s張票!'%(user,dic['count']))
def buy_ticket(user):
with open('ticket.json',mode='rt',encoding='utf-8') as f:
dic = json.load(f)
if dic['count'] > 0:
time.sleep(random.randint(1,3))
dic['count'] -= 1
with open('ticket.json',mode='wt',encoding='utf-8') as f2:
json.dump(dic,f2)
print('使用者%s購票成功!'%user)
def task(user,lock):
check_ticket(user)
lock.acquire()
buy_ticket(user)
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(1,11):
p = Process(target=task,args=(i,lock))
p.start()
注意 在使用鎖的時候必須保證是同一個鎖
鎖的實現原理 虛擬碼
l = False
def task(lock):
global l
if l == False:
l = True
print('你好!')
l = False
Lock和Rlock的區別:
from multiprocessing import Lock
lock = Lock()
lock.acquire()
lock.acquire()
print('我出來了!')
lock.release()
#這個程式會卡死
from multiprocessing import RLock
lock = RLock()
lock.acquire()
lock.acquire()
print('我出來了!')
lock.release()
#這個程式會打印出結果
Rlock表示可重複鎖,特點是可以多次執行acquire
Rlock在執行多次acquire時和普通Lock沒有任何區別
而Lock如果多次執行acquire就會被鎖死
還要注意的一點是Rlock在一個子程序中執行了幾次acquire就要執行幾次release,這樣才會執行下一個子程序
from multiprocessing import Process,RLock
import time
def task(i,lock):
lock.acquire()
lock.acquire()
print(i)
time.sleep(3)
lock.release()
lock.release()
if __name__ == '__main__':
lock = RLock()
p1 = Process(target=task,args=(1,lock))
p1.start()
p2 = Process(target=task,args=(2,lock))
p2.start()
指的是鎖無法開啟導致程式卡死 首先要明確的是一把鎖是無法鎖死的,正常開發時一把鎖足夠使用,不需要開多把鎖
from multiprocessing import Process,Lock
import time
def task1(l1,l2,i):
l1.acquire()
print('%s搶走了碗!'%i)
time.sleep(1)
l2.acquire()
print('%s搶走了筷子!'%i)
print('%s開始吃飯!'%i)
l1.release()
l2.release()
def task2(l1,l2,i):
l2.acquire()
print('%s搶走了筷子!'%i)
l1.acquire()
print('%s搶走了碗!'%i)
print('%s開始吃飯!'%i)
l2.release()
l1.releaase()
if __name__ == '__main__':
l1 = Lock()
l2 = Lock()
p1 = Process(target=task1,args=(l1,l2,'henry'))
p1.start()
p2 = Process(target=task2,args=(l1,l2,'seulgi'))
p2.start()
由於程序之間是相互獨立的,所以需要設立方案使得各個程序之間可以相互傳遞資料
1.使用共享檔案,多個程序同時讀寫一個檔案
IO速度慢,傳輸資料大小不受限制
2.管道 是基於記憶體的,速度快,但是是單向的,用起來麻煩
3.申請共享記憶體空間,多個程序可以共享這個記憶體區域(重點)
速度快,但資料量不能太大
from multiprocessing import Manager,Process
def work(d):
d['count'] -= 1
if __name__ == '__main__':
with Manager() as m:
dic = m.dict({'count':100}) #建立一個共享的字典
p_l = []
for i in range(100):
p = Process(target=work,args=(dic,))
p_l.append(p)
p.start()
for P in p_l:
p.join()
print(dic)
佇列不只用於程序間通訊,也是一種常見的資料容器
特點:先進先出
優點:即使在多程序下,也可以保證資料不會錯亂,因為put和get預設都是阻塞的
堆疊:先進後出
from multiprocessing import Queue
q = Queue(1)#建立一個對列,最多可以存一個數據
q.put('henry')
q.put('wendy') #當容器中已經填滿的時候,put預設會阻塞
print('over')
from multiprocessing import Queue
q = Queue(1)#建立一個對列,最多可以存一個數據
q.put('henry')
print(q.get())
print(q.get())#當容器中已經沒有資料時,get預設會阻塞
print('over')
from multiprocessing import Queue
q = Queue(1)#建立一個對列,最多可以存一個數據
q.put('henry')
q.put('wendy',False) #第二個引數設定為False表示不會阻塞,無論容器滿了沒滿都會強行往裡面塞,如果滿了丟擲異常
from multiprocessing import Queue
q = Queue(1)#建立一個對列,最多可以存一個數據
q.put('henry')
print(q.get())
print(q.get(timeout=3))#timeout 僅用於阻塞時
什麼是生產者消費者模型?
生產者產生資料的一方
消費者處理資料的一方
例如需要做一個爬蟲?
1.爬取資料
2.解析資料
爬取和解析都是耗時的操作,如果正常按照順序來編寫程式碼,會造成解析需要等待爬取,爬取也需要等待解析,這樣的話效率就會很低
要提高效率,就是讓生產者和消費者解開耦合,自己幹自己的
如何實現:
1.將兩個任務分配給不同的程序
2.提供一個程序共享的資料容器
from multiprocessing import Process,Queue
import time,random
# 爬資料
def get_data(q):
for num in range(1,6):
print('正在爬取第%s個數據...'%num)
time.sleep(random.randint(1,2))
print('第%s個數據爬取完成!'%num)
#把資料裝到佇列中
q.put('第%s個數據'%num)
#解析資料
def parse_data(q):
for num in range(1,6):
#取出資料
data = q.get()
print('正在解析%s...'%data)
time.sleep(random.randint(1,2))
print('%s解析完成!'%data)
if __name__ == '__main__':
#共享資料容器
q = Queue()
#生產者程序
produce = Process(target=get_data,args=(q,))
produce.start()
#消費者程序
customer = Process(target=parse_data,args=(q,))
customer.start()