zookeeper原理之Leader選舉原始碼分析
阿新 • • 發佈:2020-08-18
Zookeeper 的一致性
Zookeeper 的來源
對於 zookeeper 的一致性問題,有很多朋友有疑問,我這邊再幫大家從來源層面梳理一遍一致性的問題。上篇,我們講到了 zookeeper 的來源,是來自於 google chubby。為了解決在分散式環境下,如何從多個 server 中選舉出 master server。那麼這多個 server 就需要涉及到一致性問題,這個一致性體現的是多個server 就 master 這個投票在分散式環境下達成一致性。簡單來說就是最終聽誰的。但是在網路環境中由於網路的不可靠性,會存在訊息丟失和或者被篡改等問題。所以如何在這樣一個環境中快速並且正確的在多個server 中對某一個數據達成一致性並且保證不論發生任何異常,都不會破壞整個系統一致性呢?所以在 Lamport 大神設計了一套 Paxos 的演算法,多個 server 基於這個演算法就可以達成一致。而 google chubby 就是基於 paxos 演算法的實現,用來實現分散式鎖服務。並且提供了 master 選舉的服務。
Paxos 在 Chubby 中的應用
很多朋友會有疑問,Chubby 和 paxos 演算法有什麼關係?Chubby 本來應該設計成一個包含 Paxos 演算法的協議庫,是的應用程式可以基於這個庫方便的使用 Paxos 演算法,但是它並沒有這麼做,而是把 Chubby 設計成了一個需要訪問中心化節點的分散式鎖服務。既然是一個服務,那麼它肯定需要是一個高可靠的服務。所以 Chubby 被構建為一個叢集,叢集中存在一箇中心節點(MASTER),採用 Paxos 協議,通過投票的方式來選舉一個獲得過半票數的伺服器作為 Master,在 chubby 叢集中,每個伺服器都會維護一份資料的副本,在實際的執行過程中, 只有 master 伺服器能執行事務操作,其他伺服器都是使用paxos協議從master節點同步最新的資料。而 zookeeper 是 chubby 的開源實現,所以實現原理和 chubby 基本是一致的。
Zookeeper 的一致性是什麼情況?
Zookeeper 的一致性,體現的是什麼一致呢?
根據前面講的 zab 協議的同步流程,在 zookeeper 叢集內部的資料副本同步,是基於過半提交的策略,意味著它是最終一致性。並不滿足強一致的要求。其實正確來說,zookeeper 是一個順序一致性模型。由於 zookeeper 設計出來是提供分散式鎖服務,那麼意味著它本身需要實現順序一致性( http://zookeeper.apache.org/doc/r3.5.5/zookeeperProgrammers.html#ch_zkGuarantees )順序一致性是在分散式環境中實現分散式鎖的基本要求,比如當一個多個程式來爭搶鎖,如果 clientA 獲得鎖以後,後續所有來爭搶鎖的程式看到的鎖的狀態都應該是被 clientA 鎖定了,而不是其他狀態。
什麼是順序一致性呢?
在講順序一致性之前,咱們思考一個問題,假如說 zookeeper 是一個最終一致性模型,那麼他會發生什麼情況ClientA/B/C 假設只序列執行, clientA 更新 zookeeper 上的一個值 x。ClientB 和 clientC 分別讀取叢集的不同副本,返回的 x 的值是不一樣的。clientC 的讀取操作是發生在 clientB 之後,但是卻讀到了過期的值。很明顯,這是一種弱一致模型。如果用它來實現鎖機制是有問題的。
順序一致性提供了更強的一致性保證,我們來觀察下面這個圖,從時間軸來看,B0 發生在 A0 之前,讀取的值是 0,B2 發生在 A0 之後,讀取到的x 的值為 1.而讀操作 B1/C0/C1 和寫操作 A0 在時間軸上有重疊,因此他們可能讀到舊的值為 0,也可能讀到新的值 1. 但是在強順序一致性模型中,如果 B1 得到的 x 的值為 1,那麼 C1 看到的值也一定是 1.
需要注意的是:由於網路的延遲以及系統本身執行請求的不確定性,會導致請求發起的早的客戶端不一定會在服務端執行得早。最終以服務端執行的結果為準。簡單來說:順序一致性是針對單個操作,單個數據物件。屬於 CAP 中 C這個範疇。一個數據被更新後,能夠立馬被後續的讀操作讀到。但是 zookeeper 的順序一致性實現是縮水版的,在下面這個網頁中,可以看到官網對於一致性這塊做了解釋http://zookeeper.apache.org/doc/r3.5.5/zookeeperProgrammers.html#ch_zkGuaranteeszookeeper 不保證在每個例項中,兩個不同的客戶端具有相同的zookeeper 資料檢視,由於網路延遲等因素,一個客戶端可能會在另外一個客戶端收到更改通知之前執行更新,考慮到 2 個客戶端 A 和 B 的場景,如果 A 把 znode /a 的值從 0 設定為1,然後告訴客戶端 B 讀取 /a, 則客戶端 B 可能會讀取到舊的值 0,具體取決於他連線到那個伺服器,如果客戶端 A 和 B 要讀取必須要讀取到相同的值,那麼 client B 在讀取操作之前執行 sync 方法。
除此之外,zookeeper 基於 zxid 以及阻塞佇列的方式來實現請求的順序一致性。如果一個 client 連線到一個最新的 follower 上,那麼它 read 讀取到了最新的資料,然後 client 由於網路原因重新連線到 zookeeper 節點,而這個時候連線到一個還沒有完成資料同步的 follower 節點,那麼這一次讀到的資料不就是舊的資料嗎?實際上 zookeeper 處理了這種情況,client 會記錄自己已經讀取到的最大的 zxid,如果 client 重連到 server 發現 client 的 zxid 比自己大。連線會失敗。
Single System Image 的理解
zookeeper 官網還說它保證了“Single System Image”,其解釋為“A clientwill see the same view of the service regardless of the server that itconnects to.”。實際上看來這個解釋還是有一點誤導性的。其實由上面 zxid的原理可以看出,它表達的意思是“client 只要連線過一次 zookeeper,就不會有歷史的倒退”。https://github.com/apache/zookeeper/pull/931
leader 選舉的原理
接下來再我們基於原始碼來分析 leader 選舉的整個實現過程。
leader 選舉存在與兩個階段中,一個是伺服器啟動時的 leader 選舉。 另一個是執行過程中 leader 節點宕機導致的 leader 選舉 ;在開始分析選舉的原理之前,先了解幾個重要的引數伺服器 ID(myid)
比如有三臺伺服器,編號分別是 1,2,3。
編號越大在選擇演算法中的權重越大。
zxid 事務 id值越大說明資料越新,在選舉演算法中的權重也越大。
邏輯時鐘(epoch – logicalclock)或者叫投票的次數,同一輪投票過程中的邏輯時鐘值是相同的。每投完一次票這個資料就會增加,然後與接收到的其它伺服器返回的投票資訊中的數值相比,根據不同的值做出不同的判斷。
選舉狀態
- LOOKING,競選狀態。
- FOLLOWING,隨從狀態,同步 leader 狀態,參與投票。
- OBSERVING,觀察狀態,同步 leader 狀態,不參與投票。LEADING,領導者狀態。
- LEADING,領導者狀態。
- 優先比較 epoch
- 其次檢查 ZXID。ZXID 比較大的伺服器優先作為 Leader
- 如果 ZXID 相同,那麼就比較 myid。myid 較大的伺服器作為Leader 伺服器。
protected void initializeAndRun(String[] args) throws ConfigException, IOException{ //這段程式碼比較簡單,設定配置引數,如果 args 不為空,可以基於外部的配置路徑來進行解析 QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { config.parse(args[0]); } // 這裡啟動了一個執行緒,來定時對日誌進行清理,從命名來看也很容易理解 DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); //如果是叢集模式,會呼叫 runFromConfig.servers 實際就是我們在 zoo.cfg 裡面配置的叢集節點 if (a config.servers.size() > 0) { runFromConfig(config); } else {//否則直接執行單機模式 LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode"); // there is only server in the quorum -- run as standalone ZooKeeperServerMain.main(args); } }runFromConfig 從名字可以看出來,是基於配置檔案來進行啟動。 所以整個方法都是對引數進行解析和設定 ,因為這些引數暫時還沒用到,所以沒必要去看。直接看核心的程式碼quorumPeer.start(), 啟動一個執行緒,那麼從這句程式碼可以看出來QuorumPeer 實際是繼承了執行緒。那麼它裡面一定有一個 run 方法:
public void runFromConfig(QuorumPeerConfig config) throws IOException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } LOG.info("Starting quorum peer"); try { ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); quorumPeer = getQuorumPeer(); quorumPeer.setQuorumPeers(config.getServers()); quorumPeer.setTxnFactory(new FileTxnSnapLog( new File(config.getDataLogDir()), new File(config.getDataDir()))); quorumPeer.setElectionType(config.getElectionAlg()); quorumPeer.setMyid(config.getServerId()); quorumPeer.setTickTime(config.getTickTime()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit()); //投票決定方式,預設超過半數就通過 quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); quorumPeer.setCnxnFactory(cnxnFactory); quorumPeer.setQuorumVerifier(config.getQuorumVerifi er()); quorumPeer.setClientPortAddress(config.getClientPor tAddress()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setLearnerType(config.getPeerType()); quorumPeer.setSyncEnabled(config.getSyncEnabled()); // sets quorum sasl authentication configurationsquorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); if(quorumPeer.isQuorumSaslAuthEnabled()){ quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal); quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); } quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); quorumPeer.initialize(); //啟動主執行緒 quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Quorum Peer interrupted", e); } }QuorumPeer.start QuorumPeer.start 方法,重寫了 Thread 的 start。也就是線上程啟動之前,會做以下操作 1. 通過 loadDataBase 恢復快照資料 2. cnxnFactory.start() 啟動 zkServer,相當於使用者可以通過 2181 這個埠進行通訊了,這塊後續在講。我們還是以 leader 選舉為主線
@Override public synchronized void start() { loadDataBase(); cnxnFactory.start(); startLeaderElection(); super.start(); }startLeaderElection 看到這個方法,有沒有兩眼放光的感覺?沒錯,前面鋪墊了這麼長,終於進入 leader 選舉的方法了
synchronized public void startLeaderElection() { try { // 構建一個票據,用於投票 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } catch (IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; } // 這個 getView 返回的就是在配置檔案中配置的server.myid=ip:port:port。view 在哪裡解析的呢? for (QuorumServer p : getView().values()) { if (p.id == myid) {// 獲得當前 zkserver myid 對應的 ip 地址 myQuorumAddr = p.addr; break; } } if (myQuorumAddr == null) { throw new RuntimeException("My id " + myid + " not in the peer list"); } // 根據 electionType 匹配對應的選舉演算法,electionType 預設值為 3.可以在配置檔案中動態配置 if (electionType == 0) { try { udpSocket = new DatagramSocket(myQuorumAddr.getPort()); responder = new ResponderThread(); responder.start(); } catch (SocketException e) { throw new RuntimeException(e); } } this.electionAlg = createElectionAlgorithm(electionType); }quorumPeer. createElectionAlgorithm 根據對應的標識建立選舉演算法
protected Election createElectionAlgorithm(int electionAlgorithm) { Election le = null; // TODO: use a factory rather than a switch switch (electionAlgorithm) { case 0: le = new LeaderElection(this); break; case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3: qcm = createCnxnManager(); QuorumCnxManager.Listener listener = qcm.listener; if (listener != null) { listener.start(); // 啟動監聽器,這個監聽具體做什麼的暫時不管,後面遇到需要了解的地方再回過頭來看 le = new FastLeaderElection(this, qcm);// 初始化 FastLeaderElection } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le; }FastLeaderElection 初始化FastLeaderElection,QuorumCnxManager 是一個很核心的物件,用來實現領導選舉中的網路連線管理功能,這個後面會用到
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){ this.stop = false; this.manager = manager; starter(self, manager); }FastLeaderElection. starter starter 方法裡面,設定了一些成員屬性,並且構建了兩個阻塞佇列,分別是 sendQueue 和 recvqueue。並且例項化了一個 Messager
private void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self; proposedLeader = -1; proposedZxid = -1; sendqueue = new LinkedBlockingQueue<ToSend>(); recvqueue = new LinkedBlockingQueue<Notification>(); this.messenger = new Messenger(manager); }Messenger 在 Messenger 裡面構建了兩個執行緒,一個是 WorkerSender,一個是WorkerReceiver。 這兩個執行緒是分別用來發送和接收訊息的執行緒。具體做什麼,暫時先不分析。
Messenger(QuorumCnxManager manager) { this.ws = new WorkerSender(manager); Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); this.wr = new WorkerReceiver(manager); t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); }階段性總結 ok,分析到這裡,先做一個簡單的總結,通過一個流程圖把前面部分的功能串聯起來。
至此,先做如上總結~