1. 程式人生 > 實用技巧 >python進階八——併發程式設計之多執行緒

python進階八——併發程式設計之多執行緒

1:執行緒介紹

在傳統作業系統中,每個程序有一個地址空間,而且預設就有一個控制執行緒

  執行緒顧名思義,就是一條流水線工作的過程,一條流水線必須屬於一個車間,一個車間的工作過程是一個程序

車間負責把資源整合到一起,是一個資源單位,而一個車間內至少有一個流水線

流水線的工作需要電源,電源就相當於cpu

  所以,程序只是用來把資源集中到一起(程序只是一個資源單位,或者說資源集合),而執行緒才是cpu上的執行單位。

  多執行緒(即多個控制執行緒)的概念是,在一個程序中存在多個控制執行緒,多個控制執行緒共享該程序的地址空間,相當於一個車間內有多條流水線,都共用一個車間的資源。

1.1: 執行緒的建立開銷小

建立程序的開銷要遠大於執行緒?

如果我們的軟體是一個工廠,該工廠有多條流水線,流水線工作需要電源,電源只有一個即cpu(單核cpu)

一個車間就是一個程序,一個車間至少一條流水線(一個程序至少一個執行緒)

建立一個程序,就是建立一個車間(申請空間,在該空間內建至少一條流水線)

而建執行緒,就只是在一個車間內造一條流水線,無需申請空間,所以建立開銷小

程序之間是競爭關係,執行緒之間是協作關係?

車間直接是競爭/搶電源的關係,競爭(不同的程序直接是競爭關係,是不同的程式設計師寫的程式執行的,迅雷搶佔其他程序的網速,360把其他程序當做病毒乾死)
一個車間的不同流水線式協同工作的關係(同一個程序的執行緒之間是合作關係,是同一個程式寫的程式內開啟動,迅雷內的執行緒是合作關係,不會自己幹自己)

1.2:執行緒與程序的區別

  1. Threads share the address space of the process that created it; processes have their own address space.
  2. Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
  3. Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
  4. New threads are easily created; new processes require duplication of the parent process.
  5. Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
  6. Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.

1.3:為何要用多執行緒

多執行緒指的是,在一個程序中開啟多個執行緒,簡單的講:如果多個任務共用一塊地址空間,那麼必須在一個程序內開啟多個執行緒。詳細的講分為4點:

  1. 多執行緒共享一個程序的地址空間

2. 執行緒比程序更輕量級,執行緒比程序更容易建立可撤銷,在許多作業系統中,建立一個執行緒比建立一個程序要快10-100倍,在有大量執行緒需要動態和快速修改時,這一特性很有用

3. 若多個執行緒都是cpu密集型的,那麼並不能獲得性能上的增強,但是如果存在大量的計算和大量的I/O處理,擁有多個執行緒允許這些活動彼此重疊執行,從而會加快程式執行的速度。

4. 在多cpu系統中,為了最大限度的利用多核,可以開啟多個執行緒,比開程序開銷要小的多。(這一條並不適用於python)

2.多執行緒程式設計

2.1開啟執行緒的兩種方式

方式一
#方式二
from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    t = Sayhi('huihuang')
    t.start()
    print('主執行緒')
方式二

2.2 在一個程序下開啟多個執行緒與在一個程序下開啟多個子程序的區別

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主程序下開啟執行緒
    t=Thread(target=work)
    t.start()
    print('主執行緒/主程序')
    '''
    列印結果:
    hello
    主執行緒/主程序
    '''

    #在主程序下開啟子程序
    t=Process(target=work)
    t.start()
    print('主執行緒/主程序')
    '''
    列印結果:
    主執行緒/主程序
    hello
    '''
誰的開啟速度快
開啟速度對比
from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主程序下開啟多個執行緒,每個執行緒都跟主程序的pid一樣
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主執行緒/主程序pid',os.getpid())

    #part2:開多個程序,每個程序都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主執行緒/主程序pid',os.getpid())
瞅一瞅pid
對比pid
from  threading import Thread
from multiprocessing import Process
import os
def work():
    global n
    n=0

if __name__ == '__main__':
    # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('',n) #毫無疑問子程序p已經將自己的全域性的n改成了0,但改的僅僅是它自己的,檢視父程序的n仍然為100


    n=1
    t=Thread(target=work)
    t.start()
    t.join()
    print('',n) #檢視結果為0,因為同一程序內的執行緒之間共享程序內的資料
同一程序內的執行緒共享該程序的資料?
同一個程序內的執行緒共享該程序的資料

一個接收使用者輸入,一個將使用者輸入的內容格式化成大寫,一個將格式化後的結果存入檔案

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        msg_l.append(msg)

def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()
            format_l.append(res.upper())

def save():
    while True:
        if format_l:
            with open('db.txt','a',encoding='utf-8') as f:
                res=format_l.pop()
                f.write('%s\n' %res)

if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()
View Code

2.3執行緒相關的其他方法

Thread例項物件的方法
  # isAlive(): 返回執行緒是否活動的。
  # getName(): 返回執行緒名。
  # setName(): 設定執行緒名。

threading模組提供的一些方法:
  # threading.currentThread(): 返回當前的執行緒變數。
  # threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
  # threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。
View Code
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主程序下開啟執行緒
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主執行緒
    print(threading.enumerate()) #連同主執行緒在內有兩個執行的執行緒
    print(threading.active_count())
    print('主執行緒/主程序')

    '''
    列印結果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主執行緒/主程序
    Thread-1
    '''
View Code

主執行緒等待子執行緒結束

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主執行緒')
    print(t.is_alive())
    '''
    egon say hello
    主執行緒
    False
    '''
View Code

2.4守護執行緒

無論是程序還是執行緒,都遵循:守護xxx會等待主xxx執行完畢後被銷燬

需要強調的是:執行完畢並非終止執行

#1.對主程序來說,執行完畢指的是主程序程式碼執行完畢
#2.對主執行緒來說,執行完畢指的是主執行緒所在的程序內所有非守護執行緒統統執行完畢,主執行緒才算執行完畢
#1 主程序在其程式碼結束後就已經算執行完畢了(守護程序在此時就被回收),然後主程序會一直等非守護的子程序都執行完畢後回收子程序的資源(否則會產生殭屍程序),才會結束,

#2 主執行緒在其他非守護執行緒執行完畢後才算執行完畢(守護執行緒在此時就被回收)。因為主執行緒的結束意味著程序的結束,程序整體的資源都將被回收,而程序必須保證非守護執行緒都執行完畢後才能結束。
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()之前設定
    t.start()

    print('主執行緒')
    print(t.is_alive())
    '''
    主執行緒
    True
    '''
View Code
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1=Thread(target=foo)
t2=Thread(target=bar)

t1.daemon=True
t1.start()
t2.start()
print("main-------")

迷惑人的例子
View Code

2.5互斥鎖(同步鎖)

多執行緒的同步鎖與多程序的同步鎖是一個道理,就是多個執行緒搶佔同一個資料(資源)時,我們要保證資料的安全,合理的順序。

from threading import Thread
import time

x = 100
def task():
    global x
    temp = x
    time.sleep(0.1)
    temp -= 1
    x = temp



if __name__ == '__main__':
    t_l1 = []
    for i in range(100):
        t = Thread(target=task)
        t_l1.append(t)
        t.start()

    for i in t_l1:
        i.join()
    print(f'主{x}')
不加鎖搶佔同一個資源的問題
from threading import Thread
from threading import Lock
import time

x = 100
lock = Lock()

def task():
    global x
    lock.acquire()
    temp = x
    time.sleep(0.1)
    temp -= 1
    x = temp
    lock.release()


if __name__ == '__main__':
    t_l1 = []
    for i in range(100):
        t = Thread(target=task)
        t_l1.append(t)
        t.start()

    for i in t_l1:
        i.join()
    print(f'主{x}')
