1. 程式人生 > 程式設計 >Golang 實現Thrift客戶端連線池方式

Golang 實現Thrift客戶端連線池方式

1 前言

閱讀文章之前,請先了解一下thrift相關知識。thrift官方並沒有提供客戶端連線池的實現方案,而我們在實際使用時,thrift客戶端必須複用,來保證較為可觀的吞吐量,並避免在高QPS呼叫情況下,不斷的建立、釋放客戶端所帶來的機器埠耗盡問題。

本文會詳細講解如何實現一個簡單可靠的thrift客戶端連線池,並通過對照實驗來說明thrift客戶端連線池所帶來的好處。

由於篇幅的原因,本文只粘出關鍵程式碼,原始碼請檢視Thrift Client Pool Demo

1.1 執行環境

Golang版本: go1.14.3 darwin/amd64

Thrift Golang庫版本: 0.13.0

Thrift IDL編輯器版本: 0.13.0

1.2 .thrift檔案

namespace java com.czl.api.thrift.model
namespace cpp com.czl.api
namespace php com.czl.api
namespace py com.czl.api
namespace js com.czl.apixianz
namespace go com.czl.api
struct ApiRequest {
 1: required i16 id;
}
struct ApiResponse{
 1:required string name;
}
// service1
service ApiService1{
 ApiResponse query(1:ApiRequest request)
}
// service2
service ApiService2{
 ApiResponse query(1:ApiRequest request)
}

注:請通過安裝Thrift IDL編譯器,並生成客戶端、服務端程式碼。

1.3 對照實驗說明

通過指令碼開啟100個協程併發呼叫rpc服務10分鐘,統計這段時間內,未使用thrift客戶端連線池與使用客戶端連線池服務的平均吞吐量、Thrift API呼叫平均延遲、機器埠消耗等資料進行效能對比。

實驗一: 未使用thrift客戶端連線池

實驗二: 使用thrift客戶端連線池

2 Thrift客戶端連線池實現

2.1 連線池的功能

首先,我們要明確一下連線池的職責,這裡我簡單的總結一下,連線池主要功能是維護連線的建立、釋放,通過快取連線來複用連線,減少建立連線所帶來的開銷,提高系統的吞吐量,一般連線池還會有連線斷開的重連機制、超時機制等。這裡我們可以先定義出大部分連線池都會有的功能,只是定義,可以先不管每個功能的具體實現。每一個空閒Thrift客戶端其實底層都維護著一條空閒TCP連線,空閒Thrift客戶端與空閒連線在這裡其實是同一個概念。

......
// Thrift客戶端建立方法,留給業務去實現
type ThriftDial func(addr string,connTimeout time.Duration) (*IdleClient,error)
// 關閉Thrift客戶端,留給業務實現
type ThriftClientClose func(c *IdleClient) error
// Thrift客戶端連線池
type ThriftPool struct {
 // Thrift客戶端建立邏輯,業務自己實現
 Dial ThriftDial
 // Thrift客戶端關閉邏輯,業務自己實現
 Close ThriftClientClose
 // 空閒客戶端,用雙端佇列儲存
 idle list.List
 // 同步鎖,確保count、status、idle等公共資料併發操作安全
 lock *sync.Mutex
 // 記錄當前已經建立的Thrift客戶端,確保MaxConn配置
 count int32
 // Thrift客戶端連線池狀態,目前就open和stop兩種
 status uint32
 // Thrift客戶端連線池相關配置
 config *ThriftPoolConfig
}
// 連線池配置
type ThriftPoolConfig struct {
 // Thrfit Server端地址
 Addr string
 // 最大連線數
 MaxConn int32
 // 建立連線超時時間
 ConnTimeout time.Duration
 // 空閒客戶端超時時間,超時主動釋放連線,關閉客戶端
 IdleTimeout time.Duration
 // 獲取Thrift客戶端超時時間
 Timeout time.Duration
 // 獲取Thrift客戶端失敗重試間隔
 interval time.Duration
}
// Thrift客戶端
type IdleClient struct {
 // Thrift傳輸層,封裝了底層連線建立、維護、關閉、資料讀寫等細節
 Transport thrift.TTransport
 // 真正的Thrift客戶端,業務建立傳入
 RawClient interface{}
}
// 封裝了Thrift客戶端
type idleConn struct {
 // 空閒Thrift客戶端
 c *IdleClient
 // 最近一次放入空閒佇列的時間
 t time.Time
}
// 獲取Thrift空閒客戶端
func (p *ThriftPool) Get() (*IdleClient,error) {
 // 1. 從空閒池中獲取空閒客戶端,獲取到更新資料,返回,否則執行第2步
 // 2. 建立新到Thrift客戶端,更新資料,返回Thrift客戶端
 ......
}
// 歸還Thrift客戶端
func (p *ThriftPool) Put(client *IdleCLient) error {
 // 1. 如果客戶端已經斷開,更新資料,返回,否則執行第2步
 // 2. 將Thrift客戶端丟進空閒連線池,更新資料,返回
 ......
}
// 超時管理,定期釋放空閒太久的連線
func (p *ThriftPool) CheckTimeout() {
 // 掃描空閒連線池,將空閒太久的連線主動釋放掉,並更新資料
 ......
}
// 異常連線重連
func (p *ThriftPool) Reconnect(client *IdleClient) (newClient *IdleClient,err error) {
 // 1. 關閉舊客戶端
 // 2. 建立新的客戶端,並返回
 ......
}
// 其他方法
......

