Gossip协议概述
Cassandra集群中的节点没有主次之分,它们通过一种叫做Gossip的协议进行通信。通过Gossip协议,它们可以知道集群中有哪些节点,以及这些节点的状态如何?每一条Gossip消息上都有一个版本号,节点可以对接收到的消息进行版本比对,从而得知哪些消息是我需要更新的,哪些消息是我有而别人没有的,然后互相倾诉吐槽,确保二者得到的信息相同,这很像现实生活中的八卦(摆龙门阵)一样,一传十,十传百,最后尽人皆知。
在Cassandra启动时,会启动Gossip服务,Gossip服务启动后会启动一个任务GossipTask,这个任务会周期性地与其他节点进行通信。GossipTask是位于org.apache.cassandra.gms.Gossip类下的一个内部类,其run方法如下:
public void run(){ MessagingService.instance().waitUntilListening(); /* Update the local heartbeat counter. */ endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat(); final List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); Gossiper.instance.makeRandomGossipDigest(gDigests); if (gDigests.size() > 0){ GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),DatabaseDescriptor.getPartitionerName(),gDigests); MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,digestSynMessage,GossipDigestSyn.serializer); /* Gossip to some random live member */ boolean gossipedToSeed = doGossipToLiveMember(message); /* Gossip to some unreachable member with some probability to check if he is back up */ doGossipToUnreachableMember(message); if (!gossipedToSeed || liveEndpoints.size() < seeds.size()) doGossipToSeed(message); doStatusCheck(); } }主要做了几下几件事:
1、GossipTask在Gossip启动后并不会立即运行,阻塞在listenGate这个条件变量上,当Gossip服务调用listen时才开始运行;
2、首先更新本节点的心跳版本号,然后构造需要发送给其他节点的消息gDigests;
4、检查节点状态。
有关种子节点和节点状态后面再分析,本节只关注Gossip协议本身。
GossipTask用于向其他节点发送Gossip信息,Cassandra还提供了SocketThread这样一个线程来负责接收消息,接收消息的代码在org.apache.cassandra.net.IncomingTcpConnection类中。不管是发送还是接收Gossip消息,都是调用org.apache.cassandra.net.MessagingService的sendOneWay方法实现的。
一次Gossip通信分为三个阶段,如图所示:
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN,new GossipDigestSynVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK,new GossipDigestAckVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2,new GossipDigestAck2VerbHandler());可见这三种消息分别对应三个消息类型GOSSIP_DIGEST_SYN、GOSSIP_DIGEST_ACK、GOSSIP_DIGEST_ACK2.
下面详细分析每个阶段。
GossipDigestSynMessage
GossipTask的run方法中发送了GOSSIP_DIGEST_SYN类型的消息(GossipDigestSynMessage),这种消息交给其对应的处理器GossipDigestAckVerbHandler处理,具体处理过程在doVerb()方法中,核心代码如下:
public void doVerb(MessageIn<GossipDigestSyn> message,int id){ //...check List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests(); doSort(gDigestList); List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>(); Map<InetAddress,EndpointState> deltaEpStateMap = new HashMap<InetAddress,EndpointState>(); Gossiper.instance.examineGossiper(gDigestList,deltaGossipDigestList,deltaEpStateMap); logger.trace("sending {} digests and {} deltas",deltaGossipDigestList.size(),deltaEpStateMap.size()); MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,new GossipDigestAck(deltaGossipDigestList,deltaEpStateMap),GossipDigestAck.serializer); Gossiper.instance.checkSeedContact(from); MessagingService.instance().sendOneWay(gDigestAckMessage,from); }1、对接受到的消息排序:先按generation排序,如果generation相同,按maxVersion与本地版本差排序;
2、examineGossiper对比接收到的信息与本节点的差异,本节点需要进一步获取的消息由deltaGossipDigestList保存,本节点需要告诉from节点的信息由deltaEpStateMap保存;
3、利用deltaGossipDigestList和deltaEpStateMap封装成GossipDigestAckMessage消息,发送给from节点。
GossipDigestAckMessage
同GossipDigestSynMessage消息一样,GOSSIP_DIGEST_ACK类型的消息由处理器GossipDigestAckVerbHandler处理,也是doVerb()方法,实际上这三个处理器有共同的实现接口IVerbHandler。核心代码如下:
public void doVerb(MessageIn<GossipDigestAck> message,int id){ List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList(); Map<InetAddress,EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap(); if (epStateMap.size() > 0) { Gossiper.instance.notifyFailureDetector(epStateMap); Gossiper.instance.applyStateLocally(epStateMap); } Map<InetAddress,EndpointState>(); for (GossipDigest gDigest : gDigestList) { InetAddress addr = gDigest.getEndpoint(); EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr,gDigest.getMaxVersion()); if (localEpStatePtr != null) deltaEpStateMap.put(addr,localEpStatePtr); } MessageOut<GossipDigestAck2> gDigestAck2Message = new MessageOut<GossipDigestAck2>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,new GossipDigestAck2(deltaEpStateMap),GossipDigestAck2.serializer); MessagingService.instance().sendOneWay(gDigestAck2Message,from); }1、epStateMap 是from节点想要告诉它的消息,调用applyStateLocally方法进行更新;
3、将2中的消息封装成GOSSIP_DIGEST_ACK2消息发送给from结点。
GossipDigestAck2Message
由GossipDigestAck2VerbHandler处理器处理,这里就不贴代码,接收到from接口发过来的消息,进行本地更新即可。
下节学习cassandra的机架感应
参考资料: