1. 程式人生 > 其它 >Filnk實時數倉(資料採集)

Filnk實時數倉(資料採集)

第1章電商實時數倉介紹

1.1普通實時計算與實時數倉比較

  普通的實時計算優先考慮時效性,所以從資料來源採集經過實時計算直接得到結果。如此做時效性更好,但是弊端是由於計算過程中的中間結果沒有沉澱下來,所以當面對大量實時需求的時候,計算的複用性較差,開發成本隨著需求增加直線上升。

  實時數倉基於一定的資料倉庫理念,對資料處理流程進行規劃、分層,目的是提高資料和計算的複用性

1.2實時電商數倉專案分層

  1)ODS層

    原始資料:日誌資料和業務資料

  2)DWD層

    依據資料物件為單位進行分流,比如訂單、頁面訪問等等

  3)DIM(Hbase+phoenix)

    維度資料

  4)DWM層

    對於部分資料物件進行進一步加工,比如獨立訪問、跳出行為,也可以和維度進行關聯,形成寬表,仍然是明細資料

  5)DWS

    根據某個主題將多個事實資料輕度聚合,形成主題寬表。資料儲存到clickhouse

  6)ADS

    把Clickhouse中的資料根據視覺化需要進行篩選聚合

第2章實時需求概述

2.1離線計算與實時計算的比較

  1)離線需求

    就是在計算開始前已知所有輸入資料,輸入資料不會產生變化,一般計算量級較大計算時間也較長。例如今天早上一點,把昨天累積的日誌,計算出所需結果。最經典的就是Hadoop的MapReduce方式;

    一般是根據前一日的資料生成報表,雖然統計指標、報表繁多,但是對時效性不敏感。從技術操作的角度,這部分屬於批處理的操作。即根據確定範圍的資料一次性計算

  2)實時需求

    輸入資料是可以以序列化的方式一個個輸入並進行處理的,也就是說在開始的時候並不需要知道所有的輸入資料。與離線計算相比,執行時間短計算量級相對較小。強調計算過程的時間要短,即所查當下給出結果。

    主要側重於對當日資料的實時監控,通常業務邏輯相對離線需求簡單一下,統計指標也少一些,但是更注重資料的時效性,以及使用者的互動性。從技術操作的角度,這部分屬於流處理的操作。根據資料來源源不斷地到達進行實時的運算。

2.2通常實時指標

2.2.1日常統計報表或分析圖中需要包含當日部分

  對於日常企業、網站的運營管理如果僅僅依靠離線計算,資料的時效性往往無法滿足。通過實時計算獲得當日、分鐘級、秒級甚至亞秒的資料更加便於企業對業務進行快速反應與調整。所以實時計算結果往往要與離線資料進行合併或者對比展示在

BI或者統計平臺中。

2.2.2實時資料大屏監控

  資料大屏,相對於BI工具或者資料分析平臺是更加直觀的資料視覺化方式。尤其是一些大促活動,已經成為必備的一種營銷手段。另外還有一些特殊行業,比如交通、電信的行業,那麼大屏監控幾乎是必備的監控手段。

2.2.3資料預警或提示

  經過大資料實時計算得到的一些風控預警、營銷資訊提示,能夠快速讓風控或營銷部分得到資訊,以便採取各種應對。比如,使用者在電商、金融平臺中正在進行一些非法或欺詐類操作,那麼大資料實時計算可以快速的將情況篩選出來發送風控部門進行處理,甚至自動遮蔽。或者檢測到使用者的行為對於某些商品具有較強的購買意願,那麼可以把這些“商機”推送給客服部門,讓客服進行主動的跟進。

2.2.4實時推薦系統

  實時推薦就是根據使用者的自身屬性結合當前的訪問行為,經過實時的推薦演算法計算,從而將使用者可能喜歡的商品、新聞、視訊等推送給使用者。這種系統一般是由一個使用者畫像批處理加一個使用者行為分析的流處理組合而成。

第3章計算架構

3.1離線架構

3.2實時架構

第4章日誌資料採集

  整個模擬資料的生產過程與離線數倉中模擬資料的生產過程基本一致,個別地方需要修改,這裡提供了一個模擬生成資料的jar包,可以將日誌傳送給某一個指定的埠,需要大資料程式設計師瞭解如何從指定埠接收資料並資料進行處理的流程。

4.1模擬資料需要用到的檔案列表

cd /opt/software/mock/mock_log

4.2修改application.yml檔案

  根據自己實際情況修改配置檔案application.yml

  說明:

    1)mock.data是模擬的日誌資料的日期

    2)mock.type 如果模擬實時資料,則該值必須設定為http

    3)mock.url是日誌伺服器的地址,表示把模擬出來的資料傳送到這個地址.寫這個地址的時候一定要明白你接收日誌的伺服器的地址是哪裡!

4.3生產模擬資料

java -jar gmall2020-mock-log-2020-12-18.jar

  注意:必須等到接收日誌的伺服器部署完成之後這裡才可以正常工作

