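How Hystrix works, seen from the source code
Hystrix is an open-source project from Netflix for throttling and circuit breaking. Its main features are:
- Isolation (thread-pool isolation and semaphore isolation): caps the resources that calls to a distributed service may consume, so that a problem in one dependency does not affect calls to other services.
- Graceful degradation: degrades on timeout or when resources (threads or semaphore permits) are exhausted; once degraded, a fallback method can return default data.
- Circuit breaking: when the failure rate reaches a threshold (for example because of network faults or timeouts), degradation is triggered automatically. The fast failure triggered by the breaker is paired with fast recovery.
- Caching: provides request caching and request collapsing.
- Real-time monitoring, alerting, and control (changing configuration).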
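To make the circuit-breaking item concrete, here is a minimal sketch of a breaker that opens after repeated failures and allows a trial call once a sleep window has elapsed. It is not Hystrix's implementation: the class name SketchCircuitBreaker and the consecutive-failure threshold are assumptions made for illustration (Hystrix itself decides on an error percentage over a rolling window), and time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified circuit breaker (hypothetical class, not part of Hystrix). */
class SketchCircuitBreaker {
    private final int failureThreshold;    // assumption: consecutive failures that open the breaker
    private final long sleepWindowMillis;  // how long the breaker stays open before a trial call
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    private final AtomicLong openedAt = new AtomicLong(-1); // -1 means "closed"

    SketchCircuitBreaker(int failureThreshold, long sleepWindowMillis) {
        this.failureThreshold = failureThreshold;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    /** Returns true if a call may proceed at the given time. */
    boolean allowRequest(long nowMillis) {
        long opened = openedAt.get();
        if (opened < 0) {
            return true; // closed: every call goes through
        }
        // open: fail fast until the sleep window has elapsed, then allow a trial call
        return nowMillis - opened >= sleepWindowMillis;
    }

    /** A successful call closes the breaker and resets the failure count. */
    void markSuccess() {
        consecutiveFailures.set(0);
        openedAt.set(-1);
    }

    /** A failed call opens (or re-opens) the breaker once the threshold is reached. */
    void markFailure(long nowMillis) {
        if (consecutiveFailures.incrementAndGet() >= failureThreshold) {
            openedAt.set(nowMillis);
        }
    }
}
```

With a threshold of 2 and a sleep window of 1000 ms, two failures open the breaker, calls are refused during the window, and the first call after the window is let through as a trial; a success on that trial closes the breaker again.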
Its workflow is as follows:
Hystrix offers four ways to invoke a command:
- toObservable(): returns an Observable without subscribing to it.
- observe(): calls toObservable() and subscribes an rx.subjects.ReplaySubject to the Observable.
- queue(): calls toObservable(), then Observable#toBlocking() and BlockingObservable#toFuture(), and returns a Future.
- execute(): calls queue(), then Future#get(), and returns the result of run() synchronously.
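The layering of these entry points can be imitated with JDK types alone. The sketch below is an analogy, not Hystrix code: SketchCommand and toLazy() are hypothetical names, a Supplier of CompletableFuture stands in for the unsubscribed Observable, and observe() has no counterpart here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/** JDK-only analogy of the call layering (hypothetical class, not part of Hystrix). */
abstract class SketchCommand<R> {
    /** User logic, playing the role of HystrixCommand#run(). */
    protected abstract R run() throws Exception;

    /** Lazy: nothing executes until the returned supplier is invoked. */
    Supplier<CompletableFuture<R>> toLazy() {
        return () -> CompletableFuture.supplyAsync(() -> {
            try {
                return run();
            } catch (Exception e) {
                throw new CompletionException(e); // carry checked exceptions through the future
            }
        });
    }

    /** Asynchronous: starts the work and hands back a Future. */
    Future<R> queue() {
        return toLazy().get();
    }

    /** Synchronous: blocks on the Future returned by queue(). */
    R execute() {
        try {
            return queue().get();
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```

Calling toLazy() alone does not invoke run(); queue() and execute() do, and execute() is nothing more than queue() followed by a blocking get().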
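The main execution logic:
1. Each call creates a new HystrixCommand, with the dependency call wrapped in its run() method.
2. execute() or queue() is called for a synchronous or asynchronous invocation.
3. Check whether the circuit breaker is open. If it is open, jump to step 8 and degrade; if it is closed, continue to step 4.
4. Check whether the thread pool, queue, or semaphore is full. If so, go to the fallback in step 8; otherwise continue.
5. Call the HystrixCommand's run() method to execute the dependency logic. If the call times out, go to step 8.
6. Check whether the call succeeded. On success, return the result; on error, go to step 8.
7. Update the circuit breaker state: every execution outcome (success, failure, rejection, timeout) is reported to the circuit breaker, which aggregates them to decide its state.
8. Run the getFallback() degradation logic. The following four situations trigger a getFallback() call:
- run() throws an exception other than HystrixBadRequestException.
- run() times out.
- The circuit breaker is open and intercepts the call.
- The thread pool, queue, or semaphore is full.
A command that does not implement getFallback() throws an exception directly. If the fallback logic succeeds, its result is returned; if it fails, an exception is thrown.
9. Return the successful result.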
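The steps above can be condensed into a short JDK-only sketch. It is an illustration under assumptions, not the Hystrix code path: SketchFlow is a hypothetical class, the circuit state is passed in as a boolean, every failure is routed to the same fallback, and the metrics reporting of step 7 is left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Condensed version of steps 3 to 9 (hypothetical class, not part of Hystrix). */
class SketchFlow {
    static <R> R execute(boolean circuitOpen, Semaphore permits, ExecutorService pool,
                         Callable<R> run, Supplier<R> fallback, long timeoutMillis) {
        if (circuitOpen) {
            return fallback.get();              // step 3: breaker open, short-circuit
        }
        if (!permits.tryAcquire()) {
            return fallback.get();              // step 4: no capacity left, reject
        }
        try {
            Future<R> future = pool.submit(run); // step 5: run the dependency call
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS); // steps 6 and 9
            } catch (TimeoutException e) {
                future.cancel(true);            // step 5: timed out
                return fallback.get();
            } catch (ExecutionException e) {
                return fallback.get();          // step 6: run() threw
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return fallback.get();
            }
        } catch (RejectedExecutionException e) {
            return fallback.get();              // step 4: the pool refused the task
        } finally {
            permits.release();                  // only reached after a successful tryAcquire
        }
    }
}
```

A call such as `SketchFlow.<String>execute(false, new Semaphore(1), pool, () -> "ok", () -> "fallback", 1000)` returns the result of the callable when nothing goes wrong and the fallback value in every other branch.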
Now let's walk through the source code:
execute() calls get() on the Future returned by queue() and obtains the result synchronously
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
queue() in turn calls toObservable()
public Future<R> queue() {
final Future<R> delegate = toObservable().toBlocking().toFuture();
// unrelated code omitted
}
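toObservable() first checks whether the request cache already holds a result. If it does, the result is taken from the cache; otherwise execution proceeds into applyHystrixSemantics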
public Observable<R> toObservable() {
// omitted ...
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
commandStartTimestamp = System.currentTimeMillis();
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
// Look in the cache first; on a hit, return the cached response directly
/* try from cache first */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// wrap it for caching
// from() subscribes internally, so hystrixObservable starts executing here
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}
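The cache handling in toObservable() follows a common pattern: look up first, then publish with putIfAbsent so that only one caller's work is kept when two callers race on the same key. The sketch below shows that pattern with a ConcurrentHashMap. It is a simplification under assumptions: SketchRequestCache is a hypothetical class, the work runs synchronously on the winning caller, and unlike Hystrix the work is started only after the race is won rather than started first and unsubscribed on a lost race.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Look-up-then-putIfAbsent cache (hypothetical class, not part of Hystrix). */
class SketchRequestCache<R> {
    private final ConcurrentHashMap<String, CompletableFuture<R>> cache = new ConcurrentHashMap<>();

    CompletableFuture<R> get(String cacheKey, Supplier<R> work) {
        if (cacheKey == null) {
            return runInto(new CompletableFuture<>(), work); // no key: caching is skipped
        }
        CompletableFuture<R> cached = cache.get(cacheKey);   // try from cache first
        if (cached != null) {
            return cached;
        }
        CompletableFuture<R> mine = new CompletableFuture<>();
        CompletableFuture<R> raced = cache.putIfAbsent(cacheKey, mine);
        if (raced != null) {
            return raced;                                    // another caller beat us, share its result
        }
        return runInto(mine, work);                          // we won the race, so we do the work
    }

    /** Runs the work on the calling thread and completes the future with its outcome. */
    private static <T> CompletableFuture<T> runInto(CompletableFuture<T> target, Supplier<T> work) {
        try {
            target.complete(work.get());
        } catch (RuntimeException e) {
            target.completeExceptionally(e);
        }
        return target;
    }
}
```

Two calls with the same key execute the work once and share the result; a call without a key always executes.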
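applyHystrixSemantics first asks the circuit breaker whether execution is allowed; if the breaker is open, it goes straight to the failure logic. It then tries to acquire the execution semaphore, and fails as well if no permit is available. (Hystrix has two isolation strategies, semaphore and thread pool. When semaphore isolation is not in use, tryAcquire() on the TryableSemaphore returns true by default.) Finally, the return statement enters executeCommandAndObserve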
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
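// Ask the circuit breaker whether execution is allowed
if (circuitBreaker.attemptExecution()) {
// Get the semaphore instance
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
// singleSemaphoreRelease may be called from both doOnTerminate and doOnUnsubscribe, so it must release only once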
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
// Try to acquire a permit
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// Rejection logic
return handleSemaphoreRejectionViaFallback();
}
} else {
// Circuit is open: go straight to the failure logic
return handleShortCircuitViaFallback();
}
}
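The AtomicBoolean around executionSemaphore.release() exists because the release action is attached to two callbacks that can both fire for the same execution. A minimal sketch of that release-once guard with a java.util.concurrent.Semaphore follows; SketchSemaphoreGuard is a hypothetical class, and the two calls in the finally block simply stand in for the terminate and unsubscribe paths.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Release-once guard around a permit (hypothetical class, not part of Hystrix). */
class SketchSemaphoreGuard {
    static <R> R call(Semaphore permits, Supplier<R> work, Supplier<R> rejected) {
        if (!permits.tryAcquire()) {
            return rejected.get(); // no permit available: take the rejection path
        }
        final AtomicBoolean released = new AtomicBoolean(false);
        Runnable releaseOnce = () -> {
            // only the first caller flips the flag, so the permit is returned exactly once
            if (released.compareAndSet(false, true)) {
                permits.release();
            }
        };
        try {
            return work.get();
        } finally {
            releaseOnce.run(); // stands in for the terminate callback
            releaseOnce.run(); // stands in for the unsubscribe callback; a no-op thanks to the guard
        }
    }
}
```

Without the guard, the second release would hand back a permit that was never acquired and silently raise the concurrency limit.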
After wiring up the timeout and failure handling, executeCommandAndObserve enters executeCommandWithSpecifiedIsolation
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// omitted ...
// Failure handling logic
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
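// Check whether the execution timeout is enabled
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
// lift() adds the timeout operator
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// remainder omitted: the chain returned from here attaches handleFallback through onErrorResumeNext
}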
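The handleFallback function above is essentially a dispatch on the type of the error, with one category that must bypass the fallback. The sketch below reproduces that routing with JDK types only. It is an analogy under assumptions: SketchFallbackRouting and its BadRequest exception are hypothetical stand-ins (BadRequest plays the role of HystrixBadRequestException), the fallbacks are plain strings, and the timeout is enforced with Future#get rather than an Rx operator.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Error-type routing with a timeout (hypothetical class, not part of Hystrix). */
class SketchFallbackRouting {
    /** Stand-in for HystrixBadRequestException: a caller error that must not trigger a fallback. */
    static class BadRequest extends RuntimeException {
        BadRequest(String message) {
            super(message);
        }
    }

    /** Chooses a fallback by error type; bad requests are rethrown instead. */
    static String route(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return "thread-pool-rejected fallback";
        } else if (t instanceof TimeoutException) {
            return "timeout fallback";
        } else if (t instanceof BadRequest) {
            throw (BadRequest) t;
        }
        return "failure fallback";
    }

    /** Runs the work on the pool, waits at most timeoutMillis, and routes any error. */
    static String callWithTimeout(ExecutorService pool, Callable<String> work, long timeoutMillis) {
        Future<String> future;
        try {
            future = pool.submit(work);
        } catch (RejectedExecutionException e) {
            return route(e);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);       // interrupt the worker, as a timeout would
            return route(e);
        } catch (ExecutionException e) {
            return route(e.getCause()); // unwrap to the exception thrown by the work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return route(e);
        }
    }
}
```

The ordering of the checks matters in the same way as in handleFallback: the specific cases are tested first and the generic failure fallback is the last resort.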
executeCommandWithSpecifiedIsolation first checks whether thread isolation is in use and, after a few state transitions, enters getUserExecutionObservable
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// Thread isolation
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
// defer() creates the Observable only when an observer subscribes, and creates a new one for each observer
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
// Validate the command state
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// Record the command start in the metrics
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
return Observable.error(new RuntimeException("timed out before executing run()"));
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.empty();
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don't think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
return Observable.error(ex);
}
}
});
}
}
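The two branches differ mainly in where the user code runs: on a thread from a bounded pool, or on the calling thread. The sketch below contrasts the two with a plain ThreadPoolExecutor. It is an illustration under assumptions: SketchThreadIsolation is a hypothetical class, and the pool is configured with a SynchronousQueue so that a submission beyond the thread limit is rejected immediately instead of being queued.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Pool-based versus caller-thread execution (hypothetical class, not part of Hystrix). */
class SketchThreadIsolation {
    /** Bounded pool without a queue: when all threads are busy, submit() throws RejectedExecutionException. */
    static ThreadPoolExecutor boundedPool(int maxThreads) {
        return new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
    }

    /** THREAD-style isolation: the work runs on a pool thread, so a slow dependency cannot hold the caller's thread. */
    static <R> Future<R> runIsolated(ExecutorService pool, Callable<R> work) {
        return pool.submit(work);
    }

    /** SEMAPHORE-style isolation: the work runs on the caller's own thread. */
    static <R> R runOnCaller(Callable<R> work) throws Exception {
        return work.call();
    }
}
```

With a pool of one thread, a second submission while the first task is still running is rejected, which corresponds to the thread-pool rejection handled by the fallback logic.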
getUserExecutionObservable and getExecutionObservable mainly wrap the user-defined run() method
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
// Get the Observable that wraps the user-defined logic
userObservable = getExecutionObservable();
} catch (Throwable ex) {
userObservable = Observable.error(ex);
}
return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
final protected Observable<R> getExecutionObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
// Wrap the user-defined run() method
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// Save thread on which we get subscribed so that we can interrupt it later if needed
executionThread.set(Thread.currentThread());
}
});
}
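Two ideas in getExecutionObservable() carry over to plain Java: run() is invoked lazily and any Throwable it raises becomes an error value instead of escaping, and the executing thread is recorded so that it can be interrupted later. The sketch below shows both under assumptions: SketchUserExecution and its Outcome holder are hypothetical names, and a simple value/error pair replaces the Observable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

/** Deferred, exception-capturing wrapper around user code (hypothetical class, not part of Hystrix). */
class SketchUserExecution<R> {
    /** Holds either a value or the error that was thrown. */
    static final class Outcome<T> {
        final T value;
        final Throwable error;

        Outcome(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }
    }

    final AtomicReference<Thread> executionThread = new AtomicReference<Thread>();
    private final Callable<R> run;

    SketchUserExecution(Callable<R> run) {
        this.run = run; // nothing executes yet
    }

    /** Executes the user code now, remembering the thread and capturing any Throwable. */
    Outcome<R> call() {
        executionThread.set(Thread.currentThread()); // saved so it can be interrupted later
        try {
            return new Outcome<R>(run.call(), null);
        } catch (Throwable ex) {
            return new Outcome<R>(null, ex);
        }
    }

    /** Interrupts the thread that is running the user code, for example after a timeout. */
    void interrupt() {
        Thread t = executionThread.get();
        if (t != null) {
            t.interrupt();
        }
    }
}
```

Capturing the exception as a value keeps the error on the same path as a normal result, which is what lets the later stages decide between returning it and invoking a fallback.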