导语
1.zab协议崩溃恢复如何实现leader选举及数据同步?
2.zab消息广播阶段如何实现发起投票、收集选票、提交事务,并保证事务的顺序一致性?
3.paxos、zab、raft vs pbft vs pow、pos、ripple 等区块链共识算法的区别是什么,分别适合什么场景?//TODO:
zab协议包含两个阶段崩溃恢复
与消息广播
,基于zookeeper 3.5.3集群启动以及ZooKeeper.setData来分别说明下两阶段的流程。
一.Index
1.1 崩溃恢复
org.apache.zookeeper.server.quorum.QuorumPeerMain.main
->QuorumPeerMain.runFromConfig
->NettyServerCnxnFactory.configure //配置ServerCnxnFactory
->quorumPeer = getQuorumPeer();
->quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(),config.getDataDir())) //设置FileTxnSnapLog,管理txn以及snap
->quorumPeer.setMyid(config.getServerId()) //设置myid
->quorumPeer.setInitLimit(config.getInitLimit());quorumPeer.setSyncLimit(config.getSyncLimit()); //超时时间
->quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));//初始化一个空的ZKDatabase
->quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); //设置QuorumVerifier
->quorumPeer.setCnxnFactory(cnxnFactory)
->org.apache.zookeeper.server.quorum.QuorumPeer.start() //启动QuorumPeer
->QuorumPeer.loadDataBase
->zkDb.loadDataBase() //恢复挂机之前的内存状态
->long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
->processTransaction(hdr,dt,sessions, itr.getTxn());//重放所有的Transaction,
->listener.onTxnLoaded(hdr, itr.getTxn()); //将已提交的日志放入org.apache.zookeeper.server.ZKDatabase.committedLog中并更新minCommittedLog、maxCommittedLog两个offset便于同步时使用
->addCommittedProposal(r)
->currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); //获取之前记录的Epoch
->acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
->QuorumPeer.startServerCnxnFactory //绑定端口,启动服务
->QuorumPeer.startLeaderElection //开启选举
->currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());//设置当前服务器的投票,使用之前记录的epoch以及zxid
->this.electionAlg = createElectionAlgorithm(electionType); //设置选举算法,默认
->qcm = new QuorumCnxManager(this);
->org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.run
->org.apache.zookeeper.server.quorum.QuorumCnxManager.Listener.ss.bind(addr)//绑定选举端口
->client = ss.accept();//等待连接
->org.apache.zookeeper.server.quorum.QuorumCnxManager.receiveConnection //如果建立链接的对端的的sid小于当前服务器id,则当前服务器作为客户端去建立链接,否则启动发送接收线程开始选举流程,此逻辑是为了避免两台服务器建立多个链接
->closeSocket(sock);//关闭之前的连接
->org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne
->QuorumCnxManager.initiateConnection //net层面的数据接收发发送
->dout.writeLong(self.getId());dout.flush(); //建立链接发送的第一条为当前服务器端myid
->SendWorker sw = new SendWorker(sock, sid); //将QuorumCnxManager.queueSendMap发送出去
->RecvWorker rw = new RecvWorker(sock, sid, sw);//将收到的消息加入QuorumCnxManager.recvQueue中
->sw.start();rw.start();
->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.start //启动消息处理器
->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerSender.run
->org.apache.zookeeper.server.quorum.FastLeaderElection.Messenger.WorkerReceiver.run
->QuorumPeer.run
->while (running) {
->case LOOKING //注意leader选举用的当前节点的zxid为org.apache.zookeeper.server.DataTree.lastProcessedZxid,而本字段为FinalRequestProcessor.processRequest方法更新,FinalRequestProcessor为ProposalRequestProcessor->CommitProcessor之后的,即commit阶段才会更新,则zk其实是按照进入提交阶段zxid为准,即写入过半后发出大于1个commit后算写入成功
->setCurrentVote(makeLEStrategy().lookForLeader());
->org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader
->synchronized(this){logicalclock.incrementAndGet(); updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());} //更新选举周期,并设置当前服务器的leader选举propose为自己:FastLeaderElection.proposedLeader,proposedZxid,proposedEpoch
->sendNotifications //将自己的投票发给集群中所有节点
->ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader,proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,proposedEpoch, qv.toString().getBytes());
->sendqueue.offer(notmsg);
->while ((self.getPeerState() == ServerState.LOOKING) &&(!stop)){ //执行循环,直到确定leader
->Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS); //获取收到的Notification
->switch (n.state) {
->case LOOKING:
->if (n.electionEpoch > logicalclock.get()) { //如果收到的投票周期大于当前节点认为的周期,则更新周期、清空已经收到的投票、
->logicalclock.set(n.electionEpoch);
->recvset.clear();
->if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {updateProposal(n.leader, n.zxid, n.peerEpoch);} else{updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch())} //更新当前节点的投票,先比较周期,周期相同再比较zxid,zxid相同然后就比较myid,然后选大的作为当前节点后续的投票
->sendNotifications();//将最新的投票发给集群节点
->else if (n.electionEpoch < logicalclock.get()) {} //如果收到的投票周期小于当前节点周期,则忽略,继续取下一条Notification
->else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); //收到的投票比较大,则更新为收到的投票,并发送给集群其他节点
->recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //以上条件都不符合,则为收到的投票为其他节点认可当前节点的投票,把选票更新到recvset里
->if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock.get(), proposedEpoch))) { //查看收到的选票是否达到了要求,QuorumMaj为过半策略,没有达到要求继续循环获取Notification,否则设置投票结果,结束leader选举
->SyncedLearnerTracker.addAck(entry.getKey());
->QuorumVerifierAcksetPair.getAckset().add(sid);
->SyncedLearnerTracker.hasAllQuorums
->qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())
->QuorumMaj.containsQuorum
-> while((n = recvqueue.poll(finalizeWait..//读取剩余的notification,直到读取完成
-> self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState()); //如果选举的leader为自己,则QuorumPeer.state设为LEADING,否则设为FOLLOWING
->Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
->return endVote;//结束选举
->}
->case LEADING: //如果当前节点为lead节点
->QuorumPeer.makeLeader
->org.apache.zookeeper.server.quorum.Leader.Leader
->Leader.ss.bind(self.getQuorumAddress());//leader绑定端口号等待follower建立连接,本端口用来同步数据,leader与follower之间心跳、投票等数据交换
->org.apache.zookeeper.server.quorum.Leader.lead
->zk.loadData();
->setZxid(zkDb.getDataTreeLastProcessedZxid()); //设置zxid
->killSession(session, zkDb.getDataTreeLastProcessedZxid()); //清理过期的session
->takeSnapshot();//落地一个干净的Snapshot
->cnxAcceptor = new LearnerCnxAcceptor();cnxAcceptor.start();
->LearnerCnxAcceptor.run
-> while (!stop) {Socket s = ss.accept();LearnerHandler fh = new LearnerHandler(s, Leader.this);fh.start();} //等待链接,初始化follower的LearnerHandler作为消息处理器
->LearnerHandler.run //leader处理flower请求的主流程
->ia.readRecord(qp, "packet");//读取第一个follower发送过来的消息[#1.1],读取出sid,设置LearnerHandler的sid
->this.sid = bbsid.getLong();
->QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); //发送leaderinof
->[#1.2]oa.writeRecord(newEpochPacket, "packet");
->ia.readRecord(ackEpochPacket, "packet");//读取Follower的ack消息[#1.3]
->ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
->leader.waitForEpochAck(this.getSid(), ss);
->Leader.waitForEpochAck //阻塞,直到过半Follower返回ack消息
->electingFollowers.add(id);
->if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers))
->electionFinished = true;
->electingFollowers.notifyAll();
->boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); //与follower同步
->long maxCommittedLog = db.getmaxCommittedLog(); //获取三个值,zxid以及内存中ZKDatabase.committedLog保存的最大最小日志
->long minCommittedLog = db.getminCommittedLog();
->long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
->[#1.4]if (lastProcessedZxid == peerLastZxid) {queueOpPacket(Leader.DIFF, peerLastZxid);}//如果相同,则发送Leader.DIFF消息
->[#1.5]if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) //如果leader节点的log小于当前节点,则发送TRUNC消息截断
->queueOpPacket(Leader.TRUNC, maxCommittedLog);
->currentZxid = maxCommittedLog;
->if ((maxCommittedLog >= peerLastZxid)&& (minCommittedLog <= peerLastZxid)) //如果处在maxCommittedLog与minCommittedLog中间,则说明缺了部分数据,则按照普通投票方式把ZKDatabase.committedLog内存中保存的这部分数据按照普通的Proposal发送出去,并提交Committed
->LearnerHandler.queueCommittedProposals
->while (itr.hasNext())
->Proposal propose = itr.next()
->queuePacket(propose.packet);//发送propose
->queueOpPacket(Leader.COMMIT, packetZxid);//提交propose
->queueOpPacket(Leader.DIFF, lastCommitedZxid); //不管啥情况下,都发个DIFF消息表名同步完成
->if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {//如果内存中log不够,则把Log以及snap中的消息都发过去
->leaderLastZxid = leader.startForwarding(this, currentZxid);[TODO://]//处理toBeApplied、outstandingProposals的目的是啥
->[#1.6]QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);queuedPackets.add(newLeaderQP);bufferedOutput.flush();//发送NEWLEADER消息
->ia.readRecord(qp, "packet");//读取[#1.7]的消息,即NEWLEADER的ack
->leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());//阻塞,直到收到足够的Follower的NEWLEADER的ack
->Leader.newLeaderProposal.addAck(sid);
->[#1.9]while(!leader.zk.isRunning() && !this.isInterrupted()){leader.zk.wait(20);}//阻塞,等待leader的ZooKeeperServer.state变为RUNNING
->[#1.10]queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); //发送UPTODATE消息给Follower
->while (true) //崩溃恢复阶段完成,进入普通消息处理阶段,leader与follower之前的消息处理参考消息广播阶段示例
->ia.readRecord(qp, "packet");
->.....
->zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
->lastProposed = zk.getZxid();
->waitForEpochAck(self.getId(), leaderStateSummary);//主节点也是投票节点,参与新节点选举的策略
->self.setCurrentEpoch(epoch);
->[#1.6]newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),null, null);
->waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);//阻塞,并投出本leader的票,与LearnerHandler一块参与newLeaderProposal的过半策略
->Leader.newLeaderProposal.addAck(sid);
->startZkServer(); //启动leader,
->Leader.startZkServer
->lastCommitted = zk.getZxid()
->zk.startup();
->ZooKeeperServer.startup
->startSessionTracker();
->org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.setupRequestProcessors();//启动处理链,处理链的逻辑下面消息广播详细说明
->setState(State.RUNNING);//标志leader启动完成,并触发LearnerHandler[#1.9]继续向下走
->self.updateElectionVote(getEpoch());//设置QuorumPeer.currentVote
->zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
->while (true) //循环向follower发送pin消息,获取session等信息,PING处理不再展开
->f.ping();
->LearnerHandler.ping
->QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
->queuePacket(ping);
->case FOLLOWING: //FOLLOWING节点
->setFollower(makeFollower(logFactory));
->org.apache.zookeeper.server.quorum.Follower.followLeader
->connectToLeader(addr);
->long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
->Learner.registerWithLeader
->qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
->boa.writeRecord(li, "LearnerInfo");
->[#1.1]writePacket(qp, true); //发送给leader第一个消息注册
->readPacket(qp);//读取leader返回的LEADERINFO[#1.2]
->if (qp.getType() == Leader.LEADERINFO)
->QuorumPeer.setAcceptedEpoch
->acceptedEpoch = e; //设置QuorumPeer.acceptedEpoch
->writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
->QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
->[#1.3]writePacket(ackNewEpoch, true);
->syncWithLeader(newEpochZxid);
->Learner.syncWithLeader
->readPacket(qp);
->if (qp.getType() == Leader.DIFF) {snapshotNeeded = false;} //获取[#1.4]发送的DIFF消息,本消息不做任何操作,仅作为标志
->if (qp.getType() == Leader.TRUNC) //处理[#1.5]消息,截断多余的消息
->boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
->zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
->zk.createSessionTracker();
->outerLoop:
->while (self.isRunning())
->readPacket(qp);
->case Leader.PROPOSAL:
->packetsNotCommitted.add(pif);//将日志加入到LinkedList packetsNotCommitted中
->case Leader.COMMIT:
->pif = packetsNotCommitted.peekFirst()//取出刚才放入packetsNotCommitted的日志
->packetsCommitted.add(qp.getZxid());//放入packetsCommitted中
->case Leader.NEWLEADER: //处理[#1.6]消息
->[#1.7]writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
->case Leader.UPTODATE:
->break outerLoop;跳出outerLoop,完成Follower的崩溃恢复阶段
->while (this.isRunning()) {
->Follower.readPacket(qp);
->Follower.processPacket(qp);//不再展开,具体参考消息广播阶段示例
->}
->}
1.2 消息广播
[Client]org.apache.zookeeper.test.ClientTest.performClientTest
->MyWatcher watcher = new MyWatcher();
->org.apache.zookeeper.ZooKeeper.ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)
->clientConfig = new ZKClientConfig();
->watchManager = defaultWatchManager();
->cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly)
->ClientCnxn.ClientCnxn()
->sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();
->cnxn.start();
->ClientCnxn.SendThread.run()
->while (state.isAlive())
->SendThread.startConnect();//选择任何节点,建立连接
->clientCnxnSocket.connect(addr);
->org.apache.zookeeper.ClientCnxnSocketNetty.connect
->bootstrap.setPipelineFactory(new ZKClientPipelineFactory());
->pipeline.addLast("handler", new ZKClientHandler()); //ZKClientHandler作为客户端的ChannelHandler
->connectFuture = bootstrap.connect(addr);
->[Follower]org.apache.zookeeper.server.NettyServerCnxnFactory.CnxnChannelHandler.channelConnected
->allChannels.add(ctx.getChannel());
->addCnxn(cnxn);
->org.apache.zookeeper.ClientCnxnSocketNetty.connect.operationComplete //完成连接
->org.apache.zookeeper.ClientCnxn.SendThread.primeConnection();
->ConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd);
->[#2.1]outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));
->[Follower]org.apache.zookeeper.server.NettyServerCnxnFactory.CnxnChannelHandler.processMessage
->org.apache.zookeeper.server.NettyServerCnxn.receiveMessage
->zks.processConnectRequest(this, bb);//由于第一次发消息org.apache.zookeeper.server.NettyServerCnxn.initialized为false,所以走这个分支
->org.apache.zookeeper.server.ZooKeeperServer.processConnectRequest //处理[#2.1]的ConnectRequest请求
->if (sessionId == 0) createSession(cnxn, passwd, sessionTimeout);
->long sessionId = sessionTracker.createSession(timeout);
->org.apache.zookeeper.server.quorum.LearnerSessionTracker.nextSessionId.getAndIncrement();
->Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
->submitRequest(si);//createSession为事务请求,需要走正常的事务请求的流程,由leader发起Proposal,并最终ack后commit,此处不再展开,后面基于setdata来说下事务流程
->org.apache.zookeeper.server.ZooKeeperServer.firstProcessor.processRequest(si);
->FollowerRequestProcessor.processRequest(si);
->initialized = true;
->clientCnxnSocket.connectionPrimed();
->wakeupCnxn();
->outgoingQueue.add(WakeupPacket.getInstance());//发送空Packet
->sendPing();//达到阈值后发送心跳
->RequestHeader h = new RequestHeader(-2, OpCode.ping);
->queuePacket(h, null, null, null, null, null, null, null, null);
->clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);//循环发送org.apache.zookeeper.ClientCnxn.pendingQueue里的Packet
->ClientCnxn.EventThread.run()
->zk.setData("/benwashere", "hi".getBytes(), 57);//事务流程
->final String serverPath = prependChroot(clientPath);
->h.setType(ZooDefs.OpCode.setData);
->org.apache.zookeeper.ClientCnxn.submitRequest
->Packet packet = queuePacket(h, r, request, response, null, null, null,null, watchRegistration, watchDeregistration); //将消息放入org.apache.zookeeper.ClientCnxn.outgoingQueue
->[Follower]org.apache.zookeeper.server.NettyServerCnxn.receiveMessage
->org.apache.zookeeper.server.ZooKeeperServer.processPacket
->Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),h.getType(), incomingBuffer, cnxn.getAuthInfo());
->si.setOwner(ServerCnxn.me);
->org.apache.zookeeper.server.ZooKeeperServer.submitRequest
->touch(si.cnxn);
->firstProcessor.processRequest(si);
->FollowerRequestProcessor.processRequest
->org.apache.zookeeper.server.quorum.FollowerRequestProcessor.run
->nextProcessor.processRequest(request);//先放入queuedRequests等待commit之后放入到committedRequests中
->CommitProcessor.processRequest(request);
->org.apache.zookeeper.server.quorum.CommitProcessor.run
->zks.getFollower().request(request); //事务请求发送给leader
->org.apache.zookeeper.server.quorum.Learner.request
->QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
->writePacket(qp, true);
->leaderOs.writeRecord(pp, "packet");
->[Leader]org.apache.zookeeper.server.quorum.LearnerHandler.run -> case Leader.REQUEST ->leader.zk.submitLearnerRequest(si);
->org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.submitLearnerRequest -> prepRequestProcessor.processRequest(request);
->org.apache.zookeeper.server.PrepRequestProcessor.processRequest //由于请求处理是放入submittedRequests中,然后等待PrepRequestProcessor单线程run方法顺序处理,即PrepRequestProcessor.pRequest为严格单线程执行,不存在并发问题
->org.apache.zookeeper.server.PrepRequestProcessor.pRequest
->SetDataRequest setDataRequest = new SetDataRequest();
->org.apache.zookeeper.server.PrepRequestProcessor.pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true); //注意此处生成Zxid,即org.apache.zookeeper.server.ZooKeeperServer.hzxid.incrementAndGet();
->zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
->checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
->request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
->[#2.2]org.apache.zookeeper.server.PrepRequestProcessor.addChangeRecord(nodeRecord);
->zks.outstandingChanges.add(c);
->zks.outstandingChangesForPath.put(c.path, c);
->request.zxid = zks.getZxid();//设置上面方法生成的自增zxid
->nextProcessor.processRequest(request); //分叉到两个条线路CommitProcessor、SyncRequestProcessor,发送propose给集群follower
->ProposalRequestProcessor.processRequest
->nextProcessor.processRequest(request);
->CommitProcessor.processRequest(request);
->zks.getLeader().propose(request); -> org.apache.zookeeper.server.quorum.Leader.propose
->QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,baos.toByteArray(), null);
->Proposal p = new Proposal();p.packet = pp;
->p.addQuorumVerifier(self.getQuorumVerifier());
->lastProposed = p.packet.getZxid();
->outstandingProposals.put(lastProposed, p);//发出的投票放到org.apache.zookeeper.server.quorum.Leader.outstandingProposals中
->sendPacket(pp);//投票发送给各个follower
->for (LearnerHandler f : forwardingFollowers) {f.queuePacket(qp);} //放入org.apache.zookeeper.server.quorum.LearnerHandler.queuedPackets中
->org.apache.zookeeper.server.quorum.LearnerHandler.sendPackets
->[Follower]org.apache.zookeeper.server.quorum.Follower.processPacket -> case Leader.PROPOSAL:
->lastQueued = hdr.getZxid(); //保存到Follower.lastQueued,follower处理txn为严格有序的
->FollowerZooKeeperServer.logRequest(hdr, txn);
->Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
->pendingTxns.add(request); //保存到org.apache.zookeeper.server.quorum.FollowerZooKeeperServer.pendingTxns中,供后续COMMIT时使用
->syncProcessor.processRequest(request);
->org.apache.zookeeper.server.SyncRequestProcessor.run
->si = queuedRequests.take();
->zks.getZKDatabase().append(si)
->this.snapLog.append(si);
->org.apache.zookeeper.server.persistence.FileTxnLog.append
->zks.getZKDatabase().rollLog(); || zks.takeSnapshot(); //检查l是否需要做Snapshot以及roll
->toFlush.add(si);if (toFlush.size() > 1000) {flush(toFlush);} //达到阈值刷新或是没有新的日志了都刷新,刷新的时候才触发持久化到日志文件,并继续向下流转
->SyncRequestProcessor.flush
->zks.getZKDatabase().commit(); //将上面的日志刷写到磁盘上
->org.apache.zookeeper.server.persistence.FileTxnLog.commit
->nextProcessor.processRequest(i);
->SendAckRequestProcessor.processRequest
->QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,null);
->learner.writePacket(qp, false); -> leaderOs.writeRecord(pp, "packet"); //将ack信息返回leader
->[Leader]org.apache.zookeeper.server.quorum.LearnerHandler.run -> case Leader.ACK
->leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); -> Leader.processAck
->Proposal p = outstandingProposals.get(zxid);
->p.addAck(sid); //当前sid加入到确认列表中
->boolean hasCommitted = Leader.tryToCommit(p, zxid, followerAddr); //每收到一个ack,检查一次commit的阈值,由于leader单线程发送proposal,所以为严格有序的,而且过半策略情况下,当前zxid未达到commmit条件,zxid+1也必然不会达到commit条件
->if (!p.hasAllQuorums()) {return false;} ->QuorumMaj.containsQuorum -> (ackSet.size() > half); //没达到阈值继续下次触发,QuorumMaj为过半策略
->if (zxid != lastCommitted+1) {...} //检查本次commit的zxid是否为上次的zxid+1,由于zab协议为严格顺序执行,且没有事务提交失败情况
->outstandingProposals.remove(zxid); //移除proposal阶段存的txn
->toBeApplied.add(p);//将待提交的Proposal放入org.apache.zookeeper.server.quorum.Leader.toBeApplied中
->commit(zxid);
->lastCommitted = zxid; //设置Leader.lastCommitted
->QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
->sendPacket(qp);
->[Follower]org.apache.zookeeper.server.quorum.Follower.processPacket -> case Leader.COMMIT:
->long firstElementZxid = pendingTxns.element().zxid; //取出proposal阶段保存的org.apache.zookeeper.server.quorum.FollowerZooKeeperServer.pendingTxns
->if (firstElementZxid != zxid) {System.exit(12);} //如果提交的zxid与proposal阶段的zxid不一致,说明系统出问题了,直接exit
->Request request = pendingTxns.remove(); //从pendingTxns移除
->commitProcessor.commit(request);
->committedRequests.add(request); -> CommitProcessor.run() //加入CommitProcessor.committedRequests
->nextPending.set(request); //放入org.apache.zookeeper.server.quorum.CommitProcessor.nextPending
->processCommitted();
->request = committedRequests.poll();//取出nextPending以及committedRequests对比,如果一致
->Request pending = nextPending.get();
->if (pending != null &&pending.sessionId == request.sessionId &&pending.cxid == request.cxid)
->currentlyCommitting.set(pending);//保存到CommitProcessor.currentlyCommitting中
->sendToNextProcessor(pending); //workerPool此处按照request.sessionId分组了
->workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
->org.apache.zookeeper.server.quorum.CommitProcessor.CommitWorkRequest.doWork
->nextProcessor.processRequest(request); -> Leader.ToBeAppliedRequestProcessor.processRequest
->next.processRequest(request);
->FinalRequestProcessor.processRequest(request);
->rc = zks.processTxn(request); //讲事务应用到内存数据库中
->org.apache.zookeeper.server.DataTree.processTxn ->case OpCode.setData:
->DataTree.setData
->DataNode n = nodes.get(path);n.data = data; //更新内存数据节点
->dataWatches.triggerWatch(path, EventType.NodeDataChanged); //触发trigger
->lastProcessedZxid = rc.zxid; //更新org.apache.zookeeper.server.DataTree.lastProcessedZxid //此字段表明commit的offset,选leader同步数据都会用到
->rsp = new SetDataResponse(rc.stat); ->
->leader.toBeApplied.iterator().remove();//将提交完成的从Leader.toBeApplied移除
->currentlyCommitting.compareAndSet(request, null); //currentlyCommitting置空
->inform(p); //发送给observer本次Proposal
->QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,proposal.packet.getData(), null);
->sendObserverPacket(qp);
->zk.commitProcessor.commit(p.request); //执行Leader的commit,执行逻辑参考Follower,区别仅仅是移除[#2.2]阶段放入outstandingChanges、outstandingChangesForPath的数据
->...//pendingSyncs跳过,暂不考虑leader接收client请求的情况
->((Flushable)nextProcessor).flush();
->SendAckRequestProcessor.flush
->syncProcessor.processRequest(request);
->while (!packet.finished) {packet.wait();} //阻塞,等待回复
二.reference
三.下篇
趁着公司618备战期间把 zk review下,看源码感觉看得快忘的也快,还是写个index记录思路来的清晰,本想把redis、spark-core、spark-streaming、spark-graphx、hive,jdk、netty什么的都写下,发现每review一个还真是挺废精力的,很多当时想明白的问题,现在看之前的写的TODO,发现一脸懵逼,已经忘了。有的当时看的版本比较老,比如spark1.8版本也不打算再review了,回头直接看2.x 版本SQL的实现了;有的看只看了一部分,hive的词法解析器、物理执行计划基本都没咋看,所以写个review也没有整体概念,单写个别模块貌似没啥意思。先给自己挖个坑了,剩下的以后再补了…