1. 程式人生 > 實用技巧 >Django-redis快取配置

Django-redis快取配置

Celery 及非同步任務的處理

  1. 模組組成

    • 任務模組 Task

      包含非同步任務和定時任務. 其中, 非同步任務通常在業務邏輯中被觸發併發往任務佇列, 而定時任務由 Celery Beat 程序週期性地將任務發往任務佇列.

    • 訊息中介軟體 Broker

      Broker, 即為任務排程佇列, 接收任務生產者發來的訊息(即任務), 將任務存入佇列. Celery 本身不提供佇列服務, 官方推薦使用 RabbitMQ 和 Redis 等.

    • 任務執行單元 Worker

      Worker 是執行任務的處理單元, 它實時監控訊息佇列, 獲取佇列中排程的任務, 並執行它.

    • 任務結果儲存 Backend

      Backend 用於儲存任務的執行結果, 以供查詢. 同訊息中介軟體一樣, 儲存也可使用 RabbitMQ, Redis 和 MongoDB 等.

  2. 安裝

    pip install celery==4.4.5
    
  3. 建立例項

    import time
    from celery import Celery
    
    broker = 'redis://127.0.0.1:6379'
    backend = 'redis://127.0.0.1:6379/0'
    app = Celery('my_task', broker=broker, backend=backend)
    
    @app.task
    def add(x, y):
        time.sleep(5)     # 模擬耗時操作
        return x + y
    
  4. 啟動 Worker

    celery worker -A tasks -l info
    

    注意: celery預設使用多程序的方式啟動,window對多程序的方法不太友好,所以要改成 單程序 的啟動方式。 寫法是在後面 加上 --pool=solo

    • solo: 單程序

    • threading:多執行緒

    • gevent:協程 (需要安裝)

  5. 呼叫任務

    from tasks import add
    
    add.delay(2, 8)
    
  6. 常規配置

    broker_url = 'redis://127.0.0.1:6379/0'
    broker_pool_limit = 1000  # Borker 連線池, 預設是10
    
    timezone = 'Asia/Shanghai'
    accept_content = ['pickle', 'json']
    
    task_serializer = 'pickle'
    result_expires = 3600  # 任務過期時間
    
    result_backend = 'redis://127.0.0.1:6379/0'
    result_serializer = 'pickle'
    result_cache_max = 10000  # 任務結果最大快取數量
    
    worker_redirect_stdouts_level = 'INFO'
    
  7. 在django環境中執行celery

    import os
    
    from celery import Celery
    
    from worker import config
    
    # 載入django的環境
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "專案主目錄.settings")
    
    # 例項化celery
    celery_app = Celery('axf')
    
    # 載入配置檔案
    celery_app.config_from_object(config)
    
    # 自動註冊任務
    celery_app.autodiscover_tasks()
    
    

django中執行celery

  1. 安裝celery pip install celery

  2. 在根目錄下面建立一個名字叫做worker的包

  3. 在worker包下面建立配置檔案 config.py , 在配置檔案中配置 broker 的地址

    broker_url = 'redis://127.0.0.1:6379/0'
    
  4. 在 woker 目錄 下的 __init__.py檔案中 編寫 celery的功能

    import os
    
    from celery import Celery
    
    from worker import config
    
    # 載入django的環境
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "專案主目錄的名字.settings")
    
    # 例項化celery
    celery_app = Celery('axf')
    
    # 載入配置檔案
    celery_app.config_from_object(config)
    
    # 自動註冊任務
    celery_app.autodiscover_tasks()
    
  5. 把 耗時的方法 變成 任務, 把 common/func.py 中的 send_sms 變成 任務

    from libs.yuntongxun.sms import CCP
    from worker import celery_app
    
    @celery_app.task
    def send_sms(mobile):
        # 建立 隨機的 簡訊驗證碼
        smscode = randnum(4)
    
        ccp = CCP()
        # 注意: 測試的簡訊模板編號為1
        ccp.send_template_sms(mobile, [smscode, 3], 1)
    
  6. 在執行邏輯的時候,把任務加到 broker的 redis 佇列中

    def sms(request):
    
        # 獲取到手機號,發簡訊
        data = json.loads(request.body)
        mobile = data.get('mobile')
    
        # 驗證資料
        # TODO 後面需要驗證手機號是否存在
    
        # 發簡訊
        send_sms.delay(mobile)    # 用delay方法把 send_sms 耗時任務,新增到 redis 的佇列中
    
        return JsonResponse({'code': 0})
    
  7. 執行 celery 命令,開啟多個 worker 來監聽有沒有任務,如果有 就立刻執行

    celery worker -A worker -l info