Sentinel-Go 原始碼系列(三)滑動時間視窗演算法的工程實現
要說現在工程師最重要的能力,我覺得工程能力要排第一。
就算現在大廠面試經常要手撕演算法,也是更偏向考查程式碼工程實現的能力,之前在群裡看到這樣的圖片,就覺得很離譜。
演算法與工程實現
在 Sentinel-Go 中,一個很核心的演算法是流控(限流)演算法。
流控可能每個人都聽過,但真要手寫一個,還是有些困難。為什麼流控演算法難寫?以我的感覺是演算法和工程實現上存在一定差異,雖然演算法好理解,但卻沒法照著實現。
舉個例子,令牌桶演算法很好理解,只需給定一個桶,以恆定的速率往桶內放令牌,滿了則丟棄,執行任務前先去桶裡拿令牌,只有拿到令牌才可以執行,否則拒絕。
如果實現令牌桶,按道理應該用一個單獨執行緒(或程序)往桶裡放令牌,業務執行緒去桶裡取,但真要這麼實現,怎麼保證這個單獨執行緒能穩定執行,萬一掛了豈不是很危險?
所以工程實現上和演算法原本肯定存在一定的差異,這也是為什麼需要深入原始碼的一個原因。
滑動時間視窗的演進
通常來說,流控的度量是按每秒的請求數,也就是 QPS
QPS:query per second,指每秒查詢數,當然他的意義已經泛化了,不再特指查詢,可以泛指所有請求。如果非要區分,TPS 指每秒事務數,即寫入數,或 RPS,每秒請求數,本文不分這麼細,統計叫QPS。
當然也有按併發數來度量,併發數的流控就非常簡單
併發數流控
併發是一個瞬時概念,它跟時間沒有關係。和程序中的執行緒數、協程數一樣,每次取的時候只能拿到一個瞬間的快照,但可能很快就變化了。
併發數怎麼定義?可以近似認為進入業務程式碼開始就算一個併發,執行完這個併發就消失。
這樣說來,實現就非常簡單了,只需要定義一個全域性變數,責任鏈開始時對這個變數原子增1,並獲取當前併發數的一個快照,判斷併發數是否超限,如果超限則直接阻斷,執行完了別忘了原子減1即可,由於太過簡單,就不需要放程式碼了。
固定時間視窗
參考併發數流控,當需要度量 QPS 時,是否也可以利用這樣的思想呢?
由於 QPS 有時間的度量,第一直覺是和併發數一樣弄個變數,再起個單獨執行緒每隔 1s 重置這個變數。
但單獨執行緒始終不放心,需要稍微改一下。
如果系統有一個起始時間,每次請求時,獲取當前時間,兩者之差,就能算出當前處於哪個時間視窗,這個時間視窗單獨計數即可。
如果稍微思考下,你會發現問題不簡單,如下圖,10t 到20t 只有60個請求,20t到30t之間只有80個請求,但有可能16t到26t之間有110個請求,這就很有可能把系統打垮。
滑動時間視窗
為了解決上面的問題,工程師想出了一個好辦法:別固定時間視窗,以當前時間往前推算視窗
但問題又來了,這該怎麼實現呢?
滑動時間視窗工程實現
在工程實現上,可以將時間劃分為細小的取樣視窗,快取一段時間的取樣視窗,這樣每當請求來的時候,只需要往前拿一段時間的取樣視窗,然後求和就能拿到總的請求數。
Sentinel-Go 滑動時間視窗的實現
前方程式碼高能預警~
Sentinel-Go 是基於 LeapArray
實現的滑動視窗,其資料結構如下
type LeapArray struct {
bucketLengthInMs uint32 // bucket大小
sampleCount uint32 // bucket數量
intervalInMs uint32 // 視窗總大小
array *AtomicBucketWrapArray // bucket陣列
updateLock mutex // 更新鎖
}
type AtomicBucketWrapArray struct {
base unsafe.Pointer // 陣列的起始地址
length int // 長度,不能改變
data []*BucketWrap // 真正bucket的資料
}
type BucketWrap struct {
BucketStart uint64 // bucket起始時間
Value atomic.Value // bucket資料結構,例如 MetricBucket
}
type MetricBucket struct {
counter [base.MetricEventTotal]int64 // 計數陣列,可放不同型別
minRt int64 // 最小RT
maxConcurrency int32 // 最大併發數
}
再看下是如何寫入指標的,例如當流程正常通過時
// ①
sn.AddCount(base.MetricEventPass, int64(count))
// ②
func (bla *BucketLeapArray) AddCount(event base.MetricEvent, count int64) {
bla.addCountWithTime(util.CurrentTimeMillis(), event, count)
}
// ③
func (bla *BucketLeapArray) addCountWithTime(now uint64, event base.MetricEvent, count int64) {
b := bla.currentBucketWithTime(now)
if b == nil {
return
}
b.Add(event, count)
}
// ④
func (mb *MetricBucket) Add(event base.MetricEvent, count int64) {
if event >= base.MetricEventTotal || event < 0 {
logging.Error(errors.Errorf("Unknown metric event: %v", event), "")
return
}
if event == base.MetricEventRt {
mb.AddRt(count)
return
}
mb.addCount(event, count)
}
// ⑤
func (mb *MetricBucket) addCount(event base.MetricEvent, count int64) {
atomic.AddInt64(&mb.counter[event], count)
}
取到相應的 bucket,然後寫入相應 event 的 count,對 RT 會特殊處理,因為有一個最小 RT 需要處理。
重點看是如何取到相應的 bucket 的:
func (bla *BucketLeapArray) currentBucketWithTime(now uint64) *MetricBucket {
// ①根據當前時間取bucket
curBucket, err := bla.data.currentBucketOfTime(now, bla)
...
b, ok := mb.(*MetricBucket)
if !ok {
...
return nil
}
return b
}
func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
...
// ②計算index = (now / bucketLengthInMs) % LeapArray.array.length
idx := la.calculateTimeIdx(now)
// ③計算bucket開始時間 = now - (now % bucketLengthInMs)
bucketStart := calculateStartTime(now, la.bucketLengthInMs)
for {
old := la.array.get(idx)
if old == nil { // ④未使用,直接返回
newWrap := &BucketWrap{
BucketStart: bucketStart,
Value: atomic.Value{},
}
newWrap.Value.Store(bg.NewEmptyBucket())
if la.array.compareAndSet(idx, nil, newWrap) {
return newWrap, nil
} else {
runtime.Gosched()
}
} else if bucketStart == atomic.LoadUint64(&old.BucketStart) { // ⑤剛好取到是當前bucket,返回
return old, nil
} else if bucketStart > atomic.LoadUint64(&old.BucketStart) { // ⑥取到了舊的bucket,重置使用
if la.updateLock.TryLock() {
old = bg.ResetBucketTo(old, bucketStart)
la.updateLock.Unlock()
return old, nil
} else {
runtime.Gosched()
}
} else if bucketStart < atomic.LoadUint64(&old.BucketStart) { // ⑦取到了比當前還新的bucket,總共只有一個bucket時,併發情況可能會出現這種情況,其他情況不可能,直接報錯
if la.sampleCount == 1 {
return old, nil
}
return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
}
}
}
舉個直觀的例子,看如何拿到 bucket:
- 假設 B2 取出來是 nil,則 new 一個 bucket 通過 compareAndSet 寫入,保證執行緒安全,如果別別的執行緒先寫入,這裡會執行失敗,呼叫 runtime.Gosched(),讓出時間片,進入下一次迴圈
- 假設取出 B2 的開始時間是3400,與計算的相同,則直接使用
- 假設取出的 B2 的開始時間小於 3400,說明這個 bucket 太舊了,需要覆蓋,使用更新鎖來更新,保證執行緒安全,如果拿不到鎖,也讓出時間片,進入下一次迴圈
- 假設取出 B2 的開始時間大於3400,說明已經有其他執行緒更新了,而 bucketLengthInMs 通常遠遠大於鎖的獲取時間,所以這裡只考慮只有一個 bucket 的情況直接返回,其他情況報錯
回到 QPS 計算:
qps := stat.InboundNode().GetQPS(base.MetricEventPass)
該方法會先計算一個起始時間範圍
func (m *SlidingWindowMetric) getBucketStartRange(timeMs uint64) (start, end uint64) {
curBucketStartTime := calculateStartTime(timeMs, m.real.BucketLengthInMs())
end = curBucketStartTime
start = end - uint64(m.intervalInMs) + uint64(m.real.BucketLengthInMs())
return
}
例如當前時間為3500,則計算出
- end = 3400
- start = 3400 - 1200 + 200 = 2400
然後遍歷所有 bucket,把在這個範圍內的 bucket 都拿出來,計算 QPS,只需要相加即可。
最後
本節從滑動視窗流控演算法的工程實現演進到 Sentinel-Go 裡滑動視窗的實現,從 Sentinel-Go 的實現上看到,還得考慮記憶體的使用,併發控制等等,如果完全寫出來,還是非常不容易的。
《Sentinel-Go原始碼系列》已經寫了三篇,只介紹了兩個知識點:責任鏈模式、滑動視窗限流,後續還有物件池等,但這其實和 Sentinel-Go 關係不是很大,到時候單獨成文,就不放在本系列裡了。
本文算是一個結束,與其說是結束,不如說是一個開始。
搜尋關注微信公眾號"捉蟲大師",後端技術分享,架構設計、效能優化、原始碼閱讀、問題排查、踩坑實踐。