Netty只提供的异步传输数据的方式,但是并没有实现多路复用的client。
一个分布式的客户端代码基本是这个样子的:
public Response sent(final Request request) {
channel.writeAndFlush(request);
return clientChannelInitializer.getResponse(request.getMessageId());
}
首先通过channel发送一个请求到server,然后等待server返回的结果。
但是Netty是异步的,writeAndFlush这个方法,只是告诉server,我要发数据了,然后就马上返回了。所以这时直接调用getResponse,会得不到值。因为netty是在回调里面写返回值的。
解决的办法是,使用BlockingQueue接收返回的数据。一旦BlockingQueue有数据了,就take出来。如果没数据,就一直等待。
在单线程里,这个办法没问题。这就要确保Netty的client只被一个线程访问。如果是多线程同时访问,因为异步的原因,有可能第二个线程的返回值被第一个线程拿到。举个例子:
线程A使用client发送请求
线程A从BlockingQueue中取数据。这时Queue为空,线程A等待。
server接受并开启一个线程处理请求A
线程B使用client发送请求
线程B从BlockingQueue中取数据。这时Queue为空,线程B也等待。
server接受并开启另一个线程处理请求B
线程B的请求处理速度较快,先返回
client将返回值B写入BlockingQueue
BlockQueue有数据了,线程A take到数据,但是是线程B的结果
这就导致request和response不匹配。
要解决这个问题有3个办法。
使用短连接,每次请求new一个client,接收的response后close掉这个client。这个的缺点很明显,这就相当于http服务器了,不能发挥内网长连接的优势。
使用连接池,每次从连接池里拿一个client,接收完response后将这个client返还连接池。这个就有点像数据库连接池。这个方法没什么缺点,但是需要自己实现连接池。
使用单一client,但是在request中生成一个唯一的messageId,可以是nanoTime。然后server处理完后,在response中也返回这个messageId。这样在client不是维护一个BlockingQueue,而是维护一个ConcurrentHashMap,key是messageId,value是一个空的BlockingQueue。当client发送resquest到server时,在Map里写入messageId,并实例化一个BlockingQueue(为了优化,size可以是1)。然后等待这个BlockingQueue有值。在接收到response的回调方法里,根据messageId取出blockingQueue的值,然后删除掉这个Key。
我自己实现了第三种方式,具体代码请参见: