1. 程式人生 > 實用技巧 >Python多執行緒爬蟲詳解

Python多執行緒爬蟲詳解

一、程式程序和執行緒之間的關係

程式:一個應用就是一個程式,比如:qq,爬蟲

程序:程式執行的資源分配最小單位,

很多人學習python,不知道從何學起。
很多人學習python,掌握了基本語法過後,不知道在哪裡尋找案例上手。
很多已經做案例的人,卻不知道如何去學習更加高深的知識。
那麼針對這三類人,我給大家提供一個好的學習平臺,免費領取視訊教程,電子書籍,以及課程的原始碼!
QQ群:101677771


一個程式可以至少有一個程序

執行緒:cpu的最小排程單位,必須依賴程序而存在,一個程序至少有一個執行緒,執行緒是沒有獨立資源的,一個程序下的所有執行緒共享該程序所有資源

一個程式至少有一個程序,一個程序至少有一個執行緒

二、對多執行緒和多程序的理解

多工的目的:充分利用計算機的物理效能,來提高程式執行速度

單執行緒程式:程式執行的過程中,cpu的利用很低的

遇到各種阻塞,等待這些情況,此時cpu就處於空閒狀態
提高cpu的利用有兩種方式:

1、cpu有多個核心的,如何利好各個核心多程序程式設計,

​ 2、單個cpu,如何提高利用率,就是通過多執行緒程式設計

三、併發和並行


併發是兩個佇列交替使用一臺咖啡機,並行是兩個佇列同時使用兩臺咖啡機,如果序列,一個佇列使用一臺咖啡機

兩種解決辦法:

1、縱向擴充套件:

​ 買更好的cpu,提高硬體水平。—缺點:總有極限。

2、橫向擴充套件:

​ 增加電腦。分散式思想

並行:真正的同時執行

在python中並行通過多程序實現

併發:在統一時刻,cpu只能執行一個任務。但是cpu在各個任務之間切換,因為時間間隔很多,總體可以看來是多個程式一起執行
python的多執行緒其執行過程就是併發

序列:有一個任務執行單元,從物理上就只能一個任務、一個任務地執行

前提:cpython編譯器

​ GIL:全域性性解釋鎖。他讓多個執行緒在同時執行後,統一時刻,只能有一個執行緒拿到GIL這把鎖,拿到這把鎖的執行緒,cpu才能執行,存在目的就是為了簡化多執行緒程式設計,同時避免程序之間資料錯亂

​ 缺點:嚴重製約多執行緒執行效率
如果利用多執行緒和多程序將我們程式執行效率提到極致。將cpu的利用率達到100%—還不滿足,如何解決?

兩種解決辦法:

1、縱向擴充套件:https://user-gold-cdn.xitu.io/2020/7/17/1735bf415da0617a?w=6https://user-gold-cdn.xitu.io/2020/7/17/1735bf415da0617a?w=692&h=452&f=png&s=7952192&h=452&f=png&s=79521

​ 買更好的cpu,提高硬體水平。—缺點:總有極限

2、橫向擴充套件:

​ 增加電腦,分散式思想

四、執行緒建立方法

想要完成一個功能,有兩種選擇:

​ 完成建立執行緒功能

1、使用python已經設定好的模組。

​ threadin模組

​ (1)建立一個執行緒

​ t = threading.Thread(

​ target=執行緒的做的事,一般只需要指定方法的引用即可。

​ args = (按順序寫入引數列表)–一個元組

​ )

​ (2)啟動執行緒

​ t.start()----啟動之後該執行緒和主執行緒都是執行緒

​ 啟動之後他和主執行緒到底誰先執行只有cpu的排程佇列來決定的

2、自己造輪子

​ 顯示生活中想要製造汽車,並不是一件容易的事情

​ 相比而言,在程式碼中,先要自己取實現一個執行緒類,也不容易。但是面向物件有一種思想:繼承可以輕鬆做到造輪子

​ 繼承:

  • 子類繼承父類非私有一切屬性和方法

  • 子類重寫父類的屬性和方法,子類擁有就是自己的。(子類例項化後,呼叫這些屬性和方法其實排程就是自己的)

    對於繼承程式設計思想:

    ​ 1、我們想要完成哪些功能,可以使用這個模組

    ​ 2、當這個模組的默寫功能無法滿足我們需求的時候,我們可以繼承他,就有他的特性

    ​ 3、如果對那個方法不滿足,就重寫他

    建立執行緒第二種方法:

    ​ 1、寫一個類

    ​ 2、繼承threading.Thread

    ​ 3、重寫run方法

    ​ 因為執行緒啟動之後,底層執行的run方法

    ​ 4、例項化這個類,就相當於建立了一個執行緒,物件.start()可以啟動這個執行緒

    線上程類的使用過程中,一定要讓父類的init方法觸發

    ​ 如果自定義執行緒類實現了init方法,必須在init方法中手動呼叫父類init方法執行

    ​ super().init()

    ​ threading.Thread.init(self)

五、執行緒狀態

六、使用Python寫入mongo

import pymongo
#1、建立連線
client=pymongo.MongoClient(host='localhost',port=27017)
#2、連線資料庫
db = client['tencent_data']#如果資料庫不存在直接建立
#3、db就相當於資料庫引用
#True:upsert=true---有則更新,無則插入
db['招聘資訊'].update({'PostId':item['PostId']},{'$set':item},True)
print(item,'儲存成功!')

七、多執行緒實現思路

方法一:使用建立執行緒的方法

程式碼實現:

for i in range(1, 100):
    # parse_page(i)--->用執行緒代替
    t = threading.Thread(target=parse_page, args=(i,))
    t.start()

方法二:建立執行緒類

程式碼實現:

import pymongo
import requests, threading
from queue import Queue


