1. 程式人生 > >Python Threading 多執行緒程式設計

Python Threading 多執行緒程式設計

寫在篇前

  threading模組是python多執行緒處理包,使用該模組可以很方便的實現多執行緒處理任務,本篇文章的基礎是需要掌握程序、執行緒基本概念,對PV原語、鎖等傳統同步處理方法有一定的瞭解。另外,threading模組的實現是參考java多執行緒處理方式,並且只實現了其中的一個子集。必須說明的是,由於GIL的存在,多執行緒的應用主要用於IO密集型任務,不適合CPU密集型任務,如果要提高CPU的利用率,需要利用協程或則多程序程式設計。

Threading 方法屬性

  • threading.active_count() 返回當前活動的執行緒數
  • threading.current_thread()
    返回當前執行緒的Thread物件
  • threading.get_ident() 獲取當前執行緒的(唯一)識別符號
  • threading.enumerate() 返回一個包含當前處於活動狀態Thread 物件的list
  • threading.main_thread() 返回當前執行緒的主執行緒
  • threading.settrace(func) 為每個執行緒設定一個trace函式,在呼叫run方法之前會被執行
  • threading.setprofile(func) 同上,為每一個執行緒設定一個profile函式
  • threading.stack_size([size])
    設定每個執行緒私有棧空間大小,預設為0,若不為0不可低於32KB(32768)
  • threading.TIMEOUT_MAX 允許執行緒被堵塞的最長時間,按我的理解一般用不上

Thread物件

 使用Thread一共有兩種方式,同Java多執行緒模型:

     (1) 繼承Thread類,並重寫且僅僅重寫__init__run方法,特別注意的是重寫__init__方法首先應該呼叫父類構造方法Thread.__init__

     (2)將一個函式傳入到Thread類的建構函式

 Thread物件的用法以及主要方法介紹,我們用以下例子來說明一下:

#! /usr/bin/python
# _*_ coding: utf-8 _*_ __author__ = 'Jeffery' __date__ = '2018/9/15 11:26' import threading import time def func1(num): time.sleep(3) print(str(num)+':you can if you will') def func2(num): time.sleep(2) print(str(num)+':the devil is in the details') def func3(num): time.sleep(1) print(str(num)+':life is short, u need what?') def main(): start = time.time() print('start mian Thread') ts = (threading.Thread(group=None, target=func1, args=(1,)), threading.Thread(group=None, target=func2, args=(2,)), threading.Thread(group=None, target=func3, args=(3,))) # 【重要】建立三個執行緒,分別執行func1~3 for _t in ts: _t.start() # 【重要方法】啟動執行緒 # _t.join() # 【重要方法】join方法用來告訴父執行緒,您要等我子執行緒執行完畢,你再繼續往下走,這裡註釋了,所以你應該會發現程式輸出 mian Thread ends, cost 0 secs end = time.time() print('mian Thread ends, cost %d secs' % (end-start)) if __name__ == '__main__': main() # 以下是執行結果 start mian Thread mian Thread ends, cost 0 secs 3:life is short, u need what? 2:the devil is in the details 1:you can if you will

Lock物件

 因為資源總是有限的,如果多個執行緒對同一個物件進行操作,則有可能造成資源的爭用,甚至導致死鎖。引入鎖,是一種便捷的解決方式。主要包括兩個方法,acquire()release():

acquire(blocking=True)

  當阻塞引數設定為True(預設值)時呼叫,然後就會進入到locked狀態直到解鎖,返回True。

release()

  釋放佔有的鎖,無返回值

#! /usr/bin/python
# _*_ coding: utf-8 _*_
__author__ = 'Jeffery'
__date__ = '2018/9/15 13:56'

import threading
import time

# 臨界資源
num = 0

lock = threading.Lock()


def num_add(t_i):
    global num
    time.sleep(3)
    if lock.acquire():  # 這句話相當於 if lock.acquire()
        num += 1
    lock.release()
    print('thread %d set num %d' % (t_i, num))


def num_sub(t_i):
    global num
    time.sleep(3)
    if lock.acquire():  # 這句話相當於 if lock.acquire()
        num -= 1
    lock.release()
    print('thread %d set num %d' % (t_i, num))


def main():
    ts = []
    for i in range(5):
        t = threading.Thread(target=num_add, args=(i+1,))
        t.start()
        ts.append(t)

    for i in range(5):
        t = threading.Thread(target=num_sub, args=(5+i+1,))
        t.start()
        ts.append(t)

    for t in ts:
        t.join()

    print('end')


