1. 程式人生 > >35、concurrent.futures模塊與協程

35、concurrent.futures模塊與協程

否則 ssp org 之間 內存 pat sta page hide

concurrent.futures —Launching parallel tasks concurrent.futures模塊同時提供了進程池和線程池,它是將來的使用趨勢,同樣我們之前學習的進程池Pool和threadpool模塊也可以使用。

對進程池疑惑的可以參閱:32進程池與回調函數http://www.cnblogs.com/liluning/p/7445457.html

對threadpool模塊疑惑的可以看我閑暇時寫的一段代碼:(因為本人也不了解這個模塊,代碼裏寫的也是自己想當然的,如有問題請自行查閱資料)

技術分享
#pip3 install threadpool  #需下載
import
threadpool import requests import re import os #爬取網頁 def get_page(url) : pattern = re.compile(r<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<, re.S) response = requests.get(url) if response.status_code == 200 : #status_code請求的狀態碼200為正常
return (response.text,pattern,url) #信息處理 def parse_page(info) : page_content,pattern,url = info print(<%s> parse [%s]% (os.getpid(), url)) res = re.findall(pattern,page_content) dic_l = [] for item in res: dic = { index:item[0],
title:item[1], actor:item[2].strip()[3:], time:item[3][5:] } dic_l.append(dic) print(dic) with open(movie_info.txt,a,encoding=utf-8) as f : for i in range(len(dic_l)) : parse_res = (index:%s title:%s actor:%s time:%s\n %(dic_l[i][index],dic_l[i][title],dic_l[i][actor],dic_l[i][time])) f.write(parse_res) if __name__ == __main__: urls = [ http://maoyan.com/board/7, http://maoyan.com/board/6, http://maoyan.com/board/1, http://maoyan.com/board/2, http://maoyan.com/board/4, ] t = threadpool.ThreadPool(4) #創建線程池 for url in urls : res = threadpool.makeRequests(get_page,urls,parse_page(get_page(url))) #參數:執行函數,參數,回調函數 [t.putRequest(req) for req in res] t.wait()
基於threadpool貓眼爬蟲

一、concurrent.futures模塊

1、官方文檔

https://docs.python.org/dev/library/concurrent.futures.html#module-concurrent.futures

2、ProcessPoolExecutor(進程池)與ThreadPoolExecutor(線程池)

(進程池類與線程池類的方法使用等各方面基本相同)

1)導入

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

2)創建

p = ProcessPoolExecutor(num)  #創建進程池
t = ThreadPoolExecutor(num)  #創建線程池

3)參數

num:要創建的進程數或線程數,如果省略,進程數將默認使用cpu_count()的值,線程數將默認使用cpu_count()*5的值

4)主要方法

submit(fn, *args, **kwargs):在一個池工作進程中執行執行fn(args kwargs)執行,並返回一個表示可調用的執行的Future對象
map(func, *iterables, timeout=None, chunksize=1):
shutdown(wait=True):執行結束釋放資源

3、應用

1)進程池

from concurrent.futures import ProcessPoolExecutor
import os,time
def task(n):
    print(%s is running %os.getpid())
    time.sleep(2)
    return n**2

if __name__ == __main__:
    p=ProcessPoolExecutor()
    l=[]
    start=time.time()
    for i in range(10):
        obj=p.submit(task,i)
        l.append(obj)
    p.shutdown()
    print(=*30)
    print([obj for obj in l])
    print([obj.result() for obj in l])
    print(time.time()-start)

2)線程池

from concurrent.futures import ThreadPoolExecutor
import threading
import os,time
def task(n):
    print(%s:%s is running %(threading.currentThread().getName(),os.getpid()))
    time.sleep(2)
    return n**2

if __name__ == __main__:
    p=ThreadPoolExecutor()
    l=[]
    start=time.time()
    for i in range(10):
        obj=p.submit(task,i)
        l.append(obj)
    p.shutdown()
    print(=*30)
    print([obj.result() for obj in l])
    print(time.time()-start)