class Tencent(threading.Thread):
    def __init__(self, url, name, q_task):
        super().__init__()
        self.base_url = url
        self.name = name
        self.q_task = q_task
        # 1、建立連線
        self.client = pymongo.MongoClient(host='localhost', port=27017)
        # 2、連線資料庫
        self.db = self.client['tencent_data']  # 如果資料庫不存在直接建立

    def get_json(self, url, page):
        '''
        請求ajax。獲取json資料
        :param url:
        :param page:
        :return:
        '''
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36',

        }
        params = {
            'timestamp': '1595207127325',
            'countryId': '',
            'cityId': '',
            'bgIds': '',
            'productId': '',
            'categoryId': '',
            'parentCategoryId': '',
            'attrId': '',
            'keyword': '',
            'pageIndex': page,
            'pageSize': '10',
            'language': 'zh-cn',
            'area': 'cn',
        }
        response = requests.get(url, params=params, headers=headers)
        return response.json()

    def write_to_mongo(self, item):

        # 3、db就相當於資料庫引用
        # True:upsert=true---有則更新,無則插入
        self.db['招聘資訊'].update({'PostId': item['PostId']}, {'$set': item}, True)
        print(item, '儲存成功!')

    def parse_json(self, json_data):
        '''

        :param json_data:
        :return:
        '''
        for data in json_data['Data']['Posts']:
            self.write_to_mongo(data)

    def parse_page(self, page):
        base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?'
        json_data = self.get_json(base_url, page)
        # print(json_data)
        self.parse_json(json_data)

    def run(self):
        while True:
            if self.q_task.empty():
                break
            # 1、取出頁碼
            page = self.q_task.get()
            print(f'===========第{page}頁====================@{self.name}')
            # 2、請求,解析
            self.parse_page(page)


if __name__ == '__main__':
    base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?'
    # 1、建立任務佇列
    q_page = Queue()
    # 2、初始化任務佇列---頁碼
    for i in range(1, 200):
        q_page.put(i)
    # 3、建立執行緒的控制開關-list
    crawl_list = ['aa', 'bb', 'cc', 'dd']  # 四個執行緒
    # 4、遍歷上面list來迴圈建立執行緒
    for crawl in crawl_list:
        t = Tencent(base_url, crawl, q_page)
        t.start()

八、生產者消費者模式爬蟲

什麼是生產者消費者模式?

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費 者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之後不用等待 消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力,下面基於佇列實現生產者 消費者模型

定義: 在併發程式設計中使用生產者和消費者模式能夠解決絕大多數併發問題,該模式通過平衡生 產執行緒和消費執行緒的工作能力來提高程式的整體處理資料的速度

問題:耦合性太高,這種模式就是解耦合

實現步驟:

程式碼實現:

import pymongo
import requests
from queue import Queue
import threading


class Product(threading.Thread):
    def __init__(self, base_url, q_page, name=None):
        super().__init__()
        self.base_url = base_url
        self.q_page = q_page
        # self.name = name

    def get_json(self, url, page):
        '''
        請求ajax。獲取json資料
        :param url:
        :param page:
        :return:
        '''
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36',

        }
        params = {
            'timestamp': '1595207127325',
            'countryId': '',
            'cityId': '',
            'bgIds': '',
            'productId': '',
            'categoryId': '',
            'parentCategoryId': '',
            'attrId': '',
            'keyword': '',
            'pageIndex': page,
            'pageSize': '10',
            'language': 'zh-cn',
            'area': 'cn',
        }
        response = requests.get(url, params=params, headers=headers)
        return response.json()

    def run(self):
        while True:
            if self.q_page.empty():
                break
            page = self.q_page.get()
            print(f'生產者執行緒:=======in {page} page===============@{self.name}')
            json_data = self.get_json(self.base_url, page)
            # q_json就是公共資料池
            q_json.put(json_data)


class Consumer(threading.Thread):
    def __init__(self):
        super().__init__()
        # 1、建立連線
        self.client = pymongo.MongoClient(host='localhost', port=27017)
        # 2、連線資料庫
        self.db = self.client['tencent_data2']  # 如果資料庫不存在直接建立

    def write_to_mongo(self, item):

        # 3、db就相當於資料庫引用
        # True:upsert=true---有則更新,無則插入
        self.db['招聘資訊'].update({'PostId': item['PostId']}, {'$set': item}, True)
        print(item, '儲存成功!')

    def parse_json(self, json_data):
        '''
        :param json_data:
        :return:
        '''
        for data in json_data['Data']['Posts']:
            self.write_to_mongo(data)

    def run(self):
        while True:
            if q_json.empty() and flag:  # ?問題關鍵在於:我們並沒有監控生產者到底做完了沒。
                break
            try:
                # 1、取資料--從池子裡取
                json_data = q_json.get(block=False)
                print(f'消費者執行緒:===@{self.name}==================data:f{json_data}')
                # 2、解析儲存
                self.parse_json(json_data)
            except Exception:
                continue


if __name__ == '__main__':
    base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?'
    # 輪詢引數
    flag = False  # p還沒下班
    # 1、建立一個池
    q_json = Queue()

    # 2、建立p和c各自執行緒取完成上述流程。
    # 2.1 p的開啟
    # 初始化任務對壘
    q_page = Queue()
    for page in range(1, 200):
        q_page.put(page)
    # 儲存生產者的每個執行緒的引用。
    crawl_p = []
    for i in range(3):
        t = Product(base_url, q_page)
        t.start()
        crawl_p.append(t)

    # 2.2建立c
    for i in range(3):
        t = Consumer()
        t.start()

    # 3|保證p都做完了,再將flag-->true
    # 阻塞在這裡--監測p都是否完成。---join()
    a = [p.join() for p in crawl_p]
    flag = True