ZooKeeper服务端启动源码 单机

从服务端启动脚本可以看到启动类为org.apache.zookeeper.server.quorum.QuorumPeerMain,在这个类注释了关于启动程序的配置文件

第一个参数为配置文件,用于获取配置信息。该文件是一个属性文件,因此键和值由equals(=)分隔,键/值对由换行分隔。下面是配置文件中使用的键的一般摘要。

  • dataDir – The directory where the ZooKeeper data is stored.
  • dataLogDir – The directory where the ZooKeeper transaction log is stored.
  • clientPort – The port used to communicate with clients.
  • tickTime – The duration of a tick in milliseconds. This is the basic unit of time in ZooKeeper.
  • initLimit – The maximum number of ticks that a follower will wait to initially synchronize with a leader.
  • syncLimit – The maximum number of ticks that a follower will wait for a message (including heartbeats) from the leader.
  • server.id – This is the host:port[:port] that the server with the given id will use for the quorum protocol.

除了配置文件。数据目录中有一个名为“myid”的文件,其中包含服务器id作为ASCII十进制值。

public static void main(String[] args) {
  QuorumPeerMain main = new QuorumPeerMain();
  main.initializeAndRun(args);
}

initializeAndRun方法中初始化并启动

protected void initializeAndRun(String[] args) {
  QuorumPeerConfig config = new QuorumPeerConfig();
  if (args.length == 1) {
        //解析配置文件zoo.cfg
    config.parse(args[0]);
  }

  // 创建并启动历史文件清理器DatadirCleanupManager
  DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
        .getDataDir(), config.getDataLogDir(), config
        .getSnapRetainCount(), config.getPurgeInterval());
  purgeMgr.start();
    // 判断是分布式还是单机
  if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
  } else {
    LOG.warn("。。");
    // there is only server in the quorum -- run as standalone
    ZooKeeperServerMain.main(args);
  }
}
  1. 解析配置文件zoo.cfg
  2. 创建并启动历史文件清理器DatadirCleanupManager
  3. 判断当前是集群模式还是单机模式的启动

先分析单机模式,调用ZooKeeperServerMain.main(args);,这个类启动并运行一个独立的ZooKeeperServer。

  1. 注册MBean

    ManagedUtil.registerLog4jMBeans();
    
  2. 解析ServerConfig

    ServerConfig config = new ServerConfig();
    config.parse(args[0]);
    
  3. 启动

    runFromConfig(config);
    

进入runFromConfig方法后

  1. 创建ZooKeeperServer实例

    final ZooKeeperServer zkServer = new ZooKeeperServer();
    
  2. 实例化FileTxnSnapLog,txnlog和snapshot的助手类

 txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
  1. 设置zkServer属性

    zkServer.setTxnLogFactory(txnLog);
    zkServer.setTickTime(config.tickTime);
    zkServer.setMinSessionTimeout(config.minSessionTimeout);
    zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
    
  2. 创建cnxnFactory,配置并启动,默认为NIOServerCnxnFactory

    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
    cnxnFactory.startup(zkServer);
    

    configure配置连接工厂

    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
      configureSaslLogin();
    
      thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
      thread.setDaemon(true);
      maxClientCnxns = maxcc;
      this.ss = ServerSocketChannel.open();
      ss.socket().setReuseAddress(true);
      LOG.info("binding to port " + addr);
      ss.socket().bind(addr);
      ss.configureBlocking(false);
      ss.register(selector, SelectionKey.OP_ACCEPT);
    }
    
    1. 创建一个后台线程,传参为this,也就是说thread线程将执行cnxnFactory的方法
    2. 打开ServerSocketChannel,监听端口
  3. 主线程阻塞并等待信号,优雅关机

    shutdownLatch.await();
    shutdown();
    //主线程等待IO线程执行完成
    cnxnFactory.join();
    if (zkServer.canShutdown()) {
        zkServer.shutdown(true);
    }
    

第8步并不是主流程,而是等待shutdownLatch的信号,执行关机操作

  1. 进入NIOServerCnxnFactory的startup方法

    start();
    setZooKeeperServer(zks);
    zks.startdata();
    zks.startup();
    
  2. start方法启动接收连接的线程

thread.start();

在第7步说了,该thread将执行cnxnFactory的run方法,run方法内部就是接收连接、处理新连接,也是常规的NIO操作

