1. 程式人生 > >[記錄]Python高並發編程

[記錄]Python高並發編程

greenlet random 不同 包括 pool 不能 內核 高效率 對象

==========
==多進程==
==========
要讓Python程序實現多進程(multiprocessing),我們先了解操作系統的相關知識。

Unix/Linux操作系統提供了一個fork()系統調用,它非常特殊。普通的函數調用,調用一次,返回一次,但是fork()調用一次,返回兩次,因為操作系統自動把當前進程(稱為父進程)復制了一份(稱為子進程),然後,分別在父進程和子進程內返回。

子進程永遠返回0,而父進程返回子進程的ID。這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要調用getppid()就可以拿到父進程的ID。

Python的os模塊封裝了常見的系統調用,其中就包括fork,可以在Python程序中輕松創建子進程:

# multiprocessing.py
import os

print ‘Process (%s) start...‘ % os.getpid()
pid = os.fork()
if pid==0:
    print ‘I am child process (%s) and my parent is %s.‘ % (os.getpid(), os.getppid())
else:
    print ‘I (%s) just created a child process (%s).‘ % (os.getpid(), pid)
運行結果如下:

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
由於Windows沒有fork調用,上面的代碼在Windows上無法運行。由於Mac系統是基於BSD(Unix的一種)內核,所以,在Mac下運行是沒有問題的,推薦大家用Mac學Python!

有了fork調用,一個進程在接到新任務時就可以復制出一個子進程來處理新任務,常見的Apache服務器就是由父進程監聽端口,每當有新的http請求時,就fork出子進程來處理新的http請求。

multiprocessing
如果你打算編寫多進程的服務程序,Unix/Linux無疑是正確的選擇。由於Windows沒有fork調用,難道在Windows上無法用Python編寫多進程的程序?

由於Python是跨平臺的,自然也應該提供一個跨平臺的多進程支持。multiprocessing模塊就是跨平臺版本的多進程模塊。

multiprocessing模塊提供了一個Process類來代表一個進程對象,下面的例子演示了啟動一個子進程並等待其結束:

from multiprocessing import Process
import os

# 子進程要執行的代碼
def run_proc(name):
    print ‘Run child process %s (%s)...‘ % (name, os.getpid())

if __name__==‘__main__‘:
    print ‘Parent process %s.‘ % os.getpid()
    p = Process(target=run_proc, args=(‘test‘,))
    print ‘Process will start.‘
    p.start()
    p.join()
    print ‘Process end.‘
執行結果如下:

Parent process 928.
Process will start.
Run child process test (929)...
Process end.
創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啟動,這樣創建進程比fork()還要簡單。

join()方法可以等待子進程結束後再繼續往下運行,通常用於進程間的同步。

Pool
如果要啟動大量的子進程,可以用進程池的方式批量創建子進程:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print ‘Run task %s (%s)...‘ % (name, os.getpid())
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print ‘Task %s runs %0.2f seconds.‘ % (name, (end - start))

if __name__==‘__main__‘:
    print ‘Parent process %s.‘ % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print ‘Waiting for all subprocesses done...‘
    p.close()
    p.join()
    print ‘All subprocesses done.‘
執行結果如下:

Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.
代碼解讀:

對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之後就不能繼續添加新的Process了。

請註意輸出的結果,task 0,1,2,3是立刻執行的,而task 4要等待前面某個task完成後才執行,這是因為Pool的默認大小在我的電腦上是4,因此,最多同時執行4個進程。這是Pool有意設計的限制,並不是操作系統的限制。如果改成:

p = Pool(5)
就可以同時跑5個進程。

由於Pool的默認大小是CPU的核數,如果你不幸擁有8核CPU,你要提交至少9個子進程才能看到上面的等待效果。

進程間通信
Process之間肯定是需要通信的,操作系統提供了很多機制來實現進程間的通信。Python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數據。

我們以Queue為例,在父進程中創建兩個子進程,一個往Queue裏寫數據,一個從Queue裏讀數據:

from multiprocessing import Process, Queue
import os, time, random

