1. 程式人生 > >並發編程 - 協程 - 總結

並發編程 - 協程 - 總結

cbc __main__ res 本質 start 之前 UC rec 共享數據

協程:
單線程下實現並發 並發 = 切換 + 保存狀態
1.遇到IO切, 提高效率
2.遇到計算切, 並沒有提高效率
1.協程本質:
協程的本質就是在單線程下,由用戶自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。
為了實現它,我們需要找尋一種可以同時滿足以下條件的解決方案:
1. 可以控制多個任務之間的切換,切換之前將任務的狀態保存下來,以便重新運行時,可以基於暫停的位置繼續執行。
2. 作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換
2.強調:
1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行)
2. 單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
3.優點:
1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級
2. 單線程內就可以實現並發的效果,最大限度地利用cpu
4.缺點:
1. 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程
2. 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程
5.總結:
1.必須在只有一個單線程裏實現並發
2.修改共享數據不需加鎖
3.用戶程序裏自己保存多個控制流的上下文棧
4.附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))

單線程下實現並發:
1.yield # yield 遇到io 不能監測到 切換 # 沒有io 去切換,會降低執行速度
2.greenlet 模塊 # greenlet 遇到io 不能監測到 切換 # 沒有io 去切換,會降低執行速度
pip3 install greenlet
greenlet 比yield 好 但是還是不好 遇到io不會切
g1 = greenlet(eat)
g1.switch(‘alice‘)
g1.switch()
3.gevent 模塊
pip3 install gevent
gevent:封裝了greenlet模塊,但是他能檢測到io 自動切
加了補丁:from gevent import monkey;monkey.patch_all()
遇到time.sleep(2) 才會切,否則只有遇到gevent.sleep(2) 才會切,所以必須加補丁
g1 = gevent.spawn(eat,‘alice‘)
g1.join() # 等待g1結束
gevent.joinall([g1,g2])
g1.value # 拿到func1的返回值

補丁說明:
from gevent import monkey;monkey.patch_all()
1.必須放到被打補丁者的前面,如time,socket模塊之前
2.要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭

# 通過gevent實現單線程下的socket並發(from gevent import monkey;monkey.patch_all()一定要放到導入socket模塊之前,
否則gevent無法識別socket的阻塞)
技術分享圖片
 1 import time
 2 def consumer(res):
 3     ‘‘‘任務1:接收數據,處理數據‘‘‘
 4     pass
 5 
 6 def producer():
 7     ‘‘‘任務2:生產數據‘‘‘
 8     res=[]
 9     for i in range(10000000):
10         res.append(i)
11     return res
12 
13 start=time.time()
14 #串行執行
15 res=producer()
16 consumer(res) #寫成consumer(producer())會降低執行效率
17 # consumer(producer()) 18 stop=time.time() 19 print(stop-start) #1.5536692142486572
計算型的切換降低運行效率 技術分享圖片
 1 import time
 2 def producter():  # 並發的去執行
 3     g = consumer()
 4     next(g)
 5     for i in range(10):
 6         g.send(i)
 7         time.sleep(2)
 8 
 9 def consumer():
10     while True:
11
res = yield 12 print(res) 13 start = time.time() 14 producter() 15 stop = time.time() 16 print(stop-start) 17 18 import time 19 def producter(): # 串行 計算型的 切換 會降低運行效率 20 res = [] 21 for i in range(10000000): 22 res.append(i) 23 return res 24 25 def consumer(res): 26 pass 27 28 start = time.time() 29 res = producter() # 串行 執行 寫成 consumer(producter()) 會降低執行效率 30 consumer(res) 31 stop = time.time() 32 print(stop-start)
yield,不能實現遇到io切換 技術分享圖片
 1 from greenlet import greenlet
 2 import time
 3 
 4 def eat(name):
 5     print(%s eat 1%name)
 6 
 7     g2.switch(alice)
 8     time.sleep(4)  # 遇到io 不會立即s切
 9     print(%s eat 2%name)