for (SelectionKey k : selectedList) {
  if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
    InetAddress ia = sc.socket().getInetAddress();
    int cnxncount = getClientCnxnCount(ia);
    // 1 这个是对同一个ip限制最大连接树,配置文件可配
    if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
      LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns );
      sc.close();
    } else {
      LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress());
      sc.configureBlocking(false);
      SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
      // 2 为每个客户端channel创建一个NIOServerCnxn,并把cnxn放到SelectionKey的attachment上
      NIOServerCnxn cnxn = createConnection(sc, sk);
      sk.attach(cnxn);
      addCnxn(cnxn);
    }
  } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
    // 3 读写,取出attachment,处理IO
    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
    c.doIO(k);
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Unexpected ops in select " + k.readyOps());
    }
  }
}
selected.clear();
  1. 新连接对同一个ip限制最大连接树,配置文件可配
  2. 新连接为每个客户端channel创建一个NIOServerCnxn,并把cnxn放到SelectionKey的attachment上
  3. 读写操作,取出上面的attachment,也就是NIOServerCnxn,处理IO

关于请求处理流程这里不分析

  1. 回到第9步,zks.startdata();恢复数据,重点关注一下
    内部调用loadData方法,恢复session和节点数据

    public void loadData() {
      // 设置Zxid,这个在集群选举的时候很重要
      if(zkDb.isInitialized()){
        setZxid(zkDb.getDataTreeLastProcessedZxid());
      } else {
        // 未初始化,加载数据并设置Zxid
        setZxid(zkDb.loadDataBase());
      }
    
      // 清理过期session
      LinkedList<Long> deadSessions = new LinkedList<Long>();
      for (Long session : zkDb.getSessions()) {
        if (zkDb.getSessionWithTimeOuts().get(session) == null) {
          deadSessions.add(session);
        }
      }
      zkDb.setDataTreeInit(true);
      for (long session : deadSessions) {
        // XXX: Is lastProcessedZxid really the best thing to use?
        killSession(session, zkDb.getDataTreeLastProcessedZxid());
      }
    }
    

    ZKDatabase#loadDataBase方法内部通过调用FileTxnSnapLog#restore恢复数据,这里面有一些zk关于数据存储和恢复的机制需要关注下。

    /**
    * 这个函数在读取快照和事务日志之后恢复服务器数据库
    */
    public long restore(DataTree dt, Map<Long, Integer> sessions, 
                        PlayBackListener listener) throws IOException {
      // 1
      snapLog.deserialize(dt, sessions);
      // 2
      return fastForwardFromEdits(dt, sessions, listener);
    }
    
    1. 从最后一个有效快照反序列化数据树,并返回最后一个反序列化的zxid
    2. 对服务器数据库进行快进处理,使其包含最新的事务。这与恢复相同,但只从事务日志中读取,而不从快照中恢复。

    简单来说就是先通过snapshot恢复数据,然后从log中补充缺失的数据。

    进到FileSnap#deserialize方法

    public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
      // 查找最新的、有效的100个snapshot文件
      List<File> snapList = findNValidSnapshots(100);
      if (snapList.size() == 0) {
        return -1L;
      }
      File snap = null;
      boolean foundValid = false;
      for (int i = 0; i < snapList.size(); i++) {
        snap = snapList.get(i);
        InputStream snapIS = null;
        CheckedInputStream crcIn = null;
        try {
          LOG.info("Reading snapshot " + snap);
          snapIS = new BufferedInputStream(new FileInputStream(snap));
          crcIn = new CheckedInputStream(snapIS, new Adler32());
          InputArchive ia = BinaryInputArchive.getArchive(crcIn);
          deserialize(dt,sessions, ia);
          long checkSum = crcIn.getChecksum().getValue();
          long val = ia.readLong("val");
          if (val != checkSum) {
            throw new IOException("CRC corruption in snapshot :  " + snap);
          }
          // 已恢复有效的snapshot文件
          foundValid = true;
          break;
        } catch(IOException e) {
          LOG.warn("problem reading snap file " + snap, e);
        } finally {
          if (snapIS != null) 
            snapIS.close();
          if (crcIn != null) 
            crcIn.close();
        } 
      }
      if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
      }
      //Zxid
      dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
      return dt.lastProcessedZxid;
    }
    
    1. snapshot文件的查找通过配置中的dataDir目录中以snapshot开头的文件
    2. 100个?在后续的恢复代码中可以看到,snapshot文件可能出错,需要从多个文件中重试,zookeeper可以通过配置定期删除不需要的、旧的snapshot文件
    3. snapshot文件命名即为snapshot.zxid的形式,根据snapshot的文件的名称即可得到最新的zxid

    接下来从log文件恢复snapshot文件缺失的数据,FileTxnSnapLog#fastForwardFromEdits方法

    public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
                                     PlayBackListener listener) throws IOException {
      // 1. log文件
      FileTxnLog txnLog = new FileTxnLog(dataDir);
      // 2 iterator
      TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
      long highestZxid = dt.lastProcessedZxid;
      TxnHeader hdr;
      try {
        while (true) {
          // 迭代到的TxnHeader
          hdr = itr.getHeader();
          if (hdr == null) {
            //empty logs 
            return dt.lastProcessedZxid;
          }
          if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error("...");
          } else {
            highestZxid = hdr.getZxid();
          }
          try {
            //3
            processTransaction(hdr,dt,sessions, itr.getTxn());
          } catch(KeeperException.NoNodeException e) {
            throw new IOException("...");
          }
          //4
          listener.onTxnLoaded(hdr, itr.getTxn());
          //5
          if (!itr.next()) 
            break;
        }
      } finally {
        if (itr != null) {
          itr.close();
        }
      }
      return highestZxid;
    }
    
    1. 这个dataDir并不是配置文件中配置的dataDir,而是dataLogDir,在ZooKeeperServerMain#runFromConfig方法中,创建txnLog时

      txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir));
      // FileTxnSnapLog构造函数
      public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
        this.dataDir = new File(dataDir, version + VERSION);
        this.snapDir = new File(snapDir, version + VERSION);
        
        txnLog = new FileTxnLog(this.dataDir);
        snapLog = new FileSnap(this.snapDir);
      }
      

      可以看到配置中dataDir是snapLog文件的路径,而dataLogDir是txnLog的路径,但dataLogDir不是必须配置的,在QuorumPeerConfig#parseProperties方法,解析config时

      if (dataDir == null) {
        throw new IllegalArgumentException("dataDir is not set");
      }
      if (dataLogDir == null) {
        dataLogDir = dataDir;
      }
      

      可以看到dataDir是必须配置的,而dataLogDir不配置时会取dataDir

    2. 创建并初始化TxnIterator,初始化会读取事务日志中比snapshot的zxid大的日志

      // FileTxnIterator#init
      while (hdr.getZxid() < zxid) {
        if (!next())
          return;
      }
      
    3. 处理本地事务,保存到dataTree中

    4. 将提议加入到广播队列中,等待选举称为Leader后广播给其他Followers、Leaners,暂不考虑,需要注意的是传入的listener是调用ZKDatabase#loadDataBase方法传入的commitProposalPlaybackListener

      private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
        public void onTxnLoaded(TxnHeader hdr, Record txn){
          addCommittedProposal(hdr, txn);
        }
      };
      
    5. 迭代下一个事务操作

    通过读取snapshot文件+log文件,将内存数据库恢复

  2. 回到第9步,zks.startup();

    public synchronized void startup() {
      //1 会话管理器
      if (sessionTracker == null) {
        createSessionTracker();
      }
      startSessionTracker();
      // 请求处理链
      setupRequestProcessors();
    
      // jmx
      registerJMX();
    
      setState(State.RUNNING);
      notifyAll();
    }
    

    startup方法有很多子流程

  3. 创建并启动会话管理器

    protected void createSessionTracker() {
      sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                                              tickTime, 1, getZooKeeperServerListener());
    }
    protected void startSessionTracker() {
      ((SessionTrackerImpl)sessionTracker).start();
    }
    

    SessionTrackerImpl的构造函数中

    public SessionTrackerImpl(SessionExpirer expirer,
      ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
      long sid, ZooKeeperServerListener listener) {
      super("SessionTracker", listener);
      this.expirer = expirer;
      this.expirationInterval = tickTime;
      this.sessionsWithTimeout = sessionsWithTimeout;
      nextExpirationTime = roundToInterval(Time.currentElapsedTime());
      this.nextSessionId = initializeNextSession(sid);
      for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
        addSession(e.getKey(), e.getValue());
      }
    }
    
    • 传入的expirer是zks实例,也就是会调用ZooKeeperServer#expire方法过期session
    • expirationInterval是配置文件中配置的tickTime,作为过期分桶的最小间隔
    • sessionsWithTimeoutsessionId的过期时间
    • nextExpirationTime下次清理过期session的时间,从SessionTrackerImpl#roundToInterval方法可以看出分桶过期策略
    • nextSessionId分配sessionId时起始值,值为serverId(8) + time(40) + 0(16):高8为为serverId(这里传入的1),中40位为时间,底16位递增
    • 遍历sessionsWithTimeout调用addSession方法,这个方法内部创建SessionImpl实例作为session的实体,同时调用SessionTrackerImpl#touchSession方法将session分桶,同样是调用roundToInterval方法

    启动:sessionTracker作为线程启动,后台定期清理过期数据

  4. 设置请求处理链

    protected void setupRequestProcessors() {
      RequestProcessor finalProcessor = new FinalRequestProcessor(this);
      RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
      ((SyncRequestProcessor)syncProcessor).start();
      firstProcessor = new PrepRequestProcessor(this, syncProcessor);
      ((PrepRequestProcessor)firstProcessor).start();
    }
    

    单机模式的请求链

    PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
    
  5. 注册JMX服务

那么至此单机版zk就启动完成了,接下来便是处理请求、响应请求、触发节点监听等处理了。

    原文作者:Kohler
    原文地址: https://www.jianshu.com/p/05d5c54cacfc
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