# 寫數據進程執行的代碼:
def write(q):
    for value in [‘A‘, ‘B‘, ‘C‘]:
        print ‘Put %s to queue...‘ % value
        q.put(value)
        time.sleep(random.random())

# 讀數據進程執行的代碼:
def read(q):
    while True:
        value = q.get(True)
        print ‘Get %s from queue.‘ % value

if __name__==‘__main__‘:
    # 父進程創建Queue,並傳給各個子進程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子進程pw,寫入:
    pw.start()
    # 啟動子進程pr,讀取:
    pr.start()
    # 等待pw結束:
    pw.join()
    # pr進程裏是死循環,無法等待其結束,只能強行終止:
    pr.terminate()
運行結果如下:

Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
在Unix/Linux下,multiprocessing模塊封裝了fork()調用,使我們不需要關註fork()的細節。由於Windows沒有fork調用,因此,multiprocessing需要“模擬”出fork的效果,父進程所有Python對象都必須通過pickle序列化再傳到子進程去。
所以,如果multiprocessing在Windows下調用失敗了,要先考慮是不是pickle失敗了。

小結
在Unix/Linux下,可以使用fork()調用實現多進程。

要實現跨平臺的多進程,可以使用multiprocessing模塊。

進程間通信是通過Queue、Pipes等實現的。

==========
==多線程==
==========
多任務可以由多進程完成,也可以由一個進程內的多線程完成。

我們前面提到了進程是由若幹線程組成的,一個進程至少有一個線程。

由於線程是操作系統直接支持的執行單元,因此,高級語言通常都內置多線程的支持,Python也不例外,並且,Python的線程是真正的Posix Thread,而不是模擬出來的線程。

Python的標準庫提供了兩個模塊:thread和threading,thread是低級模塊,threading是高級模塊,對thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高級模塊。

啟動一個線程就是把一個函數傳入並創建Thread實例,然後調用start()開始執行:

import time, threading

# 新線程執行的代碼:
def loop():
    print ‘thread %s is running...‘ % threading.current_thread().name
    n = 0
    while n < 5:
        n = n + 1
        print ‘thread %s >>> %s‘ % (threading.current_thread().name, n)
        time.sleep(1)
    print ‘thread %s ended.‘ % threading.current_thread().name

print ‘thread %s is running...‘ % threading.current_thread().name
t = threading.Thread(target=loop, name=‘LoopThread‘)
t.start()
t.join()
print ‘thread %s ended.‘ % threading.current_thread().name
執行結果如下:

thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.
由於任何進程默認就會啟動一個線程,我們把該線程稱為主線程,主線程又可以啟動新的線程,Python的threading模塊有個current_thread()函數,它永遠返回當前線程的實例。
主線程實例的名字叫MainThread,子線程的名字在創建時指定,我們用LoopThread命名子線程。名字僅僅在打印時用來顯示,完全沒有其他意義,
如果不起名字Python就自動給線程命名為Thread-1,Thread-2……

Lock
多線程和多進程最大的不同在於,多進程中,同一個變量,各自有一份拷貝存在於每個進程中,互不影響,而多線程中,所有變量都由所有線程共享。
所以,任何一個變量都可以被任何一個線程修改,因此,線程之間共享數據最大的危險在於多個線程同時改一個變量,把內容給改亂了。

來看看多個線程同時操作一個變量怎麽把內容給改亂了:

import time, threading

# 假定這是你的銀行存款:
balance = 0

def change_it(n):
    # 先存後取,結果應該為0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print balance
我們定義了一個共享變量balance,初始值為0,並且啟動兩個線程,先存後取,理論上結果應該為0,但是,由於線程的調度是由操作系統決定的,
當t1、t2交替執行時,只要循環次數足夠多,balance的結果就不一定是0了。

原因是因為高級語言的一條語句在CPU執行時是若幹條語句,即使一個簡單的計算:

balance = balance + n
也分兩步:

計算balance + n,存入臨時變量中;
將臨時變量的值賦給balance。
也就是可以看成:

x = balance + n
balance = x
由於x是局部變量,兩個線程各自都有自己的x,當代碼正常執行時:

