基于Netty的高性能JAVA的RPC框架

版权声明: https://blog.csdn.net/zhujunxxxxx/article/details/48742529

前言

今年7月份左右报名参加了阿里巴巴组织的高性能中间件挑战赛,这次比赛不像以往的比赛,是从一个工程的视角来比赛的。 

这个比赛有两个赛题,第一题是实现一个RPC框架,第二道题是实现一个Mom消息中间件。 

RPC题目如下

一个简单的RPC框架 RPC(Remote Procedure Call )——远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。 框架——让编程人员便捷地使用框架所提供的功能,由于RPC的特性,聚焦于应用的分布式服务化开发,所以成为一个对开发人员无感知的接口代理,显然是RPC框架优秀的设计。 题目要求 1.要成为框架:对于框架的使用者,隐藏RPC实现。 2.网络模块可以自己编写,如果要使用IO框架,要求使用netty-4.0.23.Final。 3.支持异步调用,提供future、callback的能力。 4.能够传输基本类型、自定义业务类型、异常类型(要在客户端抛出)。 5.要处理超时场景,服务端处理时间较长时,客户端在指定时间内跳出本次调用。 6.提供RPC上下文,客户端可以透传数据给服务端。 7.提供Hook,让开发人员进行RPC层面的AOP。 注:为了降低第一题的难度,RPC框架不需要注册中心,客户端识别-DSIP的JVM参数来获取服务端IP。 衡量标准 满足所有要求。 性能测试。 测试时会运行rpc-use-demo中的测试用例,测试的demo包由测试工具做好。 参赛者必须以com.alibaba.middleware.race.rpc.api.impl.RpcConsumerImpl为全类名,继承com.alibaba.middleware.race.rpc.api.RpcConsumer,并覆写所有的public方法。 参赛者必须以com.alibaba.middleware.race.rpc.api.impl.RpcProviderImpl为全类名,继承com.alibaba.middleware.race.rpc.api.RpcProvider,并覆写所有的public方法。 参赛者依赖公共maven中心库上的三方包,即可看到一个示例的demo,按照对应的包名,在自己的工程中建立对应的类(包名、类名一致)。 三方库里的代码起到提示的作用,可以作为参考,不要在最终的pom中依赖。 所以最终参赛者需要打出一个rpc-api的jar包,供测试工程调用。 (注意,参考完rpc-api的示例后,请从pom依赖中将其删除,避免依赖冲突) 测试Demo工程请参考Taocode SVN上的代码。

RPC的实现

题目中推荐的网络框架使用Netty4来实现,这个RPC框架中需要实现的有 

1. RPC客户端 

2. RPC服务端

RPC客户端的实现

RPC客户端和RPC服务器端需要一个相同的接口类,RPC客户端通过一个代理类来调用RPC服务器端的函数

RpcConsumerImpl的实现

……

package com.alibaba.middleware.race.rpc.api.impl;

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.Method;

import java.lang.reflect.Proxy;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.LinkedList;

import java.util.List;

import java.util.Map;

import java.util.UUID;

import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.middleware.race.rpc.aop.ConsumerHook;

import com.alibaba.middleware.race.rpc.api.RpcConsumer;

import com.alibaba.middleware.race.rpc.async.ResponseCallbackListener;

import com.alibaba.middleware.race.rpc.context.RpcContext;

import com.alibaba.middleware.race.rpc.model.RpcRequest;

import com.alibaba.middleware.race.rpc.model.RpcResponse;

import com.alibaba.middleware.race.rpc.netty.RpcConnection;

import com.alibaba.middleware.race.rpc.netty.RpcNettyConnection;

import com.alibaba.middleware.race.rpc.tool.Tool;

public class RpcConsumerImpl extends RpcConsumer implements InvocationHandler {

    private static AtomicLong callTimes = new AtomicLong(0L);

    private RpcConnection connection;

    private List connection_list;

    private Map asyncMethods;

    private Class interfaceClass;

    private String version;

    private int timeout;

    private ConsumerHook hook;

    public Class getInterfaceClass() {

        return interfaceClass;

    }

    public String getVersion() {

        return version;

    }

    public int getTimeout() {

        this.connection.setTimeOut(timeout);

        return timeout;

    }

    public ConsumerHook getHook() {

        return hook;

    }

    RpcConnection select()

    {

        //Random rd=new Random(System.currentTimeMillis());

        int d=(int) (callTimes.getAndIncrement()%(connection_list.size()+1));

        if(d==0)

            return connection;

        else

        {

            return connection_list.get(d-1);

        }

    }

    public RpcConsumerImpl()

    {

        //String ip=System.getProperty(“SIP”);

        String ip=”127.0.0.1″;

        this.asyncMethods=new HashMap();

        this.connection=new RpcNettyConnection(ip,8888);

        this.connection.connect();

        connection_list=new ArrayList();

        int num=Runtime.getRuntime().availableProcessors()/3 -2;

        for (int i = 0; i < num; i++) {

            connection_list.add(new RpcNettyConnection(ip, 8888));

        }

        for (RpcConnection conn:connection_list)

        {

            conn.connect();

        }

    }

