从服务端启动脚本可以看到启动类为org.apache.zookeeper.server.quorum.QuorumPeerMain
,在这个类注释了关于启动程序的配置文件
第一个参数为配置文件,用于获取配置信息。该文件是一个属性文件,因此键和值由equals(=)分隔,键/值对由换行分隔。下面是配置文件中使用的键的一般摘要。
- dataDir – The directory where the ZooKeeper data is stored.
- dataLogDir – The directory where the ZooKeeper transaction log is stored.
- clientPort – The port used to communicate with clients.
- tickTime – The duration of a tick in milliseconds. This is the basic unit of time in ZooKeeper.
- initLimit – The maximum number of ticks that a follower will wait to initially synchronize with a leader.
- syncLimit – The maximum number of ticks that a follower will wait for a message (including heartbeats) from the leader.
- server.id – This is the host:port[:port] that the server with the given id will use for the quorum protocol.
除了配置文件。数据目录中有一个名为“myid”的文件,其中包含服务器id作为ASCII十进制值。
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
}
initializeAndRun方法中初始化并启动
protected void initializeAndRun(String[] args) {
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
//解析配置文件zoo.cfg
config.parse(args[0]);
}
// 创建并启动历史文件清理器DatadirCleanupManager
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
// 判断是分布式还是单机
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
LOG.warn("。。");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
- 解析配置文件
zoo.cfg
- 创建并启动历史文件清理器DatadirCleanupManager
- 判断当前是集群模式还是单机模式的启动
先分析单机模式,调用ZooKeeperServerMain.main(args);
,这个类启动并运行一个独立的ZooKeeperServer。
注册MBean
ManagedUtil.registerLog4jMBeans();
解析ServerConfig
ServerConfig config = new ServerConfig(); config.parse(args[0]);
启动
runFromConfig(config);
进入runFromConfig方法后
创建ZooKeeperServer实例
final ZooKeeperServer zkServer = new ZooKeeperServer();
实例化FileTxnSnapLog,txnlog和snapshot的助手类
txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
设置zkServer属性
zkServer.setTxnLogFactory(txnLog); zkServer.setTickTime(config.tickTime); zkServer.setMinSessionTimeout(config.minSessionTimeout); zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
创建cnxnFactory,配置并启动,默认为NIOServerCnxnFactory
cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); cnxnFactory.startup(zkServer);
configure配置连接工厂
public void configure(InetSocketAddress addr, int maxcc) throws IOException { configureSaslLogin(); thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr); thread.setDaemon(true); maxClientCnxns = maxcc; this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port " + addr); ss.socket().bind(addr); ss.configureBlocking(false); ss.register(selector, SelectionKey.OP_ACCEPT); }
- 创建一个后台线程,传参为this,也就是说thread线程将执行cnxnFactory的方法
- 打开ServerSocketChannel,监听端口
主线程阻塞并等待信号,优雅关机
shutdownLatch.await(); shutdown(); //主线程等待IO线程执行完成 cnxnFactory.join(); if (zkServer.canShutdown()) { zkServer.shutdown(true); }
第8步并不是主流程,而是等待shutdownLatch的信号,执行关机操作
进入NIOServerCnxnFactory的startup方法
start(); setZooKeeperServer(zks); zks.startdata(); zks.startup();
start方法启动接收连接的线程
thread.start();
在第7步说了,该thread将执行cnxnFactory的run方法,run方法内部就是接收连接、处理新连接,也是常规的NIO操作
for (SelectionKey k : selectedList) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
// 1 这个是对同一个ip限制最大连接树,配置文件可配
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
// 2 为每个客户端channel创建一个NIOServerCnxn,并把cnxn放到SelectionKey的attachment上
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// 3 读写,取出attachment,处理IO
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select " + k.readyOps());
}
}
}
selected.clear();
- 新连接对同一个ip限制最大连接树,配置文件可配
- 新连接为每个客户端channel创建一个NIOServerCnxn,并把cnxn放到SelectionKey的attachment上
- 读写操作,取出上面的attachment,也就是NIOServerCnxn,处理IO
关于请求处理流程这里不分析
回到第9步,
zks.startdata();
恢复数据,重点关注一下
内部调用loadData
方法,恢复session和节点数据public void loadData() { // 设置Zxid,这个在集群选举的时候很重要 if(zkDb.isInitialized()){ setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { // 未初始化,加载数据并设置Zxid setZxid(zkDb.loadDataBase()); } // 清理过期session LinkedList<Long> deadSessions = new LinkedList<Long>(); for (Long session : zkDb.getSessions()) { if (zkDb.getSessionWithTimeOuts().get(session) == null) { deadSessions.add(session); } } zkDb.setDataTreeInit(true); for (long session : deadSessions) { // XXX: Is lastProcessedZxid really the best thing to use? killSession(session, zkDb.getDataTreeLastProcessedZxid()); } }
ZKDatabase#loadDataBase
方法内部通过调用FileTxnSnapLog#restore
恢复数据,这里面有一些zk关于数据存储和恢复的机制需要关注下。/** * 这个函数在读取快照和事务日志之后恢复服务器数据库 */ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { // 1 snapLog.deserialize(dt, sessions); // 2 return fastForwardFromEdits(dt, sessions, listener); }
- 从最后一个有效快照反序列化数据树,并返回最后一个反序列化的zxid
- 对服务器数据库进行快进处理,使其包含最新的事务。这与恢复相同,但只从事务日志中读取,而不从快照中恢复。
简单来说就是先通过snapshot恢复数据,然后从log中补充缺失的数据。
进到
FileSnap#deserialize
方法public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException { // 查找最新的、有效的100个snapshot文件 List<File> snapList = findNValidSnapshots(100); if (snapList.size() == 0) { return -1L; } File snap = null; boolean foundValid = false; for (int i = 0; i < snapList.size(); i++) { snap = snapList.get(i); InputStream snapIS = null; CheckedInputStream crcIn = null; try { LOG.info("Reading snapshot " + snap); snapIS = new BufferedInputStream(new FileInputStream(snap)); crcIn = new CheckedInputStream(snapIS, new Adler32()); InputArchive ia = BinaryInputArchive.getArchive(crcIn); deserialize(dt,sessions, ia); long checkSum = crcIn.getChecksum().getValue(); long val = ia.readLong("val"); if (val != checkSum) { throw new IOException("CRC corruption in snapshot : " + snap); } // 已恢复有效的snapshot文件 foundValid = true; break; } catch(IOException e) { LOG.warn("problem reading snap file " + snap, e); } finally { if (snapIS != null) snapIS.close(); if (crcIn != null) crcIn.close(); } } if (!foundValid) { throw new IOException("Not able to find valid snapshots in " + snapDir); } //Zxid dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX); return dt.lastProcessedZxid; }
- snapshot文件的查找通过配置中的dataDir目录中以snapshot开头的文件
- 100个?在后续的恢复代码中可以看到,snapshot文件可能出错,需要从多个文件中重试,zookeeper可以通过配置定期删除不需要的、旧的snapshot文件
- snapshot文件命名即为
snapshot.zxid
的形式,根据snapshot的文件的名称即可得到最新的zxid
接下来从log文件恢复snapshot文件缺失的数据,
FileTxnSnapLog#fastForwardFromEdits
方法public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { // 1. log文件 FileTxnLog txnLog = new FileTxnLog(dataDir); // 2 iterator TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; try { while (true) { // 迭代到的TxnHeader hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error("..."); } else { highestZxid = hdr.getZxid(); } try { //3 processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("..."); } //4 listener.onTxnLoaded(hdr, itr.getTxn()); //5 if (!itr.next()) break; } } finally { if (itr != null) { itr.close(); } } return highestZxid; }
这个dataDir并不是配置文件中配置的dataDir,而是dataLogDir,在
ZooKeeperServerMain#runFromConfig
方法中,创建txnLog时txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir)); // FileTxnSnapLog构造函数 public FileTxnSnapLog(File dataDir, File snapDir) throws IOException { this.dataDir = new File(dataDir, version + VERSION); this.snapDir = new File(snapDir, version + VERSION); txnLog = new FileTxnLog(this.dataDir); snapLog = new FileSnap(this.snapDir); }
可以看到配置中
dataDir
是snapLog文件的路径,而dataLogDir
是txnLog的路径,但dataLogDir
不是必须配置的,在QuorumPeerConfig#parseProperties
方法,解析config时if (dataDir == null) { throw new IllegalArgumentException("dataDir is not set"); } if (dataLogDir == null) { dataLogDir = dataDir; }
可以看到
dataDir
是必须配置的,而dataLogDir
不配置时会取dataDir
。创建并初始化TxnIterator,初始化会读取事务日志中比snapshot的zxid大的日志
// FileTxnIterator#init while (hdr.getZxid() < zxid) { if (!next()) return; }
处理本地事务,保存到dataTree中
将提议加入到广播队列中,等待选举称为Leader后广播给其他Followers、Leaners,暂不考虑,需要注意的是传入的listener是调用
ZKDatabase#loadDataBase
方法传入的commitProposalPlaybackListener
private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() { public void onTxnLoaded(TxnHeader hdr, Record txn){ addCommittedProposal(hdr, txn); } };
迭代下一个事务操作
通过读取snapshot文件+log文件,将内存数据库恢复
回到第9步,
zks.startup();
public synchronized void startup() { //1 会话管理器 if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); // 请求处理链 setupRequestProcessors(); // jmx registerJMX(); setState(State.RUNNING); notifyAll(); }
startup
方法有很多子流程创建并启动会话管理器
protected void createSessionTracker() { sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, 1, getZooKeeperServerListener()); } protected void startSessionTracker() { ((SessionTrackerImpl)sessionTracker).start(); }
在
SessionTrackerImpl
的构造函数中public SessionTrackerImpl(SessionExpirer expirer, ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime, long sid, ZooKeeperServerListener listener) { super("SessionTracker", listener); this.expirer = expirer; this.expirationInterval = tickTime; this.sessionsWithTimeout = sessionsWithTimeout; nextExpirationTime = roundToInterval(Time.currentElapsedTime()); this.nextSessionId = initializeNextSession(sid); for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) { addSession(e.getKey(), e.getValue()); } }
- 传入的
expirer
是zks实例,也就是会调用ZooKeeperServer#expire
方法过期session -
expirationInterval
是配置文件中配置的tickTime
,作为过期分桶的最小间隔 -
sessionsWithTimeout
sessionId的过期时间 -
nextExpirationTime
下次清理过期session的时间,从SessionTrackerImpl#roundToInterval
方法可以看出分桶过期策略 -
nextSessionId
分配sessionId时起始值,值为serverId(8) + time(40) + 0(16)
:高8为为serverId(这里传入的1),中40位为时间,底16位递增 - 遍历
sessionsWithTimeout
调用addSession
方法,这个方法内部创建SessionImpl
实例作为session的实体,同时调用SessionTrackerImpl#touchSession
方法将session分桶,同样是调用roundToInterval
方法
启动:sessionTracker作为线程启动,后台定期清理过期数据
- 传入的
设置请求处理链
protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); }
单机模式的请求链
PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
注册JMX服务
那么至此单机版zk就启动完成了,接下来便是处理请求、响应请求、触发节点监听等处理了。