1. 程式人生 > >Gossip協議在Cassandra中的實現 #2

Gossip協議在Cassandra中的實現 #2

關系 vol 管理 jira hint || deb base ner

原始鏈接: https://github.com/aCoder2013/blog/issues/2 , 轉載請註明出處

Gossip協議是什麽?

? 簡單來說就是一種去中心化、點對點的數據廣播協議,你可以把它理解為病毒的傳播。A傳染給B,B繼續傳染給C,如此下去。

? 協議本身只有一些簡單的限制,狀態更新的時間隨著參與主機數的增長以對數的速率增長,即使是一些節點掛掉或者消息丟失也沒關系。很多的分布式系統都用gossip 協議來解決自己遇到的一些難題。比如說服務發現框架consul就用了gossip協議( Serf)來做管理主機的關系以及集群之間的消息廣播,Cassandra也用到了這個協議,用來實現一些節點發現、健康檢查等。

通信流程

概述

首先系統需要配置幾個種子節點,比如說A、B, 每個參與的節點都會維護所有節點的狀態,node->(Key,Value,Version),版本號較大的說明其數據較新,節點P只能直接更新它自己的狀態,節點P只能間接的通過gossip協議來更新本機維護的其他節點的數據。

大致的過程如下,

? ① SYN:節點A向隨機選擇一些節點,這裏可以只選擇發送摘要,即不發送valus,避免消息過大

? ② ACK:節點B接收到消息後,會將其與本地的合並,這裏合並采用的是對比版本,版本較大的說明數據較新. 比如節點A向節點B發送數據C(key,value,2),而節點B本機存儲的是C(key,value1,3),那麽因為B的版本比較新,合並之後的數據就是B本機存儲的數據,然後會發回A節點。

? ③ ACK2:節點A接收到ACK消息,將其應用到本機的數據中

A發GossipDigestSyn  => B執行GossipDigestSynVerbHandler 
B發GossipDigestAck  => A執行GossipDigestAckVerbHandler 
A發GossipDigestAck2 => B執行GossipDigestAck2VerbHandler

這三個類都實現了IVerbHandler接口,註冊到MessagingService的處理器中:

MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());

這樣當消息模塊接收到消息後就會調用對應的Handler處理,如下面的代碼所示:

IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
        if (verbHandler == null)
        {
            //未知的消息不處理
            logger.trace("Unknown verb {}", verb);
            return;
        }

        try
        {
            verbHandler.doVerb(message, id);
        }
        catch (IOException ioe)
        {
            handleFailure(ioe);
            throw new RuntimeException(ioe);
        }
        catch (TombstoneOverwhelmingException | IndexNotAvailableException e)
        {
            handleFailure(e);
            logger.error(e.getMessage());
        }
        catch (Throwable t)
        {
            handleFailure(t);
            throw t;
        }

源碼解析

初始化

具體的初始化都是在org.apache.cassandra.service.StorageService#public synchronized void initServer() throws ConfigurationException()去做的,裏面會調用prepareToJoin() 嘗試加入gossip集群。

