Hystrix請求合併與請求快取(一):請求快取
前言
國慶長假結束後,筆者一直在於假期綜合症纏鬥,特別是週六上班。。。
相信大家對Hystrix都很熟悉,它的原始碼大量使用RxJava,正好筆者的老本行是Android開發工程師,以前也略微接觸過,想分享下自己看完Hystix的請求合併與請求快取部分原始碼的一些收穫。
Hystrix簡介
- Hystrix由Netflix開源,官方定義如下:
Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems,services and 3rd party libraries,stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.
- 筆者理解:在分散式環境中,錯誤不可避免,Hystrix提供了一種隔離、降級、熔斷等機制。
1、隔離:通過隔離,避免服務之間相互影響,一個服務不可用,不會影響別的服務,避免了服務雪崩。 2、降級:分散式環境中,服務不可用的情況無法避免,降級機制可以給出更加友好的互動(預設值、異常返回)。 3、熔斷:熔斷機制可以避免在服務不可用時,服務呼叫方還在呼叫不可用的服務,導致資源消耗、耗時增加。 4、提供視覺化的監控,Hystrix Dashboard。 4、當然,還有筆者今天要講的請求合併與請求快取。
- 請求合併與請求快取,對應於官方給出的**What does it do?**的第3項:
Parallel execution. Concurrency aware request caching. Automated batching through request collapsing.
- 以下都是通過官方給的測試用例作為入口,查詢原始碼並進行分析。
1、請求快取:CommandUsingRequestCache 2、請求合併:CommandCollapserGetValueForKey
請求快取
- 請求快取的例子在
CommandUsingRequestCache
,繼承自HystrixCommand
,和一般的Command
一致。 - 那麼,使用快取和不使用快取程式碼層面有何不同呢?
1、初始化
HystrixRequestContext
2、重寫getCacheKey
HystrixRequestContext
-
HystrixRequestContext.initializeContext
程式碼在HystrixRequestContext
中,從類名可以看出這是個請求上下文,儲存一些請求的資訊。 -
從原始碼可以看出,new出一個
HystrixRequestContext
,塞入ThreadLocal
變數中。
private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();
/**
* Call this at the beginning of each request (from parent thread)
* to initialize the underlying context so that {@link HystrixRequestVariableDefault} can be used on any children threads and be accessible from
* the parent thread.
* <p>
* <b>NOTE: If this method is called then <code>shutdown()</code> must also be called or a memory leak will occur.</b>
* <p>
* See class header JavaDoc for example Servlet Filter implementation that initializes and shuts down the context.
*/
public static HystrixRequestContext initializeContext() {
HystrixRequestContext state = new HystrixRequestContext();
requestVariables.set(state);
return state;
}
複製程式碼
- 那麼,
HystrixRequestContext
儲存上下文的資料結構是怎樣的呢?
// 每個HystrixRequestContext例項,都會有一個ConcurrentMap
ConcurrentHashMap<HystrixRequestVariableDefault<?>,HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>,HystrixRequestVariableDefault.LazyInitializer<?>>();
/**
刪除ConcurrentMap中儲存的所有鍵值對,如果初始化了HystrixRequestContext物件,沒有呼叫shutdown方法,確實會導致記憶體洩漏,因為state還在。
*/
public void shutdown() {
if (state != null) {
for (HystrixRequestVariableDefault<?> v : state.keySet()) {
// for each RequestVariable we call 'remove' which performs the shutdown logic
try {
HystrixRequestVariableDefault.remove(this,v);
} catch (Throwable t) {
HystrixRequestVariableDefault.logger.error("Error in shutdown,will continue with shutdown of other variables",t);
}
}
// null out so it can be garbage collected even if the containing object is still
// being held in ThreadLocals on threads that weren't cleaned up
state = null;
}
}
複製程式碼
- 這個
ConcurrentHashMap
裡存的HystrixRequestVariableDefault
及靜態內部類HystrixRequestVariableDefault.LazyInitializer
又是什麼呢?
HystrixRequestVariableDefault
-
HystrixRequestVariableDefault
其實就是儲存了泛型T
的value
,並且封裝了initialValue
、get
、set
方法。 -
LazyInitializer
顧名思義就是為了懶漢式初始化value
,而設計的內部類。
// 作用一:作為內部類呼叫HystrixRequestVariableDefault.initialValue方法,通過維護initialized布林值,使HystrixRequestVariableDefault.initialValue方法只呼叫一次。
// 作用二:new一個LazyInitializer物件或LazyInitializer被垃圾回收時不會呼叫HystrixRequestVariableDefault.initialValue方法,也就是說對於業務初始化邏輯的影響被排除。
// 作用三:呼叫get方法時,可以通過CAS樂觀鎖的方式實現value的獲取,具體請參照get方法。
static final class LazyInitializer<T> {
// @GuardedBy("synchronization on get() or construction")
private T value;
/*
* Boolean to ensure only-once initialValue() execution instead of using
* a null check in case initialValue() returns null
*/
// @GuardedBy("synchronization on get() or construction")
private boolean initialized = false;
private final HystrixRequestVariableDefault<T> rv;
// 不會呼叫HystrixRequestVariableDefault.initialValue,不會更新initialized值
private LazyInitializer(HystrixRequestVariableDefault<T> rv) {
this.rv = rv;
}
// 不會呼叫HystrixRequestVariableDefault.initialValue,只能通過set方式呼叫
private LazyInitializer(HystrixRequestVariableDefault<T> rv,T value) {
this.rv = rv;
this.value = value;
this.initialized = true;
}
// 如果未初始化(沒有呼叫過set方法)過,則返回HystrixRequestVariableDefault.initialValue的值,初始化過則返回初始化的值
public synchronized T get() {
if (!initialized) {
value = rv.initialValue();
initialized = true;
}
return value;
}
}
複製程式碼
- get方法,先從
ConcurrentHashMap
中取出對應的LazyInitializer
,如果為空則使用CAS樂觀鎖的方式,new一個LazyInitializer
並存入ConcurrentHashMap
,最後返回呼叫LazyInitializer.get()
並返回
public T get() {
// 當前執行緒的HystrixRequestContext為null 或 ConcurrentHashMap<HystrixRequestVariableDefault<?>,HystrixRequestVariableDefault.LazyInitializer<?>> 為null
if (HystrixRequestContext.getContextForCurrentThread() == null) {
throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
}
ConcurrentHashMap<HystrixRequestVariableDefault<?>,LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;
// short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap
LazyInitializer<?> v = variableMap.get(this);
if (v != null) {
return (T) v.get();
}
/*
* 樂觀鎖方式(CAS)new一個LazyInitializer,放進ConcurrentHashMap
* 這裡值得注意的是,不呼叫LazyInitializer.get方法是不會執行HystrixRequestVariableDefault.initialValue,故當putIfAbsent失敗時,可以樂觀地放棄該例項,使該例項被GC。
* 不管哪個LazyInitializer例項的get方法被呼叫,HystrixRequestVariableDefault.initialValue也只會被呼叫一次。
*/
LazyInitializer<T> l = new LazyInitializer<T>(this);
LazyInitializer<?> existing = variableMap.putIfAbsent(this,l);
if (existing == null) {
/*
* We won the thread-race so can use 'l' that we just created.
*/
return l.get();
} else {
/*
* We lost the thread-race so let 'l' be garbage collected and instead return 'existing'
*/
return (T) existing.get();
}
}
複製程式碼
各類之間的關係
- 一個request(不侷限於一個執行緒) -> HystrixRequestContext -> ConcurrentHashMap<HystrixRequestVariableDefault,HystrixRequestVariableDefault.LazyInitializer>
- 也就是說每個request都有一個ConcurrentHashMap<HystrixRequestVariableDefault,HystrixRequestVariableDefault.LazyInitializer> map。
獲取快取
-
getCacheKey
重寫了AbstractCommand.getCacheKey
方法,AbstractCommand
為HystrixCommand
的基類。 - 根據上圖,我們可以看出
execute
方法,最終呼叫toObservable
方法,而toObservable
方法在AbstractCommand
中,因此我們可以初步斷定在AbstractCommand.toObservable
方法中,會與HystrixRequestVariableDefault
或者其實現的介面產生關聯,進行快取的讀取和寫入。
*AbstractCommand.toObservable
的關鍵程式碼如下:
final String cacheKey = getCacheKey();
/* 如果開啟了快取功能,從快取讀取 */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache,_cmd);
}
}
// 快取物件
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// 放進快取
if (requestCacheEnabled && cacheKey != null) {
// 包裝成快取Observable物件
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable,_cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey,toCache);
複製程式碼
- 接下來,我們就只要尋找
HystrixRequestCache
與HystrixRequestVariableDefault
之間的關聯了,AbstractCommand
構造器中通過HystrixRequestCache.getInstance
構造了HystrixRequestCache
物件。
// 又是CAS,putIfAbsent。。。
private static HystrixRequestCache getInstance(RequestCacheKey rcKey,HystrixConcurrencyStrategy concurrencyStrategy) {
HystrixRequestCache c = caches.get(rcKey);
if (c == null) {
HystrixRequestCache newRequestCache = new HystrixRequestCache(rcKey,concurrencyStrategy);
HystrixRequestCache existing = caches.putIfAbsent(rcKey,newRequestCache);
if (existing == null) {
// we won so use the new one
c = newRequestCache;
} else {
// we lost so use the existing
c = existing;
}
}
return c;
}
複製程式碼
- 來看
HystrixRequestCache
的值是怎麼儲存的,看HystrixRequestCache.putIfAbsent
。
HystrixCachedObservable<T> putIfAbsent(String cacheKey,HystrixCachedObservable<T> f) {
// 使用HystrixRequestCache.prefix + concurrencyStrategy + HystrixCommand.getCacheKey包裝成快取key
ValueCacheKey key = getRequestCacheKey(cacheKey);
if (key != null) {
// 尋找快取,關鍵程式碼
ConcurrentHashMap<ValueCacheKey,HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
if (cacheInstance == null) {
throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
}
HystrixCachedObservable<T> alreadySet = (HystrixCachedObservable<T>) cacheInstance.putIfAbsent(key,f);
if (alreadySet != null) {
// someone beat us so we didn't cache this
return alreadySet;
}
}
// we either set it in the cache or do not have a cache key
return null;
}
複製程式碼
-
requestVariableInstance.get(key)
為HystrixRequestVariableHolder
中的方法。
// 找到了關聯。。。這裡有HystrixRequestVariable
private static ConcurrentHashMap<RVCacheKey,HystrixRequestVariable<?>> requestVariableInstance = new ConcurrentHashMap<RVCacheKey,HystrixRequestVariable<?>>();
//
public T get(HystrixConcurrencyStrategy concurrencyStrategy) {
RVCacheKey key = new RVCacheKey(this,concurrencyStrategy);
HystrixRequestVariable<?> rvInstance = requestVariableInstance.get(key);
if (rvInstance == null) {
requestVariableInstance.putIfAbsent(key,concurrencyStrategy.getRequestVariable(lifeCycleMethods));
/*
* 記憶體洩漏檢測,
*/
if (requestVariableInstance.size() > 100) {
logger.warn("Over 100 instances of HystrixRequestVariable are being stored. This is likely the sign of a memory leak caused by using unique instances of HystrixConcurrencyStrategy instead of a single instance.");
}
}
// HystrixRequestVariable.get取出ConcurrentHashMap<ValueCacheKey,HystrixCachedObservable<?>>的map,再從ConcurrentHashMap<ValueCacheKey,HystrixCachedObservable<?>>中根據重寫的getCacheKey構造出ValueCacheKey,拿出快取值。
return (T) requestVariableInstance.get(key).get();
}
複製程式碼
獲取快取過程中各個物件的對應關係
- 一個commandKey
- 一個HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey,HystrixCachedObservable<?>>>
- 一個ConcurrentHashMap<RVCacheKey,HystrixRequestVariable> requestVariableInstance = new ConcurrentHashMap>()
請求快取總結
最後,再總結下請求快取機制,一個request對應一個HystrixRequestContext
、HystrixRequestVariable
中儲存快取值,通過重寫getCacheKey
構造對應RVCacheKey
,通過HystrixRequestCache
的HystrixRequestVariableHolder
拿到HystrixRequestVariable
的值。
總結
看了原始碼才發現,作者有如下感受:
1、各種ConcurrentHashMap 2、終於RxJava第一次看到在非Android領域運用 3、懶載入+CAS伴隨整個流程,後續也會考慮這種非鎖實現