初始值 balance = 0

t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1     # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1     # balance = 0

t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2     # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2     # balance = 0

結果 balance = 0
但是t1和t2是交替運行的,如果操作系統以下面的順序執行t1、t2:

初始值 balance = 0

t1: x1 = balance + 5  # x1 = 0 + 5 = 5

t2: x2 = balance + 8  # x2 = 0 + 8 = 8
t2: balance = x2      # balance = 8

t1: balance = x1      # balance = 5
t1: x1 = balance - 5  # x1 = 5 - 5 = 0
t1: balance = x1      # balance = 0

t2: x2 = balance - 5  # x2 = 0 - 5 = -5
t2: balance = x2      # balance = -5

結果 balance = -5
究其原因,是因為修改balance需要多條語句,而執行這幾條語句時,線程可能中斷,從而導致多個線程把同一個對象的內容改亂了。

兩個線程同時一存一取,就可能導致余額不對,你肯定不希望你的銀行存款莫名其妙地變成了負數,所以,我們必須確保一個線程在修改balance的時候,別的線程一定不能改。

如果我們要確保balance計算正確,就要給change_it()上一把鎖,當某個線程開始執行change_it()時,我們說,該線程因為獲得了鎖,因此其他線程不能同時執行change_it(),
只能等待,直到鎖被釋放後,獲得該鎖以後才能改。由於鎖只有一個,無論多少線程,同一時刻最多只有一個線程持有該鎖,所以,不會造成修改的沖突。
創建一個鎖就是通過threading.Lock()來實現:

balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(100000):
        # 先要獲取鎖:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要釋放鎖:
            lock.release()
當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,然後繼續執行代碼,其他線程就繼續等待直到獲得鎖為止。

獲得鎖的線程用完後一定要釋放鎖,否則那些苦苦等待鎖的線程將永遠等待下去,成為死線程。所以我們用try...finally來確保鎖一定會被釋放。

鎖的好處就是確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行,壞處當然也很多,首先是阻止了多線程並發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就大大地下降了。
其次,由於可以存在多個鎖,不同的線程持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,既不能執行,也無法結束,只能靠操作系統強制終止。

多核CPU
如果你不幸擁有一個多核CPU,你肯定在想,多核應該可以同時執行多個線程。

如果寫一個死循環的話,會出現什麽情況呢?

打開Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以監控某個進程的CPU使用率。

我們可以監控到一個死循環線程會100%占用一個CPU。

如果有兩個死循環線程,在多核CPU中,可以監控到會占用200%的CPU,也就是占用兩個CPU核心。

要想把N核CPU的核心全部跑滿,就必須啟動N個死循環線程。

試試用Python寫個死循環:

import threading, multiprocessing

def loop():
    x = 0
    while True:
        x = x ^ 1

for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()
啟動與CPU核心數量相同的N個線程,在4核CPU上可以監控到CPU占用率僅有160%,也就是使用不到兩核。

即使啟動100個線程,使用率也就170%左右,仍然不到兩核。

但是用C、C++或Java來改寫相同的死循環,直接可以把全部核心跑滿,4核就跑到400%,8核就跑到800%,為什麽Python不行呢?

因為Python的線程雖然是真正的線程,但解釋器執行代碼時,有一個GIL鎖:Global Interpreter Lock,任何Python線程執行前,必須先獲得GIL鎖,然後,每執行100條字節碼,
解釋器就自動釋放GIL鎖,讓別的線程有機會執行。這個GIL全局鎖實際上把所有線程的執行代碼都給上了鎖,所以,多線程在Python中只能交替執行,即使100個線程跑在100核CPU上,
也只能用到1個核。

GIL是Python解釋器設計的歷史遺留問題,通常我們用的解釋器是官方實現的CPython,要真正利用多核,除非重寫一個不帶GIL的解釋器。

所以,在Python中,可以使用多線程,但不要指望能有效利用多核。如果一定要通過多線程利用多核,那只能通過C擴展來實現,不過這樣就失去了Python簡單易用的特點。