這裡有兩個關鍵的資料結構,ThriftPool和IdleClient,ThriftPool負責實現整個連線池的功能,IdleClient封裝了真正的Thrift客戶端。

先看一下ThriftPool的定義:

// Thrift客戶端建立方法,留給業務去實現
type ThriftDial func(addr string,error)
// 關閉Thrift客戶端,留給業務實現
type ThriftClientClose func(c *IdleClient) error
// Thrift客戶端連線池
type ThriftPool struct {
 // Thrift客戶端建立邏輯,業務自己實現
 Dial ThriftDial
 // Thrift客戶端關閉邏輯,業務自己實現
 Close ThriftClientClose
 // 空閒客戶端,用雙端佇列儲存
 idle list.List
 // 同步鎖,確保count、status、idle等公共資料併發操作安全
 lock *sync.Mutex
 // 記錄當前已經建立的Thrift客戶端,確保MaxConn配置
 count int32
 // Thrift客戶端連線池狀態,目前就open和stop兩種
 status uint32
 // Thrift客戶端連線池相關配置
 config *ThriftPoolConfig
}
// 連線池配置
type ThriftPoolConfig struct {
 // Thrfit Server端地址
 Addr string
 // 最大連線數
 MaxConn int32
 // 建立連線超時時間
 ConnTimeout time.Duration
 // 空閒客戶端超時時間,超時主動釋放連線,關閉客戶端
 IdleTimeout time.Duration
 // 獲取Thrift客戶端超時時間
 Timeout time.Duration
 // 獲取Thrift客戶端失敗重試間隔
 interval time.Duration
}

Thrift客戶端建立與關閉,涉及到業務細節,這裡抽離成Dial方法和Close方法。

連線池需要維護空閒客戶端,這裡用雙端佇列來儲存。

一般的連線池,都應該支援最大連線數配置,MaxConn可以配置連線池最大連線數,同時我們用count來記錄連線池當前已經建立的連線。

為了實現連線池的超時管理,當然也得有相關超時配置。

連線池的狀態、當前連線數等這些屬性,是多協程併發操作的,這裡用同步鎖lock來確保併發操作安全。

在看一下IdleClient實現:

// Thrift客戶端
type IdleClient struct {
 // Thrift傳輸層,封裝了底層連線建立、維護、關閉、資料讀寫等細節
 Transport thrift.TTransport
 // 真正的Thrift客戶端,業務建立傳入
 RawClient interface{}
}
// 封裝了Thrift客戶端
type idleConn struct {
 // 空閒Thrift客戶端
 c *IdleClient
 // 最近一次放入空閒佇列的時間
 t time.Time
}

RawClient是真正的Thrift客戶端,與實際邏輯相關。

Transport Thrift傳輸層,Thrift傳輸層,封裝了底層連線建立、維護、關閉、資料讀寫等細節。

idleConn封裝了IdleClient,用來實現空閒連線超時管理,idleConn記錄一個時間,這個時間是Thrift客戶端最近一次被放入空閒佇列的時間。

2.2 獲取連線

