ZooKeeper 源码分析 Leader/Follower 启动, Leader 选举, Leader/Follower 建立 (基于3.4.6)

1. ZooKeeper Leader/Follower 启动, Leader 选举, Leader/Follower 建立 概述

先看一下下面这张图:

《ZooKeeper 源码分析 Leader/Follower 启动, Leader 选举, Leader/Follower 建立 (基于3.4.6)》 zookeeper启动.png

上面这张图片有点大, 建议在 百度云 里面进行下载预览, 接下来我们会一步一步进行下去
PS: 吐槽一下简书的图片系统, 图片一旦大了就预览出问题(不清晰)

2. QuorumPeerMain 解析配置文件构建 QuorumPeer

下面的代码主要是从配置文件中获取配置信息, 构建 QuorumPeer

// 根据 配置 QuorumPeerConfig 来启动  QuorumPeer
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    LOG.info("QuorumPeerConfig : " + config);
  try {
      ManagedUtil.registerLog4jMBeans();
  } catch (JMException e) {
      LOG.warn("Unable to register log4j JMX control", e);
  }

  LOG.info("Starting quorum peer");
  try {                                                                         // 1. 在 ZooKeeper 集群中, 每个 QuorumPeer 代表一个 服务
      ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
      cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());

      quorumPeer = new QuorumPeer();
      quorumPeer.setClientPortAddress(config.getClientPortAddress());
      quorumPeer.setTxnFactory(new FileTxnSnapLog(                              // 2. 设置 FileTxnSnapLog(这个类包裹 TxnLog, SnapShot)
              new File(config.getDataLogDir()),
              new File(config.getDataDir())));
      quorumPeer.setQuorumPeers(config.getServers());                           // 3. 集群中所有机器
      quorumPeer.setElectionType(config.getElectionAlg());                      // 4. 设置集群 Leader 选举所使用的的算法(默认值 3, 代表 FastLeaderElection)
      quorumPeer.setMyid(config.getServerId());                                 // 5. 每个 QuorumPeer 设置一个 myId 用于区分集群中的各个节点
      quorumPeer.setTickTime(config.getTickTime());
      quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());           // 6. 客户端最小的 sessionTimeout 时间(若不设置的话, 就是 tickTime * 2)
      quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());           // 7. 客户端最小的 sessionTimeout 时间(若不设置的话, 就是 tickTime * 20)
      quorumPeer.setInitLimit(config.getInitLimit());                           // 8. 最常用的就是 initLimit * tickTime, getEpochToPropose(等待集群中所有节点的 Epoch值 ) waitForEpochAck(在 Leader 建立过程中, Leader 会向所有节点发送 LEADERINFO, 而Follower 节点会回复ACKEPOCH) waitForNewLeaderAck(在 Leader 建立的过程中, Leader 会向 Follower 发送 NEWLEADER, waitForNewLeaderAck 就是等候所有Follower 回复对应的 ACK 值)
      quorumPeer.setSyncLimit(config.getSyncLimit());                           // 9. 常用方法 self.tickTime * self.syncLimit 用于限制集群中各个节点相互连接的 socket 的soTimeout
      quorumPeer.setQuorumVerifier(config.getQuorumVerifier());                 // 10.投票方法, 默认超过半数就通过 (默认值 QuorumMaj)
      quorumPeer.setCnxnFactory(cnxnFactory);                                   // 11.设置集群节点接收client端连接使用的 nioCnxnFactory(用 基于原生 java nio, netty nio) (PS 在原生 NIO 的类中发现代码中没有处理 java nio CPU 100% 的bug)
      quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));     // 12.设置 ZKDataBase
      quorumPeer.setLearnerType(config.getPeerType());                          // 13.设置节点的类别 (参与者/观察者)
      quorumPeer.setSyncEnabled(config.getSyncEnabled());                       // 14.这个参数主要用于 (Observer Enables/Disables sync request processor. This option is enable by default and is to be used with observers.) 就是 Observer 是否使用 SyncRequestProcessor
      quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

      quorumPeer.start();                                                       // 15.开启服务
      LOG.info("quorumPeer.join begin");
      quorumPeer.join();                                                        // 16.等到 线程 quorumPeer 执行完成, 程序才会继续向下再执行, 详情见方法注解 (Waits for this thread to die.)
      LOG.info("quorumPeer.join end");
  } catch (InterruptedException e) {
      // warn, but generally this is ok
      LOG.warn("Quorum Peer interrupted", e);
  }
}
3. QuorumPeer 启动

主要是 加载数据到DataTree, 开启监听客户端连接, 开启Leader选举, 最后程序会在 QuorumPeer.run() 的while loop 里面

public synchronized void start() {
    loadDataBase();           // 从SnapShot,TxnFile 加载数据到 DataTree
    cnxnFactory.start();      // 开启服务端的 端口监听
    startLeaderElection();    // 开启 Leader 选举线程
    super.start();            // 这一步 开启 Thread.run() 方法
}
4. QuorumPeer.loadDataBase

