【Redis学习笔记】2018-07-03 redis源码学习 quicklist

顺风车运营研发团队 方波
quicklist概述:
quicklist是一个ziplist的双向链表,用在上层list数据结构的底层实现。quicklist包含多个quicklistNode,quicklistNode由ziplist来保存数据。

quicklist结构图:

《【Redis学习笔记】2018-07-03 redis源码学习 quicklist》

quicklist特点:
双向链表的内存开销很大,每个节点的地址不连续,容易产生内存碎片,利用ziplist减少node数量,但ziplist插入和删除数都很麻烦,复杂度高,为避免长度较长的ziplist修改时带来的内存拷贝开销,通过配置项配置合理的ziplist长度。

Redis的配置文件中,给出了每个ziplist中的元素个数设定,对应quicklist的fill属性,设置根据场景而定:
list-max-ziplist-size -2 // 如果取值为正数表示ziplist节点数量,最大16bit
负数取值:

  • -1 每个节点的ziplist字节大小不能超过4kb
  • -2 每个节点的ziplist字节大小不能超过8kb redis默认值
  • -3 每个节点的ziplist字节大小不能超过16kb
  • -4 每个节点的ziplist字节大小不能超过32kb
  • -5 每个节点的ziplist字节大小不能超过64kb

因为链表的特性,一般收尾两端操作较频繁,中部操作相对较少,所以redis提供压缩深度配置:
参数list-compress-depth的取值含义如下:

  • 0: 是个特殊值,表示都不压缩。这是Redis的默认值。
  • 1: 表示quicklist两端各有1个节点不压缩,中间的节点压缩。
  • 2: 表示quicklist两端各有2个节点不压缩,中间的节点压缩。
  • 3: 表示quicklist两端各有3个节点不压缩,中间的节点压缩。

依此类推,最大16bit

quicklist结构体:

typedef struct quicklist {
    quicklistNode *head;
    quicklistNode *tail;
    unsigned long count;        /* total count of all entries in all ziplists */
    unsigned long len;          /* number of quicklistNodes */
    int fill : 16;              /* fill factor for individual nodes */
    unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;
head,tail:指向头尾节点的指针
count: 所有ziplist中entry数量
len: node数量
fill: ziplist中entry能保存的数量,由list-max-ziplist-size配置项控制
compress: 压缩深度,由list-compress-depth配置项控制
 
typedef struct quicklistNode {
    struct quicklistNode *prev;
    struct quicklistNode *next;
    unsigned char *zl;
    unsigned int sz;             /* ziplist size in bytes */
    unsigned int count : 16;     /* count of items in ziplist */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
prev,next:前后节点指针
zl: 数据指针。当前节点的数据没有压缩,那么它指向一个ziplist结构;否则,它指向一个quicklistLZF结构。
sz: zl指向的ziplist实际占用内存大小。需要注意的是:如果ziplist被压缩了,那么这个sz的值仍然是压缩前的ziplist大小。
count: ziplist里面包含的数据项个数
encoding: ziplist是否压缩。取值:1--ziplist,2--quicklistLZF
container: 存储类型,目前使用固定值2 表示使用ziplist存储
recompress: 当我们使用类似lindex这样的命令查看了某一项本来压缩的数据时,需要把数据暂时解压,这时就设置recompress=1做一个标记,等有机会再把数据重新压缩
attempted_compress: Redis自动化测试程序有用
extra: 其它扩展字段,目前没有使用
 
typedef struct quicklistLZF {
    unsigned int sz; /* LZF size in bytes*/
    char compressed[];
} quicklistLZF;
sz: 压缩后的ziplist大小
compressed:柔性数组,存放压缩后的ziplist字节数组
lpush操作:
void pushGenericCommand(client *c, int where) {
    int j, pushed = 0;
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
 
    if (lobj && lobj->type != OBJ_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }
    for (j = 2; j < c->argc; j++) {
        if (!lobj) {
            lobj = createQuicklistObject();
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db,c->argv[1],lobj);
        }
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }
    addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
    if (pushed) {
        char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
 
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}
如果链表不存在,则创建,并设置fill和compress参数,然后调用listTypePush -> quicklistPush -> quicklistPushHead

int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_head = quicklist->head;
    if (likely(_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        quicklist->head->zl = ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        quicklistNodeUpdateSz(quicklist->head);
    } else {
        quicklistNode *node = quicklistCreateNode();
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
        quicklistNodeUpdateSz(node);
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    quicklist->count++;
    quicklist->head->count++;
    return (orig_head != quicklist->head);
}
_quicklistNodeAllowInsert 判断该头部节点是否允许插入,计算头部节点中的大小和fill参数设置的大小相比较,如果超出fill 则创建新的node和ziplist
quicklistNodeUpdateSz 更新头部大小

_quicklistNodeAllowInsert:

REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node, const int fill, const size_t sz) {
    if (unlikely(!node))
        return 0;
    int ziplist_overhead; // 跳过head的字节数
    /* 根据ziplist结构,长度小于254使用一个字节存储长度 */
    if (sz < 254)
        ziplist_overhead = 1;
    else
        ziplist_overhead = 5;
 
    /*  */
    if (sz < 64)
        ziplist_overhead += 1;
    else if (likely(sz < 16384))
        ziplist_overhead += 2;
    else
        ziplist_overhead += 5;
 
    /* new_sz overestimates if 'sz' encodes to an integer type */
    unsigned int new_sz = node->sz + sz + ziplist_overhead;
    if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
        return 1;
    else if (!sizeMeetsSafetyLimit(new_sz))
        return 0;
    else if ((int)node->count < fill)
        return 1;
    else
        return 0;
}
_quicklistNodeSizeMeetsOptimizationRequirement 比较new_sz和fill,如果fill>=0返回0,否则返回-fill对应optimization_level的大小与sz对比

REDIS_STATIC int
_quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz,
                                               const int fill) {
    if (fill >= 0)
        return 0;
    size_t offset = (-fill) - 1;
    if (offset < (sizeof(optimization_level) / sizeof(*optimization_level))) {
        if (sz <= optimization_level[offset]) {
            return 1;
        } else {
            return 0;
        }
    } else {
        return 0;
    }
}
 
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536}; // 分别是4k 8k 16k 32k 64k 分别对应了上面给出的list-max-ziplist-size负数配置项
    原文作者:LNMPR源码研究
    原文地址: https://segmentfault.com/a/1190000015476108
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