1. 程式人生 > >canal 基於Mysql資料庫增量日誌解析

canal 基於Mysql資料庫增量日誌解析

canal 基於Mysql資料庫增量日誌解析

 1.前言

 最近太多事情 工作的事情,以及終身大事等等 耽誤更新,由於最近做專案需要同步監聽 未來電視 mysql的變更瞭解到公司會用canal做增量監聽,就嘗試使用了一下 這裡做個demo 簡單的記錄一下。

 2.canal簡介

 canal:主要用途是基於 MySQL 資料庫增量日誌解析,提供增量資料訂閱和消費的中介軟體
 當前的 canal 支援源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

 3.MySQL 注備複製原理

  3.1 mysql主備複製工作原理

  1.MySQL master 將資料變更寫入二進位制日誌( binary log, 其中記錄叫做二進位制日誌事件binary log events,可以通過 show binlog events 進行檢視)
  2.MySQL slave 將 master 的 binary log events 拷貝到它的中繼日誌(relay log)
  3.MySQL slave 重放 relay log 中事件,將資料變更反映它自己的資料

  3.2 canal 工作原理

  1.canal 模擬 MySQL slave 的互動協議,偽裝自己為 MySQL slave ,向 MySQL master 傳送dump 協議
  2.MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
  3.canal 解析 binary log 物件(原始為 byte 流)

 4.準備

 對於自建MySQL ,需要先開啟 Binlog寫入功能,並且配置binlog-format 為Row模式 在my.cnf中配置

 授權 canal 連結 MySQL 賬號具有作為 MySQL slave 的許可權, 如果已有賬戶可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

 5.canal 下載安裝配置

  5.1 canal下載

  canal 下載地址 (下載速度可能很慢)

  下載 canal.deployer-xxx.tar.gz 如 canal.deployer-1.1.4.tar.gz

  解壓後 可以看到如下結構

  5.2 canal 初始配置

  配置修改:

vim conf/example/instance.properties

  如下:

#################################################
## mysql serverId
canal.instance.mysql.slaveId = 2020

# position info 修改自己的資料庫(canal要監聽的資料庫 地址 )
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password 修改成自己 資料庫資訊的賬號 (單獨開一個 準備階段建立的賬號)
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

# table regex  表的監聽規則 
# canal.instance.filter.regex = blogs\.blog_info  
canal.instance.filter.regex = .\*\\\\..\*
# table black regex
canal.instance.filter.black.regex = 

  啟動canal

sh bin/startup.sh

  檢視server日誌
  看到 the canal server is running now 表示啟動成功

vi logs/canal/canal.log


2020-01-08 15:25:33.361 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ##    start the canal server.
2020-01-08 15:25:33.468 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
2020-01-08 15:25:34.061 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

  檢視instance的日誌

vi logs/example/example.log

2020-01-08 15:25:33.864 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-blogs 
2020-01-08 15:25:33.998 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
2020-01-08 15:25:33.999 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

  5.3 擴充套件 destination 配置

vi conf/canal.properties

  在canal.destinations 處可以配置當前server上部署的instance 列表 預設為 example ,我這裡改成了 blogs最好對應資料庫名稱。一個instance 對應一個 資料庫

 6.建立Java 客戶端 監聽canal 消費資料

  6.1 建立maven專案

  6.2 新增canal client POM 依賴

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.0</version>
</dependency>

  6.3 建立 canal 的客戶端監聽

  CanalMessageListener.java

  該類實現InitializingBean 主要是在初始化的時候 執行 init 方法,在init()方法中 建立 CanalConnector物件,連線需要監聽的canal,主要提供 canal的 host ,port ,destination ,以及username 和 password

  parse 方法 主要用於將監聽的物件 通過反射等轉換成對應的實體類

/**
* @author johnny
**/
@Component
@Slf4j
@ConditionalOnProperty(name = "application.canal.accessor", havingValue = "canal")
public class CanalMessageListener implements InitializingBean, ParseCanal {


private CanalConnector connector;

@Autowired
private CanalConfig canalConfig;

@Autowired
private IParseDispatcher configParseDispatcher;

private void init() {
    //建立canal 監聽 傳入host port destination等引數
    connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()),
            canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
    connector.connect();
    //  .*\..*
    connector.subscribe(".*\\..*");
    connector.rollback();

