zookeeper源码分析(7)-服务器请求处理链的初始化

在zookeeper集群中,分为Leader,Follewer,Observer三种类型的服务器角色,请求是通过各自的请求处理链来处理,所有的请求处理器均实现了RequestProcessor接口,通过处理链的上一个请求处理器调用该处理器的processRequest方法将请求传递过来,这个请求的传递过程是由一个线程完成的。

public interface RequestProcessor {
    public static class RequestProcessorException extends Exception {
        public RequestProcessorException(String msg, Throwable t) {
            super(msg, t);
        }
    }
//请求处理方法
    void processRequest(Request request) throws RequestProcessorException;

    void shutdown();
}

下面分别看下不同角色的服务器启动时的请求处理链初始化过程。

Leader请求处理链初始化

Leader的主要工作如下:

  • 事务请求的唯一调度和处理者,保证集群事务处理的顺序性。
  • 集群内部各服务器的调度者。
    当leader完成集群间数据的同步时,会启动LeaderZooKeeperServer,初始化请求链。
    LeaderZooKeeperServer.setupRequestProcessors
protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
//设置LeaderZooKeeperServer.firstProcessor
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

        setupContainerManager();
    }

请求处理器都持有下一个请求处理器的引用private final RequestProcessor nextProcessor;,在上面的构造方法中会设置各自的nextProcessor,并启动处理器。最后会设置LeaderZooKeeperServer.firstProcessorLeaderRequestProcessor,这个处理器主要是对本地session创建临时节点时的请求预处理,将在=======介绍,它的nextProcessorPrepRequestProcessor

可大体认为Leader的请求处理链如下:

《zookeeper源码分析(7)-服务器请求处理链的初始化》

PrepRequestProcessor

Leader服务器的请求预处理器,进行一些创建请求事务头,事务体,ACL检查和版本检查等的预处理操作。初始化方法为:

public PrepRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("ProcessThread(sid:" + zks.getServerId() + " cport:"
                + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
        this.nextProcessor = nextProcessor;
        this.zks = zks;
    }

主要属性为:

//请求存储队列,该线程启动后会不断从队列中获取请求进行预处理
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
//设置为ProposalRequestProcessor
    private final RequestProcessor nextProcessor;
//当前zkServer实例
    ZooKeeperServer zks;

ProposalRequestProcessor
Leader服务器的事务投票处理器,也是事务处理流程的发起者。对于非事务请求,它会直接将请求流转到 CommitProcessor处理器。对于事务请求,除了将请求交给CommitProcessor处理器外,还会根据请求类型创建对应的Proposal提议,并发送给所有的Follewer服务器来发起一次集群内的事务投票。同时,它还会将事务请求交给SyncRequestProcessor处理器进行事务日志的记录。
初始化过程为:

public ProposalRequestProcessor(LeaderZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
//初始化SyncRequestProcessor
        AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
        syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    }
//启动syncProcessor
 public void initialize() {
        syncProcessor.start();
    }

主要属性为:

 LeaderZooKeeperServer zks;
//设置为CommitProcessor
    RequestProcessor nextProcessor;
    SyncRequestProcessor syncProcessor;

SyncRequestProcessor
是事务日志记录处理器,主要用来将事务请求记录到事务日志文件中,同时会根据条件触发zookeeper进行数据快照。
并在数据同步完成后将请求传递给nextProcessor
初始化过程为:

public SyncRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("SyncThread:" + zks.getServerId(), zks
                .getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        running = true;
    }

主要属性为:

private final ZooKeeperServer zks;
//请求存储队列
    private final LinkedBlockingQueue<Request> queuedRequests =
        new LinkedBlockingQueue<Request>();
//leader服务器中中会设置为AckRequestProcessor
    private final RequestProcessor nextProcessor;
//用于异步快照的线程
    private Thread snapInProcess = null;
//线程运行状态标志
    volatile private boolean running;

AckRequestProcessor
负责在SyncRequestProcessor处理器完成事务日志记录后,向Proposal投票收集器发送ACK反馈,表示当前leader服务器已经完成了对该Proposal的事务日志记录。初始化过程为:

//持有leader引用,调用leader.processAck发送反馈
 Leader leader;

    AckRequestProcessor(Leader leader) {
        this.leader = leader;
    }

CommitProcessor
事务提交处理器,对于非事务请求,该处理器会直接将请求交给nextProcessor处理;对于事务请求,它会等待集群内针对Proposal的投票直到Proposal可被提交,它保证了事务请求的顺序处理。初始化过程为:

//LeaderZooKeeperServer.setupRequestProcessors
 commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();

public CommitProcessor(RequestProcessor nextProcessor, String id,
                           boolean matchSyncs, ZooKeeperServerListener listener) {
        super("CommitProcessor:" + id, listener);
        this.nextProcessor = nextProcessor;
        this.matchSyncs = matchSyncs;
    }

主要属性为:

 /** Default: numCores */
    public static final String ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS =
        "zookeeper.commitProcessor.numWorkerThreads";
    /** Default worker pool shutdown timeout in ms: 5000 (5s) */
    public static final String ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT =
        "zookeeper.commitProcessor.shutdownTimeout";

//刚进来的请求存储队列
    protected LinkedBlockingQueue<Request> queuedRequests =
        new LinkedBlockingQueue<Request>();

   //已经提交的请求队列
    protected final LinkedBlockingQueue<Request> committedRequests =
        new LinkedBlockingQueue<Request>();

//等待Proposal可被提交的请求,key为session id,value为所关联的session's requests.
    protected final Map<Long, LinkedList<Request>> pendingRequests =
            new HashMap<Long, LinkedList<Request>>(10000);

    /** The number of requests currently being processed */
    protected final AtomicInteger numRequestsProcessing = new AtomicInteger(0);

//设置为ToBeAppliedProcessor
    RequestProcessor nextProcessor;

    /** For testing purposes, we use a separated stopping condition for the
     * outer loop.*/
    protected volatile boolean stoppedMainLoop = true; 
    protected volatile boolean stopped = true;

    private long workerShutdownTimeoutMS;
//处理已提交的请求
    protected WorkerService workerPool;
    private Object emptyPoolSync = new Object();

    /**
     * This flag indicates whether we need to wait for a response to come back from the
     * leader or we just let the sync operation flow through like a read. The flag will
     * be false if the CommitProcessor is in a Leader pipeline.
     */
//leader服务器默认为false
    boolean matchSyncs;

start方法主要是初始化workerPool并启动线程

public void start() {
        int numCores = Runtime.getRuntime().availableProcessors();
        int numWorkerThreads = Integer.getInteger(
            ZOOKEEPER_COMMIT_PROC_NUM_WORKER_THREADS, numCores);
        workerShutdownTimeoutMS = Long.getLong(
            ZOOKEEPER_COMMIT_PROC_SHUTDOWN_TIMEOUT, 5000);

        LOG.info("Configuring CommitProcessor with "
                 + (numWorkerThreads > 0 ? numWorkerThreads : "no")
                 + " worker threads.");
        if (workerPool == null) {
            workerPool = new WorkerService(
                "CommitProcWork", numWorkerThreads, true);
        }
        stopped = false;
        stoppedMainLoop = false;
        super.start();
    }

ToBeAppliedProcessor
维护了那些已被CommitProcessor处理过的可被提交的Proposal事务请求的队列leader.toBeApplied,会将请求交给下一个处理器next处理,如果是事务请求,next处理完之后需要将请求从toBeApplied中移除。初始化方法为:

/**
         * This request processor simply maintains the toBeApplied list. For
         * this to work next must be a FinalRequestProcessor and
         * FinalRequestProcessor.processRequest MUST process the request
         * synchronously!
         *
         * @param next
         *                a reference to the FinalRequestProcessor
         */
        ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
            if (!(next instanceof FinalRequestProcessor)) {
                throw new RuntimeException(ToBeAppliedRequestProcessor.class
                        .getName()
                        + " must be connected to "
                        + FinalRequestProcessor.class.getName()
                        + " not "
                        + next.getClass().getName());
            }
            this.leader = leader;
            this.next = next;
        }