从 snapshot/TxnLog里面加载数据到DataTree里面

  // 经过下面的操作, 就会存在 currentEpoch, acceptEpoch 文件, 并且 DataTree 文件也会进行加载
  private void loadDataBase() {
      File updating = new File(getTxnFactory().getSnapDir(),                // 1. 在 snap shot 文件目录下面有对应的 updateEpoch 文件
                               UPDATING_EPOCH_FILENAME);
  try {
          zkDb.loadDataBase();                                              // 2. 从 snapshot, TxnLog 里面加载出 dataTree 及 sessionsWithTimeouts

          // load the epochs
          long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;    // 3. 获取 zkDb 对应的处理过的 最新的一个 zxid 的值
      long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); // 4. 将 zxid 的高 32 位当做 epoch 值, 低 32 位才是 zxid
          try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);      // 5. 从文件中加载 epoch 值 (若不存在 currentEpoch 文件, 则直接在 catch 中执行代码, 而且一般都是这样)
              if (epochOfZxid > currentEpoch && updating.exists()) {        // 6. 此处说明 QuorumPeer 在进行 takeSnapShot 后, 进程直接挂了, 还没来得及更新 currentEpoch
                  LOG.info("{} found. The server was terminated after " +
                           "taking a snapshot but before updating current " +
                           "epoch. Setting current epoch to {}.",
                           UPDATING_EPOCH_FILENAME, epochOfZxid);
                  setCurrentEpoch(epochOfZxid);
                  if (!updating.delete()) {
                      throw new IOException("Failed to delete " +
                                            updating.toString());
                  }
              }
          } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;                                    // 7. 遇到的是 currentEpoch 文件不存在, 直接运行到这里了
            LOG.info(CURRENT_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
          }
          if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
          }
          try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);     // 8. 从文件中读取当前接收到的 epoch 值
          } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;                                   // 9. 当从 acceptEpoch 文件里面读取数据失败时, 就直接运行这边的代码
            LOG.info(ACCEPTED_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);       // 10 将 acceptEpoch 值直接写入到对应的文件里面
          }
          if (acceptedEpoch < currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + " is less than the accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch));
          }
      } catch(IOException ie) {
          LOG.error("Unable to load database on disk", ie);
          throw new RuntimeException("Unable to run quorum server ", ie);
      }
}
5. QuorumPeer.startLeaderElection

创建 Leader 选举的算法及开启 QuorumCnxManager.Listener 监听集群中的节点相互连接

// 开启 leader 的选举操作
synchronized public void startLeaderElection() {
  try {
    currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());  // 1. 生成投给自己的选票
  } catch(IOException e) {
    RuntimeException re = new RuntimeException(e.getMessage());
    re.setStackTrace(e.getStackTrace());
    throw re;
  }
    for (QuorumServer p : getView().values()) {                                // 2. 获取集群里面的所有的机器
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    if (electionType == 0) {                                                   // 3. 现在默认的选举算法是 FastLeaderElection
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);                  // 4. 创建 Election
}

protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
            
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:                                                                 // 1. 默认的 leader 选举的算法
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;                  // 2. 等待集群中的其他的机器进行连接
        if(listener != null){
            listener.start();
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

Listener 在监听到有其他节点连接上, 则进行相应的处理

6. QuorumCnxManager.Listener
/**
 * Sleeps on accept().
 */
@Override
public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    while((!shutdown) && (numRetries < 3)){       // 1. 有个疑惑 若真的出现 numRetries >= 3 从而退出了, 怎么办
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);
            if (self.getQuorumListenOnAllIPs()) { // 2. 这里的默认值 quorumListenOnAllIPs 是 false
                int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
                addr = new InetSocketAddress(port);
            } else {
                addr = self.quorumPeers.get(self.getId()).electionAddr;
            }
            LOG.info("My election bind port: " + addr.toString());
            setName(self.quorumPeers.get(self.getId()).electionAddr
                    .toString());
            ss.bind(addr);
            while (!shutdown) {
                Socket client = ss.accept();     // 3. 这里会阻塞, 直到有请求到达
                setSockOpts(client);             // 4. 设置 socket 的连接属性
                LOG.info("Received connection request " + client.getRemoteSocketAddress());
                receiveConnection(client);
                numRetries = 0;
            }
        } catch (IOException e) {
            LOG.error("Exception while listening", e);
            numRetries++;
            try {
                ss.close();
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. " +
                          "Ignoring exception", ie);
            }
        }
    }
    LOG.info("Leaving listener");
    if (!shutdown) {
        LOG.error("As I'm leaving the listener thread, "
                + "I won't be able to participate in leader "
                + "election any longer: "
                + self.quorumPeers.get(self.getId()).electionAddr);
    }
}
7. QuorumCnxManager.Listener.receiveConnection

为防止重复建立连接, 集群中各个节点之间只允许大的 myid 连接小的 myid, 建立之后会有SendWorker, RecvWorker 来处理消息的接受发送

/**
 * If this server receives a connection request, then it gives up on the new
 * connection if it wins. Notice that it checks whether it has a connection
 * to this server already or not. If it does, then it sends the smallest
 * possible long value to lose the challenge.
 * 
 */
public boolean receiveConnection(Socket sock) {                 // 1.接收 集群之间各个节点的相互的连接
    Long sid = null;
    LOG.info("sock:"+sock);
    try {
        // Read server id
        DataInputStream din = new DataInputStream(sock.getInputStream());
        sid = din.readLong();                                   // 2.读取对应的 myid (这里第一次读取的可能是一个协议的版本号)
        LOG.info("sid:"+sid);
        if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
            sid = din.readLong();
            LOG.info("sid:"+sid);
            // next comes the #bytes in the remainder of the message
            int num_remaining_bytes = din.readInt();            // 3.读取这整条消息的长度
            byte[] b = new byte[num_remaining_bytes];           // 4.构建要读取数据长度的字节数组
            // remove the remainder of the message from din
            int num_read = din.read(b);                         // 5.读取消息的内容 (疑惑来了, 这里会不会有拆包断包的情况出现)
            if (num_read != num_remaining_bytes) {              // 6.数据没有读满, 进行日志记录
                LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
            }
        }
        if (sid == QuorumPeer.OBSERVER_ID) {                    // 7.连接过来的是一个观察者
            /*
             * Choose identifier at random. We need a value to identify
             * the connection.
             */
            
            sid = observerCounter--;
            LOG.info("Setting arbitrary identifier to observer: " + sid);
        }
    } catch (IOException e) {                                   // 8.这里可能会有 EOFException 表示读取数据到文件尾部, 客户端已经断开, 没什么数据可以读取了, 所以直接关闭 socket
        closeSocket(sock);
        LOG.warn("Exception reading or writing challenge: " + e.toString() + ", sock:"+sock);
        return false;
    }
    
    //If wins the challenge, then close the new connection.
    if (sid < self.getId()) {                                   // 9.在集群中为了防止重复连接, 只能允许大的 myid 连接小的
        /*
         * This replica might still believe that the connection to sid is
         * up, so we have to shut down the workers before trying to open a
         * new connection.
         */
        SendWorker sw = senderWorkerMap.get(sid);               // 10.看看是否已经有 SendWorker, 有的话就进行关闭
        if (sw != null) {
            sw.finish();
        }

        /*
         * Now we start a new connection
         */
        LOG.debug("Create new connection to server: " + sid);
        closeSocket(sock);                                      // 11.关闭 socket
        connectOne(sid);                                        // 12.因为自己的 myid 比对方的大, 所以进行主动连接

        // Otherwise start worker threads to receive data.
    } else {                                                    // 13.自己的 myid 比对方小
        SendWorker sw = new SendWorker(sock, sid);              // 14.建立 SendWorker
        RecvWorker rw = new RecvWorker(sock, sid, sw);          // 15.建立 RecvWorker
        sw.setRecv(rw); 

        SendWorker vsw = senderWorkerMap.get(sid);              // 16.若以前存在 SendWorker, 则进行关闭
        
        if(vsw != null)
            vsw.finish();
        
        senderWorkerMap.put(sid, sw);
        
        if (!queueSendMap.containsKey(sid)) {                   // 17.若不存在 myid 对应的 消息发送 queue, 则就构建一个
            queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                    SEND_CAPACITY));
        }
        
        sw.start();                                             // 18.开启 消息发送 及 接收的线程
        rw.start();
        
        return true;    
    }
    return false;
}
8. QuorumPeer.run