    public void destroy() throws Throwable {

        if (null != connection) {

            connection.close();

        }

    }

    @SuppressWarnings(“unchecked”)

    public T proxy(Class interfaceClass) throws Throwable {

        if (!interfaceClass.isInterface()) {

            throw new IllegalArgumentException(interfaceClass.getName()

                    + ” is not an interface”);

        }

        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),

                new Class[] { interfaceClass }, this);

    }

    @Override

    public RpcConsumer interfaceClass(Class interfaceClass) {

        // TODO Auto-generated method stub

        this.interfaceClass=interfaceClass;

        return this;

    }

    @Override

    public RpcConsumer version(String version) {

        // TODO Auto-generated method stub

        this.version=version;

        return this;

    }

    @Override

    public RpcConsumer clientTimeout(int clientTimeout) {

        // TODO Auto-generated method stub

        this.timeout=clientTimeout;

        return this;

    }

    @Override

    public RpcConsumer hook(ConsumerHook hook) {

        // TODO Auto-generated method stub

        this.hook=hook;

        return this;

    }

    @Override

    public Object instance() {

        // TODO Auto-generated method stub

        try {

            return proxy(this.interfaceClass);

        }

        catch (Throwable e)

        {

            e.printStackTrace();

        }

        return null;

    }

    @Override

    public void asynCall(String methodName) {

        // TODO Auto-generated method stub

        asynCall(methodName, null);

    }

    @Override

    public void asynCall(

            String methodName, T callbackListener) {

        this.asyncMethods.put(methodName, callbackListener);

        this.connection.setAsyncMethod(asyncMethods);

        for (RpcConnection conn:connection_list)

        {

            conn.setAsyncMethod(asyncMethods);

        }

    }

    @Override

    public void cancelAsyn(String methodName) {

        // TODO Auto-generated method stub

        this.asyncMethods.remove(methodName);

        this.connection.setAsyncMethod(asyncMethods);

        for (RpcConnection conn:connection_list)

        {

            conn.setAsyncMethod(asyncMethods);

        }

    }

    @Override

    public Object invoke(Object proxy, Method method, Object[] args)

            throws Throwable {

        // TODO Auto-generated method stub

        List parameterTypes = new LinkedList();

        for (Class parameterType : method.getParameterTypes()) {

            parameterTypes.add(parameterType.getName());

        }

        RpcRequest request = new RpcRequest();

        request.setRequestId(UUID.randomUUID().toString());

        request.setClassName(method.getDeclaringClass().getName());

        request.setMethodName(method.getName());

        request.setParameterTypes(method.getParameterTypes());

        request.setParameters(args);

        if(hook!=null)

            hook.before(request);

        RpcResponse response = null;

        try

        {

            request.setContext(RpcContext.props);

            response = (RpcResponse) select().Send(request,asyncMethods.containsKey(request.getMethodName()));

            if(hook!=null)

                hook.after(request);

            if(!asyncMethods.containsKey(request.getMethodName())&&response.getExption()!=null)

            {

                Throwable e=(Throwable) Tool.deserialize(response.getExption(),response.getClazz());

                throw e.getCause();

            }

        }

        catch (Throwable t)

        { 

            //t.printStackTrace();

            //throw new RuntimeException(t);

            throw t;

        }

        finally

        {

//          if(asyncMethods.containsKey(request.getMethodName())&&asyncMethods.get(request.getMethodName())!=null)

//          {

//              cancelAsyn(request.getMethodName());

//          }

        }

        if(response==null)

        {

            return null;

        }

        else if (response.getErrorMsg() != null)

        {

            throw response.getErrorMsg();

        }

        else

        {

            return response.getAppResponse();

        }

    }

}

RpcConsumer consumer;

consumer = (RpcConsumer) getConsumerImplClass().newInstance();

consumer.someMethod();

因为consumer对象是通过代理生成的,所以当consumer调用的时候,就会调用invoke函数,我们就可以把这次本地的函数调用的信息通过网络发送到RPC服务器然后等待服务器返回的信息后再返回。

服务器实现

RPC服务器主要是在收到RPC客户端之后解析出RPC调用的接口名,函数名以及参数。

package com.alibaba.middleware.race.rpc.api.impl;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import java.lang.reflect.Method;

import java.util.HashMap;

import java.util.Map;

import net.sf.cglib.reflect.FastClass;

import net.sf.cglib.reflect.FastMethod;

import com.alibaba.middleware.race.rpc.context.RpcContext;

import com.alibaba.middleware.race.rpc.model.RpcRequest;

import com.alibaba.middleware.race.rpc.model.RpcResponse;

import com.alibaba.middleware.race.rpc.serializer.KryoSerialization;

import com.alibaba.middleware.race.rpc.tool.ByteObjConverter;

import com.alibaba.middleware.race.rpc.tool.ReflectionCache;

import com.alibaba.middleware.race.rpc.tool.Tool;

/**

* 处理服务器收到的RPC请求并返回结果

* @author sei.zz

*

*/

public class RpcRequestHandler extends ChannelInboundHandlerAdapter {

    //对应每个请求ID和端口好 对应一个RpcContext的Map;