主要属性为:

//设置为FinalRequestProcessor
private final RequestProcessor next;

        private final Leader leader;

FinalRequestProcessor
最后一个请求处理器,处理请求并构造客户端请求的响应。针对事务事情,会负责将事务应用到内存数据库中。仅有ZooKeeperServer zks 成员变量。

Follewer请求处理链初始化

Follewer服务器的主要工作如下:

  • 处理客户端非事务请求,转发事务请求给Leader服务器
  • 参与事务请求Proposal的投票
  • 参与Leader选举投票
    当Follewer完成集群间数据的同步时,会启动FollowerZooKeeperServer,初始化请求链。
    FollowerZooKeeperServer.setupRequestProcessors
protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }

可以看出第一个请求处理器firstProcessor=new FollowerRequestProcessor(this, commitProcessor); 同leader服务器请求处理链的初始化过程,会初始化每个请求处理器的nextProcessor,并启动处理器。
Follewer的请求处理链如下:

《zookeeper源码分析(7)-服务器请求处理链的初始化》

FollowerRequestProcessor
Follewer服务器的第一个请求处理器,识别当前请求是否是事务请求。如果是事务请求,不仅将请求交给nextProcessor,还会将事务请求转发给Leader服务器。初始化过程为:

public FollowerRequestProcessor(FollowerZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("FollowerRequestProcessor:" + zks.getServerId(), zks
                .getZooKeeperServerListener());
        this.zks = zks;
        this.nextProcessor = nextProcessor;
    }

主要属性为:

FollowerZooKeeperServer zks;
//设置为CommitProcessor
    RequestProcessor nextProcessor;

    LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();

    boolean finished = false;

CommitProcessor
在Follewer服务器中,初始化过程为:

RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();

此时,matchSyncs=true,表示对于OpCode.sync同步请求,需要等待leader的响应。
SyncRequestProcessor
在Follewer服务器中,nextProcessor为SendAckRequestProcessor,会接收leader服务器的请求,并将数据强刷到磁盘上,并将请求交给SendAckRequestProcessor处理
SendAckRequestProcessor
承担事务日志记录反馈的角色,在完成强刷事务日志记录后,会向leader服务器发送ACK消息表明自身完成了事务日志的记录工作。

Observer请求处理链初始化

Observer作用是观察zookeeper集群的最新状态并将变更同步过来,原理同Follewer,对于非事务请求,可进行独立处理。对于事务请求,会转发给Leader服务器处理。但是不参与任何形式的投票。
当Observer完成集群间数据的同步时,会启动ObserverZooKeeperServer,初始化请求链。
ObserverZooKeeperServer.setupRequestProcessors

protected void setupRequestProcessors() {      
        // We might consider changing the processor behaviour of 
        // Observers to, for example, remove the disk sync requirements.
        // Currently, they behave almost exactly the same as followers.
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
        ((ObserverRequestProcessor) firstProcessor).start();

        /*
         * Observer should write to disk, so that the it won't request
         * too old txn from the leader which may lead to getting an entire
         * snapshot.
         *
         * However, this may degrade performance as it has to write to disk
         * and do periodic snapshot which may double the memory requirements
         */
        if (syncRequestProcessorEnabled) {
            syncProcessor = new SyncRequestProcessor(this, null);
            syncProcessor.start();
        }
    }

可以看出在初始化请求处理链的阶段会将SyncRequestProcessor启动,为了能记录事务日志和定期同步快照信息。但是nextProcessor为null,不需要参与事务请求的ACK回复。第一个请求处理器firstProcessor = new ObserverRequestProcessor(this, commitProcessor);同FollowerRequestProcessor的作用。
Observer请求处理链为:

《zookeeper源码分析(7)-服务器请求处理链的初始化》

    原文作者:Monica2333
    原文地址: https://www.jianshu.com/p/5f5c3a2c909d
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