1. 程式人生 > 實用技巧 >zookeeper原理之Leader選舉原始碼分析

zookeeper原理之Leader選舉原始碼分析

Zookeeper 的一致性 Zookeeper 的來源 對於 zookeeper 的一致性問題,有很多朋友有疑問,我這邊再幫大家從來源層面梳理一遍一致性的問題。上篇,我們講到了 zookeeper 的來源,是來自於 google chubby。為了解決在分散式環境下,如何從多個 server 中選舉出 master server。那麼這多個 server 就需要涉及到一致性問題,這個一致性體現的是多個server 就 master 這個投票在分散式環境下達成一致性。簡單來說就是最終聽誰的。但是在網路環境中由於網路的不可靠性,會存在訊息丟失和或者被篡改等問題。所以如何在這樣一個環境中快速並且正確的在多個server 中對某一個數據達成一致性並且保證不論發生任何異常,都不會破壞整個系統一致性呢?所以在 Lamport 大神設計了一套 Paxos 的演算法,多個 server 基於這個演算法就可以達成一致。而 google chubby 就是基於 paxos 演算法的實現,用來實現分散式鎖服務。並且提供了 master 選舉的服務。 Paxos 在 Chubby 中的應用
很多朋友會有疑問,Chubby 和 paxos 演算法有什麼關係?Chubby 本來應該設計成一個包含 Paxos 演算法的協議庫,是的應用程式可以基於這個庫方便的使用 Paxos 演算法,但是它並沒有這麼做,而是把 Chubby 設計成了一個需要訪問中心化節點的分散式鎖服務。既然是一個服務,那麼它肯定需要是一個高可靠的服務。所以 Chubby 被構建為一個叢集,叢集中存在一箇中心節點(MASTER),採用 Paxos 協議,通過投票的方式來選舉一個獲得過半票數的伺服器作為 Master,在 chubby 叢集中,每個伺服器都會維護一份資料的副本,在實際的執行過程中, 只有 master 伺服器能執行事務操作,其他伺服器都是使用paxos協議從master節點同步最新的資料。而 zookeeper 是 chubby 的開源實現,所以實現原理和 chubby 基本是一致的。 Zookeeper 的一致性是什麼情況?
Zookeeper 的一致性,體現的是什麼一致呢? 根據前面講的 zab 協議的同步流程,在 zookeeper 叢集內部的資料副本同步,是基於過半提交的策略,意味著它是最終一致性。並不滿足強一致的要求。其實正確來說,zookeeper 是一個順序一致性模型。由於 zookeeper 設計出來是提供分散式鎖服務,那麼意味著它本身需要實現順序一致性( http://zookeeper.apache.org/doc/r3.5.5/zookeeperProgrammers.html#ch_zkGuarantees )順序一致性是在分散式環境中實現分散式鎖的基本要求,比如當一個多個程式來爭搶鎖,如果 clientA 獲得鎖以後,後續所有來爭搶鎖的程式看到的鎖的狀態都應該是被 clientA 鎖定了,而不是其他狀態。 什麼是順序一致性呢?
在講順序一致性之前,咱們思考一個問題,假如說 zookeeper 是一個最終一致性模型,那麼他會發生什麼情況ClientA/B/C 假設只序列執行, clientA 更新 zookeeper 上的一個值 x。ClientB 和 clientC 分別讀取叢集的不同副本,返回的 x 的值是不一樣的。clientC 的讀取操作是發生在 clientB 之後,但是卻讀到了過期的值。很明顯,這是一種弱一致模型。如果用它來實現鎖機制是有問題的。 順序一致性提供了更強的一致性保證,我們來觀察下面這個圖,從時間軸來看,B0 發生在 A0 之前,讀取的值是 0,B2 發生在 A0 之後,讀取到的x 的值為 1.而讀操作 B1/C0/C1 和寫操作 A0 在時間軸上有重疊,因此他們可能讀到舊的值為 0,也可能讀到新的值 1. 但是在強順序一致性模型中,如果 B1 得到的 x 的值為 1,那麼 C1 看到的值也一定是 1.

需要注意的是:由於網路的延遲以及系統本身執行請求的不確定性,會導致請求發起的早的客戶端不一定會在服務端執行得早。最終以服務端執行的結果為準。簡單來說:順序一致性是針對單個操作,單個數據物件。屬於 CAP 中 C這個範疇。一個數據被更新後,能夠立馬被後續的讀操作讀到。但是 zookeeper 的順序一致性實現是縮水版的,在下面這個網頁中,可以看到官網對於一致性這塊做了解釋http://zookeeper.apache.org/doc/r3.5.5/zookeeperProgrammers.html#ch_zkGuaranteeszookeeper 不保證在每個例項中,兩個不同的客戶端具有相同的zookeeper 資料檢視,由於網路延遲等因素,一個客戶端可能會在另外一個客戶端收到更改通知之前執行更新,考慮到 2 個客戶端 A 和 B 的場景,如果 A 把 znode /a 的值從 0 設定為1,然後告訴客戶端 B 讀取 /a, 則客戶端 B 可能會讀取到舊的值 0,具體取決於他連線到那個伺服器,如果客戶端 A 和 B 要讀取必須要讀取到相同的值,那麼 client B 在讀取操作之前執行 sync 方法。 除此之外,zookeeper 基於 zxid 以及阻塞佇列的方式來實現請求的順序一致性。如果一個 client 連線到一個最新的 follower 上,那麼它 read 讀取到了最新的資料,然後 client 由於網路原因重新連線到 zookeeper 節點,而這個時候連線到一個還沒有完成資料同步的 follower 節點,那麼這一次讀到的資料不就是舊的資料嗎?實際上 zookeeper 處理了這種情況,client 會記錄自己已經讀取到的最大的 zxid,如果 client 重連到 server 發現 client 的 zxid 比自己大。連線會失敗。 Single System Image 的理解 zookeeper 官網還說它保證了“Single System Image”,其解釋為“A clientwill see the same view of the service regardless of the server that itconnects to.”。實際上看來這個解釋還是有一點誤導性的。其實由上面 zxid的原理可以看出,它表達的意思是“client 只要連線過一次 zookeeper,就不會有歷史的倒退”。https://github.com/apache/zookeeper/pull/931 leader 選舉的原理 接下來再我們基於原始碼來分析 leader 選舉的整個實現過程。 leader 選舉存在與兩個階段中,一個是伺服器啟動時的 leader 選舉。 另一個是執行過程中 leader 節點宕機導致的 leader 選舉 ;在開始分析選舉的原理之前,先了解幾個重要的引數伺服器 ID(myid) 比如有三臺伺服器,編號分別是 1,2,3。 編號越大在選擇演算法中的權重越大。 zxid 事務 id值越大說明資料越新,在選舉演算法中的權重也越大。 邏輯時鐘(epoch – logicalclock)或者叫投票的次數,同一輪投票過程中的邏輯時鐘值是相同的。每投完一次票這個資料就會增加,然後與接收到的其它伺服器返回的投票資訊中的數值相比,根據不同的值做出不同的判斷。 選舉狀態
  • LOOKING,競選狀態。
  • FOLLOWING,隨從狀態,同步 leader 狀態,參與投票。
  • OBSERVING,觀察狀態,同步 leader 狀態,不參與投票。LEADING,領導者狀態。
  • LEADING,領導者狀態。
伺服器啟動時的 leader 選舉 每個節點啟動的時候狀態都是 LOOKING,處於觀望狀態,接下來就開始進行選主流程。 若進行 Leader 選舉,則至少需要兩臺機器,這裡選取 3 臺機器組成的伺服器叢集為例。在叢集初始化階段,當有一臺伺服器 Server1 啟動時,其單獨無法進行和完成 Leader 選舉,當第二臺伺服器 Server2 啟動時,此時兩臺機器可以相互通訊,每臺機器都試圖找到 Leader,於是進入 Leader選舉過程。選舉過程如下: (1) 每個 Server 發出一個投票。由於是初始情況,Server1 和 Server2 都會將自己作為 Leader 伺服器來進行投票,每次投票會包含所推舉的伺服器的 myid 和 ZXID、epoch,使用(myid, ZXID,epoch)來表示,此時 Server1 的投票為(1, 0),Server2 的投票為(2, 0),然後各自將這個投票發給叢集中其他機器。 (2) 接受來自各個伺服器的投票。叢集的每個伺服器收到投票後,首先判斷該投票的有效性,如檢查是否是本輪投票(epoch)、是否來自LOOKING 狀態的伺服器。 (3) 處理投票。針對每一個投票,伺服器都需要將別人的投票和自己的投票進行 PK,PK 規則如下
  1. 優先比較 epoch
  2. 其次檢查 ZXID。ZXID 比較大的伺服器優先作為 Leader
  3. 如果 ZXID 相同,那麼就比較 myid。myid 較大的伺服器作為Leader 伺服器。
對於 Server1 而言,它的投票是(1, 0),接收 Server2 的投票為(2, 0),首先會比較兩者的 ZXID,均為 0,再比較 myid,此時 Server2 的myid 最大,於是更新自己的投票為(2, 0),然後重新投票,對於 Server2 而言,其無須更新自己的投票,只是再次向叢集中所有機器發出上一次投票資訊即可。 (4) 統計投票。每次投票後,伺服器都會統計投票資訊,判斷是否已經有過半機器接受到相同的投票資訊,對於 Server1、Server2 而言,都統計出叢集中已經有兩臺機器接受了(2, 0)的投票資訊,此時便認為已經選出了 Leader。 (5) 改變伺服器狀態。一旦確定了 Leader,每個伺服器就會更新自己的狀態,如果是 Follower,那麼就變更為 FOLLOWING,如果是 Leader,就變更為 LEADING。 執行過程中的 leader 選舉 當叢集中的 leader 伺服器出現宕機或者不可用的情況時,那麼整個叢集將無法對外提供服務,而是進入新一輪的 Leader 選舉,伺服器執行期間的 Leader 選舉和啟動時期的 Leader 選舉基本過程是一致的。 (1) 變更狀態。Leader 掛後,餘下的非 Observer 伺服器都會將自己的伺服器狀態變更為 LOOKING,然後開始進入 Leader 選舉過程。 (2) 每個 Server 會發出一個投票。在執行期間,每個伺服器上的 ZXID 可能不同,此時假定 Server1 的 ZXID 為 123,Server3 的 ZXID 為 122;在第一輪投票中,Server1 和 Server3 都會投自己,產生投票(1, 123),(3, 122),然後各自將投票傳送給叢集中所有機器。接收來自各個伺服器的投票。與啟動時過程相同。 (3) 處理投票。與啟動時過程相同,此時,Server1 將會成為 Leader。 (4) 統計投票。與啟動時過程相同。 (5) 改變伺服器的狀態。與啟動時過程相同。 leader 選舉的原始碼分析 原始碼分析,最關鍵的是要找到一個入口,對於 zk 的 leader 選舉,並不是由客戶端來觸發,而是在啟動的時候會觸發一次選舉。因此我們可以直接 去看啟動指令碼 zkServer.sh 中的執行命令,ZOOMAIN 就是 QuorumPeerMain。那麼我們基於這個入口來看: QuorumPeerMain.main 方法 main 方法中,呼叫了 initializeAndRun 進行初始化並且執行
protected void initializeAndRun(String[] args) throws ConfigException, IOException{
	//這段程式碼比較簡單,設定配置引數,如果 args 不為空,可以基於外部的配置路徑來進行解析
	QuorumPeerConfig config = new QuorumPeerConfig();
	if (args.length == 1) {
	 config.parse(args[0]);
	}
	// 這裡啟動了一個執行緒,來定時對日誌進行清理,從命名來看也很容易理解
	DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), 
	config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
	//如果是叢集模式,會呼叫 runFromConfig.servers 實際就是我們在 zoo.cfg 裡面配置的叢集節點
	if (a config.servers.size() > 0) {
	  runFromConfig(config);
	} else {//否則直接執行單機模式
	  LOG.warn("Either no config or no quorum defined in config, running " + " in standalone mode");
	// there is only server in the quorum -- run as standalone
	   ZooKeeperServerMain.main(args);
     } 
}

