剖析nsq訊息佇列(二) 去中心化原始碼解析
在上一篇帖子剖析nsq訊息佇列(一) 簡介及去中心化實現原理中,我介紹了nsq的兩種使用方式,一種是直接連線,還有一種是通過nslookup來實現去中心化的方式使用,並大概說了一下實現原理,沒有什麼難理解的東西,這篇帖子我把nsq
實現去中心化的原始碼和其中的業物邏輯展示給大家看一下。
nsqd和nsqlookupd的通訊實現
上一篇中在啟動nsqd
時我用了以下命令,我指定了一個引數 --lookupd-tcp-address
./nsqd -tcp-address ":8000" -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a
複製程式碼
--lookupd-tcp-address
用於指定nsqlookupd
的tcp
監聽地址。
nsqd
和 nsqlookupd
的通訊交流簡單來說就是下圖這樣
nsqd
啟動後連線nsqlookupd
,連線成功後,要傳送一個魔法標識nsq.MagicV1
,這個標識有啥魔法麼,當然不是,他只是用於標明,客戶端和服務端雙方使用的資訊通訊版本,不能的版本有不同的處理方式,為了後期做新的訊息處理版本方便吧。
nsqlookupd
的程式碼塊
func (p *tcpServer) Handle(clientConn net.Conn) {
// ...
buf := make([]byte,4)
_,err := io.ReadFull(clientConn,buf)
// ...
protocolMagic := string(buf)
// ...
var prot protocol.Protocol
switch protocolMagic {
case " V1":
prot = &LookupProtocolV1{ctx: p.ctx}
default:
// ...
return
}
err = prot.IOLoop(clientConn)
//...
}
複製程式碼
這個時候的nsqd
已經和nsqlookupd
建立好了連線,但是這時,僅僅說明他倆連線成功。
nsqlookupd
也並沒有把這個連線加到可用的nsqd
列表裡。
建立連線完成後,nsqd
會傳送IDENTIFY
命令,這個命令裡包含了nsq的基本資訊
nsqd
的程式碼
ci := make(map[string]interface{})
ci["version" ] = version.Binary
ci["tcp_port"] = n.RealTCPAddr().Port
ci["http_port"] = n.RealHTTPAddr().Port
ci["hostname"] = hostname
ci["broadcast_address"] = n.getOpts().BroadcastAddress
cmd,err := nsq.Identify(ci)
if err != nil {
lp.Close()
return
}
resp,err := lp.Command(cmd)
複製程式碼
包含了nsqd
提供的tcp
和http
埠,主機名,版本等等,傳送給nsqlookupd
,nsqlookupd
收到IDENTIFY
命令後,解析資訊然後加到nsqd
的可用列表裡
nsqlookupd
的程式碼塊
func (p *LookupProtocolV1) IDENTIFY(client *ClientV1,reader *bufio.Reader,params []string) ([]byte,error) {
var err error
if client.peerInfo != nil {
return nil,protocol.NewFatalClientErr(err,"E_INVALID","cannot IDENTIFY again")
}
var bodyLen int32
err = binary.Read(reader,binary.BigEndian,&bodyLen)
// ...
body := make([]byte,bodyLen)
_,err = io.ReadFull(reader,body)
// ...
peerInfo := PeerInfo{id: client.RemoteAddr().String()}
err = json.Unmarshal(body,&peerInfo)
// ...
client.peerInfo = &peerInfo
// 把nsqd的連線加入到可用列表裡
if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client","",""},&Producer{peerInfo: client.peerInfo}) {
p.ctx.nsqlookupd.logf(LOG_INFO,"DB: client(%s) REGISTER category:%s key:%s subkey:%s",client,"client","")
}
// ...
return response,nil
}
複製程式碼
然後每過15秒,會傳送一個PING
心跳命令給nsqlookupd
,這樣保持存活狀態,nsqlookupd
每次收到發過來的PING
命令後,也會記下這個nsqd
的最後更新時間,這樣做為一個篩選條件,如果長時間沒有更新,就認為這個節點有問題,不會把這個節點的資訊加入到可用列表。
到此為止,一個nsqd
就把自己的資訊註冊到nsqlookupd
的可用列表了,我們可以啟動多個nsqd
和多個nsqlookupd
,為nsqd
指定多個nsqlookupd
,就如同我上一篇帖子寫的那樣
--lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200
複製程式碼
nsqd
和所有的nsqlookupd
建立連線,註冊服務資訊,並保持心跳,保證可用列表的更新.
nsqlookupd 掛掉的處理方式
上面我們說了nsqd
如果出現問題,nsqlookupd
的nsqd
可用列表裡就會處理掉這個連線資訊。如nsqlookupd
掛了怎麼辦呢
nsqd
會給所有的nsqlookup
傳送資訊,當nsqd
發現nsqlookupd
出現問題時,在每次傳送命令時,會不斷的進行重新連線:
func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte,error) {
initialState := lp.state
if lp.state != stateConnected {
err := lp.Connect()
if err != nil {
return nil,err
}
lp.state = stateConnected
_,err = lp.Write(nsq.MagicV1)
if err != nil {
lp.Close()
return nil,err
}
if initialState == stateDisconnected {
lp.connectCallback(lp)
}
if lp.state != stateConnected {
return nil,fmt.Errorf("lookupPeer connectCallback() failed")
}
}
// ...
}
複製程式碼
如果連線成功,會再次呼叫connectCallback
方法,進行IDENTIFY
命令的呼叫等。
客戶端和nsqlookupd、nsqd的通訊實現
上一篇帖子裡介紹了,客戶端如何連線nsqlookupd
來進行通訊
adds := []string{"127.0.0.1:7201","127.0.0.1:8201"}
config := nsq.NewConfig()
config.MaxInFlight = 1000
config.MaxBackoffDuration = 5 * time.Second
config.DialTimeout = 10 * time.Second
topicName := "testTopic1"
c,_ := nsq.NewConsumer(topicName,"ch1",config)
testHandler := &MyTestHandler{consumer: c}
c.AddHandler(testHandler)
if err := c.ConnectToNSQLookupds(adds); err != nil {
panic(err)
}
複製程式碼
需要注意adds
裡地址的埠,是nsqlookupd
的http
埠
這裡我還使用上一篇帖子中的圖,給大家詳細分析
c.ConnectToNSQLookupds(adds)
,他的實現是訪問nsqlookupd
的http埠http://127.0.0.1:7201/lookup?topic=testTopic1
得到提供consumer
訂閱的topic
所有的producers
節點資訊, url返回的資料資訊如下。
{
"channels": [
"nsq_to_file","ch1"
],"producers": [
{
"remote_address": "127.0.0.1:58606","hostname": "li-peng-mc-macbook.local","broadcast_address": "li-peng-mc-macbook.local","tcp_port": 8000,"http_port": 8001,"version": "1.1.1-alpha"
},{
"remote_address": "127.0.0.1:58627","tcp_port": 7000,"http_port": 7001,"version": "1.1.1-alpha"
}
]
}
複製程式碼
方法queryLookupd
就是進行的上圖的操作
- 得到提供訂閱的
topic
的nsqd
列表 - 進行連線
func (r *Consumer) queryLookupd() {
retries := 0
retry:
endpoint := r.nextLookupdEndpoint()
// ...
err := apiRequestNegotiateV1("GET",endpoint,nil,&data)
if err != nil {
// ...
}
var nsqdAddrs []string
for _,producer := range data.Producers {
broadcastAddress := producer.BroadcastAddress
port := producer.TCPPort
joined := net.JoinHostPort(broadcastAddress,strconv.Itoa(port))
nsqdAddrs = append(nsqdAddrs,joined)
}
// 進行連線
for _,addr := range nsqdAddrs {
err = r.ConnectToNSQD(addr)
if err != nil && err != ErrAlreadyConnected {
r.log(LogLevelError,"(%s) error connecting to nsqd - %s",addr,err)
continue
}
}
}
複製程式碼
如何重新整理nsqd的可用列表
有新的nsqd加入,是如何處理的呢?
在呼叫ConnectToNSQLookupd
時會啟動一個協程go r.lookupdLoop()
呼叫方法lookupdLoop
的定時迴圈訪問 queryLookupd
更新 nsqd
的可用列表
// poll all known lookup servers every LookupdPollInterval
func (r *Consumer) lookupdLoop() {
// ...
var ticker *time.Ticker
select {
case <-time.After(jitter):
case <-r.exitChan:
goto exit
}
// 設定Interval 來迴圈訪問 queryLookupd
ticker = time.NewTicker(r.config.LookupdPollInterval)
for {
select {
case <-ticker.C:
r.queryLookupd()
case <-r.lookupdRecheckChan:
r.queryLookupd()
case <-r.exitChan:
goto exit
}
}
exit:
// ...
}
複製程式碼
處理 nsqd 的單點故障
當有nsqd
出現故障時怎麼辦?當前的處理方式是
-
nsqdlookupd
會把這個故障節點從可用列表中去除,客戶端從介面得到的可用列表永遠都是可用的。 - 客戶端會把這個故障節點從可用節點上移除,然後要去判斷是否使用了
nsqlookup
進行了連線,如果是則case r.lookupdRecheckChan <- 1
去重新整理可用列表queryLookupd
,如果不是,然後啟動一個協程去定時做重試連線,如果故障恢復,連線成功,會重新加入到可用列表. 客戶端實現的程式碼
func (r *Consumer) onConnClose(c *Conn) {
// ...
// remove this connections RDY count from the consumer's total
delete(r.connections,c.String())
left := len(r.connections)
// ...
r.mtx.RLock()
numLookupd := len(r.lookupdHTTPAddrs)
reconnect := indexOf(c.String(),r.nsqdTCPAddrs) >= 0
// 如果使用的是nslookup則去重新整理可用列表
if numLookupd > 0 {
// trigger a poll of the lookupd
select {
case r.lookupdRecheckChan <- 1:
default:
}
} else if reconnect {
// ...
}(c.String())
}
}
複製程式碼