private void prepareToJoin() throws ConfigurationException
    {
        //volatile修飾保證可見性,已經加入了集群就直接跳過
        if (!joined)
        {
            /*....省略...*/
            if (!MessagingService.instance().isListening())
                //開始監聽消息
                MessagingService.instance().listen();

            //給本節點起個名字
            UUID localHostId = SystemKeyspace.getLocalHostId();

            /*
            *  一次shadow round會獲取所有到與之通訊節點擁有的所有節點的信息
            */
            if (replacing)
            {
                localHostId = prepareForReplacement();
                appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));

                if (!DatabaseDescriptor.isAutoBootstrap())
                {
                    // Will not do replace procedure, persist the tokens we‘re taking over locally
                    // so that they don‘t get clobbered with auto generated ones in joinTokenRing
                    SystemKeyspace.updateTokens(bootstrapTokens);
                }
                else if (isReplacingSameAddress())
                {
                    //only go into hibernate state if replacing the same address (CASSANDRA-8523)
                    logger.warn("Writes will not be forwarded to this node during replacement because it has the same address as " +
                                "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
                                "repair must be run after the replacement process in order to make this node consistent.",
                                DatabaseDescriptor.getReplaceAddress());
                    appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
                }
            }
            else
            {
                checkForEndpointCollision(localHostId);
            }

            // have to start the gossip service before we can see any info on other nodes.  this is necessary
            // for bootstrap to get the load info it needs.
            // (we won‘t be part of the storage ring though until we add a counterId to our state, below.)
            // Seed the host ID-to-endpoint map with our own ID.
            getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress());
            appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
            appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId));
            appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress()));
            appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());

            // load the persisted ring state. This used to be done earlier in the init process,
            // but now we always perform a shadow round when preparing to join and we have to
            // clear endpoint states after doing that.
            loadRingState();

            logger.info("Starting up server gossip");
            //啟動gossip,比如定時任務等
            Gossiper.instance.register(this);
            Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
            gossipActive = true;
            // gossip snitch infos (local DC and rack)
            gossipSnitchInfo();
            // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
            Schema.instance.updateVersionAndAnnounce(); // Ensure we know our own actual Schema UUID in preparation for updates
            LoadBroadcaster.instance.startBroadcasting();
            HintsService.instance.startDispatch();
            BatchlogManager.instance.start();
        }
    }

public synchronized Map<InetAddress, EndpointState> doShadowRound()
    {
        buildSeedsList();
        // it may be that the local address is the only entry in the seed
        // list in which case, attempting a shadow round is pointless
        if (seeds.isEmpty())
            return endpointShadowStateMap;

        seedsInShadowRound.clear();
        endpointShadowStateMap.clear();
        // 構造一個空的Syn消息,表明這是一次shadow round
        List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
        GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                DatabaseDescriptor.getPartitionerName(),
                gDigests);
        MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                digestSynMessage,
                GossipDigestSyn.serializer);

        inShadowRound = true;
        int slept = 0;
        try
        {
            while (true)
            {   
                /*
                *  第一次以及後面每五秒都會嘗試向所有的種子節點發送一次shdow round syn消息,嘗試
                *  獲取所有的節點的信息。如果達到了最大的延遲(默認為30S)或者已經達到了目的就會退出
                */
                if (slept % 5000 == 0)
                { 
                    logger.trace("Sending shadow round GOSSIP DIGEST SYN to seeds {}", seeds);

                    for (InetAddress seed : seeds)
                        MessagingService.instance().sendOneWay(message, seed);
                }

                Thread.sleep(1000);
                if (!inShadowRound)
                    break;

                slept += 1000;
                if (slept > StorageService.RING_DELAY)
                {
                    // if we don‘t consider ourself to be a seed, fail out
                    if (!DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
                        throw new RuntimeException("Unable to gossip with any seeds");

                    logger.warn("Unable to gossip with any seeds but continuing since node is in its own seed list");
                    inShadowRound = false;
                    break;
                }
            }
        }
        catch (InterruptedException wtf)
        {
            throw new RuntimeException(wtf);
        }

        return ImmutableMap.copyOf(endpointShadowStateMap);
    }

Gossiper#start()中啟動一個定時任務GossipTask,默認為每秒一次,發送SYN消息:

/*
* 線程池最好都指定名字,這樣方便查問題,另外最好指定好隊列大小,最好不要用Executors中
* 默認的×××隊列,關閉的時候註意處理好中斷,很多人都是catch Exception後打個異常就算了,
* 這樣不是很好的處理方式,我個人通常是當catch到InterruptedException後,根據業務場景決定是 
* 需要通過interrupt方法重置中斷位,當處理完這輪任務之後,決定是否退出
*/
private static final DebuggableScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("GossipTasks");