同步鎖保證資料安全

2.6死鎖現象與遞迴鎖

程序也有死鎖與遞迴鎖,程序的死鎖和遞迴鎖與執行緒的死鎖遞迴鎖同理。

所謂死鎖: 是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序,如下就是死鎖

from threading import Thread
from threading import Lock
import time

lock_A = Lock()
lock_B = Lock()


class MyThread(Thread):

    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        lock_A.acquire()
        print(f'{self.name}拿到A鎖')

        lock_B.acquire()
        print(f'{self.name}拿到B鎖')
        lock_B.release()

        lock_A.release()

    def f2(self):
        lock_B.acquire()
        print(f'{self.name}拿到B鎖')
        time.sleep(0.1)

        lock_A.acquire()
        print(f'{self.name}拿到A鎖')
        lock_A.release()

        lock_B.release()

if __name__ == '__main__':
    for i in range(3):
        t = MyThread()
        t.start()

    print('主....')
View Code

解決方法,遞迴鎖,在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

from threading import Thread
from threading import RLock
import time

lock_A = lock_B = RLock()


class MyThread(Thread):
    
    def run(self):
        self.f1()
        self.f2()
    
    def f1(self):
        lock_A.acquire()
        print(f'{self.name}拿到A鎖')
        
        lock_B.acquire()
        print(f'{self.name}拿到B鎖')
        lock_B.release()
        
        lock_A.release()
    
    def f2(self):
        lock_B.acquire()
        print(f'{self.name}拿到B鎖')
        time.sleep(0.1)
        
        lock_A.acquire()
        print(f'{self.name}拿到A鎖')
        lock_A.release()
        
        lock_B.release()


if __name__ == '__main__':
    for i in range(10):
        t = MyThread()
        t.start()
    
    print('主....')
View Code

2.7訊號量Semaphore

同進程的一樣

Semaphore管理一個內建的計數器,
每當呼叫acquire()時內建計數器-1;
呼叫release() 時內建計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()。

例項:(同時只有5個執行緒可以獲得semaphore,即可以限制最大連線數為5):

from threading import Thread
from threading import Semaphore
from threading import current_thread
import time
import random

sem = Semaphore(5)

def go_public_wc():
    sem.acquire()
    print(f'{current_thread().getName()} 上廁所ing')
    time.sleep(random.randint(1,3))
    sem.release()


if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=go_public_wc)
        t.start()
View Code

2.8Python GIL(Global interpreter Lock)

首先,一些語言(java、c++、c)是支援同一個程序中的多個執行緒是可以應用多核CPU的,也就是我們會聽到的現在4核8核這種多核CPU技術的牛逼之處。那麼我們之前說過應用多程序的時候如果有共享資料是不是會出現資料不安全的問題啊,就是多個程序同時一個檔案中去搶這個資料,大家都把這個資料改了,但是還沒來得及去更新到原來的檔案中,就被其他程序也計算了,導致資料不安全的問題啊,所以我們是不是通過加鎖可以解決啊,多執行緒大家想一下是不是一樣的,併發執行就是有這個問題。但是python最早期的時候對於多執行緒也加鎖,但是python比較極端的(在當時電腦cpu確實只有1核)加了一個GIL全域性解釋鎖,是直譯器級別的,鎖的是整個執行緒,而不是執行緒裡面的某些資料操作,每次只能有一個執行緒使用cpu,也就說多執行緒用不了多核,但是他不是python語言的問題,是CPython直譯器的特性,如果用Jpython直譯器是沒有這個問題的,Cpython是預設的,因為速度快,Jpython是java開發的,在Cpython裡面就是沒辦法用多核,這是python的弊病,歷史問題,雖然眾多python團隊的大神在致力於改變這個情況,但是暫沒有解決。(這和解釋型語言(python,php)和編譯型語言有關係嗎???待定!,編譯型語言一般在編譯的過程中就幫你分配好了,解釋型要邊解釋邊執行,所以為了防止出現數據不安全的情況加上了這個鎖,這是所有解釋型語言的弊端??)

