问题:客户端发起远程调用,如果服务端长时间不返回怎么办?
这就涉及到一个调用超时的问题,平时我们应用中很多场景都会规定超时时间,比如:sql查询超时,http请求超时等。那么如果服务端方法执行的时间超过规定的timeout时间,那么客户端就需要调出当前调用,抛出TimeoutException。
好了,下面开始对RpcBuidler进行改造了,让其支持超时情况的处理。同样,先给出预期的测试方案和结果:
// 业务类UserService在之前的基础上增加超时调用的方法:
public interface UserService {
// other method
/**
* 超时测试
*/
public boolean timeoutTest();
}
//实现类
public class UserServiceImpl implements UserService {
// other method
@Override
public boolean timeoutTest() {
try {
//模拟长时间执行
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {}
return true;
}
}
ClientTest中测试代码:
@Test
public void timeoutTest(){
long beginTime = System.currentTimeMillis();
try {
boolean result = userService.timeoutTest();
} catch (Exception e) {
long period = System.currentTimeMillis() - beginTime;
System.out.println("period:" + period);
Assert.assertTrue(period < 3100);
}
}
有了异步方法的实现经验,其实这个超时处理过程和异步非常类似,都是利用Future机制来实现的,下面对doInvoke方法进行重构,返回一个异步任务:
private Future<RpcResponse> doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{
//构造并提交FutureTask异步任务
Future<RpcResponse> retVal = (Future<RpcResponse>) handlerPool.submit(new Callable<RpcResponse>(){
@Override
public RpcResponse call() throws Exception {
Object res = null;
try{
//创建连接,获取输入输出流
Socket socket = new Socket(host,port);
try{
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
try{
//发送
out.writeObject(request);
//接受server端的返回信息---阻塞
res = in.readObject();
}finally{
out.close();
in.close();
}
}finally{
socket.close();
}
}catch(Exception e){
throw e;
}
return (RpcResponse)res;
}
});
return retVal;
}
回调方法invoke修改如下:
@Override
public Object invoke(Object proxy, Method method,
Object[] args) throws Throwable {
//如果是异步方法,立即返回null
if(asyncMethods.get().contains(method.getName())) return null;
Object retVal = null;
RpcRequest request = new RpcRequest(method.getName(), method.getParameterTypes(),args,RpcContext.getAttributes());
RpcResponse rpcResp = null;
try{
Future<RpcResponse> response = doInvoke(request);
//获取异步结果
rpcResp = (RpcResponse)response.get(TIMEOUT,TimeUnit.MILLISECONDS);
}catch(TimeoutException e){
throw e;
}catch(Exception e){}
if(!rpcResp.isError()){
retVal = rpcResp.getResponseBody();
}else{
throw new RpcException(rpcResp.getErrorMsg());
}
return retVal;
}
可见,经过这样改造后,所有的方法调用都是通过Future获取结果。
提供Hook,让开发人员进行RPC层面的AOP。
首先看下题目提供的Hook接口:
public interface ConsumerHook {
public void before(RpcRequest request);
public void after(RpcRequest request);
}
//实现类
public class UserConsumerHook implements ConsumerHook{
@Override
public void before(RpcRequest request) {
RpcContext.addAttribute("hook key","this is pass by hook");
}
@Override
public void after(RpcRequest request) {
System.out.println("I have finished Rpc calling.");
}
}
hook实现的功能很简单,即在客户端进行远程调用的前后执行before和after方法。
public final class RpcConsumer implements InvocationHandler{
//。。。
//钩子
private ConsumerHook hook;
public RpcConsumer hook(ConsumerHook hook){
this.hook = hook;
return this;
}
static{
userService = (UserService)consumer.targetHostPort(host, port)
.interfaceClass(UserService.class)
.timeout(TIMEOUT)
.hook(new UserConsumerHook())//新增钩子
.newProxy();
}
//。。。
}
//UserServiceImpl中的测试方法
public Map<String, Object> getMap() {
Map<String,Object> newMap = new HashMap<String,Object>();
newMap.put("name","getMap");
newMap.putAll(RpcContext.getAttributes());
return newMap;
}
我们只需要在doInvoke方法开始出添加钩子函数的执行逻辑即可。如下:
private Future<RpcResponse> doInvoke(final RpcRequest request) throws IOException, ClassNotFoundException{
//插入钩子
hook.before(request);
//。。。
}
同时在asyncCall和invoke方法的结束添加after的执行逻辑。具体实现可以看源码。