1. 程式人生 > 程式設計 >Eureka啟動流程-原始碼小段分析

Eureka啟動流程-原始碼小段分析

本文參考自朱榮鑫老師的書《Spring Cloud微服務架構進階》,強烈建議閱讀本書。

Eureka是Netflix開源的服務治理元件,內部網路間的微服務呼叫已不再使用IP地址,而使用微服務名稱,所以需要有Eureka這樣的的元件存在,負責維護服務的狀態。Spring Cloud整合了Eureka,使用Spring生態可以做到對其開箱即用。

除了Eureka,Spring Cloud還整合了其他Netflix元件,統稱為Spring Cloud Netflix,Spring Cloud Netflix包含了服務治理Eureka、路由Zuul、客戶端負載均衡Ribbon、熔斷器Hystrix。

除了上面這些,開發中常用的還有宣告式Http客戶端Feign,也是Netflix公司開源的。Spring Cloud在Feign的基礎上支援了Spring MVC的註解,叫OpenFeign。

言歸正傳,微服務離不開服務治理,本章探討Eureka的使用和原始碼以及叢集的實現原理。

總覽

20191123222748.png

上圖是Eureka官方的架構圖。這裡面有如下角色。

  • Application Service:這是你的業務系統服務端(微服務)
  • Eureka Client:這是Eureka客戶端,可以理解為一個jar包,嵌入在你的業務系統Application Service中,用於向Eureka服務端註冊資訊等
  • Application Client:這是你的業務系統客戶端,嵌入了Eureka Client用於向Eureka服務端獲取你要呼叫的Application Service資訊,然後Application Client發起向Application Service的呼叫
  • Eureka Server:Eureka伺服器,管理所有的微服務狀態
  • us-east-1c:Eureka最初設計的目的是用於亞馬遜雲服務AWS的分散式系統的,所以引入了AWS的Region(區域)和Zone(Availability Zone可用區)的概念,一個Region包含多個Zone。上圖中us-east-1c,us-east-1d,us-east-1e都屬於Zone,這三個Zone屬於us-east-1這個Region

Eureka Client提供了以下幾個功能:

  • 向Eureka Server註冊自己
  • 定時向Eureka Server續約
  • 客戶端下線
  • 獲取登入檔

對應的,Eureka Server提供以下功能:

  • 提供服務註冊
  • 接收服務心跳
  • 服務下線
  • 獲取登入檔中服務例項資訊
  • 服務剔除
  • 叢集同步

準備工作

為了方便跟蹤問題,可以把netflix包的DEBUG日誌開啟。

logging:
  level:
    com.netflix: DEBUG
複製程式碼

同時,我們應該知道,Spring Boot的自動化配置原理是載入了類路徑下的META-INF/spring.factories檔案,如下圖,eureka-client包中載入的自動化配置類如下:

20191124110752.png

分析步驟

通過Eureka DEBUG級別的日誌列印,我們可以看到第一條有關Eureka的日誌為:

2019-11-24 10:48:11.235  INFO 115648 --- [           main] com.netflix.discovery.DiscoveryClient    : Initializing Eureka in region us-east-1
2019-11-24 10:48:13.030  INFO 115648 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using JSON encoding codec LegacyJacksonJson
2019-11-24 10:48:13.030  INFO 115648 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using JSON decoding codec LegacyJacksonJson
2019-11-24 10:48:13.232  INFO 115648 --- [           main] c.n.d.provider.DiscoveryJerseyProvider   : Using XML encoding codec XStreamXml
複製程式碼

以上為類com.netflix.discovery.DisconveryClient列印的日誌,在此處打一個斷點,debug模式重新啟動應用。在IDEA或Eclipse中檢視呼叫棧,如下圖:

20191124114109.png

從上圖可以看出,該方法入口剛好吻合我們上面的分析,是Eureka的自動配置類EurekaClientAutoConfiguration觸發的。其原始碼如下:

