1. 程式人生 > 其它 >Sentinel-Go 原始碼系列(三)滑動時間視窗演算法的工程實現

Sentinel-Go 原始碼系列(三)滑動時間視窗演算法的工程實現

要說現在工程師最重要的能力,我覺得工程能力要排第一。

就算現在大廠面試經常要手撕演算法,也是更偏向考查程式碼工程實現的能力,之前在群裡看到這樣的圖片,就覺得很離譜。

演算法與工程實現

在 Sentinel-Go 中,一個很核心的演算法是流控(限流)演算法。

流控可能每個人都聽過,但真要手寫一個,還是有些困難。為什麼流控演算法難寫?以我的感覺是演算法和工程實現上存在一定差異,雖然演算法好理解,但卻沒法照著實現。

舉個例子,令牌桶演算法很好理解,只需給定一個桶,以恆定的速率往桶內放令牌,滿了則丟棄,執行任務前先去桶裡拿令牌,只有拿到令牌才可以執行,否則拒絕。

如果實現令牌桶,按道理應該用一個單獨執行緒(或程序)往桶裡放令牌,業務執行緒去桶裡取,但真要這麼實現,怎麼保證這個單獨執行緒能穩定執行,萬一掛了豈不是很危險?

所以工程實現上和演算法原本肯定存在一定的差異,這也是為什麼需要深入原始碼的一個原因。

滑動時間視窗的演進

通常來說,流控的度量是按每秒的請求數,也就是 QPS

QPS:query per second,指每秒查詢數,當然他的意義已經泛化了,不再特指查詢,可以泛指所有請求。如果非要區分,TPS 指每秒事務數,即寫入數,或 RPS,每秒請求數,本文不分這麼細,統計叫QPS。

當然也有按併發數來度量,併發數的流控就非常簡單

併發數流控

併發是一個瞬時概念,它跟時間沒有關係。和程序中的執行緒數、協程數一樣,每次取的時候只能拿到一個瞬間的快照,但可能很快就變化了。

併發數怎麼定義?可以近似認為進入業務程式碼開始就算一個併發,執行完這個併發就消失。

這樣說來,實現就非常簡單了,只需要定義一個全域性變數,責任鏈開始時對這個變數原子增1,並獲取當前併發數的一個快照,判斷併發數是否超限,如果超限則直接阻斷,執行完了別忘了原子減1即可,由於太過簡單,就不需要放程式碼了。

固定時間視窗

參考併發數流控,當需要度量 QPS 時,是否也可以利用這樣的思想呢?

由於 QPS 有時間的度量,第一直覺是和併發數一樣弄個變數,再起個單獨執行緒每隔 1s 重置這個變數。

但單獨執行緒始終不放心,需要稍微改一下。

如果系統有一個起始時間,每次請求時,獲取當前時間,兩者之差,就能算出當前處於哪個時間視窗,這個時間視窗單獨計數即可。

如果稍微思考下,你會發現問題不簡單,如下圖,10t 到20t 只有60個請求,20t到30t之間只有80個請求,但有可能16t到26t之間有110個請求,這就很有可能把系統打垮。

滑動時間視窗

為了解決上面的問題,工程師想出了一個好辦法:別固定時間視窗,以當前時間往前推算視窗

但問題又來了,這該怎麼實現呢?

滑動時間視窗工程實現

在工程實現上,可以將時間劃分為細小的取樣視窗,快取一段時間的取樣視窗,這樣每當請求來的時候,只需要往前拿一段時間的取樣視窗,然後求和就能拿到總的請求數。

Sentinel-Go 滑動時間視窗的實現

前方程式碼高能預警~

Sentinel-Go 是基於 LeapArray 實現的滑動視窗,其資料結構如下

type LeapArray struct {
	bucketLengthInMs uint32 // bucket大小
	sampleCount      uint32 // bucket數量
	intervalInMs     uint32 // 視窗總大小
	array            *AtomicBucketWrapArray // bucket陣列
	updateLock mutex // 更新鎖
}

type AtomicBucketWrapArray struct {
	base unsafe.Pointer // 陣列的起始地址
	length int // 長度,不能改變
	data   []*BucketWrap // 真正bucket的資料
}

type BucketWrap struct {
	BucketStart uint64 // bucket起始時間
	Value atomic.Value // bucket資料結構,例如 MetricBucket
}

type MetricBucket struct {
	counter        [base.MetricEventTotal]int64 // 計數陣列,可放不同型別
	minRt          int64 // 最小RT
	maxConcurrency int32 // 最大併發數
}

再看下是如何寫入指標的,例如當流程正常通過時

// ①
sn.AddCount(base.MetricEventPass, int64(count))

// ②
func (bla *BucketLeapArray) AddCount(event base.MetricEvent, count int64) {
	bla.addCountWithTime(util.CurrentTimeMillis(), event, count)
}

// ③
func (bla *BucketLeapArray) addCountWithTime(now uint64, event base.MetricEvent, count int64) {
	b := bla.currentBucketWithTime(now)
	if b == nil {
		return
	}
	b.Add(event, count)
}

