1. 程式人生 > 實用技巧 >RabbitMQ:The channelMax limit is reached. Try later.

RabbitMQ:The channelMax limit is reached. Try later.

這個問題是我當初寫專案時遇到的,因為用RabbitMQ做削峰處理,高併發情況下,channel數到達了限制,所以不能繼續建立,相信大家也遇到過。

​ 正常來說,這個錯誤還是比較少見的,只不過專案需要保證訊息的可靠性,所以採取了傳送確認和消費手動確認機制,導致併發效能下降,從而出現這個問題。、

​ 這裡先上結論,方便著急的小夥伴們改bug。

結論:RabbitMQ java客戶端在建立連線時,會向服務端傳送一個請求,這個請求會獲取到服務端的channelMax值,java客戶端會自己進行一個處理,兩者都不為0時,會選擇一個小的值,如果你沒有在rabbitmq.conf檔案中修改channel_Max的值,那麼java客戶端會採用預設的2047或更小,這就會導致你明明在客戶端連線上配置了channelMax(比如你配置了4095),但依舊會報錯,而且web管理頁面最大值依舊是2047

第一次修改配置不生效

出現這種情況經常伴隨著訊息丟失,而且訊息丟失情況非常嚴重,達到了百分之二十的丟失率,這個丟失率也會因為併發量、每次消費數量等等配置的不同而變化。

由於專案是基於SpringBoot2.2的,yml暫時無法配置RequestChannelMax的值,這裡只能採用直接通過set的方式放入值。

@Configuration
@Slf4j
public class RabbitMQConfig {


    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        CachingConnectionFactory cachingConnectionFactory 
= (CachingConnectionFactory) connectionFactory; //這裡我明明設定了4095,但是專案執行之後,壓測之後,還是會報異常,而且報異常的時候,RabbitMQ //web管理頁面上的channel數依舊是2047,不得已只能分析原始碼了 cachingConnectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(4095); final RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); rabbitTemplate.setMessageConverter(jackson2MessageConverter()); rabbitTemplate.setMandatory(
true); rabbitTemplate.setConfirmCallback((correlationData, b, s) -> { if(!b){ log.error("confirmCallBack 傳送失敗的資料:{}",correlationData); log.error("confirmCallBack 確認情況:{}",b); log.error("confirmCallBack 傳送失敗的原因:{}",s); } }); rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> { log.error("returnCallBack 訊息:{}",message); log.error("returnCallBack 迴應碼:{}",i); log.error("returnCallBack 迴應資訊:{}",s); log.error("returnCallBack 交換機:{}",s1); log.error("returnCallBack 路由鍵:{}",s2); }); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter jackson2MessageConverter() { return new Jackson2JsonMessageConverter(); } }

分析原始碼

首先是模擬出報錯的場景,然後進入報異常的類。

發現是this.delegate.createChannel();方法返回的是一個空channel物件,進入這個方法看一下。

發現有一個ChannelManage物件,顧名思義,就是一個channel管理器,由它負責建立channel,那麼看一下這個物件都有什麼值呢?

public Channel createChannel() throws IOException {
        this.ensureIsOpen();
        ChannelManager cm = this._channelManager;
        if (cm == null) {
            return null;
        } else {
            Channel channel = cm.createChannel(this);
            this.metricsCollector.newChannel(channel);
            return channel;
        }
    }


只截取了部分程式碼,首先可以看到有一個int型別的channelMax,這個值就是channel的最大值,還有一個構造器,很明顯,這個值是通過構造器傳進來的,通過容器初始化時打斷點進行跟蹤,發現此時的channelMax依舊是2047,這也進一步證明了,值的覆蓋或者處理髮生在這個類呼叫之前。

