1. 程式人生 > >python多程序併發中,解決資料共享問題Value+Array

python多程序併發中,解決資料共享問題Value+Array

參考文章:http://www.jb51.net/article/57666.htm之前多執行緒執行的時候,全部用的全域性變數,程式碼如下:
#!/usr/bin/env python
#encoding: utf-8

import requestSender as AB
import random
import threading, time
import os
TOTAL = 0
SUCC = 0
FAIL = 0
EXCEPT = 0
MAXTIME = 0
MINTIME = 100
GT3 = 0
LT3 = 0
TOTAL_TIME=0
lock=threading.Lock()
class RequestThread(threading.Thread):
    def __init__(self, thread_name):
        threading.Thread.__init__(self)
        self.test_count = 0
        self.name=thread_name

    def run(self):
        global lock
        if lock.acquire():
            print self.name
            self.test_performace()
            lock.release()

    def test_performace(self):
        global TOTAL
        global SUCC
        global FAIL
        global EXCEPT
        global GT3
        global LT3
        global TOTAL_TIME
        try:
            st = time.time()
            #AB.ingress_test1_0()
            if AB.ingress_test1_0()== "OK":
                TOTAL += 1
                SUCC += 1
            else:
                TOTAL += 1
                FAIL += 1
            time_span = time.time() - st
            TOTAL_TIME+=time_span
            self.maxtime(time_span)
            self.mintime(time_span)
            if time_span > 3:
                GT3 += 1
            else:
                LT3 += 1
        except Exception as e:
            print('Error:',e)
            TOTAL += 1
            EXCEPT += 1

    def maxtime(self, ts):
        global MAXTIME
        if ts > MAXTIME:
            MAXTIME = ts

    def mintime(self, ts):
        global MINTIME
        if ts < MINTIME:
            MINTIME = ts

def main(thread_count):
    print('===========task start===========')
    start_time = time.ctime()
    thread_count = thread_count
    threads = []
    for i in range(thread_count):
        t = RequestThread("thread" + str(i))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print('===========task end===========')
    print(start_time)
    print("total:%d,succ:%d,fail:%d,except:%d" % (TOTAL, SUCC, FAIL, EXCEPT))
    print('response total_time:', TOTAL_TIME)
    print('response average_time:', TOTAL_TIME /TOTAL)
    print('response maxtime:', MAXTIME)
    print('response mintime', MINTIME)
    print('great than 3 seconds:%d,percent:%0.2f' % (GT3, float(GT3) / TOTAL))
    print('less than 3 seconds:%d,percent:%0.2f' % (LT3, float(LT3) / TOTAL))
    time.sleep(60 * random.randint(1, 5))
    AB.ingress_test1_1()
    time.sleep(60 * random.randint(1, 5))
for x in xrange(10):
    main(10)
    if not os.path.isdir("zxtest"):
        os.mkdir("zxtest")
    AB.myLog('zxtest/test1_0.log',"test1_0:","total:%d,succ:%d,fail:%d,except:%d" % (TOTAL, SUCC, FAIL, EXCEPT),'response total_time:', TOTAL_TIME,
         'response maxtime:', MAXTIME,'response mintime', MINTIME,'average time',TOTAL_TIME/TOTAL,'great than 3 seconds:%d,percent:%0.2f' % (GT3, float(GT3) / TOTAL),
         'less than 3 seconds:%d,percent:%0.2f' % (LT3, float(LT3) / TOTAL))
為了更好利用多核資源,改成多程序+攜程,但是python的多程序預設無法共享全域性變數,所以問題來了。解決方案就是利用Value+Array,在多程序間共享全域性變數,程式碼貼出來如下;
# !/user/bin/env python
# encoding:utf-8
# code by Iris
from multiprocessing import Process, Value, Array
import requestSender as AB
from gevent import monkey; monkey.patch_all()
import gevent
import random
import time
import os
def test_performace(func,n,a):
    '''
    TOTAL,SUCC,FAIL,EXCEPT,MAXTIME,MINTIME,GT3,LT3,TOTAL_TIME=a
    :param func:
    :return:
    '''
    n.value = n.value + 1
    try:
        st = time.time()
        if func() == "OK":
            a[0]+= 1
            a[1] += 1
        else:
            a[0] += 1
            a[2] += 1
        time_span = time.time() - st
        a[8] += time_span
        if time_span>a[4]:
            a[4]=time_span
        else:
            pass
        if time_span<a[5]:
            a[5]=time_span
        else:
            pass
        if time_span > 3:
            a[6] += 1
        else:
            a[7] += 1
    except Exception as e:
        print('Error:', e)
        a[0] += 1
        a[3] += 1
def process_start(Coroutine_num,func,n,a):
    tasks = []
    for i in xrange(Coroutine_num):
        tasks.append(gevent.spawn(test_performace,func,n,a))
    gevent.joinall(tasks)  # 使用協程來執行
def task_start(progress_num,Coroutine_num,func,num, arr):
    for i in xrange(progress_num):
        p = Process(target=process_start, args=(Coroutine_num,func,num, arr))
        print('===========task start===========')
        start_time = time.ctime()
        print(start_time)
        p.start()
        p.join()
        print('===========task end===========')
if __name__=="__main__":
    num = Value('f', 0)
    arr = Array('f', [0]*9)
    arr[5]=100
    for x in xrange(1):
        task_start(2, 3, AB.ingress_test1_0,num,arr)
        TOTAL,SUCC,FAIL,EXCEPT,MAXTIME,MINTIME,GT3,LT3,TOTAL_TIME=arr
        print TOTAL,SUCC,FAIL,EXCEPT,MAXTIME,MINTIME,GT3,LT3,TOTAL_TIME
        print("total:%d,succ:%d,fail:%d,except:%d" % (TOTAL, SUCC, FAIL, EXCEPT))
        print('response total_time:', TOTAL_TIME)
        print('response average_time:', TOTAL_TIME / TOTAL)
        print('response maxtime:', MAXTIME)
        print('response mintime', MINTIME)
        print('great than 3 seconds:%d,percent:%0.2f' % (GT3, float(GT3) / TOTAL))
        print('less than 3 seconds:%d,percent:%0.2f' % (LT3, float(LT3) / TOTAL))
        if not os.path.isdir("zxtest"):
            os.mkdir("zxtest")
        AB.myLog('zxtest/test1_0.log', "test1_0:", "total:%d,succ:%d,fail:%d,except:%d" % (TOTAL, SUCC, FAIL, EXCEPT),
                 'response total_time:', TOTAL_TIME,
                 'response maxtime:', MAXTIME, 'response mintime', MINTIME, 'average time', TOTAL_TIME / TOTAL,
                 'great than 3 seconds:%d,percent:%0.2f' % (GT3, float(GT3) / TOTAL),
                 'less than 3 seconds:%d,percent:%0.2f' % (LT3, float(LT3) / TOTAL))