1. 程式人生 > >執行緒(三):Lock(互斥鎖)、RLock( 遞迴鎖)、Semaphore(訊號量)、Event(事件)、Condition(條件)、Timer(定時器)、queue(佇列)

執行緒(三):Lock(互斥鎖)、RLock( 遞迴鎖)、Semaphore(訊號量)、Event(事件)、Condition(條件)、Timer(定時器)、queue(佇列)

目錄

一、鎖

1)同步鎖

2)死鎖與遞迴鎖

二、訊號量

三、事件

四、條件

五、定時器

六、執行緒佇列


一、鎖

1)同步鎖

#同步鎖的引用
from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果肯定為0,由原來的併發執行變成序列,犧牲了執行效率保證了資料安全
#互斥鎖與join的區別

#不加鎖:併發執行,速度快,資料不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''


#不加鎖:未加鎖部分併發執行,加鎖部分序列執行,速度慢,資料安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的程式碼併發執行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的程式碼序列執行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

#有的同學可能有疑問:既然加鎖會讓執行變成序列,那麼我在start之後立即使用join,就不用加鎖了啊,也是序列的效果啊
#沒錯:在start之後立刻使用jion,肯定會將100個任務的執行變成序列,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是
#start後立即join:任務內的所有程式碼都是序列執行的,而加鎖,只是加鎖的部分即修改共享資料的部分是序列的
#單從保證資料安全方面,二者都可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多麼的恐怖
'''


2)死鎖與遞迴鎖

程序也有死鎖與遞迴鎖,在程序那裡忘記說了,放到這裡一切說了額

所謂死鎖: 是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序,如下就是死鎖

#死鎖
from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

解決方法,遞迴鎖,在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

#遞迴鎖RLock
from threading import RLock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()

典型問題:科學家吃麵

#死鎖問題
import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麵條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麵'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麵條' % name)
    print('%s 吃麵' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['aaa','bbb','ccc']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

#遞迴鎖解決死鎖問題
import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麵條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麵'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麵條' % name)
    print('%s 吃麵' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['aaa','bbb','ccc']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()

二、訊號量

同進程的一樣

Semaphore管理一個內建的計數器,
每當呼叫acquire()時內建計數器-1;
呼叫release() 時內建計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()。

例項:(同時只有5個執行緒可以獲得semaphore,即可以限制最大連線數為5):

#例項
from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()

池與訊號量

與程序池是完全不同的概念,程序池Pool(4),最大隻能產生4個程序,而且從頭到尾都只是這四個程序,不會產生新的,而訊號量是產生一堆執行緒/程序

三、事件

同進程的一樣

執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。 物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞執行緒;
event.set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;
event.clear():恢復event的狀態值為False。

            

例如,有多個工作執行緒嘗試連結MySQL,我們想要在連結前確保MySQL服務正常才讓那些工作執行緒去連線MySQL伺服器,如果連線不成功,都會去嘗試重新連線。那麼我們就可以採用threading.Event機制來協調各個工作執行緒的連線操作

#例項
import threading
import time,random
from threading import Thread,Event

def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('連結超時')
        print('<%s>第%s次嘗試連結' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>連結成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()

四、條件

使得執行緒等待,只有滿足某條件時,才釋放n個執行緒

Python提供的Condition物件提供了對複雜執行緒同步問題的支援。Condition被稱為條件變數,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。執行緒首先acquire一個條件變數,然後判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些處理改變條件後,通過notify方法通知其他執行緒,其他處於wait狀態的執行緒接到通知後會重新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。

程式碼說明:

import threading

def run(n):
    con.acquire()
    con.wait()
    print("run the thread: %s" % n)
    con.release()

if __name__ == '__main__':

    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

    while True:
        inp = input('>>>')
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
        print('****')

五、定時器

定時器,指定n秒後執行某個操作

from threading import Timer
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

六、執行緒佇列

queue佇列 :使用import queue,用法與程序Queue一樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先進先出

#先進先出
import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''

class queue.LifoQueue(maxsize=0) #last in fisrt out

#後進先出
import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''

class queue.PriorityQueue(maxsize=0) #儲存資料時可設定優先順序的佇列

#優先順序佇列
import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先順序越高,優先順序高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''