if __name__ == '__main__':
    main()
# 以下是結果
thread 4 set num 1
thread 3 set num 2
thread 5 set num 3
thread 1 set num 4
thread 2 set num 5
thread 9 set num 4
thread 10 set num 3
thread 7 set num 2
thread 6 set num 1
thread 8 set num 0
end

RLock物件

  RLock稱之為遞迴鎖,從某種角度來講屬於一種比Lock更安全的鎖,和Lock的區別在於:在同一執行緒內,對RLock進行多次acquire()操作(或則說RLock允許遞迴加鎖),程式不會阻塞;而如果是Lock那麼將會發生堵塞。RLock所擁有的方法同上,下面例子在上例基礎上稍微改動:

#! /usr/bin/python
# _*_ coding: utf-8 _*_
__author__ = 'Jeffery'
__date__ = '2018/9/15 13:56'

import threading
import time

# 臨界資源
num = 0

lock = threading.RLock()  # 【重點】這裡必須是RLock,不能是Lock


def num_add(t_i):
    with lock:  # 這是屬於上下文管理器的一個使用用法
        global num
        time.sleep(3)
        if lock.acquire():  # 這句話相當於 if lock.acquire()
            num += 1
        lock.release()
        print('thread %d set num %d' % (t_i, num))


def num_sub(t_i):
    global num
    time.sleep(3)
    if lock.acquire():  # 這句話相當於 if lock.acquire()
        num -= 1
    lock.release()
    print('thread %d set num %d' % (t_i, num))


def main():
    ts = []
    for i in range(5):
        t = threading.Thread(target=num_add, args=(i+1,))
        t.start()
        ts.append(t)

    for i in range(5):
        t = threading.Thread(target=num_sub, args=(5+i+1,))
        t.start()
        ts.append(t)

    for t in ts:
        t.join()

    print('end')


if __name__ == '__main__':
    main()

Semaphore物件

 訊號量是電腦科學最古老的一種同步原語,由荷蘭電腦科學家Edsger W. Dijkstra提出,通常也稱之為PV原語。在物件內部會維護一個計數器,這個計數器的初值由Semaphore初始化時給出,這個值我們一般稱之為 臨界資源數量。當臨界資源數量大於0時,執行緒不會被阻塞;當臨界資源等於0時再有執行緒請求,那將會被阻塞,直到某個執行緒釋放臨界資源。主要方法同RLock、Lock,下面用PV原語最(也許吧)經典的例子,哲學家問題來說明一下:

  哲學家問題:話說有五個哲學家,他們的生活方式是交替地進行思考和進餐。他們共用一張圓桌,分別坐在五張椅子上,在圓桌上有五個碗和五支筷子,平時一個哲學家進行思考,飢餓時便試圖取用其左、右最靠近他的筷子,只有在他拿到兩支筷子時才能進餐;進餐完畢,放下筷子又繼續思考。請設計一種方法,使哲學家不要餓死。

這裡寫圖片描述

#! /usr/bin/python
# _*_ coding: utf-8 _*_
__author__ = 'Jeffery'
__date__ = '2018/9/15 15:35'

import threading
import time


def philosopher(i, chopstick_sema, sema):
    """
    描述哲學家活動
    :type i: int 表示第幾個哲學家
    :return:no return
    """
    while True:
        think(i + 1)

        sema.acquire(blocking=True)  # 請求吃飯

        chopstick_sema[i].acquire()  # 拿起左邊筷子
        chopstick_sema[((i + 1) % 5)].acquire()  # 拿起右邊筷子

        eat(i + 1)

        chopstick_sema[((i + 1) % 5)].release()  # 放下右邊筷子
        chopstick_sema[i].release()  # 放下左邊筷子

        sema.release()  # 吃飯完畢
        think(i + 1)


def eat(phil_NO):
    print('philosopher %d is eating' % phil_NO)
    time.sleep(1)
    print('philosopher %d finish eating' % phil_NO)


def think(phil_NO):
    print('philosopher %d is thinking' % phil_NO)
    time.sleep(2)
    print('philosopher %d finish thinking' % phil_NO)


def main():

    # 為了避免死鎖,同時只允許四個哲學家吃飯
    sema = threading.Semaphore(value=4)
    chopstick_sema = [threading.Semaphore(value=1) for i in range(5)]
    ts = []
    for i in range(5):
        t = threading.Thread(target=philosopher, args=(i, chopstick_sema, sema))
        t.start()
        ts.append(t)

    for t in ts:
        t.join()


if __name__ == '__main__':
    main()

Condition物件

  Condition物件總是與某種鎖相關聯,鎖物件可以通過建構函式傳入,或者它會預設建立一個鎖,並且鎖是Condition物件的一部分,不必單獨跟蹤它。

acquire()

    嘗試獲取鎖

release()

    釋放已獲得的鎖

wait(timeout=-1)

    主動進入等待阻塞狀態,直到被其他執行緒喚醒或則超時

wait_for(predicate, timeout=None)

    進入等待狀態,直到條件被置為True,timeout引數意思如上,predicate引數表示一個callable物件。

notify()

    喚醒其中一個執行緒

notify_all()

     喚醒所有程序

舉個例子:

    假設有一個緩衝區大小為5,要實現互斥存取資料,程式碼如下:
#! /usr/bin/python
# _*_ coding: utf-8 _*_
__author__ = 'Jeffery'
__date__ = '2018/9/15 16:26'

import threading
import time

con = threading.Condition(threading.RLock())

num = 0


class Producer(threading.Thread):
    """
    生產者類
    """

    def __init__(self):
        """
        建構函式
        """
        threading.Thread.__init__(self)

    def run(self):
        """
        重寫 run方法
        :return:
        """
        global num
        with con:
            while True:
                num += 1
                print('生產一個產品,現在有產品%d個' % num)
                time.sleep(2)
                if num >= 5:
                    print('緩衝區已滿')
                    con.notify()  # 喚醒等待池
                    con.wait()  # 主動進入等待池,掛起


class Consumers(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        global num
        with con:
            while True:
                num -= 1
                print("取出一個產品,還剩%d個" % num)
                time.sleep(3)
                if num <= 0:
                    print('緩衝區已空')
                    con.notify()
                    con.wait()

if __name__ == '__main__':
    p = Producer()
    c = Consumers()
    p.start()
    c.start()
    p.join()
    c.join()


Event物件

 Event,翻譯作事件,個人覺得該詞是計算機程式設計中的一個專有名詞。在這裡用來執行緒之間的通訊,當某一個event發生之後,其他執行緒做出一定的反應;或者說某一個執行緒等待另一個執行緒某個事件的發生。每一個Event物件內部維護一個flag,表徵事件是否已經發生。

is_set()

     當且僅當Event物件內部維護的flag為True時,返回True,否則返回False

set()

    將flag置為True

clear()

    將flag置為False

wait(timeout=-1)

    當flag是False時,阻塞;一旦為True或超時,立刻喚醒

舉個例子,好比像心意女生表白:

#! /usr/bin/python
# _*_ coding: utf-8 _*_
__author__ = 'Jeffery'
__date__ = '2018/9/15 14:58'

import threading
import time

event = threading.Event()

# 假設你想表白某位心儀女孩
# 你想,表白時送點花,更有氣氛,於是乎你打電話去花店,叫老闆送一束花過來
# 然後再表白


def confess_to_girl():
    print('u r preparing to confess')
    event.wait()
    print('start to confess: hi , my loving  girl, I....')


def flower_delivering():
    print('flower is delivering, plea wait')
    time.sleep(5)
    print('flower comes')
    event.set()


def main():
    t1 = threading.Thread(target=confess_to_girl)
    t2 = threading.Thread(target=flower_delivering)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print('\nstory,maybe new start, maybe ending!')


if __name__ == '__main__':
    main()

Timer物件

 計時器,即讓一個執行緒在一個指定的之間之後再開始執行,是屬於Thread的一個封裝子類,故用法基本相似。

def hello():
    print("hello, world")

t = Timer(30.0, hello)
# t.cancel()  # 在開始之前,還可以使計時器停止
t.start()  # after 30 seconds, "hello, world" will be printed

Barrier物件

  Barrier,顧名思義就是障礙的意思,只有人多力量大的時候,才能一起跨過一個個障礙,繼續前行。Barrier則是一個執行緒障礙,只有執行緒數量達到指定數量之後,才能喚醒所有等待程序。這個功能不是很常用,簡單介紹一下吧,主要方法有:

wait(timeout=None)

    使執行緒進入等待狀態,直到達到一定數量,這時將會使barrier進入broken狀態,從而一齊釋放被堵塞的執行緒

reset()

    使Barrier恢復預設‘攔截’狀態

abort()

    使Barrier進入Broken狀態

parties

    即上面提到的 指定數量的執行緒,int型別

n_waiting

    正在等待的執行緒

broken

    bool型別,指示Barrier是否處於Broken狀態