如何解决Redis 主从数据不一致问题

线上问题

近期我们在对Redis做大规模迁移升级的时候,采用模拟复制协议的方式进行数据传输同步。

在此期间,我们遇到如下两个问题:

  1. 迁移前后Redis过期时间不一致。
  2. 迁移前后Redis key 数量不一致。

迁移前后Redis过期时间不一致

针对第一个问题,Redis 过期时间不一致问题,通过测试并且查阅Redis源码中得出如下结论:
Redis社区版本在正常的主从复制也会出现过期时间不一致问题,主要是由于在主从进行全同步期间,如果主库此时有expire 命令,那么到从库中,该命令将会被延迟执行。因为全同步需要耗费时间,数据量越大,那么过期时间差距就越大。
Redis expire 命令主要实现如下:

expireGenericCommand(c,mstime(),UNIT_SECONDS);

void expireGenericCommand(redisClient *c, long long basetime, int unit) {
    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != REDIS_OK)
        return;
    if (unit == UNIT_SECONDS) when *= 1000;
    when += basetime;

expire 600 到redis中过期时间其实是(当前timestamp+600)*1000,最终Redis会存储计算后这个值在Redis中。所以上面提到的情况,等到命令到从库的时候,当前的timestamp跟之前的timestamp不一样了,特别是发生在全同步后的expire命令,延迟时间基本上等于全同步的数据,最终造成过期时间不一致。

这个问题其实已经是官方的已知问题,解决方案有两个:

1. 业务采用expireat timestamp 方式,这样命令传送到从库就没有影响。
2. 在Redis代码中将expire命令转换为expireat命令。

官方没有做第二个选择,反而是提供expireat命令来给用户选择。其实从另外一个角度来看,从库的过期时间大于主库的过期时间,其实影响不大。因为主库会主动触发过期删除,如果该key删除之后,主库也会向从库发送删除的命令。但是如果主库的key已经到了过期时间,redis没有及时进行淘汰,这个时候访问从库该key,那么这个key是不会被触发淘汰的,这样如果对于过期时间要求非常苛刻的业务还是会有影响的。
而且目前针对于我们大规模迁移的时间,在进行过期时间校验的时候,发现大量key的过期时间都不一致,这样也不利于我们进行校验。

所以针对第一个问题,我们将expire/pexpire/setex/psetex 命令在复制到从库的时候转换成时间戳的方式,比如expire 转成expireat命令,setex转换成set和expireat命令,具体实现如下:

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL) {
        if (!strcasecmp(argv[0]->ptr,"expire") ||
            !strcasecmp(argv[0]->ptr,"setex") ||
            !strcasecmp(argv[0]->ptr,"pexpire") ||
            !strcasecmp(argv[0]->ptr,"psetex") ) {
            long long when;
            robj *tmpargv[3];
            robj *tmpexpire[3];
            argv[2] = getDecodedObject(argv[2]);
            when = strtoll(argv[2]->ptr,NULL,10);
            if (!strcasecmp(argv[0]->ptr,"expire") ||
                !strcasecmp(argv[0]->ptr,"setex")) {
                    when *= 1000;
            }    
            when += mstime();
            /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
            if (!strcasecmp(argv[0]->ptr,"expire") ||
                !strcasecmp(argv[0]->ptr,"pexpire")) {
                tmpargv[0] = createStringObject("PEXPIREAT",9);
                tmpargv[1] = getDecodedObject(argv[1]);
                tmpargv[2] = createStringObjectFromLongLong(when);
                replicationFeedSlaves(server.slaves,dbid,tmpargv,argc);
                decrRefCount(tmpargv[0]);
                decrRefCount(tmpargv[1]);
                decrRefCount(tmpargv[2]);
            }    
            /* Translate SETEX/PSETEX to SET and PEXPIREAT */
            if (!strcasecmp(argv[0]->ptr,"setex") ||
                !strcasecmp(argv[0]->ptr,"psetex")) {
                argc = 3;
                tmpargv[0] = createStringObject("SET",3);
                tmpargv[1] = getDecodedObject(argv[1]);
                tmpargv[2] = getDecodedObject(argv[3]);
                replicationFeedSlaves(server.slaves,dbid,tmpargv,argc);
                tmpexpire[0] = createStringObject("PEXPIREAT",9);
                tmpexpire[1] = getDecodedObject(argv[1]);
                tmpexpire[2] = createStringObjectFromLongLong(when);
                replicationFeedSlaves(server.slaves,dbid,tmpexpire,argc);
                decrRefCount(tmpargv[0]);
                decrRefCount(tmpargv[1]);
                decrRefCount(tmpargv[2]);
                decrRefCount(tmpexpire[0]);
                decrRefCount(tmpexpire[1]);
                decrRefCount(tmpexpire[2]);
            }
        } else {
                replicationFeedSlaves(server.slaves,dbid,argv,argc);
        }
}
}

目前上述修改已经应用到线上迁移环境中,上线以后Redis过期时间不一致问题解决,目前迁移前后的过期时间是严格保持一致的。

