詳解python之協程gevent模組
Gevent官網文件地址: [ http://www.gevent.org/contents.html
](http://www.gevent.org/contents.html)
程序、執行緒、協程區分
我們通常所說的協程Coroutine其實是corporate routine的縮寫,直接翻譯為協同的例程,一般我們都簡稱為協程。
在linux系統中,執行緒就是輕量級的程序,而我們通常也把協程稱為輕量級的執行緒即微執行緒。
程序和協程
下面對比一下程序和協程的相同點和不同點:
相同點:
- 相同點存在於,當我們掛起一個執行流的時,我們要儲存的東西:
- 棧, 其實在你切換前你的區域性變數,以及要函式的呼叫都需要儲存,否則都無法恢復
暫存器狀態,這個其實用於當你的執行流恢復後要做什麼
而暫存器和棧的結合就可以理解為上下文,上下文切換的理解:
CPU看上去像是在併發的執行多個程序,這是通過處理器在程序之間切換來實現的,作業系統實現這種交錯執行的機制稱為上下文切換
作業系統保持跟蹤程序執行所需的所有狀態資訊。這種狀態,就是上下文。
在任何一個時刻,作業系統都只能執行一個程序程式碼,當作業系統決定把控制權從當前程序轉移到某個新程序時,就會進行上下文切換,即儲存當前程序的上下文,恢復新程序的上下文,然後將控制權傳遞到新程序,新程序就會從它上次停止的地方開始。
不同點:
- 執行流的排程者不同,程序是核心排程,而協程是在使用者態排程,也就是說程序的上下文是在核心態儲存恢復的,而協程是在使用者態儲存恢復的,很顯然使用者態的代價更低
- 程序會被強佔,而協程不會,也就是說協程如果不主動讓出CPU,那麼其他的協程,就沒有執行的機會。
- 對記憶體的佔用不同,實際上協程可以只需要4K的棧就足夠了,而程序佔用的記憶體要大的多
- 從作業系統的角度講,多協程的程式是單程序,單協程
執行緒和協程
既然我們上面也說了,協程也被稱為微執行緒,下面對比一下協程和執行緒:
- 執行緒之間需要上下文切換成本相對協程來說是比較高的,尤其在開啟執行緒較多時,但協程的切換成本非常低。
- 同樣的執行緒的切換更多的是靠作業系統來控制,而協程的執行由我們自己控制。
協程只是在單一的執行緒裡不同的協程之間切換,其實和執行緒很像,執行緒是在一個程序下,不同的執行緒之間做切換,這也可能是協程稱為微執行緒的原因吧。
Gevent模組
Gevent是一種基於協程的Python網路庫,它用到Greenlet提供的,封裝了libevent事件迴圈的高層同步API。它讓開發者在不改變程式設計習慣的同時,用同步的方式寫非同步I/O的程式碼。
簡單示例:
import gevent
def test1():
print 12
gevent.sleep(0)
print 34
def test2():
print 56
gevent.sleep(0)
print 78
gevent.joinall([
gevent.spawn(test1),
gevent.spawn(test2),
])
結果:
12
56
34
78
猴子補丁 Monkey patching
這個補丁是Gevent模組最需要注意的問題,有了它,才會讓Gevent模組發揮它的作用。我們往往使用Gevent是為了實現網路通訊的高併發,但是,Gevent直接修改標準庫裡面大部分的阻塞式系統呼叫,包括socket、ssl、threading和
select等模組,而變為協作式執行。但是我們無法保證你在複雜的生產環境中有哪些地方使用這些標準庫會由於打了補丁而出現奇怪的問題。
一種方法是使用gevent下的socket模組,我們可以通過”from gevent import
socket”來匯入。不過更常用的方法是使用猴子布丁(Monkey
patching)。使用猴子補丁褒貶不一,但是官網上還是建議使用”patch_all()”,而且在程式的第一行就執行。
from gevent import monkey; monkey.patch_socket()
import gevent
import socket
urls = ['www.baidu.com', 'www.gevent.org', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=5)
print [job.value for job in jobs]
上述程式碼的第一行就是對socket標準庫打上猴子補丁,此後socket標準庫中的類和方法都會被替換成非阻塞式的,所有其他的程式碼都不用修改,這樣協程的效率就真正體現出來了。Python中其它標準庫也存在阻塞的情況,gevent提供了”monkey.patch_all()”方法將所有標準庫都替換。
獲取協程狀態
- started屬性/ready()方法:判斷協程是否已啟動。
- successful()方法:判斷協程是否成功執行且沒有丟擲異常。
- value屬性:獲取協程執行完之後的返回值。
另外,greenlet協程執行過程中發生的異常是不會被丟擲到協程外的,因此需要用協程物件的”exception”屬性來獲取協程中的異常。
下面的例子很好的演示了各種方法和屬性的使用。
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
import gevent
def win():
return 'You win!'
def fail():
raise Exception('You failed!')
winner = gevent.spawn(win)
loser = gevent.spawn(fail)
print(winner.started) # True
print(loser.started) # True
# 在Greenlet中發生的異常,不會被拋到Greenlet外面。
# 控制檯會打出Stacktrace,但程式不會停止
try:
gevent.joinall([winner, loser])
except Exception as e:
# 這段永遠不會被執行
print('This will never be reached')
print(winner.ready()) # True
print(loser.started) # True
print(winner.value) # 'You win!'
print(loser.value) # None
print('successful ',winner.successful()) # True
print('successful ',loser.successful()) # False
# 這裡可以通過raise loser.exception 或 loser.get()
# 來將協程中的異常丟擲
print(loser.exception)
協程執行超時控制
之前我們講過在”gevent.joinall()”方法中可以傳入timeout引數來設定超時,我們也可以在全域性範圍內設定超時時間:
import gevent
from gevent import Timeout
timeout = Timeout(2) # 2 seconds
timeout.start()
def wait():
gevent.sleep(10)
try:
gevent.spawn(wait).join()
except Timeout:
print('Could not complete')
上例中,我們將超時設為2秒,此後所有協程的執行,如果超過兩秒就會丟擲”Timeout”異常。我們也可以將超時設定在with語句內,這樣該設定只在with語句塊中有效:
with Timeout(1):
gevent.sleep(10)
此外,我們可以指定超時所丟擲的異常,來替換預設的”Timeout”異常。比如下例中超時就會丟擲我們自定義的”TooLong”異常。
class TooLong(Exception):
pass
with Timeout(1, TooLong):
gevent.sleep(10)
協程間通訊
事件(Event)物件
greenlet協程間的非同步通訊可以使用事件(Event)物件。該物件的”wait()”方法可以阻塞當前協程,而”set()”方法可以喚醒之前阻塞的協程。在下面的例子中,5個waiter協程都會等待事件evt,當setter協程在3秒後設置evt事件,所有的waiter協程即被喚醒。
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
import gevent
from gevent.event import Event
evt = Event()
def setter():
print 'Wait for me'
gevent.sleep(3) # 3秒後喚醒所有在evt上等待的協程
print "Ok, I'm done"
evt.set() # 喚醒
def waiter():
print "I'll wait for you"
evt.wait() # 等待
print 'Finish waiting'
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])
AsyncResult事件
除了Event事件外,gevent還提供了AsyncResult事件,它可以在喚醒時傳遞訊息。讓我們將上例中的setter和waiter作如下改動:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
from gevent.event import AsyncResult
aevt = AsyncResult()
def setter():
print 'Wait for me'
gevent.sleep(3) # 3秒後喚醒所有在evt上等待的協程
print "Ok, I'm done"
aevt.set('Hello!') # 喚醒,並傳遞訊息
def waiter():
print("I'll wait for you")
message = aevt.get() # 等待,並在喚醒時獲取訊息
print 'Got wake up message: %s' % message
佇列 Queue
佇列Queue的概念相信大家都知道,我們可以用它的put和get方法來存取佇列中的元素。gevent的佇列物件可以讓greenlet協程之間安全的訪問。執行下面的程式,你會看到3個消費者會分別消費佇列中的產品,且消費過的產品不會被另一個消費者再取到:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron<br>
import gevent
from gevent.queue import Queue
products = Queue()
def consumer(name):
#while not products.empty():
while True:
try:
print('%s got product %s' % (name, products.get_nowait()))
gevent.sleep(0)
except gevent.queue.Empty:
break
print('Quit')
def producer():
for i in range(1, 10):
products.put(i)
gevent.joinall([
gevent.spawn(producer),
gevent.spawn(consumer, 'steve'),
gevent.spawn(consumer, 'john'),
gevent.spawn(consumer, 'nancy'),
])
注意:協程佇列跟執行緒佇列是一樣的,put和get方法都是阻塞式的,它們都有非阻塞的版本:put_nowait和get_nowait。如果呼叫get方法時佇列為空,則是不會丟擲”gevent.queue.Empty”異常。我們只能使用get_nowait()的方式讓氣丟擲異常。
訊號量
訊號量可以用來限制協程併發的個數。它有兩個方法,acquire和release。顧名思義,acquire就是獲取訊號量,而release就是釋放。當所有訊號量都已被獲取,那剩餘的協程就只能等待任一協程釋放訊號量後才能得以執行:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
import gevent
from gevent.coros import BoundedSemaphore
sem = BoundedSemaphore(2)
def worker(n):
sem.acquire()
print('Worker %i acquired semaphore' % n)
gevent.sleep(0)
sem.release()
print('Worker %i released semaphore' % n)
gevent.joinall([gevent.spawn(worker, i) for i in xrange(0, 6)])
上面的例子中,我們初始化了”BoundedSemaphore”訊號量,並將其個數定為2。所以同一個時間,只能有兩個worker協程被排程。程式執行後的結果如下:
Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 2 acquired semaphore
Worker 3 acquired semaphore
Worker 2 released semaphore
Worker 3 released semaphore
Worker 4 acquired semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore
如果訊號量個數為1,那就等同於同步鎖。
協程本地變數
同線程類似,協程也有本地變數,也就是隻在當前協程內可被訪問的變數:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
import gevent
from gevent.local import local
data = local()
def f1():
data.x = 1
print data.x
def f2():
try:
print data.x
except AttributeError:
print 'x is not visible'
gevent.joinall([
gevent.spawn(f1),
gevent.spawn(f2)
])
通過將變數存放在local物件中,即可將其的作用域限制在當前協程內,當其他協程要訪問該變數時,就會丟擲異常。不同協程間可以有重名的本地變數,而且互相不影響。因為協程本地變數的實現,就是將其存放在以的”greenlet.getcurrent()”的返回為鍵值的私有的名稱空間內。
多併發socket模型
伺服器端:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
import socket
import gevent
from gevent import socket, monkey
monkey.patch_all()
def server(port):
s = socket.socket()
s.bind(('0.0.0.0', port))
s.listen(500)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
def handle_request(conn):
try:
while True:
data = conn.recv(1024)
print("recv:", data)
conn.send(data)
if not data:
conn.shutdown(socket.SHUT_WR)
except Exception as ex:
print(ex)
finally:
conn.close()
if __name__ == '__main__':
server(8001)
當客戶端連線上伺服器端時,伺服器端通過開闢一個協程與該客戶端完成互動任務,同時由於使用了Gevent協程的方式,在每個客戶端與伺服器互動時,並不會影響到伺服器端的工作。
客戶端:
#!/usr/bin/env python
# _*_ coding utf-8 _*_
#Author: aaron
import socket
HOST = 'localhost' # The remote host
PORT = 8001 # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while True:
msg = bytes(input(">>:"), encoding="utf8")
s.sendall(msg)
data = s.recv(1024)
# print(data)
print('Received', repr(data)) # repr 格式化輸出
s.close()
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援指令碼之家。