1. 程式人生 > >基於swoole、redis集合做一個訊息訂閱

基於swoole、redis集合做一個訊息訂閱

利用swoole開啟常駐程序,需要幾個按自己的情況來定,swoole程序數最好是和伺服器cpu核數相等

上篇和這裡我用的都是woker程序沒有用task_worker

redis 有序集合score可以按時間戳來吧需要傳送的資料儲存起來

利用swoole啟動的常駐程序不斷的去探測,可以設定一段時間去按照score時間排序去把這個時間段的有序集合的資料取出來消費

下面上程式碼:

swoole啟動程式碼

function run()
{
    try {
        $swoole = new \swoole_server(127.0.0.1, 9999);
        $swoole->set([
'daemonize' => 1, //是否開啟守護程序 'worker_num' => 8, //實際需要去設定 'log_file' => __APP_LOGS_PATH__ . '/swoole.log' ]); $swoole->on('WorkerStart', 'onWorkerStart'); $swoole->on('Receive', 'onReceive'); $swoole->start(); } catch
(\Exception $e) {
logs(['err_code' => $e->getCode(), 'err_msg' => $e->getMessage()], 'error'); } }
具體的分配程序去redis有序集合取資料然後消費
function onWorkerStart(swoole_server $swoole, $worker_id)
{
    for ($i = 1; $i <= 3000; $i++) {
        $redis = connectRedis();
if ($worker_id == 0) {
            $quedata 
= []; $quedata['tag'] = 'test'; if ($redis->zCard('cron:test')) { $data = $quedata['data'] = $redis->zRevRangeByScore('cron:test', time(), time() - 500); if (empty($data)) { sleep(300); } else { $quedata = json_encode($quedata); call_user_func_array('postMessage', [&$quedata, &$redis]); foreach ($data as $v) { $redis->zRem('cron:test', $v); } } } else { sleep(300); } } elseif ($worker_id == 1) { $quedata = []; $quedata['tag'] = 'order'; if ($redis->zCard('cron:order')) { $data = $quedata['data'] = $redis->zRevRangeByScore('cron:order', time(), time() - 500); if (empty($data)) { sleep(300); } else { $quedata = json_encode($quedata); call_user_func_array('postMessage', [&$quedata, &$redis]); foreach ($data as $v) { $redis->zRem('cron:order', $v); } } } else { sleep(300); } } } $redis->close(); unset($redis); method_exists($swoole, 'stop') ? $swoole->stop() : @exit; }
當然我這裡有序集合只設置了一個,然後去固定的有序集合去消費。你也可以用不同的業務模組建立不同的redis有序集合,然後配合分配的程序去消費