public class ChannelManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);
    private final Object monitor;
    private final Map<Integer, ChannelN> _channelMap;
    private final IntAllocator channelNumberAllocator;
    private final ConsumerWorkService workService;
    private final Set<CountDownLatch> shutdownSet;
    private final int _channelMax;
    private ExecutorService shutdownExecutor;
    private final ThreadFactory threadFactory;
    private int channelShutdownTimeout;
    protected final MetricsCollector metricsCollector;
    
    public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory, MetricsCollector metricsCollector) {
        this.monitor = new Object();
        this._channelMap = new HashMap();
        this.shutdownSet = new HashSet();
        this.channelShutdownTimeout = 63000;
        if (channelMax == 0) {
            channelMax = 65535;
        }

        this._channelMax = channelMax;
        this.channelNumberAllocator = new IntAllocator(1, channelMax);
        this.workService = workService;
        this.threadFactory = threadFactory;
        this.metricsCollector = metricsCollector;
    }
}

進一步跟蹤之後,發現在AMQConnection類裡的instantiateChannelManager()方法呼叫了構造器,繼續往上追蹤。

com.rabbitmq.client.impl.AMQConnection#instantiateChannelManager
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
        ChannelManager result = new ChannelManager(this._workService, channelMax, threadFactory, this.metricsCollector);
        this.configureChannelManager(result);
        return result;
    }

在AMQConnetion類的start()方法中最終發現了值改變的地方。

this.requestedChannelMax值是我在配置類中配置的4095

connTune.getChannelMax()是2047

也就是說,negotiateChannelMax()方法對這兩個值進行了處理,最終選擇了2047

            int channelMax = this.negotiateChannelMax(this.requestedChannelMax, connTune.getChannelMax());
            this._channelManager = this.instantiateChannelManager(channelMax, this.threadFactory);

最終發現這麼一段處理邏輯,如果兩個數字都不為0,那麼就取最小的,反之取最大的,看到這裡是明白做了什麼處理,但是還是有一處不明白,2047的值究竟從何處來的?

com.rabbitmq.client.impl.AMQConnection#negotiateChannelMax
    /**
     * Private API, allows for easier simulation of bogus clients.
     */
    protected int negotiateChannelMax(int requestedChannelMax, int serverMax) {
        return negotiatedMaxValue(requestedChannelMax, serverMax);
    }


    private static int negotiatedMaxValue(int clientValue, int serverValue) {
        return (clientValue == 0 || serverValue == 0) ?
            Math.max(clientValue, serverValue) :
            Math.min(clientValue, serverValue);
    }

通過對connTune的追尋,發現了這段處理,debug也證明了確實在這裡獲取的2047這個值,

其實不管從方法名rpc()還是變數名serverResponse來看,這個都是做了一個請求,那麼向誰請求其實很顯而易見了,這裡向RabbitMQ端做了一個請求,用來索取MQ端的channelMax、frameMax、heartBeat值等等

Tune connTune = null;

        try {
           ......
                try {
                    Method serverResponse = this._channel0.rpc((Method)method, this.handshakeTimeout / 2).getMethod();
                    if (serverResponse instanceof Tune) {
                        connTune = (Tune)serverResponse;
                    }
                    ......

到現在其實就很明確了,我們只在客戶端修改邊界值配置是無效的,必須同步修改MQ服務端的配置,也就是rabbitmq.conf檔案

## Set the max permissible number of channels per connection.
## 0 means "no limit".
##在配置檔案中,輸入以下引數和自己想要設定的值即可,如果用不到2047,那就不用配置
# channel_max = 128

其實問題並不大,主要還是不瞭解MQ的一個客戶端連線過程,導致耗費了大量時間。

這裡還是推薦大家,先用百度搜索,第一頁看不到正確解決方案,那就去StackOverflow網站,還不行的話,那就使用終極大法,要麼官網逐行看文件,要麼走一波原始碼,也是鍛鍊自己解決問題的思路和能力。

https://blog.csdn.net/qq_35374224/article/details/106721801

public Channel createChannel() throws IOException { this.ensureIsOpen(); ChannelManager cm = this._channelManager; if (cm == null) { return null; } else { Channel channel = cm.createChannel(this); this.metricsCollector.newChannel(channel); return channel; } }