RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。
RPC 可基于 HTTP 或 TCP 协议,Web Service 就是基于 HTTP 协议的 RPC,它具有良好的跨平台性,但其性能却不如基于 TCP 协议的 RPC。会两方面会直接影响 RPC 的性能,一是传输方式,二是序列化。
众所周知,TCP 是传输层协议,HTTP 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下,TCP 一定比 HTTP 快。就序列化而言,Java 提供了默认的序列化方式,但在高并发的情况下,这种方式将会带来一些性能上的瓶颈,于是市面上出现了一系列优秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它们可以取代 Java 默认的序列化,从而提供更高效的性能。
下面是简单实现的基于netty的RPC调用。
一、首先定义消息传递的实体类
public class ClassInfo implements Serializable{
private static final long serialVersionUID = -8970942815543515064L;
private String className;//类名
private String methodName;//函数名称
private Class[] types;//参数类型
private Object[] objects;//参数列表
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class[] getTypes() {
return types;
}
public void setTypes(Class[] types) {
this.types = types;
}
public Object[] getObjects() {
return objects;
}
public void setObjects(Object[] objects) {
this.objects = objects;
}
}
二、创建Netty操作的服务端,以及具体操作
1. 服务端
public class RPCServer {
private int port;
public RPCServer(int port){
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap().
group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .localAddress(port).
childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(“encoder”, new ObjectEncoder());
pipeline.addLast(“decoder”, new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new InvokerHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println(“Server start listen at ” + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8899;
}
new RPCServer(port).start();
}
}
2、服务端操作
由服务端我们看到具体的数据传输操作是进行序列化的,具体的操作还是比较简单的,就是获取发送过来的信息,这样就可以通过反射获得类名,根据函数名和参数值,执行具体的操作,将执行结果发送给客户端。
public class InvokerHandler extends ChannelInboundHandlerAdapter {
public static ConcurrentHashMapclassMap = new ConcurrentHashMap();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ClassInfo classInfo = (ClassInfo)msg;
Object claszz = null;
if(!classMap.containsKey(classInfo.getClassName())){
try {
claszz = Class.forName(classInfo.getClassName()).newInstance();
classMap.put(classInfo.getClassName(), claszz);
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
e.printStackTrace();
}
}else {
claszz = classMap.get(classInfo.getClassName());
}
Method method = claszz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
Object result = method.invoke(claszz, classInfo.getObjects());
ctx.write(result);
ctx.flush();
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
三、客户端,通过代理机制来触发远程调用
1、客户端
当执行具体的函数时会调用远程操作,将具体操作的类、函数及参数信息发送到服务端
public class RPCProxy{
@SuppressWarnings(“unchecked”)
public staticT create(final Object target){
return (T) Proxy.newProxyInstance(target.getClass().getClassLoader(),target.getClass().getInterfaces(),
new InvocationHandler(){
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
ClassInfo classInfo = new ClassInfo();
classInfo.setClassName(target.getClass().getName());
classInfo.setMethodName(method.getName());
classInfo.setObjects(args);
classInfo.setTypes(method.getParameterTypes());
final ResultHandler resultHandler = new ResultHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(“frameDecoder”, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(“frameEncoder”, new LengthFieldPrepender(4));
pipeline.addLast(“encoder”, new ObjectEncoder());
pipeline.addLast(“decoder”, new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast(“handler”,resultHandler);
}
});
ChannelFuture future = b.connect(“localhost”, 8899).sync();
future.channel().writeAndFlush(classInfo).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
return resultHandler.getResponse();
}
});
}
}
2、获取远程调用返回的结果值
public class ResultHandler extends ChannelInboundHandlerAdapter{
private Object response;
public Object getResponse() {
return response;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)throwsException {
response=msg;
System.out.println(“client接收到服务器返回的消息:”+ msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
System.out.println(“client exception is general”);
}
}
四、 接口、实现类及Main操作
public interface HelloRpc {
String hello(String name);
}
public class HelloRpcImpl implements HelloRpc{
@Override
public String hello(String name) {
return”hello “+name;
}
}
public class NettyRpcMain {
public static void main(String[] args){
HelloRpc helloRpc = new HelloRpcImpl();
HelloRpc echo = RPCProxy.create(helloRpc);
System.out.println(echo.hello(“这是我的第一个手写rpc!”));
}
}