35、concurrent.futures模塊與協程
concurrent.futures —Launching parallel tasks concurrent.futures模塊同時提供了進程池和線程池,它是將來的使用趨勢,同樣我們之前學習的進程池Pool和threadpool模塊也可以使用。
對進程池疑惑的可以參閱:32進程池與回調函數http://www.cnblogs.com/liluning/p/7445457.html
對threadpool模塊疑惑的可以看我閑暇時寫的一段代碼:(因為本人也不了解這個模塊,代碼裏寫的也是自己想當然的,如有問題請自行查閱資料)
#pip3 install threadpool #需下載 import基於threadpool貓眼爬蟲threadpool import requests import re import os #爬取網頁 def get_page(url) : pattern = re.compile(r‘<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<‘, re.S) response = requests.get(url) if response.status_code == 200 : #status_code請求的狀態碼200為正常return (response.text,pattern,url) #信息處理 def parse_page(info) : page_content,pattern,url = info print(‘<%s> parse [%s]‘% (os.getpid(), url)) res = re.findall(pattern,page_content) dic_l = [] for item in res: dic = { ‘index‘:item[0],‘title‘:item[1], ‘actor‘:item[2].strip()[3:], ‘time‘:item[3][5:] } dic_l.append(dic) print(dic) with open(‘movie_info.txt‘,‘a‘,encoding=‘utf-8‘) as f : for i in range(len(dic_l)) : parse_res = (‘index:%s title:%s actor:%s time:%s\n‘ %(dic_l[i][‘index‘],dic_l[i][‘title‘],dic_l[i][‘actor‘],dic_l[i][‘time‘])) f.write(parse_res) if __name__ == ‘__main__‘: urls = [ ‘http://maoyan.com/board/7‘, ‘http://maoyan.com/board/6‘, ‘http://maoyan.com/board/1‘, ‘http://maoyan.com/board/2‘, ‘http://maoyan.com/board/4‘, ] t = threadpool.ThreadPool(4) #創建線程池 for url in urls : res = threadpool.makeRequests(get_page,urls,parse_page(get_page(url))) #參數:執行函數,參數,回調函數 [t.putRequest(req) for req in res] t.wait()
一、concurrent.futures模塊
1、官方文檔
https://docs.python.org/dev/library/concurrent.futures.html#module-concurrent.futures
2、ProcessPoolExecutor(進程池)與ThreadPoolExecutor(線程池)
(進程池類與線程池類的方法使用等各方面基本相同)
1)導入
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
2)創建
p = ProcessPoolExecutor(num) #創建進程池 t = ThreadPoolExecutor(num) #創建線程池
3)參數
num:要創建的進程數或線程數,如果省略,進程數將默認使用cpu_count()的值,線程數將默認使用cpu_count()*5的值
4)主要方法
submit(fn, *args, **kwargs):在一個池工作進程中執行執行fn(args kwargs)執行,並返回一個表示可調用的執行的Future對象 map(func, *iterables, timeout=None, chunksize=1): shutdown(wait=True):執行結束釋放資源
3、應用
1)進程池
from concurrent.futures import ProcessPoolExecutor import os,time def task(n): print(‘%s is running‘ %os.getpid()) time.sleep(2) return n**2 if __name__ == ‘__main__‘: p=ProcessPoolExecutor() l=[] start=time.time() for i in range(10): obj=p.submit(task,i) l.append(obj) p.shutdown() print(‘=‘*30) print([obj for obj in l]) print([obj.result() for obj in l]) print(time.time()-start)
2)線程池
from concurrent.futures import ThreadPoolExecutor import threading import os,time def task(n): print(‘%s:%s is running‘ %(threading.currentThread().getName(),os.getpid())) time.sleep(2) return n**2 if __name__ == ‘__main__‘: p=ThreadPoolExecutor() l=[] start=time.time() for i in range(10): obj=p.submit(task,i) l.append(obj) p.shutdown() print(‘=‘*30) print([obj.result() for obj in l]) print(time.time()-start)
3)同步執行
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random def task(n): print(‘%s is running‘ %os.getpid()) time.sleep(2) return n**2 if __name__ == ‘__main__‘: p=ProcessPoolExecutor() start=time.time() for i in range(10): res=p.submit(task,i).result() print(res) print(‘=‘*30) print(time.time()-start)
4、回調函數
不懂回調函數的看本章節首部有鏈接
from concurrent.futures import ThreadPoolExecutor import requests, os, time from threading import currentThread def get_page(url): print(‘%s:<%s> is getting [%s]‘ %(currentThread().getName(),os.getpid(),url)) response=requests.get(url) time.sleep(2) return {‘url‘:url,‘text‘:response.text} def parse_page(res): res=res.result() #註意值 print(‘%s:<%s> parse [%s]‘ %(currentThread().getName(),os.getpid(),res[‘url‘])) with open(‘db.txt‘,‘a‘) as f: parse_res=‘url:%s size:%s\n‘ %(res[‘url‘],len(res[‘text‘])) f.write(parse_res) if __name__ == ‘__main__‘: p=ThreadPoolExecutor() urls = [ ‘https://www.baidu.com‘, ‘http://www.openstack.org‘, ‘https://www.python.org‘, ‘http://www.sina.com.cn/‘ ] for url in urls: p.submit(get_page, url).add_done_callback(parse_page) #add_done_callback()回調函數 p.shutdown() print(‘主‘,os.getpid())
5、map方法
map有疑惑可以閱覽:19、內置函數和匿名函數http://www.cnblogs.com/liluning/p/7280832.html
from concurrent.futures import ProcessPoolExecutor import os,time def task(n): print(‘%s is running‘ %os.getpid()) time.sleep(2) return n**2 if __name__ == ‘__main__‘: p=ProcessPoolExecutor() obj=p.map(task,range(10)) p.shutdown() print(‘=‘*30) print(list(obj))
二、協程概念
1、定義
是單線程下的並發,又稱微線程,纖程。英文名Coroutine。一句話說明什麽是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。
2、註意
1)python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行)
2)單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
3、優點
1) 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級
2) 單線程內就可以實現並發的效果,最大限度地利用cpu
4、缺點
1) 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程
2) 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程
5、總結
1)必須在只有一個單線程裏實現並發
2)修改共享數據不需加鎖
3)用戶程序裏自己保存多個控制流的上下文棧
附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))
三、greenlet模塊
如果我們在單個線程內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然後再調用send。。。非常麻煩),而使用greenlet模塊可以非常簡單地實現這20個任務直接的切換
生成器:18、叠代器和生成器http://www.cnblogs.com/liluning/p/7274862.html
1、安裝
pip3 install greenlet
2、使用
from greenlet import greenlet def eat(name): print(‘%s eat 1‘ %name) g2.switch(‘egon‘) print(‘%s eat 2‘ %name) g2.switch() def play(name): print(‘%s play 1‘ %name) g1.switch() print(‘%s play 2‘ %name) g1=greenlet(eat) g2=greenlet(play) g1.switch(‘egon‘)#可以在第一次switch時傳入參數,以後都不需要
3、單純的切換(在沒有io的情況下或者沒有重復開辟內存空間的操作),反而會降低程序的執行速度
#順序執行 import time def f1(): res=1 for i in range(100000000): res+=i def f2(): res=1 for i in range(100000000): res*=i start=time.time() f1() f2() stop=time.time() print(‘run time is %s‘ %(stop-start)) #10.985628366470337 #切換 from greenlet import greenlet import time def f1(): res=1 for i in range(100000000): res+=i g2.switch() def f2(): res=1 for i in range(100000000): res*=i g1.switch() start=time.time() g1=greenlet(f1) g2=greenlet(f2) g1.switch() stop=time.time() print(‘run time is %s‘ %(stop-start)) # 52.763017892837524View Code
單線程裏的這20個任務的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模塊。
四、Gevent模塊
1、安裝
pip3 install gevent
Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
2、用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的 g2=gevent.spawn(func2) g1.join() #等待g1結束 g2.join() #等待g2結束 #或者上述兩步合作一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值
3、遇到IO阻塞時會自動切換任務
import gevent def eat(name): print(‘%s eat 1‘ %name) gevent.sleep(2) print(‘%s eat 2‘ %name) def play(name): print(‘%s play 1‘ %name) gevent.sleep(1) print(‘%s play 2‘ %name) g1=gevent.spawn(eat,‘egon‘) g2=gevent.spawn(play,name=‘egon‘) g1.join() g2.join() #或者gevent.joinall([g1,g2]) print(‘主‘)View Code
上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊之前或者我們幹脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭
from gevent import monkey;monkey.patch_all() import gevent import time def eat(): print(‘eat food 1‘) time.sleep(2) print(‘eat food 2‘) def play(): print(‘play 1‘) time.sleep(1) print(‘play 2‘) g1=gevent.spawn(eat) g2=gevent.spawn(play_phone) gevent.joinall([g1,g2]) print(‘主‘)View Code
4、Gevent的同步與異步
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print(‘Task %s done‘ % pid) def synchronous(): for i in range(10): task(i) def asynchronous(): g_l=[spawn(task,i) for i in range(10)] joinall(g_l) if __name__ == ‘__main__‘: print(‘Synchronous:‘) synchronous() print(‘Asynchronous:‘) asynchronous() #上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。View Code
5、Gevent實現爬蟲
from gevent import monkey;monkey.patch_all() import gevent import requests import time def get_page(url): print(‘GET: %s‘ %url) response=requests.get(url) if response.status_code == 200: print(‘%d bytes received from %s‘ %(len(response.text),url)) start_time=time.time() g1=gevent.spawn(get_page, ‘https://www.python.org/‘) g2=gevent.spawn(get_page, ‘https://www.yahoo.com/‘) g3=gevent.spawn(get_page, ‘https://github.com/‘) gevent.joinall([g1,g2,g3]) stop_time=time.time() print(‘run time is %s‘ %(stop_time-start_time))
6、gevent實現單線程下的socket並發
通過gevent實現單線程下的socket並發(from gevent import monkey;monkey.patch_all()一定要放到導入socket模塊之前,否則gevent無法識別socket的阻塞)
from gevent import monkey;monkey.patch_all() from socket import * import gevent #如果不想用money.patch_all()打補丁,可以用gevent自帶的socket # from gevent import socket # s=socket.socket() def server(server_ip,port): s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind((server_ip,port)) s.listen(5) while True: conn,addr=s.accept() gevent.spawn(talk,conn,addr) def talk(conn,addr): try: while True: res=conn.recv(1024) print(‘client %s:%s msg: %s‘ %(addr[0],addr[1],res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == ‘__main__‘: server(‘127.0.0.1‘,8080)服務端
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect((‘127.0.0.1‘,8080)) while True: msg=input(‘>>: ‘).strip() if not msg:continue client.send(msg.encode(‘utf-8‘)) msg=client.recv(1024) print(msg.decode(‘utf-8‘))客戶端
7、多協程發送多個客戶端
from gevent import monkey;monkey.patch_all() import gevent from socket import * def talk(conn,addr): while True: data=conn.recv(1024) print(‘%s:%s %s‘ %(addr[0],addr[1],data)) conn.send(data.upper()) conn.close() def server(ip,port): s = socket(AF_INET, SOCK_STREAM) s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) s.bind((ip,port)) s.listen(5) while True: conn,addr=s.accept() gevent.spawn(talk,conn,addr) s.close() if __name__ == ‘__main__‘: server(‘127.0.0.1‘, 8088)服務端
from multiprocessing import Process from socket import * def client(server_ip,server_port): client=socket(AF_INET,SOCK_STREAM) client.connect((server_ip,server_port)) while True: client.send(‘hello‘.encode(‘utf-8‘)) msg=client.recv(1024) print(msg.decode(‘utf-8‘)) if __name__ == ‘__main__‘: for i in range(500): p=Process(target=client,args=(‘127.0.0.1‘,8088)) p.start()客戶端
35、concurrent.futures模塊與協程