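In the article analyzing how RMI works, we saw that RMI is implemented by wrapping low-level TCP network communication.
Following the same idea, this article builds a simple RPC framework from the entry points below. Doing so in turn deepens the understanding of RMI, so the two complement each other.
Server
The server publishes a service on a port, listens for requests on that port, invokes the local method through reflection, and returns the result to the caller.
1. Define the interface to publish and its implementation class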
public interface IHello {
    String say();
}

public class IHelloImpl implements IHello {
    @Override
    public String say() {
        return "Hello";
    }
}

public class ServerDemo {
    public static void main(String[] args) {
        IHello iHello = new IHelloImpl();
        RpcServer rpcServer = new RpcServer();
        rpcServer.publisher(iHello, 8080);
    }
}
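2. Define the publishing center, which accepts connections and hands each request to a thread pool
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RpcServer {
    // Create the thread pool
    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    public void publisher(Object service, Integer port) {
        // try-with-resources closes the server socket if the loop exits with an exception
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            // Listen in a loop; each accepted connection is handled by a pool thread
            while (true) {
                Socket socket = serverSocket.accept();
                executorService.submit(new ProcessorHandler(socket, service));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}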
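Executors.newCachedThreadPool() places no upper bound on the number of threads, so a burst of connections can create one thread per connection. A minimal sketch of a bounded alternative is shown here. The class name BoundedPoolSketch, the pool size of 4 and the queue capacity of 16 are assumptions chosen for illustration and are not part of the original design.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {
    // Hypothetical limits; tune them for the real workload.
    static final int MAX_THREADS = 4;
    static final int QUEUE_CAPACITY = 16;

    // Builds a fixed-size pool with a bounded queue. When both are full,
    // CallerRunsPolicy makes the submitting (accept) thread run the task itself,
    // which slows down accepting new connections instead of creating more threads.
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                MAX_THREADS, MAX_THREADS,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = newBoundedPool();
        for (int i = 0; i < 8; i++) {
            final int id = i;
            pool.submit(() -> System.out.println("handling request " + id));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

In RpcServer, the static executorService field could be initialized with such a pool instead of the cached one. Whether throttling the accept loop is the right trade-off depends on the expected load.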
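3. Handle the request: obtain the result through reflection and send it back to the client
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;

public class ProcessorHandler implements Runnable {
    private final Socket socket;
    private final Object service;

    public ProcessorHandler(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }

    @Override
    public void run() {
        // Closing the socket also closes both of its streams, even when an exception is thrown
        try (Socket s = socket) {
            // Read the request sent by the client
            ObjectInputStream objectInputStream = new ObjectInputStream(s.getInputStream());
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            Object result = invoke(rpcRequest);
            // Write the result back to the client
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(s.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Object[] args = rpcRequest.getParams();
        Class<?>[] types = null;
        if (args != null) {
            // Limitation: the runtime class of each argument is used, so a method that declares
            // primitive, interface, or superclass parameter types will not be found,
            // and a null argument causes a NullPointerException
            types = new Class<?>[args.length];
            for (int i = 0; i < args.length; i++) {
                types[i] = args[i].getClass();
            }
        }
        Method method = service.getClass().getMethod(rpcRequest.getMethod(), types);
        return method.invoke(service, args);
    }
}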
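The invoke method above derives the parameter types from args[i].getClass(). That works for say(), which takes no arguments, but getMethod requires the declared parameter types, so a method declared as greet(CharSequence, int) would not be found when called with a String and an Integer. One possible workaround, sketched here under assumptions, is to search the public methods by name and check each argument against the declared type. The names MethodLookupSketch, Greeter, findMethod and call are hypothetical, and the first matching overload wins, which may not be the one Java's own overload resolution would pick.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class MethodLookupSketch {
    // Maps each primitive type to its wrapper so an Integer argument can match an int parameter.
    private static final Map<Class<?>, Class<?>> WRAPPERS = new HashMap<>();
    static {
        WRAPPERS.put(boolean.class, Boolean.class);
        WRAPPERS.put(byte.class, Byte.class);
        WRAPPERS.put(char.class, Character.class);
        WRAPPERS.put(short.class, Short.class);
        WRAPPERS.put(int.class, Integer.class);
        WRAPPERS.put(long.class, Long.class);
        WRAPPERS.put(float.class, Float.class);
        WRAPPERS.put(double.class, Double.class);
    }

    // Hypothetical service used only to demonstrate the lookup.
    public static class Greeter {
        public String greet(CharSequence name, int times) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < times; i++) {
                sb.append("Hello ").append(name).append(';');
            }
            return sb.toString();
        }
    }

    // Mirrors the lookup used in the article: exact match on the arguments' runtime classes.
    static boolean exactLookupWorks(Class<?> type, String name, Object[] args) {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        try {
            type.getMethod(name, types);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Returns the first public method whose name, arity and parameter types accept the arguments.
    static Method findMethod(Class<?> type, String name, Object[] args) throws NoSuchMethodException {
        int argCount = (args == null) ? 0 : args.length;
        for (Method m : type.getMethods()) {
            if (!m.getName().equals(name) || m.getParameterCount() != argCount) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            boolean matches = true;
            for (int i = 0; i < argCount && matches; i++) {
                Class<?> expected = params[i].isPrimitive() ? WRAPPERS.get(params[i]) : params[i];
                // null is acceptable for any reference type but never for a primitive.
                matches = (args[i] == null) ? !params[i].isPrimitive() : expected.isInstance(args[i]);
            }
            if (matches) {
                return m;
            }
        }
        throw new NoSuchMethodException(type.getName() + "." + name);
    }

    // Looks up and invokes the method, wrapping reflection failures in an unchecked exception.
    static Object call(Object service, String name, Object... args) {
        try {
            return findMethod(service.getClass(), name, args).invoke(service, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Reflective call failed: " + name, e);
        }
    }

    public static void main(String[] args) {
        Object[] callArgs = {"Bob", 2};
        System.out.println("exact lookup works: "
                + exactLookupWorks(Greeter.class, "greet", callArgs));
        System.out.println(call(new Greeter(), "greet", callArgs));
    }
}
```

Another common design is to send the declared parameter types along with the request (the client already has them via Method.getParameterTypes()), which avoids guessing on the server side.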
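Client
The client locates the service, sends the request, and receives the result.
1. Define a proxy and call the server-side interface through it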
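import java.lang.reflect.Proxy;

public class RpcClientProxy {
    // Creates a dynamic proxy for the given interface; every call on it is forwarded to host:port
    @SuppressWarnings("unchecked")
    public <T> T clientProxy(Class<T> target, String host, Integer port) {
        return (T) Proxy.newProxyInstance(target.getClassLoader(), new Class<?>[]{target}, new RemoteInvocationHandler(host, port));
    }
}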
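To see what Proxy.newProxyInstance contributes on its own, the following sketch creates a proxy whose handler answers locally instead of going over the network. The names ProxySketch, Hello, intercepted and newLoggingProxy are hypothetical and exist only for this illustration; Hello stands in for IHello.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProxySketch {
    // Stand-in for the IHello interface from the server section.
    interface Hello {
        String say();
    }

    // Records the name of every intercepted method so the interception is visible.
    static final List<String> intercepted = new ArrayList<>();

    static Hello newLoggingProxy() {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName());
            // A real RPC handler would serialize the call and send it to the server here.
            return "Hello (from handler)";
        };
        return (Hello) Proxy.newProxyInstance(
                Hello.class.getClassLoader(), new Class<?>[]{Hello.class}, handler);
    }

    public static void main(String[] args) {
        Hello hello = newLoggingProxy();
        System.out.println(hello.say());
        System.out.println(intercepted);
    }
}
```

The caller only sees the Hello interface. Every method call on the proxy lands in the handler, which is the hook the RPC client uses to turn a local call into a network request.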
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class RemoteInvocationHandler implements InvocationHandler {
    private final String host;
    private final Integer port;

    public RemoteInvocationHandler(String host, Integer port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Package the call (interface name, method name, arguments) into a request object
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setName(method.getDeclaringClass().getName());
        rpcRequest.setMethod(method.getName());
        rpcRequest.setParams(args);
        // Send the request over TCP and return the server's result
        TCPTransport tcpTransport = new TCPTransport(host, port);
        return tcpTransport.send(rpcRequest);
    }
}
2. Encapsulate the network communication
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

public class TCPTransport {
    private final String host;
    private final Integer port;

    public TCPTransport(String host, Integer port) {
        this.host = host;
        this.port = port;
    }

    private Socket newSocket() {
        try {
            return new Socket(host, port);
        } catch (IOException e) {
            // Keep the original exception as the cause so the failure can be diagnosed
            throw new RuntimeException("Failed to establish connection", e);
        }
    }

    public Object send(RpcRequest rpcRequest) {
        // try-with-resources closes the socket, and with it both streams, on every path
        try (Socket socket = newSocket()) {
            // Send the request to the server
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();
            // Read the result returned by the server
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            return objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Remote invocation failed", e);
        }
    }
}
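The listings above use an RpcRequest class but never show it. Judging from the calls made on it (setName, setMethod, setParams, getMethod, getParams), it is presumably a plain serializable bean along the following lines. This is a reconstruction under that assumption, not the author's original class. It is nested inside a hypothetical RpcRequestSketch class only so that the example is self-contained; in the project it would be a top-level public class shared by client and server.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class RpcRequestSketch {
    // Must implement Serializable; otherwise writeObject throws NotSerializableException.
    static class RpcRequest implements Serializable {
        private static final long serialVersionUID = 1L;

        private String name;     // fully qualified name of the interface being called
        private String method;   // name of the method to invoke
        private Object[] params; // call arguments; each one must itself be serializable

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getMethod() { return method; }
        public void setMethod(String method) { this.method = method; }
        public Object[] getParams() { return params; }
        public void setParams(Object[] params) { this.params = params; }
    }

    // Serializes the request to bytes and reads it back, as the socket streams would do.
    static RpcRequest roundTrip(RpcRequest request) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(request);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (RpcRequest) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        RpcRequest request = new RpcRequest();
        request.setName("IHello");
        request.setMethod("say");
        request.setParams(null); // say() takes no arguments, so the proxy passes null

        RpcRequest copy = roundTrip(request);
        System.out.println(copy.getName() + "." + copy.getMethod());
    }
}
```

Because Java serialization identifies the class by its fully qualified name, client and server need the same RpcRequest class on their classpaths, and the return value of the remote method has to be serializable as well.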
public class ClientDemo {
    public static void main(String[] args) {
        RpcClientProxy rpcClientProxy = new RpcClientProxy();
        IHello iHello = rpcClientProxy.clientProxy(IHello.class, "localhost", 8080);
        System.out.println(iHello.say());
    }
}