接上篇文章,本文主要分析一下一个ZK集群从刚启动到对外提供服务这段时间发生了什么
一、执行流程概述
首先在ZK集群中,不管是什么类型的节点,刚刚启动时都是LOOKING状态然后发起选举寻找Leader,只有确定Leader以后才最终确定自己以Leader,Follower,Observer中哪种方式启动对外提供服务。
ZK整个恢复过程分为三步:
- 选取Leader,一个节点想要成为Leader首先它的epoch要大,已处理的事务要最多,如果有这两个条件都相同的多个节点则ServerId最大的节点成为Leader,之所以要这样是因为其他节点都会按照Leader节点的事务同步数据,如果Leader不是最新的就会造成数据的丢失。
- 数据同步,Leader选出来以后各个节点会以Leader为标准更新自己的事务。
- 提供服务,数据同步完成以后开始正式对外提供服务,接收客户端连接,处理读写请求。
二、源码分析
集群模式下启动所有的ZK节点启动入口都是QuorumPeerMain类的main方法。
main方法加载配置文件以后,最终会调用到QuorumPeer的start方法,来看下:
public synchronized void start() {
//校验ServerId是否合法
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
//载入之前持久化的一些信息
loadDataBase();
//启动线程监听
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
//初始化选举投票以及算法
startLeaderElection();
//当前也是一个线程,注意run方法
super.start();
}
我们已经知道了当一个节点启动时需要先发起选举寻找Leader节点,然后再根据Leader节点的事务信息进行同步,最后开始对外提供服务,这里我们先来看下初始化选举的逻辑,即上面的startLeaderElection方法:
synchronized public void startLeaderElection() {
try {
//所有节点启动的初始状态都是LOOKING,因此这里都会是创建一张投自己为Leader的票
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch(IOException e) {
//异常处理
}
//初始化选举算法,electionType默认为3
this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le = null;
switch (electionAlgorithm) {
case 1:
//忽略
case 2:
//忽略
case 3:
//electionAlgorithm默认是3,直接走到这里
qcm = createCnxnManager();
//监听选举事件的listener
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
//开启监听器
listener.start();
//初始化选举算法
FastLeaderElection fle = new FastLeaderElection(this, qcm);
//发起选举
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
//忽略
}
return le;
}
初始化选举的地方一下开启两个线程,一个是Listener,一个是FastLeaderElection,接下来我们按照上面的顺序,先来看下Listener做的事情,由于它是一个线程,因此我们还是直接看run方法,为了简单起见,还是删除了很多代码,我们重点关注主流程:
public void run() {
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
addr=根据配置信息获取地址
setName(addr.toString());
//监听选举端口
ss.bind(addr);
while (!shutdown) {
try {
//接收客户端连接
client = ss.accept();
//设置连接参数
setSockOpts(client);
//开始处理
receiveConnection(client);
} catch (SocketTimeoutException e) {
}
}
} catch (IOException e) {
}
}
}
}
接下来重点看下当连接到来的时候receiveConnection方法的处理逻辑:
//这个方法很简单不多说了
public void receiveConnection(final Socket sock) {
DataInputStream din = null;
try {
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
handleConnection(sock, din);
} catch (IOException e) {
}
}
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
//省略中从客户端发来的数据中解析serverId以及选举地址的代码...
//这里的思路是如果请求连接的节点的ServerId小于当前节点,则关闭连接,并由当前节点发起连接
//隐含的意思就是ZK集群中节点的连接都是由ServerId大的连ServerId小的
if (sid < self.getId()) {
//如果连接已经建立则关闭
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
closeSocket(sock);
//当前节点去连接对方节点
if (electionAddr != null) {
connectOne(sid, electionAddr);
} else {
connectOne(sid);
}
} else {
//如果接受该连接,则创建对应的读写worker
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
//如果已经创建则关闭旧的
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish();
}
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
//启动读写事件处理
sw.start();
rw.start();
}
}
上面处理连接的这段代码只需要关注两个重点:
- 只允许serverId大的节点去连接serverId小的节点
- 针对每个连接启动了读写两个worker负责IO处理
对于两个worker来说,它们本身的逻辑很简单,SendWorker就是不断的把queueSendMap中存放的对应serverId的数据发出去。RecvWorker就是把收到的数据加入recvQueue队列中,这里就不再贴代码了。
看完了Listener的逻辑,我们接着上面的代码看下FastLeaderElection选举算法的思路,从它的start方法开始看:
public void start() {
this.messenger.start();
}
void start(){
//对应WorkerSender类
this.wsThread.start();
//对应WorkerReceiver类
this.wrThread.start();
}
这里可以看到FastLeaderElection内部也是开启了两个线程负责读写,这里需要跟前面Listener的逻辑结合考虑。Listener开启的线程一个负责读取数据放入队列,一个负责把队列中的数据发出去,但读取的数据给谁用呢?发送的数据是哪来的呢?FastLeaderElection里的两线程就是跟它们交互的。
先来看下WorkerSender的run方法:
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
//处理发送消息
process(m);
} catch (InterruptedException e) {
break;
}
}
}
void process(ToSend m) {
//序列化消息
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch,
m.configData);
//发送数据
manager.toSend(m.sid, requestBuffer);
}
public void toSend(Long sid, ByteBuffer b) {
//如果数据时发送给自己的那么绕过IO直接加入到recv队列
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
} else {
//否则把数据加入到指定ServerId的待发送队列
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
if (oldq != null) {
addToSendQueue(oldq, b);
} else {
addToSendQueue(bq, b);
}
//连接指定ServerId,该方法内部如果连接已经建立则会返回,否则创建连接
connectOne(sid);
}
}
上面的代码总结一下就是FastLeaderElection内部的WorkerSender线程会从sendqueue队列中读取数据包然后放到queueSendMap里,而Listener里面的SendWorker又会不断从queueSendMap取出数据进行发送。
再来看一下WorkerReceiver的run方法:
public void run() {
Message response;
while (!stop) {
try {
//这里本质上是从recvQueue里取出数据
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
//没有数据则继续等待
if(response == null) continue;
//这里省略掉协议兼容,解析以及处理reconfig指令的逻辑..
int rstate = response.buffer.getInt();
long rleader = response.buffer.getLong();
long rzxid = response.buffer.getLong();
long relectionEpoch = response.buffer.getLong();
long rpeerepoch;
QuorumVerifier rqv = null;
//如果不是一个有投票权的节点,例如Observer节点
if(!validVoter(response.sid)) {
//直接把自己的投票信息返回
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
} else {
//获取发消息的节点的状态
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (rstate) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
default:
continue;
}
//赋值Notification
n.leader = rleader;
n.zxid = rzxid;
n.electionEpoch = relectionEpoch;
n.state = ackstate;
n.sid = response.sid;
n.peerEpoch = rpeerepoch;
n.version = version;
n.qv = rqv;
//如果当前节点正在寻找Leader
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
//把收到的消息加入队列
recvqueue.offer(n);
//如果对方节点也是LOOKING状态,且周期小于自己,则把自己投票信息发回去
if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())){
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
} else {
//如果当前节点不是LOOKING状态,那么它已经知道谁是Leader了
Vote current = self.getCurrentVote();
//如果对方是LOOKING状态,那么就把自己认为的Leader信息返给对方
if(ackstate == QuorumPeer.ServerState.LOOKING){
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
current.getElectionEpoch(),
self.getPeerState(),
response.sid,
current.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
}
}
}
上面的代码比较长,但逻辑不复杂,总结一下就是:
- 如果消息是Observer节点发来的,则直接返回自己的投票信息,结束。
- 如果当前节点是LOOKING状态,则把消息存入recvqueue队列用于统计投票。
- 如果发消息的节点也是LOOKING状态,且当前节点的周期大,则把当前节点的投票信息返回。
- 如果当前节点不是LOOKING状态且发消息的节点是LOOKING状态,则返回当前节点认为的Leader信息。
最后我们整理一下上面的四个处理IO的线程逻辑,首先是当前节点发送消息的时候是通过WorkerSender经由SendWorker发送出去,而接受消息是通过RecvWorker再传递到WorkerReceiver并且如果是投票节点,WorkerReceiver又会把收到的数据封装成Notification对象加入到recvqueue中用于统计票数。
以上四个IO处理类只是经行了数据的转发,封装及保存,那么真正的选举逻辑在哪里呢?其实是在本文最开始的代码片段,也就是QuorumPeer类中start方法的最后一行super.start(),QuorumPeer本身也是一个线程类,一起来看下它的run方法:
public void run() {
try {
while (running) {
//根据当前节点的状态执行不同流程
switch (getPeerState()) {
case LOOKING:
try {
//寻找Leader节点
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
setPeerState(ServerState.LOOKING);
}
break;
case OBSERVING:
try {
//当前节点启动模式为Observer
setObserver(makeObserver(logFactory));
//与Leader节点进行数据同步
observer.observeLeader();
} catch (Exception e) {
} finally {
}
break;
case FOLLOWING:
try {
//当前节点启动模式为Follower
setFollower(makeFollower(logFactory));
//与Leader节点进行数据同步
follower.followLeader();
} catch (Exception e) {
} finally {
}
break;
case LEADING:
try {
//当前节点启动模式为Leader
setLeader(makeLeader(logFactory));
//发送自己成为Leader的通知
leader.lead();
setLeader(null);
} catch (Exception e) {
} finally {
}
break;
}
}
}
}
节点初始化的状态为LOOKING,因此启动时直接会调用lookForLeader方法发起Leader选举,一起看下:
public Vote lookForLeader() throws InterruptedException {
try {
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
//向所有投票节点发送自己的投票信息
sendNotifications();
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
//读取各个节点返回的投票信息
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
//超时重发
if(n == null){
//如果前面待发送的消息已经全部发送,则重新发送
if(manager.haveDelivered()){
sendNotifications();
} else {
//否则尝试与各个节点建立连接
manager.connectAll();
}
//退避算法修改下次等待时间
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
}
else if (validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
//如果节点的周期大于自己的
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
//清除已收到的投票信息
recvset.clear();
//两个节点根据epoch,zxid,serverId来判断新的投票信息
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//修改选举周期以及投票信息,发起新一轮投票
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
//这里的break是跳出switch语句,别跟循环弄混
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//如果对方的epoch,zxid,serverId比自己大
//则更新自己的投票给n的投票节点
updateProposal(n.leader, n.zxid, n.peerEpoch);
//重新发送自己新的投票信息
sendNotifications();
}
//把节点的投票信息记录下
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//统计投票信息,判断当前选举是否可以结束,也就是收到的票数信息已经足够确认Leader
if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
//如果没有多余的投票信息则可以结束本次选举周期
if (n == null) {
//根据serverId修改当前节点的类型
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
//清空接收消息队列
leaveInstance(endVote);
//返回最终的投票信息
return endVote;
}
}
break;
case OBSERVING:
//Observer节点不参与投票,忽略
break;
case FOLLOWING:
case LEADING:
//如果周期相同,说明当前节点参与了这次选举
if(n.electionEpoch == logicalclock.get()){
//保存投票信息
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判断当前节点收到的票数是否可以结束选举
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
//把Leader跟Follower的投票信息加入outofelection,确认下它们的信息是否一致
outofelection.put(n.sid, new Vote(n.leader, IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
break;
}
}
}
return null;
}
经过上面的发起投票,统计投票信息最终每个节点都会确认自己的身份,节点根据类型的不同会执行以下逻辑:
- 如果是Leader节点,首先会想其他节点发送一条NEWLEADER信息,确认自己的身份,等到各个节点的ACK消息以后开始正式对外提供服务,同时开启新的监听器,处理新节点加入的逻辑。
- 如果是Follower节点,首先向Leader节点发送一条FOLLOWERINFO信息,告诉Leader节点自己已处理的事务的最大Zxid,然后Leader节点会根据自己的最大Zxid与Follower节点进行同步,如果Follower节点落后的不多则会收到Leader的DIFF信息通过内存同步,如果Follower节点落后的很多则会收到SNAP通过快照同步,如果Follower节点的Zxid大于Leader节点则会收到TRUNC信息忽略多余的事务。
- 如果是Observer节点,则与Follower节点相同。
同步数据的代码比较繁琐,这里就不贴了,但是大体思路就是我说的。
三、总结
本文是Zookeeper系列的最后一篇文章,整个系列代码贴的较多,图很少,因为实在不知道怎么画,另外几篇文章都没有深入到各个细节,我个人的习惯是从流程上理解,大方向把控。真遇到问题了再从流程上入手快速定位到代码块深入研究,希望对ZK感兴趣的同学可以有一定帮助。
最后非常推荐大家去看一下ZAB算法论文,写的很赞,附上链接:
ZooKeeper’s atomic broadcast protocol:Theory and practice。