不過,也不用過於擔心,Python雖然不能利用多線程實現多核任務,但可以通過多進程實現多核任務。多個Python進程有各自獨立的GIL鎖,互不影響。

小結
多線程編程,模型復雜,容易發生沖突,必須用鎖加以隔離,同時,又要小心死鎖的發生。

Python解釋器由於設計時有GIL全局鎖,導致了多線程無法利用多核。多線程的並發在Python中就是一個美麗的夢。

===============
==ThreadLocal==
===============
在多線程環境下,每個線程都有自己的數據。一個線程使用自己的局部變量比使用全局變量好,因為局部變量只有線程自己能看見,不會影響其他線程,而全局變量的修改必須加鎖。

但是局部變量也有問題,就是在函數調用的時候,傳遞起來很麻煩:

def process_student(name):
    std = Student(name)
    # std是局部變量,但是每個函數都要用它,因此必須傳進去:
    do_task_1(std)
    do_task_2(std)

def do_task_1(std):
    do_subtask_1(std)
    do_subtask_2(std)

def do_task_2(std):
    do_subtask_2(std)
    do_subtask_2(std)
每個函數一層一層調用都這麽傳參數那還得了?用全局變量?也不行,因為每個線程處理不同的Student對象,不能共享。

如果用一個全局dict存放所有的Student對象,然後以thread自身作為key獲得線程對應的Student對象如何?

global_dict = {}

def std_thread(name):
    std = Student(name)
    # 把std放到全局變量global_dict中:
    global_dict[threading.current_thread()] = std
    do_task_1()
    do_task_2()

def do_task_1():
    # 不傳入std,而是根據當前線程查找:
    std = global_dict[threading.current_thread()]
    ...

def do_task_2():
    # 任何函數都可以查找出當前線程的std變量:
    std = global_dict[threading.current_thread()]
    ...
這種方式理論上是可行的,它最大的優點是消除了std對象在每層函數中的傳遞問題,但是,每個函數獲取std的代碼有點醜。

有沒有更簡單的方式?

ThreadLocal應運而生,不用查找dict,ThreadLocal幫你自動做這件事:

import threading

# 創建全局ThreadLocal對象:
local_school = threading.local()

def process_student():
    print ‘Hello, %s (in %s)‘ % (local_school.student, threading.current_thread().name)

def process_thread(name):
    # 綁定ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=(‘Alice‘,), name=‘Thread-A‘)
t2 = threading.Thread(target= process_thread, args=(‘Bob‘,), name=‘Thread-B‘)
t1.start()
t2.start()
t1.join()
t2.join()
執行結果:

Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
全局變量local_school就是一個ThreadLocal對象,每個Thread對它都可以讀寫student屬性,但互不影響。你可以把local_school看成全局變量,
但每個屬性如local_school.student都是線程的局部變量,可以任意讀寫而互不幹擾,也不用管理鎖的問題,ThreadLocal內部會處理。

可以理解為全局變量local_school是一個dict,不但可以用local_school.student,還可以綁定其他變量,如local_school.teacher等等。

ThreadLocal最常用的地方就是為每個線程綁定一個數據庫連接,HTTP請求,用戶身份信息等,這樣一個線程的所有調用到的處理函數都可以非常方便地訪問這些資源。

=============
==分布式進程==
=============
在Thread和Process中,應當優選Process,因為Process更穩定,而且,Process可以分布到多臺機器上,而Thread最多只能分布到同一臺機器的多個CPU上。

Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。
由於managers模塊封裝很好,不必了解網絡通信的細節,就可以很容易地編寫分布式多進程程序。

舉個例子:如果我們已經有一個通過Queue通信的多進程程序在同一臺機器上運行,現在,由於處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程分布到兩臺機器上。
怎麽用分布式進程實現?

原有的Queue可以繼續使用,但是,通過managers模塊把Queue通過網絡暴露出去,就可以讓其他機器的進程訪問Queue了。

我們先看服務進程,服務進程負責啟動Queue,把Queue註冊到網絡上,然後往Queue裏面寫入任務:

# taskmanager.py

