APACHE ZOOKEEPER 3.5.3 CODE REVIEW

导语

1.zab协议崩溃恢复如何实现leader选举及数据同步?
2.zab消息广播阶段如何实现发起投票、收集选票、提交事务,并保证事务的顺序一致性?
3.paxos、zab、raft vs pbft vs pow、pos、ripple 等区块链共识算法的区别是什么,分别适合什么场景?//TODO:

zab协议包含两个阶段崩溃恢复消息广播,基于zookeeper 3.5.3集群启动以及ZooKeeper.setData来分别说明下两阶段的流程。

一.Index

1.1 崩溃恢复

org.apache.zookeeper.server.quorum.QuorumPeerMain.main
    ->QuorumPeerMain.runFromConfig
        ->NettyServerCnxnFactory.configure //配置ServerCnxnFactory
        ->quorumPeer = getQuorumPeer();
        ->quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(),config.getDataDir())) //设置FileTxnSnapLog,管理txn以及snap
        ->quorumPeer.setMyid(config.getServerId()) //设置myid
        ->quorumPeer.setInitLimit(config.getInitLimit());quorumPeer.setSyncLimit(config.getSyncLimit()); //超时时间
        ->quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));//初始化一个空的ZKDatabase
        ->quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); //设置QuorumVerifier
        ->quorumPeer.setCnxnFactory(cnxnFactory)
        ->org.apache.zookeeper.server.quorum.QuorumPeer.start() //启动QuorumPeer
            ->QuorumPeer.loadDataBase
                ->zkDb.loadDataBase() //恢复挂机之前的内存状态
                    ->long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
                        ->processTransaction(hdr,dt,sessions, itr.getTxn());//重放所有的Transaction,
                        ->listener.onTxnLoaded(hdr, itr.getTxn()); //将已提交的日志放入org.apache.zookeeper.server.ZKDatabase.committedLog中并更新minCommittedLog、maxCommittedLog两个offset便于同步时使用
                            ->addCommittedProposal(r)
                ->currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); //获取之前记录的Epoch
                ->acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
            ->QuorumPeer.startServerCnxnFactory //绑定端口,启动服务
            ->QuorumPeer.startLeaderElection //开启选举
                ->currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());//设置当前服务器的投票,使用之前记录的epoch以及zxid
                ->this.electionAlg = createElectionAlgorithm(electionType); //设置选举算法,默认
                    ->qcm = new QuorumCnxManager(this);
                    ->org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.run
                        ->org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.ss.bind(addr)//绑定选举端口
                        ->client = ss.accept();//等待连接
                        ->org.apache.zookeeper.server.quorum.QuorumCnxManager.receiveConnection //如果建立链接的对端的的sid小于当前服务器id,则当前服务器作为客户端去建立链接,否则启动发送接收线程开始选举流程,此逻辑是为了避免两台服务器建立多个链接
                            ->closeSocket(sock);//关闭之前的连接
                            ->org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne
                                ->QuorumCnxManager.initiateConnection //net层面的数据接收发发送
                                    ->dout.writeLong(self.getId());dout.flush(); //建立链接发送的第一条为当前服务器端myid
                                    ->SendWorker sw = new SendWorker(sock, sid); //将QuorumCnxManager.queueSendMap发送出去
                                    ->RecvWorker rw = new RecvWorker(sock, sid, sw);//将收到的消息加入QuorumCnxManager.recvQueue中
                                    ->sw.start();rw.start(); 
                    ->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.start //启动消息处理器
                        ->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerSender.run
                        ->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver.run
            ->QuorumPeer.run
                ->while (running) {
                    ->case LOOKING //注意leader选举用的当前节点的zxid为org.apache.zookeeper.server.DataTree.lastProcessedZxid,而本字段为FinalRequestProcessor.processRequest方法更新,FinalRequestProcessor为ProposalRequestProcessor->CommitProcessor之后的,即commit阶段才会更新,则zk其实是按照进入提交阶段zxid为准,即写入过半后发出大于1个commit后算写入成功
                        ->setCurrentVote(makeLEStrategy().lookForLeader());
                            ->org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader
                                ->synchronized(this){logicalclock.incrementAndGet(); updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());} //更新选举周期,并设置当前服务器的leader选举propose为自己:FastLeaderElection.proposedLeader,proposedZxid,proposedEpoch
                                ->sendNotifications //将自己的投票发给集群中所有节点
                                    ->ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader,proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,proposedEpoch, qv.toString().getBytes());
                                    ->sendqueue.offer(notmsg);
                                ->while ((self.getPeerState() == ServerState.LOOKING) &&(!stop)){ //执行循环,直到确定leader
                                    ->Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS); //获取收到的Notification
                                    ->switch (n.state) {
                                        ->case LOOKING:
                                            ->if (n.electionEpoch > logicalclock.get()) { //如果收到的投票周期大于当前节点认为的周期,则更新周期、清空已经收到的投票、
                                                ->logicalclock.set(n.electionEpoch);
                                                ->recvset.clear();
                                                ->if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);} else{updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch())} //更新当前节点的投票,先比较周期,周期相同再比较zxid,zxid相同然后就比较myid,然后选大的作为当前节点后续的投票
                                                ->sendNotifications();//将最新的投票发给集群节点
                                            ->else if (n.electionEpoch < logicalclock.get()) {} //如果收到的投票周期小于当前节点周期,则忽略,继续取下一条Notification
                                            ->else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); //收到的投票比较大,则更新为收到的投票,并发送给集群其他节点
                                            ->recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //以上条件都不符合,则为收到的投票为其他节点认可当前节点的投票,把选票更新到recvset里
                                            ->if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock.get(), proposedEpoch))) { //查看收到的选票是否达到了要求,QuorumMaj为过半策略,没有达到要求继续循环获取Notification,否则设置投票结果,结束leader选举
                                                ->SyncedLearnerTracker.addAck(entry.getKey());
                                                    ->QuorumVerifierAcksetPair.getAckset().add(sid);
                                                ->SyncedLearnerTracker.hasAllQuorums
                                                    ->qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())
                                                        ->QuorumMaj.containsQuorum
                                            -> while((n = recvqueue.poll(finalizeWait..//读取剩余的notification,直到读取完成
                                            -> self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState()); //如果选举的leader为自己,则QuorumPeer.state设为LEADING,否则设为FOLLOWING
                                            ->Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
                                            ->return endVote;//结束选举
                                    ->}
                    ->case LEADING: //如果当前节点为lead节点
                        ->QuorumPeer.makeLeader
                        ->org.apache.zookeeper.server.quorum.Leader.Leader
                            ->Leader.ss.bind(self.getQuorumAddress());//leader绑定端口号等待follower建立连接,本端口用来同步数据,leader与follower之间心跳、投票等数据交换
                        ->org.apache.zookeeper.server.quorum.Leader.lead
                            ->zk.loadData();
                                ->setZxid(zkDb.getDataTreeLastProcessedZxid()); //设置zxid
                                ->killSession(session, zkDb.getDataTreeLastProcessedZxid()); //清理过期的session
                                ->takeSnapshot();//落地一个干净的Snapshot
                            ->cnxAcceptor = new LearnerCnxAcceptor();cnxAcceptor.start();
                                ->LearnerCnxAcceptor.run
                                    -> while (!stop) {Socket s = ss.accept();LearnerHandler fh = new LearnerHandler(s, Leader.this);fh.start();} //等待链接,初始化follower的LearnerHandler作为消息处理器
                                        ->LearnerHandler.run //leader处理flower请求的主流程
                                            ->ia.readRecord(qp, "packet");//读取第一个follower发送过来的消息[#1.1],读取出sid,设置LearnerHandler的sid
                                                ->this.sid = bbsid.getLong();
                                            ->QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); //发送leaderinof
                                            ->[#1.2]oa.writeRecord(newEpochPacket, "packet");
                                            ->ia.readRecord(ackEpochPacket, "packet");//读取Follower的ack消息[#1.3]
                                            ->ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                                            ->leader.waitForEpochAck(this.getSid(), ss);
                                                ->Leader.waitForEpochAck //阻塞,直到过半Follower返回ack消息
                                                    ->electingFollowers.add(id);
                                                    ->if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) 
                                                        ->electionFinished = true;
                                                        ->electingFollowers.notifyAll();
                                            ->boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); //与follower同步
                                                ->long maxCommittedLog = db.getmaxCommittedLog(); //获取三个值,zxid以及内存中ZKDatabase.committedLog保存的最大最小日志
                                                ->long minCommittedLog = db.getminCommittedLog();
                                                ->long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
                                                ->[#1.4]if (lastProcessedZxid == peerLastZxid) {queueOpPacket(Leader.DIFF, peerLastZxid);}//如果相同,则发送Leader.DIFF消息
                                                ->[#1.5]if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) //如果leader节点的log小于当前节点,则发送TRUNC消息截断
                                                    ->queueOpPacket(Leader.TRUNC, maxCommittedLog);
                                                    ->currentZxid = maxCommittedLog;
                                                ->if ((maxCommittedLog >= peerLastZxid)&& (minCommittedLog <= peerLastZxid)) //如果处在maxCommittedLog与minCommittedLog中间,则说明缺了部分数据,则按照普通投票方式把ZKDatabase.committedLog内存中保存的这部分数据按照普通的Proposal发送出去,并提交Committed
                                                    ->LearnerHandler.queueCommittedProposals
                                                        ->while (itr.hasNext()) 
                                                            ->Proposal propose = itr.next()
                                                            ->queuePacket(propose.packet);//发送propose
                                                            ->queueOpPacket(Leader.COMMIT, packetZxid);//提交propose
                                                        ->queueOpPacket(Leader.DIFF, lastCommitedZxid); //不管啥情况下,都发个DIFF消息表名同步完成
                                                ->if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {//如果内存中log不够,则把Log以及snap中的消息都发过去
                                                ->leaderLastZxid = leader.startForwarding(this, currentZxid);[TODO://]//处理toBeApplied、outstandingProposals的目的是啥
                                            ->[#1.6]QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);queuedPackets.add(newLeaderQP);bufferedOutput.flush();//发送NEWLEADER消息
                                            ->ia.readRecord(qp, "packet");//读取[#1.7]的消息,即NEWLEADER的ack
                                            ->leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());//阻塞,直到收到足够的Follower的NEWLEADER的ack
                                                ->Leader.newLeaderProposal.addAck(sid);
                                            ->[#1.9]while(!leader.zk.isRunning() && !this.isInterrupted()){leader.zk.wait(20);}//阻塞,等待leader的ZooKeeperServer.state变为RUNNING
                                            ->[#1.10]queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); //发送UPTODATE消息给Follower
                                            ->while (true) //崩溃恢复阶段完成,进入普通消息处理阶段,leader与follower之前的消息处理参考消息广播阶段示例
                                                ->ia.readRecord(qp, "packet");
                                                ->.....
                            ->zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
                            ->lastProposed = zk.getZxid();
                            ->waitForEpochAck(self.getId(), leaderStateSummary);//主节点也是投票节点,参与新节点选举的策略
                            ->self.setCurrentEpoch(epoch);
                            ->[#1.6]newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),null, null);
                            ->waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);//阻塞,并投出本leader的票,与LearnerHandler一块参与newLeaderProposal的过半策略
                                ->Leader.newLeaderProposal.addAck(sid);
                            ->startZkServer(); //启动leader,
                                ->Leader.startZkServer
                                    ->lastCommitted = zk.getZxid()
                                    ->zk.startup();
                                        ->ZooKeeperServer.startup
                                            ->startSessionTracker();
                                            ->org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.setupRequestProcessors();//启动处理链,处理链的逻辑下面消息广播详细说明
                                            ->setState(State.RUNNING);//标志leader启动完成,并触发LearnerHandler[#1.9]继续向下走
                                    ->self.updateElectionVote(getEpoch());//设置QuorumPeer.currentVote
                                    ->zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
                            ->while (true) //循环向follower发送pin消息,获取session等信息,PING处理不再展开
                                ->f.ping();
                                    ->LearnerHandler.ping
                                        ->QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
                                        ->queuePacket(ping);
                    ->case FOLLOWING: //FOLLOWING节点
                        ->setFollower(makeFollower(logFactory));
                        ->org.apache.zookeeper.server.quorum.Follower.followLeader
                            ->connectToLeader(addr);
                            ->long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                                ->Learner.registerWithLeader
                                    ->qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
                                    ->boa.writeRecord(li, "LearnerInfo");
                                    ->[#1.1]writePacket(qp, true); //发送给leader第一个消息注册
                                    ->readPacket(qp);//读取leader返回的LEADERINFO[#1.2]
                                    ->if (qp.getType() == Leader.LEADERINFO) 
                                        ->QuorumPeer.setAcceptedEpoch
                                            ->acceptedEpoch = e; //设置QuorumPeer.acceptedEpoch
                                            ->writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
                                    ->QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
                                    ->[#1.3]writePacket(ackNewEpoch, true);
                                ->syncWithLeader(newEpochZxid);
                                    ->Learner.syncWithLeader
                                        ->readPacket(qp);
                                        ->if (qp.getType() == Leader.DIFF) {snapshotNeeded = false;} //获取[#1.4]发送的DIFF消息,本消息不做任何操作,仅作为标志
                                        ->if (qp.getType() == Leader.TRUNC) //处理[#1.5]消息,截断多余的消息
                                            ->boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
                                        ->zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
                                        ->zk.createSessionTracker();
                                        ->outerLoop:
                                            ->while (self.isRunning()) 
                                                ->readPacket(qp);
                                                ->case Leader.PROPOSAL:
                                                    ->packetsNotCommitted.add(pif);//将日志加入到LinkedList packetsNotCommitted中
                                                ->case Leader.COMMIT:
                                                    ->pif = packetsNotCommitted.peekFirst()//取出刚才放入packetsNotCommitted的日志
                                                    ->packetsCommitted.add(qp.getZxid());//放入packetsCommitted中
                                                ->case Leader.NEWLEADER: //处理[#1.6]消息
                                                    ->[#1.7]writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                                                ->case Leader.UPTODATE:
                                                    ->break outerLoop;跳出outerLoop,完成Follower的崩溃恢复阶段
                                ->while (this.isRunning()) {
                                    ->Follower.readPacket(qp);
                                    ->Follower.processPacket(qp);//不再展开,具体参考消息广播阶段示例
                                ->}     
                ->}

1.2 消息广播

[Client]org.apache.zookeeper.test.ClientTest.performClientTest
    ->MyWatcher watcher = new MyWatcher();
    ->org.apache.zookeeper.ZooKeeper.ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)
        ->clientConfig = new ZKClientConfig();
        ->watchManager = defaultWatchManager();
        ->cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly)
            ->ClientCnxn.ClientCnxn()
                ->sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();
        ->cnxn.start();
            ->ClientCnxn.SendThread.run()
                ->while (state.isAlive())
                    ->SendThread.startConnect();//选择任何节点,建立连接
                        ->clientCnxnSocket.connect(addr);
                            ->org.apache.zookeeper.ClientCnxnSocketNetty.connect
                                ->bootstrap.setPipelineFactory(new ZKClientPipelineFactory());
                                    ->pipeline.addLast("handler", new ZKClientHandler()); //ZKClientHandler作为客户端的ChannelHandler
                                ->connectFuture = bootstrap.connect(addr); 
                                    ->[Follower]org.apache.zookeeper.server.NettyServerCnxnFactory.CnxnChannelHandler.channelConnected
                                        ->allChannels.add(ctx.getChannel());
                                        ->addCnxn(cnxn);
                                ->org.apache.zookeeper.ClientCnxnSocketNetty.connect.operationComplete //完成连接
                                    ->org.apache.zookeeper.ClientCnxn.SendThread.primeConnection();
                                        ->ConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd);
                                        ->[#2.1]outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));
                                            ->[Follower]org.apache.zookeeper.server.NettyServerCnxnFactory.CnxnChannelHandler.processMessage
                                                ->org.apache.zookeeper.server.NettyServerCnxn.receiveMessage
                                                  ->zks.processConnectRequest(this, bb);//由于第一次发消息org.apache.zookeeper.server.NettyServerCnxn.initialized为false,所以走这个分支
                                                    ->org.apache.zookeeper.server.ZooKeeperServer.processConnectRequest //处理[#2.1]的ConnectRequest请求
                                                        ->if (sessionId == 0) createSession(cnxn, passwd, sessionTimeout);
                                                            ->long sessionId = sessionTracker.createSession(timeout);
                                                                ->org.apache.zookeeper.server.quorum.LearnerSessionTracker.nextSessionId.getAndIncrement();
                                                            ->Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
                                                            ->submitRequest(si);//createSession为事务请求,需要走正常的事务请求的流程,由leader发起Proposal,并最终ack后commit,此处不再展开,后面基于setdata来说下事务流程
                                                                ->org.apache.zookeeper.server.ZooKeeperServer.firstProcessor.processRequest(si);
                                                                    ->FollowerRequestProcessor.processRequest(si);
                                                  ->initialized = true;
                                        ->clientCnxnSocket.connectionPrimed();
                                    ->wakeupCnxn();
                                        ->outgoingQueue.add(WakeupPacket.getInstance());//发送空Packet
                    ->sendPing();//达到阈值后发送心跳
                        ->RequestHeader h = new RequestHeader(-2, OpCode.ping);
                        ->queuePacket(h, null, null, null, null, null, null, null, null);
                    ->clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);//循环发送org.apache.zookeeper.ClientCnxn.pendingQueue里的Packet
            ->ClientCnxn.EventThread.run()
    ->zk.setData("/benwashere", "hi".getBytes(), 57);//事务流程
        ->final String serverPath = prependChroot(clientPath);
        ->h.setType(ZooDefs.OpCode.setData);
        ->org.apache.zookeeper.ClientCnxn.submitRequest
            ->Packet packet = queuePacket(h, r, request, response, null, null, null,null, watchRegistration, watchDeregistration); //将消息放入org.apache.zookeeper.ClientCnxn.outgoingQueue
                ->[Follower]org.apache.zookeeper.server.NettyServerCnxn.receiveMessage
                    ->org.apache.zookeeper.server.ZooKeeperServer.processPacket
                        ->Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),h.getType(), incomingBuffer, cnxn.getAuthInfo());
                        ->si.setOwner(ServerCnxn.me);
                        ->org.apache.zookeeper.server.ZooKeeperServer.submitRequest
                            ->touch(si.cnxn);
                            ->firstProcessor.processRequest(si);
                                ->FollowerRequestProcessor.processRequest
                                    ->org.apache.zookeeper.server.quorum.FollowerRequestProcessor.run
                                        ->nextProcessor.processRequest(request);//先放入queuedRequests等待commit之后放入到committedRequests中
                                            ->CommitProcessor.processRequest(request);
                                                ->org.apache.zookeeper.server.quorum.CommitProcessor.run
                                        ->zks.getFollower().request(request); //事务请求发送给leader
                                            ->org.apache.zookeeper.server.quorum.Learner.request
                                                ->QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
                                                ->writePacket(qp, true);
                                                    ->leaderOs.writeRecord(pp, "packet");
                                                        ->[Leader]org.apache.zookeeper.server.quorum.LearnerHandler.run -> case Leader.REQUEST ->leader.zk.submitLearnerRequest(si);
                                                            ->org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.submitLearnerRequest -> prepRequestProcessor.processRequest(request);
                                                                ->org.apache.zookeeper.server.PrepRequestProcessor.processRequest //由于请求处理是放入submittedRequests中,然后等待PrepRequestProcessor单线程run方法顺序处理,即PrepRequestProcessor.pRequest为严格单线程执行,不存在并发问题
                                                                    ->org.apache.zookeeper.server.PrepRequestProcessor.pRequest
                                                                        ->SetDataRequest setDataRequest = new SetDataRequest();
                                                                        ->org.apache.zookeeper.server.PrepRequestProcessor.pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true); //注意此处生成Zxid,即org.apache.zookeeper.server.ZooKeeperServer.hzxid.incrementAndGet();
                                                                            ->zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                                                                            ->checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
                                                                            ->request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
                                                                            ->[#2.2]org.apache.zookeeper.server.PrepRequestProcessor.addChangeRecord(nodeRecord);
                                                                                ->zks.outstandingChanges.add(c);
                                                                                ->zks.outstandingChangesForPath.put(c.path, c);
                                                                        ->request.zxid = zks.getZxid();//设置上面方法生成的自增zxid
                                                                        ->nextProcessor.processRequest(request); //分叉到两个条线路CommitProcessor、SyncRequestProcessor,发送propose给集群follower
                                                                            ->ProposalRequestProcessor.processRequest
                                                                                ->nextProcessor.processRequest(request);
                                                                                    ->CommitProcessor.processRequest(request);
                                                                                ->zks.getLeader().propose(request); -> org.apache.zookeeper.server.quorum.Leader.propose
                                                                                    ->QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,baos.toByteArray(), null);
                                                                                    ->Proposal p = new Proposal();p.packet = pp;
                                                                                    ->p.addQuorumVerifier(self.getQuorumVerifier());
                                                                                    ->lastProposed = p.packet.getZxid();
                                                                                    ->outstandingProposals.put(lastProposed, p);//发出的投票放到org.apache.zookeeper.server.quorum.Leader.outstandingProposals中
                                                                                    ->sendPacket(pp);//投票发送给各个follower
                                                                                        ->for (LearnerHandler f : forwardingFollowers) {f.queuePacket(qp);} //放入org.apache.zookeeper.server.quorum.LearnerHandler.queuedPackets中
                                                                                            ->org.apache.zookeeper.server.quorum.LearnerHandler.sendPackets
                                                                                                ->[Follower]org.apache.zookeeper.server.quorum.Follower.processPacket -> case Leader.PROPOSAL: 
                                                                                                    ->lastQueued = hdr.getZxid(); //保存到Follower.lastQueued,follower处理txn为严格有序的
                                                                                                    ->FollowerZooKeeperServer.logRequest(hdr, txn);
                                                                                                        ->Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
                                                                                                        ->pendingTxns.add(request); //保存到org.apache.zookeeper.server.quorum.FollowerZooKeeperServer.pendingTxns中,供后续COMMIT时使用
                                                                                                        ->syncProcessor.processRequest(request);
                                                                                                            ->org.apache.zookeeper.server.SyncRequestProcessor.run
                                                                                                                ->si = queuedRequests.take();
                                                                                                                ->zks.getZKDatabase().append(si)
                                                                                                                    ->this.snapLog.append(si);
                                                                                                                        ->org.apache.zookeeper.server.persistence.FileTxnLog.append
                                                                                                                ->zks.getZKDatabase().rollLog(); || zks.takeSnapshot(); //检查l是否需要做Snapshot以及roll
                                                                                                                ->toFlush.add(si);if (toFlush.size() > 1000) {flush(toFlush);} //达到阈值刷新或是没有新的日志了都刷新,刷新的时候才触发持久化到日志文件,并继续向下流转
                                                                                                                    ->SyncRequestProcessor.flush
                                                                                                                        ->zks.getZKDatabase().commit(); //将上面的日志刷写到磁盘上
                                                                                                                            ->org.apache.zookeeper.server.persistence.FileTxnLog.commit
                                                                                                                        ->nextProcessor.processRequest(i);
                                                                                                                            ->SendAckRequestProcessor.processRequest
                                                                                                                                ->QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,null);
                                                                                                                                ->learner.writePacket(qp, false); -> leaderOs.writeRecord(pp, "packet"); //将ack信息返回leader
                                                                                                                                    ->[Leader]org.apache.zookeeper.server.quorum.LearnerHandler.run -> case Leader.ACK
                                                                                                                                        ->leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); -> Leader.processAck
                                                                                                                                            ->Proposal p = outstandingProposals.get(zxid);
                                                                                                                                            ->p.addAck(sid); //当前sid加入到确认列表中
                                                                                                                                            ->boolean hasCommitted = Leader.tryToCommit(p, zxid, followerAddr); //每收到一个ack,检查一次commit的阈值,由于leader单线程发送proposal,所以为严格有序的,而且过半策略情况下,当前zxid未达到commmit条件,zxid+1也必然不会达到commit条件
                                                                                                                                                ->if (!p.hasAllQuorums()) {return false;} ->QuorumMaj.containsQuorum -> (ackSet.size() > half); //没达到阈值继续下次触发,QuorumMaj为过半策略
                                                                                                                                                ->if (zxid != lastCommitted+1) {...} //检查本次commit的zxid是否为上次的zxid+1,由于zab协议为严格顺序执行,且没有事务提交失败情况
                                                                                                                                                ->outstandingProposals.remove(zxid); //移除proposal阶段存的txn
                                                                                                                                                ->toBeApplied.add(p);//将待提交的Proposal放入org.apache.zookeeper.server.quorum.Leader.toBeApplied中
                                                                                                                                                ->commit(zxid);
                                                                                                                                                    ->lastCommitted = zxid; //设置Leader.lastCommitted
                                                                                                                                                    ->QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
                                                                                                                                                    ->sendPacket(qp);
                                                                                                                                                        ->[Follower]org.apache.zookeeper.server.quorum.Follower.processPacket -> case Leader.COMMIT: 
                                                                                                                                                            ->long firstElementZxid = pendingTxns.element().zxid; //取出proposal阶段保存的org.apache.zookeeper.server.quorum.FollowerZooKeeperServer.pendingTxns
                                                                                                                                                            ->if (firstElementZxid != zxid) {System.exit(12);} //如果提交的zxid与proposal阶段的zxid不一致,说明系统出问题了,直接exit
                                                                                                                                                            ->Request request = pendingTxns.remove(); //从pendingTxns移除
                                                                                                                                                            ->commitProcessor.commit(request);
                                                                                                                                                                ->committedRequests.add(request); -> CommitProcessor.run() //加入CommitProcessor.committedRequests
                                                                                                                                                                    ->nextPending.set(request); //放入org.apache.zookeeper.server.quorum.CommitProcessor.nextPending
                                                                                                                                                                    ->processCommitted();
                                                                                                                                                                        ->request = committedRequests.poll();//取出nextPending以及committedRequests对比,如果一致
                                                                                                                                                                        ->Request pending = nextPending.get();
                                                                                                                                                                        ->if (pending != null &&pending.sessionId == request.sessionId &&pending.cxid == request.cxid)
                                                                                                                                                                            ->currentlyCommitting.set(pending);//保存到CommitProcessor.currentlyCommitting中
                                                                                                                                                                            ->sendToNextProcessor(pending); //workerPool此处按照request.sessionId分组了
                                                                                                                                                                                ->workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
                                                                                                                                                                                    ->org.apache.zookeeper.server.quorum.CommitProcessor.CommitWorkRequest.doWork
                                                                                                                                                                                        ->nextProcessor.processRequest(request); -> Leader.ToBeAppliedRequestProcessor.processRequest
                                                                                                                                                                                            ->next.processRequest(request);
                                                                                                                                                                                                ->FinalRequestProcessor.processRequest(request);
                                                                                                                                                                                                    ->rc = zks.processTxn(request); //讲事务应用到内存数据库中
                                                                                                                                                                                                        ->org.apache.zookeeper.server.DataTree.processTxn ->case OpCode.setData:
                                                                                                                                                                                                            ->DataTree.setData
                                                                                                                                                                                                                ->DataNode n = nodes.get(path);n.data = data; //更新内存数据节点
                                                                                                                                                                                                                ->dataWatches.triggerWatch(path, EventType.NodeDataChanged); //触发trigger
                                                                                                                                                                                                        ->lastProcessedZxid = rc.zxid; //更新org.apache.zookeeper.server.DataTree.lastProcessedZxid //此字段表明commit的offset,选leader同步数据都会用到
                                                                                                                                                                                                    ->rsp = new SetDataResponse(rc.stat);                                                                                                                                                                                                                                                                                                                                                                                           ->
                                                                                                                                                                                            ->leader.toBeApplied.iterator().remove();//将提交完成的从Leader.toBeApplied移除
                                                                                                                                                                                        ->currentlyCommitting.compareAndSet(request, null); //currentlyCommitting置空
                                                                                                                                                ->inform(p); //发送给observer本次Proposal
                                                                                                                                                    ->QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,proposal.packet.getData(), null);
                                                                                                                                                    ->sendObserverPacket(qp);
                                                                                                                                                ->zk.commitProcessor.commit(p.request); //执行Leader的commit,执行逻辑参考Follower,区别仅仅是移除[#2.2]阶段放入outstandingChanges、outstandingChangesForPath的数据


                                                                                                                                                ->...//pendingSyncs跳过,暂不考虑leader接收client请求的情况
                                                                                                                        ->((Flushable)nextProcessor).flush();
                                                                                                                            ->SendAckRequestProcessor.flush
                                                                                ->syncProcessor.processRequest(request);
            ->while (!packet.finished) {packet.wait();} //阻塞,等待回复

二.reference

三.下篇

REDIS 3.2.8 CODE REVIEW

趁着公司618备战期间把 zk review下,看源码感觉看得快忘的也快,还是写个index记录思路来的清晰,本想把redis、spark-core、spark-streaming、spark-graphx、hive,jdk、netty什么的都写下,发现每review一个还真是挺废精力的,很多当时想明白的问题,现在看之前的写的TODO,发现一脸懵逼,已经忘了。有的当时看的版本比较老,比如spark1.8版本也不打算再review了,回头直接看2.x 版本SQL的实现了;有的看只看了一部分,hive的词法解析器、物理执行计划基本都没咋看,所以写个review也没有整体概念,单写个别模块貌似没啥意思。先给自己挖个坑了,剩下的以后再补了…

    原文作者:hnail
    原文地址: https://www.jianshu.com/p/255fbabf109a
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