1. 程式人生 > >PHP使用SWOOLE擴充套件實現定時同步 MySQL 資料

PHP使用SWOOLE擴充套件實現定時同步 MySQL 資料

南寧公司和幾個分公司之間都使用了呼叫系統,然後現在需要做一個呼叫通話資料分析,由於分公司的呼叫伺服器是在內網,通過技術手段映射出來,分公司到南寧之間的網路不穩定,所以需要把分公司的通話資料同步到南寧。

本身最簡單的方法就是直接配置MySQL的主從同步就可以同步資料到南寧來了。但是銷售呼叫系統那邊的公司不給MySQL許可權我們。 所以這個方法只能放棄了。

於是我們乾脆的想,使用PHP來實現定時一個簡易的PHP定時同步工具,然後PHP程序常駐後臺執行,所以首先就先到了一個PHP元件:SWOOLE,經過討論,分公司的每天半天生成的資料量最大在5000條左右,所以這個方案是可行,就這樣幹。

我們使用PHP SWOOLE 做一個非同步的定時任務系統。

本身MySQL資料庫的主從同步是通過解析Master庫中的binary-log來進行同步資料到從庫的。然而我們使用PHP來同步資料的時候,那麼只能從master庫分批查詢資料,然後插入到南寧的slave庫來了。

這裡我們使用的框架是 ThinkPHP 3.2 .

首先安裝PHP擴充套件: SWOOLE,因為沒有使用到特別的功能,所以這裡我們使用pecl來快速安裝:

?

1

pecl install swoole

安裝完成後在 php.ini 裡面加入 extension="swoole.so"

 安裝完成後,我們使用 phpinfo() 來檢查是否成功了.

安裝成功了,我們就來寫業務.

服務端

1、首先啟動一個後臺的服務端,監聽埠9501

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

public function index()

{

 $serv = new \swoole_server("0.0.0.0", 9501);

 $serv->set([

  'worker_num' => 1,//一般設定為伺服器CPU數的1-4倍

  'task_worker_num' => 8,//task程序的數量

  'daemonize' => 1,//以守護程序執行

  'max_request' => 10000,//最大請求數量

  "task_ipc_mode " => 2 //使用訊息佇列通訊,並設定為爭搶模式

 ]);

 $serv->on('Receive', [$this, 'onReceive']);//接收任務,並投遞

 $serv->on('Task', [$this, 'onTask']);//可以在這個方法裡面處理任務

 $serv->on('Finish', [$this, 'onFinish']);//任務完成時候呼叫

 $serv->start();

}

2、接收和投遞任務

?

1

2

3

4

5

6

7

8

9

public function onReceive($serv, $fd, $from_id, $data)

{

 //使用json_decode 解析任務資料

 $areas = json_decode($data,true);

 foreach ($areas as $area){

  //投遞非同步任務

  $serv->task($area);

 }

}

3、任務執行,資料從master庫查詢和寫入到slave資料庫

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

public function onTask($serv, $task_id, $from_id, $task_data)

{

 $area = $task_data;//引數是地區編號

 $rows = 50; //每頁多少條

 //主庫地址,根據引數地區($area)編號切換master資料庫連線

 //從庫MySQL例項,根據引數地區($area)編號切換slave資料庫連線

 //由於程式是常駐記憶體的,所以MySQL連線可以使用長連線,然後重複利用。要使用設計模式的,可以使用物件池模式

 Code......

 

 //master 庫為分公司的資料庫,slave庫為資料同步到南寧後的從庫

 Code......

 

 //使用$sql獲取從庫中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1

 $slaveMaxIncrementId = ...;

 

 //使用$sql獲取主庫中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1

 $masterMaxIncrementId = ...;

 

 //如果相等的就不同步了

 if($slaveMaxIncrementId >= $masterMaxIncrementId){

  return false;

 }

 

 //根據條數計算頁數

 $dataNumber = ceil($masterMaxIncrementId - $slaveMaxIncrementId);

 $eachNumber = ceil($dataNumber / $rows);

 $left = 0;

 

 //根據頁數來進行分批迴圈進行寫入,要記得及時清理記憶體

 for ($i = 0; $i < $eachNumber; $i++) {

  $left = $i == 0 ? $slaveMaxIncrementId : $left + $rows;

  $right = $left + $rows;

  //生成分批查詢條件

  //$where = "id > $left AND <= $right";

  $masterData = ...;//從主庫查詢資料

  $slaveLastInsertId = ...;//插入到從庫

  unset($masterData,$slaveLastInsertId);

 }

 

 echo "New AsyncTask[id=$task_id]".PHP_EOL;

 $serv->finish("$area -> OK");

}

4、任務完成時候呼叫

?

1

2

3

4

public function onFinish($serv, $task_id, $task_data)

{

 echo "AsyncTask[$task_id] Finish: $task_data".PHP_EOL;

}

客戶端推送任務

到此基本完成,剩下來我們來寫客戶端任務推送

?

1

2

3

4

5

6

7

8

9

10

11

public function index()

{

 $client = new \swoole_client(SWOOLE_SOCK_TCP);

 if (!$client->connect('127.0.0.1', 9501, 1)) {

  throw new Exception('連結SWOOLE服務錯誤');

 }

 $areas = json_encode(['liuzhou','yulin','beihai','guilin']);

 //開始遍歷檢查

 $client->send($areas);

 echo "任務傳送成功".PHP_EOL;

}

至此基本完成了,剩下的我們來寫一個shell指令碼定時執行:/home/wwwroot/sync_db/crontab/send.sh

?

1

2

3

4

5

6

#!/bin/bash

PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin

export PATH

 

# 定時推送非同步的資料同步任務

/usr/bin/php /home/wwwroot/sync_db/server.php home/index/index

使用crontab定時任務,我們把指令碼加入定時任務

?

1

2

3

4

#設定每天12:30執行資料同步任務

30 12 * * * root /home/wwwroot/sync_db/crontab/send.sh

#設定每天19:00執行資料同步任務

0 19 * * * root /home/wwwroot/sync_db/crontab/send.sh

Tips: 最好推薦在裡面加入寫日誌操作,這樣好知道是任務推送、執行是否成功。

至此基本完成,程式有待優化~~~,各位看客有更好的方法歡迎提出。

參考文獻:

https://www.jb51.net/article/110702.htm