3)同步執行

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
def task(n):
    print(%s is running %os.getpid())
    time.sleep(2)
    return n**2

if __name__ == __main__:
    p=ProcessPoolExecutor()
    start=time.time()
    for i in range(10):
        res=p.submit(task,i).result()
        print(res)
    print(=*30)
    print(time.time()-start)

4、回調函數

不懂回調函數的看本章節首部有鏈接

from concurrent.futures import ThreadPoolExecutor
import requests, os, time
from threading import currentThread
def get_page(url):
    print(%s:<%s> is getting [%s] %(currentThread().getName(),os.getpid(),url))
    response=requests.get(url)
    time.sleep(2)
    return {url:url,text:response.text}
def parse_page(res):
    res=res.result()  #註意值
    print(%s:<%s> parse [%s] %(currentThread().getName(),os.getpid(),res[url]))
    with open(db.txt,a) as f:
        parse_res=url:%s size:%s\n %(res[url],len(res[text]))
        f.write(parse_res)
if __name__ == __main__:
    p=ThreadPoolExecutor()
    urls = [
        https://www.baidu.com,
        http://www.openstack.org,
        https://www.python.org,
        http://www.sina.com.cn/
    ]

    for url in urls:
        p.submit(get_page, url).add_done_callback(parse_page)
        #add_done_callback()回調函數
    p.shutdown()
    print(,os.getpid())

5、map方法

map有疑惑可以閱覽:19、內置函數和匿名函數http://www.cnblogs.com/liluning/p/7280832.html

from concurrent.futures import ProcessPoolExecutor
import os,time
def task(n):
    print(%s is running %os.getpid())
    time.sleep(2)
    return n**2

if __name__ == __main__:
    p=ProcessPoolExecutor()
    obj=p.map(task,range(10))
    p.shutdown()
    print(=*30)
    print(list(obj))

二、協程概念

1、定義

是單線程下的並發,又稱微線程,纖程。英文名Coroutine。一句話說明什麽是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。

2、註意

1)python的線程屬於內核級別的,即由操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運行)

2)單線程內開啟協程,一旦遇到io,就會從應用程序級別(而非操作系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)

3、優點

1) 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級

2) 單線程內就可以實現並發的效果,最大限度地利用cpu

4、缺點

1) 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程

2) 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程

5、總結

1)必須在只有一個單線程裏實現並發

2)修改共享數據不需加鎖

3)用戶程序裏自己保存多個控制流的上下文棧

附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))


三、greenlet模塊

如果我們在單個線程內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然後再調用send。。。非常麻煩),而使用greenlet模塊可以非常簡單地實現這20個任務直接的切換

生成器:18、叠代器和生成器http://www.cnblogs.com/liluning/p/7274862.html

1、安裝

pip3 install greenlet

2、使用

from greenlet import greenlet

def eat(name):
    print(%s eat 1 %name)
    g2.switch(egon)
    print(%s eat 2 %name)
    g2.switch()
def play(name):
    print(%s play 1 %name)
    g1.switch()
    print(%s play 2 %name)

g1=greenlet(eat)
g2=greenlet(play)

g1.switch(egon)#可以在第一次switch時傳入參數,以後都不需要

3、單純的切換(在沒有io的情況下或者沒有重復開辟內存空間的操作),反而會降低程序的執行速度

技術分享
#順序執行
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i

def f2():
    res=1
    for i in range(100000000):
        res*=i

start=time.time()
f1()
f2()
stop=time.time()
print(run time is %s %(stop-start)) #10.985628366470337

#切換
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print(run time is %s %(stop-start)) # 52.763017892837524
View Code

單線程裏的這20個任務的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent模塊。


四、Gevent模塊

1、安裝

pip3 install gevent

Gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。

2、用法

g1=gevent.spawn(func,1,,2,3,x=4,y=5)創建一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的