runFromConfig 從名字可以看出來,是基於配置檔案來進行啟動。 所以整個方法都是對引數進行解析和設定 ,因為這些引數暫時還沒用到,所以沒必要去看。直接看核心的程式碼quorumPeer.start(), 啟動一個執行緒,那麼從這句程式碼可以看出來QuorumPeer 實際是繼承了執行緒。那麼它裡面一定有一個 run 方法:
public void runFromConfig(QuorumPeerConfig config) throws IOException {
			 try {
				 ManagedUtil.registerLog4jMBeans();
			 } catch (JMException e) {
				 LOG.warn("Unable to register log4j JMX control", e);
			 }
			 	LOG.info("Starting quorum peer");
			 try {
			 ServerCnxnFactory cnxnFactory =  ServerCnxnFactory.createFactory();
			 
			 cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
			 quorumPeer = getQuorumPeer();
			 
			 quorumPeer.setQuorumPeers(config.getServers());
			 quorumPeer.setTxnFactory(new FileTxnSnapLog( new File(config.getDataLogDir()), new File(config.getDataDir())));
			 
			 quorumPeer.setElectionType(config.getElectionAlg());
			 quorumPeer.setMyid(config.getServerId());
			 quorumPeer.setTickTime(config.getTickTime());
			 
			 quorumPeer.setInitLimit(config.getInitLimit());
			 
			 quorumPeer.setSyncLimit(config.getSyncLimit());
			 //投票決定方式,預設超過半數就通過 
			 
			 quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
			 quorumPeer.setCnxnFactory(cnxnFactory);
			 
			 quorumPeer.setQuorumVerifier(config.getQuorumVerifi er());
			 
			 quorumPeer.setClientPortAddress(config.getClientPor tAddress());
			 
			 quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
			 
			 quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
			 quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
			 
			 quorumPeer.setLearnerType(config.getPeerType());
			 
			 quorumPeer.setSyncEnabled(config.getSyncEnabled());
			 // sets quorum sasl authentication 
			 configurationsquorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
			 if(quorumPeer.isQuorumSaslAuthEnabled()){
			 
				 quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
			 
				 quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
			 
				 quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
			 
				 quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
			 
				 quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
			 }
			 
	    quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
	    quorumPeer.initialize();
		 //啟動主執行緒 
	    quorumPeer.start();
	    quorumPeer.join();
	 } catch (InterruptedException e) {
		 // warn, but generally this is ok
		 LOG.warn("Quorum Peer interrupted", e);
	 } 
}
QuorumPeer.start QuorumPeer.start 方法,重寫了 Thread 的 start。也就是線上程啟動之前,會做以下操作 1. 通過 loadDataBase 恢復快照資料 2. cnxnFactory.start() 啟動 zkServer,相當於使用者可以通過 2181 這個埠進行通訊了,這塊後續在講。我們還是以 leader 選舉為主線
@Override
public synchronized void start() {
   loadDataBase(); 
   cnxnFactory.start(); 
   startLeaderElection();
   super.start();
}
startLeaderElection 看到這個方法,有沒有兩眼放光的感覺?沒錯,前面鋪墊了這麼長,終於進入 leader 選舉的方法了
synchronized public void startLeaderElection() {
		try {
			// 構建一個票據,用於投票
			currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
		} catch (IOException e) {
			RuntimeException re = new RuntimeException(e.getMessage());
			re.setStackTrace(e.getStackTrace());
			throw re;
		}
		// 這個 getView 返回的就是在配置檔案中配置的server.myid=ip:port:port。view 在哪裡解析的呢?
		for (QuorumServer p : getView().values()) {
			if (p.id == myid) {// 獲得當前 zkserver myid 對應的 ip 地址
				myQuorumAddr = p.addr;
				break;
			}
		}
		if (myQuorumAddr == null) {
			throw new RuntimeException("My id " + myid + " not in the peer list");
		} // 根據 electionType 匹配對應的選舉演算法,electionType 預設值為 3.可以在配置檔案中動態配置
		if (electionType == 0) {
			try {
				udpSocket = new DatagramSocket(myQuorumAddr.getPort());
				responder = new ResponderThread();
				responder.start();
			} catch (SocketException e) {
				throw new RuntimeException(e);
			}
		}
		this.electionAlg = createElectionAlgorithm(electionType);
	}
