1. 程式人生 > >一萬字詳解 Redis Cluster Gossip 協議

一萬字詳解 Redis Cluster Gossip 協議

# Redis Cluster Gossip 協議 大家好,我是歷小冰,今天來講一下 Reids Cluster 的 Gossip 協議和叢集操作,文章的思維導圖如下所示。 ![](https://img2020.cnblogs.com/blog/1816118/202012/1816118-20201203211537560-1717349330.jpg) ### 叢集模式和 Gossip 簡介 **對於資料儲存領域,當資料量或者請求流量大到一定程度後,就必然會引入分散式**。比如 Redis,雖然其單機效能十分優秀,但是因為下列原因時,也不得不引入叢集。 - 單機無法保證高可用,需要引入多例項來提供高可用性 - 單機能夠提供高達 8W 左右的QPS,再高的QPS則需要引入多例項 - 單機能夠支援的資料量有限,處理更多的資料需要引入多例項; - 單機所處理的網路流量已經超過伺服器的網絡卡的上限值,需要引入多例項來分流。 有叢集,叢集往往需要維護一定的元資料,比如例項的ip地址,快取分片的 slots 資訊等,所以需要一套分散式機制來維護元資料的一致性。這類機制一般有兩個模式:分散式和集中式 分散式機制將元資料儲存在部分或者所有節點上,不同節點之間進行不斷的通訊來維護元資料的變更和一致性。Redis Cluster,Consul 等都是該模式。 ![](https://img2020.cnblogs.com/blog/1816118/202012/1816118-20201203211637679-585748500.png) 而集中式是將叢集元資料集中儲存在外部節點或者中介軟體上,比如 zookeeper。舊版本的 kafka 和 storm 等都是使用該模式。 ![](https://img2020.cnblogs.com/blog/1816118/202012/1816118-20201203211649409-1852650618.png) 兩種模式各有優劣,具體如下表所示: | 模式 | 優點 | 缺點 | | ------ | ------------------------------------------------------------ | ------------------------------------------------------------ | | 集中式 | 資料更新及時,時效好,元資料的更新和讀取,時效性非常好,一旦元資料出現了變更,立即就更新到集中式的外部節點中,其他節點讀取的時候立即就可以感知到; | 較大資料更新壓力,更新壓力全部集中在外部節點,作為單點影響整個系統 | | 分散式 | 資料更新壓力分散,元資料的更新比較分散,不是集中某一個節點,更新請求比較分散,而且有不同節點處理,有一定的延時,降低了併發壓力 | 資料更新延遲,可能導致叢集的感知有一定的滯後 | 分散式的元資料模式有多種可選的演算法進行元資料的同步,比如說 Paxos、Raft 和 Gossip。Paxos 和 Raft 等都需要全部節點或者大多數節點(超過一半)正常執行,整個叢集才能穩定執行,而 Gossip 則不需要半數以上的節點執行。 Gossip 協議,顧名思義,就像流言蜚語一樣,利用一種隨機、帶有傳染性的方式,將資訊傳播到整個網路中,並在一定時間內,使得系統內的所有節點資料一致。對你來說,掌握這個協議不僅能很好地理解這種最常用的,實現最終一致性的演算法,也能在後續工作中得心應手地實現資料的最終一致性。 ![](https://img2020.cnblogs.com/blog/1816118/202012/1816118-20201203211703276-1849961168.png) Gossip 協議又稱 epidemic 協議(epidemic protocol),是基於流行病傳播方式的節點或者程序之間資訊交換的協議,在P2P網路和分散式系統中應用廣泛,它的方法論也特別簡單: > 在一個處於有界網路的叢集裡,如果每個節點都隨機與其他節點交換特定資訊,經過足夠長的時間後,叢集各個節點對該份資訊的認知終將收斂到一致。 這裡的“特定資訊”一般就是指叢集狀態、各節點的狀態以及其他元資料等。Gossip協議是完全符合 BASE 原則,可以用在任何要求最終一致性的領域,比如分散式儲存和註冊中心。另外,它可以很方便地實現彈性叢集,允許節點隨時上下線,提供快捷的失敗檢測和動態負載均衡等。 此外,Gossip 協議的最大的好處是,即使叢集節點的數量增加,每個節點的負載也不會增加很多,幾乎是恆定的。這就允許 Redis Cluster 或者 Consul 叢集管理的節點規模能橫向擴充套件到數千個。 ### Redis Cluster 的 Gossip 通訊機制 Redis Cluster 是在 3.0 版本引入叢集功能。為了讓讓叢集中的每個例項都知道其他所有例項的狀態資訊,Redis 叢集規定各個例項之間按照 Gossip 協議來通訊傳遞資訊。 ![](https://img2020.cnblogs.com/blog/1816118/202012/1816118-20201203211718408-1915805975.png) 上圖展示了主從架構的 Redis Cluster 示意圖,其中實線表示節點間的主從複製關係,而虛線表示各個節點之間的 Gossip 通訊。 Redis Cluster 中的每個節點都**維護一份自己視角下的當前整個叢集的狀態**,主要包括: 1. *當前叢集狀態* 2. *叢集中各節點所負責的 slots資訊,及其migrate狀態* 3. *叢集中各節點的master-slave狀態* 4. 叢集中各節點的存活狀態及懷疑Fail狀態 也就是說上面的資訊,就是叢集中Node相互八卦傳播流言蜚語的內容主題,而且比較全面,既有自己的更有別人的,這麼一來大家都相互傳,最終資訊就全面而且一致了。 Redis Cluster 的節點之間會相互發送多種訊息,較為重要的如下所示: - MEET:通過「cluster meet ip port」命令,已有叢集的節點會向新的節點發送邀請,加入現有叢集,然後新節點就會開始與其他節點進行通訊; - PING:節點按照配置的時間間隔向叢集中其他節點發送 ping 訊息,訊息中帶有自己的狀態,還有自己維護的叢集元資料,和部分其他節點的元資料; - PONG: 節點用於迴應 PING 和 MEET 的訊息,結構和 PING 訊息類似,也包含自己的狀態和其他資訊,也可以用於資訊廣播和更新; - FAIL: 節點 PING 不通某節點後,會向叢集所有節點廣播該節點掛掉的訊息。其他節點收到訊息後標記已下線。 Redis 的原始碼中 cluster.h 檔案定義了全部的訊息型別,程式碼為 redis 4.0版本。 ```c // 注意,PING 、 PONG 和 MEET 實際上是同一種訊息。 // PONG 是對 PING 的回覆,它的實際格式也為 PING 訊息, // 而 MEET 則是一種特殊的 PING 訊息,用於強制訊息的接收者將訊息的傳送者新增到叢集中(如果節點尚未在節點列表中的話) #define CLUSTERMSG_TYPE_PING 0 /* Ping 訊息 */ #define CLUSTERMSG_TYPE_PONG 1 /* Pong 用於回覆Ping */ #define CLUSTERMSG_TYPE_MEET 2 /* Meet 請求將某個節點新增到叢集中 */ #define CLUSTERMSG_TYPE_FAIL 3 /* Fail 將某個節點標記為 FAIL */ #define CLUSTERMSG_TYPE_PUBLISH 4 /* 通過釋出與訂閱功能廣播訊息 */ #define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* 請求進行故障轉移操作,要求訊息的接收者通過投票來支援訊息的傳送者 */ #define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* 訊息的接收者同意向訊息的傳送者投票 */ #define CLUSTERMSG_TYPE_UPDATE 7 /* slots 已經發生變化,訊息傳送者要求訊息接收者進行相應的更新 */ #define CLUSTERMSG_TYPE_MFSTART 8 /* 為了進行手動故障轉移,暫停各個客戶端 */ #define CLUSTERMSG_TYPE_COUNT 9 /* 訊息總數 */ ``` 通過上述這些訊息,叢集中的每一個例項都能獲得其它所有例項的狀態資訊。這樣一來,即使有新節點加入、節點故障、Slot 變更等事件發生,例項間也可以通過 PING、PONG 訊息的傳遞,完成叢集狀態在每個例項上的同步。下面,我們依次來看看幾種常見的場景。 #### 定時 PING/PONG 訊息 Redis Cluster 中的節點都會定時地向其他節點發送 PING 訊息,來交換各個節點狀態資訊,檢查各個節點狀態,包括線上狀態、疑似下線狀態 PFAIL 和已下線狀態 FAIL。 Redis 叢集的定時 PING/PONG 的工作原理可以概括成兩點: - 一是,每個例項之間會按照一定的頻率,從叢集中隨機挑選一些例項,把 PING 訊息傳送給挑選出來的例項,用來檢測這些例項是否線上,並交換彼此的狀態資訊。PING 訊息中封裝了傳送訊息的例項自身的狀態資訊、部分其它例項的狀態資訊,以及 Slot 對映表。 - 二是,一個例項在接收到 PING 訊息後,會給傳送 PING 訊息的例項,傳送一個 PONG 訊息。PONG 訊息包含的內容和 PING 訊息一樣。 下圖顯示了兩個例項間進行 PING、PONG 訊息傳遞的情況,其中例項一為傳送節點,例項二是接收節點 ![](https://img2020.cnblogs.com/blog/1816118/202012/1816118-20201203211735841-445908647.png) #### 新節點上線 Redis Cluster 加入新節點時,客戶端需要執行 CLUSTER MEET 命令,如下圖所示。 ![](https://img2020.cnblogs.com/blog/1816118/202012/1816118-20201203211748069-924817932.png) 節點一在執行 CLUSTER MEET 命令時會首先為新節點建立一個 clusterNode 資料,並將其新增到自己維護的 clusterState 的 nodes 字典中。有關 clusterState 和 clusterNode 關係,我們在最後一節會有詳盡的示意圖和原始碼來講解。 然後節點一會根據據 CLUSTER MEET 命令中的 IP 地址和埠號,向新節點發送一條 MEET 訊息。新節點接收到節點一發送的MEET訊息後,新節點也會為節點一建立一個 clusterNode 結構,並將該結構新增到自己維護的 clusterState 的 nodes 字典中。 接著,新節點向節點一返回一條PONG訊息。節點一接收到節點B返回的PONG訊息後,得知新節點已經成功的接收了自己傳送的MEET訊息。 最後,節點一還會向新節點發送一條 PING 訊息。新節點接收到該條 PING 訊息後,可以知道節點A已經成功的接收到了自己返回的P ONG訊息,從而完成了新節點接入的握手操作。 MEET 操作成功之後,節點一會通過稍早時講的定時 PING 機制將新節點的資訊傳送給叢集中的其他節點,讓其他節點也與新節點進行握手,最終,經過一段時間後,新節點會被叢集中的所有節點認識。 #### 節點疑似下線和真正下線 Redis Cluster 中的節點會定期檢查已經發送 PING 訊息的接收方節點是否在規定時間 ( cluster-node-timeout ) 內返回了 PONG 訊息,如果沒有則會將其標記為疑似下線狀態,也就是 PFAIL 狀態,如下圖所示。 ![](https://img2020.cnblogs.com/blog/1816118/202012/1816118-20201203211758449-335545896.png) 然後,節點一會通過 PING 訊息,將節點二處於疑似下線狀態的資訊傳遞給其他節點,例如節點三。節點三接收到節點一的 PING 訊息得知節點二進入 PFAIL 狀態後,會在自己維護的 clusterState 的 nodes 字典中找到節點二所對應的 clusterNode 結構,並將主節點一的下線報告新增到 clusterNode 結構的 fail_reports 連結串列中。 ![](https://img2020.cnblogs.com/blog/1816118/202012/1816118-20201203211809025-1326076756.png) 隨著時間的推移,如果節點十 (舉個例子) 也因為 PONG 超時而認為節點二疑似下線了,並且發現自己維護的節點二的 clusterNode 的 fail_reports 中有**半數以上的主節點數量的未過時的將節點二標記為 PFAIL 狀態報告日誌**,那麼節點十將會把節點二將被標記為已下線 FAIL 狀態,並且節點十會**立刻**向叢集其他節點廣播主節點二已經下線的 FAIL 訊息,所有收到 FAIL 訊息的節點都會立即將節點二狀態標記為已下線。如下圖所示。 ![](https://img2020.cnblogs.com/blog/1816118/202012/1816118-20201203211818172-1510332534.png) 需要注意的是,報告疑似下線記錄是由時效性的,如果超過 cluster-node-timeout *2 的時間,這個報告就會被忽略掉,讓節點二又恢復成正常狀態。 ### Redis Cluster 通訊原始碼實現 綜上,我們瞭解了 Redis Cluster 在定時 PING/PONG、新節點上線、節點疑似下線和真正下線等環節的原理和操作流程,下面我們來真正看一下 Redis 在這些環節的原始碼實現和具體操作。 #### 涉及的資料結構體 首先,我們先來講解一下其中涉及的資料結構,也就是上文提到的 ClusterNode 等結構。 **每個節點都會維護一個 clusterState 結構**,表示當前叢集的整體狀態,它的定義如下所示。 ```c typedef struct clusterState { clusterNode *myself; /* 當前節點的clusterNode資訊 */ .... dict *nodes; /* name到clusterNode的字典 */ .... clusterNode *slots[CLUSTER_SLOTS]; /* slot 和節點的對應關係*/ .... } clusterState; ``` 它有三個比較關鍵的欄位,具體示意圖如下所示: - myself 欄位,是一個 clusterNode 結構,用來記錄自己的狀態; - nodes 字典,記錄一個 name 到 clusterNode 結構的對映,以此來記錄其他節點的狀態; - slot 陣列,記錄slot 對應的節點 clusterNode結構。 ![](https://img2020.cnblogs.com/blog/1816118/202012/1816118-20201203211830690-305806993.png) clusterNode 結構**儲存了一個節點的當前狀態**,比如**節點的建立時間、節點的名字、節點 當前的配置紀元、節點的IP地址和埠號等等**。除此之外,clusterNode結構的 link 屬性是一個clusterLink結構,該結構儲存了連線節點所需的有關資訊**,比如**套接字描述符,輸入緩衝區和輸出緩衝區。clusterNode 還有一個 fail_report 的列表,用來記錄疑似下線報告。具體定義如下所示。 ```cpp typedef struct clusterNode { mstime_t ctime; /* 建立節點的時間 */ char name[CLUSTER_NAMELEN]; /* 節點的名字 */ int flags; /* 節點標識,標記節點角色或者狀態,比如主節點從節點或者線上和下線 */ uint64_t configEpoch; /* 當前節點已知的叢集統一epoch */ unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */ int numslots; /* Number of slots handled by this node */ int numslaves; /* Number of slave nodes, if this is a master */ struct clusterNode **slaves; /* pointers to slave nodes */ struct clusterNode *slaveof; /* pointer to the master node. Note that it may be NULL even if the node is a slave if we don't have the master node in our tables. */ mstime_t ping_sent; /* 當前節點最後一次向該節點發送 PING 訊息的時間 */ mstime_t pong_received; /* 當前節點最後一次收到該節點 PONG 訊息的時間 */ mstime_t fail_time; /* FAIL 標誌位被設定的時間 */ mstime_t voted_time; /* Last time we voted for a slave of this master */ mstime_t repl_offset_time; /* Unix time we received offset for this node */ mstime_t orphaned_time; /* Starting time of orphaned master condition */ long long repl_offset; /* 當前節點的repl便宜 */ char ip[NET_IP_STR_LEN]; /* 節點的IP 地址 */ int port; /* 埠 */ int cport; /* 通訊埠,一般是埠+1000 */ clusterLink *link; /* 和該節點的 tcp 連線 */ list *fail_reports; /* 下線記錄列表 */ } clusterNode; ``` clusterNodeFailReport 是記錄節點下線報告的結構體, node 是報告節點的資訊,而 time 則代表著報告時間。 ```c typedef struct clusterNodeFailReport { struct clusterNode *node; /* 報告當前節點已經下線的節點 */ mstime_t time; /* 報告時間 */ } clusterNodeFailReport; ``` #### 訊息結構體 瞭解了 Reids 節點維護的資料結構體後,我們再來看節點進行通訊的訊息結構體。 通訊訊息最外側的結構體為 clusterMsg,它包括了很多訊息記錄資訊,包括 RCmb 標誌位,訊息總長度,訊息協議版本,訊息型別;它還包括了傳送該訊息節點的記錄資訊,比如節點名稱,節點負責的slot資訊,節點ip和埠等;最後它包含了一個 clusterMsgData 來攜帶具體型別的訊息。 ```c typedef struct { char sig[4]; /* 標誌位,"RCmb" (Redis Cluster message bus). */ uint32_t totlen; /* 訊息總長度 */ uint16_t ver; /* 訊息協議版本 */ uint16_t port; /* 埠 */ uint16_t type; /* 訊息型別 */ uint16_t count; /* */ uint64_t currentEpoch; /* 表示本節點當前記錄的整個叢集的統一的epoch,用來決策選舉投票等,與configEpoch不同的是:configEpoch表示的是master節點的唯一標誌,currentEpoch是叢集的唯一標誌。 */ uint64_t configEpoch; /* 每個master節點都有一個唯一的configEpoch做標誌,如果和其他master節點衝突,會強制自增使本節點在叢集中唯一 */ uint64_t offset; /* 主從複製偏移相關資訊,主節點和從節點含義不同 */ char sender[CLUSTER_NAMELEN]; /* 傳送節點的名稱 */ unsigned char myslots[CLUSTER_SLOTS/8]; /* 本節點負責的slots資訊,16384/8個char陣列,一共為16384bit */ char slaveof[CLUSTER_NAMELEN]; /* master資訊,假如本節點是slave節點的話,協議帶有master資訊 */ char myip[NET_IP_STR_LEN]; /* IP */ char notused1[34]; /* 保留欄位 */ uint16_t cport; /* 叢集的通訊埠 */ uint16_t flags; /* 本節點當前的狀態,比如 CLUSTER_NODE_HANDSHAKE、CLUSTER_NODE_MEET */ unsigned char state; /* Cluster state from the POV of the sender */ unsigned char mflags[3]; /* 本條訊息的型別,目前只有兩類:CLUSTERMSG_FLAG0_PAUSED、CLUSTERMSG_FLAG0_FORCEACK */ union clusterMsgData data; } clusterMsg; ``` clusterMsgData 是一個 union 結構體,它可以為 PING,MEET,PONG 或者 FAIL 等訊息體。其中當訊息為 PING、MEET 和 PONG 型別時,ping 欄位是被賦值的,而是 FAIL 型別時,fail 欄位是被賦值的。 ```c // 注意這是 union 關鍵字 union clusterMsgData { /* PING, MEET 或者 PONG 訊息時,ping 欄位被賦值 */ struct { /* Array of N clusterMsgDataGossip structures */ clusterMsgDataGossip gossip[1]; } ping; /* FAIL 訊息時,fail 被賦值 */ struct { clusterMsgDataFail about; } fail; // .... 省略 publish 和 update 訊息的欄位 }; ``` clusterMsgDataGossip 是 PING、PONG 和 MEET 訊息的結構體,它會包括髮送訊息節點維護的其他節點資訊,也就是上文中 clusterState 中 nodes 欄位包含的資訊,具體程式碼如下所示,你也會發現二者的欄位是類似的。 ```c typedef struct { /* 節點的名字,預設是隨機的,MEET訊息傳送並得到回覆後,叢集會為該節點設定正式的名稱*/ char nodename[CLUSTER_NAMELEN]; uint32_t ping_sent; /* 傳送節點最後一次給接收節點發送 PING 訊息的時間戳,收到對應 PONG 回覆後會被賦值為0 */ uint32_t pong_received; /* 傳送節點最後一次收到接收節點發送 PONG 訊息的時間戳 */ char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */ uint16_t port; /* IP*/ uint16_t cport; /* 埠*/ uint16_t flags; /* 標識*/ uint32_t notused1; /* 對齊字元*/ } clusterMsgDataGossip; typedef struct { char nodename[CLUSTER_NAMELEN]; /* 下線節點的名字 */ } clusterMsgDataFail; ``` 看完了節點維護的資料結構體和傳送的訊息結構體後,我們就來看看 Redis 的具體行為原始碼了。 #### 隨機週期性傳送PING訊息 Redis 的 clusterCron 函式會被定時呼叫,每被執行10次,就會準備向隨機的一個節點發送 PING 訊息。 它會先隨機的選出 5 個節點,然後從中選擇最久沒有與之通訊的節點,呼叫 clusterSendPing 函式傳送型別為 CLUSTERMSG_TYPE_PING 的訊息 ```cpp // cluster.c 檔案 // clusterCron() 每執行 10 次(至少間隔一秒鐘),就向一個隨機節點發送 gossip 資訊 if (!(iteration % 10)) { int j; /* 隨機 5 個節點,選出其中一個 */ for (j = 0; j < 5; j++) { de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); /* 不要 PING 連線斷開的節點,也不要 PING 最近已經 PING 過的節點 */ if (this->link == NULL || this->ping_sent != 0) continue; if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE)) continue; /* 對比 pong_received 欄位,選出更長時間未收到其 PONG 訊息的節點(表示好久沒有接受到該節點的PONG訊息了) */ if (min_pong_node == NULL || min_pong > this->pong_received) { min_pong_node = this; min_pong = this->pong_received; } } /* 向最久沒有收到 PONG 回覆的節點發送 PING 命令 */ if (min_pong_node) { serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name); clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING); } } ``` clusterSendPing 函式的具體行為我們後續再瞭解,因為該函式在其他環節也會經常用到 #### 節點加入叢集 當節點執行 CLUSTER MEET 命令後,會在自身給新節點維護一個 clusterNode 結構體,該結構體的 link 也就是TCP連線欄位是 null,表示是新節點尚未建立連線。 clusterCron 函式中也會處理這些未建立連線的新節點,呼叫 createClusterLink 創立連線,然後呼叫 clusterSendPing 函式來發送 MEET 訊息 ```cpp /* cluster.c clusterCron 函式部分,為未建立連線的節點建立連線 */ if (node->link == NULL) { int fd; mstime_t old_ping_sent; clusterLink *link; /* 和該節點建立連線 */ fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->cport, NET_FIRST_BIND_ADDR); /* .... fd 為-1時的異常處理 */ /* 建立 link */ link = createClusterLink(node); link->fd = fd; node->link = link; aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link); /* 向新連線的節點發送 PING 命令,防止節點被識進入下線 */ /* 如果節點被標記為 MEET ,那麼傳送 MEET 命令,否則傳送 PING 命令 */ old_ping_sent = node->ping_sent; clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); /* .... */ /* 如果當前節點(傳送者)沒能收到 MEET 資訊的回覆,那麼它將不再向目標節點發送命令。*/ /* 如果接收到回覆的話,那麼節點將不再處於 HANDSHAKE 狀態,並繼續向目標節點發送普通 PING 命令*/ node->flags &= ~CLUSTER_NODE_MEET; } ``` #### 防止節點假超時及狀態過期 防止節點假超時和標記疑似下線標記也是在 clusterCron 函式中,具體如下所示。它會檢查當前所有的 nodes 節點列表,如果發現某個節點與自己的最後一個 PONG 通訊時間超過了預定的閾值的一半時,為了防止節點是假超時,會主動釋放掉與之的 link 連線,然後會主動向它傳送一個 PING 訊息。 ```cpp /* cluster.c clusterCron 函式部分,遍歷節點來檢查 fail 的節點*/ while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); now = mstime(); /* Use an updated time at every iteration. */ mstime_t delay; /* 如果等到 PONG 到達的時間超過了 node timeout 一半的連線 */ /* 因為儘管節點依然正常,但連線可能已經出問題了 */ if (node->link && /* is connected */ now - node->link->ctime > server.cluster_node_timeout && /* 還未重連 */ node->ping_sent && /* 已經發過ping訊息 */ node->pong_received < node->ping_sent && /* 還在等待pong訊息 */ /* 等待pong訊息超過了 timeout/2 */ now - node->ping_sent > server.cluster_node_timeout/2) { /* 釋放連線,下次 clusterCron() 會自動重連 */ freeClusterLink(node->link); } /* 如果目前沒有在 PING 節點*/ /* 並且已經有 node timeout 一半的時間沒有從節點那裡收到 PONG 回覆 */ /* 那麼向節點發送一個 PING ,確保節點的資訊不會太舊,有可能一直沒有隨機中 */ if (node->link && node->ping_sent == 0 && (now - node->pong_received) > server.cluster_node_timeout/2) { clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); continue; } /* .... 處理failover和標記遺失下線 */ } ``` #### 處理failover和標記疑似下線 如果防止節點假超時處理後,節點依舊未收到目標節點的 PONG 訊息,並且時間已經超過了 cluster_node_timeout,那麼就將該節點標記為疑似下線狀態。 ```cpp /* 如果這是一個主節點,並且有一個從伺服器請求進行手動故障轉移,那麼向從伺服器傳送 PING*/ if (server.cluster->mf_end && nodeIsMaster(myself) && server.cluster->mf_slave == node && node->link) { clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); continue; } /* 後續程式碼只在節點發送了 PING 命令的情況下執行*/ if (node->ping_sent == 0) continue; /* 計算等待 PONG 回覆的時長 */ delay = now - node->ping_sent; /* 等待 PONG 回覆的時長超過了限制值,將目標節點標記為 PFAIL (疑似下線)*/ if (delay > server.cluster_node_timeout) { /* 超時了,標記為疑似下線 */ if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) { redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing", node->name); // 開啟疑似下線標記 node->flags |= REDIS_NODE_PFAIL; update_state = 1; } } ``` #### 實際傳送Gossip訊息 以下是前方多次呼叫過的clusterSendPing()方法的原始碼,程式碼中有詳細的註釋,大家可以自行閱讀。主要的操作就是將節點自身維護的 clusterState 轉換為對應的訊息結構體,。 ```cpp /* 向指定節點發送一條 MEET 、 PING 或者 PONG 訊息 */ void clusterSendPing(clusterLink *link, int type) { unsigned char *buf; clusterMsg *hdr; int gossipcount = 0; /* Number of gossip sections added so far. */ int wanted; /* Number of gossip sections we want to append if possible. */ int totlen; /* Total packet length. */ // freshnodes 是用於傳送 gossip 資訊的計數器 // 每次傳送一條資訊時,程式將 freshnodes 的值減一 // 當 freshnodes 的數值小於等於 0 時,程式停止傳送 gossip 資訊 // freshnodes 的數量是節點目前的 nodes 表中的節點數量減去 2 // 這裡的 2 指兩個節點,一個是 myself 節點(也即是傳送資訊的這個節點) // 另一個是接受 gossip 資訊的節點 int freshnodes = dictSize(server.cluster->nodes)-2; /* 計算要攜帶多少節點的資訊,最少3個,最多 1/10 叢集總節點數量*/ wanted = floor(dictSize(server.cluster->nodes)/10); if (wanted < 3) wanted = 3; if (wanted > freshnodes) wanted = freshnodes; /* .... 省略 totlen 的計算等*/ /* 如果傳送的資訊是 PING ,那麼更新最後一次傳送 PING 命令的時間戳 */ if (link->node && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime(); /* 將當前節點的資訊(比如名字、地址、埠號、負責處理的槽)記錄到訊息裡面 */ clusterBuildMessageHdr(hdr,type); /* Populate the gossip fields */ int maxiterations = wanted*3; /* 每個節點有 freshnodes 次傳送 gossip 資訊的機會 每次向目標節點發送 2 個被選中節點的 gossip 資訊(gossipcount 計數) */ while(freshnodes > 0 && gossipcount < wanted && maxiterations--) { /* 從 nodes 字典中隨機選出一個節點(被選中節點) */ dictEntry *de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); /* 以下節點不能作為被選中節點: * Myself:節點本身。 * PFAIL狀態的節點 * 處於 HANDSHAKE 狀態的節點。 * 帶有 NOADDR 標識的節點 * 因為不處理任何 Slot 而被斷開連線的節點 */ if (this == myself) continue; if (this->flags & CLUSTER_NODE_PFAIL) continue; if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) || (this->link == NULL && this->numslots == 0)) { freshnodes--; /* Tecnically not correct, but saves CPU. */ continue; } // 檢查被選中節點是否已經在 hdr->data.ping.gossip 數組裡面 // 如果是的話說明這個節點之前已經被選中了 // 不要再選中它(否則就會出現重複) if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue; /* 這個被選中節點有效,計數器減一 */ clusterSetGossipEntry(hdr,gossipcount,this); freshnodes--; gossipcount++; } /* .... 如果有 PFAIL 節點,最後新增 */ /* 計算資訊長度 */ totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += (sizeof(clusterMsgDataGossip)*gossipcount); /* 將被選中節點的數量(gossip 資訊中包含了多少個節點的資訊)記錄在 count 屬性裡面*/ hdr->count = htons(gossipcount); /* 將資訊的長度記錄到資訊裡面 */ hdr->totlen = htonl(totlen); /* 傳送網路請求 */ clusterSendMessage(link,buf,totlen); zfree(buf); } void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) { clusterMsgDataGossip *gossip; /* 指向 gossip 資訊結構 */ gossip = &(hdr->data.ping.gossip[i]); /* 將被選中節點的名字記錄到 gossip 資訊 */ memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN); /* 將被選中節點的 PING 命令傳送時間戳記錄到 gossip 資訊 */ gossip->ping_sent = htonl(n->ping_sent/1000); /* 將被選中節點的 PONG 命令回覆的時間戳記錄到 gossip 資訊 */ gossip->pong_received = htonl(n->pong_received/1000); /* 將被選中節點的 IP 記錄到 gossip 資訊 */ memcpy(gossip->ip,n->ip,sizeof(n->ip)); /* 將被選中節點的埠號記錄到 gossip 資訊 */ gossip->port = htons(n->port); gossip->cport = htons(n->cport); /* 將被選中節點的標識值記錄到 gossip 資訊 */ gossip->flags = htons(n->flags); gossip->notused1 = 0; } ``` 下面是 clusterBuildMessageHdr 函式,它主要負責填充訊息結構體中的基礎資訊和當前節點的狀態資訊。 ```c /* 構建訊息的 header */ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { int totlen = 0; uint64_t offset; clusterNode *master; /* 如果當前節點是salve,則master為其主節點,如果當前節點是master節點,則master就是當前節點 */ master = (nodeIsSlave(myself) && myself->slaveof) ? myself->slaveof : myself; memset(hdr,0,sizeof(*hdr)); /* 初始化協議版本、標識、及型別, */ hdr->ver = htons(CLUSTER_PROTO_VER); hdr->sig[0] = 'R'; hdr->sig[1] = 'C'; hdr->sig[2] = 'm'; hdr->sig[3] = 'b'; hdr->type = htons(type); /* 訊息頭設定當前節點id */ memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN); /* 訊息頭設定當前節點ip */ memset(hdr->myip,0,NET_IP_STR_LEN); if (server.cluster_announce_ip) { strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN); hdr->myip[NET_IP_STR_LEN-1] = '\0'; } /* 基礎埠及叢集內節點通訊埠 */ int announced_port = server.cluster_announce_port ? server.cluster_announce_port : server.port; int announced_cport = server.cluster_announce_bus_port ? server.cluster_announce_bus_port : (server.port + CLUSTER_PORT_INCR); /* 設定當前節點的槽資訊 */ memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots)); memset(hdr->slaveof,0,CLUSTER_NAMELEN); if (myself->slaveof != NULL) memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN); hdr->port = htons(announced_port); hdr->cport = htons(announced_cport); hdr->flags = htons(myself->flags); hdr->state = server.cluster->state; /* 設定 currentEpoch and configEpochs. */ hdr->currentEpoch = htonu64(server.cluster->currentEpoch); hdr->configEpoch = htonu64(master->configEpoch); /* 設定複製偏移量 */ if (nodeIsSlave(myself)) offset = replicationGetSlaveOffset(); else offset = server.master_repl_offset; hdr->offset = htonu64(offset); /* Set the message flags. */ if (nodeIsMaster(myself) && server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED; /* 計算並設定訊息的總長度 */ if (type == CLUSTERMSG_TYPE_FAIL) { totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += sizeof(clusterMsgDataFail); } else if (type == CLUSTERMSG_TYPE_UPDATE) { totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += sizeof(clusterMsgDataUpdate); } hdr->totlen = htonl(totlen); } ``` ### 後記 本來只想寫一下 Redis Cluster 的 Gossip 協議,沒想到文章越寫,內容越多,最後原始碼分析也是有點虎頭蛇尾,大家就湊合看一下,也希望大家繼續關注我後續的問題。 [個人部落格,歡迎來玩](http://remcarpediem.net/article/933