import random, time, Queue
from multiprocessing.managers import BaseManager

# 發送任務的隊列:
task_queue = Queue.Queue()
# 接收結果的隊列:
result_queue = Queue.Queue()

# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把兩個Queue都註冊到網絡上, callable參數關聯了Queue對象:
QueueManager.register(‘get_task_queue‘, callable=lambda: task_queue)
QueueManager.register(‘get_result_queue‘, callable=lambda: result_queue)
# 綁定端口5000, 設置驗證碼‘abc‘:
manager = QueueManager(address=(‘‘, 5000), authkey=‘abc‘)
# 啟動Queue:
manager.start()
# 獲得通過網絡訪問的Queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):
    n = random.randint(0, 10000)
    print(‘Put task %d...‘ % n)
    task.put(n)
# 從result隊列讀取結果:
print(‘Try get results...‘)
for i in range(10):
    r = result.get(timeout=10)
    print(‘Result: %s‘ % r)
# 關閉:
manager.shutdown()
請註意,當我們在一臺機器上寫多進程程序時,創建的Queue可以直接拿來用,但是,在分布式多進程環境下,添加任務到Queue不可以直接對原始的task_queue進行操作,
那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue接口添加。

然後,在另一臺機器上啟動任務進程(本機上啟動也可以):

# taskworker.py

import time, sys, Queue
from multiprocessing.managers import BaseManager

# 創建類似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由於這個QueueManager只從網絡上獲取Queue,所以註冊時只提供名字:
QueueManager.register(‘get_task_queue‘)
QueueManager.register(‘get_result_queue‘)

# 連接到服務器,也就是運行taskmanager.py的機器:
server_addr = ‘127.0.0.1‘
print(‘Connect to server %s...‘ % server_addr)
# 端口和驗證碼註意保持與taskmanager.py設置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=‘abc‘)
# 從網絡連接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,並把結果寫入result隊列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print(‘run task %d * %d...‘ % (n, n))
        r = ‘%d * %d = %d‘ % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print(‘task queue is empty.‘)
# 處理結束:
print(‘worker exit.‘)
任務進程要通過網絡連接到服務進程,所以要指定服務進程的IP。

現在,可以試試分布式進程的工作效果了。先啟動taskmanager.py服務進程:

$ python taskmanager.py 
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...
taskmanager進程發送完任務後,開始等待result隊列的結果。現在啟動taskworker.py進程:

$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.
taskworker進程結束,在taskmanager進程中會繼續打印出結果:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956
這個簡單的Manager/Worker模型有什麽用?其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾臺甚至幾十臺機器上,
比如把計算n*n的代碼換成發送郵件,就實現了郵件隊列的異步發送。

Queue對象存儲在哪?註意到taskworker.py中根本沒有創建Queue的代碼,所以,Queue對象存儲在taskmanager.py進程中。

而Queue之所以能通過網絡訪問,就是通過QueueManager實現的。由於QueueManager管理的不止一個Queue,所以,要給每個Queue的網絡調用接口起個名字,比如get_task_queue。

authkey有什麽用?這是為了保證兩臺機器正常通信,不被其他機器惡意幹擾。如果taskworker.py的authkey和taskmanager.py的authkey不一致,肯定連接不上。

小結
Python的分布式進程接口簡單,封裝良好,適合需要把繁重任務分布到多臺機器的環境下。

註意Queue的作用是用來傳遞任務和接收結果,每個任務的描述數據量要盡量小。比如發送一個處理日誌文件的任務,就不要發送幾百兆的日誌文件本身,而是發送日誌文件存放的完整路徑,
由Worker進程再去共享的磁盤上讀取文件。

==========
==異步IO==
==========
1.協程:
考慮到CPU和IO之間巨大的速度差異,一個任務在執行的過程中大部分時間都在等待IO操作,單進程單線程模型會導致別的任務無法並行執行,
因此,我們才需要多進程模型或者多線程模型來支持多任務並發執行。