迁移前后Redis key 数量不一致

针对于第二个问题,Redis key 迁移前后数量不一致问题,其实在Redis社区版本的主从复制中,也会经常出现key数量不一致。其中一个非常关键的问题是,redis在做主从复制的时候,会对当前的存量数据做一个RDB快照(bgsave命令),然后将RDB快照传给从库,从库会解析RDB文件并且load到内存中。然儿在上述的两个步骤中Redis会忽略过期的key:

1. 主库在做RDB快照文件的时候,发现key已经过期了,则此时不会将过期的key写到RDB文件中。
2. 从库在load RDB文件到内存中的时候,发现key已经过期了,则此时不会将过期的key load进去。

所以针对上述两个问题会造成Redis主从key不一致问题,这个对于我们做数据校验的时候会有些影响,因始终觉得key不一致,但是不影响业务逻辑。
针对上述问题,目前我们将以上两个步骤都改为不忽略过期key,过期key的删除统一由主库触发删除,然后将删除命令传送到从库中。这样key的数量就完全一致了。
最终在打上以上两个patch之后,再进行迁移测试的时候,验证key过期时间以及数量都是完全一致的。
最后贴上以上修改的代码(针对于社区版本Redis 3.0.7):

以下代码修改均在我标记注释的下面

1、做bgsave的时候不忽略过期的key

rdb.c

/* Save a key-value pair, with expire time, type, key, value.
 * On error -1 is returned.
 * On success if the key was actually saved 1 is returned, otherwise 0
 * is returned (the key was already expired). */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                        long long expiretime, long long now)
{
    /* Save the expire time */
    if (expiretime != -1) {
        /* If this key is already expired skip it */
        /* 注释下面这一行 */
        /* if (expiretime < now) return 0; */                                                                                                                                                
        if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }   

    /* Save type, key, value */
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    if (rdbSaveObject(rdb,val) == -1) return -1;
    return 1;
}

2、做bgrewirteaof 的时候不忽略过期的key

aof.c

int rewriteAppendOnlyFile(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    rio aof;
    FILE *fp;
    char tmpfile[256];
    int j;
    long long now = mstime();
    char byte;
    size_t processed = 0;

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return REDIS_ERR;
    }   

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);
    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AOF_AUTOSYNC_BYTES);
    for (j = 0; j < server.dbnum; j++) {
        char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }   

        /* SELECT the new DB */
        if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
        if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;

        /* Iterate this DB writing every entry */  
        while((de = dictNext(di)) != NULL) {
            sds keystr;
            robj key, *o;
            long long expiretime;
 
            keystr = dictGetKey(de);
            o = dictGetVal(de);
            initStaticStringObject(key,keystr);
 
            expiretime = getExpire(db,&key);
 
            /* If this key is already expired skip it */
            /* 注释下面这一行 */
            /* if (expiretime != -1 && expiretime < now) continue; */

3、在load rdb 的时候不忽略过期key

rdb.c

int rdbLoad(char *filename) {
    uint32_t dbid;
    int type, rdbver;
    redisDb *db = server.db+0;
    char buf[1024];
    long long expiretime, now = mstime();
    FILE *fp;
    rio rdb;

    if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;

    rioInitWithFile(&rdb,fp);
    rdb.update_cksum = rdbLoadProgressCallback;
    rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
    if (rioRead(&rdb,buf,9) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        errno = EINVAL;
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        errno = EINVAL;
        return REDIS_ERR;
    }

    startLoading(fp);
    while(1) {
        robj *key, *val;
        expiretime = -1;

        /* Read type. */
        if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
            /* the EXPIRETIME opcode specifies time in seconds, so convert
             * into milliseconds. */
            expiretime *= 1000;
        } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
            /* Milliseconds precision expire times introduced with RDB
             * version 3. */
            if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        }
 
        if (type == REDIS_RDB_OPCODE_EOF)
            break;
 
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_RDB_OPCODE_SELECTDB) {
            if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            continue;
        }
        /* Read key */
        if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
        /* Read value */
        if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
        /* Check if the key already expired. This function is used when loading
         * an RDB file from disk, either at startup, or when an RDB was
         * received from the master. In the latter case, the master is
         * responsible for key expiry. If we would expire keys here, the
         * snapshot taken by the master may not be reflected on the slave. */
         /* 注释下面5行 */
        /* if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
            decrRefCount(key);
            decrRefCount(val);
             continue;
        } */
        /* Add the new object in the hash table */
        dbAdd(db,key,val);
 
        /* Set the expire time if needed */
        if (expiretime != -1) setExpire(db,key,expiretime);
 
        decrRefCount(key);
    }

总结

注意上述修改在内存策略为noeviction一直有效,但是其他内存策略只能在Redis 使用内存小于最大内存的时候才会有效,因为从库在使用内存超过最大内存的时候也会触发淘汰,这个时候也没法完全保证数据一致性了。

    原文作者:zbdba
    原文地址: https://segmentfault.com/a/1190000013144617
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