摘要:在集群中發(fā)生選舉的場景有以下三種集群啟動時節(jié)點(diǎn)重啟時節(jié)點(diǎn)重啟時本文主要針對集群啟動時發(fā)生的選舉實現(xiàn)進(jìn)行分析。
在 zookeeper 集群中發(fā)生選舉的場景有以下三種:
集群啟動時
Leader 節(jié)點(diǎn)重啟時
Follower 節(jié)點(diǎn)重啟時
本文主要針對集群啟動時發(fā)生的選舉實現(xiàn)進(jìn)行分析。
ZK 集群中節(jié)點(diǎn)在啟動時會調(diào)用QuorumPeer.start方法
public synchronized void start() { /** * 加載數(shù)據(jù)文件,獲取 lastProcessedZxid, currentEpoch,acceptedEpoch */ loadDataBase(); /** * 啟動主線程 用于處理客戶端連接請求 */ cnxnFactory.start(); /** * 開始 leader 選舉; 會相繼創(chuàng)建選舉算法的實現(xiàn),創(chuàng)建當(dāng)前節(jié)點(diǎn)與集群中其他節(jié)點(diǎn)選舉通信的網(wǎng)絡(luò)IO,并啟動相應(yīng)工作線程 */ startLeaderElection(); /** * 啟動 QuorumPeer 線程,監(jiān)聽當(dāng)前節(jié)點(diǎn)服務(wù)狀態(tài) */ super.start(); }加載數(shù)據(jù)文件
在 loadDataBase 方法中,ZK 會通過加載數(shù)據(jù)文件獲取 lastProcessedZxid , 并通過讀取 currentEpoch , acceptedEpoch 文件來獲取相對應(yīng)的值;若上述兩文件不存在,則以 lastProcessedZxid 的高 32 位作為 currentEpoch , acceptedEpoch 值并寫入對應(yīng)文件中。
初始選舉環(huán)境synchronized public void startLeaderElection() { try { // 創(chuàng)建投票 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } catch(IOException e) { } // 從集群中節(jié)點(diǎn)列表,查找當(dāng)前節(jié)點(diǎn)與其他進(jìn)行信息同步的地址 for (QuorumServer p : getView().values()) { if (p.id == myid) { myQuorumAddr = p.addr; break; } } if (myQuorumAddr == null) { throw new RuntimeException("My id " + myid + " not in the peer list"); } // electionType == 3 this.electionAlg = createElectionAlgorithm(electionType); }
protected Election createElectionAlgorithm(int electionAlgorithm){ Election le=null; //TODO: use a factory rather than a switch switch (electionAlgorithm) { // 忽略其他算法的實現(xiàn) case 3: /** * 創(chuàng)建 QuorumCnxManager 實例,并啟動 QuorumCnxManager.Listener 線程用于與集群中其他節(jié)點(diǎn)進(jìn)行選舉通信; */ qcm = createCnxnManager(); QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start(); /** * 創(chuàng)建選舉算法 FastLeaderElection 實例 */ le = new FastLeaderElection(this, qcm); } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le; }
初始節(jié)點(diǎn)的相關(guān)實例之后,執(zhí)行 super.start() 方法,因 QuorumPeer 類繼承 ZooKeeperThread 故會啟動 QuorumPeer 線程
public void run() { // 代碼省略 try { /* * Main loop */ while (running) { switch (getPeerState()) { case LOOKING: LOG.info("LOOKING"); if (Boolean.getBoolean("readonlymode.enabled")) { // 只讀模式下代碼省略 } else { try { setBCVote(null); setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } } break; // 忽略其他狀態(tài)下的處理邏輯 } } } finally { } }選舉
從上述代碼可以看出 QuorumPeer 線程在運(yùn)行過程中輪詢監(jiān)聽當(dāng)前節(jié)點(diǎn)的狀態(tài)并進(jìn)行相應(yīng)的邏輯處理,集群啟動時節(jié)點(diǎn)狀態(tài)為 LOOKING (也就是選舉 Leader 過程),此時會調(diào)用 FastLeaderElection.lookForLeader 方法 (也是投票選舉算法的核心)簡化后源碼如下:
public Vote lookForLeader() throws InterruptedException { // 忽略 try { HashMaprecvset = new HashMap (); HashMap outofelection = new HashMap (); int notTimeout = finalizeWait; synchronized(this){ // logicalclock 邏輯時鐘加一 logicalclock.incrementAndGet(); /** * 更新提案信息,用于后續(xù)投票;集群啟動節(jié)點(diǎn)默認(rèn)選舉自身為 Leader */ updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } /** * 發(fā)送選舉投票提案 */ sendNotifications(); /* * Loop in which we exchange notifications until we find a leader */ while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ /* * Remove next notification from queue, times out after 2 times * the termination time */ /** * 從 recvqueue 隊列中獲取外部節(jié)點(diǎn)的選舉投票信息 */ Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); /* * Sends more notifications if haven"t received enough. * Otherwise processes new notification. */ if(n == null){ /** * 檢查上一次發(fā)送的選舉投票信息是否全部發(fā)送; * 若已發(fā)送則重新在發(fā)送一遍,反之說明當(dāng)前節(jié)點(diǎn)與集群中其他節(jié)點(diǎn)未連接,則執(zhí)行 connectAll() 建立連接 */ if(manager.haveDelivered()){ sendNotifications(); } else { manager.connectAll(); } /* * Exponential backoff */ int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); } else if(self.getVotingView().containsKey(n.sid)) { /** * 只處理同一集群中節(jié)點(diǎn)的投票請求 */ switch (n.state) { case LOOKING: // If notification > current, replace and send messages out if (n.electionEpoch > logicalclock.get()) { /** * 外部投票選舉周期大于當(dāng)前節(jié)點(diǎn)選舉周期 * * step1 : 更新選舉周期值 * step2 : 清空已收到的選舉投票數(shù)據(jù) * step3 : 選舉投票 PK,選舉規(guī)則參見 totalOrderPredicate 方法 * step4 : 變更選舉投票并發(fā)送 */ logicalclock.set(n.electionEpoch); recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); } else if (n.electionEpoch < logicalclock.get()) { // 丟棄小于當(dāng)前選舉周期的投票 break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { /** * 同一選舉周期 * * step1 : 選舉投票 PK,選舉規(guī)則參見 totalOrderPredicate 方法 * step2 : 變更選舉投票并發(fā)送 */ updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } /** * 記錄外部選舉投票信息 */ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); /** * 統(tǒng)計選舉投票結(jié)果,判斷是否可以結(jié)束此輪選舉 */ if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // ...... if (n == null) { /** * 選舉結(jié)束判斷當(dāng)前節(jié)點(diǎn)狀態(tài); 若提案的 leader == myid 則 state = LEADING, 反之為 FOLLOWING */ self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); // 變更當(dāng)前投票信息 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); leaveInstance(endVote); return endVote; } } break; case OBSERVING: LOG.debug("Notification from observer: " + n.sid); break; case FOLLOWING: case LEADING: // ...... break; default: LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid); break; } } else { LOG.warn("Ignoring notification from non-cluster member " + n.sid); } } return null; } finally { // ...... } }
從 lookForLeader 方法的實現(xiàn)可以看出,選舉流程如下:
發(fā)送內(nèi)部投票
內(nèi)部投票發(fā)送邏輯參考后續(xù)小節(jié)
接收外部投票
接收外部投票邏輯參考后續(xù)小節(jié)
選舉投票 PK
當(dāng)接收到外部節(jié)點(diǎn)投票信息后會與內(nèi)部投票信息進(jìn)行 PK 已確定投票優(yōu)先權(quán);PK 規(guī)則參見 totalOrderPredicate 方法如下
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; } /* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */ return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
從其實現(xiàn)可以看出選舉投票 PK 規(guī)則如下:
* 比較外部投票與內(nèi)部投票的選舉周期值,選舉周期大的值優(yōu)先 * 若選舉周期值一致,則比較事務(wù) ID; 事務(wù) ID 最新的優(yōu)先 * 若選舉周期值一致且事務(wù) ID 值相同,則比較投票節(jié)點(diǎn)的 server id; server id 最大的優(yōu)先
統(tǒng)計選舉投票
當(dāng)接收到外部投票之后,都會統(tǒng)計下此輪選舉的投票情況并判斷是否可結(jié)束選舉; 參考 termPredicate 方法
protected boolean termPredicate( HashMapvotes, Vote vote) { HashSet set = new HashSet (); /** * 統(tǒng)計接收的投票中與當(dāng)前節(jié)點(diǎn)所推舉 leader 投票一致的個數(shù) */ for (Map.Entry entry : votes.entrySet()) { if (vote.equals(entry.getValue())){ set.add(entry.getKey()); } } /** * 如果超過一半的投票一致 則說明可以終止本次選舉 */ return self.getQuorumVerifier().containsQuorum(set); }
確認(rèn)節(jié)點(diǎn)角色
當(dāng)此輪選舉結(jié)束之后,通過判斷所推舉的 leader server id 是否與當(dāng)前節(jié)點(diǎn) server id 相等; 若相等則說明當(dāng)前節(jié)點(diǎn)為 leader, 反之為 follower。發(fā)送接收投票
上文中主要聊了下 ZK 選舉算法的核心部分,下面接著看下集群節(jié)點(diǎn)在選舉過程中是如何發(fā)送自己的投票和接收外部的投票及相關(guān)處理邏輯。
首先通過 FastLeaderElection.sendNotifications 方法看下發(fā)送投票邏輯:
private void sendNotifications() { for (QuorumServer server : self.getVotingView().values()) { long sid = server.id; /** * 發(fā)送投票通知信息 * * leader : 被推舉的服務(wù)器 myid * zxid : 被推舉的服務(wù)器 zxid * electionEpoch : 當(dāng)前節(jié)點(diǎn)選舉周期 * ServerState state : 當(dāng)前節(jié)點(diǎn)狀態(tài) * sid : 消息接收方 myid * peerEpoch : 被推舉的服務(wù)器 epoch */ ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch); /** * 將消息添加到隊列 sendqueue 中; * * @see Messenger.WorkerSender sendqueue 隊列會被 WorkerSender 消費(fèi) */ sendqueue.offer(notmsg); } }
從實現(xiàn)可以看出節(jié)點(diǎn)在啟動階段會將自身信息封裝為 ToSend 實例(也就是選舉自身為 leader)并添加到隊列 FastLeaderElection.sendqueue 中;那么此時我們會問到 FastLeaderElection.sendqueue 隊列中的消息被誰消費(fèi)處理呢 ? 讓我們回過頭看下節(jié)點(diǎn)在啟動初始化選舉環(huán)境時創(chuàng)建 QuorumCnxManager, FastLeaderElection 實例的過程。
PS : FastLeaderElection.sendqueue 隊列中消息被誰消費(fèi) ?QuorumCnxManager
public QuorumCnxManager(final long mySid, Mapview, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled, ConcurrentHashMap senderWorkerMap) { this.senderWorkerMap = senderWorkerMap; this.recvQueue = new ArrayBlockingQueue (RECV_CAPACITY); this.queueSendMap = new ConcurrentHashMap >(); this.lastMessageSent = new ConcurrentHashMap (); String cnxToValue = System.getProperty("zookeeper.cnxTimeout"); if(cnxToValue != null){ this.cnxTO = Integer.parseInt(cnxToValue); } this.mySid = mySid; this.socketTimeout = socketTimeout; this.view = view; this.listenOnAllIPs = listenOnAllIPs; initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize, quorumSaslAuthEnabled); listener = new Listener(); }
在 QuorumCnxManager 實例化后,會啟動一個 QuorumCnxManager.Listener 線程;同時在 QuorumCnxManager 實例中存在三個重要的集合容器變量:
senderWorkerMap : 發(fā)送器集合,Map 類型按 server id 分組;為集群中的每個節(jié)點(diǎn)分配一個 SendWorker 負(fù)責(zé)消息的發(fā)送
recvQueue : 消息接收隊列,用于存放從外部節(jié)點(diǎn)接收到的投票消息
queueSendMap : 消息發(fā)送隊列,Map 類型按 server id 分組;為集群中的每個節(jié)點(diǎn)分配一個阻塞隊列存放待發(fā)送的消息,從而保證各個節(jié)點(diǎn)之間的消息發(fā)送互不影響
下面我們再看下 QuorumCnxManager.Listener 線程啟動后,主要做了什么:
public void run() { int numRetries = 0; InetSocketAddress addr; while((!shutdown) && (numRetries < 3)){ try { ss = new ServerSocket(); ss.setReuseAddress(true); /** * 獲取當(dāng)前節(jié)點(diǎn)的選舉地址并 bind 監(jiān)聽等待外部節(jié)點(diǎn)連接 */ addr = view.get(QuorumCnxManager.this.mySid).electionAddr; ss.bind(addr); while (!shutdown) { /** * 接收外部節(jié)點(diǎn)連接并處理 */ Socket client = ss.accept(); setSockOpts(client); receiveConnection(client); numRetries = 0; } } catch (IOException e) { LOG.error("Exception while listening", e); numRetries++; ss.close(); Thread.sleep(1000); } } }
跟蹤代碼發(fā)現(xiàn) receiveConnection 方法最終會調(diào)用方法 handleConnection 如下
private void handleConnection(Socket sock, DataInputStream din) throws IOException { /** * 讀取外部節(jié)點(diǎn)的 server id * ps : 此時的 server id 是什么時候發(fā)送的呢 ? */ Long sid = din.readLong(); if (sid < this.mySid) { /** * 若外部節(jié)點(diǎn)的 server id 小于當(dāng)前節(jié)點(diǎn)的 server id,則關(guān)閉此連接,改為由當(dāng)前節(jié)點(diǎn)發(fā)起連接 * ps : 該限制說明選舉過程中,zk 只允許 server id 較大的一方去主動發(fā)起連接避免重復(fù)連接 */ SendWorker sw = senderWorkerMap.get(sid); if (sw != null) { sw.finish(); } closeSocket(sock); connectOne(sid); } else { SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); if(vsw != null) vsw.finish(); /** * 按 server id 分組,為外部節(jié)點(diǎn)分配 SendWorker, RecvWorker 和一個消息發(fā)送隊列 */ senderWorkerMap.put(sid, sw); queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(SEND_CAPACITY)); /** * 啟動外部節(jié)點(diǎn)對應(yīng)的 SendWorker, RecvWorker 線程 */ sw.start(); rw.start(); return; } }
至此會發(fā)現(xiàn) QuorumCnxManager.Listener 線程處理邏輯如下:
監(jiān)聽當(dāng)前節(jié)點(diǎn)的 election address 等待接收外部節(jié)點(diǎn)連接
讀取外部節(jié)點(diǎn)的 server id 并與當(dāng)前節(jié)點(diǎn)的 server id 比較;若前者小則關(guān)閉連接,改由當(dāng)前節(jié)點(diǎn)發(fā)起連接
反之為外部節(jié)點(diǎn)分配 SendWorker,RecvWorker 線程及消息發(fā)送隊列
PS : 此處我們會有個疑問外部節(jié)點(diǎn)的 server id 是什么時候發(fā)送過來的呢 ?
下面我們在看下為每個外部節(jié)點(diǎn)開啟了 SendWorker, RecvWorker 線程后做了什么:
SendWorker
public void run() { // 省略 try { while (running && !shutdown && sock != null) { ByteBuffer b = null; try { /** * 通過 server id 獲取待發(fā)送給集群中節(jié)點(diǎn)的消息隊列 */ ArrayBlockingQueuebq = queueSendMap .get(sid); if (bq != null) { /** * 從隊列中獲取待發(fā)送的消息 */ b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS); } else { LOG.error("No queue of incoming messages for " + "server " + sid); break; } if(b != null){ lastMessageSent.put(sid, b); /** * 寫入 socket 的輸出流完成消息的發(fā)送 */ send(b); } } catch (InterruptedException e) { } } } catch (Exception e) { } } synchronized void send(ByteBuffer b) throws IOException { byte[] msgBytes = new byte[b.capacity()]; try { b.position(0); b.get(msgBytes); } catch (BufferUnderflowException be) { LOG.error("BufferUnderflowException ", be); return; } /** * 發(fā)送的報文包括:消息體正文長度和消息體正文 */ dout.writeInt(b.capacity()); dout.write(b.array()); dout.flush(); }
通過代碼實現(xiàn)我們知道 SendWorker 的職責(zé)就是從 queueSendMap 隊列中獲取待發(fā)送給遠(yuǎn)程節(jié)點(diǎn)的消息并執(zhí)行發(fā)送。
PS : 此處我們會有個疑問 QuorumCnxManager.queueSendMap 中節(jié)點(diǎn)對應(yīng)隊列中待發(fā)送的消息是誰生產(chǎn)的呢 ?
RecvWorker
public void run() { threadCnt.incrementAndGet(); try { while (running && !shutdown && sock != null) { /** * 讀取外部節(jié)點(diǎn)發(fā)送的消息 * 由 SendWorker 可知前 4 字節(jié)為消息載體有效長度 */ int length = din.readInt(); if (length <= 0 || length > PACKETMAXSIZE) { throw new IOException( "Received packet with invalid packet: " + length); } /** * 讀取消息體正文 */ byte[] msgArray = new byte[length]; din.readFully(msgArray, 0, length); ByteBuffer message = ByteBuffer.wrap(msgArray); /** * 將讀取的消息包裝為 Message 對象添加到隊列 recvQueue 中 */ addToRecvQueue(new Message(message.duplicate(), sid)); } } catch (Exception e) { LOG.warn("Connection broken for id " + sid + ", my id = " + QuorumCnxManager.this.mySid + ", error = " , e); } finally { LOG.warn("Interrupting SendWorker"); sw.finish(); if (sock != null) { closeSocket(sock); } } } public void addToRecvQueue(Message msg) { synchronized(recvQLock) { // 省略 try { recvQueue.add(msg); } catch (IllegalStateException ie) { // This should never happen LOG.error("Unable to insert element in the recvQueue " + ie); } } }
從上面可以看出 RecvWorker 線程在運(yùn)行期間會接收 server id 對應(yīng)的外部節(jié)點(diǎn)發(fā)送的消息,并將其放入 QuorumCnxManager.recvQueue 隊列中。
到目前為止我們基本完成對 QuorumCnxManager 核心功能的分析,發(fā)現(xiàn)其功能主要是負(fù)責(zé)集群中當(dāng)前節(jié)點(diǎn)與外部節(jié)點(diǎn)進(jìn)行選舉通訊的網(wǎng)絡(luò) IO 操作,譬如接收外部節(jié)點(diǎn)選舉投票和向外部節(jié)點(diǎn)發(fā)送內(nèi)部投票。
下面我們在接著回頭看下 FastLeaderElection 類實例的過程:
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){ this.stop = false; this.manager = manager; starter(self, manager); } private void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self; proposedLeader = -1; proposedZxid = -1; sendqueue = new LinkedBlockingQueue(); recvqueue = new LinkedBlockingQueue (); this.messenger = new Messenger(manager); }
Messenger(QuorumCnxManager manager) { /** * 啟動 WorkerSender 線程用于發(fā)送消息 */ this.ws = new WorkerSender(manager); Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); /** * 啟動 WorkerReceiver 線程用于接收消息 */ this.wr = new WorkerReceiver(manager); t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); t.setDaemon(true); t.start(); }
從 FastLeaderElection 實例化過程我們知道,其內(nèi)部分別啟動了線程 WorkerSender,WorkerReceiver ;那么接下來看下這兩個線程具體做什么吧。
public void run() { while (!stop) { try { /** * 從 sendqueue 隊列中獲取 ToSend 待發(fā)送的消息 */ ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } void process(ToSend m) { // 將 ToSend 轉(zhuǎn)換為 40字節(jié) ByteBuffer ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch); // 交由 QuorumCnxManager 執(zhí)行發(fā)送 manager.toSend(m.sid, requestBuffer); }
看了 WorkerSender 的實現(xiàn)是不是明白了什么? 還記得上文中 FastLeaderElection.sendNotifications 方法執(zhí)行發(fā)送通知的時候的疑惑嗎 ? FastLeaderElection.sendqueue 隊列產(chǎn)生的消息就是被 WorkerSender 線程所消費(fèi)處理, WorkerSender 會將消息轉(zhuǎn)發(fā)至 QuorumCnxManager 處理
public void toSend(Long sid, ByteBuffer b) { /* * If sending message to myself, then simply enqueue it (loopback). * 如果是發(fā)給自己的投票,則將其添加到接收隊列中等待處理 */ if (this.mySid == sid) { b.position(0); addToRecvQueue(new Message(b.duplicate(), sid)); /* * Otherwise send to the corresponding thread to send. */ } else { /* * Start a new connection if doesn"t have one already. */ ArrayBlockingQueuebq = new ArrayBlockingQueue (SEND_CAPACITY); ArrayBlockingQueue bqExisting = queueSendMap.putIfAbsent(sid, bq); // 將發(fā)送的消息放入對應(yīng)的隊列中,若隊列滿了則將隊列頭部元素移除 if (bqExisting != null) { addToSendQueue(bqExisting, b); } else { addToSendQueue(bq, b); } connectOne(sid); } } private void addToSendQueue(ArrayBlockingQueue queue, ByteBuffer buffer) { // 省略 try { // 將消息插入節(jié)點(diǎn)對應(yīng)的隊列中 queue.add(buffer); } catch (IllegalStateException ie) { } }
QuorumCnxManager 在收到 FastLeaderElection.WorkerSender 轉(zhuǎn)發(fā)的消息時,會判斷當(dāng)前消息是否發(fā)給自己的投票,若是則將消息添加到接收隊列中,反之會將消息添加到 queueSendMap 對應(yīng) server id 的隊列中;看到這里的時候是不是就明白了在 QuorumCnxManager.SendWorker 分析時候的疑惑呢 。 這個時候投票消息未必能夠發(fā)送出去,因為當(dāng)前節(jié)點(diǎn)與外部節(jié)點(diǎn)的通道是否已建立還未知,所以繼續(xù)執(zhí)行 connectOne
synchronized public void connectOne(long sid){ /** * 判斷當(dāng)前服務(wù)節(jié)點(diǎn)是否與 sid 外部服務(wù)節(jié)點(diǎn)建立連接;有可能對方先發(fā)起連接 * 若已連接則等待后續(xù)處理,反之發(fā)起連接 */ if (!connectedToPeer(sid)){ InetSocketAddress electionAddr; if (view.containsKey(sid)) { electionAddr = view.get(sid).electionAddr; } else { LOG.warn("Invalid server id: " + sid); return; } try { LOG.debug("Opening channel to server " + sid); Socket sock = new Socket(); setSockOpts(sock); sock.connect(view.get(sid).electionAddr, cnxTO); LOG.debug("Connected to server " + sid); initiateConnection(sock, sid); } catch (UnresolvedAddressException e) { } catch (IOException e) { } } else { LOG.debug("There is a connection already for server " + sid); } } public boolean connectedToPeer(long peerSid) { return senderWorkerMap.get(peerSid) != null; }
private boolean startConnection(Socket sock, Long sid) throws IOException { DataOutputStream dout = null; DataInputStream din = null; try { /** * 發(fā)送當(dāng)前節(jié)點(diǎn)的 server id,需告知對方我是哪臺節(jié)點(diǎn) */ dout = new DataOutputStream(sock.getOutputStream()); dout.writeLong(this.mySid); dout.flush(); din = new DataInputStream( new BufferedInputStream(sock.getInputStream())); } catch (IOException e) { LOG.warn("Ignoring exception reading or writing challenge: ", e); closeSocket(sock); return false; } // 只允許 sid 值大的服務(wù)器去主動和其他服務(wù)器連接,否則斷開連接 if (sid > this.mySid) { LOG.info("Have smaller server identifier, so dropping the " + "connection: (" + sid + ", " + this.mySid + ")"); closeSocket(sock); // Otherwise proceed with the connection } else { SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); if(vsw != null) vsw.finish(); senderWorkerMap.put(sid, sw); queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(SEND_CAPACITY)); sw.start(); rw.start(); return true; } return false; }
從上述代碼可以看出節(jié)點(diǎn)在與外部節(jié)點(diǎn)連接后會先發(fā)送 myid 報文告知對方我是哪個節(jié)點(diǎn)(這也是為什么 QuorumCnxManager.Listener 線程在接收到一個連接請求時會先執(zhí)行 getLong 獲取 server id 了);同樣在連接建立的時候也遵循一個原則(只允許 server id 較大的一方發(fā)起連接)。
public void run() { Message response; while (!stop) { // Sleeps on receive try{ /** * 從 QuorumCnxManager.recvQueue 隊列中獲取接收的外部投票 */ response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); if(response == null) continue; if(!self.getVotingView().containsKey(response.sid)){ // 忽略對方是觀察者的處理 } else { // Instantiate Notification and set its attributes Notification n = new Notification(); // 將 message 轉(zhuǎn)成 notification 對象 if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ // 當(dāng)前節(jié)點(diǎn)狀態(tài)為 looking,則將外部節(jié)點(diǎn)投票添加到 recvqueue 隊列中 recvqueue.offer(n); if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())){ // 若外部節(jié)點(diǎn)選舉周期小于當(dāng)前節(jié)點(diǎn)選舉周期則發(fā)送內(nèi)部投票 Vote v = getVote(); ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock.get(), self.getPeerState(), response.sid, v.getPeerEpoch()); sendqueue.offer(notmsg); } } else { // 忽略其他狀態(tài)時的處理 } } } catch (InterruptedException e) { } } LOG.info("WorkerReceiver is down"); }
此時我們明白 WorkerReceiver 線程在運(yùn)行期間會一直從 QuorumCnxManager.recvQueue 的隊列中拉取接收到的外部投票信息,若當(dāng)前節(jié)點(diǎn)為 LOOKING 狀態(tài),則將外部投票信息添加到 FastLeaderElection.recvqueue 隊列中,等待 FastLeaderElection.lookForLeader 選舉算法處理投票信息。
到此我們基本明白了 ZK 集群節(jié)點(diǎn)發(fā)送和接收投票的處理流程,但是這個時候您是不是又有一種懵的狀態(tài)呢 笑哭,我們會發(fā)現(xiàn)選舉過程中依賴了多個線程 WorkerSender, SendWorker, WorkerReceiver, RecvWorker ,多個阻塞隊列 sendqueue, recvqueue,queueSendMap,recvQueue 而且名字起的很類似,更讓人懵 ; 不過莫慌,我們來通過下面的圖來縷下思路小結(jié)
看了這么長時間的代碼,也夠累的;最后我們就來個小結(jié)吧 :
QuorumCnxManager 類主要職能是負(fù)責(zé)集群中節(jié)點(diǎn)與外部節(jié)點(diǎn)進(jìn)行通信及投票信息的中轉(zhuǎn)
FastLeaderElection 類是選舉投票的核心實現(xiàn)
選舉投票規(guī)則
比較外部投票與內(nèi)部投票的選舉周期值,選舉周期大的值優(yōu)先
若選舉周期值一致,則比較事務(wù) ID; 事務(wù) ID 最新的優(yōu)先
若選舉周期值一致且事務(wù) ID 值相同,則比較投票節(jié)點(diǎn)的 server id; server id 最大的優(yōu)先
集群中節(jié)點(diǎn)通信時為了避免重復(fù)建立連接,遵守一個原則:連接總是由 server id 較大的一方發(fā)起
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77695.html
摘要:只有數(shù)據(jù)同步完成之后集群才具備對外提供服務(wù)的能力。當(dāng)節(jié)點(diǎn)在選舉后角色確認(rèn)為后將會進(jìn)入狀態(tài),源碼如下在節(jié)點(diǎn)狀態(tài)變更為之后會創(chuàng)建實例,并觸發(fā)過程。 在上一篇對 zookeeper 選舉實現(xiàn)分析之后,我們知道 zookeeper 集群在選舉結(jié)束之后,leader 節(jié)點(diǎn)將進(jìn)入 LEADING 狀態(tài),follower 節(jié)點(diǎn)將進(jìn)入 FOLLOWING 狀態(tài);此時集群中節(jié)點(diǎn)將進(jìn)行數(shù)據(jù)同步操作,以保證...
摘要:摘要目前是最流行的開源分布式搜索引擎系統(tǒng),其使用作為單機(jī)存儲引擎并提供強(qiáng)大的搜索查詢能力。前言分布式一致性原理剖析系列將會對的分布式一致性原理進(jìn)行詳細(xì)的剖析,介紹其實現(xiàn)方式原理以及其存在的問題等基于版本。相當(dāng)于一次正常情況的新節(jié)點(diǎn)加入。 摘要: ES目前是最流行的開源分布式搜索引擎系統(tǒng),其使用Lucene作為單機(jī)存儲引擎并提供強(qiáng)大的搜索查詢能力。學(xué)習(xí)其搜索原理,則必須了解Lucene,...
摘要:摘要目前是最流行的開源分布式搜索引擎系統(tǒng),其使用作為單機(jī)存儲引擎并提供強(qiáng)大的搜索查詢能力。前言分布式一致性原理剖析系列將會對的分布式一致性原理進(jìn)行詳細(xì)的剖析,介紹其實現(xiàn)方式原理以及其存在的問題等基于版本。相當(dāng)于一次正常情況的新節(jié)點(diǎn)加入。 摘要: ES目前是最流行的開源分布式搜索引擎系統(tǒng),其使用Lucene作為單機(jī)存儲引擎并提供強(qiáng)大的搜索查詢能力。學(xué)習(xí)其搜索原理,則必須了解Lucene,...
閱讀 3178·2021-11-22 15:25
閱讀 3855·2021-11-17 09:33
閱讀 3370·2021-11-08 13:15
閱讀 3052·2021-09-22 10:56
閱讀 542·2021-08-31 09:45
閱讀 2755·2019-08-30 13:49
閱讀 3082·2019-08-30 12:52
閱讀 1146·2019-08-29 17:05