1. 程式人生 > >守護程序,互斥鎖,IPC,佇列,生產者與消費者模型

守護程序,互斥鎖,IPC,佇列,生產者與消費者模型

小知識點:在子程序中不能使用input輸入!

一.守護程序

守護程序表示一個程序b 守護另一個程序a 當被守護的程序結束後,那麼守護程序b也跟著結束了

應用場景:之所以開子程序,是為了幫助主程序完成某個任務,然而,如果主程序認為自己的事情一旦做完了就沒有必要使用子程序了,就可以將子程序設定為守護程序

例如:在執行qq的過程,開啟一個程序,用於下載檔案,然而檔案還沒有下載完畢,qq就退出了,下載任務也應該跟隨qq的退出而結束.

from multiprocessing import Process
import time
def task():
  print('妃子的一生')
  time.sleep(3)
  print('妃子88了')
if __name__ == '__main__':
  fz = Process(target=task)
  fz.daemon = True # bool型別 將子程序作為主程序的守護程序,注意:必須在開啟子程序前設定!
  fz.start()
  print('皇帝登基!')
  print('當了三年皇帝!')
  time.sleep(1)
  print('皇帝駕崩了!')

 

二.互斥鎖

當多個程序共享一個數據時,可能會造成資料錯亂

1.使用join來讓這些程序序列,但是這樣就造成了無法併發,並且程序執行的順序就固定了

2.使用鎖 將需要共享的資料增加鎖 其他的程序在訪問資料時,就必須等待當前程序使用完畢

不使用鎖:
多個任務在共享一個數據時
序列效率低 但是不會出問題
併發效率高 但是資料可能錯亂
使用鎖:
from multiprocessing import Process,Lock
import time,random
def task1(lock):
  lock.acquire() #是一個阻塞函式,會等到別的子程序釋放鎖才能繼續執行
  print('name1:henry')
  time.sleep(random.randint(1,2))
  print('age1:29')
  time.sleep(random.randint(1, 2))
  print('sex1:man')
  lock.release()
def task2(lock):
  lock.acquire()
  print('name2:wendy')
  time.sleep(random.randint(1, 2))
  print('age2:24')
  time.sleep(random.randint(1, 2))
  print('sex2:woman')
  lock.release()
def task3(lock):
  lock.acquire()
  print('name3:irene')
  time.sleep(random.randint(1, 2))
  print('age3:27')
  time.sleep(random.randint(1, 2))
  print('sex3:woman')
  lock.release()
if __name__ == '__main__':
  lock = Lock()
  p1 = Process(target=task1,args=(lock,))
  p1.start()
  p2 = Process(target=task2,args=(lock,))
  p2.start()
  p3 = Process(target=task3,args=(lock,))
  p3.start()

 

join 和 鎖的區別

1.join中順序是固定的,不公平

2.join是完全的序列,而鎖可以使部分程式碼序列 其他程式碼還是併發

互斥鎖的使用場景_搶票
from multiprocessing import Process,Lock
import json,time,random
def check_ticket(user):
  time.sleep(random.randint(1,3))
  with open('ticket.json',mode='rt',encoding='utf-8') as f:
    dic = json.load(f)
    print('使用者%s,您好!還剩%s張票!'%(user,dic['count']))
def buy_ticket(user):
  with open('ticket.json',mode='rt',encoding='utf-8') as f:
    dic = json.load(f)
    if dic['count'] > 0:
      time.sleep(random.randint(1,3))
      dic['count'] -= 1
      with open('ticket.json',mode='wt',encoding='utf-8') as f2:
        json.dump(dic,f2)
        print('使用者%s購票成功!'%user)
def task(user,lock):
  check_ticket(user)
  lock.acquire()
  buy_ticket(user)
  lock.release()

if __name__ == '__main__':
  lock = Lock()
  for i in range(1,11):
    p = Process(target=task,args=(i,lock))
    p.start()

鎖的本質是一個bool型別的資料,在執行程式碼前會先判斷這個值

注意 在使用鎖的時候必須保證是同一個鎖

鎖的實現原理 虛擬碼
l = False
def task(lock):
  global l
  if l == False:
    l = True
    print('你好!')
  l = False

 

Lock和Rlock的區別:

from multiprocessing import Lock
lock = Lock()
lock.acquire()
lock.acquire()
print('我出來了!')
lock.release()
#這個程式會卡死

from multiprocessing import RLock
lock = RLock()
lock.acquire()
lock.acquire()
print('我出來了!')
lock.release()
#這個程式會打印出結果

Rlock表示可重複鎖,特點是可以多次執行acquire
Rlock在執行多次acquire時和普通Lock沒有任何區別
而Lock如果多次執行acquire就會被鎖死
還要注意的一點是Rlock在一個子程序中執行了幾次acquire就要執行幾次release,這樣才會執行下一個子程序