程序最终一直在QuorumPeer.run里面, 而且状态从 LOOKING -> LEADING ->LOOING -> LEADING 一直循环

@Override
public void run() {
    setName("QuorumPeer" + "[myid=" + getId() + "]" +                    // 1. 设置当前线程的名称
            cnxnFactory.getLocalAddress());

    LOG.debug("Starting quorum peer");
    try {
        jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(jmxQuorumBean, null);       // 2. 在 QuorumPeer 上包装 QuorumBean 注入到 JMX
        for(QuorumServer s: getView().values()){                         // 3. 遍历每个 ZooKeeperServer 节点
            ZKMBeanInfo p;
            if (getId() == s.id) {
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {                                                    // 4. 将 LocalPeerBean 注入到 JMX 里面
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {                                                     // 5. 若myid不是本机, 也注入到 JMX 里面
                p = new RemotePeerBean(s);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }

    try {
        /*
         * Main loop
         */
        while (running) {                                               // 6. QuorumPeer 会一直在这个 while 里面 (一般先是 LOOKING, LEADING/FOLLOWING)
            switch (getPeerState()) {
            case LOOKING:                                               // 7. QuorumPeer 是 LOOKING 状态, 正在寻找 Leader 机器
                LOG.info("LOOKING, and myid is " + myid);

                if (Boolean.getBoolean("readonlymode.enabled")) {       // 8. 判断启动服务是否是 readOnly 模式
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                            logFactory, this,
                            new ZooKeeperServer.BasicDataTreeBuilder(),
                            this.zkDb);

                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();                                 // 9.  这里分两部(a.. QuorumPeer.start().startLeaderElection().createElectionAlgorithm()
                        setBCVote(null);                                 // b. 调用 Election.lookForLeader方法开始选举, 直至 选举成功/其中发生异常
                        setCurrentVote(makeLEStrategy().lookForLeader());// 10. 创建选举 Leader 德策略)选举算法, 在这里可能需要消耗一点时间
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {                                                // 11. 这里分两部(a. QuorumPeer.start().startLeaderElection().createElectionAlgorithm()
                        setBCVote(null);                                 // b. 调用 Election.lookForLeader方法开始选举, 直至 选举成功/其中发生异常
                        setCurrentVote(makeLEStrategy().lookForLeader());// 12. 选举算法, 在这里可能需要消耗一点时间
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING, and myid is " + myid);
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e );                        
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING, and myid is " + myid);         // 13. 最上层还是 QuorumPeer
                    setFollower(makeFollower(logFactory));              // 14. 初始化 follower, 在 Follower 里面引用 FollowerZooKeeperServer
                    follower.followLeader();                            // 15. 带用 follower.followLeader, 程序会阻塞在这里
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case LEADING:
                LOG.info("LEADING, and myid is " + myid);
                try {
                    setLeader(makeLeader(logFactory));                  // 16. 初始化 Leader 对象
                    leader.lead();                                      // 17. Leader 程序会阻塞在这里
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        try {
            MBeanRegistry.getInstance().unregisterAll();
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
    }
}
9. FastLeaderElection.lookForLeader

zookeeper默认使用FastLeaderElection做Leader选举的算法, 接下来直接看代码

/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
// 每个 QuorumPeer 启动时会调用这个方法, 通过这里的调用来完成 Leader 的选举
public Vote lookForLeader() throws InterruptedException {
    LOG.info("QuorumPeer {" + self  + "} is LOOKING !");
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(                               // 1. 将 jmxLeaderElectionBean 注册入 JMX 里面 (有个注意点在使用 classLader 时,进行热部署需要 unregister 掉注入到 java核心包里面德 SQL Driver)
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
       self.start_fle = System.currentTimeMillis();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();                    // 2. 收到的投票信息

        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized(this){
            LOG.info("logicalclock :" + logicalclock);
            logicalclock++;                                               // 3. 获取对应的 myid, zxid, epoch 值
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() + ", proposed zxid=0x " + Long.toHexString(proposedZxid));
        LOG.info("sendNotifications to QuorumPeers ");
        sendNotifications();                                              // 4. 先将进行 Leader 选举的信息发送给集群里面德节点 (包括自己)

        /*
         * Loop in which we exchange notifications until we find a leader
         */

        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ // 5. 若 QuorumPeer 还处于 LOOKING 状态, 则一直运行下面的 loop, 直到 Leader 选举成功
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */                                                          // 6. 获取 投票的信息(这里是 Leader/Follower 发给自己德投票德信息)
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
            LOG.info("Notification:"+n);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){                                               // 7. 这里 n == null, 说明有可能 集群之间的节点还没有正真连接上
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();                                // 8. 开始连接集群中的各台机器, 连接成功后都会有对应的 SenderWorker ReceiverWorker 与之对应
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if(self.getVotingView().containsKey(n.sid)) {           // 9. 处理集群中节点发来 Leader 选举德投票消息收到投票的信息
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view.
                 */
                switch (n.state) {
                case LOOKING:                                            // 10.消息的来源方说自己也在找 Leader
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock) {                // 11.若果接收到的 notication 的 epoch(选举的轮次)大于当前的轮次
                        logicalclock = n.electionEpoch;
                        recvset.clear();                                 // 12.totalOrderPredicate 将收到的 投票信息与 自己的进行比较(比较的次序依次是 epoch, zxid, myid)
                        boolean totalOrderPredicate = totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        LOG.info("n.leader:" + n.leader + ", n.zxid:"+ n.zxid +", n.peerEpoch:"+n.peerEpoch +", getInitId():"+getInitId() +", getInitLastLoggedZxid():"+getInitLastLoggedZxid() + ", getPeerEpoch():"+getPeerEpoch());
                        LOG.info("totalOrderPredicate:"+totalOrderPredicate);
                        if(totalOrderPredicate) {                        // 13.新收到的 Leader 选举信息胜出, 覆盖本地的选举信息
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();                             // 14.因为这里 Leader 选举德信息已经更新了, 所以这里 将 Leader 选举的消息发送出去
                    } else if (n.electionEpoch < logicalclock) {         // 15.若果接收到的 notication 的 epoch(选举的轮次)小于当前的轮次, 则直接丢掉
                        LOG.info("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                + Long.toHexString(n.electionEpoch)
                                + ", logicalclock=0x" + Long.toHexString(logicalclock));
                        break;                                           // 16.若接收到德选举消息德 epoch 与自己的在同一个选举周期上 totalOrderPredicate 将收到的 投票信息与 自己的进行比较(比较的次序依次是 epoch, zxid, myid)
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);   // 17.接收到的 Leader新收到的 Notification 胜出, 将Notification里面的内容覆盖本地的选举信息
                        sendNotifications();                             // 18.因为这里 Leader 选举德信息已经更新了, 所以这里 将 Leader 选举的消息发送出去
                    }                                                    // 19. 将收到的投票信息放入投票的集合 recvset 中, 用来作为最终的 "过半原则" 判断
                    Vote vote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                    LOG.info("Receive Notification: " + n);
                    LOG.info("Adding vote: " + vote);
                    recvset.put(n.sid, vote);

                                                                         // 20.生成这次Vote 通过 termPredicate判断这次选举是否结束
                    Vote selfVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch);
                    boolean termPredicate = termPredicate(recvset,selfVote );    // 21.判断选举是否结束 (默认的就是过半原则)
                    LOG.info("recvset:"+recvset +", || selfVote: " + selfVote);
                    LOG.info("termPredicate:"+termPredicate);
                    if (termPredicate) {                                         // 22.满足过半原则, Leader 选举成功

                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){                // 23.这时候再从 recvqueue 里面获取 Notification
                            boolean totalOrderPredicate2 = totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch);// 24.判断是否需要更新 本机(QuorumPeer) 的投票信息
                            LOG.info("totalOrderPredicate2:"+totalOrderPredicate2);
                            if(totalOrderPredicate2){                            // 25.若还需要更新 Leader 的投票信息
                                recvqueue.put(n);                                // 26.则将对方发来的 Notification 放入 recvqueue, 重新等待获取 Notification
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {                                        // 27.若n==null, 说明 Leader 集群中的选举可以定下来了, 修改状态信息 至 Leading
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());      // 28.判断这时确认的 Leader 是否是本机, 若是的话, 则更新本机的state为 LEADING

                            Vote endVote = new Vote(proposedLeader,             // 29.组装生成这次 Leader 选举最终的投票的结果
                                                    proposedZxid,
                                                    logicalclock,
                                                    proposedEpoch);
                            leaveInstance(endVote);                             // 30.Leader选举结束, 清空 recvqueue
                            return endVote;                                     // 31.这时会退回到程序的上层, 进行 follower.followLeader() / leader.lead()
                        }
                    }
                    break;
                case OBSERVING:                                                // 32.角色是 OBSERVING 的 QuorumPeer 不参与 Leader 的选举
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if(n.electionEpoch == logicalclock){
                        recvset.put(n.sid, new Vote(n.leader,                  // 33.同样需要将 投票的信息加入到集合里面
                                                      n.zxid,
                                                      n.electionEpoch,
                                                      n.peerEpoch));
                       
                        if(ooePredicate(recvset, outofelection, n)) {          // 34.检测投票是否结束,  Leader 是否已经去人
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());     // 35.在此处进行更行 QuorumPeer 的状态信息 (LEADING / FOLLOWING)

                            Vote endVote = new Vote(n.leader, 
                                    n.zxid, 
                                    n.electionEpoch, 
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                                                        n.leader,
                                                        n.zxid,
                                                        n.electionEpoch,
                                                        n.peerEpoch,
                                                        n.state));
       
                    if(ooePredicate(outofelection, outofelection, n)) {
                        synchronized(this){
                            logicalclock = n.electionEpoch;
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                                n.zxid,
                                                n.electionEpoch,
                                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
    }
}

通过 FastLeaderElection 确定 Leader 后, Leader与Follower就分开来处理, 下面分开进行

10. Leader.lead() 第一部分

在Leader端, 则通过lead()来处理与Follower的交互(这里的过程主要涉及 Follower, Learner, LearnerHandler, Leader 之间的交互, 而且有好几个阻塞的地方)

self.tick = 0;
zk.loadData();                                                      // 2. 从 snapshot, txn log 里面进行恢复 zxid
                                                                                                                                        // 3. 生成 Leader 的状态信息
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
LOG.info("leaderStateSummary:" + leaderStateSummary);
// Start thread that waits for connection requests from 
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();                             // 4. LearnerCnxAcceptor 它会监听在对应端口, 一有 follower 连接上, 就开启一个 LearnerHandler 来处理对应的事件
LOG.info("cnxAcceptor start");
cnxAcceptor.start();

readyToStart = true;                                                // 5. 一开始这个 getAcceptedEpoch 是直接从文件中恢复过来的, 指的是处理过的 Propose
LOG.info("self.getId() :" + self.getId() + ",  self.getAcceptedEpoch():" +  self.getAcceptedEpoch());
                                                                    // 6. 等待足够多de Follower进来, 代表自己确实是 leader, 此处 lead 线程可能在 while 循环处等待
                                                                    // 7. 而在对应的 LearnerHandler 里面, 也会收到对应的 FOLLOWERINFO 数据包, 里面包含 acceptEpoch 数据
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

在 Leader 端会建立一个 LearnerCnxAcceptor, 用于连接集群中的其他节点, 建立后会用一个 LearnerHandler 来维护他们之间的交互; 而这时 Leader 会阻塞在 getEpochToPropose里面, 直到有过半 Follower发来信息, 并且在 LearnerHandler里面调用了 Leader.getEpochToPropose后就解除阻塞

11. Follower 连接 Leader

程序先通过 findLeader找到Leader

/**
 * Returns the address of the node we think is the leader.
 */
// 返回 Leader 的网络信息
protected InetSocketAddress findLeader() {
    InetSocketAddress addr = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();            // 获取 QuorumPeer 的投票信息, 里面包含自己Leader选举所投的信息
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            addr = s.addr;                          // 获取 Leader 的 addr
            break;
        }
    }
    if (addr == null) {
        LOG.warn("Couldn't find the leader with id = "
                + current.getId());
    }
    return addr;
}   

找到 Leader 的地址后就可以和Leader建立连接

/**
 * Establish a connection with the Leader found by findLeader. Retries
 * 5 times before giving up. 
 * @param addr - the address of the Leader to connect to.
 * @throws IOException - if the socket connection fails on the 5th attempt
 * @throws ConnectException
 * @throws InterruptedException
 */
// 连接 leader, 建立成功后, 在 Leader 端会有一个 LearnerHandler 处理与之的通信
protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException {

    sock = new Socket();        
    sock.setSoTimeout(self.tickTime * self.initLimit);          // 1. 这里的 SoTimeout 很重要, 若 InputStream.read 超过这个时间,则会报出 SocketTimeoutException 异常
    for (int tries = 0; tries < 5; tries++) {                   // 2. 连接 Leader 尝试 5次, 若还是失败, 则抛出异常, 一直往外抛出, 直到 QuorumPeer 的重新开始选举 leader run 方法里面 -> 进行选举 Leader
        try {
            sock.connect(addr, self.tickTime * self.syncLimit); // 3. 连接 leader
            sock.setTcpNoDelay(nodelay);                        // 4. 设置 tcpnoDelay <- 这里其实就是禁止 tcp 底层合并小数据包, 一次发送所有数据的 算法
            break;
        } catch (IOException e) {
            if (tries == 4) {
                LOG.error("Unexpected exception",e);
                throw e;
            } else {
                LOG.warn("Unexpected exception, tries="+tries+
                        ", connecting to " + addr,e);
                sock = new Socket();
                sock.setSoTimeout(self.tickTime * self.initLimit);
            }
        }
        Thread.sleep(1000);
    }                                                           // 5. 封装对应的 I/O 数据流
    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); //  6. 封装输出数据流
} 

Follower在与Leader建立连接之后会调用registerWithLeader()方法, 与Leader同步确认 epoch 值

12. Learner.registerWithLeader 第一部分

registerWithLeader(Leader.FOLLOWERINFO) 将Follower的zxid及 myid 等信息封装好发送到Leader

LOG.info("registerWithLeader:" + pktType);
/*
 * Send follower info, including last zxid and sid
 */
long lastLoggedZxid = self.getLastLoggedZxid();                     // 获取 Follower 的最后处理的 zxid
QuorumPacket qp = new QuorumPacket();                
qp.setType(pktType);                                                // 若是 Follower ,则当前的角色是  Leader.FOLLOWERINFO
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));         // Follower 的 lastZxid 的值

/*
 * Add sid to payload
 */
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);            // 将 Follower 的信息封装成 LearnerInfo
LOG.info("li:" + li);

ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());                                     // 在 QuorumPacket 里面添加 Follower 的信息
LOG.info("qp:" + qp);

writePacket(qp, true);                                              // 发送 QuorumPacket 包括 learnerInfo 与 Leader.FOLLOWERINFO, 通过 self.getAcceptedEpoch() 构成的 zxid
12. LearnerHandler.run 第一部分

处理Follower发过来的Leader.FOLLOWERINFO信息

tickOfNextAckDeadline = leader.self.tick + leader.self.initLimit + leader.self.syncLimit;
LOG.info("tickOfNextAckDeadline : " + tickOfNextAckDeadline);
                                                                            // 1. 构建与 Follower 之间建立的 socket 成数据流
ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
                                                                            // 2. 等待 Follower 发来数据包
QuorumPacket qp = new QuorumPacket();
long a1 = System.currentTimeMillis();
ia.readRecord(qp, "packet");                                                // 3. 读取 Follower 发过来的 FOLLOWERINFO 数据包
LOG.info("System.currentTimeMillis() - a1 : " + (System.currentTimeMillis() - a1));
LOG.info("qp:" + qp);
                                                                            // 4. 不应该有这种数据的存在
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
    LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!");
    return;
}
byte learnerInfoData[] = qp.getData();                                      // 5. 读取参与者发来的数据

LOG.info("learnerInfoData :" + Arrays.toString(learnerInfoData));           // 6. 这里的 learnerInfo 就是 Follower/Observer 的信息
if (learnerInfoData != null) {
    if (learnerInfoData.length == 8) {
        ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
        this.sid = bbsid.getLong();
    } else {
        LearnerInfo li = new LearnerInfo();                                 // 7. 反序列化出 LearnerInfo
        ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
        LOG.info("li :" + li);
        this.sid = li.getServerid();                                        // 8. 取出 Follower 的 myid
        this.version = li.getProtocolVersion();                             // 9. 通讯的协议
    }
} else {
    this.sid = leader.followerCounter.getAndDecrement();
}

LOG.info("Follower sid: " + sid + " : info : " + leader.self.quorumPeers.get(sid));
            
if (qp.getType() == Leader.OBSERVERINFO) {
      learnerType = LearnerType.OBSERVER;
}            

long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());          // 10. 通过 zxid 来获取 Follower 的 Leader 选举的 epoch

LOG.info("qp : " + qp + ", lastAcceptedEpoch : " + lastAcceptedEpoch);

long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); // 11. 将 Follower 的 Leader 选举的 epoch  信息加入到 connectingFollowers 里面, 判断 集群中过半的Leader参与者了 getEpochToPropose

程序最后调用了leader.getEpochToPropose(), 当集群中有过半的节点发来后, 会在这里解除阻塞
在解除阻塞之后, Leader会向Follower发送LeaderLEADERINFO的信息

byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);                                   // 14. 构建出 描述Leader信息的数据包
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
LOG.info("newEpochPacket:" + newEpochPacket);
oa.writeRecord(newEpochPacket, "packet");                               // 15. 将 Leader 的信息发送给对应的 Follower / Observer
bufferedOutput.flush();

而此时Follower在接受到Leader.LEADERINFO信息之后会回复 Leader.ACKEPOCH 信息

// we are connected to a 1.0 server so accept the new epoch and read the next packet
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
LOG.info("leaderProtocolVersion:" + leaderProtocolVersion);
byte epochBytes[] = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);

LOG.info("newEpoch:" + newEpoch + ", self.getAcceptedEpoch():" + self.getAcceptedEpoch());
if (newEpoch > self.getAcceptedEpoch()) {                       // 若 Follower 的 election 的 epoch 值小于自己, 则用 Leader 的
    wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
    self.setAcceptedEpoch(newEpoch);
} else if (newEpoch == self.getAcceptedEpoch()) {
    // since we have already acked an epoch equal to the leaders, we cannot ack
    // again, but we still need to send our lastZxid to the leader so that we can
    // sync with it if it does assume leadership of the epoch.
    // the -1 indicates that this reply should not count as an ack for the new epoch
    wrappedEpochBytes.putInt(-1);
} else {                                                         // 若 Follower.epoch > Leader.epoch 则说明前面的 Leader 选举出错了
    throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
}                                                                // 在 接收到 Leader.LEADERINFO 的消息后, 进行回复 Leader.ACKEPOCH 的消息, 并且加上 lastLargestZxid 值
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);

LOG.info("ackNewEpoch:" + ackNewEpoch);
writePacket(ackNewEpoch, true);                                  // 将 ACKEPOCH 信息发送给对方 用于回复Leader发过来的LEADERINFO
return ZxidUtils.makeZxid(newEpoch, 0);

接着就是LearnerHandler对Leader.ACKEPOCH的处理了

byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);                                   // 14. 构建出 描述Leader信息的数据包
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
LOG.info("newEpochPacket:" + newEpochPacket);
oa.writeRecord(newEpochPacket, "packet");                               // 15. 将 Leader 的信息发送给对应的 Follower / Observer
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");                                // 16. Leader 读取 Follower 发来的 ACKEPOCH 信息

LOG.info("ackEpochPacket:" +ackEpochPacket);                            // 17. 刚刚发送了 leader 的信息, 现在获取一下确认的 ack

if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
    LOG.error(ackEpochPacket.toString()
            + " is not ACKEPOCH");
    return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
LOG.info("ss : " + ss);
leader.waitForEpochAck(this.getSid(), ss);                              // 18. 在这边等待所有的 Follower 都回复 ACKEPOCH 值 (这里也是满足过半就可以)

接下来就是 Leader 与 Follower之间的数据同步

13. Learner同步数据至Follower

这里面将涉及下面几个概念

1. committedLog 里面保存着Leader 端处理的最新的500个Proposal
2. 当 Follower处理的Proposal大于 maxCommittedLog, 则 Follower 要TRUNC自己的Proposal至maxCommittedLog
3. 当 Follower处理的Proposal小于 maxCommittedLog, 大于minCommittedLog, 则Leader将Follower没有的Proposal发送到Follower, 让其处理
4. 当 Follower处理的Proposal小于 minCommittedLog, 则Leader发送 Leader.SNAP给FOLLOWER, 并且将自身的数据序列化成数据流, 发送给 Follower

下面直接看代码

/* we are sending the diff check if we have proposals in memory to be able to 
 * send a diff to the 
 */ 
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
    rl.lock();                                                             // 20. Leader上将 最近已经提交的 Request 缓存到 ZKDatabase.committedLog里面(这个操作在 FinalRequestProcessor.processRequest 里面操作)  事务的 zxid 会 minCommittedLog -> maxCommittedLog 之间的事务
    final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
    final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();

    LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
                            + ", minCommittedLog:" +Long.toHexString(minCommittedLog)
                            + " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
    );


    /**
     * http://www.jianshu.com/p/4cc1040b6a14
     * 获取 Leader 已经提交的 Request 数据
     * 1) 若 lastzxid 在 min 和 max 之间
     *      循环 proposals
     *      a) 当单个 proposal 的zxid <= 当前的 peerLastZxid 时, 说明已经提交过了, 直接跳过
     *      b) 当 proposal 的zxid 大于 peerLastZxid 时, 则删除小于 peerLastZxid部分, 因为已经提交过了, 剩余部分继续做 commit 操作,
     *          因此在所有 commit 之前, 先发送一个 trunc 事件, 删除已经提交过的部分, 然后发送需要的 commit 的相关节点
     * 2) 如果当前的 peerLastZxid 大于 max, 则全部做 TRUNC
     * 3) 剩下的不处理, 可能是新加入的节点, 所以事件类型为 SNAP, 同步数据时直接取快照
     */

    LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();           // 21. 获取Leader 上最近提交的Request, 查看是否还有需要的投票
    LOG.info("proposals:"+proposals);
    if (proposals.size() != 0) {                                                            // 22. proposals 里面存储的是已经提交的  Proposal
        LOG.debug("proposal size is {}", proposals.size());

        if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {       // 23. 若这个 If 条件成立, 说明 Follower 与 Leader 之间有少于 500 条数据未同步
            LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
                    + ", minCommittedLog:" +Long.toHexString(minCommittedLog)
                    + " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
            );
            LOG.debug("Sending proposals to follower");

            // as we look through proposals, this variable keeps track of previous
            // proposal Id.
            long prevProposalZxid = minCommittedLog;

            // Keep track of whether we are about to send the first packet.
            // Before sending the first packet, we have to tell the learner
            // whether to expect a trunc or a diff
            boolean firstPacket=true;

            // If we are here, we can use committedLog to sync with
            // follower. Then we only need to decide whether to
            // send trunc or not
            packetToSend = Leader.DIFF;
            zxidToSend = maxCommittedLog;

            for (Proposal propose: proposals) {
                // skip the proposals the peer already has                                 // 24. 这个 Propose 已经处理过了, continue
                if (propose.packet.getZxid() <= peerLastZxid) {
                    prevProposalZxid = propose.packet.getZxid();                           // 25. 若 follower 已经处理过, 则更新 prevProposalZxid, 轮询下个 Proposal
                    continue;
                } else {
                    // If we are sending the first packet, figure out whether to trunc
                    // in case the follower has some proposals that the leader doesn't
                    if (firstPacket) {                                                     // 26. 在发起 Proposal 之前一定要确认 是否 follower 比 Leader 超前处理 Proposal
                        firstPacket = false;
                        // Does the peer have some proposals that the leader hasn't seen yet
                        if (prevProposalZxid < peerLastZxid) {                             // 27. follower 的处理事务处理比 leader 多, 也就是说prevProposalZxid这时就是maxCommittedLog,   则发送 TRUC 进行 Proposal 数据同步
                            // send a trunc message before sending the diff
                            packetToSend = Leader.TRUNC;                                        
                            zxidToSend = prevProposalZxid;
                            updates = zxidToSend;
                        }
                    }
                    queuePacket(propose.packet);                                           // 28. 将 事务发送到 发送队列里面
                    QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                            null, null);
                    queuePacket(qcommit);                                                  // 29. 紧接着发送一个 commit, 让 Follower 来进行提交 request
                }
            }
        } else if (peerLastZxid > maxCommittedLog) {                                       // 30. follower 的处理事务处理比 leader 多, 则发送 TRUC 进行 Proposal 数据同步
            LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                    Long.toHexString(maxCommittedLog),
                    Long.toHexString(updates));

            LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
                    + ", minCommittedLog:" +Long.toHexString(minCommittedLog)
                    + " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
                    + ", updates : " + Long.toHexString(updates)
            );

            packetToSend = Leader.TRUNC;                                                   // 31. 发送 TRUNC, Follower 将删除比 Leader 多的 Request
            zxidToSend = maxCommittedLog;                                                  // 32. 这里的 maxCommittedLog 是 Leader 处理过的最大 Request的 zxid
            updates = zxidToSend;
        } else {
            LOG.warn("Unhandled proposal scenario");
        }                                                                                  // 33. 若 Follower 与 Leader 的 lastZxid 相同, 则 发送 DIFF
    } else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
        // The leader may recently take a snapshot, so the committedLog
        // is empty. We don't need to send snapshot if the follow
        // is already sync with in-memory db.
        LOG.info("committedLog is empty but leader and follower "
                        + "are in sync, zxid=0x{}",
                Long.toHexString(peerLastZxid));

        LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
                + ", minCommittedLog:" +Long.toHexString(minCommittedLog)
                + " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
        );

        packetToSend = Leader.DIFF;
        zxidToSend = peerLastZxid;
    } else {
        // just let the state transfer happen
        LOG.debug("proposals is empty");
    }               


    LOG.info("Sending " + Leader.getPacketType(packetToSend));
    leaderLastZxid = leader.startForwarding(this, updates);                               // 34. leader 将没有持久化但已经过半 ACK 确认过了的Proposal发给 Learner (这里就是细节)
    LOG.info("leaderLastZxid : " + leaderLastZxid);

} finally {
    rl.unlock();
} 

对于消息的发送 Leader 最后发送一个 Leader.NEWLEADER

 QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, ZxidUtils.makeZxid(newEpoch, 0), null, null);
 LOG.info("newLeaderQP:" + newLeaderQP);
 if (getVersion() < 0x10000) {
    oa.writeRecord(newLeaderQP, "packet");
} else {
    queuedPackets.add(newLeaderQP);                                                       // 36. 将 Leader.NEWLEADER 的数据包加入到发送队列(注意此时还没有启动发送队列的线程)
}
bufferedOutput.flush();
14. Follower处理与Leader之间的数据同步
synchronized (zk) {
    if (qp.getType() == Leader.DIFF) {                              // DIFF 数据包(DIFF数据包显示集群中两个节点的 lastZxid 相同)
        LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));                
    }
    else if (qp.getType() == Leader.SNAP) {                         // 收到的信息是 snap, 则从 leader 复制一份 镜像数据到本地(Leader比Follower处理的Proposal多至少500个)
        LOG.info("Getting a snapshot from leader");
        // The leader is going to dump the database
        // clear our own database and read
        zk.getZKDatabase().clear();
        zk.getZKDatabase().deserializeSnapshot(leaderIs);           // 从 InputStream 里面 反序列化出 DataTree
        String signature = leaderIs.readString("signature");        // 看了一个 读取 tag "signature" 代表的一个 String 对象
        if (!signature.equals("BenWasHere")) {
            LOG.error("Missing signature. Got " + signature);
            throw new IOException("Missing signature");                   
        }
    } else if (qp.getType() == Leader.TRUNC) {                     // 回滚到对应的事务到 qp.getZxid()(Follower处理的事务比Leader多)
        //we need to truncate the log to the lastzxid of the leader
        LOG.warn("Truncating log to get in sync with the leader 0x" + Long.toHexString(qp.getZxid()));
        boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
        LOG.info("truncated:" + truncated + ", qp.getZxid():" + qp.getZxid());
        if (!truncated) {
            // not able to truncate the log
            LOG.error("Not able to truncate the log "
                    + Long.toHexString(qp.getZxid()));
            System.exit(13);
        }

    }
    else {
        LOG.error("Got unexpected packet from leader "
                + qp.getType() + " exiting ... " );
        System.exit(13);

    }
    zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());          // 因为这里的 ZKDatatree 是从 Leader 的 SnapShot 的 InputStream 里面获取的, 所以调用这里通过 set 进行赋值
    zk.createSessionTracker();                                      // Learner 创建对应的 SessionTracker (Follower/Observer)(LearnerSessionTracker)
    
    long lastQueued = 0;

    // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
    // we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER)
    // we need to make sure that we don't take the snapshot twice.
    boolean snapshotTaken = false;
    // we are now going to start getting transactions to apply followed by an UPTODATE
    outerLoop:
    while (self.isRunning()) {                                     // 这里的 self.isRunning() 默认就是 true
        readPacket(qp);

        LOG.info("qp:" + qp);

        switch(qp.getType()) {
        case Leader.PROPOSAL:                                     // 将投票信息加入到 待处理列表
            PacketInFlight pif = new PacketInFlight();
            pif.hdr = new TxnHeader();
            pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);         // 反序列化对应的 请求事务体
            LOG.info("pif:" + pif);
            if (pif.hdr.getZxid() != lastQueued + 1) {
            LOG.warn("Got zxid 0x"
                    + Long.toHexString(pif.hdr.getZxid())
                    + " expected 0x"
                    + Long.toHexString(lastQueued + 1));
            }
            lastQueued = pif.hdr.getZxid();
            packetsNotCommitted.add(pif);
            break;
        case Leader.COMMIT:                                        // commit 则将事务提交给 Server 处理
            LOG.info("snapshotTaken :" + snapshotTaken);
            if (!snapshotTaken) {
                pif = packetsNotCommitted.peekFirst();
                if (pif.hdr.getZxid() != qp.getZxid()) {
                    LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                } else {
                    zk.processTxn(pif.hdr, pif.rec);               // 处理对应的事件
                    packetsNotCommitted.remove();
                }
            } else {
                packetsCommitted.add(qp.getZxid());
            }
            break;
        case Leader.INFORM:                                                         // 这个 INFORM 只有Observer 才会处理
            /*
             * Only observer get this type of packet. We treat this
             * as receiving PROPOSAL and COMMMIT.
             */
            PacketInFlight packet = new PacketInFlight();
            packet.hdr = new TxnHeader();
            packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
            LOG.info("packet:" + packet);
            // Log warning message if txn comes out-of-order
            if (packet.hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(packet.hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = packet.hdr.getZxid();
            LOG.info("snapshotTaken : " + snapshotTaken);
            if (!snapshotTaken) {
                // Apply to db directly if we haven't taken the snapshot
                zk.processTxn(packet.hdr, packet.rec);
            } else {
                packetsNotCommitted.add(packet);
                packetsCommitted.add(qp.getZxid());
            }
            break;
        case Leader.UPTODATE:                                               // UPTODATE 数据包, 说明同步数据成功, 退出循环
            LOG.info("snapshotTaken : " + snapshotTaken + ", newEpoch:" + newEpoch);
            if (!snapshotTaken) { // true for the pre v1.0 case
                zk.takeSnapshot();
                self.setCurrentEpoch(newEpoch);
            }
            self.cnxnFactory.setZooKeeperServer(zk);                
            break outerLoop;                                                // 获取 UPTODATE 后 退出 while loop
        case Leader.NEWLEADER: // it will be NEWLEADER in v1.0              // 说明之前残留的投票已经处理完, 则将内存中的数据写入文件, 并发送 ACK 包
            LOG.info("newEpoch:" + newEpoch);
            // Create updatingEpoch file and remove it after current
            // epoch is set. QuorumPeer.loadDataBase() uses this file to
            // detect the case where the server was terminated after
            // taking a snapshot but before setting the current epoch.
            File updating = new File(self.getTxnFactory().getSnapDir(),
                                QuorumPeer.UPDATING_EPOCH_FILENAME);
            if (!updating.exists() && !updating.createNewFile()) {
                throw new IOException("Failed to create " +
                                      updating.toString());
            }
            zk.takeSnapshot();
            self.setCurrentEpoch(newEpoch);
            if (!updating.delete()) {
                throw new IOException("Failed to delete " +
                                      updating.toString());
            }
            snapshotTaken = true;
            writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
            break;
        }
    }
}

接着Follower将发送NEWLEADER对应的ACK信息, 并且处理数据同步时Leader发送过来的Proposal消息, 紧接着Followerjiuzai 一个while循环里面一直读取数据包并处理数据包; 与之对应的是LearnerHandler, LearnerHandler最后就在 while loop 里面一直处理与Follower之间的消息

15. 总结

整个Leader/Follower启动, Leader选举, Leader与Follower之间数据的同步涉及好几个步骤, 细节点比较多, 还好总体线路比较清晰, 若想了解的话, 可以先看一下片头的那个流程图!

点赞