10     g2.switch()
11 
12 def play(name):
13     print(%s play 1%name)
14     g1.switch()
15     print(%s play 2%name)
16 
17 g1 = greenlet(eat)
18 g2 = greenlet(play)
19 
20 g1.switch(alice)  # 第一次切 需要傳參數
greenlet模塊,不能實現遇到io切換 技術分享圖片
 1 #順序執行
 2 import time
 3 def f1():
 4     res=1
 5     for i in range(100000000):
 6         res+=i
 7 
 8 def f2():
 9     res=1
10     for i in range(100000000):
11         res*=i
12 
13 start=time.time()
14 f1()
15 f2()
16 stop=time.time()
17 print(run time is %s %(stop-start)) #10.985628366470337
18 
19 #切換
20 from greenlet import greenlet
21 import time
22 def f1():
23     res=1
24     for i in range(100000000):
25         res+=i
26         g2.switch()
27 
28 def f2():
29     res=1
30     for i in range(100000000):
31         res*=i
32         g1.switch()
33 
34 start=time.time()
35 g1=greenlet(f1)
36 g2=greenlet(f2)
37 g1.switch()
38 stop=time.time()
39 print(run time is %s %(stop-start)) # 52.763017892837524
計算型的使用greenlet,單純的切換會降低運行效率 技術分享圖片
 1 from gevent import monkey;monkey.patch_all()
 2 import gevent
 3 import time
 4 
 5 def eat(name):
 6     print(%s eat 1%name)
 7     time.sleep(2)
 8     # gevent.sleep(2)
 9     print(%s eat 2%name)
10 
11 def play(name):
12     print(%s play 1%name)
13     # gevent.sleep(1)
14     time.sleep(1)
15     print(%s play 2%name)
16 
17 start = time.time()
18 g1 = gevent.spawn(eat,alice)
19 g2 = gevent.spawn(play,alice)
20 # g1.join()
21 # g2.join()
22 gevent.joinall([g1,g2])
23 stop = time.time()
24 print(,stop-start)
25 """
26 alice eat 1  # 遇到 time.sleep() io 並不會切  加個補丁才會切
27 alice eat 2
28 alice play 1
29 alice play 2
30 主 3.000598430633545
31 """
32 """
33 alice eat 1   # 加了補丁後,可識別time.sleep() io 會切
34 alice play 1  # 補丁:from gevent import monkey;monkey.patch_all()
35 alice play 2
36 alice eat 2
37 主 2.000128984451294
38 """
39 """
40 alice eat 1   # 遇到 gevent.sleep() 會切
41 alice play 1
42 alice play 2
43 alice eat 2
44 主 2.0026285648345947
45 """
gevent模塊,可以實現遇到io切換 技術分享圖片
 1 # -*- coding:utf-8 -*-
 2 from gevent import spawn,monkey;monkey.patch_all()
 3 import socket
 4 
 5 #如果不想用money.patch_all()打補丁,可以用gevent自帶的socket
 6 # from gevent import socket
 7 # s=socket.socket()
 8 
 9 def talk(conn):
10     while True:
11         try:
12             res = conn.recv(1024)
13             if not res:break
14             conn.send(res.upper())
15         except Exception as e:
16             break
17     conn.close()
18 
19 def server(ip,port):
20     server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
21     server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
22     server.bind((ip,port))
23     server.listen(5)
24     while True:
25         conn, addr = server.accept()
26         spawn(talk,conn)
27 
28 if __name__ == __main__:
29     g = spawn(server,127.0.0.1,8080)
30     g.join()
gevent實現單線程下的socket並發 server 技術分享圖片
 1 # -*- coding:utf-8 -*-
 2 import socket
 3 from threading import Thread,currentThread
 4 
 5 def client(ip,port):
 6     c = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 7     c.connect((ip,port))
 8 
 9     while True:
10         c.send((%s hello %currentThread().getName()).encode(utf-8))
11         data = c.recv(1024)
12         print(data.decode(utf-8))
13 
14 if __name__ == "__main__":
15     for i in range(500):
16         t = Thread(target=client,args=(127.0.0.1,8080))
17         t.start()
gevent實現單線程下的socket並發 client




並發編程 - 協程 - 總結