1. 程式人生 > >Spring Cloud原始碼分析之Eureka篇第六章:服務註冊

Spring Cloud原始碼分析之Eureka篇第六章:服務註冊

在文章《Spring Cloud原始碼分析之Eureka篇第四章:服務註冊是如何發起的 》的分析中,我們知道了作為Eureka Client的應用啟動時,在com.netflix.discovery.DiscoveryClient類的initScheduledTasks方法中,會做以下幾件事:

  1. 週期性更新服務列表;
  2. 週期性服務續約;
  3. 服務註冊邏輯;

概覽

以下圖片來自Netflix官方,圖中顯示Eureka Client會發起Register請求將自身註冊到註冊中心,這樣其他Eureka client通過Get Registry請求就能獲取到新註冊應用的相關資訊: 在這裡插入圖片描述

關於原始碼版本

本次分析的Spring Cloud版本為Edgware.RELEASE,對應的eureka-client版本為1.7.0;

原始碼分析

  1. 首先回顧com.netflix.discovery.DiscoveryClient類的initScheduledTasks方法,Eureka client在啟動的時侯都會執行此方法,如下方所示,已經略去了週期性更新服務列表相關的程式碼:
//來自EurekaClientConfigBean,預設為true
if (clientConfig.shouldRegisterWithEureka()) {
			//續租間隔
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs
(); //週期性任務處理超時後,下一次執行時將超時事件翻倍,但是不可超過expBackOffBound的設定範圍 int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); //指定時間後啟動週期性續租的任務 scheduler.
schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); //上報自身資訊到Eureka server的操作委託給InstanceInfoReplicator例項發起, //如果有多個場景需要上報,都由InstanceInfoReplicator進行排程和安排, //並且還有限流邏輯,避免頻繁先服務端請求 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize //監聽和響應應用狀態變化,包括從停止狀態恢復或者進入停止狀態, statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } //將自身狀態上報都Eureka server(有限流邏輯避免頻繁上報) instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { //註冊狀態變化的監聽 applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //更新資訊並註冊到Eureka server instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); }
  1. 從上述程式碼可以看出,主動更新和狀態變化觸發的更新,都委託給成員變數instanceInfoReplicator執行,InstanceInfoReplicator是個輔助類,在服務註冊過程中主要負責併發控制、週期性執行等工作,有關此類的詳細介紹請參考文章《Eureka的InstanceInfoReplicator類(服務註冊輔助工具)》

  2. 本文聚焦服務註冊,因此InstanceInfoReplicator類本身的細節就不在此展開,這裡主要關注的是InstanceInfoReplicator的run方法中註冊到Eureka server的程式碼,如下圖紅框,discoveryClient.register()實現了註冊的功能: 在這裡插入圖片描述

  3. 先看程式碼discoveryClient.refreshInstanceInfo(),弄清楚即將上報到Eureka server的資訊是如何更新的,如下程式碼所示,資訊更新的操作是委託給ApplicationInfoManager例項來完成的:

void refreshInstanceInfo() {
        //更新資料
        applicationInfoManager.refreshDataCenterInfoIfRequired();
        //如果續租時間有變化就要及時更新
        applicationInfoManager.refreshLeaseInfoIfRequired();

        InstanceStatus status;
        try {
            status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
        } catch (Exception e) {
            logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
            //如果獲取狀態異常,就設定當前狀態為DOWN
            status = InstanceStatus.DOWN;
        }

        if (null != status) {
            applicationInfoManager.setInstanceStatus(status);
        }
    }
  1. 接下來看看服務註冊相關的程式碼,也就是DiscoveryClient類的register方法,如下所示,原始碼註釋中說到是註冊請求型別是Restful的,Eureka server的返回碼如果是204表示註冊成功,然而在前面的discoveryClient.register()方法內,其實並不關注這個返回值:
    /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            //註冊操作
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }
  1. 繼續展開註冊操作的原始碼eurekaTransport.registrationClient.register(instanceInfo),多層呼叫一路展開,最終由JerseyApplicationClient類來完成註冊操作,對應原始碼在父類AbstractJerseyEurekaHttpClient中,如下所示,主要工作是利用jersey庫的Restful Api將自身的資訊POST到Eureka server:
@Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

至此,Eureka client向服務註冊的原始碼就分析完畢了,過程相對簡單,DiscoveryClient、InstanceInfoReplicator、ApplicationInfoManager、JerseyApplicationClient等例項各司其職將應用自身資訊上報到Eureka server,由Eureka server儲存,再被其他例項下載;