1. 程式人生 > 程式設計 >Hystrix請求合併與請求快取(二):請求合併

Hystrix請求合併與請求快取(二):請求合併

請求合併

前言

今日繼續摸魚Hystrix的請求合併部分,可能不如請求快取分析的詳細,但是我感覺足夠表達實現原理了。

本文選擇了較為簡單的請求合併的用例進行切入並分析,即CommandCollapserGetValueForKey,而非ObservableCollapserGetWordForNumber,原理都是一致的,只是ObservableCollapserGetWordForNumber提供了更為豐富的介面,供業務實現。

請求合併的使用

CommandCollapserGetValueForKey例子看,只要做如下3件事,就能實現請求合併。

1、繼承HystrixCollapser<BatchReturnType,ResponseType,RequestArgumentType>。 2、重寫三個方法,分別為getRequestArgument

createCommandmapResponseToRequests。 3、寫一個BatchCommand,即請求合併後的一個HystrixCommand

接下來,可以從原始碼層面上看,如何通過這三步操作實現請求合併。

HystrixCollapser

  • 需要注意<BatchReturnType,RequestArgumentType>泛型的含義,已經在程式碼塊中添加了相應註釋。
**
 * 根據設定時間引數以及合併請求數,將多個HystrixCommand合併成一次的HystrixCommand,從而將短時間呼叫服務的次數減少。
 * <p>
 * 通常將時間視窗設為10
ms左右 * * @param <BatchReturnType> * 合併後的HystrixCommand的返回型別,例如String變成List<String>。 * @param <ResponseType> * 需要合併的HystrixCommand的返回型別。 * @param <RequestArgumentType> * 需要合併的HystrixCommand的請求引數型別。 */ public abstract class HystrixCollapser
<BatchReturnType,ResponseType,RequestArgumentType> implements HystrixExecutable<ResponseType>,HystrixObservable<ResponseType>
{ 複製程式碼

請求合併的過程

  • 請求合併的過程,從例子可以看出,合併後的BatchCommand的引數為Collection<CollapsedRequest<String,Integer>> requests,即請求合併的過程就是從單個請求的引數合併成Collection<CollapsedRequest<ResponseType,RequestArgumentType>>

  • 因此,可以從getRequestArgument的呼叫入手,就找到了HystrixCollapser.toObservable

// 提交請求,直接返回結果了。。。
Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());

/**
     * Submit a request to a batch. If the batch maxSize is hit trigger the batch immediately.
     * 和清楚了將時間視窗內的請求提交,如果到了設定的合併閾值,觸發一次合併請求
     * @param arg argument to a {@link RequestCollapser} 
     * @return Observable<ResponseType>
     * @throws IllegalStateException
     *             if submitting after shutdown
     */
    public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
        /*
         * 啟動計時器,時間視窗閾值到了,則觸發一次合併請求
         */
        if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false,true)) {
            /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */
            timerListenerReference.set(timer.addListener(new CollapsedTask()));
        }

        // loop until succeed (compare-and-set spin-loop)
        // 等待-通知模型
        while (true) {
	        // 拿到RequestBatch
            final RequestBatch<BatchReturnType,RequestArgumentType> b = batch.get();
            if (b == null) {
                return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown"));
            }

            final Observable<ResponseType> response;
            // 新增到RequestBatch
            if (arg != null) {
                response = b.offer(arg);
            } else {
                response = b.offer( (RequestArgumentType) NULL_SENTINEL);
            }
            // it will always get an Observable unless we hit the max batch size
            // 新增成功,返回 Observable
            if (response != null) {
                return response;
            } else {
                // this batch can't accept requests so create a new one and set it if another thread doesn't beat us
                // 新增失敗,執行 RequestBatch ,並建立新的 RequestBatch
                createNewBatchAndExecutePreviousIfNeeded(b);
            }
        }
    }
複製程式碼
  • offer方法
 public Observable<ResponseType>  offer(RequestArgumentType arg) {
 2:     // 執行已經開始,新增失敗
 3:     /* short-cut - if the batch is started we reject the offer */
 4:     if (batchStarted.get()) {
 5:         return null;
 6:     }
 7: 
 8:     /*
 9:      * The 'read' just means non-exclusive even though we are writing.
10:      */
11:     if (batchLock.readLock().tryLock()) {
12:         try {
13:             // 執行已經開始,新增失敗
14:             /* double-check now that we have the lock - if the batch is started we reject the offer */
15:             if (batchStarted.get()) {
16:                 return null;
17:             }
18: 
19:             // 超過佇列最大長度,新增失敗
20:             if (argumentMap.size() >= maxBatchSize) {
21:                 return null;
22:             } else {
23:                 // 建立 CollapsedRequestSubject ,並新增到佇列
24:                 CollapsedRequestSubject<ResponseType,RequestArgumentType> collapsedRequest = new CollapsedRequestSubject<ResponseType,RequestArgumentType>(arg,this);
25:                 final CollapsedRequestSubject<ResponseType,RequestArgumentType> existing = (CollapsedRequestSubject<ResponseType,RequestArgumentType>) argumentMap.putIfAbsent(arg,collapsedRequest);
26:                 /**
27:                  * If the argument already exists in the batch,then there are 2 options:
28:                  * A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses
29:                  * be hooked up to that argument
30:                  * B) If request caching is OFF: return an error to all duplicate argument requests
31:                  *
32:                  * This maintains the invariant that each batch has no duplicate arguments.  This prevents the impossible
33:                  * logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser)
34:                  * of trying to figure out which argument of a set of duplicates should get attached to a response.
35:                  *
36:                  * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion.
37:                  */
38:                 if (existing != null) {
39:                     boolean requestCachingEnabled = properties.requestCacheEnabled().get();
40:                     if (requestCachingEnabled) {
41:                         return existing.toObservable();
42:                     } else {
43:                         return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "]  This is not supported.  Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!"));
44:                     }
45:                 } else {
46:                     return collapsedRequest.toObservable();
47:                 }
48: 
49:             }
50:         } finally {
51:             batchLock.readLock().unlock();
52:         }
53:     } else {
54:         return null;
55:     }
56: }
複製程式碼
  • 第 38 至 47 行 :返回Observable。當argumentMap已經存在arg對應的Observable時,必須開啟快取 ( HystrixCollapserProperties.requestCachingEnabled = true ) 功能。原因是,如果在相同的arg,並且未開啟快取,同時第 43 行實現的是collapsedRequest.toObservable(),那麼相同的arg將有多個Observable執行命令,此時HystrixCollapserBridge.mapResponseToRequests 方法無法將執行(Response)賦值到arg對應的命令請求( CollapsedRequestSubject ) ,見 github.com/Netflix/Hys…

  • 回過頭看HystrixCollapser#toObservable()方法的程式碼,這裡也有對快取功能,是不是重複了呢?argumentMap 針對的是RequestBatch級的快取,HystrixCollapser : RequestCollapser : RequestBatch 是 1 : 1 : N 的關係,通過 HystrixCollapser#toObservable() 對快取的處理邏輯,保證 RequestBatch 切換後,依然有快取。

  • CollapsedTask負責觸發時間視窗內合併請求的處理,其實關鍵方法就是createNewBatchAndExecutePreviousIfNeeded,並且也呼叫了executeBatchIfNotAlreadyStarted

/**
     * Executed on each Timer interval execute the current batch if it has requests in it.
     */
    private class CollapsedTask implements TimerListener {
		        ...
                @Override
                public Void call() throws Exception {
                    try {
                        // we fetch current so that when multiple threads race
                        // we can do compareAndSet with the expected/new to ensure only one happens
                        // 拿到合併請求
                        RequestBatch<BatchReturnType,RequestArgumentType> currentBatch = batch.get();
                        // 1) it can be null if it got shutdown
                        // 2) we don't execute this batch if it has no requests and let it wait until next tick to be executed
                        // 處理合並請求
                        if (currentBatch != null && currentBatch.getSize() > 0) {
                            // do execution within context of wrapped Callable
                            createNewBatchAndExecutePreviousIfNeeded(currentBatch);
                        }
                    } catch (Throwable t) {
                        logger.error("Error occurred trying to execute the batch.",t);
                        t.printStackTrace();
                        // ignore error so we don't kill the Timer mainLoop and prevent further items from being scheduled
                    }
                    return null;
                }

            });
        }
複製程式碼
  • executeBatchIfNotAlreadyStarted中對請求進行了合併及執行!!!

1、呼叫HystrixCollapserBridge.shardRequests 方法,將多個命令請求分片成 N 個【多個命令請求】。預設實現下,不進行分片。 2、迴圈 N 個【多個命令請求】。 3、呼叫 HystrixCollapserBridge.createObservableCommand 方法,將多個命令請求合併,建立一個 HystrixCommand 。點選 連結 檢視程式碼。

		...
  // shard batches
                Collection<Collection<CollapsedRequest<ResponseType,RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
                // for each shard execute its requests 
                for (final Collection<CollapsedRequest<ResponseType,RequestArgumentType>> shardRequests : shards) {
                        try {
                        // create a new command to handle this batch of requests
                        Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);
           commandCollapser.mapResponseToRequests(o,shardRequests).doOnError(new Action1<Throwable>() {
           ...
複製程式碼
  • 最後執行就很簡單了,呼叫重寫的mapResponseToRequests的方法將一個HystrixCommand的執行結果,映射回對應的命令請求們。

總結

  • 簡單來講,就是講一定時間視窗內的請求,放入“佇列”,有一個定時觸發的任務,將“佇列”裡的請求(“佇列”裡的請求也有可能被分片,成為“請求佇列集合”,即Collection<Collection<>>),通過自己實現的BatchCommand執行,將最後結果再對映為合併前HystrixCommand的結果返回。

PS:又見到了好多concurrentHashMap,CAS,Atomic變數。。。

參考文獻