......
var nowFunc = time.Now
......
// 獲取Thrift空閒客戶端
func (p *ThriftPool) Get() (*IdleClient,error) {
 return p.get(nowFunc().Add(p.config.Timeout))
}
// 獲取連線的邏輯實現
// expire設定了一個超時時間點,當沒有可用連線時,程式會休眠一小段時間後重試
// 如果一直獲取不到連線,一旦到達超時時間點,則報ErrOverMax錯誤
func (p *ThriftPool) get(expire time.Time) (*IdleClient,error) {
 if atomic.LoadUint32(&p.status) == poolStop {
 return nil,ErrPoolClosed
 }
 // 判斷是否超額
 p.lock.Lock()
 if p.idle.Len() == 0 && atomic.LoadInt32(&p.count) >= p.config.MaxConn {
 p.lock.Unlock()
 // 不採用遞迴的方式來實現重試機制,防止棧溢位,這裡改用迴圈方式來實現重試
 for {
 // 休眠一段時間再重試
 time.Sleep(p.config.interval)
 // 超時退出
 if nowFunc().After(expire) {
 return nil,ErrOverMax
 }
 p.lock.Lock()
 if p.idle.Len() == 0 && atomic.LoadInt32(&p.count) >= p.config.MaxConn {
 p.lock.Unlock()
 } else { // 有可用連結,退出for迴圈
 break
 }
 }
 }
 if p.idle.Len() == 0 {
 // 先加1,防止首次建立連線時,TCP握手太久,導致p.count未能及時+1,而新的請求已經到來
 // 從而導致短暫性實際連線數大於p.count(大部分連結由於無法進入空閒連結佇列,而被關閉,處於TIME_WATI狀態)
 atomic.AddInt32(&p.count,1)
 p.lock.Unlock()
 client,err := p.Dial(p.config.Addr,p.config.ConnTimeout)
 if err != nil {
 atomic.AddInt32(&p.count,-1)
 return nil,err
 }
 // 檢查連線是否有效
 if !client.Check() {
 atomic.AddInt32(&p.count,ErrSocketDisconnect
 }
 return client,nil
 }
 // 從隊頭中獲取空閒連線
 ele := p.idle.Front()
 idlec := ele.Value.(*idleConn)
 p.idle.Remove(ele)
 p.lock.Unlock()
 // 連線從空閒佇列獲取,可能已經關閉了,這裡再重新檢查一遍
 if !idlec.c.Check() {
 atomic.AddInt32(&p.count,ErrSocketDisconnect
 }
 return idlec.c,nil
}

p.Get()的邏輯比較清晰:如果空閒佇列沒有連線,且當前連線已經到達p.config.MaxConn,就休眠等待重試;當滿足獲取連線條件時p.idle.Len() != 0 || atomic.LoadInt32(&p.count) < p.config.MaxConn,有空閒連線,則返回空閒連線,減少建立連線的開銷,沒有的話,再重新建立一條新的連線。

這裡有兩個關鍵的地方需要注意:

等待重試的邏輯,不要用遞迴的方式來實現,防止執行棧溢位。

// 遞迴的方法實現等待重試邏輯
func (p *ThriftPool) get(expire time.Time) (*IdleClient,error) {
 // 超時退出
 if nowFunc().After(expire) {
 return nil,ErrOverMax
 }
 if atomic.LoadUint32(&p.status) == poolStop {
 return nil,ErrPoolClosed
 }
 // 判斷是否超額
 p.lock.Lock()
 if p.idle.Len() == 0 && atomic.LoadInt32(&p.count) >= p.config.MaxConn {
 p.lock.Unlock()
 // 休眠遞迴重試
 time.Sleep(p.config.interval)
 p.get(expire)
 }
 .......
}

注意p.lock.Lock()的和p.lock.UnLock()呼叫時機,確保公共資料併發操作安全。

2.3 釋放連線

// 歸還Thrift客戶端
func (p *ThriftPool) Put(client *IdleClient) error {
 if client == nil {
 return nil
 }
 if atomic.LoadUint32(&p.status) == poolStop {
 err := p.Close(client)
 client = nil
 return err
 }
 if atomic.LoadInt32(&p.count) > p.config.MaxConn || !client.Check() {
 atomic.AddInt32(&p.count,-1)
 err := p.Close(client)
 client = nil
 return err
 }
 p.lock.Lock()
 p.idle.PushFront(&idleConn{
 c: client,t: nowFunc(),})
 p.lock.Unlock()
 return nil
}

p.Put()邏輯也比較簡單,如果連線已經失效,p.count需要-1,並進行連線關閉操作。否則丟到空閒佇列裡,這裡還是丟到隊頭,沒錯,還是丟到隊頭,p.Get()和p.Put()都是從隊頭操作,有點像堆操作,為啥這麼處理,等下面說到空閒連線超時管理就清楚了,這裡先記住丟回空閒佇列的時候,會更新空閒連線的時間。

2.4 超時管理

獲取連線超時管理p.Get()方法已經講過了,建立連線超時管理由p.Dial()去實現,下面說的是空閒連線的超時管理,空閒佇列的連線,如果一直沒有使用,超過一定時間,需要主動關閉掉,服務端的資源有限,不需要用的連線就主動關掉,而且連線放太久,服務端也會主動關掉。

// 超時管理,定期釋放空閒太久的連線
func (p *ThriftPool) CheckTimeout() {
 p.lock.Lock()
 for p.idle.Len() != 0 {
 ele := p.idle.Back()
 if ele == nil {
 break
 }
 v := ele.Value.(*idleConn)
 if v.t.Add(p.config.IdleTimeout).After(nowFunc()) {
 break
 }
 //timeout && clear
 p.idle.Remove(ele)
 p.lock.Unlock()
 p.Close(v.c) //close client connection
 atomic.AddInt32(&p.count,-1)
 p.lock.Lock()
 }
 p.lock.Unlock()
 return
}

清理超時空閒連線的時候,是從隊尾開始清理掉超時或者無效的連線,直到找到第一個可用的連線或者佇列為空。p.Get()和p.Put()都從隊頭操作佇列,保證了活躍的連線都在隊頭,如果一開始建立的連線太多,後面業務操作變少,不需要那麼多連線的時候,那多餘的連線就會沉到隊尾,被超時管理所清理掉。另外,這樣設計也可以優化操作的時間複雜度<O(n)。

2.5 重連機制

事實上,thrift的transport層並沒有提供一個檢查連線是否有效的方法,一開始實現連線池的時候,檢測方法是呼叫thrift.TTransport.IsOpen()來判斷

// 檢測連線是否有效
func (c *IdleClient) Check() bool {
 if c.Transport == nil || c.RawClient == nil {
 return false
 }
 return c.Transport.IsOpen()
}

可在測試階段發現當底層當TCP連線被異常斷開的時候(服務端重啟、服務端宕機等),c.Transport.IsOpen()並不能如期的返回false,如果我們檢視thrift的原始碼,可以發現,其實c.Transport.IsOpen()只和我們是否呼叫了c.Transport.Open()方法有關。為了能實現斷開重連機制,我們只能在使用階段發現異常連線時,重連連線。

這裡我在ThriftPool上封裝了一層代理ThriftPoolAgent,來實現斷開重連邏輯,具體請參考程式碼實現。

package pool
import (
 "fmt"
 "github.com/apache/thrift/lib/go/thrift"
 "log"
 "net"
)
type ThriftPoolAgent struct {
 pool *ThriftPool
}
func NewThriftPoolAgent() *ThriftPoolAgent {
 return &ThriftPoolAgent{}
}
func (a *ThriftPoolAgent) Init(pool *ThriftPool) {
 a.pool = pool
}
// 真正的業務邏輯放到do方法做,ThriftPoolAgent只要保證獲取到可用的Thrift客戶端,然後傳給do方法就行了
func (a *ThriftPoolAgent) Do(do func(rawClient interface{}) error) error {
 var (
 client *IdleClient
 err error
 )
 defer func() {
 if client != nil {
 if err == nil {
 if rErr := a.releaseClient(client); rErr != nil {
 log.Println(fmt.Sprintf("releaseClient error: %v",rErr))
 }
 } else if _,ok := err.(net.Error); ok {
 a.closeClient(client)
 } else if _,ok = err.(thrift.TTransportException); ok {
 a.closeClient(client)
 } else {
 if rErr := a.releaseClient(client); rErr != nil {
 log.Println(fmt.Sprintf("releaseClient error: %v",rErr))
 }
 }
 }
 }()
 // 從連線池裡獲取連結
 client,err = a.getClient()
 if err != nil {
 return err
 }
 if err = do(client.RawClient); err != nil {
 if _,ok := err.(net.Error); ok {
 log.Println(fmt.Sprintf("err: retry tcp,%T,%s",err,err.Error()))
 // 網路錯誤,重建連線
 client,err = a.reconnect(client)
 if err != nil {
 return err
 }
 return do(client.RawClient)
 }
 if _,ok := err.(thrift.TTransportException); ok {
 log.Println(fmt.Sprintf("err: retry tcp,err.Error()))
 // thrift傳輸層錯誤,也重建連線
 client,err = a.reconnect(client)
 if err != nil {
 return err
 }
 return do(client.RawClient)
 }
 return err
 }
 return nil
}
// 獲取連線
func (a *ThriftPoolAgent) getClient() (*IdleClient,error) {
 return a.pool.Get()
}
// 釋放連線
func (a *ThriftPoolAgent) releaseClient(client *IdleClient) error {
 return a.pool.Put(client)
}
// 關閉有問題的連線,並重新建立一個新的連線
func (a *ThriftPoolAgent) reconnect(client *IdleClient) (newClient *IdleClient,err error) {
 return a.pool.Reconnect(client)
}
// 關閉連線
func (a *ThriftPoolAgent) closeClient(client *IdleClient) {
 a.pool.CloseConn(client)
}
// 釋放連線池
func (a *ThriftPoolAgent) Release() {
 a.pool.Release()
}
func (a *ThriftPoolAgent) GetIdleCount() uint32 {
 return a.pool.GetIdleCount()
}
func (a *ThriftPoolAgent) GetConnCount() int32 {
 return a.pool.GetConnCount()
}

3 對照實驗

啟用100個協程,不斷呼叫Thrift服務端API 10分鐘,對比服務平均吞吐量、Thrift API呼叫平均延遲、機器埠消耗。

平均吞吐量(r/s) = 總成功數 / 600

API呼叫平均延遲(ms/r) = 總成功數 / API成功請求總耗時(微秒) / 1000

機器埠消耗計算:netstat -nt | grep 9444 -c

3.1 實驗一:未使用連線池

機器埠消耗

Golang 實現Thrift客戶端連線池方式

平均吞吐量、平均延遲

Golang 實現Thrift客戶端連線池方式

從結果看,API的平均延遲在77ms左右,但是服務的平均吞吐量才到360,比理論值1000 / 77 * 1000 = 1299少了很多,而且有96409次錯誤,報錯的主要原因是:connect can't assign request address,100個協程併發呼叫就已經消耗了1.6w個埠,如果併發數更高的場景,埠消耗的情況會更加嚴重,實際上,這1.6w條TCP連線,幾乎都是TIME_WAIT狀態,Thrfit客戶端用完就close掉,根據TCP三次握手可知主動斷開連線的一方最終將會處於TIME_WAIT狀態,並等待2MSL時間。

Golang 實現Thrift客戶端連線池方式

3.2 實驗二:使用連線池

機器埠消耗

Golang 實現Thrift客戶端連線池方式

平均吞吐量、平均延遲

Golang 實現Thrift客戶端連線池方式

可以看出,用了連線池後,平均吞吐量可達到1.8w,API呼叫平均延遲才0.5ms,你可能會問,理論吞吐量不是可以達到1000 / 0.5 * 100 = 20w?理論歸理論,如果按照1.8w吞吐量算,一次處理過程總時間消耗是1000 / (18000 / 100) = 5.6ms,所以這裡影響吞吐量的因素已經不是API呼叫的耗時了,1.8w的吞吐量其實已經挺不錯了。

另外,消耗的埠數也才194/2 = 97(除餘2是因為server端也在本地跑),而且都是ESTABLISH狀態,連線一直保持著,不斷的在被複用。連線被複用,少了建立TCP連線的三次握手環節,這裡也可以解釋為啥API呼叫的平均延遲可以從77ms降到0.5ms,不過0.5ms確實有點低,線上環境Server一般不會和Client在同一臺機器,而且業務邏輯也會比這裡複雜,API呼叫的平均延遲會相對高一點。

4 總結

呼叫Thrift API必須使用Thrift客戶端連線池,否則在高併發的情況下,會有大量的TCP連線處於TIME_WAIT狀態,機器埠被大量消耗,可能會導致部分請求失敗甚至服務不可用。每次請求都重新建立TCP連線,進行TCP三次握手環節,API呼叫的延遲會比較高,服務的吞吐量也不會很高。

使用Thrift客戶端連線池,可以提高系統的吞吐量,同時可以避免機器埠被耗盡的危險,提高服務的可靠性。

以上為個人經驗,希望能給大家一個參考,也希望大家多多支援我們。如有錯誤或未考慮完全的地方,望不吝賜教。