ZooKeeper Source Code Analysis (4): Leader Election and Server Startup

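ZooKeeper Source Code Analysis (1): Server Startup showed that the servers in a cluster hold a leader election when they start. This article analyzes the election process itself, followed by the startup of the leader, follower, and observer servers.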

Leader Election

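First, some election-related terms:

  • SID: the server ID, the same value as myid
  • ZXID: the transaction ID, which identifies the latest transaction state of a server; a larger value means the server's data is more up to date
  • Vote: the vote object, with the following fields:
final private long id;  // SID of the proposed leader
final private long zxid;  // ZXID of the proposed leader
final private long electionEpoch; // logical clock: tells whether votes belong to the same election round; incremented by 1 for each new round
final private long peerEpoch; // epoch of the proposed leader
final private ServerState state; // state of the server that cast the vote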
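  • Server state: ServerState
 public enum ServerState {
        LOOKING,   // looking for a leader; a server in this state enters the election
        FOLLOWING, // follower: serves non-transactional requests and forwards transactional requests to the leader
        LEADING,   // leader
        OBSERVING; // observer: does not take part in elections; serves non-transactional requests and forwards transactional requests to the leader
    }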
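  • QuorumCnxManager
    Each server starts a QuorumCnxManager when its FastLeaderElection object is created. QuorumCnxManager handles the low-level network communication between servers during leader election. It maintains a set of queues for received messages and for messages waiting to be sent. For sending, it keeps a separate queue for each of the other servers, so the queues do not interfere with one another. Its core fields are:
// receive queue: holds messages received from other servers
public final ArrayBlockingQueue<Message> recvQueue;
// send queues grouped by SID: pending messages per peer, so sending to one server does not affect the others
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
// SendWorker is the message sender; this map holds one SendWorker per SID
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap; 
// the most recently sent message, kept per SID
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;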

QuorumCnxManager creates one SendWorker thread and one RecvWorker thread for each remote server.

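  • Sending messages:
    Each SendWorker repeatedly takes a message from its SID's send queue, sends it, and records it in lastMessageSent. When a SendWorker starts (that is, when a connection is (re)established) and its queue is empty, it resends the last message from lastMessageSent. This covers the case where the receiver did not receive or process that message correctly; the receiver handles duplicates.
  • Receiving messages:
    Each RecvWorker repeatedly reads messages from its TCP connection and stores them in the recvQueue.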
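The per-SID send bookkeeping described above can be modelled in a few lines. This is a single-threaded sketch under stated assumptions, not QuorumCnxManager itself:

- The field names queueSendMap and lastMessageSent follow the article.
- The methods toSend, onWorkerStart and pollAndSend are hypothetical helpers.
- Plain String payloads stand in for ByteBuffer.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

// Single-threaded sketch of QuorumCnxManager's per-SID send bookkeeping.
// Field names mirror the article; all methods are hypothetical helpers.
class SendQueueSketch {
    // One queue per remote SID, so a slow peer cannot hold up the others.
    final Map<Long, ArrayDeque<String>> queueSendMap = new HashMap<>();
    // Last message handed out for each SID.
    final Map<Long, String> lastMessageSent = new HashMap<>();

    // Enqueue a message for one peer.
    void toSend(long sid, String msg) {
        queueSendMap.computeIfAbsent(sid, k -> new ArrayDeque<>()).add(msg);
    }

    // Models a SendWorker starting up: if nothing is queued for this SID,
    // resend the last message (the peer may have missed it). Returns null
    // when there is nothing to resend or newer messages are already queued.
    String onWorkerStart(long sid) {
        ArrayDeque<String> q = queueSendMap.get(sid);
        if (q == null || q.isEmpty()) {
            return lastMessageSent.get(sid);
        }
        return null;
    }

    // Models one iteration of the normal send loop: take the next message
    // and remember it as the last one sent to this SID.
    String pollAndSend(long sid) {
        ArrayDeque<String> q = queueSendMap.get(sid);
        String msg = (q == null) ? null : q.poll();
        if (msg != null) {
            lastMessageSent.put(sid, msg);
        }
        return msg;
    }
}
```

Keeping one queue per SID is what lets a slow or disconnected peer fall behind without delaying votes to everyone else.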
Next, let's look at how connections between servers are established:
private boolean startConnection(Socket sock, Long sid)
            throws IOException {
        DataOutputStream dout = null;
        DataInputStream din = null;
        try {
            // Use BufferedOutputStream to reduce the number of IP packets. This is
            // important for x-DC scenarios.
            BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
            dout = new DataOutputStream(buf);

            // Sending id and challenge
            // represents protocol version (in other words - message type)
            dout.writeLong(PROTOCOL_VERSION);
            dout.writeLong(self.getId());
            String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
            byte[] addr_bytes = addr.getBytes();
            dout.writeInt(addr_bytes.length);
            dout.write(addr_bytes);
            dout.flush();

            din = new DataInputStream(
                    new BufferedInputStream(sock.getInputStream()));
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }

        // authenticate learner
        QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
        if (qps != null) {
            // TODO - investigate why reconfig makes qps null.
            authLearner.authenticate(sock, qps.hostname);
        }

        // If lost the challenge, then drop the new connection
        if (sid > self.getId()) {
            LOG.info("Have smaller server identifier, so dropping the " +
                     "connection: (" + sid + ", " + self.getId() + ")");
            closeSocket(sock);
            // Otherwise proceed with the connection
        } else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            
            if(vsw != null)
                vsw.finish();
            
            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            
            sw.start();
            rw.start();
            
            return true;    
            
        }
        return false;
    }

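Connections between each pair of servers follow one rule: a connection is kept only if the server with the larger SID initiated it; otherwise it is closed. Correspondingly, in receiveConnection a server accepts connections from remote servers whose SID is larger than its own. If the remote SID is smaller, it closes the connection and connects back itself. This prevents duplicate connections between any two servers.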
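The tie-break rule can be checked with a tiny simulation. It assumes every server dials every other server once. A connection survives only when the initiator has the larger SID, which models the "drop and dial back" behaviour by counting only the larger side's connection. The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the connection tie-break: a connection is kept only if it was
// initiated by the server with the larger SID (names are hypothetical).
class ConnectionTieBreak {
    // Initiator side, as in startConnection: keep the socket only if my SID is larger.
    static boolean initiatorKeeps(long mySid, long remoteSid) {
        return mySid > remoteSid;
    }

    // Every server dials every other server; return the surviving links
    // as "initiator->acceptor" strings.
    static Set<String> simulate(long[] sids) {
        Set<String> kept = new HashSet<>();
        for (long from : sids) {
            for (long to : sids) {
                if (from != to && initiatorKeeps(from, to)) {
                    kept.add(from + "->" + to);
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // For n servers, exactly n * (n - 1) / 2 links survive, one per pair.
        System.out.println(simulate(new long[] {1, 2, 3}).size());
    }
}
```

Because the rule is a strict comparison of SIDs, exactly one direction wins for every pair, so there is never more than one election connection between two servers.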

The leader election algorithm works as follows:

[Figure: leader election flowchart]

The main election method is FastLeaderElection.lookForLeader:

public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
           self.start_fle = Time.currentElapsedTime();
        }
        try {
            // records all external votes this server receives in the current election round
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();

            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){
                logicalclock.incrementAndGet();
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if(n == null){
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                } 
                else if (validVoter(n.sid) && validVoter(n.leader)) {
                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view for a replica in the current or next voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid" + n.sid);
                            break;
                        }
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock.get()){
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            if(termPredicate(recvset, new Vote(n.leader,
                                            n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                            && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         * Only peer epoch is used to check that the votes come
                         * from the same ensemble. This is because there is at
                         * least one corner case in which the ensemble can be
                         * created with inconsistent zxid and election epoch
                         * info. However, given that only one ensemble can be
                         * running at a single point in time and that each 
                         * epoch is used only once, using only the epoch to 
                         * compare the votes is sufficient.
                         * 
                         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
                         */
                        outofelection.put(n.sid, new Vote(n.leader, 
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                        if (termPredicate(outofelection, new Vote(n.leader,
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                                && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                            synchronized(this){
                                logicalclock.set(n.electionEpoch);
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: " + n.state
                              + " (n.state), " + n.sid + " (n.sid)");
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if(self.jmxLeaderElectionBean != null){
                    MBeanRegistry.getInstance().unregister(
                            self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}",
                    manager.getConnectionThreadCount());
        }
    }

The election proceeds as follows:
1. Increment the election round

// so that all valid votes belong to the same round
logicalclock.incrementAndGet();

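2. Initialize the vote
At the start of the election, every server proposes itself as leader:

// invoked as updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
// i.e. the proposed leader is this server's own myid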
synchronized void updateProposal(long leader, long zxid, long epoch){
        proposedLeader = leader;
        proposedZxid = zxid;
        proposedEpoch = epoch;
    }

3. Send the initial vote

private void sendNotifications() {
        for (long sid : self.getCurrentAndNextConfigVoters()) {
            QuorumVerifier qv = self.getQuorumVerifier();
            ToSend notmsg = new ToSend(
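                    // message type: notification
                    ToSend.mType.notification,
                    // myid of the proposed leader
                    proposedLeader,
                    // zxid of the proposed leader
                    proposedZxid,
                    // current election round
                    logicalclock.get(),
                    // state of this server
                    QuorumPeer.ServerState.LOOKING,
                    // SID of the recipient
                    sid,
                    // epoch of the proposed leader (initially this server's currentEpoch, i.e. the value in the currentEpoch file)
                    proposedEpoch, 
                    // serialized quorum configuration (the servers taking part in the election)
                    qv.toString().getBytes());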
            sendqueue.offer(notmsg);
        }
    }

The server sends its vote to every server taking part in the election.
Note: when the FastLeaderElection object is created, its start method is called:

public void start() {
        this.messenger.start();
    }
// Starts instances of WorkerSender and WorkerReceiver
        void start(){
            this.wsThread.start();
            this.wrThread.start();
        }

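This starts two threads, wsThread and wrThread, which wrap a WorkerSender and a WorkerReceiver respectively. The WorkerSender repeatedly takes messages from FastLeaderElection.sendqueue and hands them to QuorumCnxManager. QuorumCnxManager puts each message into the queueSendMap queue of its target SID to be sent. The WorkerReceiver repeatedly takes messages from QuorumCnxManager's recvQueue and adds them to FastLeaderElection.recvqueue.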

The flow is illustrated below:

[Figure: vote management]

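While the server is in the ServerState.LOOKING state, it keeps looping through the election steps:
4. Receive an external vote (Notification n)
Suppose no external vote arrives within notTimeout milliseconds. The server then calls manager.haveDelivered(), which returns true if at least one per-SID send queue in QuorumCnxManager.queueSendMap is empty. If it returns true, the server resends its own vote; otherwise it calls manager.connectAll() to connect to any server it is not yet connected to. In both cases notTimeout is then doubled, up to maxNotificationInterval.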
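The timeout doubling in step 4 is a capped exponential backoff, sketched below. The starting value and cap are assumptions: finalizeWait = 200 ms and maxNotificationInterval = 60000 ms are the constants I recall from FastLeaderElection, so check them against the version you are reading.

```java
// Sketch of the poll-timeout backoff in lookForLeader(): every empty poll
// doubles notTimeout, capped at maxNotificationInterval.
class NotificationBackoff {
    // Assumed values (as recalled from FastLeaderElection; verify for your version).
    static final int FINALIZE_WAIT = 200;
    static final int MAX_NOTIFICATION_INTERVAL = 60000;

    // Same expression as in lookForLeader(): double, then cap.
    static int next(int notTimeout) {
        int tmpTimeOut = notTimeout * 2;
        return tmpTimeOut < MAX_NOTIFICATION_INTERVAL ? tmpTimeOut : MAX_NOTIFICATION_INTERVAL;
    }

    public static void main(String[] args) {
        int t = FINALIZE_WAIT;
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            t = next(t);
            sb.append(t).append(' ');
        }
        // prints: 400 800 1600 3200 6400 12800 25600 51200 60000 60000
        System.out.println(sb.toString().trim());
    }
}
```

The cap keeps a server that is cut off from the ensemble retrying at a bounded interval instead of backing off indefinitely.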
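5. Compare election rounds (if an external vote was received)

  • If the external vote's round is larger than the internal one (n.electionEpoch > logicalclock.get()), the server immediately updates its own round (logicalclock.set(n.electionEpoch)) and clears all votes received so far (recvset.clear()). It then compares the external vote with its initial vote (itself as leader), adopts the winner, and sends out its updated internal vote.
  • If the external vote's round is smaller than the internal one, the server ignores the external vote and returns to step 4.
  • If the two rounds are equal, the server compares the votes.
6. Vote comparison: FastLeaderElection#totalOrderPredicate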
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */

        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }

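Factors, in order of priority:
1. peerEpoch (the epoch of the proposed leader) 2. ZXID 3. SID. At each level the larger value wins. The election round (electionEpoch) is not part of this comparison; it was already handled in step 5.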
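To see the ordering at work, the sketch below applies the same boolean expression as totalOrderPredicate to pick a winner from a set of votes. Vote here is a hypothetical simplified type that holds only the three compared fields; it is not ZooKeeper's Vote class.

```java
import java.util.Arrays;
import java.util.List;

// Picks the winning vote using the same ordering as totalOrderPredicate:
// peerEpoch first, then zxid, then SID. Vote is a simplified, hypothetical type.
class VoteOrdering {
    static final class Vote {
        final long sid;       // proposed leader SID
        final long zxid;      // proposed leader's last zxid
        final long peerEpoch; // proposed leader's epoch

        Vote(long sid, long zxid, long peerEpoch) {
            this.sid = sid;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
        }
    }

    // Same expression as totalOrderPredicate(newId, newZxid, newEpoch, curId, curZxid, curEpoch).
    static boolean beats(Vote n, Vote cur) {
        return (n.peerEpoch > cur.peerEpoch)
                || (n.peerEpoch == cur.peerEpoch
                    && (n.zxid > cur.zxid || (n.zxid == cur.zxid && n.sid > cur.sid)));
    }

    // Applying beats() across all votes leaves the maximum under this ordering.
    static Vote winner(List<Vote> votes) {
        Vote best = votes.get(0);
        for (Vote v : votes) {
            if (beats(v, best)) {
                best = v;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Three servers in epoch 1; servers 2 and 3 share the newest zxid.
        List<Vote> votes = Arrays.asList(
                new Vote(1, 0x100000005L, 1),
                new Vote(2, 0x100000007L, 1),
                new Vote(3, 0x100000007L, 1));
        System.out.println("winner: " + winner(votes).sid); // winner: 3
    }
}
```

Server 2 beats server 1 on zxid, and server 3 then beats server 2 on SID because their zxids tie.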
7. Change the vote and send the change out

updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();

8. Record the vote
recvset records all external votes this server receives in the current round of leader election, keyed by SID.
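9. Count the votes and update the server state
If termPredicate returns true, more than half of the voting servers in recvset agree with the current internal vote. The server then keeps polling recvqueue for up to finalizeWait ms at a time. If a vote arrives that beats the current proposal, the server puts it back into recvqueue and continues the election loop. If no such vote arrives, the current proposal is final, and the server updates its state: LEADING if proposedLeader is its own SID, otherwise the result of learningState() (FOLLOWING or OBSERVING). If termPredicate returns false, the server returns to step 4.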
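Under the assumption of simple-majority quorums, the termPredicate check in step 9 reduces to counting matching votes. The sketch below is illustrative only:

- Here, recvset maps each voter SID to the SID it voted for. The real recvset holds full Vote objects, and termPredicate compares whole votes through the QuorumVerifier.
- Because notifications a server sends to itself are delivered locally, its own vote normally appears in recvset as well.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified termPredicate under simple-majority quorums (an assumption):
// recvset maps voter SID -> SID of the leader it votes for.
class TermPredicateSketch {
    static boolean hasQuorum(Map<Long, Long> recvset, long proposedLeader, int votingMembers) {
        int agree = 0;
        for (long leader : recvset.values()) {
            if (leader == proposedLeader) {
                agree++;
            }
        }
        // strict majority of the voting members
        return agree > votingMembers / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 3L);
        recvset.put(2L, 3L);
        recvset.put(4L, 5L);
        System.out.println(hasQuorum(recvset, 3L, 5)); // false: 2 of 5
        recvset.put(4L, 3L); // SID 4 changes its vote; keyed by SID, so it overwrites
        System.out.println(hasQuorum(recvset, 3L, 5)); // true: 3 of 5
    }
}
```

Keying recvset by SID means a server that changes its mind is counted once, with its latest vote.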

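This completes the analysis of the election process. Once a server knows its role, it starts up accordingly; the sections below look at each server's startup.
First, a diagram of the interaction between the Leader and a Follower during startup: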

[Figure: Leader/Follower interaction during startup]

Leader Startup

Main method: Leader.lead()

void lead() throws IOException, InterruptedException {
        // ... code for recording election time and registering JMX omitted ...
        try {
            self.tick.set(0);
            zk.loadData();

            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

            // Start thread that waits for connection requests from new followers.
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();

            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

            synchronized(this){
                lastProposed = zk.getZxid();
            }

            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                   null, null);

            QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
            QuorumVerifier curQV = self.getQuorumVerifier();
            if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
               
               try {
                   QuorumVerifier newQV = self.configFromString(curQV.toString());
                   newQV.setVersion(zk.getZxid());
                   self.setLastSeenQuorumVerifier(newQV, true);    
               } catch (Exception e) {
                   throw new IOException(e);
               }
            }
            
            newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
            if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
               newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }
            
            // We have to get at least a majority of servers in sync with
            // us. We do this by waiting for the NEWLEADER packet to get
            // acknowledged
                       
             waitForEpochAck(self.getId(), leaderStateSummary);
             self.setCurrentEpoch(epoch);    
            
             try {
                 waitForNewLeaderAck(self.getId(), zk.getZxid());
             } catch (InterruptedException e) {
                 shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                         + newLeaderProposal.ackSetsToString() + " ]");
                 HashSet<Long> followerSet = new HashSet<Long>();

                 for(LearnerHandler f : getLearners()) {
                     if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
                         followerSet.add(f.getSid());
                     }
                 }    
                 boolean initTicksShouldBeIncreased = true;
                 for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
                     if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
                         initTicksShouldBeIncreased = false;
                         break;
                     }
                 }                  
                 if (initTicksShouldBeIncreased) {
                     LOG.warn("Enough followers present. "+
                             "Perhaps the initTicks need to be increased.");
                 }
                 return;
             }

             startZkServer();
            String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
            if (initialZxid != null) {
                long zxid = Long.parseLong(initialZxid);
                zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
            }

            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                self.setZooKeeperServer(zk);
            }

            self.adminServer.setZooKeeperServer(zk);
            boolean tickSkip = true;
            // If not null then shutdown this leader
            String shutdownMessage = null;

            while (true) {
                synchronized (this) {
                    long start = Time.currentElapsedTime();
                    long cur = start;
                    long end = start + self.tickTime / 2;
                    while (cur < end) {
                        wait(end - cur);
                        cur = Time.currentElapsedTime();
                    }

                    if (!tickSkip) {
                        self.tick.incrementAndGet();
                    }

                    // We use an instance of SyncedLearnerTracker to
                    // track synced learners to make sure we still have a
                    // quorum of current (and potentially next pending) view.
                    SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
                    syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
                    if (self.getLastSeenQuorumVerifier() != null
                            && self.getLastSeenQuorumVerifier().getVersion() > self
                                    .getQuorumVerifier().getVersion()) {
                        syncedAckSet.addQuorumVerifier(self
                                .getLastSeenQuorumVerifier());
                    }

                    syncedAckSet.addAck(self.getId());

                    for (LearnerHandler f : getLearners()) {
                        if (f.synced()) {
                            syncedAckSet.addAck(f.getSid());
                        }
                    }

                    // check leader running status
                    if (!this.isRunning()) {
                        // set shutdown flag
                        shutdownMessage = "Unexpected internal error";
                        break;
                    }

                    if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
                        // Lost quorum of last committed and/or last proposed
                        // config, set shutdown flag
                        shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
                                + syncedAckSet.ackSetsToString() + " ]";
                        break;
                    }
                    tickSkip = !tickSkip;
                }
                for (LearnerHandler f : getLearners()) {
                    f.ping();
                }
            }
            if (shutdownMessage != null) {
                shutdown(shutdownMessage);
                // leader goes in looking state
            }
        } finally {
            zk.unregisterJMX(this);
        }
    }

1. Reload the snapshot and transaction log data (zk.loadData()); see ZooKeeper Source Code Analysis (6): Data and Storage.

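2. Start the learner acceptor, LearnerCnxAcceptor
    LearnerCnxAcceptor accepts connection requests from all non-Leader servers. These connections carry the cluster's non-election communication.
    LearnerCnxAcceptor.run():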
 public void run() {
                while (!stop) {
                        Socket s = ss.accept();
                        // start with the initLimit, once the ack is processed
                        // in LearnerHandler switch to the syncLimit
                        s.setSoTimeout(self.tickTime * self.initLimit);
                        s.setTcpNoDelay(nodelay);
                        BufferedInputStream is = new BufferedInputStream(
                                s.getInputStream());
                        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                        fh.start();
                    }
            // ... exception handling omitted ...
}

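So whenever a connection request arrives from another server, a LearnerHandler instance is created. It is responsible for message exchange and data synchronization between the Leader and that server. The first message it receives is of type OBSERVERINFO or FOLLOWERINFO; for the message types, see the article on ZooKeeper inter-server communication types.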
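5. The Leader parses the Learner's message and computes the new epoch (getEpochToPropose)
The logic is as follows. If the Learner's acceptedEpoch is greater than or equal to the epoch currently being proposed, the proposed epoch becomes the Learner's acceptedEpoch + 1. The LearnerHandler then waits until more than half of the voting servers, including the Leader itself, have connected to the Leader. At that point the Leader's new epoch is fixed.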
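The epoch rule in step 5 can be modelled without threads. This sketch rests on the following assumptions:

- Quorums are simple majorities.
- Every reporting server is a voting participant; the real code does not count observers here.
- Each report() call plays the role of one getEpochToPropose call. The real method also blocks on connectingFollowers for up to initLimit * tickTime and fails if no quorum shows up.
- makeZxid and epochFromZxid reproduce the bit layout used by ZxidUtils.makeZxid and ZxidUtils.getEpochFromZxid in the code above. The epoch occupies the high 32 bits of a ZXID.

```java
import java.util.HashSet;
import java.util.Set;

// Single-threaded sketch of the epoch rule in Leader.getEpochToPropose
// (blocking/timeout omitted; assumes simple majorities and voting participants only).
class EpochProposalSketch {
    private final long leaderSid;
    private final int votingMembers;
    private final Set<Long> connecting = new HashSet<>();
    private long epoch = -1;            // starts below any real epoch
    private boolean waitingForNewEpoch = true;

    EpochProposalSketch(long leaderSid, int votingMembers) {
        this.leaderSid = leaderSid;
        this.votingMembers = votingMembers;
    }

    // One call per server (the leader reports its own acceptedEpoch first).
    // Returns true once the new epoch is fixed.
    boolean report(long sid, long lastAcceptedEpoch) {
        if (!waitingForNewEpoch) {
            return true;                // late arrivals simply get the decided epoch
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;
        }
        connecting.add(sid);
        if (connecting.contains(leaderSid) && connecting.size() > votingMembers / 2) {
            waitingForNewEpoch = false;
        }
        return !waitingForNewEpoch;
    }

    long epoch() {
        return epoch;
    }

    // Epoch in the high 32 bits, per-epoch counter in the low 32 bits.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochFromZxid(long zxid) {
        return zxid >> 32L;
    }
}
```

Consider a five-server ensemble. The leader's acceptedEpoch is 3, and followers report 3 and 4. The epoch is fixed at 5 on the third report, and the leader's first ZXID becomes makeZxid(5, 0) = 0x500000000.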
6. The Leader sends its state (LEADERINFO) to the other servers
LearnerHandler.run():

public void run() {
        try {
            leader.addLearnerHandler(this);
            tickOfNextAckDeadline = leader.self.tick.get()
                    + leader.self.initLimit + leader.self.syncLimit;

            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);

            QuorumPacket qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
                LOG.error("First packet " + qp.toString()
                        + " is not FOLLOWERINFO or OBSERVERINFO!");
                return;
            }

            byte learnerInfoData[] = qp.getData();
            if (learnerInfoData != null) {
                ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                if (learnerInfoData.length >= 8) {
                    this.sid = bbsid.getLong();
                }
                if (learnerInfoData.length >= 12) {
                    this.version = bbsid.getInt(); // protocolVersion
                }
                if (learnerInfoData.length >= 20) {
                    long configVersion = bbsid.getLong();
                    if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
                        throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                    }
                }
            } else {
                this.sid = leader.followerCounter.getAndDecrement();
            }

            if (leader.self.getView().containsKey(this.sid)) {
                LOG.info("Follower sid: " + this.sid + " : info : "
                        + leader.self.getView().get(this.sid).toString());
            } else {
                LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
            }
                        
            if (qp.getType() == Leader.OBSERVERINFO) {
                  learnerType = LearnerType.OBSERVER;
            }

            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

            long peerLastZxid;
            StateSummary ss = null;
            long zxid = qp.getZxid();
            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

            if (this.getVersion() < 0x10000) {
                // we are going to have to extrapolate the epoch information
                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                ss = new StateSummary(epoch, zxid);
                // fake the message
                leader.waitForEpochAck(this.getSid(), ss);
            } else {
                byte ver[] = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
                oa.writeRecord(newEpochPacket, "packet");
                bufferedOutput.flush();
                QuorumPacket ackEpochPacket = new QuorumPacket();
                ia.readRecord(ackEpochPacket, "packet");
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error(ackEpochPacket.toString()
                            + " is not ACKEPOCH");
                    return;
                }
                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                leader.waitForEpochAck(this.getSid(), ss);
            }
            peerLastZxid = ss.getLastZxid();
           
            // Take any necessary action if we need to send TRUNC or DIFF
            // startForwarding() will be called in all cases
            boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
            
            LOG.debug("Sending NEWLEADER message to " + sid);
            // the version of this quorumVerifier will be set by leader.lead() in case
            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
            // we got here, so the version was set
            if (getVersion() < 0x10000) {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, null, null);
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
                                .toString().getBytes(), null);
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();

            /* if we are not truncating or sending a diff just send a snapshot */
            if (needSnap) {
                boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
                LearnerSnapshot snapshot = 
                        leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                try {
                    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                    bufferedOutput.flush();
                    // Dump data to peer
                    leader.zk.getZKDatabase().serializeSnapshot(oa);
                    oa.writeString("BenWasHere", "signature");
                    bufferedOutput.flush();
                } finally {
                    snapshot.close();
                }
            }

            // Start thread that blast packets in the queue to learner
            startSendingPackets();
            
            /*
             * Have to wait for the first ACK, wait until
             * the leader is ready, and only then we can
             * start processing messages.
             */
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            if(qp.getType() != Leader.ACK){
                LOG.error("Next packet was supposed to be an ACK,"
                    + " but received packet: {}", packetToString(qp));
                return;
            }

            if(LOG.isDebugEnabled()){
                LOG.debug("Received NEWLEADER-ACK message from " + sid);   
            }
            leader.waitForNewLeaderAck(getSid(), qp.getZxid());

            syncLimitCheck.start();
            
            // now that the ack has been processed expect the syncLimit
            sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

            /*
             * Wait until leader starts up
             */
            synchronized(leader.zk){
                while(!leader.zk.isRunning() && !this.isInterrupted()){
                    leader.zk.wait(20);
                }
            }
            // Mutation packets will be queued during the serialize,
            // so we need to mark when the peer can actually start
            // using the data
            //
            LOG.debug("Sending UPTODATE message to " + sid);      
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

            while (true) {
                // ...omitted: from here on leader-learner synchronization is complete, the zkServer
                // has started, and normal packets between the servers can be processed...
            }
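syncFollower() decides, for each learner, whether to send a DIFF, a TRUNC or a full SNAP (the needSnap flag above). The sketch below models only the core comparison. It assumes the leader keeps committed proposals in memory for the zxid range [minCommittedLog, maxCommittedLog]. SyncMode and SyncModeChooser are hypothetical names, and the real method covers more cases than this.

```java
/** Hypothetical sync modes; the real packet types are Leader.DIFF, Leader.TRUNC and Leader.SNAP. */
enum SyncMode { DIFF, TRUNC, SNAP }

/**
 * Simplified, hypothetical model of the choice made in LearnerHandler.syncFollower().
 * Assumption: committed proposals for [minCommittedLog, maxCommittedLog] are held in memory.
 */
final class SyncModeChooser {
    private SyncModeChooser() {}

    static SyncMode choose(long peerLastZxid, long lastProcessedZxid,
                           long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid == lastProcessedZxid) {
            // Learner is already up to date: an empty DIFF is enough.
            return SyncMode.DIFF;
        }
        if (peerLastZxid > maxCommittedLog) {
            // Learner has transactions the leader never committed: roll it back.
            return SyncMode.TRUNC;
        }
        if (peerLastZxid >= minCommittedLog) {
            // Learner sits inside the in-memory committed log: replay only the missing tail.
            return SyncMode.DIFF;
        }
        // Learner is too far behind the committed log: send a full snapshot.
        return SyncMode.SNAP;
    }
}
```

For example, a learner whose last zxid is older than minCommittedLog gets SNAP, which is the case where the code above sets needSnap and streams serializeSnapshot(oa).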

After the LearnerHandler starts, it sends LEADERINFO to the Learner. At this point both the leader thread and the LearnerHandler thread wait in leader.waitForEpochAck(this.getSid(), ss);.
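7. The Learner replies with ACKEPOCH
Once a quorum (more than half) of the voting servers has replied with ACKEPOCH, the Leader starts leader-follower data synchronization via boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);. For details see ZooKeeper source analysis (6): Data and Storage.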
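8. zkServer startup
Once leader-follower synchronization has begun, the Leader thread waits in waitForNewLeaderAck(self.getId(), zk.getZxid());. Whenever a LearnerHandler thread has synchronized its Learner, it sends a NEWLEADER message to that Learner, and the Learner replies to the LearnerHandler with an ACK. After a quorum of the voting servers has replied with ACK, the leader starts LeaderZooKeeperServer, and each LearnerHandler sends an UPTODATE message to its synchronized Learner, signalling that synchronization is complete and the Learner can now serve clients.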
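Both waitForEpochAck() and waitForNewLeaderAck() rest on the same idea: block until a quorum (a strict majority of the voting members) has acknowledged. Below is a minimal sketch using synchronized/wait/notifyAll, assuming a fixed voting set and a simple majority rule. QuorumAckBarrier is a hypothetical class, not ZooKeeper's API; the real code asks the configured QuorumVerifier whether the acking servers form a quorum.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical barrier illustrating "wait until a majority of voting servers has acked".
 * SIDs outside the voting set (for example observers) are ignored.
 */
final class QuorumAckBarrier {
    private final Set<Long> votingSids;
    private final Set<Long> acked = new HashSet<>();

    QuorumAckBarrier(Set<Long> votingSids) {
        this.votingSids = new HashSet<>(votingSids);
    }

    /** Records an ack and returns true once a majority of voting SIDs has acked. */
    synchronized boolean ack(long sid) {
        if (votingSids.contains(sid)) {
            acked.add(sid);
        }
        boolean quorum = hasQuorum();
        if (quorum) {
            notifyAll(); // wake up threads blocked in await()
        }
        return quorum;
    }

    /** Strict majority: more than half of the voting members. */
    synchronized boolean hasQuorum() {
        return acked.size() > votingSids.size() / 2;
    }

    /** Waits up to timeoutMs for a quorum; returns whether one was reached. */
    synchronized boolean await(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!hasQuorum()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            wait(remaining);
        }
        return true;
    }
}
```

In a three-server ensemble the leader would ack for its own SID too, so a single follower's ack is enough to reach the majority of two.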

Note: the leader server maintains two quorum verifiers:

    // last committed quorum verifier
    public QuorumVerifier quorumVerifier;

    // last proposed quorum verifier
    public QuorumVerifier lastSeenQuorumVerifier = null;

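During the pre-sync exchange with Learner servers, the leader always passes lastSeenQuorumVerifier. My understanding is that this keeps the exchange from affecting quorumVerifier.version, which governs the normal commit of transaction requests. If I have this wrong, corrections are welcome.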

Follower server startup

The main flow is in Follower.followLeader():

void followLeader() throws InterruptedException {
    // ...JMX registration and exception-handling code omitted...
        QuorumServer leaderServer = findLeader();
        connectToLeader(leaderServer.addr, leaderServer.hostname);
        long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
        if (self.isReconfigStateChange())
            throw new Exception("learned about role change");
        // check to see if the leader zxid is lower than ours
        // this should never happen but is just a safety check
        long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
        if (newEpoch < self.getAcceptedEpoch()) {
            LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                    + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
            throw new IOException("Error: Epoch of leader is lower");
        }
        syncWithLeader(newEpochZxid);
        QuorumPacket qp = new QuorumPacket();
        while (this.isRunning()) {
            readPacket(qp);
            processPacket(qp);
        }
    }
}

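1. Connect to and register with the Leader server, sending a FOLLOWERINFO message.
2. Once the TCP connection is up, the follower receives the LEADERINFO message sent by the leader and replies with ACKEPOCH. This is handled in Learner.registerWithLeader(Leader.FOLLOWERINFO):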

 /**
     * Once connected to the leader, perform the handshake protocol to
     * establish a following / observing connection. 
     * @param pktType
     * @return the zxid the Leader sends for synchronization purposes.
     * @throws IOException
     */
    protected long registerWithLeader(int pktType) throws IOException{
        /*
         * Send follower info, including last zxid and sid
         */
        long lastLoggedZxid = self.getLastLoggedZxid();
        QuorumPacket qp = new QuorumPacket();                
        qp.setType(pktType);
        qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
        
        /*
         * Add sid to payload
         */
        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
        boa.writeRecord(li, "LearnerInfo");
        qp.setData(bsid.toByteArray());
        
        writePacket(qp, true);
        readPacket(qp);        
        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        if (qp.getType() == Leader.LEADERINFO) {
            // we are connected to a 1.0 server so accept the new epoch and read the next packet
            leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
            byte epochBytes[] = new byte[4];
            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            if (newEpoch > self.getAcceptedEpoch()) {
                wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
                self.setAcceptedEpoch(newEpoch);
            } else if (newEpoch == self.getAcceptedEpoch()) {
                // since we have already acked an epoch equal to the leaders, we cannot ack
                // again, but we still need to send our lastZxid to the leader so that we can
                // sync with it if it does assume leadership of the epoch.
                // the -1 indicates that this reply should not count as an ack for the new epoch
                wrappedEpochBytes.putInt(-1);
            } else {
                throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
            }
            QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
            writePacket(ackNewEpoch, true);
            return ZxidUtils.makeZxid(newEpoch, 0);
        } else {
            if (newEpoch > self.getAcceptedEpoch()) {
                self.setAcceptedEpoch(newEpoch);
            }
            if (qp.getType() != Leader.NEWLEADER) {
                LOG.error("First packet should have been NEWLEADER");
                throw new IOException("First packet should have been NEWLEADER");
            }
            return qp.getZxid();
        }
    } 

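3. Start data synchronization with syncWithLeader(newEpochZxid);. See ZooKeeper source analysis (6): Data and Storage.

4. Once data synchronization is complete, start the LearnerZooKeeperServer and initialize the request processor chain.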
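The epoch checks in registerWithLeader() rely on the zxid layout: the epoch sits in the high 32 bits and the per-epoch counter in the low 32 bits, which is what ZxidUtils.makeZxid() and ZxidUtils.getEpochFromZxid() encode. The sketch below restates that layout and the ACKEPOCH decision as pure functions. EpochHandshake is a hypothetical name, and it throws IllegalStateException where the original throws IOException.

```java
/** Hypothetical restatement of the zxid layout and the ACKEPOCH payload decision. */
final class EpochHandshake {
    private EpochHandshake() {}

    /** Epoch in the high 32 bits, counter in the low 32 bits. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    /**
     * Returns the int written into the ACKEPOCH payload: the learner's current epoch when it
     * accepts a newer leader epoch, or -1 when it has already accepted this epoch
     * (such a reply does not count as an ack for the new epoch).
     */
    static int ackEpochPayload(long newEpoch, long acceptedEpoch, long currentEpoch) {
        if (newEpoch > acceptedEpoch) {
            return (int) currentEpoch;
        } else if (newEpoch == acceptedEpoch) {
            return -1;
        }
        throw new IllegalStateException("Leader epoch " + newEpoch
                + " is less than accepted epoch " + acceptedEpoch);
    }
}
```

For instance, makeZxid(5, 7) yields 0x500000007, and the value returned by registerWithLeader() in the LEADERINFO branch corresponds to makeZxid(newEpoch, 0), the first zxid of the new epoch.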

Observer server startup

The main flow is in Observer.observeLeader():

void observeLeader() throws Exception {
    // ...JMX registration and exception-handling code omitted...
    try {
        QuorumServer leaderServer = findLeader();
        connectToLeader(leaderServer.addr, leaderServer.hostname);
        long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
        if (self.isReconfigStateChange())
            throw new Exception("learned about role change");

        syncWithLeader(newLeaderZxid);
        QuorumPacket qp = new QuorumPacket();
        while (this.isRunning()) {
            readPacket(qp);
            processPacket(qp);
        }
    }
    // ...catch/finally blocks omitted...
}

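1. Connect to and register with the Leader server, sending an OBSERVERINFO message.
2. Once the TCP connection is up, the observer receives the LEADERINFO message sent by the leader and replies with ACKEPOCH, mainly to tell the leader its current lastLoggedZxid and epochBytes. This is handled in Learner.registerWithLeader(Leader.OBSERVERINFO);.
3. Start data synchronization with syncWithLeader(newLeaderZxid);. See ZooKeeper source analysis (6): Data and Storage.

4. Once data synchronization is complete, start the LearnerZooKeeperServer and initialize the request processor chain.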
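The startup steps of a learner imply an order on the packets it receives from the leader. Assuming protocol version 0x10000 or later (otherwise, as registerWithLeader() shows, the first packet is NEWLEADER and there is no LEADERINFO), the order is LEADERINFO, a sync payload starting with DIFF, TRUNC or SNAP, then NEWLEADER, and finally UPTODATE, with PROPOSAL/COMMIT packets possibly appearing after the sync mode. Pkt and LearnerHandshakeChecker are hypothetical names used only to make this order explicit; this is a simplified model, not ZooKeeper's code.

```java
import java.util.List;

/** Hypothetical subset of the packet types a learner sees during startup. */
enum Pkt { LEADERINFO, DIFF, TRUNC, SNAP, PROPOSAL, COMMIT, NEWLEADER, UPTODATE }

final class LearnerHandshakeChecker {
    private enum Phase { AWAIT_LEADERINFO, AWAIT_SYNC_MODE, SYNCING, AWAIT_UPTODATE }

    private LearnerHandshakeChecker() {}

    /** Returns true if the packets form a complete, correctly ordered startup handshake. */
    static boolean isValid(List<Pkt> received) {
        Phase phase = Phase.AWAIT_LEADERINFO;
        for (Pkt p : received) {
            switch (phase) {
                case AWAIT_LEADERINFO:
                    if (p != Pkt.LEADERINFO) return false;
                    phase = Phase.AWAIT_SYNC_MODE; // the learner answers with ACKEPOCH here
                    break;
                case AWAIT_SYNC_MODE:
                    if (p != Pkt.DIFF && p != Pkt.TRUNC && p != Pkt.SNAP) return false;
                    phase = Phase.SYNCING;
                    break;
                case SYNCING:
                    if (p == Pkt.NEWLEADER) {
                        phase = Phase.AWAIT_UPTODATE; // the learner answers with ACK here
                    } else if (p != Pkt.PROPOSAL && p != Pkt.COMMIT) {
                        return false;
                    }
                    break;
                case AWAIT_UPTODATE:
                    if (p == Pkt.UPTODATE) return true; // sync done, the learner can serve
                    if (p != Pkt.PROPOSAL && p != Pkt.COMMIT) return false;
                    break;
            }
        }
        return false; // ran out of packets before UPTODATE
    }
}
```

A sequence such as LEADERINFO, DIFF, UPTODATE is rejected because UPTODATE arrives before NEWLEADER has been acknowledged.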

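From then on, when the Leader node goes down or the Leader loses contact with a majority of its Followers, communication between the nodes fails (typically surfacing as an exception). Leader.lead() or Follower.followLeader() then leaves its loop and returns to QuorumPeer.run(), where each node closes all of its connections, sets its state to LOOKING, and starts a new round of election.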
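The fallback to LOOKING can be pictured as a loop over the server state. The sketch is a heavily simplified, hypothetical model of QuorumPeer.run(): ServerState here is a local stand-in for QuorumPeer.ServerState, Roles and PeerLoop are illustrative names, and closing connections, JMX and logging are left out.

```java
/** Local stand-in for QuorumPeer.ServerState. */
enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

/** Hypothetical hooks for the role methods discussed above. */
interface Roles {
    /** Runs one election and returns the role this server should take next. */
    ServerState lookForLeader() throws Exception;
    void lead() throws Exception;
    void followLeader() throws Exception;
    void observeLeader() throws Exception;
}

final class PeerLoop {
    private ServerState state = ServerState.LOOKING;

    ServerState state() { return state; }

    /** Runs at most maxRounds iterations (a real server loops until shutdown). */
    void run(Roles roles, int maxRounds) {
        for (int round = 0; round < maxRounds; round++) {
            try {
                switch (state) {
                    case LOOKING:
                        state = roles.lookForLeader();
                        continue; // an election result is not a role ending
                    case LEADING:
                        roles.lead();
                        break;
                    case FOLLOWING:
                        roles.followLeader();
                        break;
                    case OBSERVING:
                        roles.observeLeader();
                        break;
                }
            } catch (Exception e) {
                // The role ended by throwing, e.g. after losing the leader or the quorum.
            }
            // Whether the role returned or threw, go back to electing a leader.
            state = ServerState.LOOKING;
        }
    }
}
```

Putting the reset to LOOKING after the try/catch means a role that returns normally (as Leader.lead() does when it notices it has lost its quorum) and a role that fails with an exception are handled the same way.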

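    Original author: Monica2333
    Original URL: https://www.jianshu.com/p/2766802bcec7
    This article is reposted from the web and shared only to spread knowledge. In case of infringement, please contact the blogger for removal.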