// 列印日誌
logger.info("Initializing Eureka in region {}",this.clientConfig.getRegion());
// 配置檔案中的register-with-eureka和fetch-registry如果都為false則不註冊和拉去服務列表了
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
	logger.info("Client configured to neither register nor query for data.");
	this.scheduler = null;
	this.heartbeatExecutor = null;
	this.cacheRefreshExecutor = null;
	this.eurekaTransport = null;
	this.instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config),this.clientConfig.getRegion());
	DiscoveryManager.getInstance().setDiscoveryClient(this);
	DiscoveryManager.getInstance().setEurekaClientConfig(config);
	this.initTimestampMs = System.currentTimeMillis();
	logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",this.initTimestampMs,this.getApplications().size());
} else {
複製程式碼

同理,其他原始碼的分析也可以通過DEBUG日誌和斷點來分析。以下不再贅述。

Eureka Client原始碼解析

Eureka為了做到開箱即用,簡化開發人員的開發工作,將很多與Eureka Server互動的工作隱藏起來,自主完成。在應用的不同階段執行不同工作,如下圖。

圖片來自《Spring Cloud微服務架構進階》

從上面程式碼

logger.info("Initializing Eureka in region {}",this.clientConfig.getRegion());
複製程式碼

打斷點逐步跟蹤,可以發現Eureka Client的執行步驟如下:

1. 向Eureka伺服器拉取全量註冊資訊

程式碼位於DiscoveryClient#fetchRegistry方法中。

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
	Stopwatch tracer = this.FETCH_REGISTRY_TIMER.start();

	label122: {
		boolean var4;
		try {
            Applications applications = this.getApplications();
            // 如果增量式拉取被禁止,或者Applications為null,進行全量拉取
			if (!this.clientConfig.shouldDisableDelta() && Strings.isNullOrEmpty(this.clientConfig.getRegistryRefreshSingleVipAddress()) && !forceFullRegistryFetch && applications != null && applications.getRegisteredApplications().size() != 0 && applications.getVersion() != -1L) {
				this.getAndUpdateDelta(applications);
			} else {
				logger.info("Disable delta property : {}",this.clientConfig.shouldDisableDelta());
				logger.info("Single vip registry refresh property : {}",this.clientConfig.getRegistryRefreshSingleVipAddress());
				logger.info("Force full registry fetch : {}",forceFullRegistryFetch);
				logger.info("Application is null : {}",applications == null);
				logger.info("Registered Applications size is zero : {}",applications.getRegisteredApplications().size() == 0);
                logger.info("Application version is -1: {}",applications.getVersion() == -1L);
                // 全量拉取登入檔資訊
				this.getAndStoreFullRegistry();
			}

			applications.setAppsHashCode(applications.getReconcileHashCode());
			this.logTotalInstances();
			break label122;
		} catch (Throwable var8) {
			logger.error("DiscoveryClient_{} - was unable to refresh its cache! status = {}",new Object[]{this.appPathIdentifier,var8.getMessage(),var8});
			var4 = false;
		} finally {
			if (tracer != null) {
				tracer.stop();
			}

		}

		return var4;
	}

	this.onCacheRefreshed();
	this.updateInstanceRemoteStatus();
	return true;
}
複製程式碼

拉取登入檔地址的程式碼為:

// 這裡的urlPath傳入的是/apps
WebResource webResource = this.jerseyClient.resource(this.serviceUrl).path(urlPath);
複製程式碼

以上的this.serviceUrl預設為http://localhost:8761/eureka所以登入檔地址是http://localhost:8761/eureka//apps

為了避免本文篇幅太長,建議想了解這部分的原理請看朱榮鑫老師的書《Spring Cloud微服務架構進階》

這裡總結下Eureka Client啟動的整個過程:

  • 拉取登入檔資訊:Eureka啟動後,會全量拉取服務端的登入檔資訊,儲存到本地快取,以後增量拉取登入檔資訊。預設地址為:http://localhost:8761/eureka/apps
  • 服務註冊:拉取完登入檔資訊後,註冊自身服務元資料,預設地址為:http://localhost:8761/eureka/apps/{app-name}
  • 初始化定時任務:初始化了三個定時器任務,一個拉取登入檔重新整理快取,一個傳送心跳,一個當應用元資料發生變化時按需註冊。心跳預設30s一次,地址為:http://localhost:8761/eureka/apps/{app-name}/{instance-info-id},引數主要有 status (當前服務的狀態)等,狀態資訊預設使用Spring Boot Actuator獲取。
  • 服務下線:地址為:http://localhost:8761/eureka/apps/{app-name}/{instance-info-id},http方法為delete。

Eureka Server原始碼解析

Eureka Server作為一個開箱即用的服務註冊中心,需要注意的是,Eureka Server同時也是一個Eureka Client,它會向它配置檔案中的其他Eureka Server進行拉取登入檔、服務註冊和傳送心跳等操作。

還是按照上面原始碼的分析步驟,將com.netflix包調為DEBUG級別,可以在日誌中看到如下:

2019-11-25 11:14:23.816  INFO 237032 --- [           main] c.n.e.registry.AbstractInstanceRegistry  : Finished initializing remote region registries. All known remote regions: []
2019-11-25 11:14:23.817  INFO 237032 --- [           main] c.n.eureka.DefaultEurekaServerContext    : Initialized
2019-11-25 11:14:23.846  INFO 237032 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-11-25 11:14:23.866  INFO 237032 --- [           main] s.b.a.e.w.s.WebMvcEndpointHandlerMapping : Mapped "{[/actuator/health],methods=[GET],produces=[application/vnd.spring-boot.actuator.v2+json || application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.web.servlet.AbstractWebMvcEndpointHandlerMapping$OperationHandler.handle(javax.servlet.http.HttpServletRequest,java.util.Map<java.lang.String,java.lang.String>)
複製程式碼

檢視方法AbstractInstanceRegistry#register,該方法是提供服務註冊功能的基礎。原始碼如下:

public void register(InstanceInfo registrant,int leaseDuration,boolean isReplication) {
	try {
        read.lock();
        // 根據appName對服務例項叢集進行分類
		Map<String,Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
		REGISTER.increment(isReplication);
		if (gMap == null) {
			final ConcurrentHashMap<String,Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String,Lease<InstanceInfo>>();
			gMap = registry.putIfAbsent(registrant.getAppName(),gNewMap);
			if (gMap == null) {
				gMap = gNewMap;
			}
        }
        // 根據例項id獲取例項的租約
		Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
		// Retain the last dirty timestamp without overwriting it,if there is already a lease
		if (existingLease != null && (existingLease.getHolder() != null)) {
			Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
			Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
			logger.debug("Existing lease found (existing={},provided={}",existingLastDirtyTimestamp,registrationLastDirtyTimestamp);

			// this is a > instead of a >= because if the timestamps are equal,we still take the remote transmitted
			// InstanceInfo instead of the server local copy.
			if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
				logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
						" than the one that is being registered {}",registrationLastDirtyTimestamp);
				logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
				registrant = existingLease.getHolder();
			}
		} else {
            // The lease does not exist and hence it is a new registration
            // 如果租約不存在,這是一個新的註冊例項
			synchronized (lock) {
				if (this.expectedNumberOfRenewsPerMin > 0) {
					// Since the client wants to cancel it,reduce the threshold
					// (1
                    // for 30 seconds,2 for a minute)
                    // 自我保護機制
					this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
					this.numberOfRenewsPerMinThreshold =
							(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
				}
			}
			logger.debug("No previous lease information found; it is new registration");
        }
        // 建立新的租約
		Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant,leaseDuration);
		if (existingLease != null) {
			lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
		}
		gMap.put(registrant.getId(),lease);
		synchronized (recentRegisteredQueue) {
			recentRegisteredQueue.add(new Pair<Long,String>(
					System.currentTimeMillis(),registrant.getAppName() + "(" + registrant.getId() + ")"));
		}
		// This is where the initial state transfer of overridden status happens
		if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
			logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
							+ "overrides",registrant.getOverriddenStatus(),registrant.getId());
			if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
				logger.info("Not found overridden id {} and hence adding it",registrant.getId());
				overriddenInstanceStatusMap.put(registrant.getId(),registrant.getOverriddenStatus());
			}
		}
		InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
		if (overriddenStatusFromMap != null) {
			logger.info("Storing overridden status {} from map",overriddenStatusFromMap);
			registrant.setOverriddenStatus(overriddenStatusFromMap);
		}

		// Set the status based on the overridden status rules
		InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant,existingLease,isReplication);
		registrant.setStatusWithoutDirty(overriddenInstanceStatus);

		// If the lease is registered with UP status,set lease service up timestamp
		if (InstanceStatus.UP.equals(registrant.getStatus())) {
			lease.serviceUp();
		}
		registrant.setActionType(ActionType.ADDED);
		recentlyChangedQueue.add(new RecentlyChangedItem(lease));
		registrant.setLastUpdatedTimestamp();
		invalidateCache(registrant.getAppName(),registrant.getVIPAddress(),registrant.getSecureVipAddress());
		logger.info("Registered instance {}/{} with status {} (replication={})",registrant.getAppName(),registrant.getId(),registrant.getStatus(),isReplication);
	} finally {
		read.unlock();
	}
}
複製程式碼

為了避免本文篇幅太長,建議想了解這部分的原理請看朱榮鑫老師的書《Spring Cloud微服務架構進階》

這裡總結下Eureka Server啟動的整個過程:

  • 提供服務註冊
  • 接收服務心跳
  • 服務剔除
  • 服務下線
  • 叢集同步:Eureka Server啟動時,從它的peer節點拉取登入檔,每個Eureka Server對本地登入檔進行操作時,它將遍歷Eureka Server的peer節點,傳送同步請求。
  • 提供獲取登入檔資訊。

以上,本文結束。