【Redis学习笔记】2018-06-21 redis命令执行过程 SET

顺风车运营研发团队 李乐
1.命令执行过程

1.1命令请求格式

当用户在客户端键入一条命令请求时,客户端会将其按照特定协议转换为字符串,发送给服务器;服务器解析字符串,获取命令请求;

例如,当用户执行 set key value 时,转换后的字符串为 *3rn3rnset3rnkey$5rnvaluern

其中,*3表示当前命令请求参数数目(set命令也是一个参数);rn用于分隔每个参数;3、5等表示参数字符串长度;

1.2 服务端读取命令请求

1.2.1客户端处理函数简介:

服务器启动时,会监听socket,并创建对应文件事件,监听此fd上的可读事件;(server.c/initServer)

//监听socket
if (server.port != 0 &&
    listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
    exit(1);
 
//为所有监听的socket创建文件事件,监听可读事件;事件处理函数为acceptTcpHandler
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
     
        }
}

当客户端连接到服务器时,会调用acceptTcpHandler处理函数,服务器会为每个链接创建一个client对象,并创建相应文件事件,监听此链接fd的可读事件,并指定事件处理函数

//接收客户端链接请求
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
}
 
//创建客户端
if ((c = createClient(fd)) == NULL) {
    close(fd); /* May be already closed, just ignore errors */
    return;
}
 
 
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));
 
    //设置fd为非阻塞;非延迟(epoll等坚挺的额socket必须是非阻塞;延迟的话发送的数据会先存储在tcp缓冲区,等到一定事件或数据量大时才会发送)
    //创建文件事件,监听可读事件,事件处理函数为readQueryFromClient
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    ……………………//初始化client结构体各字段
}

1.2.2 读取命令请求到输入缓冲区

命令请求字符串会先读入client的输入缓冲区中

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
 
    …………
 
    qblen = sdslen(c->querybuf);
 
    //统计
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    //读取
    nread = read(fd, c->querybuf+qblen, readlen);
 
    //处理输入缓冲区
    processInputBuffer(c);
 
    …………
}

1.2.3 解析输入缓冲区数据

首先调用processMultibulkBuffer:解析*3获取行数,循环获取每一行参数(会先解析$3获取参数长度),构造为一个redisObject对象,存储在客户端结构体的argv和argc字段

其次调用processCommand处理命令请求

 void processInputBuffer(client *c) {
     
    while(sdslen(c->querybuf)) {
        
        //判断命令请求类型;telnet发送的命令和redis-cli发送的命令请求格式不同
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }
 
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }
 
        //参数个数为0时
        if (c->argc == 0) {
            resetClient(c);
        } else {
            //处理命令
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                 
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
             
            if (server.current_client == NULL) break;
        }
    }
}
 
//resetClient()函数会释放client结构体arg字段中的各参数,重置argc为0

解析缓冲区字符串逻辑如下:

int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    long pos = 0;
    int ok;
    long long ll;
 
    if (c->multibulklen == 0) {
 
       //定位到第一行结束
        newline = strchr(c->querybuf,'\r');
       
        //解析行数,即参数个数
        ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
       
        c->multibulklen = ll;
 
        //argv村春命令参数,解析之前先清空
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    }
 
    //循环解析所有参数
    while(c->multibulklen) {
         
        //读取$3字符串,解析数值3
        if (c->bulklen == -1) {
            newline = strchr(c->querybuf+pos,'\r');
             
            if (c->querybuf[pos] != '$') {
                return C_ERR;
            }
 
            ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
             
            c->bulklen = ll;
        }
 
        //读取参数
        c->argv[c->argc++] =
        createStringObject(c->querybuf+pos,c->bulklen);
        pos += c->bulklen+2;
             
        c->bulklen = -1;
        c->multibulklen--;
         
    }
 
    /* We're done when c->multibulk == 0 */
    if (c->multibulklen == 0) return C_OK;
 
    /* Still not ready to process the command */
    return C_ERR;
}

