顺风车运营研发团队 李乐
1.命令执行过程
1.1命令请求格式
当用户在客户端键入一条命令请求时,客户端会将其按照特定协议转换为字符串,发送给服务器;服务器解析字符串,获取命令请求;
例如,当用户执行 set key value 时,转换后的字符串为 *3rn3rnset3rnkey$5rnvaluern
其中,*3表示当前命令请求参数数目(set命令也是一个参数);rn用于分隔每个参数;3、5等表示参数字符串长度;
1.2 服务端读取命令请求
1.2.1客户端处理函数简介:
服务器启动时,会监听socket,并创建对应文件事件,监听此fd上的可读事件;(server.c/initServer)
//监听socket
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
//为所有监听的socket创建文件事件,监听可读事件;事件处理函数为acceptTcpHandler
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
}
}
当客户端连接到服务器时,会调用acceptTcpHandler处理函数,服务器会为每个链接创建一个client对象,并创建相应文件事件,监听此链接fd的可读事件,并指定事件处理函数
//接收客户端链接请求
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
}
//创建客户端
if ((c = createClient(fd)) == NULL) {
close(fd); /* May be already closed, just ignore errors */
return;
}
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
//设置fd为非阻塞;非延迟(epoll等坚挺的额socket必须是非阻塞;延迟的话发送的数据会先存储在tcp缓冲区,等到一定事件或数据量大时才会发送)
//创建文件事件,监听可读事件,事件处理函数为readQueryFromClient
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
……………………//初始化client结构体各字段
}
1.2.2 读取命令请求到输入缓冲区
命令请求字符串会先读入client的输入缓冲区中
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
…………
qblen = sdslen(c->querybuf);
//统计
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
//读取
nread = read(fd, c->querybuf+qblen, readlen);
//处理输入缓冲区
processInputBuffer(c);
…………
}
1.2.3 解析输入缓冲区数据
首先调用processMultibulkBuffer:解析*3获取行数,循环获取每一行参数(会先解析$3获取参数长度),构造为一个redisObject对象,存储在客户端结构体的argv和argc字段
其次调用processCommand处理命令请求
void processInputBuffer(client *c) {
while(sdslen(c->querybuf)) {
//判断命令请求类型;telnet发送的命令和redis-cli发送的命令请求格式不同
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
//参数个数为0时
if (c->argc == 0) {
resetClient(c);
} else {
//处理命令
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
resetClient(c);
}
if (server.current_client == NULL) break;
}
}
}
//resetClient()函数会释放client结构体arg字段中的各参数,重置argc为0
解析缓冲区字符串逻辑如下:
int processMultibulkBuffer(client *c) {
char *newline = NULL;
long pos = 0;
int ok;
long long ll;
if (c->multibulklen == 0) {
//定位到第一行结束
newline = strchr(c->querybuf,'\r');
//解析行数,即参数个数
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
c->multibulklen = ll;
//argv村春命令参数,解析之前先清空
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
}
//循环解析所有参数
while(c->multibulklen) {
//读取$3字符串,解析数值3
if (c->bulklen == -1) {
newline = strchr(c->querybuf+pos,'\r');
if (c->querybuf[pos] != '$') {
return C_ERR;
}
ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
c->bulklen = ll;
}
//读取参数
c->argv[c->argc++] =
createStringObject(c->querybuf+pos,c->bulklen);
pos += c->bulklen+2;
c->bulklen = -1;
c->multibulklen--;
}
/* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) return C_OK;
/* Still not ready to process the command */
return C_ERR;
}
1.2.4 补充:什么是redisObject
redis底层有多种数据结构:sds,intset,ziplist,linkedlist,skiplist,dict等;redis对外提供的数据类型有字符串、列表、哈希表、有需集合、集合;
redis会根据实际情况选择合适的数据结构来存储某一种数据类型;而同一种数据类型可能使用不同的数据结构存储;
redisObject是对底层多种数据结构的进一步封装;看看结构体:
typedef struct redisObject {
unsigned type:4; //数据类型:字符串、列表、哈希表、集合、有序集合
unsigned encoding:4; //存储数据类型使用哪种数据结构,如列表的实现可能是ziplist或linkedlist
unsigned lru:LRU_BITS; //lru,数据淘汰
int refcount; //引用计数
void *ptr; //指向具体的数据结构
} robj;
1.2.5 处理命令
processInputBuffer解析输入缓冲区中命令请求字符串,将各个参数转换为redisObject存储在client结构体的argv字段中,argc存储参数个数,下一步调用processCommand处理命令
1.2.5.1命令结构体redisCommand
struct redisCommand {
char *name; //命令名称
redisCommandProc *proc; //命令处理函数指针
int arity; //命令参数个数,用于检查命令请求参数是否正确;当取值-N时,表示参数个数大于等于N
char *sflags; //标识命令属性;如读命令或写命令等
int flags; //sflags的二进制表示
//没搞明白
redisGetKeysProc *getkeys_proc;
int firstkey;
int lastkey;
int keystep;
//命令执行总耗时和执行次数
long long microseconds, calls;
};
1.2.5.2命令执行前校验
int processCommand(client *c) {
//quit命令直接返回
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
//命令字典查找指定命令;所有的命令都存储在命令字典中 struct redisCommand redisCommandTable[]={}
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) { //没有查找到命令
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { //命令请求参数个数错误
}
//是否通过认证;没有通过且必须认证时,只接受认证命令
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand){
}
//开启最大内存限制
if (server.maxmemory) {
//释放内存
int retval = freeMemoryIfNeeded();
//CMD_DENYOOM表示内存不够时,禁止执行此命令
if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
}
//当此服务器是master时:aof持久化失败时,或上一次bgsave执行错误,且配置bgsave参数和stop_writes_on_bgsave_err;禁止执行写命令
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == C_ERR) ||
server.aof_last_write_status == C_ERR) &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand)){
}
//当此服务器时master时:如果配置了repl_min_slaves_to_write,当slave数目小于时,禁止执行写命令
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write){
}
//当此服务器是slave,且配置了只读时,如果客户端不是master,则拒绝执行写命令
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE){
}
//当客户端正在订阅频道时,只会执行以下命令
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
}
//服务器为slave,但没有正确连接master时,只会执行带有CMD_STALE标志的命令,如info等
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE)){
}
//正在加载数据库时,只会执行带有CMD_LOADING标志的命令,其余都会被拒绝
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
}
//当服务器因为执行lua脚本阻塞时,只会执行以下几个命令,其余都会拒绝
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k')){
}
//执行命令
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){
//开启了事务,命令只会入队列;
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
//直接执行命令
call(c,CMD_CALL_FULL);
}
}
1.2.5.3 命令执行
执行命令时,会需要做很多额外的操作,统计,记录慢查询日志,传播命令道monitor、slave,aof持久化等
//flags=CMD_CALL_FULL=(CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE)
//表示需要记录慢查询日志,统计,广播命令
void call(client *c, int flags) {
//dirty记录数据库修改次数;start记录命令开始执行时间us;duration记录命令执行花费时间
long long dirty, start, duration;
int client_old_flags = c->flags;
//有监视器的话,需要将命令发送给监视器
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
//处理命令,调用命令处理函数
dirty = server.dirty;
start = ustime();
c->cmd->proc(c);
duration = ustime()-start;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
//记录慢查询日志
if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
//统计
if (flags & CMD_CALL_STATS) {
c->lastcmd->microseconds += duration;
c->lastcmd->calls++;
}
//广播命令
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
//dirty大于0时,需要广播命令给slave和aof
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
//广播命令,写如aof,发送命令到slave
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
}
2.命令表
redis有个命令表redisCommandTable存储每个命令的详细属性
//命令名称,命令处理函数,命令参数(-3表示参数数目大于等于3个),命令属性标志,…………
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
{"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
{"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
{"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
{"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
{"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
…………
}
3.set命令执行
3.1 set命令介绍
SET key value [EX seconds] [PX milliseconds] [NX|XX]
将字符串值 value 关联到 key 。
如果 key 已经持有其他值, SET 就覆写旧值,无视类型。
对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。
EX、PX、NX/XX可选参数含义如下:
- EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
- PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX keymillisecond value 。
- NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
- XX :只在键已经存在时,才对键进行设置操作。
3.2 set命令执行函数
3.2.1 解析set命令请求
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
void setCommand(client *c) {
int j;
robj *expire = NULL; //过期时间
int unit = UNIT_SECONDS; //时间单位
int flags = OBJ_SET_NO_FLAGS; //标志命令是否携带nx、xx、ex、px可选参数
for (j = 3; j < c->argc; j++) {
char *a = c->argv[j]->ptr;
robj *next = (j == c->argc-1) ? NULL : c->argv[j+1]; //最后一个参数可能是过期时间
if ((a[0] == 'n' || a[0] == 'N') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_XX))
{
flags |= OBJ_SET_NX; //NX标志
} else if ((a[0] == 'x' || a[0] == 'X') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_NX))
{
flags |= OBJ_SET_XX; //XX标志
} else if ((a[0] == 'e' || a[0] == 'E') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_PX) && next)
{
flags |= OBJ_SET_EX; //EX标志
unit = UNIT_SECONDS;
expire = next;
j++;
} else if ((a[0] == 'p' || a[0] == 'P') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_EX) && next)
{
flags |= OBJ_SET_PX; //PX标志
unit = UNIT_MILLISECONDS;
expire = next;
j++;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL); //处理命令
}
3.2.2 命令处理
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0; /* initialized to avoid any harmness warning */
//设置了过期时间;expire是robj类型,获取整数值
if (expire) {
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
return;
if (milliseconds <= 0) {
addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
return;
}
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
//NX,key存在时直接返回;XX,key不存在时直接返回
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
//添加都数据库字典
setKey(c->db,key,val);
server.dirty++;
//过期时间添加到过期字典
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
//键空间通知
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
addReply(c, ok_reply ? ok_reply : shared.ok);
}