    private static Map> ThreadLocalMap=new HashMap>();

    //服务端接口-实现类的映射表

    private final Map handlerMap;

    KryoSerialization kryo=new KryoSerialization();

    public RpcRequestHandler(Map handlerMap) {

        this.handlerMap = handlerMap;

    }

    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

            System.out.println(“active”);

    }

    @Override

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        // TODO Auto-generated method stub

        System.out.println(“disconnected”);

    }

    //更新RpcContext的类容

    private void UpdateRpcContext(String host,Map map)

    {

        if(ThreadLocalMap.containsKey(host))

        {

            Map local=ThreadLocalMap.get(host);

            local.putAll(map);//把客户端的加进来

            ThreadLocalMap.put(host, local);//放回去

            for(Map.Entry entry:map.entrySet()){ //更新变量

                RpcContext.addProp(entry.getKey(), entry.getValue());

            }

        }

        else

        {

            ThreadLocalMap.put(host, map);

            //把对应线程的Context更新

            for(Map.Entry entry:map.entrySet()){

                  RpcContext.addProp(entry.getKey(), entry.getValue());

            }

        }

    }

      //用来缓存住需要序列化的结果

      private static Object cacheName=null;

      private static Object cacheVaule=null;

      @Override

      public void channelRead(

          ChannelHandlerContext ctx, Object msg) throws Exception {

          RpcRequest request=(RpcRequest)msg;

          String host=ctx.channel().remoteAddress().toString();

          //更新上下文

          UpdateRpcContext(host,request.getContext());

          //TODO 获取接口名 函数名 参数    找到实现类  反射实现

          RpcResponse response = new RpcResponse();

          response.setRequestId(request.getRequestId());

          try

          {

              Object result = handle(request);

              if(cacheName!=null&&cacheName.equals(result))

              {

                  response.setAppResponse(cacheVaule);

              }

              else

              {

                  response.setAppResponse(ByteObjConverter.ObjectToByte(result));

                  cacheName=result;

                  cacheVaule=ByteObjConverter.ObjectToByte(result);

              }

          }

          catch (Throwable t)

          {

              //response.setErrorMsg(t);

              response.setExption(Tool.serialize(t));

              response.setClazz(t.getClass());

          }

          ctx.writeAndFlush(response);

      }

      /**

          * 运行调用的函数返回结果

          * @param request

          * @return

          * @throws Throwable

          */

    private static RpcRequest methodCacheName=null;

    private static Object  methodCacheValue=null;

    private Object handle(RpcRequest request) throws Throwable

    {

        String className = request.getClassName();

        Object classimpl = handlerMap.get(className);//通过类名找到实现的类

        Class clazz = classimpl.getClass();

        String methodName = request.getMethodName();

        Class[] parameterTypes = request.getParameterTypes();

        Object[] parameters = request.getParameters();

//      Method method = ReflectionCache.getMethod(clazz.getName(),methodName, parameterTypes);

//      method.setAccessible(true);

        //System.out.println(className+”:”+methodName+”:”+parameters.length);

        if(methodCacheName!=null&&methodCacheName.equals(request))

        {

            return methodCacheValue;

        }

        else

        {

            try

            {

                methodCacheName=request;

                if(methodMap.containsKey(methodName))

                {

                    methodCacheValue= methodMap.get(methodName).invoke(classimpl, parameters);

                    return methodCacheValue;

                }

                else

                {

                    FastClass serviceFastClass = FastClass.create(clazz);

                    FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);

                    methodMap.put(methodName, serviceFastMethod);

                    methodCacheValue= serviceFastMethod.invoke(classimpl, parameters);

                    return methodCacheValue;

                }

                //return method.invoke(classimpl, parameters);

            }

            catch (Throwable e)

            {

                throw e.getCause();

            }

        }

    }

      private Map methodMap=new HashMap();

      @Override

      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        ctx.flush();

      }

      @Override

      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception

      {

          //ctx.close();

          //cause.printStackTrace();

          ctx.close();

      }

    }

handel函数通过Java的反射机制,找到要调用的接口类然后调用对应函数然后执行,然后返回结果到客户端,本次RPC调用结束。

RPC主要的实现类在我的github上可以看见,我的这套RPC框架虽说不上完美,但是性能还是挺好的在服务器上测试时TPC有9w+。 

主要的优化就是使用Neety4这个框架以及对数据包的处理,数据序列化与反序列化的速度 

github地址: https://github.com/zhujunxxxxx/

原创声明

作者 小竹zz 

本文地址http://blog.csdn.net/zhujunxxxxx/article/details/48742529,如需转载请注明出处

给大家免费分享一波福利吧,我自己收集了一些Java资料,里面就包涵了一些BAT面试资料,以及一些 Java 高并发、分布式、微服务、高性能、源码分析、JVM等技术资料

感兴趣的可以自己来我的Java架构进阶群,可以免费来群里下载,群号:171662117对Java技术,架构技术感兴趣的同学,欢迎加群,一起学习,相互讨论。

    原文作者:杜弥
    原文地址: https://www.jianshu.com/p/3a7d253d5cb9
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