// ④
func (mb *MetricBucket) Add(event base.MetricEvent, count int64) {
	if event >= base.MetricEventTotal || event < 0 {
		logging.Error(errors.Errorf("Unknown metric event: %v", event), "")
		return
	}
	if event == base.MetricEventRt {
		mb.AddRt(count)
		return
	}
	mb.addCount(event, count)
}

// ⑤
func (mb *MetricBucket) addCount(event base.MetricEvent, count int64) {
	atomic.AddInt64(&mb.counter[event], count)
}

取到相應的 bucket,然後寫入相應 event 的 count,對 RT 會特殊處理,因為有一個最小 RT 需要處理。

重點看是如何取到相應的 bucket 的:

func (bla *BucketLeapArray) currentBucketWithTime(now uint64) *MetricBucket {
	// ①根據當前時間取bucket
	curBucket, err := bla.data.currentBucketOfTime(now, bla)
	...
	b, ok := mb.(*MetricBucket)
	if !ok {
		...
		return nil
	}
	return b
}

func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
	...
	// ②計算index = (now / bucketLengthInMs) % LeapArray.array.length
	idx := la.calculateTimeIdx(now)
	// ③計算bucket開始時間 = now - (now % bucketLengthInMs)
	bucketStart := calculateStartTime(now, la.bucketLengthInMs)

	for { 
		old := la.array.get(idx)
		if old == nil { // ④未使用,直接返回
			newWrap := &BucketWrap{
				BucketStart: bucketStart,
				Value:       atomic.Value{},
			}
			newWrap.Value.Store(bg.NewEmptyBucket())
			if la.array.compareAndSet(idx, nil, newWrap) {
				return newWrap, nil
			} else {
				runtime.Gosched()
			}
		} else if bucketStart == atomic.LoadUint64(&old.BucketStart) { // ⑤剛好取到是當前bucket,返回
			return old, nil
		} else if bucketStart > atomic.LoadUint64(&old.BucketStart) { // ⑥取到了舊的bucket,重置使用
			if la.updateLock.TryLock() {
				old = bg.ResetBucketTo(old, bucketStart)
				la.updateLock.Unlock()
				return old, nil
			} else {
				runtime.Gosched()
			}
		} else if bucketStart < atomic.LoadUint64(&old.BucketStart) { // ⑦取到了比當前還新的bucket,總共只有一個bucket時,併發情況可能會出現這種情況,其他情況不可能,直接報錯
			if la.sampleCount == 1 {
				return old, nil
			}
			
			return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
		}
	}
}

舉個直觀的例子,看如何拿到 bucket:

  • 假設 B2 取出來是 nil,則 new 一個 bucket 通過 compareAndSet 寫入,保證執行緒安全,如果別別的執行緒先寫入,這裡會執行失敗,呼叫 runtime.Gosched(),讓出時間片,進入下一次迴圈
  • 假設取出 B2 的開始時間是3400,與計算的相同,則直接使用
  • 假設取出的 B2 的開始時間小於 3400,說明這個 bucket 太舊了,需要覆蓋,使用更新鎖來更新,保證執行緒安全,如果拿不到鎖,也讓出時間片,進入下一次迴圈
  • 假設取出 B2 的開始時間大於3400,說明已經有其他執行緒更新了,而 bucketLengthInMs 通常遠遠大於鎖的獲取時間,所以這裡只考慮只有一個 bucket 的情況直接返回,其他情況報錯

回到 QPS 計算:

qps := stat.InboundNode().GetQPS(base.MetricEventPass)

該方法會先計算一個起始時間範圍

func (m *SlidingWindowMetric) getBucketStartRange(timeMs uint64) (start, end uint64) {
	curBucketStartTime := calculateStartTime(timeMs, m.real.BucketLengthInMs())
	end = curBucketStartTime
	start = end - uint64(m.intervalInMs) + uint64(m.real.BucketLengthInMs())
	return
}

例如當前時間為3500,則計算出

  • end = 3400
  • start = 3400 - 1200 + 200 = 2400

然後遍歷所有 bucket,把在這個範圍內的 bucket 都拿出來,計算 QPS,只需要相加即可。

最後

本節從滑動視窗流控演算法的工程實現演進到 Sentinel-Go 裡滑動視窗的實現,從 Sentinel-Go 的實現上看到,還得考慮記憶體的使用,併發控制等等,如果完全寫出來,還是非常不容易的。

《Sentinel-Go原始碼系列》已經寫了三篇,只介紹了兩個知識點:責任鏈模式、滑動視窗限流,後續還有物件池等,但這其實和 Sentinel-Go 關係不是很大,到時候單獨成文,就不放在本系列裡了。

本文算是一個結束,與其說是結束,不如說是一個開始。


搜尋關注微信公眾號"捉蟲大師",後端技術分享,架構設計、效能優化、原始碼閱讀、問題排查、踩坑實踐。