from multiprocessing import Process,RLock
import time
def task(i,lock):
  lock.acquire()
  lock.acquire()
  print(i)
  time.sleep(3)
  lock.release()
  lock.release()
if __name__ == '__main__':
  lock = RLock()
  p1 = Process(target=task,args=(1,lock))
  p1.start()
  p2 = Process(target=task,args=(2,lock))
  p2.start()

死鎖:

指的是鎖無法開啟導致程式卡死 首先要明確的是一把鎖是無法鎖死的,正常開發時一把鎖足夠使用,不需要開多把鎖

from multiprocessing import Process,Lock
import time
def task1(l1,l2,i):
  l1.acquire()
  print('%s搶走了碗!'%i)
  time.sleep(1)
  l2.acquire()
  print('%s搶走了筷子!'%i)
  print('%s開始吃飯!'%i)
  l1.release()
  l2.release()
def task2(l1,l2,i):
l2.acquire()
  print('%s搶走了筷子!'%i)
  l1.acquire()
  print('%s搶走了碗!'%i)
  print('%s開始吃飯!'%i)
  l2.release()
  l1.releaase()
if __name__ == '__main__':
  l1 = Lock()
  l2 = Lock()
  p1 = Process(target=task1,args=(l1,l2,'henry'))
  p1.start()
  p2 = Process(target=task2,args=(l1,l2,'seulgi'))
  p2.start()

 

三.IPC

由於程序之間是相互獨立的,所以需要設立方案使得各個程序之間可以相互傳遞資料

1.使用共享檔案,多個程序同時讀寫一個檔案

IO速度慢,傳輸資料大小不受限制

2.管道 是基於記憶體的,速度快,但是是單向的,用起來麻煩

3.申請共享記憶體空間,多個程序可以共享這個記憶體區域(重點)

速度快,但資料量不能太大

from multiprocessing import Manager,Process
def work(d):
  d['count'] -= 1
if __name__ == '__main__':
  with Manager() as m:
    dic = m.dict({'count':100}) #建立一個共享的字典
    p_l = []
    for i in range(100):
    p = Process(target=work,args=(dic,))
    p_l.append(p)
    p.start()

  for P in p_l:
    p.join()
  print(dic)

 

四.佇列

佇列不只用於程序間通訊,也是一種常見的資料容器

特點:先進先出

優點:即使在多程序下,也可以保證資料不會錯亂,因為put和get預設都是阻塞的

堆疊:先進後出

from multiprocessing import Queue
q = Queue(1)#建立一個對列,最多可以存一個數據
q.put('henry')
q.put('wendy') #當容器中已經填滿的時候,put預設會阻塞
print('over')

 

from multiprocessing import Queue
q = Queue(1)#建立一個對列,最多可以存一個數據
q.put('henry')
print(q.get())
print(q.get())#當容器中已經沒有資料時,get預設會阻塞
print('over')

 

from multiprocessing import Queue
q = Queue(1)#建立一個對列,最多可以存一個數據
q.put('henry')
q.put('wendy',False) #第二個引數設定為False表示不會阻塞,無論容器滿了沒滿都會強行往裡面塞,如果滿了丟擲異常

 

from multiprocessing import Queue
q = Queue(1)#建立一個對列,最多可以存一個數據
q.put('henry')
print(q.get())
print(q.get(timeout=3))#timeout 僅用於阻塞時

 

五.生產者與消費者模型

什麼是生產者消費者模型?

生產者產生資料的一方

消費者處理資料的一方

 

例如需要做一個爬蟲?

1.爬取資料

2.解析資料

爬取和解析都是耗時的操作,如果正常按照順序來編寫程式碼,會造成解析需要等待爬取,爬取也需要等待解析,這樣的話效率就會很低

要提高效率,就是讓生產者和消費者解開耦合,自己幹自己的

如何實現:

1.將兩個任務分配給不同的程序

2.提供一個程序共享的資料容器

from multiprocessing import Process,Queue
import time,random
# 爬資料
def get_data(q):
  for num in range(1,6):
    print('正在爬取第%s個數據...'%num)
    time.sleep(random.randint(1,2))
    print('第%s個數據爬取完成!'%num)
    #把資料裝到佇列中
    q.put('第%s個數據'%num)
#解析資料
def parse_data(q):
  for num in range(1,6):
    #取出資料
    data = q.get()
    print('正在解析%s...'%data)
    time.sleep(random.randint(1,2))
    print('%s解析完成!'%data)
if __name__ == '__main__':
  #共享資料容器
  q = Queue()
  #生產者程序
  produce = Process(target=get_data,args=(q,))
  produce.start()
  #消費者程序
  customer = Process(target=parse_data,args=(q,))
  customer.start()