public void start(int generationNbr, Map<ApplicationState, VersionedValue> preloadLocalStates)
    {
        buildSeedsList();
        /* initialize the heartbeat state for this localEndpoint */
        maybeInitializeLocalState(generationNbr);
        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
        localState.addApplicationStates(preloadLocalStates);

        //notify snitches that Gossiper is about to start
        DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
        if (logger.isTraceEnabled())
            logger.trace("gossip started with generation {}", localState.getHeartBeatState().getGeneration());

        scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(),
                                                              Gossiper.intervalInMillis,
                                                              Gossiper.intervalInMillis,
                                                              TimeUnit.MILLISECONDS);
    }

那麽GossipTask內部的實現是怎樣的呢?

  private class GossipTask implements Runnable
    {
        public void run()
        {
            try
            {
                //等待MessagingService開始監聽
                MessagingService.instance().waitUntilListening();
            //加鎖
                taskLock.lock();

        //更新心跳計數器,這個是用來做失敗檢測的,這裏會有個定時任務輪詢這個Map,檢測最 近一次的
             //心跳時間,如果距離當前時間差距不合理,那麽我們就可以認為這個節點掛掉了,可以放到另外
             //隊列中,隨後隔一段時間再去看看是否恢復。
             endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState()
                                              .updateHeartBeat();
                if (logger.isTraceEnabled())
                    logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
                final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
                //隨機選擇一些節點,構造摘要列表
                Gossiper.instance.makeRandomGossipDigest(gDigests);

                if (gDigests.size() > 0)
                {
                    //構造消息,可以看到這裏的類型是GOSSIP_DIGEST_SYN
                    GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                                                                           DatabaseDescriptor.getPartitionerName(),
                                                                           gDigests);
                    MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                                          digestSynMessage,
                                                                                          GossipDigestSyn.serializer);
                    /*將消息發送給一個活著的節點,隨機選擇的,代碼如下
                    *  int index = (size == 1) ? 0 : random.nextInt(size);
                    *  InetAddress to = liveEndpoints.get(index);
                    *  如果選擇到的是種子節點,那麽就會返回true.
                    */ 
                    boolean gossipedToSeed = doGossipToLiveMember(message);
                    //隨機決定是否向掛掉的節點發送gossip消息
                    maybeGossipToUnreachableMember(message);
                    /*
                    * 可參見這個issue:https://issues.apache.org/jira/browse/CASSANDRA-150
                    */
                    if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
                        maybeGossipToSeed(message);
                         doStatusCheck();
                }
            }
            catch (Exception e)
            {
                JVMStabilityInspector.inspectThrowable(e);
                logger.error("Gossip error", e);
            }
            finally
            {
                taskLock.unlock();
            }
        }
    }

GossipDigestSynVerbHandler