    new Thread(() -> {
    
        while (true) {
            Message message = connector.getWithoutAck(canalConfig.getBatchSize());
            long batchId = message.getId();
            long size = message.getEntries().size();
        //batchId == -1 表示沒有資料變更
            if (batchId == -1 || size == 0) {
                System.out.println("empty data ");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
            //解析資料變更
                resoleveEntry(message.getEntries());
            }
        }

    }).start();

}
//解析資料變更
private void resoleveEntry(List<CanalEntry.Entry> entries) {
    CanalEntry.RowChange rowChange = null;
    for (CanalEntry.Entry row : entries) {
     //判斷是否是 事物開始 和 事物結束 
        if (row.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || row.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            continue;
        }
        try {
            rowChange = CanalEntry.RowChange.parseFrom(row.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }

        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
        String tableName = row.getHeader().getTableName();
        CanalEntry.EventType eventType = row.getHeader().getEventType();

        for (CanalEntry.RowData rowData : rowDataList) {
            if (eventType == CanalEntry.EventType.UPDATE) {
                List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
                Object object = parse(columns, tableName);
                log.info("收到的 object:{}", JsonUtils.marshalToString(object));
                //根據收到的物件 處理後續業務邏輯
            }
        }

    }
}

@Override
public void afterPropertiesSet() throws Exception {
    init();
}

//解析 List<CanalEntry.Column>物件到對應的 實體類
@Override
public Object parse(List<CanalEntry.Column> canalDatas, String tableName) {
//根據配置好的map 從中根據key 表名 獲取對應的對映後的 實體類class
    String className = configParseDispatcher.dispatch(tableName);
    Object entity = null;
    Class c = null;
    try {
        c = Class.forName(className);
        entity = c.newInstance();
    } catch (ClassNotFoundException e) {
        log.error("【未找到對應 {} 的 實體類 】", className);
    } catch (Exception e) {
    }

    for (CanalEntry.Column canalDataColumn : canalDatas) {
        String columnName = canalDataColumn.getName();
        Field[] fields = c.getDeclaredFields();

        for (Field field : fields) {
            Object fieldValue = null;
            field.setAccessible(true);
            String fiedName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName());
            log.info("【filedName: {}】", fiedName);
            if (fiedName.equals(columnName)) {
                try {
                    if (Long.class.equals(field.getType())) {
                        fieldValue = NumberUtils.toLong(canalDataColumn.getValue());
                    }else if(Integer.class.equals(field.getType())){
                        fieldValue = NumberUtils.toInt(canalDataColumn.getValue());
                    }else if(Double.class.equals(field.getType())){
                        fieldValue = NumberUtils.toDouble(canalDataColumn.getValue());
                    }else if(Date.class.equals(field.getType())){
                        try {
                            fieldValue = DateUtils.parseDate(canalDataColumn.getValue(), new String[]{"yyyy-MM-dd HH:mm:ss"});
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                    }else{
                        fieldValue = canalDataColumn.getValue();
                    }
                    field.set(entity, fieldValue);
                    break;
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    return entity;
}
}

  application.yml
  配置canal 地址,以及表名和實體的對映規則

server:
port: 8881



application:
  canal:
    accessor: canal
    host: 127.0.0.1
    port: 11111
    username:
    password:
    destination: blogs
    batchSize: 30

    parse:   規則,根據表名獲取對應要對映的 實體class
      rule:
        mapping:
          blog_info: com.johnny.canal.canal_test.entity.BlogInfo

  IParseDispatcher.java
  介面:用來根據表名key獲取對應的 要對映的實體,這裡寫成介面是因為可以提供多種獲取方式,比如我這裡通過yml 配置去獲取

/**
* @author johnny
* @create 2020-01-17 上午11:09
**/
public interface IParseDispatcher {

 String dispatch(String key);

}

  ConfigParseDispatcher.java
  實現上面的介面,提供一種從 application.yml 獲取初始源配置 根據 application.canal.parse.rule進行配置

/**
* @author johnny
* @create 2020-01-17 上午11:07
**/
@Data
@Configuration
@ConfigurationProperties(prefix = "application.canal.parse.rule")
public class ConfigParseDispatcher implements IParseDispatcher {

private Map<String,String> mapping=new HashMap<>();

@Override
public String dispatch(String key) {
    return mapping.get(key);
}

}

  7.演示

  啟動專案 此時控制檯列印 empty data ,無資料變更

  通過執行 在 canal監聽的mysql 上執行 更新語句

update blog_info set blog_title = 'SpringBoot配置相關for canal test '  where id = 40

  debug 程式,當執行上面的update語句後 可以看到立即收到

  通過parse方法解析為對應的 實體物件,後續做自己的業務邏輯 即可

 8.總結

 本篇主要介紹了canal是什麼,如何下載安裝和配置 ,以及提供了自己寫的一個簡單demo 。後續有機會深入瞭解一下canal的其他功能,比如 如何同步到Kafka/RocketMQ等等。。

個人部落格地址: https://www.askajohnny.com 歡迎訪問!
本文由部落格一文多發平臺 OpenWrite 釋出!