1. 程式人生 > 實用技巧 >併發程式設計(執行緒)——驗證GIL鎖,GIL與普通互斥鎖的區別,io密集型和計算密集型,死鎖現象(解決方式:遞迴鎖),Semaphore訊號量,Event事件,執行緒queue,多程序實現tcp服務端併發,執行緒池&程序池

併發程式設計(執行緒)——驗證GIL鎖,GIL與普通互斥鎖的區別,io密集型和計算密集型,死鎖現象(解決方式:遞迴鎖),Semaphore訊號量,Event事件,執行緒queue,多程序實現tcp服務端併發,執行緒池&程序池

一、驗證GIL鎖的存在方式

from threading import Thread
from multiprocessing import Process


def task():
    while True:
        pass

if __name__ == '__main__':
    for i in range(6):
        # t=Thread(target=task)  # 因為有GIL鎖,同一時刻,只有一條執行緒執行,所以cpu不會滿
        t=Process(target=task)   # 由於是多程序,程序中的執行緒會被cpu排程執行,6個cpu全在工作,就會跑滿
t.start()

二、GIL與普通互斥鎖的區別

GIL鎖是不能保證資料的安全,普通互斥鎖來保證資料安全
from threading import Thread, Lock
import time

mutex = Lock()
money = 100


def task():
    global money
    mutex.acquire()
    temp = money
    time.sleep(1)
    money = temp - 1
    mutex.release()


if __name__ == '__main__':
    ll
=[] for i in range(10): t = Thread(target=task) t.start() # t.join() # 會怎麼樣?變成了序列,不能這麼做 ll.append(t) for t in ll: t.join() print(money)

三、io密集型和計算密集型

-----以下只針對於cpython直譯器
-在單核情況下:
-開多執行緒還是開多程序?不管幹什麼都是開執行緒
-在多核情況下:
-如果是計算密集型,需要開程序,能被多個cpu排程執行
-如果是io密集型,需要開執行緒,cpu遇到io會切換到其他執行緒執行
from threading import Thread
from multiprocessing import Process
import time


# 計算密集型
def task():
    count = 0
    for i in range(100000000):
        count += i


if __name__ == '__main__':
    ctime = time.time()
    ll = []
    for i in range(10):
        t = Thread(target=task)  # 開執行緒:42.68658709526062
        # t = Process(target=task)   # 開程序:9.04949426651001
        t.start()
        ll.append(t)

    for t in ll:
        t.join()
    print(time.time()-ctime)


## io密集型
def task():
    time.sleep(2)


if __name__ == '__main__':
    ctime = time.time()
    ll = []
    for i in range(400):
        t = Thread(target=task)  # 開執行緒:2.0559656620025635
        # t = Process(target=task)   # 開程序:9.506720781326294
        t.start()
        ll.append(t)

    for t in ll:
        t.join()
    print(time.time()-ctime)

四、死鎖現象(哲學家就餐問題)

是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序,如下就是死鎖
死鎖現象:
(1)張三拿到了A鎖,等B鎖,李四拿到了B鎖,等A鎖;
(2)張三在不同的線/程序中搶a鎖
from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()


def eat_apple(name):
    mutexA.acquire()
    print('%s 獲取到了a鎖' % name)
    mutexB.acquire()
    print('%s 獲取到了b鎖' % name)
    print('開始吃蘋果,並且吃完了')
    mutexB.release()
    print('%s 釋放了b鎖' % name)
    mutexA.release()
    print('%s 釋放了a鎖' % name)


def eat_egg(name):
    mutexB.acquire()
    print('%s 獲取到了b鎖' % name)
    time.sleep(2)
    mutexA.acquire()
    print('%s 獲取到了a鎖' % name)
    print('開始吃雞蛋,並且吃完了')
    mutexA.release()
    print('%s 釋放了a鎖' % name)
    mutexB.release()
    print('%s 釋放了b鎖' % name)


if __name__ == '__main__':
    ll = ['egon', 'alex', '鐵蛋']
    for name in ll:
        t1 = Thread(target=eat_apple, args=(name,))
        t2 = Thread(target=eat_egg, args=(name,))
        t1.start()
        t2.start()

解決方式:

遞迴鎖