quorumPeer. createElectionAlgorithm 根據對應的標識建立選舉演算法
protected Election createElectionAlgorithm(int electionAlgorithm) {
		Election le = null;
		// TODO: use a factory rather than a switch
		switch (electionAlgorithm) {
		case 0:
			le = new LeaderElection(this);
			break;
		case 1:
			le = new AuthFastLeaderElection(this);
			break;
		case 2:
			le = new AuthFastLeaderElection(this, true);
			break;
		case 3:
			qcm = createCnxnManager();
			QuorumCnxManager.Listener listener = qcm.listener;
			if (listener != null) {
				listener.start(); // 啟動監聽器,這個監聽具體做什麼的暫時不管,後面遇到需要了解的地方再回過頭來看
				le = new FastLeaderElection(this, qcm);// 初始化 FastLeaderElection
			} else {
				LOG.error("Null listener when initializing cnx manager");
			}
			break;
		default:
			assert false;
		}
		return le;
	}

FastLeaderElection 初始化FastLeaderElection,QuorumCnxManager 是一個很核心的物件,用來實現領導選舉中的網路連線管理功能,這個後面會用到
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
     this.stop = false;
     this.manager = manager;
     starter(self, manager);
}

FastLeaderElection. starter starter 方法裡面,設定了一些成員屬性,並且構建了兩個阻塞佇列,分別是 sendQueue 和 recvqueue。並且例項化了一個 Messager
private void starter(QuorumPeer self, QuorumCnxManager manager) {
		this.self = self;
		proposedLeader = -1;
		proposedZxid = -1;
		sendqueue = new LinkedBlockingQueue<ToSend>();
		recvqueue = new LinkedBlockingQueue<Notification>();
		this.messenger = new Messenger(manager);
	}

Messenger 在 Messenger 裡面構建了兩個執行緒,一個是 WorkerSender,一個是WorkerReceiver。 這兩個執行緒是分別用來發送和接收訊息的執行緒。具體做什麼,暫時先不分析。
Messenger(QuorumCnxManager manager) {
		 this.ws = new WorkerSender(manager);
		 Thread t = new Thread(this.ws,
		 "WorkerSender[myid=" + self.getId() + "]");
		 t.setDaemon(true);
		 t.start();
		 this.wr = new WorkerReceiver(manager);
		 t = new Thread(this.wr,
		 "WorkerReceiver[myid=" + self.getId() + "]");
		 t.setDaemon(true);
		 t.start();
	}

階段性總結 ok,分析到這裡,先做一個簡單的總結,通過一個流程圖把前面部分的功能串聯起來。

至此,先做如上總結~