現代操作系統對IO操作已經做了巨大的改進,最大的特點就是支持異步IO。如果充分利用操作系統提供的異步IO支持,就可以用單進程單線程模型來執行多任務,
這種全新的模型稱為事件驅動模型,Nginx就是支持異步IO的Web服務器,它在單核CPU上采用單進程模型就可以高效地支持多任務。在多核CPU上,
可以運行多個進程(數量與CPU核心數相同),充分利用多核CPU。由於系統總的進程數量十分有限,因此操作系統調度非常高效。用異步IO編程模型來實現多任務是一個主要的趨勢。

對應到Python語言,單進程的異步編程模型稱為協程,有了協程的支持,就可以基於事件驅動編寫高效的多任務程序。

Python通過yield提供了對協程的基本支持,但是不完全。而第三方的gevent為Python提供了比較完善的協程支持。
在學習異步IO模型前,我們先來了解協程。

協程,又稱微線程,纖程。英文名Coroutine。

協程的概念很早就提出來了,但直到最近幾年才在某些語言(如Lua)中得到廣泛應用。

子程序,或者稱為函數,在所有語言中都是層級調用,比如A調用B,B在執行過程中又調用了C,C執行完畢返回,B執行完畢返回,最後是A執行完畢。

所以子程序調用是通過棧實現的,一個線程就是執行一個子程序。

子程序調用總是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不同。

協程看上去也是子程序,但執行過程中,在子程序內部可中斷,然後轉而執行別的子程序,在適當的時候再返回來接著執行。

註意,在一個子程序中中斷,去執行其他子程序,不是函數調用,有點類似CPU的中斷。比如子程序A、B:

def A():
    print(‘1‘)
    print(‘2‘)
    print(‘3‘)

def B():
    print(‘x‘)
    print(‘y‘)
    print(‘z‘)
假設由協程執行,在執行A的過程中,可以隨時中斷,去執行B,B也可能在執行過程中中斷再去執行A,結果可能是:

1
2
x
y
3
z
但是在A中是沒有調用B的,所以協程的調用比函數調用理解起來要難一些。

看起來A、B的執行有點像多線程,但協程的特點在於是一個線程執行,那和多線程比,協程有何優勢?

最大的優勢就是協程極高的執行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。

第二大優勢就是不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。

因為協程是一個線程執行,那怎麽利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。

Python對協程的支持是通過generator實現的。

在generator中,我們不但可以通過for循環來叠代,還可以不斷調用next()函數獲取由yield語句返回的下一個值。

但是Python的yield不但可以返回一個值,它還可以接收調用者發出的參數。

來看例子:

傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。

如果改用協程,生產者生產消息後,直接通過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高:

def consumer():
    r = ‘‘
    while True:
        n = yield r
        if not n:
            return
        print(‘[CONSUMER] Consuming %s...‘ % n)
        r = ‘200 OK‘

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print(‘[PRODUCER] Producing %s...‘ % n)
        r = c.send(n)
        print(‘[PRODUCER] Consumer return: %s‘ % r)
    c.close()

c = consumer()
produce(c)
執行結果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
註意到consumer函數是一個generator,把一個consumer傳入produce後:
首先調用c.send(None)啟動生成器;
然後,一旦生產了東西,通過c.send(n)切換到consumer執行;
consumer通過yield拿到消息,處理,又通過yield把結果傳回;
produce拿到consumer處理的結果,繼續生產下一條消息;
produce決定不生產了,通過c.close()關閉consumer,整個過程結束。
整個流程無鎖,由一個線程執行,produce和consumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。
最後套用Donald Knuth的一句話總結協程的特點:
“子程序就是協程的一種特例。”
==========================================================================================================================
2.gevent
gevent是第三方庫,通過greenlet實現協程,其基本思想是:

當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程序處於等待狀態,
有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。

由於切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標準庫,這一過程在啟動時通過monkey patch完成:

from gevent import monkey; monkey.patch_socket()
import gevent

def f(n):
    for i in range(n):
        print gevent.getcurrent(), i

g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()
運行結果:

<Greenlet at 0x10e49f550: f(5)> 0
<Greenlet at 0x10e49f550: f(5)> 1
<Greenlet at 0x10e49f550: f(5)> 2
<Greenlet at 0x10e49f550: f(5)> 3
<Greenlet at 0x10e49f550: f(5)> 4
<Greenlet at 0x10e49f910: f(5)> 0
<Greenlet at 0x10e49f910: f(5)> 1
<Greenlet at 0x10e49f910: f(5)> 2
<Greenlet at 0x10e49f910: f(5)> 3
<Greenlet at 0x10e49f910: f(5)> 4
<Greenlet at 0x10e49f4b0: f(5)> 0
<Greenlet at 0x10e49f4b0: f(5)> 1
<Greenlet at 0x10e49f4b0: f(5)> 2
<Greenlet at 0x10e49f4b0: f(5)> 3
<Greenlet at 0x10e49f4b0: f(5)> 4
可以看到,3個greenlet是依次運行而不是交替運行。

要讓greenlet交替運行,可以通過gevent.sleep()交出控制權:

def f(n):
    for i in range(n):
        print gevent.getcurrent(), i
        gevent.sleep(0)
執行結果:

<Greenlet at 0x10cd58550: f(5)> 0
<Greenlet at 0x10cd58910: f(5)> 0
<Greenlet at 0x10cd584b0: f(5)> 0
<Greenlet at 0x10cd58550: f(5)> 1
<Greenlet at 0x10cd584b0: f(5)> 1
<Greenlet at 0x10cd58910: f(5)> 1
<Greenlet at 0x10cd58550: f(5)> 2
<Greenlet at 0x10cd58910: f(5)> 2
<Greenlet at 0x10cd584b0: f(5)> 2
<Greenlet at 0x10cd58550: f(5)> 3
<Greenlet at 0x10cd584b0: f(5)> 3
<Greenlet at 0x10cd58910: f(5)> 3
<Greenlet at 0x10cd58550: f(5)> 4
<Greenlet at 0x10cd58910: f(5)> 4
<Greenlet at 0x10cd584b0: f(5)> 4
3個greenlet交替運行,

把循環次數改為500000,讓它們的運行時間長一點,然後在操作系統的進程管理器中看,線程數只有1個。

當然,實際代碼裏,我們不會用gevent.sleep()去切換協程,而是在執行到IO操作時,gevent自動切換,代碼如下:

from gevent import monkey; monkey.patch_all()
import gevent
import urllib2

def f(url):
    print(‘GET: %s‘ % url)
    resp = urllib2.urlopen(url)
    data = resp.read()
    print(‘%d bytes received from %s.‘ % (len(data), url))