但是有了這個鎖我們就不能併發了嗎?當我們的程式是偏計算的,也就是cpu佔用率很高的程式(cpu一直在計算),就不行了,但是如果你的程式是I/O型的(一般你的程式都是這個)(input、訪問網址網路延遲、開啟/關閉檔案讀寫),在什麼情況下用的到高併發呢(金融計算會用到,人工智慧(阿爾法狗),但是一般的業務場景用不到,爬網頁,多使用者網站、聊天軟體、處理檔案),I/O型的操作很少佔用CPU,那麼多執行緒還是可以併發的,因為cpu只是快速的排程執行緒,而執行緒裡面並沒有什麼計算,就像一堆的網路請求,我cpu非常快速的一個一個的將你的多執行緒排程出去,你的執行緒就去執行I/O操作了。

2.9GIL鎖與Lock的關係

GIL VS Lock

    機智的同學可能會問到這個問題,就是既然你之前說過了,Python已經有一個GIL來保證同一時間只能有一個執行緒來執行了,為什麼這裡還需要lock? 

 首先我們需要達成共識:鎖的目的是為了保護共享的資料,同一時間只能有一個執行緒來修改共享的資料

    然後,我們可以得出結論:保護不同的資料就應該加不同的鎖。

 最後,問題就很明朗了,GIL 與Lock是兩把鎖,保護的資料不一樣,前者是直譯器級別的(當然保護的就是直譯器級別的資料,比如垃圾回收的資料),後者是保護使用者自己開發的應用程式的資料,很明顯GIL不負責這件事,只能使用者自定義加鎖處理,即Lock

過程分析:所有執行緒搶的是GIL鎖,或者說所有執行緒搶的是執行許可權

  執行緒1搶到GIL鎖,拿到執行許可權,開始執行,然後加了一把Lock,還沒有執行完畢,即執行緒1還未釋放Lock,有可能執行緒2搶到GIL鎖,開始執行,執行過程中發現Lock還沒有被執行緒1釋放,於是執行緒2進入阻塞,被奪走執行許可權,有可能執行緒1拿到GIL,然後正常執行到釋放Lock。。。這就導致了序列執行的效果

  既然是序列,那我們執行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  這也是序列執行啊,為何還要加Lock呢,需知join是等待t1所有的程式碼執行完,相當於鎖住了t1的所有程式碼,而Lock只是鎖住一部分操作共享資料的程式碼。
View Code

詳解:

因為Python直譯器幫你自動定期進行記憶體回收,你可以理解為python直譯器裡有一個獨立的執行緒,每過一段時間它起wake up做一次全域性輪詢看看哪些記憶體資料是可以被清空的,此時你自己的程式 裡的執行緒和 py直譯器自己的執行緒是併發執行的,假設你的執行緒刪除了一個變數,py直譯器的垃圾回收執行緒在清空這個變數的過程中的clearing時刻,可能一個其它執行緒正好又重新給這個還沒來及得清空的記憶體空間賦值了,結果就有可能新賦值的資料被刪除了,為了解決類似的問題,python直譯器簡單粗暴的加了鎖,即當一個執行緒執行時,其它人都不能動,這樣就解決了上述的問題,  這可以說是Python早期版本的遺留問題。

同進程的一樣

執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。 物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞執行緒;

event.set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;

event.clear():恢復event的狀態值為False。

例如,有多個工作執行緒嘗試連結MySQL,我們想要在連結前確保MySQL服務正常才讓那些工作執行緒去連線MySQL伺服器,如果連線不成功,都會去嘗試重新連線。那麼我們就可以採用threading.Event機制來協調各個工作執行緒的連線操作

2.10條件Condition(瞭解)

使得執行緒等待,只有滿足某條件時,才釋放n個執行緒