1.2.4 补充:什么是redisObject

redis底层有多种数据结构:sds,intset,ziplist,linkedlist,skiplist,dict等;redis对外提供的数据类型有字符串、列表、哈希表、有需集合、集合;

redis会根据实际情况选择合适的数据结构来存储某一种数据类型;而同一种数据类型可能使用不同的数据结构存储;

redisObject是对底层多种数据结构的进一步封装;看看结构体:

typedef struct redisObject {
    unsigned type:4; //数据类型:字符串、列表、哈希表、集合、有序集合
    unsigned encoding:4; //存储数据类型使用哪种数据结构,如列表的实现可能是ziplist或linkedlist
    unsigned lru:LRU_BITS; //lru,数据淘汰
    int refcount; //引用计数
    void *ptr;    //指向具体的数据结构
} robj;

1.2.5 处理命令

processInputBuffer解析输入缓冲区中命令请求字符串,将各个参数转换为redisObject存储在client结构体的argv字段中,argc存储参数个数,下一步调用processCommand处理命令

1.2.5.1命令结构体redisCommand

struct redisCommand {
    char *name; //命令名称
    redisCommandProc *proc; //命令处理函数指针
    int arity; //命令参数个数,用于检查命令请求参数是否正确;当取值-N时,表示参数个数大于等于N
    char *sflags; //标识命令属性;如读命令或写命令等
    int flags;    //sflags的二进制表示
     
    //没搞明白
    redisGetKeysProc *getkeys_proc;
    int firstkey;
    int lastkey; 
    int keystep;
 
    //命令执行总耗时和执行次数
    long long microseconds, calls;
};

1.2.5.2命令执行前校验

int processCommand(client *c) {
     
    //quit命令直接返回
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
 
    //命令字典查找指定命令;所有的命令都存储在命令字典中 struct redisCommand redisCommandTable[]={}
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
 
    if (!c->cmd) { //没有查找到命令
         
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { //命令请求参数个数错误
     
    }
 
    //是否通过认证;没有通过且必须认证时,只接受认证命令
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand){
     
    }
     
    //开启最大内存限制
    if (server.maxmemory) {
    //释放内存
    int retval = freeMemoryIfNeeded();
     
    //CMD_DENYOOM表示内存不够时,禁止执行此命令
    if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
         
    }
 
    //当此服务器是master时:aof持久化失败时,或上一次bgsave执行错误,且配置bgsave参数和stop_writes_on_bgsave_err;禁止执行写命令
    if (((server.stop_writes_on_bgsave_err &&
        server.saveparamslen > 0 &&
        server.lastbgsave_status == C_ERR) ||
        server.aof_last_write_status == C_ERR) &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
        c->cmd->proc == pingCommand)){
    
    }
 
    //当此服务器时master时:如果配置了repl_min_slaves_to_write,当slave数目小于时,禁止执行写命令
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write){
     
    }
 
    //当此服务器是slave,且配置了只读时,如果客户端不是master,则拒绝执行写命令
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE){
     
    }
 
    //当客户端正在订阅频道时,只会执行以下命令
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
     
    }
 
    //服务器为slave,但没有正确连接master时,只会执行带有CMD_STALE标志的命令,如info等
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE)){
    
    }
 
    //正在加载数据库时,只会执行带有CMD_LOADING标志的命令,其余都会被拒绝
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
    
    }
 
    //当服务器因为执行lua脚本阻塞时,只会执行以下几个命令,其余都会拒绝
    if (server.lua_timedout &&
        c->cmd->proc != authCommand &&
        c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
        c->argc == 2 &&
        tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
        c->argc == 2 &&
        tolower(((char*)c->argv[1]->ptr)[0]) == 'k')){
     
    }
 
    //执行命令
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){
        //开启了事务,命令只会入队列;
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        //直接执行命令
        call(c,CMD_CALL_FULL);
    
    }
}  

1.2.5.3 命令执行

执行命令时,会需要做很多额外的操作,统计,记录慢查询日志,传播命令道monitor、slave,aof持久化等

//flags=CMD_CALL_FULL=(CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE)
//表示需要记录慢查询日志,统计,广播命令
void call(client *c, int flags) {
 
    //dirty记录数据库修改次数;start记录命令开始执行时间us;duration记录命令执行花费时间
    long long dirty, start, duration;
    int client_old_flags = c->flags;
 
    //有监视器的话,需要将命令发送给监视器
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }
 
    //处理命令,调用命令处理函数
    dirty = server.dirty;
    start = ustime();
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;
 
    //记录慢查询日志
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    }
    //统计
    if (flags & CMD_CALL_STATS) {
        c->lastcmd->microseconds += duration;
        c->lastcmd->calls++;
    }
 
    //广播命令
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;
 
        //dirty大于0时,需要广播命令给slave和aof
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
 
        
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
 
   
        if (c->flags & CLIENT_PREVENT_REPL_PROP ||
            !(flags & CMD_CALL_PROPAGATE_REPL))
                propagate_flags &= ~PROPAGATE_REPL;
        if (c->flags & CLIENT_PREVENT_AOF_PROP ||
            !(flags & CMD_CALL_PROPAGATE_AOF))
                propagate_flags &= ~PROPAGATE_AOF;
 
        //广播命令,写如aof,发送命令到slave
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
}

2.命令表

redis有个命令表redisCommandTable存储每个命令的详细属性

//命令名称,命令处理函数,命令参数(-3表示参数数目大于等于3个),命令属性标志,…………
struct redisCommand redisCommandTable[] = {
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
    {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
    {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
    …………
}

3.set命令执行

3.1 set命令介绍

SET key value [EX seconds] [PX milliseconds] [NX|XX]

将字符串值 value 关联到 key 。

如果 key 已经持有其他值, SET 就覆写旧值,无视类型。

对于某个原本带有生存时间(TTL)的键来说, 当 SET 命令成功在这个键上执行时, 这个键原有的 TTL 将被清除。

EX、PX、NX/XX可选参数含义如下:

  • EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
  • PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX keymillisecond value 。
  • NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
  • XX :只在键已经存在时,才对键进行设置操作。

3.2 set命令执行函数

3.2.1 解析set命令请求

/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
void setCommand(client *c) {
    int j;
    robj *expire = NULL;        //过期时间
    int unit = UNIT_SECONDS;    //时间单位
    int flags = OBJ_SET_NO_FLAGS; //标志命令是否携带nx、xx、ex、px可选参数
 
    for (j = 3; j < c->argc; j++) {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1]; //最后一个参数可能是过期时间
 
        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
            !(flags & OBJ_SET_XX))
        {
            flags |= OBJ_SET_NX;   //NX标志
        } else if ((a[0] == 'x' || a[0] == 'X') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_NX))
        {
            flags |= OBJ_SET_XX;  //XX标志
        } else if ((a[0] == 'e' || a[0] == 'E') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_PX) && next)
        {
            flags |= OBJ_SET_EX;   //EX标志
            unit = UNIT_SECONDS;
            expire = next;
            j++;
        } else if ((a[0] == 'p' || a[0] == 'P') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_EX) && next)
        {
            flags |= OBJ_SET_PX;   //PX标志
            unit = UNIT_MILLISECONDS;
            expire = next;
            j++;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }
     
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL); //处理命令
}

3.2.2 命令处理

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid any harmness warning */
 
    //设置了过期时间;expire是robj类型,获取整数值
    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        if (milliseconds <= 0) {
            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }
 
    //NX,key存在时直接返回;XX,key不存在时直接返回
    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }
 
    //添加都数据库字典
    setKey(c->db,key,val);
    server.dirty++;
 
    //过期时间添加到过期字典
    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
 
    //键空间通知
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
        "expire",key,c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);
}
    原文作者:LNMPR源码研究
    原文地址: https://segmentfault.com/a/1190000015404627
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