遞迴鎖(可重入),同一個人可以多次acquire,每acquire一次,內部計數器加1,每relaese一次,內部計數器減一
只有計數器不為0,其他人都不獲得這把鎖
from threading import Thread, Lock,RLock
import time

# 同一把鎖
# mutexA = Lock()
# mutexB = mutexA

# 使用可重入鎖解決(同一把鎖)
# mutexA = RLock()
# mutexB = mutexA
mutexA = mutexB =RLock()

def eat_apple(name):
    mutexA.acquire()
    print('%s 獲取到了a鎖' % name)
    mutexB.acquire()
    print('%s 獲取到了b鎖' % name)
    print('開始吃蘋果,並且吃完了')
    mutexB.release()
    print('%s 釋放了b鎖' % name)
    mutexA.release()
    print('%s 釋放了a鎖' % name)


def eat_egg(name):
    mutexB.acquire()
    print('%s 獲取到了b鎖' % name)
    time.sleep(2)
    mutexA.acquire()
    print('%s 獲取到了a鎖' % name)
    print('開始吃雞蛋,並且吃完了')
    mutexA.release()
    print('%s 釋放了a鎖' % name)
    mutexB.release()
    print('%s 釋放了b鎖' % name)


if __name__ == '__main__':
    ll = ['egon', 'alex', '鐵蛋']
    for name in ll:
        t1 = Thread(target=eat_apple, args=(name,))
        t2 = Thread(target=eat_egg, args=(name,))
        t1.start()
        t2.start()

五、Semaphore訊號量

Semaphore:訊號量可以理解為多把鎖,同時允許多個執行緒來更改資料
from  threading import Thread,Semaphore
import time
import random
sm=Semaphore(3) # 數字表示可以同時有多少個執行緒操作


def task(name):
    sm.acquire()
    print('%s 正在蹲坑'%name)
    time.sleep(random.randint(1,5))
    sm.release()



if __name__ == '__main__':
    for i in range(20):
        t=Thread(target=task,args=('屌絲男%s號'%i,))
        t.start()

六、Event事件

一些執行緒需要等到其他執行緒執行完成之後才能執行,類似於發射訊號
比如一個執行緒等待另一個執行緒執行結束再繼續執行
from threading import Thread, Event
import time

event = Event()


def girl(name):
    print('%s 現在不單身,正在談戀愛'%name)
    time.sleep(10)
    print('%s 分手了,給屌絲男發了訊號'%name)
    event.set()


def boy(name):
    print('%s 在等著女孩分手'%name)
    event.wait()  # 只要沒來訊號,就卡在者
    print('女孩分手了,機會來了,衝啊')


if __name__ == '__main__':
    lyf = Thread(target=girl, args=('劉亦菲',))
    lyf.start()

    for i in range(10):
        b = Thread(target=boy, args=('屌絲男%s號' % i,))
        b.start()

案例:

起兩個執行緒,第一個執行緒讀檔案的前半部分,讀完發一個訊號,另一個程序讀後半部分,並列印

from threading import Thread, Event
import time
import os

event = Event()
# 獲取檔案總大小
size = os.path.getsize('a.txt')


def read_first():
    with open('a.txt', 'r', encoding='utf-8') as f:
        n = size // 2  # 取檔案一半,整除
        data = f.read(n)
        print(data)
        print('我一半讀完了,發了個訊號')
        event.set()


def read_last():
    event.wait()  # 等著發訊號
    with open('a.txt', 'r', encoding='utf-8') as f:
        n = size // 2  # 取檔案一半,整除
        # 游標從檔案開頭開始,移動了n個位元組,移動到檔案一半
        f.seek(n, 0)
        data = f.read()
        print(data)


if __name__ == '__main__':
    t1=Thread(target=read_first)
    t1.start()
    t2=Thread(target=read_last)
    t2.start()

七、執行緒queue

1、程序queue和執行緒不是一個
  程序queue:
  from multiprocessing import Queue

  執行緒queue:
  from queue import Queue,LifoQueue,PriorityQueue

2、執行緒間通訊,因為共享變數會出現資料不安全問題,用執行緒queue通訊,不需要加鎖,內部自帶
3、queue是執行緒安全的
4、三種執行緒Queue
-Queue:佇列,先進先出
-PriorityQueue:優先順序佇列,誰小誰先出
-LifoQueue:棧,後進後出
#1 Quenue使用

