Zookeeper源码分析(三) ----- 单机模式(standalone)运行

zookeeper源码分析系列文章:

原创博客,纯手敲,转载请注明出处,谢谢!

一、Zookeeper单机启动原理

Zookeeper属于C/S架构,也就是传统的客户端-服务器模式,客户端发送请求,服务器响应请求。这和高性能网络框架Netty是一样的,因此我们也可以猜想到它的启动方式无非就是从main()方法开始,客户端和服务器各有一个main()方法。

那我们先来看看Zookeeper服务器端的启动过程,当你打开Zookeeper目录下/bin目录中zkServer.cmd文件你就会发现,其实Zookeeper的启动入口为org.apache.zookeeper.server.quorum.QuorumPeerMain类的main方法,无论你是单机模式启动Zookeeper还是复制模式启动Zookeeper,执行入口都是这个类,至于如何区别是哪种模式启动,该类会根据你配置文件的配置进行判断,具体的判断接下来将会详细讲解。

zkServer.cmd详细源代码:

setlocal
call "%~dp0zkEnv.cmd"

set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain // 设置主类入口
echo on
call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%"  %ZOOMAIN% "%ZOOCFG%" %*  // 执行该类的main()方法

endlocal
复制代码

下面先看看单机启动时序图:

《Zookeeper源码分析(三) ----- 单机模式(standalone)运行》

1、首先执行main方法 2、解析传进来的配置文件路径,默认会去${baseDir}\conf\zoo.cfg找配置文件 3、创建NIOServerCnxnFactory进行监听客户端的连接请求,在Zookeeper中有两种ServerCnxnFactory,一种是NIOServerCnxnFactory,另一种是NettyServerCnxnFactory,前者为默认工厂,后者除非你在启动main方法时指定Systemzookeeper.serverCnxnFactory属性值为NettyServerCnxnFactory

下面将详细深入源码分析各个阶段是如何实现以及工作的。

二、Zookeeper单机模式(standalone)启动

  • 1、Zookeeper是如何解析配置文件的?

zk的属性配置分为两种:

1、Java System property:Java系统环境变量,也就是System.setProperty()设置的参数

2、No Java system property配置文件属性,也就是你在配置文件中配置的属性

配置文件的解析原理很简单,无非就是解析一些.properties文件中的键值对,其实Java已经提供了Properties类来代表.properties文件中所有键值对集合,我们可以使用Properties对象的load()方法将一个配置文件装载进内存,然后对该对象进行遍历就得到我们锁配置的属性值集合了。

说到Zookeeper中的配置文件解析,原理也和上面差不多,只不过是在变量键值对的时候多了一些Zookeeper自身的逻辑判断。ZooKeeper中的配置文件解析从QuorumPeerConfig类的parse()方法说起,源代码如下:

/**
 * Parse a ZooKeeper configuration file 解析一个配置文件
 * @param path the patch of the configuration file
 * @throws ConfigException error processing configuration
 */
public void parse(String path) throws ConfigException {
    File configFile = new File(path);
    
    LOG.info("Reading configuration from: " + configFile);
    
    try {
    	if (!configFile.exists()) {
    		throw new IllegalArgumentException(configFile.toString() + " file is missing");
    	}
        // 声明一个Properties对象
    	Properties cfg = new Properties();
    	FileInputStream in = new FileInputStream(configFile);
    	try {
    	    // 传入一个配置文件输入流即可装载所有配置
    		cfg.load(in);
    	} finally {
    	    // 涉及到流的操作记得最后将流关闭
    		in.close();
    	}
        // 此处是zk自身的逻辑处理
    	parseProperties(cfg);
    } catch (IOException e) {
    	throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
    	throw new ConfigException("Error processing " + path, e);
    }
}
复制代码

接下来我们来看看上面的parseProperties(cfg)方法,该方法太长了,硬着头皮啃完:

/**
 * Parse config from a Properties.
 * @param zkProp Properties to parse from.
 * @throws IOException
 * @throws ConfigException
 */
