1. 程式人生 > 資料庫 >非同步redis佇列實現 資料入庫的方法

非同步redis佇列實現 資料入庫的方法

業務需求

app客戶端向服務端介面傳送來json 資料 每天 發一次 清空快取後會再次傳送

出問題之前業務邏輯:

php 介面 首先將 json 轉為陣列 去重 在一張大表中插入不存在的資料

該使用者已經存在 和新增的id

入另一種詳情表

問題所在:

當用戶因特殊情況清除快取 導致app 傳送json串 入庫併發高 導致CPU 暴增到88% 並且居高不下

優化思路:

1、非同步佇列處理

2、redis 過濾(就是隻處理當天第一次請求)

3、redis 輔助儲存app名稱(驗證過後批量插入資料app名稱表中)

4、拼接插入的以及新增的如詳細表中

解決辦法:

1、介面修改 redis 過濾 + 如list佇列 並將結果存入redis中

首先 redis將之前的歷史資料放在redis 雜湊裡面 中文為鍵名 id 為鍵值

<?php
/**
 * Created by haiyong.
 * User: jia
 * Date: 2017/9/18
 * Time: 20:06
 */
namespace App\Http\Controllers\App;
 
 
use App\Http\Controllers\Controller;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Redis;
 
class OtherAppController extends Controller{
 
 /**
  * app應用統計介面
  * @param Request $request
  * @return string
  */
 public function appTotal(Request $request)
 {
  // //歷史資料入庫
  //$redis = Redis::connection('web_active');
  // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName",'<>',' ')->lists('id','appName');
  // $str = '';
  // foreach ($app_name as $key => $val) {
  //  $str.= "{$val} {$key} ";
  // }
  // $redis->hmset('app_name',$app_name);
  // echo $str;exit;
  $result = $request->input('res');
  $list = json_decode($result,true);
  if (empty ($list) || !is_array($list)) {
   return json_encode(['result' => 'ERROR','msg' => 'parameter error']);
  }
  $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
  $data['time'] = date('Y-m-d');
  $redis_key = 'log_app:'.$data['time'];
  //redis 過濾
  $redis = Redis::connection('web_active');
  //redis 鍵值過期設定
  if (empty($redis->exists($redis_key))) {
   $redis->hset($redis_key,1,'start');
   $redis->EXPIREAT($redis_key,strtotime($data['time'].'+2 day'));
  }
  //值確定
  if ($redis->hexists($redis_key,$data['uid'])) {
   return json_encode(['result' => 'SUCCESS']);
  } else {
   //推入佇列
   $redis->hset($redis_key,$data['uid'],$result);
   $redis->rpush('log_app_list',$data['time'] . ':' . $data['uid']);
   return json_encode(['result' => 'SUCCESS']);
  }
 }
}

2、php 指令碼迴圈 監控redis 佇列 執行邏輯 防止記憶體溢位

mget 獲取該使用者的app id 不存在就會返回null

通過判斷null 運用redis 新值作為自增id指標 將null 補齊 之後批量入mysql 並跟新redis 雜湊 和指標值 併入庫 詳情表

<?php
 
namespace App\Console\Commands;
 
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Storage;
 
class AppTotal extends Command
{
 /**
  * The name and signature of the console command.
  *
  * @var string
  */
 protected $signature = 'AppTotal:run';
 
 /**
  * The console command description.
  *
  * @var string
  */
 protected $description = 'Command description';
 
 /**
  * Create a new command instance.
  *
  * @return void
  */
 public function __construct()
 {
  parent::__construct();
 }
 
