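Summary: Put simply, Gossip is a decentralized, peer-to-peer data broadcast protocol; you can think of it as the way a virus spreads. A infects B, B goes on to infect C, and so on. For example, the service discovery framework Consul uses the gossip protocol (via Serf) to manage host membership and to broadcast messages across the cluster, and Cassandra also uses it for node discovery, health checks, and similar tasks.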
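What is the Gossip protocol?
Put simply, it is a decentralized, peer-to-peer data broadcast protocol; you can think of it as the way a virus spreads. A infects B, B goes on to infect C, and so on.
The protocol itself imposes only a few simple constraints. The time needed for a state update to propagate grows logarithmically with the number of participating hosts, and the protocol keeps working even if some nodes go down or some messages are lost. Many distributed systems use gossip to solve problems they run into. For example, the service discovery framework Consul uses the gossip protocol (via Serf) to manage host membership and to broadcast messages across the cluster, and Cassandra also uses it for node discovery, health checks, and so on.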
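To make the logarithmic growth concrete, here is a small simulation sketch. It is not taken from Consul, Serf, or Cassandra: it assumes the simplest push-style variant, in which every node that already knows the update tells one uniformly random node per round, and the class and method names (GossipSpreadSketch, roundsToInformAll) are made up for illustration.

```java
import java.util.Random;

// Hypothetical simulation, not part of any library: push-style gossip in
// which every informed node tells one random node per round.
class GossipSpreadSketch {

    /** Returns the number of rounds until all n nodes have seen the update. */
    static int roundsToInformAll(int n, long seed) {
        Random random = new Random(seed);
        boolean[] informed = new boolean[n];
        informed[0] = true;              // node 0 starts with the update
        int informedCount = 1;
        int rounds = 0;
        while (informedCount < n) {
            // Snapshot the state so that nodes informed during this round
            // only start gossiping in the next round.
            boolean[] snapshot = informed.clone();
            for (int node = 0; node < n; node++) {
                if (!snapshot[node]) continue;
                int target = random.nextInt(n);
                if (!informed[target]) {
                    informed[target] = true;
                    informedCount++;
                }
            }
            rounds++;
        }
        return rounds;
    }

    public static void main(String[] args) {
        for (int n : new int[] {16, 256, 4096}) {
            System.out.println(n + " nodes -> " + roundsToInformAll(n, 42L) + " rounds");
        }
    }
}
```

Because the informed set can at most double per round, 1024 nodes need at least 10 rounds; in this model the typical figure is a small multiple of log n rather than anything close to n.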
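Communication flow
Overview
First, the system needs to be configured with a few seed nodes, say A and B. Every participating node maintains the state of all nodes as node -> (Key, Value, Version); a larger version number means newer data. Node P can directly update only its own state; the data it holds about other nodes can be updated only indirectly, through the gossip protocol.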
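The rough process is as follows:
① SYN: node A randomly selects some nodes and sends them its state. It can send only digests, i.e. without the values, to keep the message small.
② ACK: when node B receives the message, it merges it with its local state by comparing versions; the larger version is the newer data. For example, if node A sends node B the entry C(key, value, 2) while node B stores C(key, value1, 3), B's version is newer, so the merged result is B's local data, which B then sends back to A.
③ ACK2: node A receives the ACK message, applies it to its local data, and sends back whatever B asked for.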
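The three steps can be sketched in a few lines of Java. This is a minimal illustration under simplifying assumptions, not Cassandra's implementation: state is a plain map of key -> (value, version), generations are ignored, keys the sender has never heard of are not exchanged, and every name here (GossipExchangeSketch, Versioned, Ack, makeSyn, handleSyn, handleAck, merge) is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the SYN / ACK / ACK2 exchange described above.
class GossipExchangeSketch {

    /** One entry of the node -> (key, value, version) table. */
    static final class Versioned {
        final String value;
        final int version;
        Versioned(String value, int version) { this.value = value; this.version = version; }
    }

    /** ACK payload: entries the receiver has newer, plus the keys it wants. */
    static final class Ack {
        final Map<String, Versioned> newerOnReceiver = new HashMap<>();
        final Map<String, Integer> requested = new HashMap<>(); // key -> version the receiver already has
    }

    /** Step 1 (SYN): digests only, i.e. key -> version without the values. */
    static Map<String, Integer> makeSyn(Map<String, Versioned> state) {
        Map<String, Integer> digests = new HashMap<>();
        state.forEach((key, entry) -> digests.put(key, entry.version));
        return digests;
    }

    /** Step 2 (ACK): the receiver compares each digest with its local version. */
    static Ack handleSyn(Map<String, Versioned> local, Map<String, Integer> digests) {
        Ack ack = new Ack();
        for (Map.Entry<String, Integer> digest : digests.entrySet()) {
            Versioned mine = local.get(digest.getKey());
            int myVersion = (mine == null) ? 0 : mine.version;
            if (myVersion > digest.getValue()) {
                ack.newerOnReceiver.put(digest.getKey(), mine);   // receiver is newer: send the data back
            } else if (myVersion < digest.getValue()) {
                ack.requested.put(digest.getKey(), myVersion);    // sender is newer: ask for it
            }
        }
        return ack;
    }

    /** Step 3 (ACK2): the sender applies the ACK and returns what was requested. */
    static Map<String, Versioned> handleAck(Map<String, Versioned> local, Ack ack) {
        merge(local, ack.newerOnReceiver);
        Map<String, Versioned> ack2 = new HashMap<>();
        for (String key : ack.requested.keySet()) {
            ack2.put(key, local.get(key));
        }
        return ack2;
    }

    /** Keeps whichever side has the larger version. */
    static void merge(Map<String, Versioned> local, Map<String, Versioned> remote) {
        remote.forEach((key, incoming) -> {
            Versioned current = local.get(key);
            if (current == null || incoming.version > current.version) {
                local.put(key, incoming);
            }
        });
    }

    public static void main(String[] args) {
        Map<String, Versioned> a = new HashMap<>();
        Map<String, Versioned> b = new HashMap<>();
        a.put("C", new Versioned("value", 2));
        a.put("D", new Versioned("fresh", 5));
        b.put("C", new Versioned("value1", 3));
        b.put("D", new Versioned("stale", 1));

        Ack ack = handleSyn(b, makeSyn(a));            // A -> B: SYN, B -> A: ACK
        Map<String, Versioned> ack2 = handleAck(a, ack); // A applies ACK, A -> B: ACK2
        merge(b, ack2);                                // B applies ACK2

        System.out.println("A sees C = " + a.get("C").value + " @ " + a.get("C").version);
        System.out.println("B sees D = " + b.get("D").value + " @ " + b.get("D").version);
    }
}
```

In the example, A ends up with B's newer C (version 3) and B ends up with A's newer D (version 5), mirroring the C(key, value, 2) versus C(key, value1, 3) case from the text.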
A sends GossipDigestSyn => B runs GossipDigestSynVerbHandler
B sends GossipDigestAck => A runs GossipDigestAckVerbHandler
A sends GossipDigestAck2 => B runs GossipDigestAck2VerbHandler
All three classes implement the IVerbHandler interface and are registered as handlers with MessagingService:
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
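The registration above is essentially a lookup table from verb to handler. A stripped-down sketch of that pattern follows; it is only an illustration, and every name in it (VerbDispatchSketch, Handler, register, dispatch) is invented here rather than taken from Cassandra's MessagingService.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical registry illustrating verb -> handler dispatch.
class VerbDispatchSketch {

    enum Verb { GOSSIP_DIGEST_SYN, GOSSIP_DIGEST_ACK, GOSSIP_DIGEST_ACK2 }

    /** Stand-in for a verb handler; returns a description of what it did. */
    interface Handler {
        String handle(String payload);
    }

    private final Map<Verb, Handler> handlers = new EnumMap<>(Verb.class);

    /** Registers one handler per verb and rejects duplicates. */
    void register(Verb verb, Handler handler) {
        if (handlers.putIfAbsent(verb, handler) != null) {
            throw new IllegalStateException("handler already registered for " + verb);
        }
    }

    /** Looks up the handler for the verb; unknown verbs are ignored (null). */
    String dispatch(Verb verb, String payload) {
        Handler handler = handlers.get(verb);
        if (handler == null) {
            return null;
        }
        return handler.handle(payload);
    }

    public static void main(String[] args) {
        VerbDispatchSketch registry = new VerbDispatchSketch();
        registry.register(Verb.GOSSIP_DIGEST_SYN, payload -> "syn handler got " + payload);
        System.out.println(registry.dispatch(Verb.GOSSIP_DIGEST_SYN, "digests"));
        System.out.println(registry.dispatch(Verb.GOSSIP_DIGEST_ACK, "ignored"));
    }
}
```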
With the handlers registered, the messaging module invokes the matching handler whenever a message arrives, as the following code shows:
IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
if (verbHandler == null) {
    // unknown verbs are not processed
    logger.trace("Unknown verb {}", verb);
    return;
}

try {
    verbHandler.doVerb(message, id);
} catch (IOException ioe) {
    handleFailure(ioe);
    throw new RuntimeException(ioe);
} catch (TombstoneOverwhelmingException | IndexNotAvailableException e) {
    handleFailure(e);
    logger.error(e.getMessage());
} catch (Throwable t) {
    handleFailure(t);
    throw t;
}

Source code analysis
Initialization
All of the initialization happens in org.apache.cassandra.service.StorageService#initServer(), which calls prepareToJoin() to try to join the gossip cluster.
private void prepareToJoin() throws ConfigurationException {
    // joined is declared volatile for visibility; skip everything if we have already joined the cluster
    if (!joined) {
        /* ... omitted ... */

        // start listening for messages
        if (!MessagingService.instance().isListening())
            MessagingService.instance().listen();

        // give this node an identifier
        UUID localHostId = SystemKeyspace.getLocalHostId();

        /*
         * A shadow round fetches, from the nodes we talk to, the information they hold about all nodes
         */
        if (replacing) {
            localHostId = prepareForReplacement();
            appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));

            if (!DatabaseDescriptor.isAutoBootstrap()) {
                // Will not do replace procedure, persist the tokens we're taking over locally
                // so that they don't get clobbered with auto generated ones in joinTokenRing
                SystemKeyspace.updateTokens(bootstrapTokens);
            } else if (isReplacingSameAddress()) {
                // only go into hibernate state if replacing the same address (CASSANDRA-8523)
                logger.warn("Writes will not be forwarded to this node during replacement because it has the same address as " +
                            "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
                            "repair must be run after the replacement process in order to make this node consistent.",
                            DatabaseDescriptor.getReplaceAddress());
                appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
            }
        } else {
            checkForEndpointCollision(localHostId);
        }

        // have to start the gossip service before we can see any info on other nodes. this is necessary
        // for bootstrap to get the load info it needs.
        // (we won't be part of the storage ring though until we add a counterId to our state, below.)
        // Seed the host ID-to-endpoint map with our own ID.
        getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress());
        appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
        appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId));
        appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress()));
        appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());

        // load the persisted ring state. This used to be done earlier in the init process,
        // but now we always perform a shadow round when preparing to join and we have to
        // clear endpoint states after doing that.
        loadRingState();

        logger.info("Starting up server gossip");
        // start gossip, e.g. its scheduled tasks
        Gossiper.instance.register(this);
        Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
        gossipActive = true;
        // gossip snitch infos (local DC and rack)
        gossipSnitchInfo();
        // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
        Schema.instance.updateVersionAndAnnounce(); // Ensure we know our own actual Schema UUID in preparation for updates
        LoadBroadcaster.instance.startBroadcasting();
        HintsService.instance.startDispatch();
        BatchlogManager.instance.start();
    }
}

public synchronized Map<InetAddress, EndpointState> doShadowRound() {
    buildSeedsList();
    // it may be that the local address is the only entry in the seed
    // list in which case, attempting a shadow round is pointless
    if (seeds.isEmpty())
        return endpointShadowStateMap;

    seedsInShadowRound.clear();
    endpointShadowStateMap.clear();
    // build an empty SYN message, which marks this as a shadow round
    List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
    GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                                                           DatabaseDescriptor.getPartitionerName(),
                                                           gDigests);
    MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                          digestSynMessage,
                                                                          GossipDigestSyn.serializer);
    inShadowRound = true;
    int slept = 0;
    try {
        while (true) {
            /*
             * On the first pass and every five seconds after that, send a shadow round SYN
             * to all seed nodes, trying to obtain information about every node. The loop exits
             * once the maximum delay (30 s by default) is reached or the goal has been achieved.
             */
            if (slept % 5000 == 0) {
                logger.trace("Sending shadow round GOSSIP DIGEST SYN to seeds {}", seeds);
                for (InetAddress seed : seeds)
                    MessagingService.instance().sendOneWay(message, seed);
            }

            Thread.sleep(1000);
            if (!inShadowRound)
                break;

            slept += 1000;
            if (slept > StorageService.RING_DELAY) {
                // if we don't consider ourself to be a seed, fail out
                if (!DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
                    throw new RuntimeException("Unable to gossip with any seeds");

                logger.warn("Unable to gossip with any seeds but continuing since node is in its own seed list");
                inShadowRound = false;
                break;
            }
        }
    } catch (InterruptedException wtf) {
        throw new RuntimeException(wtf);
    }

    return ImmutableMap.copyOf(endpointShadowStateMap);
}
Gossiper#start() starts a scheduled task, GossipTask, which by default runs once per second and sends SYN messages:
/*
 * It is best to give every thread pool a name, which makes troubleshooting easier, and to
 * bound the queue size rather than rely on the default unbounded queues from Executors.
 * On shutdown, take care to handle interrupts: many people just catch Exception and log it,
 * which is not a good approach. Personally, after catching InterruptedException I usually
 * decide, based on the business scenario, whether to restore the interrupt flag via interrupt(),
 * and after finishing the current round of work, whether to exit.
 */
private static final DebuggableScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("GossipTasks");

public void start(int generationNbr, Map<ApplicationState, VersionedValue> preloadLocalStates) {
    buildSeedsList();
    /* initialize the heartbeat state for this localEndpoint */
    maybeInitializeLocalState(generationNbr);
    EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
    localState.addApplicationStates(preloadLocalStates);

    // notify snitches that Gossiper is about to start
    DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
    if (logger.isTraceEnabled())
        logger.trace("gossip started with generation {}", localState.getHeartBeatState().getGeneration());

    scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(),
                                                          Gossiper.intervalInMillis,
                                                          Gossiper.intervalInMillis,
                                                          TimeUnit.MILLISECONDS);
}
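The advice in that comment about naming pools and handling interrupts can be shown with plain JDK classes. The sketch below is an assumption-laden illustration, not how Cassandra's DebuggableScheduledThreadPoolExecutor is built: NamedSchedulerSketch and its methods are hypothetical names. Note that the JDK's ScheduledThreadPoolExecutor always uses its own unbounded delay queue, so the part of the advice about bounding the queue applies to ThreadPoolExecutor rather than to this class.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing a named scheduler and interrupt-aware work.
class NamedSchedulerSketch {

    /** Builds a single-threaded scheduler whose threads carry the pool name. */
    static ScheduledThreadPoolExecutor newNamedScheduler(String poolName) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, poolName + ":" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        return new ScheduledThreadPoolExecutor(1, factory);
    }

    /** One round of work that restores the interrupt flag instead of swallowing it. */
    static boolean sleepRound(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // let the caller see the interruption
            return false;
        }
    }

    /** Runs one task on a named scheduler and reports which thread executed it. */
    static String workerThreadName(String poolName) {
        ScheduledThreadPoolExecutor scheduler = newNamedScheduler(poolName);
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return scheduler.schedule(task, 0, TimeUnit.MILLISECONDS).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran on thread " + workerThreadName("GossipTasks"));
    }
}
```

A thread dump or log line then shows a name starting with the pool name instead of a generic pool-N-thread-M, which is the troubleshooting benefit the comment refers to.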
So how is GossipTask implemented internally?
private class GossipTask implements Runnable {
    public void run() {
        try {
            // wait until MessagingService has started listening
            MessagingService.instance().waitUntilListening();
            // take the lock
            taskLock.lock();

            // Update the heartbeat counter, which is used for failure detection: a scheduled task
            // polls this map and checks the time of the latest heartbeat. If it is unreasonably far
            // from the current time, the node can be considered down and moved to a separate queue,
            // to be checked again later to see whether it has recovered.
            endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
            if (logger.isTraceEnabled())
                logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
            final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
            // randomly pick some nodes and build the digest list
            Gossiper.instance.makeRandomGossipDigest(gDigests);

            if (gDigests.size() > 0) {
                // build the message; note that its type is GOSSIP_DIGEST_SYN
                GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                                                                       DatabaseDescriptor.getPartitionerName(),
                                                                       gDigests);
                MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                                      digestSynMessage,
                                                                                      GossipDigestSyn.serializer);
                /*
                 * Send the message to one live node, chosen at random as follows:
                 *   int index = (size == 1) ? 0 : random.nextInt(size);
                 *   InetAddress to = liveEndpoints.get(index);
                 * Returns true if the chosen node is a seed.
                 */
                boolean gossipedToSeed = doGossipToLiveMember(message);

                // randomly decide whether to also gossip to a node that is down
                maybeGossipToUnreachableMember(message);

                /*
                 * See this issue: https://issues.apache.org/jira/browse/CASSANDRA-150
                 */
                if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
                    maybeGossipToSeed(message);

                doStatusCheck();
            }
        } catch (Exception e) {
            JVMStabilityInspector.inspectThrowable(e);
            logger.error("Gossip error", e);
        } finally {
            taskLock.unlock();
        }
    }
}
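The heartbeat comment in GossipTask describes failure detection in terms of "how long since the last heartbeat". A deliberately simplified sketch of that idea follows. It assumes a fixed timeout, which is a simplification chosen for this example; Cassandra's own FailureDetector is more elaborate, and the names used here (HeartbeatTimeoutSketch, recordHeartbeat, suspects) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical fixed-timeout failure detector; time is passed in explicitly
// so the behaviour is deterministic and easy to test.
class HeartbeatTimeoutSketch {

    private final Map<String, Long> lastHeartbeatMillis = new HashMap<>();
    private final long timeoutMillis;

    HeartbeatTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Remembers when a heartbeat from the node was last observed. */
    void recordHeartbeat(String node, long nowMillis) {
        lastHeartbeatMillis.put(node, nowMillis);
    }

    /** Returns the nodes whose last heartbeat is older than the timeout. */
    List<String> suspects(long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeatMillis.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        HeartbeatTimeoutSketch detector = new HeartbeatTimeoutSketch(1000);
        detector.recordHeartbeat("A", 0);
        detector.recordHeartbeat("B", 900);
        System.out.println("suspected at t=1500: " + detector.suspects(1500));
    }
}
```

A periodic task would call suspects(...) with the current time and move the returned nodes to an "unreachable" set, rechecking them later for recovery as the comment describes.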
GossipDigestSynVerbHandler
public void doVerb(MessageIn<GossipDigestSyn> message, int id) {
    InetAddress from = message.from;
    if (logger.isTraceEnabled())
        logger.trace("Received a GossipDigestSynMessage from {}", from);
    if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound()) {
        if (logger.isTraceEnabled())
            logger.trace("Ignoring GossipDigestSynMessage because gossip is disabled");
        return;
    }

    GossipDigestSyn gDigestMessage = message.payload;
    /* messages from a different cluster are not processed */
    if (!gDigestMessage.clusterId.equals(DatabaseDescriptor.getClusterName())) {
        logger.warn("ClusterName mismatch from {} {}!={}", from, gDigestMessage.clusterId, DatabaseDescriptor.getClusterName());
        return;
    }

    if (gDigestMessage.partioner != null && !gDigestMessage.partioner.equals(DatabaseDescriptor.getPartitionerName())) {
        logger.warn("Partitioner mismatch from {} {}!={}", from, gDigestMessage.partioner, DatabaseDescriptor.getPartitionerName());
        return;
    }

    List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();

    /* if both the sender and the receiver are in a shadow round, send an empty ack back */
    if (!Gossiper.instance.isEnabled() && Gossiper.instance.isInShadowRound()) {
        // a genuine syn (as opposed to one from a node currently
        // doing a shadow round) will always contain > 0 digests
        if (gDigestList.size() > 0) {
            logger.debug("Ignoring non-empty GossipDigestSynMessage because currently in gossip shadow round");
            return;
        }

        logger.debug("Received a shadow round syn from {}. Gossip is disabled but " +
                     "currently also in shadow round, responding with a minimal ack", from);
        // new ArrayList<>() and new HashMap<>() still allocate two objects for an empty reply;
        // Collections.emptyList() and Collections.emptyMap() could be considered instead
        MessagingService.instance()
                        .sendOneWay(new MessageOut<>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                     new GossipDigestAck(new ArrayList<>(), new HashMap<>()),
                                                     GossipDigestAck.serializer),
                                    from);
        return;
    }

    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder();
        for (GossipDigest gDigest : gDigestList) {
            sb.append(gDigest);
            sb.append(" ");
        }
        logger.trace("Gossip syn digests are : {}", sb);
    }

    /*
     * What follows is similar to a merge in git. As mentioned earlier, a larger version means
     * newer information about a node. Here we compute a diff: if your version is larger than my
     * local one, I send a request asking you for that node's information; if my version is larger
     * than yours, my information is newer, so I tell you that yours needs updating.
     * A GossipDigestAck message is then sent back.
     */
    doSort(gDigestList);

    List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
    Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
    Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
    logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size());
    MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                                                    new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
                                                                                    GossipDigestAck.serializer);
    if (logger.isTraceEnabled())
        logger.trace("Sending a GossipDigestAckMessage to {}", from);
    MessagingService.instance().sendOneWay(gDigestAckMessage, from);
}
The core implementation:
void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap) {
    if (gDigestList.size() == 0) {
        /*
         * An empty list means this is a shadow round, so we send over
         * the information about every node we know of.
         */
        logger.debug("Shadow request received, adding all states");
        for (Map.Entry<InetAddress, EndpointState> entry : endpointStateMap.entrySet()) {
            gDigestList.add(new GossipDigest(entry.getKey(), 0, 0));
        }
    }
    for (GossipDigest gDigest : gDigestList) {
        int remoteGeneration = gDigest.getGeneration();
        int maxRemoteVersion = gDigest.getMaxVersion();
        /* Get state associated with the end point in digest */
        EndpointState epStatePtr = endpointStateMap.get(gDigest.getEndpoint());
        /*
            Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally
            then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to
            request all the data for this endpoint.
        */
        if (epStatePtr != null) {
            int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
            /* get the max version of all keys in the state associated with this endpoint */
            int maxLocalVersion = getMaxEndpointStateVersion(epStatePtr);
            if (remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion)
                continue;

            if (remoteGeneration > localGeneration) {
                /* we request everything from the gossiper */
                requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
            } else if (remoteGeneration < localGeneration) {
                /* send all data with generation = localgeneration and version > 0 */
                sendAll(gDigest, deltaEpStateMap, 0);
            } else if (remoteGeneration == localGeneration) {
                /*
                    If the max remote version is greater then we request the remote endpoint send us all the data
                    for this endpoint with version greater than the max version number we have locally for this
                    endpoint.
                    If the max remote version is lesser, then we send all the data we have locally for this endpoint
                    with version greater than the max remote version.
                */
                if (maxRemoteVersion > maxLocalVersion) {
                    deltaGossipDigestList.add(new GossipDigest(gDigest.getEndpoint(), remoteGeneration, maxLocalVersion));
                } else if (maxRemoteVersion < maxLocalVersion) {
                    /* send all data with generation = localgeneration and version > maxRemoteVersion */
                    sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
                }
            }
        } else {
            /* We are here since we have no data for this endpoint locally so request everything. */
            requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
        }
    }
}
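The branching in examineGossiper boils down to a small decision table over (generation, max version). The sketch below restates it as a pure function so that each case can be checked in isolation; it mirrors the order of comparisons in the listing above, but the class, enum, and method names (DigestCompareSketch, Action, compare) are invented for this illustration and do not exist in Cassandra.

```java
// Hypothetical restatement of the comparison logic as a pure function.
class DigestCompareSketch {

    enum Action {
        NOTHING,        // both sides already agree
        REQUEST_ALL,    // ask the peer for everything about this endpoint
        SEND_ALL,       // send everything we have (versions greater than 0)
        REQUEST_NEWER,  // ask for versions greater than our local max
        SEND_NEWER      // send versions greater than the peer's max
    }

    static Action compare(boolean knownLocally,
                          int remoteGeneration, int maxRemoteVersion,
                          int localGeneration, int maxLocalVersion) {
        if (!knownLocally) {
            return Action.REQUEST_ALL;          // no local data for this endpoint
        }
        if (remoteGeneration > localGeneration) {
            return Action.REQUEST_ALL;          // peer has seen a newer generation
        }
        if (remoteGeneration < localGeneration) {
            return Action.SEND_ALL;             // our generation is newer
        }
        // Same generation: fall back to comparing versions.
        if (maxRemoteVersion > maxLocalVersion) {
            return Action.REQUEST_NEWER;
        }
        if (maxRemoteVersion < maxLocalVersion) {
            return Action.SEND_NEWER;
        }
        return Action.NOTHING;
    }

    public static void main(String[] args) {
        System.out.println(compare(true, 5, 10, 5, 7));   // same generation, peer has higher version
        System.out.println(compare(true, 4, 99, 5, 1));   // our generation is newer, version is irrelevant
        System.out.println(compare(false, 1, 1, 0, 0));   // endpoint unknown locally
    }
}
```

The generation is compared first because a larger generation outranks any version number from an older generation; versions are only meaningful when the generations match.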
GossipDigestAckVerbHandler
public void doVerb(MessageIn<GossipDigestAck> message, int id) {
    InetAddress from = message.from;
    if (logger.isTraceEnabled())
        logger.trace("Received a GossipDigestAckMessage from {}", from);
    if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound()) {
        if (logger.isTraceEnabled())
            logger.trace("Ignoring GossipDigestAckMessage because gossip is disabled");
        return;
    }

    GossipDigestAck gDigestAckMessage = message.payload;
    List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
    Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
    logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());

    if (Gossiper.instance.isInShadowRound()) {
        if (logger.isDebugEnabled())
            logger.debug("Received an ack from {}, which may trigger exit from shadow round", from);

        // if the ack is empty, the peer is also in a shadow round; no harm done, we will retry anyway
        Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap);
        return;
    }

    if (epStateMap.size() > 0) {
        /*
         * firstSynSendAt is updated when the first SYN message is sent. If this ACK
         * predates our first SYN, it is stale and is simply ignored.
         */
        if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0) {
            if (logger.isTraceEnabled())
                logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
            return;
        }

        /* related to failure detection; ignore for now */
        Gossiper.instance.notifyFailureDetector(epStateMap);
        /* merge the information received from the remote node with the local state, much as above */
        Gossiper.instance.applyStateLocally(epStateMap);
    }

    /*
     * Build a GossipDigestAck2Message that sends the peer the node information it asked for
     */
    Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
    for (GossipDigest gDigest : gDigestList) {
        InetAddress addr = gDigest.getEndpoint();
        EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
        if (localEpStatePtr != null)
            deltaEpStateMap.put(addr, localEpStatePtr);
    }

    MessageOut<GossipDigestAck2> gDigestAck2Message = new MessageOut<GossipDigestAck2>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
                                                                                       new GossipDigestAck2(deltaEpStateMap),
                                                                                       GossipDigestAck2.serializer);
    if (logger.isTraceEnabled())
        logger.trace("Sending a GossipDigestAck2Message to {}", from);
    MessagingService.instance().sendOneWay(gDigestAck2Message, from);
}
GossipDigestAck2VerbHandler
public void doVerb(MessageIn<GossipDigestAck2> message, int id) {
    if (logger.isTraceEnabled()) {
        InetAddress from = message.from;
        logger.trace("Received a GossipDigestAck2Message from {}", from);
    }
    if (!Gossiper.instance.isEnabled()) {
        if (logger.isTraceEnabled())
            logger.trace("Ignoring GossipDigestAck2Message because gossip is disabled");
        return;
    }
    Map<InetAddress, EndpointState> remoteEpStateMap = message.payload.getEndpointStateMap();
    Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
    /* merge the received node information with the local state */
    Gossiper.instance.applyStateLocally(remoteEpStateMap);
}
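Summary
The structure of the source is very clear, and the logic of each step is fairly easy to follow. The exchange resembles the TCP three-way handshake:
① A picks a random peer B and tells it some of what A knows (the selection could be ordered by time, scored by version, and so on; see the paper for details).
② B compares what it receives with its local state, sends back to A whatever it has that is newer, and asks A to send, in the next step, whatever A has that is newer.
③ A merges locally, then sends B the information B asked for.
④ B merges locally.
⑤ Done.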
References
https://www.cs.cornell.edu/ho...
https://www.consul.io
https://www.serf.io/
https://en.wikipedia.org/wiki...
https://github.com/apache/cas...