1. 程式人生 > 其它 >【mq】從零開始實現 mq-05-實現優雅停機

【mq】從零開始實現 mq-05-實現優雅停機

前景回顧

【mq】從零開始實現 mq-01-生產者、消費者啟動

【mq】從零開始實現 mq-02-如何實現生產者呼叫消費者?

【mq】從零開始實現 mq-03-引入 broker 中間人

【mq】從零開始實現 mq-04-啟動檢測與實現優化

【mq】從零開始實現 mq-05-實現優雅停機

為什麼需要優雅關閉?

我記得多年前,那個時候 rpc 框架主流用的還是 dubbo,每次都是半夜還是上線,上線上完基本都是凌晨 2-3 點。

為什麼要半夜上線呢?

因為這個時候一般業務流量最低。

還有就是上線釋出,每次都要人工等待一段幾分鐘。

因為 rpc 呼叫入口已經關閉了,但是本身可能還沒有處理完。

那麼有沒有方法可以讓服務的關閉更加優雅,而不是人工等待呢?

實現思路

人工等待幾分鐘的方式一般可以解決問題,但是大部分情況是無用功,還比較浪費時間。

比較自然的一種方式是引入鉤子函式。

當應用準備關閉時,首先判斷是否存在處理中的請求,不存在則直接關閉;存在,則等待請求完成再關閉。

實現

生產者和消費者是類似的,我們以生產者為例。

啟動實現的調整

@Override
public synchronized void run() {
    this.paramCheck();
    // 啟動服務端
    log.info("MQ 生產者開始啟動客戶端 GROUP: {} brokerAddress: {}",
            groupName, brokerAddress);
    try {
        //0. 配置資訊
        ProducerBrokerConfig config = ProducerBrokerConfig.newInstance()
                .groupName(groupName)
                .brokerAddress(brokerAddress)
                .check(check)
                .respTimeoutMills(respTimeoutMills)
                .invokeService(invokeService)
                .statusManager(statusManager);

        //1. 初始化
        this.producerBrokerService.initChannelFutureList(config);
        //2. 連線到服務端
        this.producerBrokerService.registerToBroker();

        //3. 標識為可用
        statusManager.status(true);

        //4. 新增鉤子函式
        final DefaultShutdownHook rpcShutdownHook = new DefaultShutdownHook();
        rpcShutdownHook.setStatusManager(statusManager);
        rpcShutdownHook.setInvokeService(invokeService);
        rpcShutdownHook.setWaitMillsForRemainRequest(waitMillsForRemainRequest);
        rpcShutdownHook.setDestroyable(this.producerBrokerService);
        ShutdownHooks.rpcShutdownHook(rpcShutdownHook);
        log.info("MQ 生產者啟動完成");
    } catch (Exception e) {
        log.error("MQ 生產者啟動遇到異常", e);
        throw new MqException(ProducerRespCode.RPC_INIT_FAILED);
    }
}

狀態管理類

這裡我們引入 statusManager 管理整體的狀態。

預設的如下:

public class StatusManager implements IStatusManager {

    private boolean status;

    @Override
    public boolean status() {
        return this.status;
    }

    @Override
    public IStatusManager status(boolean status) {
        this.status = status;

        return this;
    }

}

就是對一個是否可用的狀態進行維護,然後在 channel 獲取等地方便於判斷當前服務的狀態。

鉤子函式

DefaultShutdownHook 實現如下:

public class DefaultShutdownHook extends AbstractShutdownHook {

    /**
     * 呼叫管理類
     * @since 0.0.5
     */
    private IInvokeService invokeService;

    /**
     * 銷燬管理類
     * @since 0.0.5
     */
    private Destroyable destroyable;

    /**
     * 狀態管理類
     * @since 0.0.5
     */
    private IStatusManager statusManager;

    /**
     * 為剩餘的請求等待時間
     * @since 0.0.5
     */
    private long waitMillsForRemainRequest = 60 * 1000;

    //get & set

    /**
     * (1)設定 status 狀態為等待關閉
     * (2)檢視是否 {@link IInvokeService#remainsRequest()} 是否包含請求
     * (3)超時檢測-可以不新增,如果難以關閉成功,直接強制關閉即可。
     * (4)關閉所有執行緒池資源資訊
     * (5)設定狀態為成功關閉
     */
    @Override
    protected void doHook() {
        statusManager.status(false);
        // 設定狀態為等待關閉
        logger.info("[Shutdown] set status to wait for shutdown.");

        // 迴圈等待當前執行的請求執行完成
        long startMills = System.currentTimeMillis();
        while (invokeService.remainsRequest()) {
            long currentMills = System.currentTimeMillis();
            long costMills = currentMills - startMills;
            if(costMills >= waitMillsForRemainRequest) {
                logger.warn("[Shutdown] still remains request, but timeout, break.");
                break;
            }

            logger.debug("[Shutdown] still remains request, wait for a while.");
            DateUtil.sleep(10);
        }

        // 銷燬
        destroyable.destroyAll();

        // 設定狀態為關閉成功
        statusManager.status(false);
        logger.info("[Shutdown] set status to shutdown success.");
    }

}

(1)進行關閉前,首先判斷通過 invokeService.remainsRequest() 判斷是否有未處理完的訊息,有則進行等待。

(2)當然,我們還需要考慮網路訊息丟失的場景,不可能一直等待。

所以引入了超時中斷,最大等待時間也是可以自行定義的。

if(costMills >= waitMillsForRemainRequest) {
    logger.warn("[Shutdown] still remains request, but timeout, break.");
    break;
}

(3)關閉之後

將 status 設定為 false,標識當前服務不可用。

小結

隨著 rpc 技術的成熟,優雅關閉已經成為一個很基本的功能點。

一個小小的改動,可以節約生產釋出時間,早點下班陪陪家人。

希望本文對你有所幫助,如果喜歡,歡迎點贊收藏轉發一波。

我是老馬,期待與你的下次重逢。

開源地址

The message queue in java.(java 簡易版本 mq 實現) https://github.com/houbb/mq

拓展閱讀

rpc-從零開始實現 rpc https://github.com/houbb/rpc