python -- 異步編程
我們在生產中,常用的處理任務模型有三種:
單線程
多線程
異步(單線程內,串行,特點是遇到阻塞(或IO之類的)就切換到其他任務)
其中一般如果都符合要求,那麽異步是最好的選擇。
單線程:遇到阻塞整個程序都等待
多線程:以空間換取時間,且有時候伴隨著數據安全問題(通常加鎖來處理)
異步:在單個線程內,且是串行執行,但是一旦遇到阻塞(IO之類的),就會切換到線程內的其他任務(把IO操作交給操作系統處理)
當我們面對如下的環境時,事件驅動模型(異步模型)通常是一個好的選擇(and):
1、程序中有許多任務
2、任務之間高度獨立(因此它們不需要互相通信,或者等待彼此)
3、在等待事件到來時,某些任務會阻塞。
常用的異步IO模型:select、poll、Epoll (windows下只支持select)
nginx就是Epoll模型實現的。單線程,多進程(為了利用多核,單線程只能跑在單個cup核心上)
前面我們說了多線程與多進程,總結其特點就是:
單線程串行執行,遇到IO就阻塞,效率低。
多線程並發執行,遇到IO就切換(用空間換取執行時間),效率上去了,但是耗費資源,操作復雜。
針對以上問題,出現了一種新的替代品,協程。
協程
協程又名微線程,纖程。協程是一種用戶態的輕量級線程(協程由用戶切換,線程由cpu時間片控制切換)協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。跟yield過程差不多
協程的定義標準:
1、必須在一個單線程裏面實現並發
2、修改共享數據不需要加鎖(因為協程是串行的)
3、用戶程序裏自己保存多個控制流的上下文棧
4、一個協程遇到IO操作(阻塞也一樣)就自動切換到其他協程
協程的好處:
1、無需線程上下文切換的開銷
2、無需原子操作鎖定及同步的開銷
"原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。
3、方便切換控制流,簡化編程模型
4、高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。
缺點:
1、無法利用多核資源:協程的本質是個單線程,它不能同時將單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
2、進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
協程例子
1、用yield實現協程操作的例子:
1 import time 2 3 def consumer(name): 4 print("\033[32;1m ---> starting eating baozi... \033[0m") 5 while True: 6 new_baozi = yield 7 print("[%s] is eating baozi %s " %(name,new_baozi)) 8 # time.sleep(1) #在yield裏面並沒有實現阻塞切換 9 10 def producer(): 11 c = con1.__next__() 12 c = con2.__next__() 13 n = 0 14 while n < 5: 15 n += 1 16 print("\033[32;1m [producer]\033[0m is making baozi %s " % n) 17 con1.send(n) 18 con2.send(n) #用send可以想yield發送數據 19 20 if __name__ == ‘__main__‘: 21 con1 = consumer(‘c1‘) 22 con2 = consumer(‘c2‘) 23 p = producer()
執行結果
---> starting eating baozi... ---> starting eating baozi... [producer] is making baozi 1 [c1] is eating baozi 1 [c2] is eating baozi 1 [producer] is making baozi 2 [c1] is eating baozi 2 [c2] is eating baozi 2 [producer] is making baozi 3 [c1] is eating baozi 3 [c2] is eating baozi 3 [producer] is making baozi 4 [c1] is eating baozi 4 [c2] is eating baozi 4 [producer] is making baozi 5 [c1] is eating baozi 5 [c2] is eating baozi 5
2、使用gevent實現協程
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import gevent 5 import time 6 7 def func1(): 8 print("\033[32;1m func1函數的第1部分... \033[0m") 9 gevent.sleep(2) 10 # time.sleep(2) #用time.sleep(2)是不行的 11 print("\033[32;1m func1函數的第2部分... \033[0m") 12 13 def func2(): 14 print("\033[31;1m func2函數的第1部分... \033[0m") 15 gevent.sleep(1) 16 # time.sleep(1) 17 print("\033[31;1m func2函數的第2部分... \033[0m") 18 19 def func3(): 20 print("\033[34;1m func3函數的第1部分... \033[0m") 21 gevent.sleep(3) 22 # time.sleep(3) 23 print("\033[34;1m func3函數的第2部分... \033[0m") 24 25 26 if __name__ == ‘__main__‘: 27 gevent.joinall([ 28 gevent.spawn(func1), 29 gevent.spawn(func2), 30 gevent.spawn(func3), 31 ])
執行結果
func1函數的第1部分...
func2函數的第1部分...
func3函數的第1部分...
func2函數的第2部分...
func1函數的第2部分...
func3函數的第2部分...
3、協程結合urllib模塊爬網站
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 from gevent import monkey 5 monkey.patch_all() #遇到阻塞就切換全靠它(作用:把要用到的接口全部變為非阻塞模式) 6 7 import gevent 8 from urllib.request import urlopen 9 10 def f(url): 11 print("GET: %s " %url) 12 resp = urlopen(url) 13 data = resp.read() 14 print("%s bytes received from %s " %(len(data),url)) 15 16 if __name__ == ‘__main__‘: 17 gevent.joinall([ 18 gevent.spawn(f, ‘http://www.cdu.edu.cn/‘), 19 gevent.spawn(f,‘https://www.python.org/‘), 20 gevent.spawn(f,‘https://www.jd.com/‘), 21 gevent.spawn(f,‘https://www.vip.com/‘) 22 ])
執行結果
GET: http://www.cdu.edu.cn/ GET: https://www.python.org/ GET: https://www.jd.com/ GET: https://www.vip.com/ 137584 bytes received from https://www.jd.com/ 69500 bytes received from https://www.vip.com/ 47984 bytes received from http://www.cdu.edu.cn/ 47695 bytes received from https://www.python.org/ Process finished with exit code 0
4、利用協程實現高並發服務器
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import gevent 5 import socket 6 from gevent import monkey; monkey.patch_all() 7 8 def server(port): 9 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 10 s.bind((‘0.0.0.0‘,port)) 11 s.listen(5000) 12 13 while True: 14 # print("\033[32;1m server is waiting... \033[0m") 15 print(" server is waiting... ") 16 conn, addr = s.accept() 17 gevent.spawn(handle_request,conn) 18 19 def handle_request(conn): 20 try: 21 while True: 22 data = conn.recv(1024) 23 print("recvied:",data.decode(‘utf8‘)) 24 conn.send(data) 25 if not data: 26 conn.shutdown(socket.SHUT_WR) 27 28 except Exception as ex: 29 print(ex) 30 31 finally: 32 conn.close() 33 34 35 if __name__ == ‘__main__‘: 36 server(9999)server
1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import socket 5 import threading 6 7 def run(n): 8 ‘‘‘這裏是啟動多線程,然後每個線程死循環發包給服務器,測試協程服務器的高並發和穩定性‘‘‘ 9 while True: 10 # msg = input(">>:").strip() 11 # if len(msg) == 0:continue 12 # if msg == ‘q‘:break 13 msg = ‘hello %s‘ %n 14 sk.send(bytes(msg,‘utf8‘)) 15 data = sk.recv(1024) 16 print("Received:",data.decode(‘utf8‘)) 17 18 sk.close() 19 20 if __name__ == ‘__main__‘: 21 IP_Port = (‘127.0.0.1‘, 9999) 22 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 23 sk.connect(IP_Port) 24 25 thread_list = [] 26 for i in range(2000): 27 t = threading.Thread(target=run,args=[i,]) 28 t.start() 29 thread_list.append(t) 30 31 for thread in thread_list: 32 thread.join()client
python -- 異步編程