public void doVerb(MessageIn<GossipDigestSyn> message, int id)
    {
        InetAddress from = message.from;
        if (logger.isTraceEnabled())
            logger.trace("Received a GossipDigestSynMessage from {}", from);
        if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound())
        {
            if (logger.isTraceEnabled())
                logger.trace("Ignoring GossipDigestSynMessage because gossip is disabled");
            return;
        }

        GossipDigestSyn gDigestMessage = message.payload;
        /* 不是同一個集群的就不處理 */
        if (!gDigestMessage.clusterId.equals(DatabaseDescriptor.getClusterName()))
        {
            logger.warn("ClusterName mismatch from {} {}!={}", from, gDigestMessage.clusterId, DatabaseDescriptor.getClusterName());
            return;
        }

        if (gDigestMessage.partioner != null && !gDigestMessage.partioner.equals(DatabaseDescriptor.getPartitionerName()))
        {
            logger.warn("Partitioner mismatch from {} {}!={}", from, gDigestMessage.partioner, DatabaseDescriptor.getPartitionerName());
            return;
        }

        List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();

        /*發送者和接受者都處於shadow round階段,那麽就發送一個空的ack回去*/
        if (!Gossiper.instance.isEnabled() && Gossiper.instance.isInShadowRound())
        {
            // a genuine syn (as opposed to one from a node currently
            // doing a shadow round) will always contain > 0 digests
            if (gDigestList.size() > 0)
            {
                logger.debug("Ignoring non-empty GossipDigestSynMessage because currently in gossip shadow round");
                return;
            }

            logger.debug("Received a shadow round syn from {}. Gossip is disabled but " +
                         "currently also in shadow round, responding with a minimal ack", from);
                // new ArrayList<>默認16的size,也會占用額外的內存,
            // 可以考慮改成0或者使用Collections.EMPTY_LIST
            MessagingService.instance()
                            .sendOneWay(new MessageOut<>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                         new GossipDigestAck(new ArrayList<>(), new HashMap<>()),
                                                         GossipDigestAck.serializer),
                                        from);
            return;
        }

        if (logger.isTraceEnabled())
        {
            StringBuilder sb = new StringBuilder();
            for (GossipDigest gDigest : gDigestList)
            {
                sb.append(gDigest);
                sb.append(" ");
            }
            logger.trace("Gossip syn digests are : {}", sb);
        }

        /*
        * 下面的工作其實就類似於git中的merge,如上文所說,版本大的說明他所持有的節點信息較新
        * 這裏就是做一個diff,如果你的version比我本地的大,那麽我就發一個請求,讓你把這個節點的
        * 信息發給我,如果我的version比你的大,那麽說明我的信息更新一點,就會告訴你,你的該更
                * 新了然後就會發一個GossipDigestAck消息回去。
        */
        doSort(gDigestList);

        List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
        Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
        Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
        logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size());
        MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                                                        new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
                                                                                        GossipDigestAck.serializer);
        if (logger.isTraceEnabled())
            logger.trace("Sending a GossipDigestAckMessage to {}", from);
        MessagingService.instance().sendOneWay(gDigestAckMessage, from);
    }

核心的實現:

void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap)
    {
        if (gDigestList.size() == 0)
        {

           /* 
            * 如果是空的,表明這是一次shadow round,那麽我們要把自己所有已知的節點信息發過去。
            */
            logger.debug("Shadow request received, adding all states");
            for (Map.Entry<InetAddress, EndpointState> entry : endpointStateMap.entrySet())
            {
                gDigestList.add(new GossipDigest(entry.getKey(), 0, 0));
            }
        }
        for ( GossipDigest gDigest : gDigestList )
        {
            int remoteGeneration = gDigest.getGeneration();
            int maxRemoteVersion = gDigest.getMaxVersion();
            /* Get state associated with the end point in digest */
            EndpointState epStatePtr = endpointStateMap.get(gDigest.getEndpoint());
            /*
                Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally
                then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to
                request all the data for this endpoint.
            */
            if (epStatePtr != null)
            {
                int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
                /* get the max version of all keys in the state associated with this endpoint */
                int maxLocalVersion = getMaxEndpointStateVersion(epStatePtr);
                if (remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion)
                    continue;

                if (remoteGeneration > localGeneration)
                {
                    /* we request everything from the gossiper */
                    requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
                }
                else if (remoteGeneration < localGeneration)
                {
                    /* send all data with generation = localgeneration and version > 0 */
                    sendAll(gDigest, deltaEpStateMap, 0);
                }
                else if (remoteGeneration == localGeneration)
                {
                    /*
                        If the max remote version is greater then we request the remote endpoint send us all the data
                        for this endpoint with version greater than the max version number we have locally for this
                        endpoint.
                        If the max remote version is lesser, then we send all the data we have locally for this endpoint
                        with version greater than the max remote version.
                    */
                    if (maxRemoteVersion > maxLocalVersion)
                    {
                        deltaGossipDigestList.add(new GossipDigest(gDigest.getEndpoint(), remoteGeneration, maxLocalVersion));
                    }
                    else if (maxRemoteVersion < maxLocalVersion)
                    {
                        /* send all data with generation = localgeneration and version > maxRemoteVersion */
                        sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
                    }
                }
            }
            else
            {
                /* We are here since we have no data for this endpoint locally so request everything. */
                requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
            }
        }
    }