第5章建立父工程

  在整個實時數倉專案中, 會有比較多的模組需要管理, 我們統一建立一個父工程來管理不同的模組

  1)新建父工程命名FlinkParent

  2)新建SpringBoot子工程

第6章搭建單機版資料採集伺服器

6.1瞭解springboot

  Spring Boot 是由 Pivotal 團隊提供的全新框架,其設計目的是用來簡化新 Spring 應用的初始搭建以及開發過程。該框架使用了特定的方式來進行配置,從而使開發人員不再需要定義樣板化的配置。

  使用Spring boot 好處:

    1)內嵌 Tomcat, 不再需要外部的 Tomcat

    2)不再需要那些千篇一律,繁瑣的 xml 檔案。

    3)更方便的和各個第三方工具( mysql,redis,elasticsearch,dubbo 等等整合),而只要維護一個配置檔案即可。

6.1.1springbootSSM的關係

  springboot 整合了springmvc, spring 等核心功能。也就是說本質上實現功能的還是原有的spring ,springmvc 的包,但是 springboot單獨包裝了一層,這樣使用者就不必直接對 springmvc, spring 等,在 xml 中配置

6.1.2springboot如何配置

  springboot實際上就是把以前需要使用者手工配置的部分,全部作為預設項。除非使用者需要額外更改不然不用配置。這就是所謂的:約定大於配置

  如果需要特別配置的時候,去修改application.properties

6.2idea建立日誌採集伺服器

  步驟1:project中建立springboot子模組(前面已經建立了,記得換爸爸)

    1)父工程的pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.yuange.flink</groupId>
    <artifactId>FlinkParent</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>flink-logger</module>
    </modules>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

    2)flink-logger 子工程

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.yuange.flink</groupId>
        <artifactId>FlinkParent</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>flink-logger</artifactId>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

  步驟2:建立controller,在controller中定義方法, 用來處理客戶端的http請求,如果不做額外配置, controller需要與主程式GmallLoggerApplication同包, 或它所在包的子包下

package com.yuange.flink.flinklogger.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @作者:袁哥
 * @時間:2021/7/27 19:54
 */
@RestController
public class LoggerController {

    @GetMapping("/applog")
    public String doLog(@RequestParam("param")String log){
        System.out.println(log);
        return "success";
    }
}

  步驟3:修改日誌伺服器埠為8081,在配置檔案resources/application.properties 內新增如下程式碼:

server.port=8081

  步驟4:啟動日誌伺服器,傳送模擬資料測試

    1)啟動SpringBoot專案

    2)linux啟動模擬資料

      (1)修改application.yml :mock.url: http://window地址:8081/applog

        注意:window地址必須是在linux虛擬機器可以訪問到ip地址.

      (2)啟動程式,生成日誌

cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

    3)觀察idea是否收到資料,如果沒有收到資料,請確認地址是否正確

  步驟5:把日誌落盤

    在本實時專案中,落盤的日誌後面並沒有使用,主要考慮在企業應用中,採集到資料不僅僅應用到實時專案,也可以其他的一些需求也會可能會用到,比如離線需求。另外也可以起到資料備份的作用。落盤工具使用logback,類似於log4j

    1)新增配置檔案resources/logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!--日誌的根目錄, 根據需要更改成日誌要儲存的目錄-->
    <property name="LOG_HOME" value="D:\Root\workSpace\IntelliJ IDEA 2019.2.4\workSpace\FlinkParent\output"/>
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 將某一個包下日誌單獨列印日誌  需要更換我們的 Controller 類 -->
    <logger name="com.yuange.flink.flinklogger.controller.LoggerController"
            level="INFO" additivity="true">
        <appender-ref ref="rollingFile"/>
        <appender-ref ref="console"/>
    </logger>

    <root level="error" additivity="true">
        <appender-ref ref="console"/>

    </root>
</configuration>

    2)在controller類上添加註解 @Slf4j

package com.yuange.flink.flinklogger.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @作者:袁哥
 * @時間:2021/7/27 19:54
 */
@RestController
@Slf4j
public class LoggerController {

    @GetMapping("/applog")
    public String doLog(@RequestParam("param")String log){
        saveToDisk(log);
        return "success";
    }

    private void saveToDisk(String strLog) {
        //log.info(logString) 這裡的log物件預設情況如果idea不能識別,
        //寫程式碼的時候會報錯(執行並不會報錯), 需要在idea安裝一個外掛: lombok, 看著就舒服了
        log.info(strLog);
    }
}

    3)確認是否可以正常落盤,並能在console列印日誌.

  步驟6:把日誌直接寫入到kafka中

    1)application.properties中配置Kafka相關配置

#============== kafka ===================
# 指定kafka 代理地址,可以多個
spring.kafka.bootstrap-servers=hadoop162:9092,hadoop163:9092,hadoop164:9092  

# 指定訊息key和訊息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer  
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

    2)具體寫入kafka的程式碼

package com.yuange.flink.flinklogger.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @作者:袁哥
 * @時間:2021/7/27 19:54
 */
@RestController
@Slf4j
public class LoggerController {

    @GetMapping("/applog")
    public String doLog(@RequestParam("param")String log){
        //資料落盤
        saveToDisk(log);
        //寫入kafka
        sendToKafka(log);
        return "success";
    }

    @Autowired
    private KafkaTemplate<String,String> kafka;

    private void sendToKafka(String strLog) {
        kafka.send("ods_log",strLog);
    }

    private void saveToDisk(String strLog) {
        //log.info(logString) 這裡的log物件預設情況如果idea不能識別,
        //寫程式碼的時候會報錯(執行並不會報錯), 需要在idea安裝一個外掛: lombok, 看著就舒服了
        log.info(strLog);
    }
}

  步驟7:測試是否可以寫入到kafka

    1)啟動Zookeeper

zk start

    2)啟動Kafka

kafka.sh start

    3)啟動終端消費者

/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic osd_log

    4)啟動SpringBoot日誌伺服器

    5)傳送模擬資料

cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

    6)確認kafka是否收到資料

6.3部署到linux測試資料採集

  在idea中如果日誌伺服器可以測試通過,現在打包,然後部署到linux測試.

  步驟1:logback.xml中的落盤目錄修改為linux的目錄

#建立applog目錄
mkdir /opt/software/mock/mock_log/applog
<property name="LOG_HOME" value="/opt/software/mock/mock_log/applog"/>

  步驟2:打包flink-logger, 併發送到linux

  步驟3:啟動gmll-logger伺服器

cd /opt/module/applog
java -jar flink-logger-1.0-SNAPSHOT.jar

  步驟4:啟動模擬資料

    1)修改/opt/software/mock/mock_log/application.yml 檔案

#http模式下,傳送的地址
mock.url: "http://hadoop162:8081/applog"

    2)啟動程式,生產資料

java -jar gmall2020-mock-log-2020-12-18.jar

    3)檢視Kafka是否有資料

/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic ods_log

第7章搭建叢集版資料採集伺服器

7.1部署Nginx

7.1.1Nginx簡介

  Nginx (讀作“engine x”), 是一個高效能的 HTTP 和反向代理伺服器 , 特點是佔有記憶體少,併發能力強,事實上 nginx 的併發能力確實在同類型的網頁伺服器中表現較好,中國大陸使用 nginx 網站使用者有:百度、京東、新浪、網易、騰訊、淘寶等。

7.1.2NginxTomcat的關係

  除了tomcat 以外, apache,nginx,jboss,jetty 等都是 http 伺服器。nginx 和 apache 只支援靜態頁面和 CGI 協議的動態語言,比如 perl 、 php 等, 但是nginx不支援 java 。Java 程式只能通過與 tomcat 配合完成。 nginx 與 tomcat 配合,為 tomcat 叢集提供反向代理服務、負載均衡等服務

7.1.3Nginx功能

  1)反向代理

  2)負載均衡

    (1)輪詢(預設) 每個請求按時間順序逐一分配到不同的後端伺服器,如果後端某臺伺服器宕機,則自動剔除故障機器,使使用者訪問不受影響

    (2)weight 指定輪詢權重,weight值越大,分配到的機率就越高,主要用於後端每臺伺服器效能不均衡的情況。

    (3)備機模式 平時不工作, 只有其他down 機的時候才會開始工作

    (4)公平模式(第三方) 更智慧的一個負載均衡演算法,此演算法可以根據頁面大小和載入時間長短智慧地進行負載均衡,也就是根據後端伺服器的響應時間來分配請求,響應時間短的優先分配。如果想要使用此排程演算法,需要Nginx的upstream_fair模組。

  3)動靜分離

7.1.4安裝部署

  1)使用 yum 安裝依賴包

sudo yum -y install openssl openssl-devel pcre pcre-devel zlib zlib-devel gcc gcc-c++

  2)下載 Nginx,如果已經有下載好的安裝包, 此步驟可以省略

wget http://nginx.org/download/nginx-1.12.2.tar.gz

  3)解壓到當前目錄

tar -zxvf nginx-1.12.2.tar.gz

  4)編譯和安裝,進入解壓後的根目錄

./configure   --prefix=/opt/module/nginx
make && make install

  5)啟動 Nginx

    (1)進入安裝目錄:

cd /opt/module/nginx

    (2)啟動 nginx:

sbin/nginx

    (3)關閉 nginx:

sbin/nginx -s stop

    (4)重新載入:

sbin/nginx -s reload

  注意:

    1)Nginx 預設使用的是 80 埠, 由於非root使用者不能使用 1024 以內的埠, 但是在生產環境下不建議使用root使用者啟動nginx, 主要從安全方面考慮

    2)如果使用普通使用者啟動 Nginx, 需要先執行下面的命令來突破上面的限制:

