Redis Pipelining
可以一次发送多个命令,并按顺序执行、返回结果,节省RTT(Round Trip Time)。
使用Pipelining
Jedis客户端支持Redis的Pipelining,使用方式如下:
public static void main(String[] args) {
Jedis jedis = null;
Pipeline pipeline = null;
try{
jedis = new Jedis("localhost", 6379);
//使用pipeline
pipeline = jedis.pipelined();
//开始时间
long start = System.currentTimeMillis();
//删除lists
pipeline.del("lists");
//循环添加10000个元素
for(int i = 0; i < 10000; i++){
pipeline.rpush("lists", i + "");
}
//执行
pipeline.sync();
//结束时间
long end = System.currentTimeMillis();
System.out.println(end - start);
}catch (Exception e){
e.printStackTrace();
}finally {
if(pipeline != null){
try {
pipeline.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(jedis != null){
jedis.close();
}
}
}
=> "237"
同样的命令,不使用Pipelining消耗时间是800ms-900ms。测试时Redis Server和Client都运行在同一台机器,由于本地环回接口(loopback interface)的原因RTT会非常短,真实环境下的差距会更大。
从协议层面看Pipelining
从Redis的RESP协议上看,Pipelining并没有什么特殊的地方,只是把多个命令连续的发送给Redis Server,然后一一解析返回结果:
public static void main(String[] args) throws Exception{
Socket socket = new Socket();
//TIME_WAIT状态下可以复用端口
socket.setReuseAddress(true);
//空闲时发送数据包,确认服务端状态
socket.setKeepAlive(true);
//关闭Nagle算法,尽快发送
socket.setTcpNoDelay(true);
//调用close方法立即关闭socket,丢弃所有未发送的数据包
socket.setSoLinger(true, 0);
//连接server
socket.connect(new InetSocketAddress("localhost", 6379), 3000);
//设置读取时超时时间
socket.setSoTimeout(3000);
OutputStream os = socket.getOutputStream();
InputStream is = socket.getInputStream();
/**
* SET 命令 写第一个命令
* 协议: array 3个元素 SET simpleKey simpleValue
*/
os.write(getBytes("*3\r\n$3\r\nSET\r\n$9\r\nsimpleKey\r\n$11\r\nsimpleValue\r\n"));
/**
* GET 命令 写第二个命令
* 协议: array 2个元素 GET simpleKey
*/
os.write(getBytes("*2\r\n$3\r\nGET\r\n$9\r\nsimpleKey\r\n"));
os.flush();
/**
* 解析第一个命令SET的返回结果
*/
String result = analysisResult(is);
System.out.println("SET command response : " + result);
System.out.println();
/**
* 解析第二个命令GET返回结果
*/
String value = analysisResult(is);
System.out.println("GET command response : " + value);
}
=>
response type is : +
SET command response : OK
response type is : $
$ value len : 11
GET command response : simpleValue
RESP协议和详细代码可以参考《Redis协议:RESP》。
Jedis的Pipelining实现方式
Pipeline pipeline = jedis.pipelined();
通过Jedis对象的pipelined方法可以创建Pipeline对象。pipelined方法内部实际上是把Jedis对象的client赋给了pipeline。在《Redis客户端:Jedis》中介绍过Jedis类的结构,Pipeline类的结构与Jedis类似也实现了多个接口。不同的是方法的返回值,所有Pipeline中方法的返回值都被封装成了Response类。
当通过Pipeline对象执行命令时,同样也会委托给内部的Client对象去执行,但不会立即调用client的getXXX方法获取返回结果,而是创建了一个Response对象:
public Response<Long> del(String key) {
getClient(key).del(key);
return getResponse(BuilderFactory.LONG);
}
DEL
命令的返回值是0或1表示是否删除成功,所以传入了BuilderFactory.LONG
用来解析Integer型的返回结果。
protected <T> Response<T> getResponse(Builder<T> builder) {
Response<T> lr = new Response<T>(builder);
pipelinedResponses.add(lr);
return lr;
}
在getResponse方法中,创建了Response对象,每个Response对象都有一个解析返回结果的Builder。Response按照命令的执行顺序被添加到pipelinedResponses队列中。
Pipeline对象的sync方法会真正的执行命令:
public void sync() {
if (getPipelinedResponseLength() > 0) {
//这里会真正的调用client执行命令,并获取返回结果
List<Object> unformatted = client.getAll();
//将按照协议解析、分隔好的返回结果,按顺序赋给队列中的Response
for (Object o : unformatted) {
generateResponse(o);
}
}
}
在getAll方法中会按照RESP协议的结构解析返回结果,将输入流中的内容按照协议格式切分成每个命令的返回结果:
public List<Object> getAll(int except) {
List<Object> all = new ArrayList<Object>();
//执行命令
flush();
//pipelinedCommands是一个计数器,记录了执行了多少个命令
while (pipelinedCommands > except) {
try {
//readProtocolWithCheckingBroken中解析了返回结果
all.add(readProtocolWithCheckingBroken());
} catch (JedisDataException e) {
all.add(e);
}
//每解析一个命令,计数减1,为0时退出循环
pipelinedCommands--;
}
return all;
}
readProtocolWithCheckingBroken方法解析的方式与普通Jedis解析方式一致。当从Response获取返回结果时,会用设置好的Builder把unformatted的数据转换成对应的结构。
public class Response<T> {
protected T response = null;
...
//通过get获取命令返回值
public T get() {
...
if (!built) {
//通过builder格式化返回结果
build();
}
...
return response;
}
private void build() {
...
response = builder.build(data);
...
}
}