public void parseProperties(Properties zkProp) throws IOException, ConfigException {
    int clientPort = 0;
    String clientPortAddress = null;
    // 遍历所有的key-value键值对
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        // 注意这里要把首尾空格去掉
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        // 存储快照文件snapshot的目录配置
        if (key.equals("dataDir")) {
        	dataDir = value;
        	// 事务日志存储目录
        } else if (key.equals("dataLogDir")) {
        	dataLogDir = value;
        	// 客户端连接server的端口,zk启动总得有个端口吧!如果你没有配置,则会报错!一般我们会将端口配置为2181
        } else if (key.equals("clientPort")) {
        	clientPort = Integer.parseInt(value);
        	// 服务器IP地址
        } else if (key.equals("clientPortAddress")) {
        	clientPortAddress = value.trim();
        	// zk中的基本事件单位,用于心跳和session最小过期时间为2*tickTime
        } else if (key.equals("tickTime")) {
        	tickTime = Integer.parseInt(value);
        	// 客户端并发连接数量,注意是一个客户端跟一台服务器的并发连接数量,也就是说,假设值为3,那么某个客户端不能同时并发连接3次到同一台服务器(并发嘛!),否则会出现下面错误too many connections from /127.0.0.1 - max is 3
        } else if (key.equals("maxClientCnxns")) {
        	maxClientCnxns = Integer.parseInt(value);
        } else if (key.equals("minSessionTimeout")) {
        	minSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("maxSessionTimeout")) {
        	maxSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("initLimit")) {
        	initLimit = Integer.parseInt(value);
        } else if (key.equals("syncLimit")) {
        	syncLimit = Integer.parseInt(value);
        } else if (key.equals("electionAlg")) {
        	electionAlg = Integer.parseInt(value);
        } else if (key.equals("quorumListenOnAllIPs")) {
        	quorumListenOnAllIPs = Boolean.parseBoolean(value);
        } else if (key.equals("peerType")) {
            if (value.toLowerCase().equals("observer")) {
        	peerType = LearnerType.OBSERVER;
            } else if (value.toLowerCase().equals("participant")) {
        	peerType = LearnerType.PARTICIPANT;
            } else {
        	throw new ConfigException("Unrecognised peertype: " + value);
        }
	......
复制代码

下面对解析的所有配置项用表格总结下: 所有的配置项都可以在官网查询到。

下面我们一起看看Zookkeeper的配置文件属性:

配置项说明异常情况是否报错?错误 or 备注
clientPort服务端server监听客户端连接的端口不配置clientPort is not set
clientPortAddress客户端连接的服务器ip地址不配置默认使用网卡的地址
dataDir数据快照目录不配置dataDir is not set
dataLogDir事务日志存放目录不配置默认跟dataDir目录相同
tickTimeZK基本时间单元(毫秒),用于心跳和超时.minSessionTimeout默认是两倍ticket不配置tickTime is not set
maxClientCnxns同一ip地址最大并发连接数(也就是说同一个ip最多可以同时维持与服务器链接的个数)不配置默认最大连接数为60,设置为0则无限制
minSessionTimeout最小会话超时时间,默认2*ticket不配置否,若minSessionTimeout > maxSessionTimeout,则报错minSessionTimeout must not be larger than maxSessionTimeout
maxSessionTimeout最大会话超时时间,默认20*ticket不配置不能小于minSessionTimeout
initLimit允许follower同步和连接到leader的时间总量,以ticket为单位不配置initLimit is not set,如果zk管理的数据量特别大,则辞职应该调大
syncLimitfollowerleader之间同步的世间量不配置syncLimit is not set
electionAlgzk选举算法选择,默认值为3,表示采用快速选举算法不配置如果没有配置选举地址server,则抛Missing election port for server: serverid
quorumListenOnAllIPs当设置为true时,ZooKeeper服务器将侦听来自所有可用IP地址的对等端的连接,而不仅仅是在配置文件的服务器列表中配置的地址(即集群中配置的server.1,server.2。。。。)。 它会影响处理ZAB协议和Fast Leader Election协议的连接。 默认值为false不配置
peerType服务器的角色,是观察者observer还是参与选举或成为leader,默认为PARTICIPANT不配置若配置了不知支持的角色,则报Unrecognised peertype:
autopurge.snapRetainCount数据快照保留个数,默认是3,最小也是3不配置
autopurge.purgeInterval执行日志、快照清除任务的时间间隔(小时)不配置默认是 0
server.x=[hostname]:nnnnn[:nnnnn]集群服务器配置不配置单机:否;集群:是zk集群启动将加载该该配置,每台zk服务器必须有一个myid文件,里边存放服务器的id,该id值必须匹配server.x中的x ; 第一个端口表示与leader连接的端口,第二个端口表示用于选举的端口,第二个端口是可选的
  • 2、Zookeeper是如何判断何种模式启动服务器的?

因为ZookeeperZkServer.cmd启动文件指定的统一入口为org.apache.zookeeper.server.quorum.QuorumPeerMain,那么我们就要问了,那ZK是怎么判断我要单机模式启动还是集群方式启动呢?答案是明显的,也就是取决于你在配置文件zoo.cfg中是否有配置server.x=hostname:port1:port2,以上的配置项表明我们想让ZK以集群模式运行,那么在代码中是如何体现的呢?

上面讲到ZK解析配置文件的原理,我们依旧走进parseProperties()方法,看看如下代码:

.....
// 此处解析配置文件以server.开头的配置
} else if (key.startsWith("server.")) {
    // server.3
    int dot = key.indexOf('.');
    long sid = Long.parseLong(key.substring(dot + 1));
    String parts[] = splitWithLeadingHostname(value);
    if ((parts.length != 2) && (parts.length != 3) && (parts.length != 4)) {
    LOG.error(value + " does not have the form host:port or host:port:port "
    		+ " or host:port:port:type");
}
    LearnerType type = null;
    String hostname = parts[0];
    Integer port = Integer.parseInt(parts[1]);
    Integer electionPort = null;
    if (parts.length > 2) {
    electionPort = Integer.parseInt(parts[2]);
    }
if (parts.length > 3) {
    if (parts[3].toLowerCase().equals("observer")) {
    	type = LearnerType.OBSERVER;
    } else if (parts[3].toLowerCase().equals("participant")) {
    	type = LearnerType.PARTICIPANT;
    } else {
    	throw new ConfigException("Unrecognised peertype: " + value);
    }
}
if (type == LearnerType.OBSERVER) {
    observers.put(Long.valueOf(sid),
    		new QuorumServer(sid, hostname, port, electionPort, type));
    } else {
    // 如果配置了,那么就加进servers中,其中servers是一个本地缓存Map,用于存储配置的ip地址
    servers.put(Long.valueOf(sid), new QuorumServer(sid, hostname, port, electionPort, type));
}
复制代码

如果配置了,那么serverssize>0,解析完成之后,回到QuorumPeerMaininitializeAndRun()方法:

 // 如果servers长度大于0,则集群方式启动,否则,单机启动
 if (args.length == 1 && config.servers.size() > 0) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
复制代码

从上面可以看出,单机启动的入口为ZooKeeperServerMain类,而统一的入口类为QuorumPeerMain,所以,在ZK中,服务器端的启动类就只有这两个了。

  • 3、单机模式下,Zookeeper是如何处理客户端请求的?

无论是哪种方式启动Zookeeper,它都必须对客户端的请求进行处理,那么ZK是如何处理客户端请求的呢?让我们一起来看看源码是怎么写的!

上面说到,Zk单机启动的入口类为ZooKeeperServerMain,我们一起看下其runFromConfig()方法:

/**
 * Run from a ServerConfig.
 * @param config ServerConfig to use.
 * @throws IOException
 */
public void runFromConfig(ServerConfig config) throws IOException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // 创建一个ZooKeeperServer,ZooKeeperServer代表具体运行的zk服务器,包含监听客户端请求
        final ZooKeeperServer zkServer = new ZooKeeperServer();
        // 这个是表明上面创建的ZooKeeperServer线程执行完之后,当前主线程才结束,类似Thread的join()方法
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        // 关闭服务器时的回调处理器
        zkServer.registerServerShutdownHandler(
                new ZooKeeperServerShutdownHandler(shutdownLatch));
        // 执行快照数据,日志的定时保存操作,指定保存路径
        txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                config.dataDir));
        zkServer.setTxnLogFactory(txnLog);
        zkServer.setTickTime(config.tickTime);
        zkServer.setMinSessionTimeout(config.minSessionTimeout);
        zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
        // 创建ServerCnxnFactory,默认实现为NIOServerCnxnFactory,也可以指定为NettyServerCnxnFactory
        cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns());
        // 启动服务器,将一个服务器zkServer丢给工厂,然后启动
        cnxnFactory.startup(zkServer);
        // 这里将会等待,除非调用shutdown()方法
        shutdownLatch.await();
        shutdown();
        // 这里会等待直到zkServer线程完成
        cnxnFactory.join();
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}
复制代码

了解完上面的代码,我们明白单机启动ZooKeeperServerZK做了什么工作,主要点在zk创建的是哪种工厂,至于NIOServerCnxnFactory的代码,我就不说了,大家有兴趣可以去看看。

回归正题,让我们进入NIOServerCnxnFactoryrun()方法中看看:

public void run() {
while (!ss.socket().isClosed()) {
try {
    // 每一秒轮询一次
	selector.select(1000);
	Set<SelectionKey> selected;
	synchronized (this) {
		selected = selector.selectedKeys();
	}
	ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
	Collections.shuffle(selectedList);
	for (SelectionKey k : selectedList) {
	    // 如果有读请求或者连接请求,则接收请求
		if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
			SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
			InetAddress ia = sc.socket().getInetAddress();
			int cnxncount = getClientCnxnCount(ia);
			// 这里对maxClientCnxns做出判断,防止DOS攻击
			if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
				LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns);
				sc.close();
			} else {
				LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress());
				sc.configureBlocking(false);
				SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
				NIOServerCnxn cnxn = createConnection(sc, sk);
				sk.attach(cnxn);
				addCnxn(cnxn);
			}
		// 如果有读请求且客户端之前有连接过的,则直接处理
		} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
			NIOServerCnxn c = (NIOServerCnxn) k.attachment();
			c.doIO(k);
		} else {
			if (LOG.isDebugEnabled()) {
				LOG.debug("Unexpected ops in select " + k.readyOps());
			}
		}
	}
	selected.clear();
} catch (RuntimeException e) {
	LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
	LOG.warn("Ignoring exception", e);
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}
复制代码

看到这,我觉得对于Zk如何监听处理客户端的请求就清晰多了,上面的代码主要采用轮询机制,每一秒轮询一次,通过selector.select(1000)方法指定,这里的监听方式和传统的BIO不同,传统的网络监听采用阻塞的accept()方法,zk采用java的nio实现。

谢谢阅读~~

    原文作者:java集合源码分析
    原文地址: https://juejin.im/post/5ae6b7bcf265da0b767d3f6f
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