【mq】從零開始實現 mq-05-實現優雅停機
前景回顧
【mq】從零開始實現 mq-02-如何實現生產者呼叫消費者?
【mq】從零開始實現 mq-03-引入 broker 中間人
為什麼需要優雅關閉?
我記得多年前,那個時候 rpc 框架主流用的還是 dubbo,每次都是半夜還是上線,上線上完基本都是凌晨 2-3 點。
為什麼要半夜上線呢?
因為這個時候一般業務流量最低。
還有就是上線釋出,每次都要人工等待一段幾分鐘。
因為 rpc 呼叫入口已經關閉了,但是本身可能還沒有處理完。
那麼有沒有方法可以讓服務的關閉更加優雅,而不是人工等待呢?
實現思路
人工等待幾分鐘的方式一般可以解決問題,但是大部分情況是無用功,還比較浪費時間。
比較自然的一種方式是引入鉤子函式。
當應用準備關閉時,首先判斷是否存在處理中的請求,不存在則直接關閉;存在,則等待請求完成再關閉。
實現
生產者和消費者是類似的,我們以生產者為例。
啟動實現的調整
@Override public synchronized void run() { this.paramCheck(); // 啟動服務端 log.info("MQ 生產者開始啟動客戶端 GROUP: {} brokerAddress: {}", groupName, brokerAddress); try { //0. 配置資訊 ProducerBrokerConfig config = ProducerBrokerConfig.newInstance() .groupName(groupName) .brokerAddress(brokerAddress) .check(check) .respTimeoutMills(respTimeoutMills) .invokeService(invokeService) .statusManager(statusManager); //1. 初始化 this.producerBrokerService.initChannelFutureList(config); //2. 連線到服務端 this.producerBrokerService.registerToBroker(); //3. 標識為可用 statusManager.status(true); //4. 新增鉤子函式 final DefaultShutdownHook rpcShutdownHook = new DefaultShutdownHook(); rpcShutdownHook.setStatusManager(statusManager); rpcShutdownHook.setInvokeService(invokeService); rpcShutdownHook.setWaitMillsForRemainRequest(waitMillsForRemainRequest); rpcShutdownHook.setDestroyable(this.producerBrokerService); ShutdownHooks.rpcShutdownHook(rpcShutdownHook); log.info("MQ 生產者啟動完成"); } catch (Exception e) { log.error("MQ 生產者啟動遇到異常", e); throw new MqException(ProducerRespCode.RPC_INIT_FAILED); } }
狀態管理類
這裡我們引入 statusManager 管理整體的狀態。
預設的如下:
public class StatusManager implements IStatusManager { private boolean status; @Override public boolean status() { return this.status; } @Override public IStatusManager status(boolean status) { this.status = status; return this; } }
就是對一個是否可用的狀態進行維護,然後在 channel 獲取等地方便於判斷當前服務的狀態。
鉤子函式
DefaultShutdownHook 實現如下:
public class DefaultShutdownHook extends AbstractShutdownHook {
/**
* 呼叫管理類
* @since 0.0.5
*/
private IInvokeService invokeService;
/**
* 銷燬管理類
* @since 0.0.5
*/
private Destroyable destroyable;
/**
* 狀態管理類
* @since 0.0.5
*/
private IStatusManager statusManager;
/**
* 為剩餘的請求等待時間
* @since 0.0.5
*/
private long waitMillsForRemainRequest = 60 * 1000;
//get & set
/**
* (1)設定 status 狀態為等待關閉
* (2)檢視是否 {@link IInvokeService#remainsRequest()} 是否包含請求
* (3)超時檢測-可以不新增,如果難以關閉成功,直接強制關閉即可。
* (4)關閉所有執行緒池資源資訊
* (5)設定狀態為成功關閉
*/
@Override
protected void doHook() {
statusManager.status(false);
// 設定狀態為等待關閉
logger.info("[Shutdown] set status to wait for shutdown.");
// 迴圈等待當前執行的請求執行完成
long startMills = System.currentTimeMillis();
while (invokeService.remainsRequest()) {
long currentMills = System.currentTimeMillis();
long costMills = currentMills - startMills;
if(costMills >= waitMillsForRemainRequest) {
logger.warn("[Shutdown] still remains request, but timeout, break.");
break;
}
logger.debug("[Shutdown] still remains request, wait for a while.");
DateUtil.sleep(10);
}
// 銷燬
destroyable.destroyAll();
// 設定狀態為關閉成功
statusManager.status(false);
logger.info("[Shutdown] set status to shutdown success.");
}
}
(1)進行關閉前,首先判斷通過 invokeService.remainsRequest()
判斷是否有未處理完的訊息,有則進行等待。
(2)當然,我們還需要考慮網路訊息丟失的場景,不可能一直等待。
所以引入了超時中斷,最大等待時間也是可以自行定義的。
if(costMills >= waitMillsForRemainRequest) {
logger.warn("[Shutdown] still remains request, but timeout, break.");
break;
}
(3)關閉之後
將 status 設定為 false,標識當前服務不可用。
小結
隨著 rpc 技術的成熟,優雅關閉已經成為一個很基本的功能點。
一個小小的改動,可以節約生產釋出時間,早點下班陪陪家人。
希望本文對你有所幫助,如果喜歡,歡迎點贊收藏轉發一波。
我是老馬,期待與你的下次重逢。
開源地址
The message queue in java.(java 簡易版本 mq 實現) https://github.com/houbb/mq