sudo setcap cap_net_bind_service=+eip /opt/module/nginx/sbin/nginx

  6)檢視是否啟動成功:http://hadoop162

ps -ef | grep nginx

7.1.5配置負載均衡

  模擬資料以後應該發給nginx, 然後nginx再轉發給我們的日誌伺服器,日誌伺服器我們會分別配置在hadoop162,hadoop163,hadoop164三臺裝置上

  1)開啟nginx配置檔案

cd /opt/module/nginx/conf
vim nginx.conf

  2)修改如下配置

http {
       # 啟動省略
    upstream logcluster{
        server hadoop162:8081 weight=1;
        server hadoop163:8081 weight=1;
        server hadoop164:8081 weight=1;
    }
    server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location / {
            #root   html;
            #index  index.html index.htm;
            # 代理的伺服器叢集  命名隨意, 但是不能出現下劃線
            proxy_pass http://logcluster;
            proxy_connect_timeout 10;
        }
        
        # 其他省略    
}

7.2分發日誌伺服器

  日誌伺服器每個節點配置一個. (nginx只需要配置到hadoop162單臺裝置就行了)

xsync /opt/module/applog/

7.3日誌伺服器群起指令碼

  1)新建指令碼

vim /home/atguigu/bin/log-lg.sh

  2)編寫指令碼

#!/bin/bash
log_app=flink-logger-1.0-SNAPSHOT.jar
nginx_home=/opt/module/nginx
log_home=/opt/module/applog

case $1 in
"start")
# 在hadoop162啟動nginx
if [[ -z "`ps -ef | awk '/nginx/ && !/awk/{print $n}'`" ]]; then
    echo "在hadoop162啟動nginx"
    $nginx_home/sbin/nginx
fi
# 分別在162-164啟動日誌伺服器$
for host in hadoop162 hadoop163 hadoop164 ; do
    echo "在 $host 上啟動日誌伺服器"
    ssh $host "nohup java -jar $log_home/$log_app 1>$log_home/logger.log 2>&1 &"
done
   ;;
"stop")
echo "在hadoop162停止nginx"
$nginx_home/sbin/nginx -s stop
for host in hadoop162 hadoop163 hadoop164 ; do
    echo "在 $host 上停止日誌伺服器"
    ssh $host "jps | awk '/$log_app/ {print \$1}' | xargs kill -9"
done
   ;;

*)
echo "你啟動的姿勢不對"
echo " log.sh start 啟動日誌採集"
echo " log.sh stop  停止日誌採集"
   ;;
esac

  3)賦予可執行許可權

chmod +x /home/atguigu/bin/log-lg.sh

  4)分發

xsync /home/atguigu/bin/log-lg.sh

7.4測試負載均衡

  傳送模擬資料,注意把埠改為nginx的埠:80

  1)啟動zookeeper

  2)啟動kafka

  3)啟動日誌伺服器

log-lg.sh start

  4)修改配置檔案,將訪問埠改為80

vim /opt/software/mock/mock_log/application.yml
#http模式下,傳送的地址
mock.url: "http://hadoop162:80/applog"

  5)啟動kafka消費者

/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop163:9092 --topic ods_log

  6)啟動程式,生產資料

cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar

  7)發現Kafka消費到了資料

第8章業務資料採集

  可以實時採集mysql資料的工具有:canal 和 maxwell,debzium

  兩個工具是競品, 各有優缺點

8.1使用canal實時採集mysql資料

8.1.1什麼是canal

  阿里巴巴B2B公司,因為業務的特性,賣家主要集中在國內,買家主要集中在國外,所以衍生出了杭州和美國異地機房的需求,從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。

  Canal是用java開發的基於資料庫增量日誌解析,提供增量資料訂閱&消費的中介軟體。目前,canal主要支援了MySQL的binlog解析,解析完成後才利用canal client 用來處理獲得的相關資料。(資料庫同步需要阿里的otter中介軟體,基於canal)。

8.1.2canal使用場景

  原始場景:otter中介軟體的一部分,阿里otter中介軟體的一部分,otter是阿里用於進行異地資料庫之間的同步框架,canal是其中一部分。

  常見場景1: 更新快取伺服器

  常用場景2: 製作拉鍊表

    抓取業務資料新增變化表,用於製作拉鍊表,如果表中沒有更新時間,製作拉鍊表就需要使用canal實時監控資料的變化

  常用場景3:用於實時統計

    抓取業務表的新增變化資料,用於製作實時統計,我們實時數倉就是這種應用場景!

8.1.3canal工作原理

  mysql的主從複製原理

    1)MySQL master 將資料變更寫入二進位制日誌( binary log, 其中記錄叫做二進位制日誌事件binary log events,可以通過 show binlog events 進行檢視)

    2)MySQL slave 將 master 的 binary log events 拷貝到它的中繼日誌(relay log)

    3)MySQL slave 重放 relay log 中事件,將資料變更反映它自己的資料

  canal工作原理

    1)canal 模擬 MySQL slave 的互動協議,偽裝自己為 MySQL slave ,向 MySQL master 傳送dump 協議

    2)MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )

    3)canal 解析 binary log 物件(原始為 byte 流)

8.1.4mysqlbinlog

  1)什麼是binlog

    MySQL的二進位制日誌可以說是MySQL最重要的日誌了,它記錄了所有的DDL和DML(除了資料查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進位制日誌是事務安全型的。一般來說開啟二進位制日誌大概會有1%的效能損耗。

    二進位制有兩個最重要的使用場景:

      其一:MySQL Replication在Master端開啟binlog,Mster把它的二進位制日誌傳遞給slaves來達到master-slave資料一致的目的。

      其二:自然就是資料恢復了,通過使用mysqlbinlog工具來使恢復資料。

    二進位制日誌包括/兩類檔案:

      A:二進位制日誌索引檔案(檔名字尾為.index)用於記錄所有的二進位制的檔案

      B:二進位制日誌檔案(檔名字尾為.00000*)記錄資料庫所有的DDL和DML(除了資料查詢語句)語句事件。

  2)開啟binlog

    預設情況下,mysql是沒有開啟binlog的,需要手動開啟,開啟步驟:

    (1)找到mysql的配置檔案:my.cnf. 大部分的mysql版本預設在:/etc/my.cnf.如果沒有找到,則可以通過下面的命令查詢:

sudo find / -name my.cnf

    (2)修改my.cnf.my.cnf檔案中增加如下內容:

sudo vim /etc/my.cnf
server-id= 1
#日誌字首
log-bin=mysql-bin
#同步格式
binlog_format=row
#同步的庫
binlog-do-db=flinkdb

  3)配置說明

    (1)server-id:mysql主從複製的時候,主從之間每個例項必須有獨一無二的id

    (2)log-bin:這個表示binlog日誌的字首是mysql-bin,以後生成的日誌檔案就是 mysql-bin.123456 的檔案後面的數字按順序生成。 每次mysql重啟或者到達單個檔案大小的閾值時,新生一個檔案,按順序編號。

    (3)Binlog_format:mysql binlog的格式,有三種值,分別是statement, row, mixed,三者區別如下

    statement:

      語句級,binlog會記錄每次一執行寫操作的語句。相對row模式節省空間,但是可能產生不一致性,比如update tt set create_date=now(),如果用binlog日誌進行恢復,由於執行時間不同可能產生的資料就不同。

      優點: 節省空間

      缺點: 有可能造成資料不一致。

    row:

      行級, binlog會記錄每次操作後每行記錄的變化。

      優點:保持資料的絕對一致性。因為不管sql是什麼,引用了什麼函式,他只記錄執行後的效果。

      缺點:佔用較大空間

    mixed:

      statement的升級版,一定程度上解決了,因為一些情況而造成的statement,模式不一致問題,在某些情況下譬如:當函式中包含 UUID() 時,包含 AUTO_INCREMENT 欄位的表被更新時;執行 INSERT DELAYED 語句時;用 UDF 時;會按照ROW的方式進行處理

      優點:節省空間,同時兼顧了一定的一致性。

      缺點:還有些極個別情況依舊會造成不一致,另外statement和mixed對於需要對binlog的監控的情況都不方便。

      由於canal不是資料庫,是不能執行sql語句的,所以,只能設定為row格式

    (3)binlog-do-db:設定把哪個database的變化寫入到binlog, 如果不配置,則所有database的變化都會寫入到binlog,如果要設定多個數據庫需要,需要寫多次這個引數的配置

binlog-do-db = a
binlog-do-db = b

  4)檢測配置是否成功

    (1)重啟mysql伺服器

sudo systemctl restart mysqld

    (2)啟動msyql客戶端,執行sql語句:

mysql -uroot -paaaaaa
show variables like'%log_bin%';

    (3)也可以去對應的目錄下檢視是否生成log_bin檔案

cd /var/lib/mysql
sudo ls 

8.1.5mysql準備業務資料

CREATE DATABASE `flinkdb` CHARACTER SET utf8 COLLATE utf8_general_ci;
USE flinkdb;

8.1.6下載和解壓安裝canal

  1)在mysql建立canal使用者,canal需要監控mysql資料,在企業中一般拿不到root使用者,需新建立只讀取許可權的使用者

Mysql> set global validate_password_policy=0;

mysql> set global validate_password_length=4;

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY'canal';

mysql> FLUSH PRIVILEGES;

  2)下載canal

wget -P /opt/software/canal https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

  3)解壓安裝canal

mkdir /opt/module/canal
tar -zxvf /opt/software/canal/canal.deployer-1.1.4.tar.gz -C /opt/module/canal 

