Python Threading 多執行緒程式設計
寫在篇前
threading模組是python多執行緒處理包,使用該模組可以很方便的實現多執行緒處理任務,本篇文章的基礎是需要掌握程序、執行緒基本概念,對PV原語、鎖等傳統同步處理方法有一定的瞭解。另外,threading模組的實現是參考java多執行緒處理方式,並且只實現了其中的一個子集。必須說明的是,由於GIL的存在,多執行緒的應用主要用於IO密集型任務,不適合CPU密集型任務,如果要提高CPU的利用率,需要利用協程或則多程序程式設計。
Threading 方法屬性
threading.active_count()
返回當前活動的執行緒數threading.current_thread()
threading.get_ident()
獲取當前執行緒的(唯一)識別符號threading.enumerate()
返回一個包含當前處於活動狀態Thread 物件的listthreading.main_thread()
返回當前執行緒的主執行緒threading.settrace(func)
為每個執行緒設定一個trace函式,在呼叫run方法之前會被執行threading.setprofile(func)
同上,為每一個執行緒設定一個profile函式threading.stack_size([size])
threading.TIMEOUT_MAX
允許執行緒被堵塞的最長時間,按我的理解一般用不上
Thread物件
使用Thread一共有兩種方式,同Java多執行緒模型:
(1) 繼承Thread類,並重寫且僅僅重寫__init__
和 run
方法,特別注意的是重寫__init__
方法首先應該呼叫父類構造方法Thread.__init__
(2)將一個函式傳入到Thread類的建構函式
Thread物件的用法以及主要方法介紹,我們用以下例子來說明一下:
#! /usr/bin/python
# _*_ coding: utf-8 _*_
__author__ = 'Jeffery'
__date__ = '2018/9/15 11:26'
import threading
import time
def func1(num):
time.sleep(3)
print(str(num)+':you can if you will')
def func2(num):
time.sleep(2)
print(str(num)+':the devil is in the details')
def func3(num):
time.sleep(1)
print(str(num)+':life is short, u need what?')
def main():
start = time.time()
print('start mian Thread')
ts = (threading.Thread(group=None, target=func1, args=(1,)),
threading.Thread(group=None, target=func2, args=(2,)),
threading.Thread(group=None, target=func3, args=(3,))) # 【重要】建立三個執行緒,分別執行func1~3
for _t in ts:
_t.start() # 【重要方法】啟動執行緒
# _t.join() # 【重要方法】join方法用來告訴父執行緒,您要等我子執行緒執行完畢,你再繼續往下走,這裡註釋了,所以你應該會發現程式輸出 mian Thread ends, cost 0 secs
end = time.time()
print('mian Thread ends, cost %d secs' % (end-start))
if __name__ == '__main__':
main()
# 以下是執行結果
start mian Thread
mian Thread ends, cost 0 secs
3:life is short, u need what?
2:the devil is in the details
1:you can if you will
Lock物件
因為資源總是有限的,如果多個執行緒對同一個物件進行操作,則有可能造成資源的爭用,甚至導致死鎖。引入鎖,是一種便捷的解決方式。主要包括兩個方法,acquire()
和release()
:
acquire(blocking=True)
當阻塞引數設定為True(預設值)時呼叫,然後就會進入到locked狀態直到解鎖,返回True。
release()
釋放佔有的鎖,無返回值
#! /usr/bin/python
# _*_ coding: utf-8 _*_
__author__ = 'Jeffery'
__date__ = '2018/9/15 13:56'
import threading
import time
# 臨界資源
num = 0
lock = threading.Lock()
def num_add(t_i):
global num
time.sleep(3)
if lock.acquire(): # 這句話相當於 if lock.acquire()
num += 1
lock.release()
print('thread %d set num %d' % (t_i, num))
def num_sub(t_i):
global num
time.sleep(3)
if lock.acquire(): # 這句話相當於 if lock.acquire()
num -= 1
lock.release()
print('thread %d set num %d' % (t_i, num))
def main():
ts = []
for i in range(5):
t = threading.Thread(target=num_add, args=(i+1,))
t.start()
ts.append(t)
for i in range(5):
t = threading.Thread(target=num_sub, args=(5+i+1,))
t.start()
ts.append(t)
for t in ts:
t.join()
print('end')
if __name__ == '__main__':
main()
# 以下是結果
thread 4 set num 1
thread 3 set num 2
thread 5 set num 3
thread 1 set num 4
thread 2 set num 5
thread 9 set num 4
thread 10 set num 3
thread 7 set num 2
thread 6 set num 1
thread 8 set num 0
end
RLock物件
RLock稱之為遞迴鎖,從某種角度來講屬於一種比Lock更安全的鎖,和Lock的區別在於:在同一執行緒內,對RLock
進行多次acquire()
操作(或則說RLock允許遞迴加鎖),程式不會阻塞;而如果是Lock那麼將會發生堵塞。RLock所擁有的方法同上,下面例子在上例基礎上稍微改動:
#! /usr/bin/python
# _*_ coding: utf-8 _*_
__author__ = 'Jeffery'
__date__ = '2018/9/15 13:56'
import threading
import time
# 臨界資源
num = 0
lock = threading.RLock() # 【重點】這裡必須是RLock,不能是Lock
def num_add(t_i):
with lock: # 這是屬於上下文管理器的一個使用用法
global num
time.sleep(3)
if lock.acquire(): # 這句話相當於 if lock.acquire()
num += 1
lock.release()
print('thread %d set num %d' % (t_i, num))
def num_sub(t_i):
global num
time.sleep(3)
if lock.acquire(): # 這句話相當於 if lock.acquire()
num -= 1
lock.release()
print('thread %d set num %d' % (t_i, num))
def main():
ts = []
for i in range(5):
t = threading.Thread(target=num_add, args=(i+1,))
t.start()
ts.append(t)
for i in range(5):
t = threading.Thread(target=num_sub, args=(5+i+1,))
t.start()
ts.append(t)
for t in ts:
t.join()
print('end')
if __name__ == '__main__':
main()
Semaphore物件
訊號量是電腦科學最古老的一種同步原語,由荷蘭電腦科學家Edsger W. Dijkstra提出,通常也稱之為PV原語。在物件內部會維護一個計數器,這個計數器的初值由Semaphore初始化時給出,這個值我們一般稱之為 臨界資源數量。當臨界資源數量大於0時,執行緒不會被阻塞;當臨界資源等於0時再有執行緒請求,那將會被阻塞,直到某個執行緒釋放臨界資源。主要方法同RLock、Lock,下面用PV原語最(也許吧)經典的例子,哲學家問題來說明一下:
哲學家問題:話說有五個哲學家,他們的生活方式是交替地進行思考和進餐。他們共用一張圓桌,分別坐在五張椅子上,在圓桌上有五個碗和五支筷子,平時一個哲學家進行思考,飢餓時便試圖取用其左、右最靠近他的筷子,只有在他拿到兩支筷子時才能進餐;進餐完畢,放下筷子又繼續思考。請設計一種方法,使哲學家不要餓死。
#! /usr/bin/python
# _*_ coding: utf-8 _*_
__author__ = 'Jeffery'
__date__ = '2018/9/15 15:35'
import threading
import time
def philosopher(i, chopstick_sema, sema):
"""
描述哲學家活動
:type i: int 表示第幾個哲學家
:return:no return
"""
while True:
think(i + 1)
sema.acquire(blocking=True) # 請求吃飯
chopstick_sema[i].acquire() # 拿起左邊筷子
chopstick_sema[((i + 1) % 5)].acquire() # 拿起右邊筷子
eat(i + 1)
chopstick_sema[((i + 1) % 5)].release() # 放下右邊筷子
chopstick_sema[i].release() # 放下左邊筷子
sema.release() # 吃飯完畢
think(i + 1)
def eat(phil_NO):
print('philosopher %d is eating' % phil_NO)
time.sleep(1)
print('philosopher %d finish eating' % phil_NO)
def think(phil_NO):
print('philosopher %d is thinking' % phil_NO)
time.sleep(2)
print('philosopher %d finish thinking' % phil_NO)
def main():
# 為了避免死鎖,同時只允許四個哲學家吃飯
sema = threading.Semaphore(value=4)
chopstick_sema = [threading.Semaphore(value=1) for i in range(5)]
ts = []
for i in range(5):
t = threading.Thread(target=philosopher, args=(i, chopstick_sema, sema))
t.start()
ts.append(t)
for t in ts:
t.join()
if __name__ == '__main__':
main()
Condition物件
Condition物件總是與某種鎖相關聯,鎖物件可以通過建構函式傳入,或者它會預設建立一個鎖,並且鎖是Condition物件的一部分,不必單獨跟蹤它。
acquire()
嘗試獲取鎖
release()
釋放已獲得的鎖
wait(timeout=-1)
主動進入等待阻塞狀態,直到被其他執行緒喚醒或則超時
wait_for(predicate, timeout=None)
進入等待狀態,直到條件被置為True,timeout引數意思如上,predicate引數表示一個callable物件。
notify()
喚醒其中一個執行緒
notify_all()
喚醒所有程序
舉個例子:
假設有一個緩衝區大小為5,要實現互斥存取資料,程式碼如下:
#! /usr/bin/python
# _*_ coding: utf-8 _*_
__author__ = 'Jeffery'
__date__ = '2018/9/15 16:26'
import threading
import time
con = threading.Condition(threading.RLock())
num = 0
class Producer(threading.Thread):
"""
生產者類
"""
def __init__(self):
"""
建構函式
"""
threading.Thread.__init__(self)
def run(self):
"""
重寫 run方法
:return:
"""
global num
with con:
while True:
num += 1
print('生產一個產品,現在有產品%d個' % num)
time.sleep(2)
if num >= 5:
print('緩衝區已滿')
con.notify() # 喚醒等待池
con.wait() # 主動進入等待池,掛起
class Consumers(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
global num
with con:
while True:
num -= 1
print("取出一個產品,還剩%d個" % num)
time.sleep(3)
if num <= 0:
print('緩衝區已空')
con.notify()
con.wait()
if __name__ == '__main__':
p = Producer()
c = Consumers()
p.start()
c.start()
p.join()
c.join()
Event物件
Event,翻譯作事件,個人覺得該詞是計算機程式設計中的一個專有名詞。在這裡用來執行緒之間的通訊,當某一個event發生之後,其他執行緒做出一定的反應;或者說某一個執行緒等待另一個執行緒某個事件的發生。每一個Event物件內部維護一個flag,表徵事件是否已經發生。
is_set()
當且僅當Event物件內部維護的flag為True時,返回True,否則返回False
set()
將flag置為True
clear()
將flag置為False
wait(timeout=-1)
當flag是False時,阻塞;一旦為True或超時,立刻喚醒
舉個例子,好比像心意女生表白:
#! /usr/bin/python
# _*_ coding: utf-8 _*_
__author__ = 'Jeffery'
__date__ = '2018/9/15 14:58'
import threading
import time
event = threading.Event()
# 假設你想表白某位心儀女孩
# 你想,表白時送點花,更有氣氛,於是乎你打電話去花店,叫老闆送一束花過來
# 然後再表白
def confess_to_girl():
print('u r preparing to confess')
event.wait()
print('start to confess: hi , my loving girl, I....')
def flower_delivering():
print('flower is delivering, plea wait')
time.sleep(5)
print('flower comes')
event.set()
def main():
t1 = threading.Thread(target=confess_to_girl)
t2 = threading.Thread(target=flower_delivering)
t1.start()
t2.start()
t1.join()
t2.join()
print('\nstory,maybe new start, maybe ending!')
if __name__ == '__main__':
main()
Timer物件
計時器,即讓一個執行緒在一個指定的之間之後再開始執行,是屬於Thread的一個封裝子類,故用法基本相似。
def hello():
print("hello, world")
t = Timer(30.0, hello)
# t.cancel() # 在開始之前,還可以使計時器停止
t.start() # after 30 seconds, "hello, world" will be printed
Barrier物件
Barrier,顧名思義就是障礙的意思,只有人多力量大的時候,才能一起跨過一個個障礙,繼續前行。Barrier則是一個執行緒障礙,只有執行緒數量達到指定數量之後,才能喚醒所有等待程序。這個功能不是很常用,簡單介紹一下吧,主要方法有:
wait(timeout=None)
使執行緒進入等待狀態,直到達到一定數量,這時將會使barrier進入broken狀態,從而一齊釋放被堵塞的執行緒
reset()
使Barrier恢復預設‘攔截’狀態
abort()
使Barrier進入Broken狀態
parties
即上面提到的 指定數量的執行緒,int型別
n_waiting
正在等待的執行緒
broken
bool型別,指示Barrier是否處於Broken狀態