併發程式設計(執行緒)——驗證GIL鎖,GIL與普通互斥鎖的區別,io密集型和計算密集型,死鎖現象(解決方式:遞迴鎖),Semaphore訊號量,Event事件,執行緒queue,多程序實現tcp服務端併發,執行緒池&程序池
阿新 • • 發佈:2020-08-26
一、驗證GIL鎖的存在方式
from threading import Thread from multiprocessing import Process def task(): while True: pass if __name__ == '__main__': for i in range(6): # t=Thread(target=task) # 因為有GIL鎖,同一時刻,只有一條執行緒執行,所以cpu不會滿 t=Process(target=task) # 由於是多程序,程序中的執行緒會被cpu排程執行,6個cpu全在工作,就會跑滿t.start()
二、GIL與普通互斥鎖的區別
GIL鎖是不能保證資料的安全,普通互斥鎖來保證資料安全
from threading import Thread, Lock import time mutex = Lock() money = 100 def task(): global money mutex.acquire() temp = money time.sleep(1) money = temp - 1 mutex.release() if __name__ == '__main__': ll=[] for i in range(10): t = Thread(target=task) t.start() # t.join() # 會怎麼樣?變成了序列,不能這麼做 ll.append(t) for t in ll: t.join() print(money)
三、io密集型和計算密集型
-----以下只針對於cpython直譯器
-在單核情況下:
-開多執行緒還是開多程序?不管幹什麼都是開執行緒
-在多核情況下:
-如果是計算密集型,需要開程序,能被多個cpu排程執行
-如果是io密集型,需要開執行緒,cpu遇到io會切換到其他執行緒執行
from threading import Thread from multiprocessing import Process import time # 計算密集型 def task(): count = 0 for i in range(100000000): count += i if __name__ == '__main__': ctime = time.time() ll = [] for i in range(10): t = Thread(target=task) # 開執行緒:42.68658709526062 # t = Process(target=task) # 開程序:9.04949426651001 t.start() ll.append(t) for t in ll: t.join() print(time.time()-ctime) ## io密集型 def task(): time.sleep(2) if __name__ == '__main__': ctime = time.time() ll = [] for i in range(400): t = Thread(target=task) # 開執行緒:2.0559656620025635 # t = Process(target=task) # 開程序:9.506720781326294 t.start() ll.append(t) for t in ll: t.join() print(time.time()-ctime)
四、死鎖現象(哲學家就餐問題)
是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序,如下就是死鎖
死鎖現象:
(1)張三拿到了A鎖,等B鎖,李四拿到了B鎖,等A鎖;
(2)張三在不同的線/程序中搶a鎖
from threading import Thread, Lock import time mutexA = Lock() mutexB = Lock() def eat_apple(name): mutexA.acquire() print('%s 獲取到了a鎖' % name) mutexB.acquire() print('%s 獲取到了b鎖' % name) print('開始吃蘋果,並且吃完了') mutexB.release() print('%s 釋放了b鎖' % name) mutexA.release() print('%s 釋放了a鎖' % name) def eat_egg(name): mutexB.acquire() print('%s 獲取到了b鎖' % name) time.sleep(2) mutexA.acquire() print('%s 獲取到了a鎖' % name) print('開始吃雞蛋,並且吃完了') mutexA.release() print('%s 釋放了a鎖' % name) mutexB.release() print('%s 釋放了b鎖' % name) if __name__ == '__main__': ll = ['egon', 'alex', '鐵蛋'] for name in ll: t1 = Thread(target=eat_apple, args=(name,)) t2 = Thread(target=eat_egg, args=(name,)) t1.start() t2.start()
解決方式:
遞迴鎖
遞迴鎖(可重入),同一個人可以多次acquire,每acquire一次,內部計數器加1,每relaese一次,內部計數器減一
只有計數器不為0,其他人都不獲得這把鎖
from threading import Thread, Lock,RLock import time # 同一把鎖 # mutexA = Lock() # mutexB = mutexA # 使用可重入鎖解決(同一把鎖) # mutexA = RLock() # mutexB = mutexA mutexA = mutexB =RLock() def eat_apple(name): mutexA.acquire() print('%s 獲取到了a鎖' % name) mutexB.acquire() print('%s 獲取到了b鎖' % name) print('開始吃蘋果,並且吃完了') mutexB.release() print('%s 釋放了b鎖' % name) mutexA.release() print('%s 釋放了a鎖' % name) def eat_egg(name): mutexB.acquire() print('%s 獲取到了b鎖' % name) time.sleep(2) mutexA.acquire() print('%s 獲取到了a鎖' % name) print('開始吃雞蛋,並且吃完了') mutexA.release() print('%s 釋放了a鎖' % name) mutexB.release() print('%s 釋放了b鎖' % name) if __name__ == '__main__': ll = ['egon', 'alex', '鐵蛋'] for name in ll: t1 = Thread(target=eat_apple, args=(name,)) t2 = Thread(target=eat_egg, args=(name,)) t1.start() t2.start()
五、Semaphore訊號量
Semaphore:訊號量可以理解為多把鎖,同時允許多個執行緒來更改資料
from threading import Thread,Semaphore import time import random sm=Semaphore(3) # 數字表示可以同時有多少個執行緒操作 def task(name): sm.acquire() print('%s 正在蹲坑'%name) time.sleep(random.randint(1,5)) sm.release() if __name__ == '__main__': for i in range(20): t=Thread(target=task,args=('屌絲男%s號'%i,)) t.start()
六、Event事件
一些執行緒需要等到其他執行緒執行完成之後才能執行,類似於發射訊號
比如一個執行緒等待另一個執行緒執行結束再繼續執行
from threading import Thread, Event import time event = Event() def girl(name): print('%s 現在不單身,正在談戀愛'%name) time.sleep(10) print('%s 分手了,給屌絲男發了訊號'%name) event.set() def boy(name): print('%s 在等著女孩分手'%name) event.wait() # 只要沒來訊號,就卡在者 print('女孩分手了,機會來了,衝啊') if __name__ == '__main__': lyf = Thread(target=girl, args=('劉亦菲',)) lyf.start() for i in range(10): b = Thread(target=boy, args=('屌絲男%s號' % i,)) b.start()
案例:
起兩個執行緒,第一個執行緒讀檔案的前半部分,讀完發一個訊號,另一個程序讀後半部分,並列印
from threading import Thread, Event import time import os event = Event() # 獲取檔案總大小 size = os.path.getsize('a.txt') def read_first(): with open('a.txt', 'r', encoding='utf-8') as f: n = size // 2 # 取檔案一半,整除 data = f.read(n) print(data) print('我一半讀完了,發了個訊號') event.set() def read_last(): event.wait() # 等著發訊號 with open('a.txt', 'r', encoding='utf-8') as f: n = size // 2 # 取檔案一半,整除 # 游標從檔案開頭開始,移動了n個位元組,移動到檔案一半 f.seek(n, 0) data = f.read() print(data) if __name__ == '__main__': t1=Thread(target=read_first) t1.start() t2=Thread(target=read_last) t2.start()
七、執行緒queue
1、程序queue和執行緒不是一個
程序queue:
from multiprocessing import Queue
執行緒queue:
from queue import Queue,LifoQueue,PriorityQueue
2、執行緒間通訊,因為共享變數會出現資料不安全問題,用執行緒queue通訊,不需要加鎖,內部自帶
3、queue是執行緒安全的
4、三種執行緒Queue
-Queue:佇列,先進先出
-PriorityQueue:優先順序佇列,誰小誰先出
-LifoQueue:棧,後進後出
#1 Quenue使用 q=Queue(5) #放值 q.put("lqz") q.put("egon") q.put("鐵蛋") q.put("鋼彈") q.put("金蛋") # q.put("銀蛋") # q.put_nowait("銀蛋") # 取值 print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) # 卡住 # print(q.get()) # q.get_nowait() # 是否滿,是否空 print(q.full()) print(q.empty()) #2 LifoQueue使用 q=LifoQueue(5) q.put("lqz") q.put("egon") q.put("鐵蛋") q.put("鋼彈") q.put("金蛋") # q.put("ddd蛋") print(q.get()) #3 PriorityQueue使用 #PriorityQueue:數字越小,級別越高 q=PriorityQueue(3) q.put((-10,'金蛋')) q.put((100,'銀蛋')) q.put((101,'鐵蛋')) # q.put((1010,'鐵dd蛋')) # 不能再放了 print(q.get()) print(q.get()) print(q.get())
八、通過多程序,實現TCP服務端支援多個客戶端連線(併發)
#服務端 from multiprocessing import Process import socket def task(conn): while True: try: data = conn.recv(1024) if len(data) == 0: break print(data) conn.send(data.upper()) except Exception as e: print(e) break conn.close() if __name__ == '__main__': server = socket.socket() server.bind(('127.0.0.1', 8081)) server.listen(5) # 多執行緒,或者多程序 while True: # 連線迴圈 conn, addr = server.accept() # 多使用者的服務端 t=Process(target=task,args=(conn,)) t.start() ### 單使用者的服務端 # while True: # try: # data = conn.recv(1024) # if len(data) == 0: break # print(data) # conn.send(data.upper()) # except Exception as e: # print(e) # break # conn.close()
#客戶端 import socket import time cli=socket.socket() cli.connect(('127.0.0.1',8081)) while True: cli.send(b'hello world') time.sleep(0.1) data=cli.recv(1024) print(data)
九、執行緒池&程序池
1、為什麼會出現池?不管是開程序還是開執行緒,不能無限制開,通過池,假設池子裡就有10個,不管再怎麼開,永遠是這10個
2、使用(需要記住)
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(2)
pool.submit(get_pages, url).add_done_callback(call_back)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor from threading import Thread import time import random pool = ThreadPoolExecutor(5) # 數字是池的大小 # pool = ProcessPoolExecutor(5) # 數字是池的大小 def task(name): print('%s任務開始' % name) time.sleep(random.randint(1, 4)) print('任務結束') return '%s 返回了'%name def call_back(f): # print(type(f)) print(f.result()) if __name__ == '__main__': # ll=[] # for i in range(10): # 起了100個執行緒 # # t=Thread(target=task) # # t.start() # res = pool.submit(task, '屌絲男%s號' % i) # 不需要再寫在args中了 # # res是Future物件 # # from concurrent.futures._base import Future # # print(type(res)) # # print(res.result()) # 像join,只要執行result,就會等著結果回來,就變成串行了 # ll.append(res) # # for res in ll: # print(res.result()) # 終極使用 for i in range(10): # 起了100個執行緒 # 向執行緒池中提交一個任務,等任務執行完成,自動回到到call_back函式執行 pool.submit(task,'屌絲男%s號' % i).add_done_callback(call_back)
執行緒池案例:
爬網站
from concurrent.futures import ThreadPoolExecutor import requests # 爬蟲會學到的模組 pool = ThreadPoolExecutor(2) def get_pages(url): # https://www.baidu.com res = requests.get(url) # 向這個地址傳送請求 name = url.rsplit('/')[-1] + '.html' print(name) # www.baidu.com.html # res.content拿到頁面的二進位制 return {'name': name, 'text': res.content} def call_back(f): dic = f.result() with open(dic['name'], 'wb') as f: f.write(dic['text']) if __name__ == '__main__': ll = ['https://www.baidu.com', 'https://www.mzitu.com', 'https://www.cnblogs.com'] for url in ll: pool.submit(get_pages, url).add_done_callback(call_back)
十、
---39----