Eureka啟動流程-原始碼小段分析
本文參考自朱榮鑫老師的書《Spring Cloud微服務架構進階》,強烈建議閱讀本書。
Eureka是Netflix開源的服務治理元件,內部網路間的微服務呼叫已不再使用IP地址,而使用微服務名稱,所以需要有Eureka這樣的的元件存在,負責維護服務的狀態。Spring Cloud整合了Eureka,使用Spring生態可以做到對其開箱即用。
除了Eureka,Spring Cloud還整合了其他Netflix元件,統稱為Spring Cloud Netflix,Spring Cloud Netflix包含了服務治理Eureka、路由Zuul、客戶端負載均衡Ribbon、熔斷器Hystrix。
除了上面這些,開發中常用的還有宣告式Http客戶端Feign,也是Netflix公司開源的。Spring Cloud在Feign的基礎上支援了Spring MVC的註解,叫OpenFeign。
言歸正傳,微服務離不開服務治理,本章探討Eureka的使用和原始碼以及叢集的實現原理。
總覽
上圖是Eureka官方的架構圖。這裡面有如下角色。
- Application Service:這是你的業務系統服務端(微服務)
- Eureka Client:這是Eureka客戶端,可以理解為一個jar包,嵌入在你的業務系統Application Service中,用於向Eureka服務端註冊資訊等
- Application Client:這是你的業務系統客戶端,嵌入了Eureka Client用於向Eureka服務端獲取你要呼叫的Application Service資訊,然後Application Client發起向Application Service的呼叫
- Eureka Server:Eureka伺服器,管理所有的微服務狀態
- us-east-1c:Eureka最初設計的目的是用於亞馬遜雲服務AWS的分散式系統的,所以引入了AWS的Region(區域)和Zone(Availability Zone可用區)的概念,一個Region包含多個Zone。上圖中us-east-1c,us-east-1d,us-east-1e都屬於Zone,這三個Zone屬於us-east-1這個Region
Eureka Client提供了以下幾個功能:
- 向Eureka Server註冊自己
- 定時向Eureka Server續約
- 客戶端下線
- 獲取登入檔
對應的,Eureka Server提供以下功能:
- 提供服務註冊
- 接收服務心跳
- 服務下線
- 獲取登入檔中服務例項資訊
- 服務剔除
- 叢集同步
準備工作
為了方便跟蹤問題,可以把netflix包的DEBUG日誌開啟。
logging:
level:
com.netflix: DEBUG
複製程式碼
同時,我們應該知道,Spring Boot的自動化配置原理是載入了類路徑下的META-INF/spring.factories檔案,如下圖,eureka-client包中載入的自動化配置類如下:
分析步驟
通過Eureka DEBUG級別的日誌列印,我們可以看到第一條有關Eureka的日誌為:
2019-11-24 10:48:11.235 INFO 115648 --- [ main] com.netflix.discovery.DiscoveryClient : Initializing Eureka in region us-east-1
2019-11-24 10:48:13.030 INFO 115648 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using JSON encoding codec LegacyJacksonJson
2019-11-24 10:48:13.030 INFO 115648 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using JSON decoding codec LegacyJacksonJson
2019-11-24 10:48:13.232 INFO 115648 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using XML encoding codec XStreamXml
複製程式碼
以上為類com.netflix.discovery.DisconveryClient列印的日誌,在此處打一個斷點,debug模式重新啟動應用。在IDEA或Eclipse中檢視呼叫棧,如下圖:
從上圖可以看出,該方法入口剛好吻合我們上面的分析,是Eureka的自動配置類EurekaClientAutoConfiguration觸發的。其原始碼如下:
// 列印日誌
logger.info("Initializing Eureka in region {}",this.clientConfig.getRegion());
// 配置檔案中的register-with-eureka和fetch-registry如果都為false則不註冊和拉去服務列表了
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
this.scheduler = null;
this.heartbeatExecutor = null;
this.cacheRefreshExecutor = null;
this.eurekaTransport = null;
this.instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config),this.clientConfig.getRegion());
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
this.initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",this.initTimestampMs,this.getApplications().size());
} else {
複製程式碼
同理,其他原始碼的分析也可以通過DEBUG日誌和斷點來分析。以下不再贅述。
Eureka Client原始碼解析
Eureka為了做到開箱即用,簡化開發人員的開發工作,將很多與Eureka Server互動的工作隱藏起來,自主完成。在應用的不同階段執行不同工作,如下圖。
從上面程式碼
logger.info("Initializing Eureka in region {}",this.clientConfig.getRegion());
複製程式碼
打斷點逐步跟蹤,可以發現Eureka Client的執行步驟如下:
1. 向Eureka伺服器拉取全量註冊資訊
程式碼位於DiscoveryClient#fetchRegistry方法中。
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = this.FETCH_REGISTRY_TIMER.start();
label122: {
boolean var4;
try {
Applications applications = this.getApplications();
// 如果增量式拉取被禁止,或者Applications為null,進行全量拉取
if (!this.clientConfig.shouldDisableDelta() && Strings.isNullOrEmpty(this.clientConfig.getRegistryRefreshSingleVipAddress()) && !forceFullRegistryFetch && applications != null && applications.getRegisteredApplications().size() != 0 && applications.getVersion() != -1L) {
this.getAndUpdateDelta(applications);
} else {
logger.info("Disable delta property : {}",this.clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}",this.clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}",forceFullRegistryFetch);
logger.info("Application is null : {}",applications == null);
logger.info("Registered Applications size is zero : {}",applications.getRegisteredApplications().size() == 0);
logger.info("Application version is -1: {}",applications.getVersion() == -1L);
// 全量拉取登入檔資訊
this.getAndStoreFullRegistry();
}
applications.setAppsHashCode(applications.getReconcileHashCode());
this.logTotalInstances();
break label122;
} catch (Throwable var8) {
logger.error("DiscoveryClient_{} - was unable to refresh its cache! status = {}",new Object[]{this.appPathIdentifier,var8.getMessage(),var8});
var4 = false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
return var4;
}
this.onCacheRefreshed();
this.updateInstanceRemoteStatus();
return true;
}
複製程式碼
拉取登入檔地址的程式碼為:
// 這裡的urlPath傳入的是/apps
WebResource webResource = this.jerseyClient.resource(this.serviceUrl).path(urlPath);
複製程式碼
以上的this.serviceUrl預設為http://localhost:8761/eureka所以登入檔地址是http://localhost:8761/eureka//apps
為了避免本文篇幅太長,建議想了解這部分的原理請看朱榮鑫老師的書《Spring Cloud微服務架構進階》
這裡總結下Eureka Client啟動的整個過程:
- 拉取登入檔資訊:Eureka啟動後,會全量拉取服務端的登入檔資訊,儲存到本地快取,以後增量拉取登入檔資訊。預設地址為:
http://localhost:8761/eureka/apps
- 服務註冊:拉取完登入檔資訊後,註冊自身服務元資料,預設地址為:
http://localhost:8761/eureka/apps/{app-name}
- 初始化定時任務:初始化了三個定時器任務,一個拉取登入檔重新整理快取,一個傳送心跳,一個當應用元資料發生變化時按需註冊。心跳預設30s一次,地址為:
http://localhost:8761/eureka/apps/{app-name}/{instance-info-id}
,引數主要有 status (當前服務的狀態)等,狀態資訊預設使用Spring Boot Actuator獲取。 - 服務下線:地址為:
http://localhost:8761/eureka/apps/{app-name}/{instance-info-id}
,http方法為delete。
Eureka Server原始碼解析
Eureka Server作為一個開箱即用的服務註冊中心,需要注意的是,Eureka Server同時也是一個Eureka Client,它會向它配置檔案中的其他Eureka Server進行拉取登入檔、服務註冊和傳送心跳等操作。
還是按照上面原始碼的分析步驟,將com.netflix包調為DEBUG級別,可以在日誌中看到如下:
2019-11-25 11:14:23.816 INFO 237032 --- [ main] c.n.e.registry.AbstractInstanceRegistry : Finished initializing remote region registries. All known remote regions: []
2019-11-25 11:14:23.817 INFO 237032 --- [ main] c.n.eureka.DefaultEurekaServerContext : Initialized
2019-11-25 11:14:23.846 INFO 237032 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-11-25 11:14:23.866 INFO 237032 --- [ main] s.b.a.e.w.s.WebMvcEndpointHandlerMapping : Mapped "{[/actuator/health],methods=[GET],produces=[application/vnd.spring-boot.actuator.v2+json || application/json]}" onto public java.lang.Object org.springframework.boot.actuate.endpoint.web.servlet.AbstractWebMvcEndpointHandlerMapping$OperationHandler.handle(javax.servlet.http.HttpServletRequest,java.util.Map<java.lang.String,java.lang.String>)
複製程式碼
檢視方法AbstractInstanceRegistry#register,該方法是提供服務註冊功能的基礎。原始碼如下:
public void register(InstanceInfo registrant,int leaseDuration,boolean isReplication) {
try {
read.lock();
// 根據appName對服務例項叢集進行分類
Map<String,Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String,Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String,Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(),gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 根據例項id獲取例項的租約
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it,if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={},provided={}",existingLastDirtyTimestamp,registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal,we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}",registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
// 如果租約不存在,這是一個新的註冊例項
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it,reduce the threshold
// (1
// for 30 seconds,2 for a minute)
// 自我保護機制
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 建立新的租約
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant,leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(),lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long,String>(
System.currentTimeMillis(),registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides",registrant.getOverriddenStatus(),registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it",registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(),registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map",overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant,existingLease,isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status,set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(),registrant.getVIPAddress(),registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",registrant.getAppName(),registrant.getId(),registrant.getStatus(),isReplication);
} finally {
read.unlock();
}
}
複製程式碼
為了避免本文篇幅太長,建議想了解這部分的原理請看朱榮鑫老師的書《Spring Cloud微服務架構進階》
這裡總結下Eureka Server啟動的整個過程:
- 提供服務註冊
- 接收服務心跳
- 服務剔除
- 服務下線
- 叢集同步:Eureka Server啟動時,從它的peer節點拉取登入檔,每個Eureka Server對本地登入檔進行操作時,它將遍歷Eureka Server的peer節點,傳送同步請求。
- 提供獲取登入檔資訊。
以上,本文結束。