GossipDigestAckVerbHandler

public void doVerb(MessageIn<GossipDigestAck> message, int id)
    {
        InetAddress from = message.from;
        if (logger.isTraceEnabled())
            logger.trace("Received a GossipDigestAckMessage from {}", from);
        if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound())
        {
            if (logger.isTraceEnabled())
                logger.trace("Ignoring GossipDigestAckMessage because gossip is disabled");
            return;
        }

        GossipDigestAck gDigestAckMessage = message.payload;
        List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
        Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
        logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());

        if (Gossiper.instance.isInShadowRound())
        {
            if (logger.isDebugEnabled())
                logger.debug("Received an ack from {}, which may trigger exit from shadow round", from);

            // 如果是空的,說明他也在shdow round中,木有事,反正還會重試的
            Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap);
            return; 
        }

        if (epStateMap.size() > 0)
        {
            /*
            * 第一次發送SYN消息的時候會更新firstSynSendAt,如果ACK消息
            * 是在我們第一次SYN之前的,那麽說明這個ACK已經過期了,直接忽略。
            */
            if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
            {
                if (logger.isTraceEnabled())
                    logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
                return;
            }

            /* 失敗檢測相關的,先不管 */
            Gossiper.instance.notifyFailureDetector(epStateMap);
            /*將遠程收到的信息跟本地的merge,類似上面的操作*/
            Gossiper.instance.applyStateLocally(epStateMap);
        }

        /*
        * 構造一個GossipDigestAck2Message消息,將對方需要的節點信息發給他
        */
        Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
        for (GossipDigest gDigest : gDigestList)
        {
            InetAddress addr = gDigest.getEndpoint();
            EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
            if (localEpStatePtr != null)
                deltaEpStateMap.put(addr, localEpStatePtr);
        }

        MessageOut<GossipDigestAck2> gDigestAck2Message = new MessageOut<GossipDigestAck2>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
                                                                                           new GossipDigestAck2(deltaEpStateMap),
                                                                                           GossipDigestAck2.serializer);
        if (logger.isTraceEnabled())
            logger.trace("Sending a GossipDigestAck2Message to {}", from);
        MessagingService.instance().sendOneWay(gDigestAck2Message, from);
    }

GossipDigestAck2VerbHandler


    public void doVerb(MessageIn<GossipDigestAck2> message, int id)
    {
        if (logger.isTraceEnabled())
        {
            InetAddress from = message.from;
            logger.trace("Received a GossipDigestAck2Message from {}", from);
        }
        if (!Gossiper.instance.isEnabled())
        {
            if (logger.isTraceEnabled())
                logger.trace("Ignoring GossipDigestAck2Message because gossip is disabled");
            return;
        }
        Map<InetAddress, EndpointState> remoteEpStateMap = message.payload.getEndpointStateMap();
        Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
        /*將收到的節點信息與本地的merge*/
        Gossiper.instance.applyStateLocally(remoteEpStateMap);
    }

總結

源碼上看結構是非常清晰的,每一步的邏輯相對來講還是比較容易理解的,其實也就類似tcp三次握手:

①、A隨機找個人B,隨機告訴他一些我知道的信息(這裏可以根據時間排序、根據版本打分等等,具體可以參照論文)

②、B收到以後,和自己本地對比下,比A新的發回給A,比A舊的讓通知A在下一步告訴我

③、A本地合並下,然後將B需要的信息告訴他

④、B本地合並下

⑤、完成了

參考資料

  1. https://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf
  2. https://www.consul.io
  3. https://www.serf.io/
  4. https://en.wikipedia.org/wiki/Gossip_protocol
  5. https://github.com/apache/cassandra

Gossip協議在Cassandra中的實現 #2