Hystrix請求合併與請求快取(二):請求合併
請求合併
前言
今日繼續摸魚Hystrix的請求合併部分,可能不如請求快取分析的詳細,但是我感覺足夠表達實現原理了。
本文選擇了較為簡單的請求合併的用例進行切入並分析,即CommandCollapserGetValueForKey
,而非ObservableCollapserGetWordForNumber
,原理都是一致的,只是ObservableCollapserGetWordForNumber
提供了更為豐富的介面,供業務實現。
請求合併的使用
從CommandCollapserGetValueForKey
例子看,只要做如下3件事,就能實現請求合併。
1、繼承HystrixCollapser<BatchReturnType,ResponseType,RequestArgumentType>。 2、重寫三個方法,分別為
getRequestArgument
、createCommand
、mapResponseToRequests
。 3、寫一個BatchCommand
,即請求合併後的一個HystrixCommand
。
接下來,可以從原始碼層面上看,如何通過這三步操作實現請求合併。
HystrixCollapser
- 需要注意
<BatchReturnType,RequestArgumentType>
泛型的含義,已經在程式碼塊中添加了相應註釋。
**
* 根據設定時間引數以及合併請求數,將多個HystrixCommand合併成一次的HystrixCommand,從而將短時間呼叫服務的次數減少。
* <p>
* 通常將時間視窗設為10 ms左右
*
* @param <BatchReturnType>
* 合併後的HystrixCommand的返回型別,例如String變成List<String>。
* @param <ResponseType>
* 需要合併的HystrixCommand的返回型別。
* @param <RequestArgumentType>
* 需要合併的HystrixCommand的請求引數型別。
*/
public abstract class HystrixCollapser <BatchReturnType,ResponseType,RequestArgumentType> implements HystrixExecutable<ResponseType>,HystrixObservable<ResponseType> {
複製程式碼
請求合併的過程
-
請求合併的過程,從例子可以看出,合併後的
BatchCommand
的引數為Collection<CollapsedRequest<String,Integer>> requests
,即請求合併的過程就是從單個請求的引數合併成Collection<CollapsedRequest<ResponseType,RequestArgumentType>>
。 -
因此,可以從
getRequestArgument
的呼叫入手,就找到了HystrixCollapser.toObservable
。
// 提交請求,直接返回結果了。。。
Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
/**
* Submit a request to a batch. If the batch maxSize is hit trigger the batch immediately.
* 和清楚了將時間視窗內的請求提交,如果到了設定的合併閾值,觸發一次合併請求
* @param arg argument to a {@link RequestCollapser}
* @return Observable<ResponseType>
* @throws IllegalStateException
* if submitting after shutdown
*/
public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
/*
* 啟動計時器,時間視窗閾值到了,則觸發一次合併請求
*/
if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false,true)) {
/* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */
timerListenerReference.set(timer.addListener(new CollapsedTask()));
}
// loop until succeed (compare-and-set spin-loop)
// 等待-通知模型
while (true) {
// 拿到RequestBatch
final RequestBatch<BatchReturnType,RequestArgumentType> b = batch.get();
if (b == null) {
return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown"));
}
final Observable<ResponseType> response;
// 新增到RequestBatch
if (arg != null) {
response = b.offer(arg);
} else {
response = b.offer( (RequestArgumentType) NULL_SENTINEL);
}
// it will always get an Observable unless we hit the max batch size
// 新增成功,返回 Observable
if (response != null) {
return response;
} else {
// this batch can't accept requests so create a new one and set it if another thread doesn't beat us
// 新增失敗,執行 RequestBatch ,並建立新的 RequestBatch
createNewBatchAndExecutePreviousIfNeeded(b);
}
}
}
複製程式碼
-
offer
方法
public Observable<ResponseType> offer(RequestArgumentType arg) {
2: // 執行已經開始,新增失敗
3: /* short-cut - if the batch is started we reject the offer */
4: if (batchStarted.get()) {
5: return null;
6: }
7:
8: /*
9: * The 'read' just means non-exclusive even though we are writing.
10: */
11: if (batchLock.readLock().tryLock()) {
12: try {
13: // 執行已經開始,新增失敗
14: /* double-check now that we have the lock - if the batch is started we reject the offer */
15: if (batchStarted.get()) {
16: return null;
17: }
18:
19: // 超過佇列最大長度,新增失敗
20: if (argumentMap.size() >= maxBatchSize) {
21: return null;
22: } else {
23: // 建立 CollapsedRequestSubject ,並新增到佇列
24: CollapsedRequestSubject<ResponseType,RequestArgumentType> collapsedRequest = new CollapsedRequestSubject<ResponseType,RequestArgumentType>(arg,this);
25: final CollapsedRequestSubject<ResponseType,RequestArgumentType> existing = (CollapsedRequestSubject<ResponseType,RequestArgumentType>) argumentMap.putIfAbsent(arg,collapsedRequest);
26: /**
27: * If the argument already exists in the batch,then there are 2 options:
28: * A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses
29: * be hooked up to that argument
30: * B) If request caching is OFF: return an error to all duplicate argument requests
31: *
32: * This maintains the invariant that each batch has no duplicate arguments. This prevents the impossible
33: * logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser)
34: * of trying to figure out which argument of a set of duplicates should get attached to a response.
35: *
36: * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion.
37: */
38: if (existing != null) {
39: boolean requestCachingEnabled = properties.requestCacheEnabled().get();
40: if (requestCachingEnabled) {
41: return existing.toObservable();
42: } else {
43: return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "] This is not supported. Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!"));
44: }
45: } else {
46: return collapsedRequest.toObservable();
47: }
48:
49: }
50: } finally {
51: batchLock.readLock().unlock();
52: }
53: } else {
54: return null;
55: }
56: }
複製程式碼
-
第 38 至 47 行 :返回
Observable
。當argumentMap
已經存在arg
對應的Observable
時,必須開啟快取 (HystrixCollapserProperties.requestCachingEnabled = true
) 功能。原因是,如果在相同的arg
,並且未開啟快取,同時第 43 行實現的是collapsedRequest.toObservable()
,那麼相同的arg
將有多個Observable
執行命令,此時HystrixCollapserBridge.mapResponseToRequests
方法無法將執行(Response
)賦值到arg
對應的命令請求(CollapsedRequestSubject
) ,見 github.com/Netflix/Hys… 。 -
回過頭看
HystrixCollapser#toObservable()
方法的程式碼,這裡也有對快取功能,是不是重複了呢?argumentMap
針對的是RequestBatch
級的快取,HystrixCollapser
:RequestCollapser
:RequestBatch
是 1 : 1 : N 的關係,通過HystrixCollapser#toObservable()
對快取的處理邏輯,保證RequestBatch
切換後,依然有快取。 -
CollapsedTask
負責觸發時間視窗內合併請求的處理,其實關鍵方法就是createNewBatchAndExecutePreviousIfNeeded
,並且也呼叫了executeBatchIfNotAlreadyStarted
。
/**
* Executed on each Timer interval execute the current batch if it has requests in it.
*/
private class CollapsedTask implements TimerListener {
...
@Override
public Void call() throws Exception {
try {
// we fetch current so that when multiple threads race
// we can do compareAndSet with the expected/new to ensure only one happens
// 拿到合併請求
RequestBatch<BatchReturnType,RequestArgumentType> currentBatch = batch.get();
// 1) it can be null if it got shutdown
// 2) we don't execute this batch if it has no requests and let it wait until next tick to be executed
// 處理合並請求
if (currentBatch != null && currentBatch.getSize() > 0) {
// do execution within context of wrapped Callable
createNewBatchAndExecutePreviousIfNeeded(currentBatch);
}
} catch (Throwable t) {
logger.error("Error occurred trying to execute the batch.",t);
t.printStackTrace();
// ignore error so we don't kill the Timer mainLoop and prevent further items from being scheduled
}
return null;
}
});
}
複製程式碼
-
executeBatchIfNotAlreadyStarted
中對請求進行了合併及執行!!!
1、呼叫
HystrixCollapserBridge.shardRequests
方法,將多個命令請求分片成 N 個【多個命令請求】。預設實現下,不進行分片。 2、迴圈 N 個【多個命令請求】。 3、呼叫HystrixCollapserBridge.createObservableCommand
方法,將多個命令請求合併,建立一個 HystrixCommand 。點選 連結 檢視程式碼。
...
// shard batches
Collection<Collection<CollapsedRequest<ResponseType,RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
// for each shard execute its requests
for (final Collection<CollapsedRequest<ResponseType,RequestArgumentType>> shardRequests : shards) {
try {
// create a new command to handle this batch of requests
Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);
commandCollapser.mapResponseToRequests(o,shardRequests).doOnError(new Action1<Throwable>() {
...
複製程式碼
- 最後執行就很簡單了,呼叫重寫的
mapResponseToRequests
的方法將一個HystrixCommand
的執行結果,映射回對應的命令請求們。
總結
- 簡單來講,就是講一定時間視窗內的請求,放入“佇列”,有一個定時觸發的任務,將“佇列”裡的請求(“佇列”裡的請求也有可能被分片,成為“請求佇列集合”,即Collection<Collection<>>),通過自己實現的
BatchCommand
執行,將最後結果再對映為合併前HystrixCommand
的結果返回。
PS:又見到了好多concurrentHashMap
,CAS,Atomic變數。。。
參考文獻
- github.com/Netflix/Hys…
- Hystrix 原始碼解析 —— 命令合併執行