gevent.joinall([
        gevent.spawn(f, ‘https://www.python.org/‘),
        gevent.spawn(f, ‘https://www.yahoo.com/‘),
        gevent.spawn(f, ‘https://github.com/‘),
])
運行結果:

GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
45661 bytes received from https://www.python.org/.
14823 bytes received from https://github.com/.
304034 bytes received from https://www.yahoo.com/.
從結果看,3個網絡操作是並發執行的,而且結束順序不同,但只有一個線程。

小結
使用gevent,可以獲得極高的並發性能,但gevent只能在Unix/Linux下運行,在Windows下不保證正常安裝和運行。
由於gevent是基於IO切換的協程,所以最神奇的是,我們編寫的Web App代碼,不需要引入gevent的包,也不需要改任何代碼,僅僅在部署的時候,用一個支持gevent的WSGI服務器,立刻就獲得了數倍的性能提升。
==========================================================================================================================
3.asyncio
asyncio是Python 3.4版本引入的標準庫,直接內置了對異步IO的支持。

asyncio的編程模型就是一個消息循環。我們從asyncio模塊中直接獲取一個EventLoop的引用,然後把需要執行的協程扔到EventLoop中執行,就實現了異步IO。

用asyncio實現Hello world代碼如下:

import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # 異步調用asyncio.sleep(1):
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 獲取EventLoop:
loop = asyncio.get_event_loop()
# 執行coroutine
loop.run_until_complete(hello())
loop.close()
@asyncio.coroutine把一個generator標記為coroutine類型,然後,我們就把這個coroutine扔到EventLoop中執行。

hello()會首先打印出Hello world!,然後,yield from語法可以讓我們方便地調用另一個generator。由於asyncio.sleep()也是一個coroutine,所以線程不會等待asyncio.sleep(),
而是直接中斷並執行下一個消息循環。當asyncio.sleep()返回時,線程就可以從yield from拿到返回值(此處是None),然後接著執行下一行語句。

把asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程並未等待,而是去執行EventLoop中其他可以執行的coroutine了,因此可以實現並發執行。

我們用Task封裝兩個coroutine試試:

import threading
import asyncio

@asyncio.coroutine
def hello():
    print(‘Hello world! (%s)‘ % threading.currentThread())
    yield from asyncio.sleep(1)
    print(‘Hello again! (%s)‘ % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
觀察執行過程:

Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暫停約1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
由打印的當前線程名稱可以看出,兩個coroutine是由同一個線程並發執行的。

如果把asyncio.sleep()換成真正的IO操作,則多個coroutine就可以由一個線程並發執行。

我們用asyncio的異步網絡連接來獲取sina、sohu和163的網站首頁:

import asyncio

@asyncio.coroutine
def wget(host):
    print(‘wget %s...‘ % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = ‘GET / HTTP/1.0\r\nHost: %s\r\n\r\n‘ % host
    writer.write(header.encode(‘utf-8‘))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b‘\r\n‘:
            break
        print(‘%s header > %s‘ % (host, line.decode(‘utf-8‘).rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in [‘www.sina.com.cn‘, ‘www.sohu.com‘, ‘www.163.com‘]]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
執行結果如下:

wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段時間)
(打印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...
可見3個連接由一個線程通過coroutine並發完成。

小結
asyncio提供了完善的異步IO支持;
異步操作需要在coroutine中通過yield from完成;
多個coroutine可以封裝成一組Task然後並發執行。
==========================================================================================================================
4.async/await
用asyncio提供的@asyncio.coroutine可以把一個generator標記為coroutine類型,然後在coroutine內部用yield from調用另一個coroutine實現異步操作。

為了簡化並更好地標識異步IO,從Python 3.5開始引入了新的語法async和await,可以讓coroutine的代碼更簡潔易讀。

請註意,async和await是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換:

把@asyncio.coroutine替換為async;
把yield from替換為await。
讓我們對比一下上一節的代碼:

@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")
用新語法重新編寫如下:

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")
剩下的代碼保持不變。

小結
Python從3.5版本開始為asyncio提供了async和await的新語法.
==========================================================================================================================
5.aiohttp
asyncio可以實現單線程並發IO操作。如果僅用在客戶端,發揮的威力不大。如果把asyncio用在服務器端,例如Web服務器,由於HTTP連接就是IO操作,
因此可以用單線程+coroutine實現多用戶的高並發支持。

asyncio實現了TCP、UDP、SSL等協議,aiohttp則是基於asyncio實現的HTTP框架。

我們先安裝aiohttp:

pip install aiohttp
然後編寫一個HTTP服務器,分別處理以下URL:

/ - 首頁返回b‘<h1>Index</h1>‘;

/hello/{name} - 根據URL參數返回文本hello, %s!。

代碼如下:

import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(body=b‘<h1>Index</h1>‘)

async def hello(request):
    await asyncio.sleep(0.5)
    text = ‘<h1>hello, %s!</h1>‘ % request.match_info[‘name‘]
    return web.Response(body=text.encode(‘utf-8‘))

async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route(‘GET‘, ‘/‘, index)
    app.router.add_route(‘GET‘, ‘/hello/{name}‘, hello)
    srv = await loop.create_server(app.make_handler(), ‘127.0.0.1‘, 8000)
    print(‘Server started at http://127.0.0.1:8000...‘)
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
註意aiohttp的初始化函數init()也是一個coroutine,loop.create_server()則利用asyncio創建TCP服務。

  

參考鏈接:https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/0014319272686365ec7ceaeca33428c914edf8f70cca383000

[記錄]Python高並發編程