g2=gevent.spawn(func2)

g1.join() #等待g1結束

g2.join() #等待g2結束

#或者上述兩步合作一步:gevent.joinall([g1,g2])

g1.value#拿到func1的返回值

3、遇到IO阻塞時會自動切換任務

技術分享
import gevent
def eat(name):
    print(%s eat 1 %name)
    gevent.sleep(2)
    print(%s eat 2 %name)

def play(name):
    print(%s play 1 %name)
    gevent.sleep(1)
    print(%s play 2 %name)


g1=gevent.spawn(eat,egon)
g2=gevent.spawn(play,name=egon)
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print()
View Code

上例gevent.sleep(2)模擬的是gevent可以識別的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了from gevent import monkey;monkey.patch_all()必須放到被打補丁者的前面,如time,socket模塊之前或者我們幹脆記憶成:要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭

技術分享
from gevent import monkey;monkey.patch_all()

import gevent
import time
def eat():
    print(eat food 1)
    time.sleep(2)
    print(eat food 2)

def play():
    print(play 1)
    time.sleep(1)
    print(play 2)

g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print()
View Code

4、Gevent的同步與異步

技術分享
from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print(Task %s done % pid)


def synchronous():
    for i in range(10):
        task(i)

def asynchronous():
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == __main__:
    print(Synchronous:)
    synchronous()

    print(Asynchronous:)
    asynchronous()
#上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行所有給定的greenlet。執行流程只會在 所有greenlet執行完後才會繼續向下走。
View Code

5、Gevent實現爬蟲

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print(GET: %s %url)
    response=requests.get(url)
    if response.status_code == 200:
        print(%d bytes received from %s %(len(response.text),url))

start_time=time.time()
g1=gevent.spawn(get_page, https://www.python.org/)
g2=gevent.spawn(get_page, https://www.yahoo.com/)
g3=gevent.spawn(get_page, https://github.com/)
gevent.joinall([g1,g2,g3])
stop_time=time.time()
print(run time is %s %(stop_time-start_time))

6、gevent實現單線程下的socket並發

通過gevent實現單線程下的socket並發(from gevent import monkey;monkey.patch_all()一定要放到導入socket模塊之前,否則gevent無法識別socket的阻塞)

技術分享
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

#如果不想用money.patch_all()打補丁,可以用gevent自帶的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip,port):
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind((server_ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)

def talk(conn,addr):
    try:
        while True:
            res=conn.recv(1024)
            print(client %s:%s msg: %s %(addr[0],addr[1],res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()

if __name__ == __main__:
    server(127.0.0.1,8080)
服務端 技術分享
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1,8080))


while True:
    msg=input(>>: ).strip()
    if not msg:continue

    client.send(msg.encode(utf-8))
    msg=client.recv(1024)
    print(msg.decode(utf-8))
客戶端

7、多協程發送多個客戶端

技術分享
from gevent import monkey;monkey.patch_all()
import gevent
from socket import *
def talk(conn,addr):
    while True:
        data=conn.recv(1024)
        print(%s:%s %s %(addr[0],addr[1],data))
        conn.send(data.upper())
    conn.close()

def server(ip,port):
    s = socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.bind((ip,port))
    s.listen(5)
    while True:
        conn,addr=s.accept()
        gevent.spawn(talk,conn,addr)
    s.close()

if __name__ == __main__:
    server(127.0.0.1, 8088)
服務端 技術分享
from multiprocessing import Process
from socket import *
def client(server_ip,server_port):
    client=socket(AF_INET,SOCK_STREAM)
    client.connect((server_ip,server_port))
    while True:
        client.send(hello.encode(utf-8))
        msg=client.recv(1024)
        print(msg.decode(utf-8))

if __name__ == __main__:
    for i in range(500):
        p=Process(target=client,args=(127.0.0.1,8088))
        p.start()
客戶端

35、concurrent.futures模塊與協程