1. 程式人生 > >《從0到1學習Flink》—— Flink 讀取 Kafka 資料寫入到 RabbitMQ

《從0到1學習Flink》—— Flink 讀取 Kafka 資料寫入到 RabbitMQ

開發十年,就只剩下這套架構體系了! >>>   

前言

之前有文章 《從0到1學習Flink》—— Flink 寫入資料到 Kafka 寫過 Flink 將處理後的資料後發到 Kafka 訊息佇列中去,當然我們常用的訊息佇列可不止這一種,還有 RocketMQ、RabbitMQ 等,剛好 Flink 也支援將資料寫入到 RabbitMQ,所以今天我們就來寫篇文章講講如何將 Flink 處理後的資料寫入到 RabbitMQ。

前提準備

安裝 RabbitMQ

這裡我直接用 docker 命令安裝吧,先把 docker 在 mac 上啟動起來。

在命令列中執行下面的命令:

1
docker run -d  -p 15672:15672  -p  5672:5672  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management

對這個命令不懂的童鞋可以看看我以前的文章:http://www.54tianzhisheng.cn/2018/01/26/SpringBoot-RabbitMQ/

登入使用者名稱和密碼分別是:admin / admin ,登入進去是這個樣子就代表安裝成功了:

依賴

pom.xml 中新增 Flink connector rabbitmq 的依賴如下:

1
2
3
4
5
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

生產者

這裡我們依舊自己寫一個工具類一直的往 RabbitMQ 中的某個 queue 中發資料,然後由 Flink 去消費這些資料。

注意按照我的步驟來一步步操作,否則可能會出現一些錯誤!

RabbitMQProducerUtil.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQProducerUtil {
    public final static String QUEUE_NAME = "zhisheng";

    public static void main(String[] args) throws Exception {
        //建立連線工廠
        ConnectionFactory factory = new ConnectionFactory();

        //設定RabbitMQ相關資訊
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);

        //建立一個新的連線
        Connection connection = factory.newConnection();

        //建立一個通道
        Channel channel = connection.createChannel();

        // 宣告一個佇列
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //傳送訊息到佇列中
        String message = "Hello zhisheng";

        //我們這裡演示傳送一千條資料
        for (int i = 0; i < 1000; i++) {
            channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes("UTF-8"));
            System.out.println("Producer Send +'" + message + i);
        }

        //關閉通道和連線
        channel.close();
        connection.close();
    }
}

Flink 主程式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import com.zhisheng.common.utils.ExecutionEnvUtil;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

/**
 * 從 rabbitmq 讀取資料
 */
public class Main {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;

        //這些配置建議可以放在配置檔案中,然後通過 parameterTool 來獲取對應的引數值
        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig
                .Builder().setHost("localhost").setVirtualHost("/")
                .setPort(5672).setUserName("admin").setPassword("admin")
                .build();

        DataStreamSource<String> zhisheng = env.addSource(new RMQSource<>(connectionConfig,
                "zhisheng",
                true,
                new SimpleStringSchema()))
                .setParallelism(1);
        zhisheng.print();

        //如果想保證 exactly-once 或 at-least-once 需要把 checkpoint 開啟
//        env.enableCheckpointing(10000);
        env.execute("flink learning connectors rabbitmq");
    }
}

執行 RabbitMQProducerUtil 類,再執行 Main 類!

注意⚠️:

1、RMQConnectionConfig 中設定的使用者名稱和密碼要設定成 admin/admin,如果你換成是 guest/guest,其實是在 RabbitMQ 裡面是沒有這個使用者名稱和密碼的,所以就會報這個錯誤:

1
nested exception is com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

不出意外的話應該你執行 RabbitMQProducerUtil 類後,立馬兩個執行的結果都會出來,速度還是很快的。

2、如果你在 RabbitMQProducerUtil 工具類中把註釋的那行程式碼開啟的話:

1
2
// 宣告一個佇列
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

就會出現這種錯誤:

1
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'zhisheng' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)

這是因為你開啟那個註釋的話,一旦你運行了該類就會建立一個叫做

的 Queue,當你再執行 Main 類中的時候,它又會建立這樣一個叫 ```zhisheng``` 的 Queue,然後因為已經有同名的 Queue 了,所以就有了衝突,解決方法就是把那行程式碼註釋就好了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
3、該 connector(聯結器)中提供了 RMQSource 類去消費 RabbitMQ queue 中的訊息和確認 checkpoints 上的訊息,它提供了三種不一樣的保證:

+ Exactly-once(只消費一次): 前提條件有,1 是要開啟 checkpoint,因為只有在 checkpoint 完成後,才會返回確認訊息給 RabbitMQ(這時,訊息才會在 RabbitMQ 佇列中刪除);2 是要使用 Correlation ID,在將訊息發往 RabbitMQ 時,必須在訊息屬性中設定 Correlation ID。資料來源根據 Correlation ID 把從 checkpoint 恢復的資料進行去重;3 是資料來源不能並行,這種限制主要是由於 RabbitMQ 將訊息從單個佇列分派給多個消費者。
+ At-least-once(至少消費一次): 開啟了 checkpoint,但未使用相 Correlation ID 或 資料來源是並行的時候,那麼就只能保證資料至少消費一次了
+ No guarantees(無法保證): Flink 接收到資料就返回確認訊息給 RabbitMQ

### Sink 資料到 RabbitMQ

RabbitMQ 除了可以作為資料來源,也可以當作下游,Flink 消費資料做了一些處理之後也能把資料發往 RabbitMQ,下面演示下 Flink 消費 Kafka 資料後寫入到 RabbitMQ。

```java
public class Main1 {
    public static void main(String[] args) throws Exception {
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
        DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env);

        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig
                .Builder().setHost("localhost").setVirtualHost("/")
                .setPort(5672).setUserName("admin").setPassword("admin")
                .build();

        //注意,換一個新的 queue,否則也會報錯
        data.addSink(new RMQSink<>(connectionConfig, "zhisheng001", new MetricSchema()));
        env.execute("flink learning connectors rabbitmq");
    }
}

 

是不是很簡單?但是需要注意的是,要換一個之前不存在的 queue,否則是會報錯的。

不出意外的話,你可以看到 RabbitMQ 的監控頁面會出現新的一個 queue 出來,如下圖:

總結

本文先把 RabbitMQ 作為資料來源,寫了個 Flink 消費 RabbitMQ 佇列裡面的資料進行打印出來,然後又寫了個 Flink 消費 Kafka 資料後寫入到