8.1.7配置canal

  canal有兩種配置:server級別和instance級別

  1)server級別的配置是對整個canal進行配置,是一些全域性性的配置.一個sever中可以配置多個instance

  2)instance級別的配置,是最小的訂閱mysql的佇列,比如example例項

  3)canal server配置

vim /opt/module/canal/conf/canal.properties
#重點關注以下配置:
canal.ip = hadoop162  
# canal伺服器繫結ip地址
canal.port = 11111  
 # canal埠號, 將來客戶端通過這個埠號可以讀到資料
canal.zkServers = hadoop162:2181,hadoop163:2181,hadoop164:2181
# zk地址, 用來管理canal的高可用
# tcp, kafka, RocketMQ
# tcp:客戶端通過tcp方式從Canal服務端拉取增量資料
# kafka:Canal服務端將增量資料同步到kafka中,客戶端從kafka消費資料,此時客戶端感知不到Canal的存在,只需要跟kafka互動。
# RocketMQ:同kafka,增量資料同步到RocketMQ中。
canal.serverMode = kafka
canal.destinations = yuange  
# 配置例項, 如果有多個例項, 用逗號隔開. 我們建立一個yaunge例項
canal.mq.servers = hadoop162:9092,hadoop163:9092,hadoop164:9092

  4)canal instance配置

    (1)把目錄名example改為yuange(其實就是和剛才的配置儲存一致,用來表示yuange例項)

mv /opt/module/canal/conf/example /opt/module/canal/conf/yuange

    (2)開啟例項配置檔案:

vim /opt/module/canal/conf/yuange/instance.properties

    (3)在其中配置要監控的mysql和監控到的資料傳送到kafka

# canal例項(slave)的id, 不能和mysql的id重複.  可以自動生成, 無需手工配置
# canal.instance.mysql.slaveId=0
# 要監控的mysql地址
canal.instance.master.address = hadoop162:3306
# 連線mysql的使用者名稱
canal.instance.dbUsername=canal
# 連線mysql的密碼
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# 該例項監控的 庫.表  預設所有庫下所有表     
canal.instance.filter.regex=flinkdb\\..*    # 監控gmall資料庫下所有包
# kafka topic配置
canal.mq.topic=ods_db
# 註釋掉此配置, 此配置是隻傳送到一個固定分割槽中
# canal.mq.partition=0
# 雜湊模式的分割槽數, 要和kafka的topic的分割槽數保持一致
canal.mq.partitionsNum=2
# 如何計算每條資料進入的分割槽
canal.mq.partitionHash= .*\\..*:$pk$    # 指定所有的表用主鍵hash得到分割槽索引

8.1.8canaHA配置和啟動canal

  canal只是支援HA, 不支援高負載,沒有負載均衡的概念.

  1)分發canal到hadoop163hadoop164,注意:修改canal.ip = hadoop162, hadoop163hadoop164

xsync /opt/module/canal/ 

  2)hadoop162,hadoop163,hadoop164分別啟動canal,注意:需要先啟動zookeeper和kafka

/opt/module/canal/bin/startup.sh

  3)製作canal統一啟停指令碼

vim /home/atguigu/bin/canal.sh
#!/bin/bash
canal_home=/opt/module/canal
case $1 in
start)
        for host in hadoop162 hadoop163 hadoop164 ; do
            echo "========== $host 啟動canal ========="
        ssh $host "source /etc/profile; ${canal_home}/bin/startup.sh"
        done
       ;;
stop)
            for host in hadoop162 hadoop163 hadoop164 ; do
                echo "========== $host停止 canal ========="
                ssh $host "source /etc/profile; ${canal_home}/bin/stop.sh"
            done
           ;;
*)
        echo "你啟動的姿勢不對"
        echo "  start   啟動canal叢集"
        echo "  stop    停止canal叢集"
;;
esac

8.1.9測試kafka是否收到實時資料

  1)起一個終端消費者, 消費ods_db

/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic ods_db

  2)修改配置檔案

vim /opt/software/mock/mock_db/application.properties
#配置要連線的mysql資料庫
spring.datasource.url=jdbc:mysql://hadoop162:3306/flinkdb?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=aaaaaa

#業務日期
mock.date=2021-07-28

  3)生產資料至Mysql

cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar

  4)觀察消費者是否消費到資料, 如果沒有消費到資料,則需要重新檢測canal配置

8.1.10接收到的資料格式分析

  傳送到kafka的資料格式

