RPC框架Pigeon简析(四)-- 服务端对请求的响应

接上文,在客户端发出请求后,首先处理的自然是netty,在IO处理之后,就进入业务处理NettyServerHandler。最终的处理任务就落在了RequestThreadPoolProcessor这个类身上,主要方法是doProcessRequest

public Future<InvocationResponse> doProcessRequest(final InvocationRequest request,
                                                       final ProviderContext providerContext) {
        requestContextMap.put(request, providerContext);
        doMonitorData(request, providerContext);
        Callable<InvocationResponse> requestExecutor = new Callable<InvocationResponse>() {

            @Override
            public InvocationResponse call() throws Exception {
                providerContext.getTimeline().add(new TimePoint(TimePhase.T));
                try {
                    ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory
                            .selectInvocationHandler(providerContext.getRequest().getMessageType());
                    if (invocationHandler != null) {
                        providerContext.setThread(Thread.currentThread());
                        return invocationHandler.handle(providerContext);
                    }
                } catch (Throwable t) {
                    logger.error("Process request failed with invocation handler, you should never be here.", t);
                } finally {
                    requestContextMap.remove(request);
                }
                return null;
            }
        };
        final ThreadPool pool = selectThreadPool(request);
        try {
            checkRequest(pool, request);
            providerContext.getTimeline().add(new TimePoint(TimePhase.T));
            return pool.submit(requestExecutor);
        } catch (RejectedExecutionException e) {
            requestContextMap.remove(request);
            throw new RejectedException(getProcessorStatistics(pool), e);
        }

    }

requestExecutor是整个请求的执行器,内部和请求一样,通过chain-filter来处理。selectThreadPool方法是来选择一个线程池来处理请求。pigeon提供了方法级别的线程池、服务级别的线程池和用户自定义的线程池。默认是使用共享线程池和慢速线程池。为什么会有这么多种线程池呢?比如默认提供的两种,是考虑如果有请求比较缓慢,那么会将请求扔到慢速线程池,这样不会因为几个慢速task,堵死整个线程池。同理,开发人员也可以用自己的策略来使用不同的线程池处理task。
接下来,看看服务端的filter

                registerBizProcessFilter(new TraceFilter());
        if (Constants.MONITOR_ENABLE) {
            registerBizProcessFilter(new MonitorProcessFilter());
        }
        registerBizProcessFilter(new WriteResponseProcessFilter());
        registerBizProcessFilter(new ContextTransferProcessFilter());
        registerBizProcessFilter(new ExceptionProcessFilter());
        registerBizProcessFilter(new SecurityFilter());
        registerBizProcessFilter(new GatewayProcessFilter());
        registerBizProcessFilter(new BusinessProcessFilter());
        bizInvocationHandler = createInvocationHandler(bizProcessFilters);

TraceFilter和客户端类似。MonitorProcessFilter是用来处理监控的,这个要看监控开关是否打开。WriteResponseProcessFilter是利用netty将结果返回给客户端。ContextTransferProcessFilter是用来处理请求参数的。ExceptionProcessFilter是用来处理在调用过程中发生的异常,之前的版本是没有这个filter的,就会导致如果服务端的代码没有捕获异常直接抛出,异常信息没有办法正常的传递给客户端。SecurityFilter是安全相关的filter,包括黑名单、白名单、请求秘钥等等。GatewayProcessFilter处理了一些请求数拦截的事情,比如可以设置某个服务最多请求多少次之类的。BusinessProcessFilter核心就是通过反射直接调用请求的方法,完成远程方法调用。

以上两篇就是整个RPC的请求和应答处理的过程,其实比较简单,后面两遍针对一些特定的问题,讲述下pigeon是用了哪些方法解决的,主要有一些很有用的小技巧,赶脚看了源码之后学到不少。

    原文作者:FutureTech
    原文地址: https://www.jianshu.com/p/270f0067d347
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