執行緒(三):Lock(互斥鎖)、RLock( 遞迴鎖)、Semaphore(訊號量)、Event(事件)、Condition(條件)、Timer(定時器)、queue(佇列)
目錄
一、鎖
1)同步鎖
#同步鎖的引用 from threading import Thread,Lock import os,time def work(): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果肯定為0,由原來的併發執行變成序列,犧牲了執行效率保證了資料安全
#互斥鎖與join的區別 #不加鎖:併發執行,速度快,資料不安全 from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 ''' #不加鎖:未加鎖部分併發執行,加鎖部分序列執行,速度慢,資料安全 from threading import current_thread,Thread,Lock import os,time def task(): #未加鎖的程式碼併發執行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加鎖的程式碼序列執行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 ''' #有的同學可能有疑問:既然加鎖會讓執行變成序列,那麼我在start之後立即使用join,就不用加鎖了啊,也是序列的效果啊 #沒錯:在start之後立刻使用jion,肯定會將100個任務的執行變成序列,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是 #start後立即join:任務內的所有程式碼都是序列執行的,而加鎖,只是加鎖的部分即修改共享資料的部分是序列的 #單從保證資料安全方面,二者都可以實現,但很明顯是加鎖的效率更高. from threading import current_thread,Thread,Lock import os,time def task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗時是多麼的恐怖 '''
2)死鎖與遞迴鎖
程序也有死鎖與遞迴鎖,在程序那裡忘記說了,放到這裡一切說了額
所謂死鎖: 是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序,如下就是死鎖
#死鎖 from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
解決方法,遞迴鎖,在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:
#遞迴鎖RLock
from threading import RLock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
典型問題:科學家吃麵
#死鎖問題
import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
noodle_lock.acquire()
print('%s 搶到了麵條'%name)
fork_lock.acquire()
print('%s 搶到了叉子'%name)
print('%s 吃麵'%name)
fork_lock.release()
noodle_lock.release()
def eat2(name):
fork_lock.acquire()
print('%s 搶到了叉子' % name)
time.sleep(1)
noodle_lock.acquire()
print('%s 搶到了麵條' % name)
print('%s 吃麵' % name)
noodle_lock.release()
fork_lock.release()
for name in ['aaa','bbb','ccc']:
t1 = Thread(target=eat1,args=(name,))
t2 = Thread(target=eat2,args=(name,))
t1.start()
t2.start()
#遞迴鎖解決死鎖問題
import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
noodle_lock.acquire()
print('%s 搶到了麵條'%name)
fork_lock.acquire()
print('%s 搶到了叉子'%name)
print('%s 吃麵'%name)
fork_lock.release()
noodle_lock.release()
def eat2(name):
fork_lock.acquire()
print('%s 搶到了叉子' % name)
time.sleep(1)
noodle_lock.acquire()
print('%s 搶到了麵條' % name)
print('%s 吃麵' % name)
noodle_lock.release()
fork_lock.release()
for name in ['aaa','bbb','ccc']:
t1 = Thread(target=eat1,args=(name,))
t2 = Thread(target=eat2,args=(name,))
t1.start()
t2.start()
二、訊號量
同進程的一樣
Semaphore管理一個內建的計數器,
每當呼叫acquire()時內建計數器-1;
呼叫release() 時內建計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()。
例項:(同時只有5個執行緒可以獲得semaphore,即可以限制最大連線數為5):
#例項
from threading import Thread,Semaphore
import threading
import time
# def func():
# if sm.acquire():
# print (threading.currentThread().getName() + ' get semaphore')
# time.sleep(2)
# sm.release()
def func():
sm.acquire()
print('%s get sm' %threading.current_thread().getName())
time.sleep(3)
sm.release()
if __name__ == '__main__':
sm=Semaphore(5)
for i in range(23):
t=Thread(target=func)
t.start()
池與訊號量
與程序池是完全不同的概念,程序池Pool(4),最大隻能產生4個程序,而且從頭到尾都只是這四個程序,不會產生新的,而訊號量是產生一堆執行緒/程序
三、事件
同進程的一樣
執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。 物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行
event.isSet():返回event的狀態值; event.wait():如果 event.isSet()==False將阻塞執行緒; event.set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程; event.clear():恢復event的狀態值為False。
例如,有多個工作執行緒嘗試連結MySQL,我們想要在連結前確保MySQL服務正常才讓那些工作執行緒去連線MySQL伺服器,如果連線不成功,都會去嘗試重新連線。那麼我們就可以採用threading.Event機制來協調各個工作執行緒的連線操作
#例項
import threading
import time,random
from threading import Thread,Event
def conn_mysql():
count=1
while not event.is_set():
if count > 3:
raise TimeoutError('連結超時')
print('<%s>第%s次嘗試連結' % (threading.current_thread().getName(), count))
event.wait(0.5)
count+=1
print('<%s>連結成功' %threading.current_thread().getName())
def check_mysql():
print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
time.sleep(random.randint(2,4))
event.set()
if __name__ == '__main__':
event=Event()
conn1=Thread(target=conn_mysql)
conn2=Thread(target=conn_mysql)
check=Thread(target=check_mysql)
conn1.start()
conn2.start()
check.start()
四、條件
使得執行緒等待,只有滿足某條件時,才釋放n個執行緒
Python提供的Condition物件提供了對複雜執行緒同步問題的支援。Condition被稱為條件變數,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。執行緒首先acquire一個條件變數,然後判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些處理改變條件後,通過notify方法通知其他執行緒,其他處於wait狀態的執行緒接到通知後會重新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。
程式碼說明:
import threading
def run(n):
con.acquire()
con.wait()
print("run the thread: %s" % n)
con.release()
if __name__ == '__main__':
con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
while True:
inp = input('>>>')
if inp == 'q':
break
con.acquire()
con.notify(int(inp))
con.release()
print('****')
五、定時器
定時器,指定n秒後執行某個操作
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello)
t.start() # after 1 seconds, "hello, world" will be printed
六、執行緒佇列
queue佇列 :使用import queue,用法與程序Queue一樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
class queue.
Queue
(maxsize=0) #先進先出
#先進先出
import queue
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''
class queue.
LifoQueue
(maxsize=0) #last in fisrt out
#後進先出
import queue
q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''
class queue.
PriorityQueue
(maxsize=0) #儲存資料時可設定優先順序的佇列
#優先順序佇列
import queue
q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先順序越高,優先順序高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''