{
    "data":[
        {
            "id":"350",
            "consignee":"蔣雄",
            "consignee_tel":"13325313235",
            "final_total_amount":"389.0",
            "order_status":"1005",
            "user_id":"62",
            "delivery_address":"第17大街第7號樓9單元324門",
            "order_comment":"描述353475",
            "out_trade_no":"822287931878949",
            "trade_body":"十月稻田 沁州黃小米 (黃小米 五穀雜糧 山西特產 真空裝 大米伴侶 粥米搭檔) 2.5kg等2件商品",
            "create_time":"2020-08-26 15:02:40",
            "operate_time":"2020-08-26 15:02:41",
            "expire_time":"2020-08-26 15:17:40",
            "tracking_no":null,
            "parent_order_id":null,
            "img_url":"http://img.gmall.com/933223.jpg",
            "province_id":"3",
            "benefit_reduce_amount":"108.0",
            "original_total_amount":"488.0",
            "feight_fee":"9.0"
        }
    ],
    "database":"gmall",
    "es":1598425361000,
    "id":73,
    "isDdl":false,
    "mysqlType":{
        "id":"bigint(20)",
        "consignee":"varchar(100)",
        "consignee_tel":"varchar(20)",
        "final_total_amount":"decimal(16,2)",
        "order_status":"varchar(20)",
        "user_id":"bigint(20)",
        "delivery_address":"varchar(1000)",
        "order_comment":"varchar(200)",
        "out_trade_no":"varchar(50)",
        "trade_body":"varchar(200)",
        "create_time":"datetime",
        "operate_time":"datetime",
        "expire_time":"datetime",
        "tracking_no":"varchar(100)",
        "parent_order_id":"bigint(20)",
        "img_url":"varchar(200)",
        "province_id":"int(20)",
        "benefit_reduce_amount":"decimal(16,2)",
        "original_total_amount":"decimal(16,2)",
        "feight_fee":"decimal(16,2)"
    },
    "old":[
        {
            "order_status":"1002"
        }
    ],
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":-5,
        "consignee":12,
        "consignee_tel":12,
        "final_total_amount":3,
        "order_status":12,
        "user_id":-5,
        "delivery_address":12,
        "order_comment":12,
        "out_trade_no":12,
        "trade_body":12,
        "create_time":93,
        "operate_time":93,
        "expire_time":93,
        "tracking_no":12,
        "parent_order_id":-5,
        "img_url":12,
        "province_id":4,
        "benefit_reduce_amount":3,
        "original_total_amount":3,
        "feight_fee":3
    },
    "table":"order_info",
    "ts":1598425365252,
    "type":"UPDATE"
}

8.1.11驗證canal高可用是否正常工作

  1)當前啟動canal的時候,只有一臺裝置會啟動 yuange例項

#啟動ZK客戶端
zkCli.sh
get /otter/canal/destinations/yuange/running

  2)停止hadoop163canal, 然後觀察

ssh hadoop163 /opt/module/canal/bin/stop.sh

8.2使用maxwell實時採集mysql資料

8.2.1什麼是maxwell

  maxwell 是由美國zendesk開源,用java編寫的Mysql實時抓取軟體 其抓取的原理也是基於binlog

8.2.2Maxwell與canal的對比

  1)Maxwell 沒有 Canal那種server+client模式,只有一個server把資料傳送到訊息佇列或redis。

  2)Maxwell 有一個亮點功能,就是Canal只能抓取最新資料,對已存在的歷史資料沒有辦法處理。而Maxwell有一個bootstrap功能,可以直接引匯出完整的歷史資料用於初始化,非常好用。

  3)Maxwell不能直接支援HA,但是它支援斷點還原,即錯誤解決後重啟繼續上次點兒讀取資料。

  4)Maxwell只支援json格式,而Canal如果用Server+client模式的話,可以自定義格式。

  5)Maxwell比Canal更加輕量級。

8.2.3使用maxwell前的準備工作

  1)在mysql中建立一個數據庫,用於儲存maxwell的元資料(可以省略,maxwell會自動建立)

CREATE DATABASE `maxwell` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

  2)建立可以操作資料庫maxwell的使用者:maxwell

USE maxwell;
SET GLOBAL validate_password_policy=0;
SET GLOBAL validate_password_length=4;
GRANT ALL ON maxwell.*  TO  'maxwell'@'%'  IDENTIFIED BY 'aaaaaa';

  3)給使用者maxwell分配操作其他資料庫的許可權

GRANT SELECT,REPLICATION SLAVE, REPLICATION CLIENT  ON  *.*  TO 'maxwell'@'%';
FLUSH PRIVILEGES;

8.2.4安裝和配置maxwell

  1)下載maxwell

mkdir /opt/software/maxwell
wget -P /opt/software/maxwell https://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz

  2)解壓

tar -zxvf /opt/software/maxwell/maxwell-1.27.1.tar.gz -C /opt/module/

  3)配置maxwell

cd /opt/module/maxwell-1.27.1
mv config.properties.example config.properties
vim config.properties
#新增如下配置:
log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop162:9092,hadoop163:9092,hadoop164:9092
kafka_topic=ods_db
# 按照主鍵的hash進行分割槽, 如果不設定是按照資料庫分割槽
producer_partition_by=primary_key
# mysql login info
host=hadoop162
user=maxwell
password=aaaaaa
# 排除掉不想監控的資料庫
filter=exclude:gmall.*
# 初始化維度表資料的時候使用
client_id=maxwell_1

