1. 程式人生 > 程式設計 >Hystrix請求合併與請求快取(一):請求快取

Hystrix請求合併與請求快取(一):請求快取

前言

國慶長假結束後,筆者一直在於假期綜合症纏鬥,特別是週六上班。。。

相信大家對Hystrix都很熟悉,它的原始碼大量使用RxJava,正好筆者的老本行是Android開發工程師,以前也略微接觸過,想分享下自己看完Hystix的請求合併與請求快取部分原始碼的一些收穫。

Hystrix簡介

  • Hystrix由Netflix開源,官方定義如下:

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems,services and 3rd party libraries,stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.

  • 筆者理解:在分散式環境中,錯誤不可避免,Hystrix提供了一種隔離、降級、熔斷等機制。

1、隔離:通過隔離,避免服務之間相互影響,一個服務不可用,不會影響別的服務,避免了服務雪崩。 2、降級:分散式環境中,服務不可用的情況無法避免,降級機制可以給出更加友好的互動(預設值、異常返回)。 3、熔斷:熔斷機制可以避免在服務不可用時,服務呼叫方還在呼叫不可用的服務,導致資源消耗、耗時增加。 4、提供視覺化的監控,Hystrix Dashboard。 4、當然,還有筆者今天要講的請求合併與請求快取

  • 請求合併與請求快取,對應於官方給出的**What does it do?**的第3項:

Parallel execution. Concurrency aware request caching. Automated batching through request collapsing.

  • 以下都是通過官方給的測試用例作為入口,查詢原始碼並進行分析。

1、請求快取:CommandUsingRequestCache 2、請求合併:CommandCollapserGetValueForKey

請求快取

  • 請求快取的例子在CommandUsingRequestCache,繼承自HystrixCommand,和一般的Command一致。
  • 那麼,使用快取和不使用快取程式碼層面有何不同呢?

1、初始化HystrixRequestContext 2、重寫getCacheKey

HystrixRequestContext

  • HystrixRequestContext.initializeContext程式碼在HystrixRequestContext中,從類名可以看出這是個請求上下文,儲存一些請求的資訊。

  • 從原始碼可以看出,new出一個HystrixRequestContext,塞入ThreadLocal變數中。

	
    private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();

/**
     * Call this at the beginning of each request (from parent thread)
     * to initialize the underlying context so that {@link HystrixRequestVariableDefault} can be used on any children threads and be accessible from
     * the parent thread.
     * <p>
     * <b>NOTE: If this method is called then <code>shutdown()</code> must also be called or a memory leak will occur.</b>
     * <p>
     * See class header JavaDoc for example Servlet Filter implementation that initializes and shuts down the context.
     */
    public static HystrixRequestContext initializeContext() {
        HystrixRequestContext state = new HystrixRequestContext();
        requestVariables.set(state);
        return state;
    }
複製程式碼
  • 那麼,HystrixRequestContext儲存上下文的資料結構是怎樣的呢?
// 每個HystrixRequestContext例項,都會有一個ConcurrentMap
ConcurrentHashMap<HystrixRequestVariableDefault<?>,HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>,HystrixRequestVariableDefault.LazyInitializer<?>>();

/**
     刪除ConcurrentMap中儲存的所有鍵值對,如果初始化了HystrixRequestContext物件,沒有呼叫shutdown方法,確實會導致記憶體洩漏,因為state還在。
     */
    public void shutdown() {
        if (state != null) {
            for (HystrixRequestVariableDefault<?> v : state.keySet()) {
                // for each RequestVariable we call 'remove' which performs the shutdown logic
                try {
                    HystrixRequestVariableDefault.remove(this,v);
                } catch (Throwable t) {
                    HystrixRequestVariableDefault.logger.error("Error in shutdown,will continue with shutdown of other variables",t);
                }
            }
            // null out so it can be garbage collected even if the containing object is still
            // being held in ThreadLocals on threads that weren't cleaned up
            state = null;
        }
    }
複製程式碼
  • 這個ConcurrentHashMap裡存的HystrixRequestVariableDefault及靜態內部類HystrixRequestVariableDefault.LazyInitializer又是什麼呢?

HystrixRequestVariableDefault

  • HystrixRequestVariableDefault其實就是儲存了泛型Tvalue,並且封裝了initialValuegetset方法。
  • LazyInitializer顧名思義就是為了懶漢式初始化value,而設計的內部類。
// 作用一:作為內部類呼叫HystrixRequestVariableDefault.initialValue方法,通過維護initialized布林值,使HystrixRequestVariableDefault.initialValue方法只呼叫一次。
// 作用二:new一個LazyInitializer物件或LazyInitializer被垃圾回收時不會呼叫HystrixRequestVariableDefault.initialValue方法,也就是說對於業務初始化邏輯的影響被排除。
// 作用三:呼叫get方法時,可以通過CAS樂觀鎖的方式實現value的獲取,具體請參照get方法。
static final class LazyInitializer<T> {
        // @GuardedBy("synchronization on get() or construction")
        private T value;

        /*
         * Boolean to ensure only-once initialValue() execution instead of using
         * a null check in case initialValue() returns null
         */
        // @GuardedBy("synchronization on get() or construction")
        private boolean initialized = false;

        private final HystrixRequestVariableDefault<T> rv;

		// 不會呼叫HystrixRequestVariableDefault.initialValue,不會更新initialized值
        private LazyInitializer(HystrixRequestVariableDefault<T> rv) {
            this.rv = rv;
        }

		// 不會呼叫HystrixRequestVariableDefault.initialValue,只能通過set方式呼叫
        private LazyInitializer(HystrixRequestVariableDefault<T> rv,T value) {
            this.rv = rv;
            this.value = value;
            this.initialized = true;
        }
		// 如果未初始化(沒有呼叫過set方法)過,則返回HystrixRequestVariableDefault.initialValue的值,初始化過則返回初始化的值
        public synchronized T get() {
            if (!initialized) {
                value = rv.initialValue();
                initialized = true;
            }
            return value;
        }
    }
複製程式碼
  • get方法,先從ConcurrentHashMap中取出對應的LazyInitializer,如果為空則使用CAS樂觀鎖的方式,new一個LazyInitializer並存入ConcurrentHashMap,最後返回呼叫LazyInitializer.get()並返回
public T get() {
		// 當前執行緒的HystrixRequestContext為null 或 ConcurrentHashMap<HystrixRequestVariableDefault<?>,HystrixRequestVariableDefault.LazyInitializer<?>> 為null
        if (HystrixRequestContext.getContextForCurrentThread() == null) {
            throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
        }
        ConcurrentHashMap<HystrixRequestVariableDefault<?>,LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;

        // short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap
        LazyInitializer<?> v = variableMap.get(this);
        if (v != null) {
            return (T) v.get();
        }

        /*
         * 樂觀鎖方式(CAS)new一個LazyInitializer,放進ConcurrentHashMap 
         * 這裡值得注意的是,不呼叫LazyInitializer.get方法是不會執行HystrixRequestVariableDefault.initialValue,故當putIfAbsent失敗時,可以樂觀地放棄該例項,使該例項被GC。
         * 不管哪個LazyInitializer例項的get方法被呼叫,HystrixRequestVariableDefault.initialValue也只會被呼叫一次。
         */
        LazyInitializer<T> l = new LazyInitializer<T>(this);
        LazyInitializer<?> existing = variableMap.putIfAbsent(this,l);
        if (existing == null) {
            /*
             * We won the thread-race so can use 'l' that we just created.
             */
            return l.get();
        } else {
            /*
             * We lost the thread-race so let 'l' be garbage collected and instead return 'existing'
             */
            return (T) existing.get();
        }
    }
複製程式碼

各類之間的關係

  • 一個request(不侷限於一個執行緒) -> HystrixRequestContext -> ConcurrentHashMap<HystrixRequestVariableDefault,HystrixRequestVariableDefault.LazyInitializer>
  • 也就是說每個request都有一個ConcurrentHashMap<HystrixRequestVariableDefault,HystrixRequestVariableDefault.LazyInitializer> map。

獲取快取

  • getCacheKey重寫了AbstractCommand.getCacheKey方法,AbstractCommandHystrixCommand的基類。
    enter image description here
  • 根據上圖,我們可以看出execute方法,最終呼叫toObservable方法,而toObservable方法在AbstractCommand中,因此我們可以初步斷定在AbstractCommand.toObservable方法中,會與HystrixRequestVariableDefault或者其實現的介面產生關聯,進行快取的讀取和寫入。

*AbstractCommand.toObservable的關鍵程式碼如下:

 final String cacheKey = getCacheKey();

                /* 如果開啟了快取功能,從快取讀取 */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache,_cmd);
                    }
                }

				// 快取物件
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 放進快取
                if (requestCacheEnabled && cacheKey != null) {
                    // 包裝成快取Observable物件
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable,_cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey,toCache);
複製程式碼
  • 接下來,我們就只要尋找HystrixRequestCacheHystrixRequestVariableDefault之間的關聯了,AbstractCommand構造器中通過HystrixRequestCache.getInstance構造了HystrixRequestCache物件。
// 又是CAS,putIfAbsent。。。
 private static HystrixRequestCache getInstance(RequestCacheKey rcKey,HystrixConcurrencyStrategy concurrencyStrategy) {
        HystrixRequestCache c = caches.get(rcKey);
        if (c == null) {
            HystrixRequestCache newRequestCache = new HystrixRequestCache(rcKey,concurrencyStrategy);
            HystrixRequestCache existing = caches.putIfAbsent(rcKey,newRequestCache);
            if (existing == null) {
                // we won so use the new one
                c = newRequestCache;
            } else {
                // we lost so use the existing
                c = existing;
            }
        }
        return c;
    }
複製程式碼
  • 來看HystrixRequestCache的值是怎麼儲存的,看HystrixRequestCache.putIfAbsent
HystrixCachedObservable<T> putIfAbsent(String cacheKey,HystrixCachedObservable<T> f) {
		// 使用HystrixRequestCache.prefix + concurrencyStrategy + HystrixCommand.getCacheKey包裝成快取key
        ValueCacheKey key = getRequestCacheKey(cacheKey);
        if (key != null) {
            // 尋找快取,關鍵程式碼
            ConcurrentHashMap<ValueCacheKey,HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
            if (cacheInstance == null) {
                throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
            }
            HystrixCachedObservable<T> alreadySet = (HystrixCachedObservable<T>) cacheInstance.putIfAbsent(key,f);
            if (alreadySet != null) {
                // someone beat us so we didn't cache this
                return alreadySet;
            }
        }
        // we either set it in the cache or do not have a cache key
        return null;
    }
複製程式碼
  • requestVariableInstance.get(key)HystrixRequestVariableHolder中的方法。
 // 找到了關聯。。。這裡有HystrixRequestVariable
 private static ConcurrentHashMap<RVCacheKey,HystrixRequestVariable<?>> requestVariableInstance = new ConcurrentHashMap<RVCacheKey,HystrixRequestVariable<?>>();
 // 
 public T get(HystrixConcurrencyStrategy concurrencyStrategy) {
        
        RVCacheKey key = new RVCacheKey(this,concurrencyStrategy);
        HystrixRequestVariable<?> rvInstance = requestVariableInstance.get(key);
        if (rvInstance == null) {
            requestVariableInstance.putIfAbsent(key,concurrencyStrategy.getRequestVariable(lifeCycleMethods));
            /*
             * 記憶體洩漏檢測,
             */
            if (requestVariableInstance.size() > 100) {
                logger.warn("Over 100 instances of HystrixRequestVariable are being stored. This is likely the sign of a memory leak caused by using unique instances of HystrixConcurrencyStrategy instead of a single instance.");
            }
        }
        // HystrixRequestVariable.get取出ConcurrentHashMap<ValueCacheKey,HystrixCachedObservable<?>>的map,再從ConcurrentHashMap<ValueCacheKey,HystrixCachedObservable<?>>中根據重寫的getCacheKey構造出ValueCacheKey,拿出快取值。
        return (T) requestVariableInstance.get(key).get();
    }
複製程式碼

獲取快取過程中各個物件的對應關係

  • 一個commandKey
  • 一個HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey,HystrixCachedObservable<?>>>
  • 一個ConcurrentHashMap<RVCacheKey,HystrixRequestVariable> requestVariableInstance = new ConcurrentHashMap>()

請求快取總結

最後,再總結下請求快取機制,一個request對應一個HystrixRequestContextHystrixRequestVariable中儲存快取值,通過重寫getCacheKey構造對應RVCacheKey,通過HystrixRequestCacheHystrixRequestVariableHolder拿到HystrixRequestVariable的值。

總結

看了原始碼才發現,作者有如下感受:

1、各種ConcurrentHashMap 2、終於RxJava第一次看到在非Android領域運用 3、懶載入+CAS伴隨整個流程,後續也會考慮這種非鎖實現

參考文獻