RPC即远程过程调用,通过网络从远程计算机程序上请求服务。也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。而在RPC框架基础上则不需要关注底层的网络技术!
详细请参见:https://www.zhihu.com/question/25536695
RPC框架的技术原理:
1、定义RPC请求消息、应答消息结构,里面要包括RPC的接口定义模块、包括远程调用的类名、方法名称、参数结构、参数值等信息。
2、服务端初始化的时候通过容器加载RPC接口定义和RPC接口实现类对象的映射关系,然后等待客户端发起调用请求。
3、客户端发起的RPC消息里面包含,远程调用的类名、方法名称、参数结构、参数值等信息,通过网络,以字节流的方式送给RPC服务端,RPC服务端接收到字节流的请求之后,去对应的容器里面,查找客户端接口映射的具体实现对象。
4、RPC服务端找到实现对象的参数信息,通过反射机制创建该对象的实例,并返回调用处理结果,最后封装成RPC应答消息通知到客户端。
5、客户端通过网络,收到字节流形式的RPC应答消息,进行拆包、解析之后,显示远程调用结果。
对于Provider端
加载配置文档中的bean,向zookeeper等注册中心注册相关服务,并维护相应服务,接受并处理customer服务请求,返回请求结果
对于Customer端
加载服务应用的bean,从zookeeper等注册中心获取服务相关信息(如服务提供者的IP+端口),生成相关请求类的动态代理,被调用时向Provider发送网络请求,异步回调返回执行结果。
customer端需要引入Provider的服务接口API,在使用过程中可以完全忽略网络请求过程,整体体验有如调用普通本地jar包中的方法,却能够实现无侵入的服务接入。
以下是一个简单实现过程:
Provider端
核心在这几个类:
RPCServer : 主要类
ServiceRegistory : 服务注册类
RPCServerHandler : 服务调用处理类
通过server.xml配置文档,加载zookeeper信息与服务信息到RPCServer
<!--扫描需求发布的服务所在的包-->
<context:component-scan base-package="com.yingjun.rpc.service.impl"/>
<context:property-placeholder location="classpath:system.properties"/>
<!--服务端配置-->
<bean id="rpcServer" class="com.yingjun.rpc.server.RPCServer">
<constructor-arg name="zookeeper" value="${zookeeper.address}"/>
<constructor-arg name="serverAddress" value="${server.address}"/>
</bean>
RPCServer中加载配置的服务类,启动Netty服务并注册handler等
public class RPCServer implements BeanNameAware, BeanFactoryAware, ApplicationContextAware, InitializingBean {
private ServiceRegistry serviceRegistry;
private String serverAddress;
//存放服务名与服务对象之间的映射关系
private Map<String, Object> serviceBeanMap = new ConcurrentHashMap<String, Object>();
//采用线程池,提高接口调用性能
private static ExecutorService threadPoolExecutor;
...
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
logger.info("setApplicationContext()");
//扫描含有@RPCService的注解类
Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(HRPCService.class);
if (MapUtils.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
//获取接口名称
String interfaceName = serviceBean.getClass().getAnnotation(HRPCService.class).value().getName();
logger.info("@HRPCService:" + interfaceName);
//在zookeeper上注册该接口服务
serviceRegistry.createInterfaceAddressNode(interfaceName, serverAddress);
//本地保存该接口服务
this.serviceBeanMap.put(interfaceName, serviceBean);
}
}
}
@Override
//在实例被创建时执行,后续及是init-method
//创建netty服务
public void afterPropertiesSet() throws Exception {
logger.info("afterPropertiesSet()");
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,0))
.addLast(new RPCDecoder(RPCRequest.class)) //注册解码器
.addLast(new RPCEncoder(RPCResponse.class)) // 注册编码器
.addLast(new RPCServerHandler(serviceBeanMap)); //注册服务请求处理类,并传入provider服务
}
})
.option(ChannelOption.SO_BACKLOG, 128)
// 通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
.option(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
ChannelFuture future = bootstrap.bind(host, port).sync(); //绑定IP+端口
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void submit(Runnable task) {
if (threadPoolExecutor == null) {
synchronized (RPCServer.class) {
if (threadPoolExecutor == null) {
threadPoolExecutor = Executors.newFixedThreadPool(16);
}
}
}
threadPoolExecutor.submit(task);
}
RPCServerHandler中,在channelRead0监控Netty服务请求,解析,并找到相应的服务bean进行处理,最后封装到response中,通过netty返回结果
注意:Netty接受到请求后,请求直接就返回了(虽然还没拿到结果),在Handler中处理完成后才会返回真正的调用处理结果。
public class RPCServerHandler extends SimpleChannelInboundHandler<RPCRequest> {
...
@Override
public void channelRead0(final ChannelHandlerContext ctx, final RPCRequest request) throws Exception {
logger.info("======rpc server channelRead0:" + ctx.channel().remoteAddress());
RPCServer.submit(new Runnable() {
@Override
public void run() {
logger.info("receive request:" + request.getRequestId() +
" className:" + request.getClassName() +
" methodName:" + request.getMethodName());
RPCResponse response = new RPCResponse();
response.setRequestId(request.getRequestId());
try {
//通过反射原理找到对应的服务类和方法
String className = request.getClassName();
Object serviceBean = serviceBeanMap.get(className);
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
// JDK reflect
/*Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
Object result=method.invoke(serviceBean, parameters);*/
// 避免使用 Java 反射带来的性能问题,我们使用 CGLib 提供的反射 API
FastClass serviceFastClass = FastClass.create(serviceBean.getClass());
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);
response.setResult(result);
} catch (Exception e) {
response.setError(e.getMessage());
logger.error("Exception", e);
}
ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
logger.info("send response for request: " + request.getRequestId());
}
});
}
});
}
ServiceRegistry中注册提供服务的相关信息到 Zookeeper注册中心!
public class ServiceRegistry {
public ServiceRegistry(String address) {
this.address = address;
//连接zookeeper
zooKeeper = connectServer();
//创建根节点
if (zooKeeper != null) {
setRootNode();
}
}
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(address, Config.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException e) {
logger.error("", e);
} catch (InterruptedException ex) {
logger.error("", ex);
}
return zk;
}
...
Provider的大致原理就是这样!
对于Customer端
核心类:
RPCClient: 主体类
RPCProxy: 动态代理类
ServiceDiscovery: 服务发现类
public class RPCClient{
private ServiceDiscovery serviceDiscovery;
public RPCClient(String zookeeper, List<String> interfaces) {
this.serviceDiscovery = new ServiceDiscovery(zookeeper, interfaces);
}
//创建用于同步调用的代理对象
public static <T> T createProxy(Class<T> interfaceClass) {
// 创建动态代理对象
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new RPCProxy<T>(interfaceClass)
);
}
可以采用上面的RPCClient的实现一次性注册所有的服务到一个RPCClient中,在使用的时候需要自己调用createProxy
但是,这对于屏蔽实现细节是不利的!
可以对于每一个服务引用生成一个RPCClient,完全屏蔽底层细节。
public class RPCClient implements BeanFactory{
private ServiceDiscovery serviceDiscovery;
private String interfaceName;
public RPCClient(String zookeeper, String interface) {
this.serviceDiscovery = new ServiceDiscovery(zookeeper, interface);
this.interfaceName = interface;
}
public Object getObject(){ //由于实现了FactoryBean,在容器初始化RPCCLient这个bean时,会默认调用getObject来获取。
Class interfaceClass = this.getClass().getClassLoader().loadClass(interfaceName);
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new RPCProxy<T>(interfaceClass));
}
public class RPCProxy<T> implements InvocationHandler {
...
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RPCRequest request = new RPCRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
logger.info("invoke class: {} method: {}", method.getDeclaringClass().getName(), method.getName());
RPCClientHandler handler = ConnectManage.getInstance().chooseHandler(method.getDeclaringClass().getName());
if(handler==null){
logger.error("NoSuchServiceException:",
new NoSuchServiceException("no such service about"+method.getDeclaringClass().getName()));
return null;
}
RPCFuture RPCFuture = handler.sendRequestBySync(request);
return RPCFuture.get();
}
这里还有一个比较关键的类RPCFuture,上面最后return调用的该类对象的get方法,其实是为了实现阻塞,因为RPC服务调用时异步非阻塞实现,customer向Provider发送请求后就直接返回了,等待Provider处理完后发response,而在拿到response结果之前customer的程序是需要阻塞的!
具体实现:
public class RPCFuture implements Future<Object> {
private CountDownLatch countDownLatch;
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
boolean awaitSuccess = false;
try {
awaitSuccess = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!awaitSuccess) {
throw new RPCTimeoutException();
}
long useTime = System.currentTimeMillis() - startTime;
logger.info("request id: {} class: {} method: {} useTime {}ms",
request.getRequestId(), request.getClassName(), request.getMethodName() , useTime);
return response.getResult();
}
public void done(RPCResponse res) {
response = res;
countDownLatch.countDown();
if (callback != null) {
if (!response.isError()) {
callback.success(response.getResult());
} else {
callback.fail(new ResponseException(response.getError()));
}
}
long useTime = System.currentTimeMillis() - startTime;
logger.info("has done requestId: {} class: {} method: {} useTime: {}",
request.getRequestId(), request.getClassName(), request.getMethodName() , useTime);
}
将RPCClientHandler注册到Netty中,用于发送调用请求,并且接受调用处理结果!
public abstract class RPCClientHandler extends SimpleChannelInboundHandler<RPCResponse> {
...
@Override
//接受到provider的服务response后,解析并执行回调
public void channelRead0(ChannelHandlerContext ctx, RPCResponse response) throws Exception {
String requestId = response.getRequestId();
RPCFuture rpcFuture = pending.get(requestId);
if (rpcFuture != null) {
pending.remove(requestId);
rpcFuture.done(response); //!!!!!!这里比较关键!
}
}
// 发送服务请求后,相关的请求信息将被保存 (服务ID,rpcFuture回调)
public RPCFuture sendRequestBySync(RPCRequest request) {
RPCFuture rpcFuture = new RPCFuture(request);
pending.put(request.getRequestId(), rpcFuture);
channel.writeAndFlush(request);
return rpcFuture;
}
public RPCFuture sendRequestByAsync(RPCRequest request, AsyncRPCCallback callback) {
RPCFuture rpcFuture = new RPCFuture(request, callback);
pending.put(request.getRequestId(), rpcFuture);
channel.writeAndFlush(request);
return rpcFuture;
}
详细代码可以阅读下:https://github.com/wosyingjun/HRPC
先记录这些,回头再润色!