顺风车运营研发团队 谭淼
1、dump
dump命令可以序列化给定 key ,并返回被序列化的值,使用 RESTORE命令可以将这个值反序列化为 Redis 键。
/* DUMP keyname
 *
 * Replies with the serialized (RESTORE-compatible) form of the value stored
 * at 'keyname', or with a null bulk when the key does not exist.
 *
 * DUMP is actually not used by Redis Cluster but it is the obvious
 * complement of RESTORE and can be useful for different applications. */
void dumpCommand(client *c) {
    robj *value = lookupKeyRead(c->db,c->argv[1]);

    /* Missing key: nothing to serialize. */
    if (value == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    /* Build the payload into an in-memory rio, then wrap its sds buffer
     * in a string object so it can be sent as a bulk reply. */
    rio payload;
    createDumpPayload(&payload,value);

    robj *reply = createObject(OBJ_STRING,payload.io.buffer.ptr);
    addReplyBulk(c,reply);
    decrRefCount(reply);
}
dump命令的核心内容是创建序列化负载,该功能的实现是调用createDumpPayload()函数。
/* Generates a DUMP-format representation of the object 'o', adding it to the
 * io stream pointed by 'rio'. This function can't fail.
 *
 * Layout of the produced buffer (understood by RESTORE):
 *
 *   ----------------+---------------------+---------------+
 *   ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
 *   ----------------+---------------------+---------------+
 *
 * The RDB payload is the object type byte followed by the object serialized
 * as in an RDB file. RDB version and CRC are both little endian; the CRC64
 * covers everything that precedes it (payload + version). */
void createDumpPayload(rio *payload, robj *o) {
    unsigned char version[2];
    uint64_t checksum;

    /* Serialize the object in an RDB-like format: type byte, then value. */
    rioInitWithBuffer(payload,sdsempty());
    serverAssert(rdbSaveObjectType(payload,o));
    serverAssert(rdbSaveObject(payload,o));

    /* Footer part 1: RDB_VERSION as a 16-bit little-endian value. */
    version[0] = RDB_VERSION & 0xff;
    version[1] = (RDB_VERSION >> 8) & 0xff;
    payload->io.buffer.ptr =
        sdscatlen(payload->io.buffer.ptr,version,sizeof(version));

    /* Footer part 2: CRC64 of all bytes written so far, stored little
     * endian regardless of host byte order. */
    checksum = crc64(0,(unsigned char*)payload->io.buffer.ptr,
                     sdslen(payload->io.buffer.ptr));
    memrev64ifbe(&checksum);
    payload->io.buffer.ptr =
        sdscatlen(payload->io.buffer.ptr,&checksum,sizeof(checksum));
}
根据上面的代码可以看出序列化后的内容由下面几部分组成:
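+-----------------+---------------------+---------------+
| RDB payload     | 2 bytes RDB version | 8 bytes CRC64 |
+-----------------+---------------------+---------------+
其中RDB payload由1字节的对象类型和按RDB格式序列化的对象组成;RDB版本号与CRC64均以小端序存储,CRC64覆盖其前面的全部内容(payload + 版本号)。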
浩含给出的更加详细的介绍wiki链接:dump与restore
2、exists
exists命令可以检查给定 key 是否存在。
/* EXISTS key1 key2 ... key_N.
 * Return value is the number of keys existing.
 *
 * Arguments are counted per occurrence, so a key given N times contributes
 * N to the total. A key is counted only when expireIfNeeded() does not
 * report it as expired AND it is present in the keyspace. */
void existsCommand(client *c) {
    long long count = 0;
    int j;

    for (j = 1; j < c->argc; j++) {
        /* Trust expireIfNeeded()'s verdict (exactly as keysCommand() does)
         * instead of relying on the key having been physically removed:
         * a key can be reported as expired without being deleted.
         * NOTE(review): this happens on replicas, which wait for an explicit
         * DEL from the master; confirm against expireIfNeeded(). */
        if (expireIfNeeded(c->db,c->argv[j]) == 0 &&
            dbExists(c->db,c->argv[j]))
        {
            count++;
        }
    }
    addReplyLongLong(c,count);
}
示例如下:
127.0.0.1:7777> exists k1
(integer) 1
127.0.0.1:7777> exists k2
(integer) 1
127.0.0.1:7777> exists k1 k2
(integer) 2
3、expire、expireat、pexpire、pexpireat
这四个命令的作用是指定一个key的过期时间,它们的原理也都相同,最后都会转换为pexpireat命令来执行。
/* The four expire commands only differ in how argv[2] is read. Each one
 * delegates to expireGenericCommand() with:
 *   - a base time in milliseconds: the current time for the relative forms,
 *     0 for the *AT forms whose argument is already an absolute unix time;
 *   - the unit of argv[2]: UNIT_SECONDS or UNIT_MILLISECONDS. */

/* EXPIRE key seconds */
void expireCommand(client *c) {
    long long now = mstime();
    expireGenericCommand(c,now,UNIT_SECONDS);
}

/* EXPIREAT key time */
void expireatCommand(client *c) {
    const long long epoch = 0; /* argv[2] is already absolute */
    expireGenericCommand(c,epoch,UNIT_SECONDS);
}

/* PEXPIRE key milliseconds */
void pexpireCommand(client *c) {
    long long now = mstime();
    expireGenericCommand(c,now,UNIT_MILLISECONDS);
}

/* PEXPIREAT key ms_time */
void pexpireatCommand(client *c) {
    const long long epoch = 0; /* argv[2] is already absolute */
    expireGenericCommand(c,epoch,UNIT_MILLISECONDS);
}
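可以看出,四个命令的实现都是调用expireGenericCommand()函数:相对时间的命令(EXPIRE、PEXPIRE)以当前毫秒时间mstime()作为基准时间,绝对时间的命令(EXPIREAT、PEXPIREAT)以0作为基准时间,最终都换算成以毫秒为单位的绝对时间戳(即PEXPIREAT的语义)。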
/* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT
 * and PEXPIREAT. Because the command second argument may be relative or
 * absolute the "basetime" argument is used to signal what the base time is
 * (either 0 for *AT variants of the command, or the current time for
 * relative expires).
 *
 * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
 * the argv[2] parameter. The basetime is always specified in milliseconds.
 *
 * Replies: 0 if the key does not exist, 1 if the expire was set (or the key
 * was deleted because the resulting time is already in the past), or an
 * error if argv[2] is not an integer or the resulting timestamp does not
 * fit in a long long. */
void expireGenericCommand(client *c, long long basetime, int unit) {
    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */

    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
        return;

    /* Convert 'when' to an absolute unix time in milliseconds. Both steps
     * operate on user input and are range-checked first: signed overflow is
     * undefined behavior in C, and in practice a wrapped value could turn a
     * huge TTL into a timestamp in the past and delete the key. */
    if (unit == UNIT_SECONDS) {
        if (when > LLONG_MAX / 1000 || when < LLONG_MIN / 1000) {
            addReplyErrorFormat(c,"invalid expire time in '%s' command",
                c->cmd->name);
            return;
        }
        when *= 1000;
    }
    if (basetime > 0 && when > LLONG_MAX - basetime) {
        addReplyErrorFormat(c,"invalid expire time in '%s' command",
            c->cmd->name);
        return;
    }
    when += basetime;

    /* Nothing to do if the key does not exist. */
    if (lookupKeyWrite(c->db,key) == NULL) {
        addReply(c,shared.czero);
        return;
    }

    /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
     * should never be executed as a DEL when load the AOF or in the context
     * of a slave instance.
     *
     * Instead we take the other branch of the IF statement setting an expire
     * (possibly in the past) and wait for an explicit DEL from the master.
     *
     * So the key is deleted right away only when the time is already past,
     * no AOF/RDB load is in progress, and this instance is a master. */
    if (when <= mstime() && !server.loading && !server.masterhost) {
        robj *aux;

        int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
                                                    dbSyncDelete(c->db,key);
        serverAssertWithInfo(c,key,deleted);
        server.dirty++;

        /* Replicate/AOF this as an explicit DEL or UNLINK. */
        aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
        rewriteClientCommandVector(c,2,aux,key);
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        addReply(c, shared.cone);
        return;
    } else {
        /* Otherwise record the absolute expire time for the key. */
        setExpire(c,c->db,key,when);
        addReply(c,shared.cone);
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
        server.dirty++;
        return;
    }
}
设置过期时间,主要是调用setExpire()函数
/* Set an expire to the specified key. If the expire is set in the context
 * of an user calling a command 'c' is the client, otherwise 'c' is set
 * to NULL. The 'when' parameter is the absolute unix time in milliseconds
 * after which the key will no longer be considered valid.
 *
 * The key must already exist in db->dict (asserted). */
void setExpire(client *c, redisDb *db, robj *key, long long when) {
    dictEntry *mainEntry = dictFind(db->dict,key->ptr);
    serverAssertWithInfo(NULL,key,mainEntry != NULL);

    /* The expires entry is keyed by the keyspace entry's own sds, so both
     * dictionaries share a single copy of the key string. */
    dictEntry *expireEntry = dictAddOrFind(db->expires,dictGetKey(mainEntry));
    dictSetSignedIntegerVal(expireEntry,when);

    /* On a writable replica, keys given an expire by a regular client
     * (i.e. not by the master link) are handed to
     * rememberSlaveKeyWithExpire(). */
    int writableSlave = server.masterhost && server.repl_slave_ro == 0;
    if (c != NULL && writableSlave && !(c->flags & CLIENT_MASTER)) {
        rememberSlaveKeyWithExpire(db,key);
    }
}
该原理如下:
对于redis的每个数据库,都有一个统一的数据结构:
/* Redis database representation. There are multiple databases identified
 * by integers from 0 (the default database) up to the max configured
 * database. The database number is the 'id' field in the structure.
 *
 * 'dict' and 'expires' share key strings: setExpire() inserts the sds key
 * taken from the 'dict' entry into 'expires', whose value is the absolute
 * unix time in milliseconds after which the key is no longer valid. */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB: key sds -> value object */
dict *expires; /* Timeout of keys with a timeout set (shares key sds with 'dict'; value is an absolute ms unix time) */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;
其中dict是DB的键空间,expires记录了键空间的超时时间,需要注意的是dict和expires使用的是同一个key的sds。在设置过期时间的时候,首先会在dict中找到需要设置的key,找到后,在expires中添加或找到该key的sds,最后为key添加long long类型的过期时间。
4、keys
keys命令的作用是查找所有符合给定模式 pattern 的 key 。虽然单个key的模式匹配速度很快,但keys命令需要一次性遍历整个键空间,时间复杂度为O(N)。在键数量较多时,它会长时间阻塞Redis并极大地消耗资源,因此禁止在线上使用(可改用SCAN命令增量遍历)。
/* KEYS pattern
 *
 * Replies with every key of the current DB that matches 'pattern' (via
 * stringmatchlen()) and that expireIfNeeded() does not report as expired.
 * The number of matches is not known up front, so a deferred multi-bulk
 * length is emitted first and filled in after the scan. The whole keyspace
 * is walked in a single call. */
void keysCommand(client *c) {
    dictIterator *di;
    dictEntry *de;
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern), allkeys;
    unsigned long numkeys = 0;
    void *replylen = addDeferredMultiBulkLength(c);

    /* Safe iterator, presumably because expireIfNeeded() below may delete
     * the entry currently being visited. */
    di = dictGetSafeIterator(c->db->dict);

    /* Skip matching only for the exact one-byte pattern "*". Testing plen
     * instead of pattern[1] == '\0' keeps binary patterns with an embedded
     * NUL (e.g. "*\0foo") on the stringmatchlen() path, which honours the
     * full sds length; otherwise they would wrongly return every key. */
    allkeys = (pattern[0] == '*' && plen == 1);

    while ((de = dictNext(di)) != NULL) {
        sds key = dictGetKey(de);
        robj *keyobj;

        if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
            keyobj = createStringObject(key,sdslen(key));
            /* Only report keys that are not (logically) expired. */
            if (expireIfNeeded(c->db,keyobj) == 0) {
                addReplyBulk(c,keyobj);
                numkeys++;
            }
            decrRefCount(keyobj);
        }
    }
    dictReleaseIterator(di);
    setDeferredMultiBulkLength(c,replylen,numkeys);
}
遍历与迭代:
迭代(iterate) – 按顺序访问线性结构中的每一项
遍历(traversal) – 按规则访问非线性结构中的每一项
参考链接:https://www.zhihu.com/questio…
洪宝的关于keys命令的wiki:2018.06.27日记(redis keys命令)
5、migrate
migrate的作用是将 key 原子性地从当前实例传送到目标实例的指定数据库上,一旦传送成功, key 保证会出现在目标实例上。
这个命令是一个原子操作,它在执行的时候会阻塞进行迁移的两个实例,直到迁移成功,迁移失败或等待超时。
该命令使用的场景不多,故没有详细分析。该命令的实现函数是migrateCommand()函数,其原理是在当前实例对给定 key 执行DUMP命令,将它序列化,然后通过socket传送到目标实例,目标实例再使用RESTORE命令对数据进行反序列化,并将反序列化所得的数据添加到数据库中。
6、move
move命令也是对key进行转移,不过是将key从当前数据库转移到指定数据库中。
/* MOVE key db
 *
 * Moves 'key' (and its expire, if any) from the currently selected DB to
 * DB 'db'. Replies 1 on success, 0 if the key is missing in the source DB
 * or already present in the target DB. Errors out in cluster mode, for an
 * invalid/out-of-range DB index, and when source and target are the same. */
void moveCommand(client *c) {
    robj *o;
    redisDb *src, *dst;
    int srcid;
    long long dbid, expire;

    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }

    /* Remember the source DB. selectDb() is used only to validate the target
     * index and obtain its pointer; the client is switched back right after. */
    src = c->db;
    srcid = c->db->id;

    if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR ||
        dbid < INT_MIN || dbid > INT_MAX ||
        selectDb(c,dbid) == C_ERR)
    {
        addReply(c,shared.outofrangeerr);
        return;
    }
    dst = c->db;
    selectDb(c,srcid); /* Back to the source DB */

    /* Moving a key onto its own DB is almost certainly a user error. */
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    /* The key must exist in the SOURCE DB (c->db is the source again). */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);
        return;
    }
    expire = getExpire(c->db,c->argv[1]);

    /* Return zero if the key already exists in the target DB */
    if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
        addReply(c,shared.czero);
        return;
    }

    /* Insert into the target DB (carrying over the expire), and take an
     * extra reference before the source entry is removed so the shared
     * object survives dbDelete(). */
    dbAdd(dst,c->argv[1],o);
    if (expire != -1) setExpire(c,dst,c->argv[1],expire);
    incrRefCount(o);

    /* OK! key moved, free the entry in the source DB */
    dbDelete(src,c->argv[1]);

    /* The key changed in BOTH databases; signal each one, as the other write
     * paths (e.g. EXPIRE) do for the keys they modify.
     * NOTE(review): this is what lets WATCHers of the key on either side
     * see the MOVE; confirm against signalModifiedKey(). */
    signalModifiedKey(src,c->argv[1]);
    signalModifiedKey(dst,c->argv[1]);

    server.dirty++;
    addReply(c,shared.cone);
}
7、object
object命令允许从内部察看给定 key 的 Redis 对象,可以查看对象的refcount、encoding、idletime和freq。主要的思路是找到Redis对象,返回对象结构体中的相关参数(refcount和encoding)或者通过相关参数计算出需要查询的内容(idletime和freq)。
/* Object command allows to inspect the internals of an Redis Object.
 * Usage: OBJECT <refcount|encoding|idletime|freq> <key>
 *
 *   help     -- list the subcommands (only with no key argument)
 *   refcount -- reply with o->refcount
 *   encoding -- reply with strEncoding(o->encoding)
 *   idletime -- estimateObjectIdleTime(o) in seconds; refused when an LFU
 *               maxmemory policy is active
 *   freq     -- LFUDecrAndReturn(o); refused unless an LFU maxmemory policy
 *               is active
 *
 * Key subcommands need exactly one key and reply with a null bulk when it
 * does not exist. Anything else gets an "Unknown subcommand" error. */
void objectCommand(client *c) {
    const char *sub = c->argv[1]->ptr;
    int onekey = (c->argc == 3);
    robj *o;

    if (c->argc == 2 && !strcasecmp(sub,"help")) {
        static const char *help[] = {
            "OBJECT <subcommand> key. Subcommands:",
            "refcount -- Return the number of references of the value associated with the specified key.",
            "encoding -- Return the kind of internal representation used in order to store the value associated with a key.",
            "idletime -- Return the idle time of the key, that is the approximated number of seconds elapsed since the last access to the key.",
            "freq -- Return the access frequency index of the key. The returned integer is proportional to the logarithm of the recent access frequency of the key."
        };
        size_t nlines = sizeof(help) / sizeof(help[0]);
        size_t i;
        void *blenp = addDeferredMultiBulkLength(c);

        for (i = 0; i < nlines; i++) addReplyStatus(c,help[i]);
        setDeferredMultiBulkLength(c,blenp,(long)nlines);
    } else if (onekey && !strcasecmp(sub,"refcount")) {
        o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk);
        if (o == NULL) return;
        addReplyLongLong(c,o->refcount);
    } else if (onekey && !strcasecmp(sub,"encoding")) {
        o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk);
        if (o == NULL) return;
        addReplyBulkCString(c,strEncoding(o->encoding));
    } else if (onekey && !strcasecmp(sub,"idletime")) {
        o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk);
        if (o == NULL) return;
        /* Under LFU the per-object clock holds frequency data, not an
         * access time, so there is no idle time to report. */
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            addReplyError(c,"An LFU maxmemory policy is selected, idle time not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
            return;
        }
        addReplyLongLong(c,estimateObjectIdleTime(o)/1000);
    } else if (onekey && !strcasecmp(sub,"freq")) {
        o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk);
        if (o == NULL) return;
        if ((server.maxmemory_policy & MAXMEMORY_FLAG_LFU) == 0) {
            addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
            return;
        }
        /* The access time is only updated when the key is read or
         * overwritten, so decay the counter here in case the key has not
         * been touched for a long time. */
        addReplyLongLong(c,LFUDecrAndReturn(o));
    } else {
        addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try OBJECT help",
            (char *)sub);
    }
}