1. 程式人生 > >python多程序併發之multiprocessing

python多程序併發之multiprocessing

multiprocessing.Process

multiprocessing包是Python中的多程序管理包。它與 threading.Thread類似,可以利用multiprocessing.Process物件來建立一個程序。該程序可以允許放在Python程式內部編寫的函式中。該Process物件與Thread物件的用法相同,擁有is_alive()、join([timeout])、run()、start()、terminate()等方法。屬性有:authkey、daemon(要通過start()設定)、exitcode(程序在執行時為None、如果為–N,表示被訊號N結束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類,用來同步程序,其用法也與threading包中的同名類一樣。multiprocessing的很大一部份與threading使用同一套API,只不過換到了多程序的情境。

這個模組表示像執行緒一樣管理程序,這個是multiprocessing的核心,它與threading很相似,對多核CPU的利用率會比threading好的多。

看一下Process類的構造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

引數說明:

  • group:程序所屬組。基本不用
  • target:表示呼叫物件。
  • args:表示呼叫物件的位置引數元組。
  • name:別名
  • kwargs:表示呼叫物件的字典。

建立程序的簡單例項:

#coding=utf-8
import multiprocessing

def
do(n) :
#獲取當前執行緒的名字 name = multiprocessing.current_process().name print name,'starting' print "worker ", n return if __name__ == '__main__' : numList = [] for i in xrange(5) : p = multiprocessing.Process(target=do, args=(i,)) numList.append(p) p.start() p.join() print
"Process end."

執行結果:

Process-1 starting
worker  0
Process end.
Process-2 starting
worker  1
Process end.
Process-3 starting
worker  2
Process end.
Process-4 starting
worker  3
Process end.
Process-5 starting
worker  4
Process end.

建立子程序時,只需要傳入一個執行函式和函式的引數,建立一個Process例項,並用其start()方法啟動,join()方法表示等待子程序結束以後再繼續往下執行,通常用於程序間的同步。

注意:
在Windows上要想使用程序模組,就必須把有關程序的程式碼寫在當前.py檔案的if __name__ == ‘__main__’ :語句的下面,才能正常使用Windows下的程序模組。Unix/Linux下則不需要。

multiprocess.Pool

當被操作物件數目不大時,可以直接利用multiprocessing中的Process動態成生多個程序,十幾個還好,但如果是上百個,上千個目標,手動的去限制程序數量卻又太過繁瑣,此時可以發揮程序池的功效。

Pool可以提供指定數量的程序供使用者呼叫,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來它。

apply_async和apply

函式原型:

apply_async(func[, args=()[, kwds={}[, callback=None]]])

二者都是向程序池中新增新的程序,不同的時,apply每次新增新的程序時,主程序和新的程序會並行執行,但是主程序會阻塞,直到新程序的函式執行結束。 這是很低效的,所以python3.x之後不再使用

apply_async和apply功能相同,但是主程序不會阻塞。

# -*- coding:utf-8 -*-

import multiprocessing
import time

def func(msg):
    print "*msg: ", msg
    time.sleep(3)
    print "*end"

if __name__ == "__main__":
    # 維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
    pool = multiprocessing.Pool(processes=3)
    for i in range(10):
        msg = "hello [{}]".format(i)
        # pool.apply(func, (msg,))
        pool.apply_async(func, (msg,))        # 非同步開啟程序, 非阻塞型, 能夠向池中新增程序而不等待其執行完畢就能再次執行迴圈

    print "--" * 10
    pool.close()   # 關閉pool, 則不會有新的程序新增進去
    pool.join()    # 必須在join之前close, 然後join等待pool中所有的執行緒執行完畢
    print "All process done."

執行結果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py
--------------------
*msg:  hello [0]
*msg:  hello [1]
*msg:  hello [2]
*end
*msg:  hello [3]
*end
*end
*msg:  hello [4]
*msg:  hello [5]
*end
*msg:  hello [6]
*end
*end
*msg:  hello [7]
*msg:  hello [8]
*end
*msg:  hello [9]
*end*end

*end
All process done.

Process finished with exit code 0

獲得程序的執行結果

# -*- coding:utf-8 -*-

import multiprocessing
import time

def func_with_return(msg):
    print "*msg: ", msg
    time.sleep(3)
    print "*end"
    return "{} return".format(msg)

if __name__ == "__main__":
    # 維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
    pool = multiprocessing.Pool(processes=3)
    results = []
    for i in range(10):
        msg = "hello [{}]".format(i)
        res = pool.apply_async(func_with_return, (msg,))        # 非同步開啟程序, 非阻塞型, 能夠向池中新增程序而不等待其執行完畢就能再次執行迴圈
        results.append(res)

    print "--" * 10
    pool.close()   # 關閉pool, 則不會有新的程序新增進去
    pool.join()    # 必須在join之前close, 然後join等待pool中所有的執行緒執行完畢
    print "All process done."

    print "Return results: "
    for i in results:
        print i.get()   # 獲得程序的執行結果

結果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py
--------------------
*msg:  hello [0]
*msg:  hello [1]
*msg:  hello [2]
*end
*end
*msg:  hello [3]
*msg:  hello [4]
*end
*msg:  hello [5]
*end
*end
*msg:  hello [6]
*msg:  hello [7]
*end
*msg:  hello [8]
*end
*end
*msg:  hello [9]
*end
*end
All process done.
Return results: 
hello [0] return
hello [1] return
hello [2] return
hello [3] return
hello [4] return
hello [5] return
hello [6] return
hello [7] return
hello [8] return
hello [9] return

Process finished with exit code 0

map

函式原型:

map(func, iterable[, chunksize=None])

Pool類中的map方法,與內建的map函式用法行為基本一致,它會使程序阻塞直到返回結果。
注意,雖然第二個引數是一個迭代器,但在實際使用中,必須在整個佇列都就緒後,程式才會執行子程序。

# -*- coding:utf-8 -*-

import multiprocessing
import time

def func_with_return(msg):
    print "*msg: ", msg
    time.sleep(3)
    print "*end"
    return "{} return".format(msg)

if __name__ == "__main__":
    # 維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去
    pool = multiprocessing.Pool(processes=3)
    results = []
    msgs = []
    for i in range(10):
        msg = "hello [{}]".format(i)
        msgs.append(msg)

    results = pool.map(func_with_return, msgs)

    print "--" * 10
    pool.close()   # 關閉pool, 則不會有新的程序新增進去
    pool.join()    # 必須在join之前close, 然後join等待pool中所有的執行緒執行完畢
    print "All process done."

    print "Return results: "
    for i in results:
        print i   # 獲得程序的執行結果

執行結果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v2.py
*msg:  hello [0]
*msg:  hello [1]
*msg:  hello [2]
*end*end

*msg:  hello [3]
*msg:  hello [4]
*end
*msg:  hello [5]
*end*end

*msg:  hello [6]
*msg:  hello [7]
*end
*msg:  hello [8]
*end
*end
*msg:  hello [9]
*end
*end
--------------------
All process done.
Return results: 
hello [0] return
hello [1] return
hello [2] return
hello [3] return
hello [4] return
hello [5] return
hello [6] return
hello [7] return
hello [8] return
hello [9] return

Process finished with exit code 0

注意執行結果中“—-”的位置,可以看到,map之後,主程序是阻塞的,等待map的結果返回

close()

關閉程序池(pool),使其不在接受新的任務。

terminate()

結束工作程序,不在處理未處理的任務。

join()

主程序阻塞等待子程序的退出,join方法必須在close或terminate之後使用。

程序間通訊

多程序最麻煩的地方就是程序間通訊,IPC比執行緒通訊要難處理的多,所以留作單獨一篇來記錄