8.2.5啟動maxwell

  1)啟動maxwell(先停止canal叢集)

canal.sh stop
/opt/module/maxwell-1.27.1/bin/maxwell --config config.properties --daemon

  2)確認kafka是否收到資料,起一個終端消費者:

/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic ods_db

  3)在mysql中生成資料至mysql,確認kafka是否收到資料

java -jar gmall2020-mock-db-2020-12-23.jar

  4)檢視消費情況

8.2.6Maxwell傳送到kafka的資料格式

{
    "database":"flinkdb",
    "table":"cart_info",
    "type":"update",
    "ts":1627449145,
    "xid":14229,
    "xoffset":3823,
    "data":{
        "id":141065,
        "user_id":"1539",
        "sku_id":23,
        "cart_price":40,
        "sku_num":3,
        "img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-0liuAJTluAAVP1d_tXYs725.jpg",
        "sku_name":"十月稻田 遼河長粒香 東北大米 5kg",
        "is_checked":null,
        "create_time":"2021-07-28 13:12:23",
        "operate_time":null,
        "is_ordered":1,
        "order_time":"2021-07-28 13:12:25",
        "source_type":"2401",
        "source_id":null
    },
    "old":{
        "is_ordered":0,
        "order_time":null
    }
}

8.3CanalMaxwell傳送到kafka的資料對比

  1)為了方便做對比,gmall資料庫下建立一個表:test_user_info

create table test_user_info(id int primary key, name varchar(255), tel char(11));

  2)插入資料

insert into test_user_info values(1,'lisi','13838389438');

Canal

Maxwell

{
"data":[
{
"id":"1",
"name":"lisi",
"tel":"13838389438"
}
],
"database":"gmall",
"es":1598435650000,
"id":63,
"isDdl":false,
"mysqlType":{
"id":"int",
"name":"varchar(255)",
"tel":"char(11)"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"name":12,
"tel":1
},
"table":"test_user_info",
"ts":1598435650656,
"type":"INSERT"
}

{
"database":"gmall",
"table":"test_user_info",
"type":"insert",
"ts":1598435650,
"xid":33456,
"commit":true,
"data":{
"id":1,
"name":"lisi",
"tel":"13838389438"
}
}

  3)刪除資料

delete from test_user_info where id=1;

Canal

Maxwell

{
"data":[
{
"id":"1",
"name":"lisi",
"tel":"13838389438"
}
],
"database":"gmall",
"es":1598435893000,
"id":64,
"isDdl":false,
"mysqlType":{
"id":"int",
"name":"varchar(255)",
"tel":"char(11)"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"name":12,
"tel":1
},
"table":"test_user_info",
"ts":1598435893190,
"type":"DELETE"
}

{
"database":"gmall",
"table":"test_user_info",
"type":"delete",
"ts":1598435893,
"xid":33994,
"commit":true,
"data":{
"id":1,
"name":"lisi",
"tel":"13838389438"
}
}

  4)更新資料

update test_user_info set name='zs' where id=1;

Canal

Maxwell

{
"data":[
{
"id":"1",
"name":"zs",
"tel":"13838389438"
}
],
"database":"gmall",
"es":1598436457000,
"id":66,
"isDdl":false,
"mysqlType":{
"id":"int",
"name":"varchar(255)",
"tel":"char(11)"
},
"old":[
{
"name":"lisi"
}
],
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"name":12,
"tel":1
},
"table":"test_user_info",
"ts":1598436457576,
"type":"UPDATE"
}

{
"database":"gmall",
"table":"test_user_info",
"type":"update",
"ts":1598436457,
"xid":35242,
"commit":true,
"data":{
"id":1,
"name":"zs",
"tel":"13838389438"
},
"old":{
"name":"lisi"
}
}

  5)總結資料特點:

    (1)日誌結構:canal產生的資料會放在陣列結構中,maxwell 以影響的資料為單位產生日誌,即每影響一條資料就會產生一條日誌。如果想知道這些日誌是否是通過某一條sql產生的可以通過xid進行判斷,相同的xid的日誌來自同一sql

    (2)數字型別:當原始資料是數字型別時,maxwell會尊重原始資料的型別不增加雙引,不變為字串。Canal一律轉換為字串。

    (3)帶原始資料欄位定義:canal資料中會帶入表結構。Maxwell更簡潔。

8.4Maxwell的初始化資料功能

  對Mysql中的已有的舊資料,如何匯入到Kafka中?Canal無能為力,Maxwell提供了一個初始化功能,可以滿足我們的需求

/opt/module/maxwell-1.27.1/bin/maxwell-bootstrap --user maxwell --password aaaaaa --host hadoop162 --database flinkdb --table user_info --client_id maxwell_1

  maxwell-bootstrap不具備將資料直接匯入kafka或者hbase的能力,通過--client_id指定將資料交給哪個maxwell程序處理,在maxwell的conf.properties中配置