 /**
  * Execute the console command.
  *
  * @return mixed
  */
 public function handle()
 {
   //歷史資料入庫 
  // $redis = Redis::connection('web_active'); 
  // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName",'appName'); 
  // $redis->hmset('app_name',$app_name); 
  // exit; 
   while(1) {
   $redis = Redis::connection('web_active');
   //佇列名稱
   $res = $redis->lpop('log_app_list'); 
   //開關按鈕
   $lock = $redis->get('log_app_lock');
   if (!empty($res)) {
    list($date,$uid) = explode(':',$res);
    $result = $redis->hget('log_app:'.$date,$uid);
    if (!empty($result)) {
      $table_name = 'app_total'.date('Ym');
      $list = json_decode($result,true);
      $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ;
      $data['sex'] = isset($list['sex']) ? $list['sex'] : '' ;
      $data['device'] = isset($list['device']) ? $list['device'] : '' ;
      $data['appList'] = isset($list['list']) ? $list['list'] : '' ;
      //資料去重 flip比unique更節約效能
      $data['appList'] = array_flip($data['appList']);
      $data['appList'] = array_flip($data['appList']);
      $data['time'] = date('Y-m-d');
      //app應用過濾
      $app_res = $redis->hmget('app_name',$data['appList']);
      //新增加app陣列
      $new_app = [];
      //mysql 入庫陣列
      $mysql_new_app = [];
   //獲取當前redis 自增指標
   $total = $redis->get('app_name_total');
   foreach ($app_res as $key =>& $val) {
    if (is_null($val)) {
     $total += 1;
     $new_app[$data['appList'][$key]] = $total; 
     $val = $total;
     array_push($mysql_new_app,['id' => $total,'appName'=> $data['appList'][$key]]);
    }
   }
   if (count($new_app)){
    $str = "INSERT IGNORE INTO app_set_name (id,appName) values";
    foreach ($new_app as $key => $val) {
    $str.= "(".$val.",'".$key."'),";
    }
    $str = trim($str,',');
    //$mysql_res = DB::connection('phpLog')->table('app_set_name')->insert($mysql_new_app);
    $mysql_res = DB::connection('phpLog')->statement($str);
    if ($mysql_res) {
     // 設定redis 指標
     $redis->set('app_name_total',$total);
     // redis 資料入庫
     $redis->hmset('app_name',$new_app);
    }
  }
    // 詳情資料入庫
    $data['appList'] = implode(',$app_res);
      //app統計入庫
      DB::connection('phpLog')->statement("INSERT IGNORE INTO ".$table_name." (uid,sex,device,`time`,appList) 
  values('".$data['uid']."',".$data['sex'].",'".$data['device']."','".$data['time']."','".$data['appList']."')");
      //log 記錄 當檔案達到123MB的時候產生記憶體保錯 所有這個地方可是利用日誌切割 或者 不寫入 日誌
      Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt',date('Y-m-d H:i:s').' success '.$result."\n"); 
  } else {
   Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt',date('Y-m-d H:i:s').' error '.$result."\n");
  }
   }
   //執行間隔
   sleep(1);
   //結束按鈕
   if ($lock == 2) {
    exit;
   }
   //記憶體檢測
   if(memory_get_usage()>1000*1024*1024){
    exit('記憶體溢位');//大於100M記憶體退出程式,防止記憶體洩漏被系統殺死導致任務終端
   }
  }
 }
}

3、執定 定時任務監控指令碼執行情況

crontab -e

/2 * * * * /bin/bash /usr/local/nginx/html/test.sh 1>>/usr/local/nginx/html/log.log 2>&1

test.sh 內容 (檢視執行命令返回的程序id 如果沒有就執行命令開啟)

#!/bin/bash
alive=`ps -ef | grep AppTotal | grep -v grep | awk '{print $2}'`
if [ ! $alive ]
then
 /usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &
fi

記得授權哦 chmod +x test.sh

筆者用的laravel 框架 將命令啟用丟入後臺

執行命令

 /usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &

完事直接 ctrl -c 結束就行 命令以在後臺執行 可以用shell 中的命令檢視程序id

這樣就實現佇列非同步入庫

還有很多問題需要優化!!大致功能已經實現!!!!!!

優化完成後cpu

以上這篇非同步redis佇列實現 資料入庫的方法就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。