q=Queue(5)
#放值
q.put("lqz")
q.put("egon")
q.put("鐵蛋")
q.put("鋼彈")
q.put("金蛋")

# q.put("銀蛋")
# q.put_nowait("銀蛋")
# 取值
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# 卡住
# print(q.get())
# q.get_nowait()
# 是否滿,是否空
print(q.full())
print(q.empty())

#2 LifoQueue使用

q=LifoQueue(5)
q.put("lqz")
q.put("egon")
q.put("鐵蛋")
q.put("鋼彈")
q.put("金蛋")

# q.put("ddd蛋")
print(q.get())

#3 PriorityQueue使用
#PriorityQueue:數字越小,級別越高

q=PriorityQueue(3)
q.put((-10,'金蛋'))
q.put((100,'銀蛋'))
q.put((101,'鐵蛋'))
# q.put((1010,'鐵dd蛋'))  # 不能再放了

print(q.get())
print(q.get())
print(q.get())

八、通過多程序,實現TCP服務端支援多個客戶端連線(併發)

#服務端
from multiprocessing import Process

import socket




def task(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0: break
            print(data)
            conn.send(data.upper())
        except Exception as e:
            print(e)
            break
    conn.close()



if __name__ == '__main__':
    server = socket.socket()

    server.bind(('127.0.0.1', 8081))
    server.listen(5)

    # 多執行緒,或者多程序
    while True:  # 連線迴圈
        conn, addr = server.accept()
        # 多使用者的服務端
        t=Process(target=task,args=(conn,))
        t.start()

        ### 單使用者的服務端
        # while True:
        #     try:
        #         data = conn.recv(1024)
        #         if len(data) == 0: break
        #         print(data)
        #         conn.send(data.upper())
        #     except Exception as e:
        #         print(e)
        #         break
        # conn.close()

#客戶端
import socket
import time


cli=socket.socket()
cli.connect(('127.0.0.1',8081))

while True:
    cli.send(b'hello world')
    time.sleep(0.1)
    data=cli.recv(1024)
    print(data)

九、執行緒池&程序池

1、為什麼會出現池?不管是開程序還是開執行緒,不能無限制開,通過池,假設池子裡就有10個,不管再怎麼開,永遠是這10個

2、使用(需要記住)
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(2)
pool.submit(get_pages, url).add_done_callback(call_back)

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import Thread
import time
import random

pool = ThreadPoolExecutor(5)  # 數字是池的大小
# pool = ProcessPoolExecutor(5)  # 數字是池的大小


def task(name):
    print('%s任務開始' % name)

    time.sleep(random.randint(1, 4))
    print('任務結束')
    return '%s 返回了'%name



def call_back(f):
    # print(type(f))
    print(f.result())
if __name__ == '__main__':

    # ll=[]
    # for i in range(10):  # 起了100個執行緒
    #     # t=Thread(target=task)
    #     # t.start()
    #     res = pool.submit(task, '屌絲男%s號' % i)  # 不需要再寫在args中了
    #     # res是Future物件
    #     # from  concurrent.futures._base import Future
    #     # print(type(res))
    #     # print(res.result())  # 像join,只要執行result,就會等著結果回來,就變成串行了
    #     ll.append(res)
    #
    # for res in ll:
    #     print(res.result())

    # 終極使用
    for i in range(10):  # 起了100個執行緒
        # 向執行緒池中提交一個任務,等任務執行完成,自動回到到call_back函式執行
        pool.submit(task,'屌絲男%s號' % i).add_done_callback(call_back)

執行緒池案例:

  爬網站

from concurrent.futures import ThreadPoolExecutor

import requests  # 爬蟲會學到的模組

pool = ThreadPoolExecutor(2)


def get_pages(url):
    # https://www.baidu.com
    res = requests.get(url)  # 向這個地址傳送請求

    name = url.rsplit('/')[-1] + '.html'
    print(name)  # www.baidu.com.html
    # res.content拿到頁面的二進位制
    return {'name': name, 'text': res.content}


def call_back(f):
    dic = f.result()
    with open(dic['name'], 'wb') as f:
        f.write(dic['text'])


if __name__ == '__main__':
    ll = ['https://www.baidu.com', 'https://www.mzitu.com', 'https://www.cnblogs.com']
    for url in ll:
        pool.submit(get_pages, url).add_done_callback(call_back)

十、

---39----