分析从客户端发送命令,到服务端执行命令、返回执行结果经历的整个过程。
建立连接
无论是redis-cli还是Jedis这样的三方包客服端,要向Redis服务器发送命令,首先要建立与Redis服务器之间的TCP连接。在分析Redis启动过程时,初始化这一步会注册事件处理器:
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
在Redis配置文件中有一项
bind
配置,通过bind
可以配置监听来自哪些网络接口请求。Redis在启动时会监听这些接口,将fds保存在server.ipfd
数组中。
Redis循环所有的网络接口,为这些接口绑定AE_READABLE(可读事件),事件的处理器是acceptTcpHandler
。
aeCreateFileEvent
函数会为每个对应的fd绑定一个aeFileEvent
,aeFileEvent
中绑定了处理读写事件的函数。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
//根据fd值获取aeFileEvent,后面用来绑定aeFileProc
aeFileEvent *fe = &eventLoop->events[fd];
//注册事件,取决于具体实现(epoll、select等)
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
//这里注册的事读事件,aeFileProc是acceptTcpHandler
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
当有客户端尝试连接Redis服务器时,aeApiPoll
函数会返回1,并从eventLoop->fired[j]
中获取发生事件的fd,进而获取到对应的aeFileEvent
。因为之前在serverSocket上注册的是AE_READABLE
事件,所以调用fe->rfileProc
处理客户端连接(acceptTcpHandler
)。
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
...
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
...
}
在acceptTcpHandler
函数中,会创建与客户端之间的socket连接、调用createClient
创建客户端:
if ((c = createClient(fd)) == NULL) {
...
return;
}
Redis在调用
createClient
后会判断客户端数量,如果超过上限会返回错误信息并关闭客户端。因为在createClient
中已经将socket设置为non-bolcking,如果超出上限在向客户端写入错误信息时,不会被阻塞。
在createClient函数中注册了readQueryFromClient
处理器并初始化客户端:
//在socket上注册读事件处理器
if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR)
...
//设置默认db
selectDb(c,0);
//客户端id
c->id = server.next_client_id++;
//客户端命令缓冲区
c->querybuf = sdsempty();
//命令参数个数
c->argc = 0;
//参数列表
c->argv = NULL;
//回复列表
c->reply = listCreate();
//回复列表长度
c->reply_bytes = 0;
//watch命令的key
c->watched_keys = listCreate();
//精确订阅的频道
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
//匹配模式订阅的频道
c->pubsub_patterns = listCreate();
执行到这里客户端与服务器连接建立完成,服务器已经为客户端socket注册了命令处理器,等待客户端发送命令。
客户端发送命令
客户端向socket写入RESP协议格式的命令,等待服务器返回执行结果。RESP协议可以参考这里Redis协议:RESP。
假设客户端向服务器发送了命令:
SET simpleKey simpleValue
转换为RESP协议之后是:
*3\r\n$3\r\nSET\r\n$9\r\nsimpleKey\r\n$11\r\nsimpleValue\r\n
服务器解析命令
在经历了N个事件循环之后,客户端发送的请求终于到达了服务器,还是老位置:
numevents = aeApiPoll(eventLoop, tvp);
只是这次事件处理器变成了readQueryFromClient
。在readQueryFromClient
函数中会读取readlen个字节到c->querybuf
中,默认readlen长度为1024*16。
//获取当前长度
qblen = sdslen(c->querybuf);
//计算峰值
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
//扩展sds长度,以保存readlen字节的内容
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
//读取数据
nread = read(fd, c->querybuf+qblen, readlen);
如果c->querybuf
的长度没有超过server.client_max_querybuf_len
的限制,就会开始解析c->querybuf
中的内容。
在processInputBuffer
函数中,会一直循环处理c->querybuf
中的内容,直到全部处理完,或者processInlineBuffer
和processMultibulkBuffer
返回C_ERR(c->flags变化也会导致退出循环,但跟处理命令流程关系不大,这里不展开)。
void processInputBuffer(client *c) {
//设置当前正在处理的client
server.current_client = c;
//当client缓冲中有内容时循环
while(sdslen(c->querybuf)) {
...
//判断命令类型
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
...
//解析命令到argc和argv
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
if (c->argc == 0) {
resetClient(c);
} else {
//执行命令
if (processCommand(c) == C_OK)
//重置客户端,记录precmd,释放参数列表等
resetClient(c);
}
}
server.current_client = NULL;
}
绝大多数Redis命令都是PROTO_REQ_MULTIBULK
类型的,例如上面的SET
命令,会通过processMultibulkBuffer
来解析,整个解析大致可以分为两个步骤,确认有多少个bulk和确认每个bulk的长度,对照RESP协议:
*3\r\n$3\r\nSET\r\n$9\r\nsimpleKey\r\n$11\r\nsimpleValue\r\n
第一步是读取*3\r\n
,确认有3个bulk。之后是循环解析$3\r\n
$9\r\n
$11\r\n
,确认3个bulk的长度,将解析结果放入argc(数量)和argv(参数列表)中:
c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen);
解析结果argc=3,argv=[‘SET’, ‘simpleKey’, ‘simpleValue’]。
实际上argv是redisObject结构的数据,这里为了方便理解直接用数组结构表达。
服务器执行命令
在上一步中Redis服务器已经将客户端的请求解析完成,参数保存在client的argv中。在processCommand
函数中首先会通过argv[0]在命令字典中找到对应的命令然后做一系列的判断,例如client是否通过auth验证、命令参数个数是否正确、是否开启了集群功能需要转向请求、服务器最大内存限制判断等等,这里只专注于命令执行,简化后的代码如下:
int processCommand(client *c) {
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
...
} else {
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return C_OK;
}
else分支中的call函数真正调用了命令执行函数:
c->cmd->proc(c);
client的cmd是一个redisCommand
结构变量,它的结构是:
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
char *sflags; /* Flags as string representation, one char per flag. */
int flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls;
};
proc可以在server.c文件中的redisCommandTable
中找到:
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}
setCommand
属于String类型值的命令,可以在t_string.c中找到。setCommand
函数中会针对NX EX expire等进行判断,最终通过dict的setKey函数设置键值对,更新server.dirty值:
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
...
setKey(c->db,key,val);
server.dirty++;
...
addReply(c, ok_reply ? ok_reply : shared.ok);
}
c->cmd->proc(c)
执行完成后,call函数中还要进行一些收尾工作:
- 判断是否需要写slowlog。
- 更新cmd总执行时间和次数。
- 向Slave和AOF传播命令。
服务器返回结果
在setCommand
函数中,调用了addReply
函数向client的输出缓冲或reply中写入返回结果,返回结果为shared.ok
:
shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyObjectToList(c,obj);
}
...
}
prepareClientToWrite
函数中会判断client的flag,如果符合条件将当前client放入server.clients_pending_write
链表。
有几种情况会跳过:
- 如果是执行lua脚本的client;
- 如果client设置了CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP;
- 如果client是连接master的。
以上几种情况不需要回复。
_addReplyToBuffer
函数会尝试将命令结果放入输出缓冲(c->buf)中,如果不成功(c-reply中有内容,或者超过缓冲大小),会调用_addReplyObjectToList
函数放入c->reply链表中。
以上内容与命令执行在一次事件循环中,因为算是输出执行结果的一部分,所以到了返回结果的一节中。
再贴一下事件循环的代码,在aeProcessEvents
执行前会先执行eventLoop->beforesleep
函数,这个函数在main函数中指定,是beforeSleep
。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
在beforeSleep
函数中与输出结果相关的是调用了handleClientsWithPendingWrites
函数。
int handleClientsWithPendingWrites(void) {
...
//获取待输出结果的client数量
int processed = listLength(server.clients_pending_write);
...
while((ln = listNext(&li))) {
...
//输出buf内容
if (writeToClient(c->fd,c,0) == C_ERR) continue;
...
//注册写事件处理器
if (clientHasPendingReplies(c) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
return processed;
}
代码中调用了writeToClient
函数输出结果,如果调用writeToClient
后还有待输出内容,则为client注册写事件处理器sendReplyToClient
。
int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
sds o;
//如果client还有待输出结果,执行循环
while(clientHasPendingReplies(c)) {
//先检查buf中是否有内容
if (c->bufpos > 0) {
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
//统计本次一共输出了多少子节
totwritten += nwritten;
//如果输出子节与buf中数量一直,代表缓冲内容已经全部输出
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
//检查c->reply中
} else {
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o);
nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
c->reply_bytes -= objlen;
}
}
server.stat_net_output_bytes += totwritten;
//如果输出的字节数量已经超过NET_MAX_WRITES_PER_EVENT限制,break
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory)) break;
}
...
return C_OK;
}
在writeToClient
函数中,会检查缓冲和reploy中的内容,向客户端输出内容。如果超过了每次输出的最大值NET_MAX_WRITES_PER_EVENT
会跳出循环。
如果缓冲区和reply中的内容没有输出完,handleClientsWithPendingWrites
函数中会为client关联写事件处理器sendReplyToClient
,在后面的事件循环中socket会返回并调用sendReplyToClient
继续输出。sendReplyToClient
函数内部直接调用了writeToClient
函数,区别是参数handler_installed
不同,需要对事件处理器做额外的处理:
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
//如果内容已经全部输出,删除事件处理器
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClient(c);
return C_ERR;
}
}
客户端接收命令返回结果
client可以从socket中收到server返回的RESP结果的返回结果,经过转换返回给调用端或者在控制台输出。