baiyan
命令使用
命令含义:将 key 原子性地从当前实例传送到目标实例的指定数据库上,一旦传送成功, key 保证会出现在目标实例上,而当前实例上的 key 会被删除
命令格式:
MIGRATE host port key|"" destination-db timeout [COPY] [REPLACE] [KEYS key [key ...]]
命令实战:将键key1、key2、key3批量迁移到本机6380端口的redis实例上,并存储到目标实例的第0号数据库,超时时间为1000毫秒。可选项COPY如果表示不移除源实例上的 key ,REPLACE选项表示替换目标实例上已存在的 key 。KEYS选项表示可以同时批量传送多个keys(但前面的key参数的位置必须设置为空)
127.0.0.1:6379> migrate 127.0.0.1 6380 "" 0 5000 KEYS key1 key2 key3
OK
返回值:迁移成功时返回 OK ,否则返回错误
源码分析
migrate命令的执行过程可分为参数校验、连接建立、组装数据、发送数据、处理返回五个阶段。同样的,migrate命令的处理函数为migrateCommand():
参数校验
void migrateCommand(client *c) {
migrateCachedSocket *cs; // 连接另一个实例的socket
int copy = 0, replace = 0, j; // 是否开启copy及replace选项标记
char *password = NULL; // 密码
long timeout; // 超时时间
long dbid; // 数据库id
robj **ov = NULL; /* 要迁移的对象 */
robj **kv = NULL; /* 键名 */
robj **newargv = NULL;
rio cmd, payload; // 重要,存储目标实例执行的命令及DUMP的payload
int may_retry = 1;
int write_error = 0;
int argv_rewritten = 0;
/* 支持同时传输多个key. */
int first_key = 3; /* 第一个键参数的位置. */
int num_keys = 1; /* 默认只传送一个key. */
/* 校验其他选项,从COPY选项开始校验 */
for (j = 6; j < c->argc; j++) {
int moreargs = j < c->argc-1;
if (!strcasecmp(c->argv[j]->ptr,"copy")) { // 如果命令参数等于copy,开启copy选项
copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"replace")) { // 如果命令参数等于replace,开启replace选项
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"auth")) { // 如果命令参数等于auth,开启auth选项
if (!moreargs) { // 参数数量超出规定数量,报错
addReply(c,shared.syntaxerr);
return;
}
j++;
password = c->argv[j]->ptr;
} else if (!strcasecmp(c->argv[j]->ptr,"keys")) { // 如果设置了keys参数,表明要同时传输多个keys值过去
if (sdslen(c->argv[3]->ptr) != 0) { // 如果开启了keys选项,前面key参数的位置必须设置为空
addReplyError(c,
"When using MIGRATE KEYS option, the key argument"
" must be set to the empty string");
return;
}
first_key = j+1;
num_keys = c->argc - j - 1;
break; /*现在first_key值指向keys的第一个值.,并将num_keys设置为keys的数量 */
} else {
addReply(c,shared.syntaxerr);
return;
}
}
/* 选择的db和超时时间数据校验,看是否是合法的数字格式 */
if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
{
return;
}
if (timeout <= 0) timeout = 1000;
/* 接下来会检查是否有可以迁移的键 */
ov = zrealloc(ov,sizeof(robj*)*num_keys);
kv = zrealloc(kv,sizeof(robj*)*num_keys);
int oi = 0;
/* 检查所有的键,判断输入的键中,是否存在合法的键来进行迁移 */
for (j = 0; j < num_keys; j++) {
if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) { // 去键空间字典中查找该键,如果该键没有超时
kv[oi] = c->argv[first_key+j]; // 将未超时的键存到kv数组中,说明当前key是可以migrate的;否则如果超时就无法进行migrate
oi++;
}
}
num_keys = oi; // 更新当前可migrate的key总量
if (num_keys == 0) { // 如果没有可以迁移的key,那么给客户端返回“NOKEY"字符串
zfree(ov); zfree(kv);
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
刚开始执行migrate命令的时候,由于migrate参数很多,需要对其逐个做校验。尤其是在启用keys参数同时迁移多个keys的时候,需要进行参数的动态判断。同时需要判断是否有合法的键来进行迁移。只有没有过期的键才能够迁移,否则不进行迁移,最大化节省系统资源。
连接建立
假如我们要从当前6379端口上的redis实例迁移到6380端口上的redis实例,我们必然要建立一个socket连接:
try_again:
write_error = 0;
/* 连接建立 */
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
if (cs == NULL) {
zfree(ov); zfree(kv);
return;
}
我们看到,在主流程中调用了migrateGetSocket()函数创建了一个socket,这里是一个带缓存的socket。我们暂时不跟进这个函数,后面我会以扩展的形式来跟进。
组装数据
基于这个socket,我们可以将数据以TCP协议中规定的字节流形式传输到目标实例上。这就需要一个序列化的过程了。6379实例需要将keys序列化,6380需要将数据反序列化。这就需要借助我们之前讲过的DUMP命令和RESTORE命令,分别来进行序列化和反序列化了。
redis并没有立即进行DUMP将key序列化,而是首先组装要在目标redis实例上所要执行的命令,比如AUTH/SELECT/RESTORE等命令。要想在目标实例上执行命令,那么必须同样基于之前建立的socket连接,以当前的redis实例作为客户端,往与目标redis实例建立的TCP连接中,写入按照redis协议封装的命令集合(如*2 \r\n SELECT \r\n $1 \r\n 1 \r\n)。redis使用了自己封装的I/O抽象层rio,它实现了一个I/O缓冲区。通过读取其缓冲区中的数据,就可以往我们在建立socket的时候生成的fd中写入数据啦。首先redis会建立一个rio缓冲区,并按照redis数据传输协议所要求的格式,组装要在目标实例上执行的redis命令:
// 初始化一个rio缓冲区
rioInitWithBuffer(&cmd,sdsempty());
/* 组装AUTH命令 */
if (password) {
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 按照redis协议写入一条命令开始的标识\*2。表示命令一共有2个参数
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4)); // 写入$4\r\n AUTH \r\n
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password, sdslen(password))); // 同上,按照协议格式写入密码
}
/* 在目标实例上选择数据库 */
int select = cs->last_dbid != dbid; /* 判断是否已经选择过数据库,如果选择过就不用再次执行SELECT命令 */
if (select) { // 如果没有选择过,需要执行SELECT命令选择数据库
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); // 同上,写入开始表示\*2
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); // 同上,写入$6\r\n SELECT \r\n
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); // 写入$1\r\n 1 \r\n
}
那么接下来需要进行DUMP的序列化操作了。由于序列化操作耗时较久,所以可能出现这种情况:在之前第一次检测是否超时的时候没有超时,但是由于这次序列化操作时间较久,执行期间,这个键超时了,那么redis简单粗暴地丢弃该超时键,直接放弃迁移这个键:
int non_expired = 0; // 暂存新的未过期的键的数量
/* 如果在DUMP的过程中过期了,直接continue. */
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
long long expireat = getExpire(c->db,kv[j]);
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 0) {
continue;
}
if (ttl < 1) ttl = 1;
}
/* 经过上面的筛选之后,都是最新的、没有过期的键,这些键可以最终被迁移了. */
kv[non_expired++] = kv[j];
然后,在目标实例上最终我们需要执行RESTORE命令,将之前经过DUMP序列化的字节流反序列化,过程和上面同理:
serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); // 同上,写入开始表示\*5或4
if (server.cluster_enabled) // 如果集群模式开启
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); // 同上,写入$7 RESTORE \r\n
serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
sdslen(kv[j]->ptr))); // 将所有需要反序列化的key写入
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); // 写入过期时间
接下来,我们就需要最终执行DUMP命令,将我们需要传输的所有键等数据序列化了,这里redis调用了createDumpPayload()来创建一个DUMP载荷,这就是最终序列化好的数据:
createDumpPayload(&payload,ov[j],kv[j]); // 序列化数据
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr))); // 将序列化数据存到rio cmd中等待发送
sdsfree(payload.io.buffer.ptr);
if (replace)
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); // replace选项开启
发送数据
目前,我们需要发送的、按照redis协议组装好的所有序列化好的命令及数据都存放在了cmd这个rio结构体变量缓存中。我们当前的6379redis实例仿佛就是一个客户端,而要传输的目标实例6380就是一个服务端。接下来就需要读取缓存并且往直前建立好的socket中写入数据,将数据最终传输至目标实例:
errno = 0;
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
int nwritten = 0;
while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite); //按照64K的块大小来发送
nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout); // 往socket fd中写入数据(数据来源于rio的缓存)
if (nwritten != (signed)towrite) {
write_error = 1;
goto socket_err;
}
pos += nwritten;
}
}
处理返回
在目标redis上分别执行AUTH、SELECT、RESTORE命令,RESTORE命令会反序列化并将key写入目标实例。那么这几个命令执行完毕之后,我们如何知道它们是否执行成功呢?同样的,目标redis 6380实例在执行完命令之后,也会有相应的返回值,我们需要根据返回值来判断命令是否执行成功、是否将key成功迁移完成:
char buf0[1024]; /* 存储AUTH命令返回值. */
char buf1[1024]; /* 存储SELECT命令返回值 */
char buf2[1024]; /* 存储RESTORE命令返回值. */
/* 从socket fd中读取AUTH命令返回值. */
if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
goto socket_err;
/* 从socket fd中读取SELECT命令返回值. */
if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
goto socket_err;
int error_from_target = 0;
int socket_error = 0;
int del_idx = 1;
/* 迁移完成之后需要将原有实例上的key删除 */
if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
for (j = 0; j < num_keys; j++) {
/* 从socket fd中读取RESTORE命令返回值 */
if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
socket_error = 1;
break;
}
if ((password && buf0[0] == '-') ||
(select && buf1[0] == '-') ||
buf2[0] == '-')
{
if (!error_from_target) {
...
} else {
if (!copy) { // 没有开启copy选项,需要删除原有实例的键
...
/* 删除原有实例上的键 */
dbDelete(c->db,kv[j]);
...
}
}
}
...
/* 如果发生socket错误,关闭连接 */
if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
...
sdsfree(cmd.io.buffer.ptr); // 释放cmd的rio缓冲区
zfree(ov); zfree(kv); zfree(newargv); // 释放存储key的robj结构体
return;
综上,migrate命令就执行完成了。我们总结一下它的执行过程:
- 命令参数校验
- 按照redis协议组装目标实例上需要执行的命令
- 将要传输的key序列化
- 创建socket连接
- 通过socket连接将命令及数据传输至目标实例
- 目标实例执行命令并存储相应的key
- 处理目标实例的返回值
- 如果失败执行重试逻辑,如果成功则执行完毕
扩展
缓存socket的实现
在migrate命令执行过程中,调用了migrateGetSocket()创建socket。redis借助字典结构,实现了缓存socket,避免了多次创建socket所带来的开销:
migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
int fd;
sds name = sdsempty();
migrateCachedSocket *cs;
/* 查找字典中是否有相应 ip:port 的缓存socket. */
name = sdscatlen(name,host->ptr,sdslen(host->ptr));
name = sdscatlen(name,":",1);
name = sdscatlen(name,port->ptr,sdslen(port->ptr));
// 查找字典
cs = dictFetchValue(server.migrate_cached_sockets,name);
if (cs) { // 如果找到了,说明之前创建过ip:port的socket
sdsfree(name);
cs->last_use_time = server.unixtime;
return cs; // 直接返回缓存socket
}
/* 如果在字典中没有找到,说明没有缓存,需要重新创建. */
/* 判断是否缓存的socket过多,最大为64个 */
if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
/* 如果字典中缓存的socket过多,需要随机删除一些 */
dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
cs = dictGetVal(de);
close(cs->fd);
zfree(cs);
dictDelete(server.migrate_cached_sockets,dictGetKey(de));
}
/* 创建socket */
fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
atoi(c->argv[2]->ptr));
if (fd == -1) {
sdsfree(name);
addReplyErrorFormat(c,"Can't connect to target node: %s",
server.neterr);
return NULL;
}
anetEnableTcpNoDelay(server.neterr,fd);
/* 检查是否在超时时间内创建完成 */
if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
sdsfree(name);
addReplySds(c,
sdsnew("-IOERR error or timeout connecting to the client\r\n"));
close(fd);
return NULL;
}
/* 将新创建的socket加入缓存并返回给调用者 */
cs = zmalloc(sizeof(*cs));
cs->fd = fd;
cs->last_dbid = -1;
cs->last_use_time = server.unixtime;
// 将新创建的socket加入字典,缓存起来等待下次使用
dictAdd(server.migrate_cached_sockets,name,cs);
return cs;
}