import time
from threading import Thread,RLock,Condition,current_thread

def func1(c):
    c.acquire(False) #固定格式
    # print(1111)

    c.wait()  #等待通知,
    time.sleep(3)  #通知完成後大家是序列執行的,這也看出了鎖的機制了
    print('%s執行了'%(current_thread().getName()))

    c.release()

if __name__ == '__main__':
    c = Condition()
    for i in range(5):
        t = Thread(target=func1,args=(c,))
        t.start()

    while True:
        num = int(input('請輸入你要通知的執行緒個數:'))
        c.acquire() #固定格式
        c.notify(num)  #通知num個執行緒別等待了,去執行吧
        c.release()

#結果分析: 
# 請輸入你要通知的執行緒個數:3
# 請輸入你要通知的執行緒個數:Thread-1執行了 #有時候你會發現的你結果列印在了你要輸入內容的地方,這是列印的問題,沒關係,不影響
# Thread-3執行了
# Thread-2執行了
View Code

2.11定時器(瞭解)

定時器,指定n秒後執行某個操作,這個做定時任務的時候可能會用到

from threading import Thread,Event
import threading
import time,random
def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('連結超時')
        print('<%s>第%s次嘗試連結' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>連結成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()
View Code

2.12執行緒佇列

執行緒之間的通訊我們列表行不行呢,當然行,那麼佇列和列表有什麼區別呢?

  queue佇列 :使用import queue,用法與程序Queue一樣

  queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

  classqueue.Queue(maxsize=0) #先進先出
import queue #不需要通過threading模組裡面匯入,直接import queue就可以了,這是python自帶的
#用法基本和我們程序multiprocess中的queue是一樣的
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
# q.put_nowait() #沒有資料就報錯,可以通過try來搞
print(q.get())
print(q.get())
print(q.get())
# q.get_nowait() #沒有資料就報錯,可以通過try來搞
'''
結果(先進先出):
first
second
third
'''
先進先出示例

classqueue.LifoQueue(maxsize=0) #last in fisrt out

import queue

q=queue.LifoQueue() #佇列,類似於棧,棧我們提過嗎,是不是先進後出的順序啊
q.put('first')
q.put('second')
q.put('third')
# q.put_nowait()

print(q.get())
print(q.get())
print(q.get())
# q.get_nowait()
'''
結果(後進先出):
third
second
first
'''
先進後出示例

classqueue.PriorityQueue(maxsize=0) #儲存資料時可設定優先順序的佇列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高
q.put((-10,'a'))
q.put((-5,'a'))  #負數也可以
# q.put((20,'ws'))  #如果兩個值的優先順序一樣,那麼按照後面的值的acsii碼順序來排序,如果字串第一個數元素相同,比較第二個元素的acsii碼順序
# q.put((20,'wd'))
# q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典
# q.put((20,('w',1)))  #優先順序相同的兩個資料,他們後面的值必須是相同的資料型別才能比較,可以是元祖,也是通過元素的ascii碼順序來排序

q.put((20,'b'))
q.put((20,'a'))
q.put((0,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先順序越高,優先順序高的優先出隊):
'''
優先順序佇列示例

這三種佇列都是執行緒安全的,不會出現多個執行緒搶佔同一個資源或資料的情況

2.13Python標準模組--concurrent.futures

早期的時候我們沒有執行緒池,現在python提供了一個新的標準或者說內建的模組,這個模組裡面提供了新的執行緒池和程序池,之前我們說的程序池是在multiprocessing裡面的,現在這個在這個新的模組裡面,他倆用法上是一樣的。

為什麼要將程序池和執行緒池放到一起呢,是為了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式一樣,而且只要通過這個concurrent.futures匯入就可以直接用他們兩個了

concurrent.futures模組提供了高度封裝的非同步呼叫介面
ThreadPoolExecutor:執行緒池,提供非同步呼叫
ProcessPoolExecutor: 程序池,提供非同步呼叫
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
非同步提交任務

#map(func, *iterables, timeout=None, chunksize=1) 
取代for迴圈submit的操作

#shutdown(wait=True) 
相當於程序池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源後才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait引數為何值,整個程式都會等到所有任務執行完畢
submit和map必須在shutdown之前

#result(timeout=None)
取得結果

#add_done_callback(fn)
回撥函式
View Code
import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    print('%s列印的:'%(threading.get_ident()),n)
    return n*n
tpool = ThreadPoolExecutor(max_workers=5) #預設一般起執行緒的資料不超過CPU個數*5
# tpool = ProcessPoolExecutor(max_workers=5) #程序池的使用只需要將上面的ThreadPoolExecutor改為ProcessPoolExecutor就行了,其他都不用改
#非同步執行
t_lst = []
for i in range(5):
    t = tpool.submit(func,i) #提交執行函式,返回一個結果物件,i作為任務函式的引數 def submit(self, fn, *args, **kwargs):  可以傳任意形式的引數
    t_lst.append(t)  #
    # print(t.result())
    #這個返回的結果物件t,不能直接去拿結果,不然又變成串行了,可以理解為拿到一個號碼,等所有執行緒的結果都出來之後,我們再去通過結果物件t獲取結果
tpool.shutdown() #起到原來的close阻止新任務進來 + join的作用,等待所有的執行緒執行完畢
print('主執行緒')
for ti in t_lst:
    print('>>>>',ti.result())

# 我們還可以不用shutdown(),用下面這種方式
# while 1:
#     for n,ti in enumerate(t_lst):
#         print('>>>>', ti.result(),n)
#     time.sleep(2) #每個兩秒去去一次結果,哪個有結果了,就可以取出哪一個,想表達的意思就是說不用等到所有的結果都出來再去取,可以輪詢著去取結果,因為你的任務需要執行的時間很長,那麼你需要等很久才能拿到結果,通過這樣的方式可以將快速出來的結果先拿出來。如果有的結果物件裡面還沒有執行結果,那麼你什麼也取不到,這一點要注意,不是空的,是什麼也取不到,那怎麼判斷我已經取出了哪一個的結果,可以通過列舉enumerate來搞,記錄你是哪一個位置的結果物件的結果已經被取過了,取過的就不再取了

#結果分析: 列印的結果是沒有順序的,因為到了func函式中的sleep的時候執行緒會切換,誰先列印就沒準兒了,但是最後的我們通過結果物件取結果的時候拿到的是有序的,因為我們主執行緒進行for迴圈的時候,我們是按順序將結果物件新增到列表中的。
# 37220列印的: 0
# 32292列印的: 4
# 33444列印的: 1
# 30068列印的: 2
# 29884列印的: 3
# 主執行緒
# >>>> 0
# >>>> 1
# >>>> 4
# >>>> 9
# >>>> 16
ThreaPoolExecutor簡單使用

ProcessPoolExecutor的使用:

只需要將這一行程式碼改為下面這一行就可以了,其他的程式碼都不用變
tpool = ThreadPoolExecutor(max_workers=5) #預設一般起執行緒的資料不超過CPU個數*5
# tpool = ProcessPoolExecutor(max_workers=5)

你就會發現為什麼將執行緒池和程序池都放到這一個模組裡面了,用法一樣
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import threading
import os,time,random
def task(n):
    print('%s is runing' %threading.get_ident())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    s = executor.map(task,range(1,5)) #map取代了for+submit
    print([i for i in s])
map的使用
import time
import os
import threading
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

def func(n):
    time.sleep(2)
    return n*n

def call_back(m):
    print('結果為:%s'%(m.result()))

tpool = ThreadPoolExecutor(max_workers=5)
t_lst = []
for i in range(5):
    t = tpool.submit(func,i).add_done_callback(call_back)
回撥函式簡單使用
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<程序%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<程序%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future物件obj,需要用obj.result()拿到結果
回撥函式的簡單應用(練習)