Memcached LRU策略

Memcached LRU介绍

​ Memcached作为内存缓存,不可避免的问题之一就是内存的置换问题。我们比较常用的内存置换策略有FIFO(先进先出),LRU(最近最少使用)等。Memcached采用了LRU的策略。

​ LRU策略是指把当内存发生置换的时候,把最近使用次数最少的内存置换掉,虽然这种置换方式不是最优的,确是性价比很高的实现方式。这篇文章将对Memcached LRU策略进行分析,进一步总结Memcached内存的管理方式。

Memcached LRU总览

​ Memcached的内存管理方式如下所示:

《Memcached LRU策略》 Alt pic

​ 从上图,我们可以看到两个非常重要的链表:slots空闲列表和LRU链表。而Memcached1.4.24版本中,对LRU进行了进一步的划分,存在四级LRU:HOT_LRU、WARM_LRU、COLD_LRU、NOEXP_LRU。我们可以通过配置项来控制Memcached具体的LRU策略。

1. LRU分配

​ 通过Memcached内存管理的学习,我们知道Memcached会初始化固定数量的slabclass(64个),每一个slabclass对应有4个LRU的链表。每一个LRU链表使用head和tail来标记,在Memcached中维护了大小为256的head和tail数组,其中不同数组段的具体含义如下所示。正是由于Memcached采用了以64作为LRU的分级策略,所以它可以通过位运算快速的定位各个LRU链表。

#define HOT_LRU 0
#define WARM_LRU 64
#define COLD_LRU 128
#define NOEXP_LRU 192
static unsigned int lru_type_map[4] = {HOT_LRU, WARM_LRU, COLD_LRU, NOEXP_LRU};
#define CLEAR_LRU(id) (id & ~(3<<6)) //这个方法会把所有的LRU都定位到0~63之间

2. LRU策略

​ 通过对配置项lru_maintainer_thread,设置为0,我们可以使用基本的LRU策略,该策略下,Memcached只使用COLD_LRU链表来记录。而通过对lru_maintainer_thread设为1,我们可以使用4个LRU链表。使用多个LRU链表的最大好处是不同的LRU链表可以使用不同的置换策略,提供Memcached的性能。这里我们将具体介绍Memcached的分级LRU。

《Memcached LRU策略》 Alt pic

​ 在Memcached分级LRU策略中,严格定义了四级LRU数据的流向。HOT_LRU和WARM_LRU数据可以流向COLD_LRU中,COLD_LRU数据可以流向WARM_LRU。而NOEXP_LRU比较特殊,它只存放那些不设置超时时间且不被强制换出的数据。

​ 在具体介绍lru检查算法之前,我们先回顾一下Memcached中Item的结构:

#define ITEM_LINKED 1 //是否在链表中
#define ITEM_CAS 2 //是否CAS

/* temp */
#define ITEM_SLABBED 4 //是否在SLAB中未分配

/* Item was fetched at least once in its lifetime */
#define ITEM_FETCHED 8 //是否被fetched
/* Appended on fetch, removed on LRU shuffling */
#define ITEM_ACTIVE 16 //是否Active,在LRU检查时被置为0,在fetch时被置为1

typedef struct _stritem {
    /* Item被LRU锁保护,而LRU锁是通过hv来实现的 */
    struct _stritem *next; //next和prev这里可以有两种含义,可以是空闲链表和LRU链表
    struct _stritem *prev;
    /* Rest are protected by an item lock */
    struct _stritem *h_next;    /* 数据的查找是通过hash算法实现的,这里值hash值相同的链表 */
    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;   /* 引用计数,这个非常重要,refcount为0的Item回收才会放回slots空闲链表 */
    uint8_t         nsuffix;    /* length of flags-and-length string */
    uint8_t         it_flags;   /* 标记了该ITEM的状态,非常重要 */
    uint8_t         slabs_clsid;/* slab id,是LRU的标记 */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;

​ 在Item中,time,exptime,refcount和it_flags对于LRU策略来说是非常重要的,理解了这些变量的含义非常有助于我们对LRU检查算法的理解。

static int lru_pull_tail(const int orig_id, const int cur_lru,
        const unsigned int total_chunks, const bool do_evict, const uint32_t cur_hv) {
    item *it = NULL;
    int id = orig_id;
    int removed = 0;
    if (id == 0)
        return 0;

    int tries = 5;
    item *search;
    item *next_it;
    void *hold_lock = NULL;
    unsigned int move_to_lru = 0; //表示要移动的LRU类型
    uint64_t limit;

    id |= cur_lru; //根据当前的LRU类型获取LRU的id
    pthread_mutex_lock(&lru_locks[id]);
    search = tails[id];

    /* 从尾部开始搜索,尝试5次,只走查可以被locked的item,一旦尾部的Item满足条件,跳出循环 */
    for (; tries > 0 && search != NULL; tries--, search=next_it) {
        /* we might relink search mid-loop, so search->prev isn't reliable */
        next_it = search->prev;
        if (search->nbytes == 0 && search->nkey == 0 && search->it_flags == 1)
        {
            /* 满足该条件的属于爬虫数据,忽略 */
            tries++;
            continue;
        }
        uint32_t hv = hash(ITEM_key(search), search->nkey);
        /* Attempt to hash item lock the "search" item. If locked, no
         * other callers can incr the refcount. Also skip ourselves. */
        /* 尝试对Item进行加锁,如果lock失败,则跳过*/
        if (hv == cur_hv || (hold_lock = item_trylock(hv)) == NULL)
            continue;

        /* item的refcount是引用计数,记录了该item被引用的次数,正常情况下,refcount>1说明该Item还是使用中 */
        if (refcount_incr(&search->refcount) != 2) {
            /* Note pathological case with ref'ed items in tail.
             * Can still unlink the item, but it won't be reusable yet */
            itemstats[id].lrutail_reflocked++;
            /* In case of refcount leaks, enable for quick workaround. */
            /* WARNING: This can cause terrible corruption */
            /*对于refcount为1的item,要进行额外的判断,默认不执行这段代码*/
            if (settings.tail_repair_time &&
                    search->time + settings.tail_repair_time < current_time) {
                itemstats[id].tailrepairs++;
                search->refcount = 1;
                /* This will call item_remove -> item_free since refcnt is 1 */
                do_item_unlink_nolock(search, hv);
                item_trylock_unlock(hold_lock);
                continue;
            }
        }

        /* Expired or flushed */
        if ((search->exptime != 0 && search->exptime < current_time)
            || is_flushed(search)) {
            itemstats[id].reclaimed++;
            if ((search->it_flags & ITEM_FETCHED) == 0) {
                itemstats[id].expired_unfetched++;
            }
            /* refcnt 2 -> 1 */
            /* 该方法删除Item的Hash值,同时从LRU中移除该Item,并尝试回收该ITEM,由于refcount从2->1,refcount>0,回收会失败*/
            do_item_unlink_nolock(search, hv);
            /* refcount从1->0,回收成功,回收的ITEM会被放入slots的空闲链表头 */
            do_item_remove(search);
            item_trylock_unlock(hold_lock);
            removed++;

            /* If all we're finding are expired, can keep going */
            continue;
        }

        /*如果ITEM在HOT_LRU或者WARM_LRU中,同时LRU的大小超过限制,该ITEM移动COLD_LRU; 如果ITEM在COLD_LRU,移动到WARM_LRU,除非我们要强制清除 */
        switch (cur_lru) {
            case HOT_LRU:
                limit = total_chunks * settings.hot_lru_pct / 100;
            case WARM_LRU:
                limit = total_chunks * settings.warm_lru_pct / 100;
                if (sizes[id] > limit) {
                    itemstats[id].moves_to_cold++;
                    move_to_lru = COLD_LRU;
                    /*该方法只是把Item从LRU链表中移除*/
                    do_item_unlink_q(search);
                    it = search;
                    removed++;
                    break;
                } else if ((search->it_flags & ITEM_ACTIVE) != 0) {
                    /* Only allow ACTIVE relinking if we're not too large. */
                    itemstats[id].moves_within_lru++;
                    search->it_flags &= ~ITEM_ACTIVE;
                    do_item_update_nolock(search);
                    do_item_remove(search);
                    item_trylock_unlock(hold_lock);
                } else {
                    /* Don't want to move to COLD, not active, bail out */
                    it = search;
                }
                break;
            case COLD_LRU:
                it = search; /* No matter what, we're stopping */
                if (do_evict) {
                    if (settings.evict_to_free == 0) {
                        /* Don't think we need a counter for this. It'll OOM.  */
                        break;
                    }
                    itemstats[id].evicted++;
                    itemstats[id].evicted_time = current_time - search->time;
                    if (search->exptime != 0)
                        itemstats[id].evicted_nonzero++;
                    if ((search->it_flags & ITEM_FETCHED) == 0) {
                        itemstats[id].evicted_unfetched++;
                    }
                    /*想当于做了do_item_unlink_q和do_item_remove*/
                    do_item_unlink_nolock(search, hv);
                    removed++;
                } else if ((search->it_flags & ITEM_ACTIVE) != 0
                        && settings.lru_maintainer_thread) {
                    itemstats[id].moves_to_warm++;
                    search->it_flags &= ~ITEM_ACTIVE;
                    move_to_lru = WARM_LRU;
                    do_item_unlink_q(search);
                    removed++;
                }
                break;
        }
        if (it != NULL)
            break;
    }

    pthread_mutex_unlock(&lru_locks[id]);

    if (it != NULL) {
        if (move_to_lru) {
            it->slabs_clsid = ITEM_clsid(it);
            it->slabs_clsid |= move_to_lru;
            item_link_q(it);
        }
        /*能够进到这一步的item,我们需要手动减少refcount的值*/
        do_item_remove(it);
        item_trylock_unlock(hold_lock);
    }

    return removed;
}

3.Item失效和引用计数

​ Memcached从LRU链表的尾部进行Item的检查(跳过已经被其他线程加锁的Item),当满足以下的条件时,Item被清理,�那Item满足失效的具体条件是什么呢。在介绍Item失效之前,我们先介绍一下Item的引用计数,引用计数的作用主要要为了防止多个线程的影响。打个比方,线程A在读取的数据的同时线程B在删除数据就会引发一定的问题。在Memcached中,当线程在使用Item的时候,该Item的引用计数加1,使用之后Item的引用计数减1,所以只有当Item的引用计数为0时,该Item才可以被真正的清理。理解这段逻辑对于Item的失效和回收非常重要。现在我们看看Item的失效条件。

​ 1. Item已经过期,惰性回收,在访问的时候会进行判断回收;在Item内存分配时也会进行一定的回收检查。

​ 2. Item被淘汰(这个有个flush_all命令,可以强制回收老的数据)。为了防止大量Item被回收造成的性能低下,Memcached也采用了惰性回收的机制。

​ 3. Item不满足条件,但是Memcached设置了tail_repair_time(该设置主要是为了防止部分线程异常,没有减少refcount而导致异常。正常的refcount都是1),同时该Item的最后访问的时间小于(当前时间-tail_repair_time),该Item进行清理

​ 在LRU链表的检查中,如果Item没有满足上面提到的清理条件,则Item会进行判断,看是否需要再不同级LRU之间移动,其具体的规则如下:

​ 1. 如果当前Item在HOT_LRU或者WARM_LRU,则进行当前LRU最大limit的判断。当该LRU中Item数量>limit时,Item被移动到COLD_LRU中;当Item数量<=limit同时Item处于Active状态时,更新Item;

​ 2. 如果当前Item在COLD_LRU中时,如果允许拒绝Item(do_evict=true),直接清理该Item;否则,如果该Item处于Active状态时,移动到WARM_LRU中。

​ Memcached的分级LRU策略把最新的数据尽量往HOT_LRU和WARM_LRU中分配,而COLD_LRU中的数据一般是最近最少使用的数据,这样可以很好地提高回收的效果。

4. LRU管理和爬虫机制

​ 上面我们具体的分析了Memcached中LRU的分级策略,除了惰性回收机制,Memcached也给我们提供了强制清除的方法。LRU管理和爬虫机制。首先先看看Item可能发生回收的地方:

​ 1. 在获取Item的时候,如果Item满足回收条件,则Item会被回收。

​ 2. 在Item分配空间时会对LRU进行有限度的回收。它会进行最多5次的回收尝试。

​ 3. LRU管理线程会定时对LRU进行检查,回收可以被回收的资源,同时根据资源的分配和使用情况,不同级LRU中进行移动。

​ 4. Item爬虫线程会对每个LRU链表进行检查,回收满足回收条件的资源。同时,LRU管理线程结束也会促发Item爬虫线程的执行。

​ LRU管理线程主要是对分级LRU的管理,负责过期ITEM的回收和不同级LRU之间数据的移动。它的控制粒度比较大,是针对单个slabclass来说的。

/*数据在不同的LRU之间的转移,每次1000次循环;做的事情非常简单,对LRU的Item进行回收,如果HOT_LRU和WARM_LRU数据过多,移到COLDE_LRU;如果COLD_LRU数据过多,进行清除*/
static int lru_maintainer_juggle(const int slabs_clsid) {
    int i;
    int did_moves = 0;
    bool mem_limit_reached = false;
    unsigned int total_chunks = 0;
    /* TODO: if free_chunks below high watermark, increase aggressiveness */
    slabs_available_chunks(slabs_clsid, &mem_limit_reached, &total_chunks);
    if (settings.expirezero_does_not_evict)
        total_chunks -= noexp_lru_size(slabs_clsid);

    /* Juggle HOT/WARM up to N times */
    for (i = 0; i < 1000; i++) {
        int do_more = 0;
        if (lru_pull_tail(slabs_clsid, HOT_LRU, total_chunks, false, 0) ||
            lru_pull_tail(slabs_clsid, WARM_LRU, total_chunks, false, 0)) {
            do_more++;
        }
        do_more += lru_pull_tail(slabs_clsid, COLD_LRU, total_chunks, false, 0);
        if (do_more == 0)
            break;
        did_moves++;
    }
    return did_moves;
}

​ 而爬虫机制则是针对每个LRU链表来进行清理的。Memcached为了提高性能,所以不能对某一个LRU进行加锁完成所有Item的检查回收。它采用的是在LRU链表中插入一个标记Item来进行定位清理。

/* 爬虫的实现是在LRU链表尾部加入爬虫的ITEM,然后该ITEM逐步向链表头移动,每次检查一个Item,看是否满足回收条件,满足则回收,当爬虫ITEM达到链表头则执行结束。该方法就是把爬虫ITEM前移的方法 */
static item *crawler_crawl_q(item *it) {
    item **head, **tail;
    assert(it->it_flags == 1);
    assert(it->nbytes == 0);
    assert(it->slabs_clsid < LARGEST_ID);
    head = &heads[it->slabs_clsid];
    tail = &tails[it->slabs_clsid];

    /* We've hit the head, pop off */
    if (it->prev == 0) {
        assert(*head == it);
        if (it->next) {
            *head = it->next;
            assert(it->next->prev == it);
            it->next->prev = 0;
        }
        return NULL; /* Done */
    }

    /* Swing ourselves in front of the next item */
    /* NB: If there is a prev, we can't be the head */
    assert(it->prev != it);
    if (it->prev) {
        if (*head == it->prev) {
            /* Prev was the head, now we're the head */
            *head = it;
        }
        if (*tail == it) {
            /* We are the tail, now they are the tail */
            *tail = it->prev;
        }
        assert(it->next != it);
        if (it->next) {
            assert(it->prev->next == it);
            it->prev->next = it->next;
            it->next->prev = it->prev;
        } else {
            /* Tail. Move this above? */
            it->prev->next = 0;
        }
        /* prev->prev's next is it->prev */
        it->next = it->prev;
        it->prev = it->next->prev;
        it->next->prev = it;
        /* New it->prev now, if we're not at the head. */
        if (it->prev) {
            it->prev->next = it;
        }
    }
    assert(it->next != it);
    assert(it->prev != it);

    return it->next; /* success */
}

/* 爬虫真正回收Item的方法
 */
static void item_crawler_evaluate(item *search, uint32_t hv, int i) {
    int slab_id = CLEAR_LRU(i);
    crawlerstats_t *s = &crawlerstats[slab_id];
    itemstats[i].crawler_items_checked++;
    if ((search->exptime != 0 && search->exptime < current_time)
        || is_flushed(search)) {
        itemstats[i].crawler_reclaimed++;
        s->reclaimed++;

        if (settings.verbose > 1) {
            int ii;
            char *key = ITEM_key(search);
            fprintf(stderr, "LRU crawler found an expired item (flags: %d, slab: %d): ",
                search->it_flags, search->slabs_clsid);
            for (ii = 0; ii < search->nkey; ++ii) {
                fprintf(stderr, "%c", key[ii]);
            }
            fprintf(stderr, "\n");
        }
        if ((search->it_flags & ITEM_FETCHED) == 0) {
            itemstats[i].expired_unfetched++;
        }
        do_item_unlink_nolock(search, hv);
        do_item_remove(search);
        assert(search->slabs_clsid == 0);
    } else {
        s->seen++;
        refcount_decr(&search->refcount);
        if (search->exptime == 0) {
            s->noexp++;
        } else if (search->exptime - current_time > 3599) {
            s->ttl_hourplus++;
        } else {
            rel_time_t ttl_remain = search->exptime - current_time;
            int bucket = ttl_remain / 60;
            s->histo[bucket]++;
        }
    }
}

源码分析

​ 下面我们来根据Memcached的关键源码来看看其LRU策略。

item *do_item_alloc(char *key, const size_t nkey, const int flags,
                    const rel_time_t exptime, const int nbytes,
                    const uint32_t cur_hv) {
    int i;
    uint8_t nsuffix;
    item *it = NULL;
    char suffix[40];
    unsigned int total_chunks;
    /*计算需要分配的空间*/
    /*nkey+1,这里的1主要是Key的分隔符*/
    size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
    if (settings.use_cas) {
        ntotal += sizeof(uint64_t);
    }

    /*该方法返回要保存该数据的SlabClass的ID*/
    unsigned int id = slabs_clsid(ntotal);
    if (id == 0)
        return 0;

    /* 在进行item内存获取的时候,Memcached会进行LRU的回收。 */
    for (i = 0; i < 5; i++) {
        /* 先试着回收内存 */
        /* lru_maintainer_thread,该值由LUR管理控制,在默认分配的时候,该值是false */
        if (!settings.lru_maintainer_thread) {
            lru_pull_tail(id, COLD_LRU, 0, false, cur_hv);
        }
        it = slabs_alloc(ntotal, id, &total_chunks);
        if (settings.expirezero_does_not_evict)
            total_chunks -= noexp_lru_size(id);
        if (it == NULL) {
            if (settings.lru_maintainer_thread) {
                lru_pull_tail(id, HOT_LRU, total_chunks, false, cur_hv);
                lru_pull_tail(id, WARM_LRU, total_chunks, false, cur_hv);
                lru_pull_tail(id, COLD_LRU, total_chunks, true, cur_hv);
            } else {
                lru_pull_tail(id, COLD_LRU, 0, true, cur_hv);
            }
        } else {
            break;
        }
    }

    if (i > 0) {
        pthread_mutex_lock(&lru_locks[id]);
        /* 直接回收 */
        itemstats[id].direct_reclaims += i;
        pthread_mutex_unlock(&lru_locks[id]);
    }

    if (it == NULL) {
        pthread_mutex_lock(&lru_locks[id]);
        itemstats[id].outofmemory++;
        pthread_mutex_unlock(&lru_locks[id]);
        return NULL;
    }

    assert(it->slabs_clsid == 0);
    //assert(it != heads[id]);

    /* Refcount is seeded to 1 by slabs_alloc() */
    it->next = it->prev = it->h_next = 0;
    /* Items are initially loaded into the HOT_LRU. This is '0' but I want at
     * least a note here. Compiler (hopefully?) optimizes this out.
     */
    if (settings.lru_maintainer_thread) {
        //Item默认进入的LRU链表的规则
        if (exptime == 0 && settings.expirezero_does_not_evict) {
            id |= NOEXP_LRU;
        } else {
            id |= HOT_LRU;
        }
    } else {
        /* There is only COLD in compat-mode */
        id |= COLD_LRU;
    }
    it->slabs_clsid = id;

    DEBUG_REFCNT(it, '*');
    it->it_flags = settings.use_cas ? ITEM_CAS : 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    memcpy(ITEM_key(it), key, nkey);
    it->exptime = exptime;
    memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    it->nsuffix = nsuffix;
    return it;
}

​ 下面的代码是LRU链表中对各个ITEM的操作。

/* item的回收,把item加入到slots空闲链表 */
void item_free(item *it) {
    size_t ntotal = ITEM_ntotal(it);
    unsigned int clsid;
    assert((it->it_flags & ITEM_LINKED) == 0);
    assert(it != heads[it->slabs_clsid]);
    assert(it != tails[it->slabs_clsid]);
    assert(it->refcount == 0);

    clsid = ITEM_clsid(it);
    DEBUG_REFCNT(it, 'F');

    slabs_free(it, ntotal, clsid);
}

/*把Item放到LRU的头部*/
static void do_item_link_q(item *it) { /* item is the new head */
    item **head, **tail;
    assert((it->it_flags & ITEM_SLABBED) == 0);

    head = &heads[it->slabs_clsid];
    tail = &tails[it->slabs_clsid];
    assert(it != *head);
    assert((*head && *tail) || (*head == 0 && *tail == 0));
    it->prev = 0;
    it->next = *head;
    if (it->next) it->next->prev = it;
    *head = it;
    if (*tail == 0) *tail = it;
    sizes[it->slabs_clsid]++;
    return;
}

/* 加锁的do_item_link_q*/
static void item_link_q(item *it) {
    pthread_mutex_lock(&lru_locks[it->slabs_clsid]);
    do_item_link_q(it);
    pthread_mutex_unlock(&lru_locks[it->slabs_clsid]);
}

/*把Item从LRU队列中去掉*/
static void do_item_unlink_q(item *it) {
    item **head, **tail;
    head = &heads[it->slabs_clsid];
    tail = &tails[it->slabs_clsid];

    if (*head == it) {
        assert(it->prev == 0);
        *head = it->next;
    }
    if (*tail == it) {
        assert(it->next == 0);
        *tail = it->prev;
    }
    assert(it->next != it);
    assert(it->prev != it);

    if (it->next) it->next->prev = it->prev;
    if (it->prev) it->prev->next = it->next;
    sizes[it->slabs_clsid]--;
    return;
}

/* 加锁的do_item_unlink_q */
static void item_unlink_q(item *it) {
    pthread_mutex_lock(&lru_locks[it->slabs_clsid]);
    do_item_unlink_q(it);
    pthread_mutex_unlock(&lru_locks[it->slabs_clsid]);
}

/* 这个方法非常重要,是memcached中加入item的方法 */
int do_item_link(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    it->it_flags |= ITEM_LINKED;
    it->time = current_time;

    STATS_LOCK();
    stats.curr_bytes += ITEM_ntotal(it);
    stats.curr_items += 1;
    stats.total_items += 1;
    STATS_UNLOCK();

    /* Allocate a new CAS ID on link. */
    ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
    assoc_insert(it, hv);
    item_link_q(it);
    refcount_incr(&it->refcount);

    return 1;
}

/* 这个方法非常重要,是memcached中剔除item的方法 */
void do_item_unlink(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
    if ((it->it_flags & ITEM_LINKED) != 0) {
        it->it_flags &= ~ITEM_LINKED;
        STATS_LOCK();
        stats.curr_bytes -= ITEM_ntotal(it);
        stats.curr_items -= 1;
        STATS_UNLOCK();
        assoc_delete(ITEM_key(it), it->nkey, hv);
        item_unlink_q(it);
        do_item_remove(it);
    }
}

/* FIXME: Is it necessary to keep this copy/pasted code? */
void do_item_unlink_nolock(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
    if ((it->it_flags & ITEM_LINKED) != 0) {
        it->it_flags &= ~ITEM_LINKED;
        STATS_LOCK();
        stats.curr_bytes -= ITEM_ntotal(it);
        stats.curr_items -= 1;
        STATS_UNLOCK();
        assoc_delete(ITEM_key(it), it->nkey, hv);
        do_item_unlink_q(it);
        do_item_remove(it);
    }
}

/* 回收Item,只有item->refcount==1的对象才可以被回收*/
void do_item_remove(item *it) {
    MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);
    assert(it->refcount > 0);

    if (refcount_decr(&it->refcount) == 0) {
        item_free(it);
    }
}

/*刷新了item的时间,没有加锁*/
void do_item_update_nolock(item *it) {
    MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);
    if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
        assert((it->it_flags & ITEM_SLABBED) == 0);

        if ((it->it_flags & ITEM_LINKED) != 0) {
            do_item_unlink_q(it);
            it->time = current_time;
            do_item_link_q(it);
        }
    }
}

/* 加锁的do_item_update */
void do_item_update(item *it) {
    MEMCACHED_ITEM_UPDATE(ITEM_key(it), it->nkey, it->nbytes);
    if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
        assert((it->it_flags & ITEM_SLABBED) == 0);

        if ((it->it_flags & ITEM_LINKED) != 0) {
            it->time = current_time;
            if (!settings.lru_maintainer_thread) {
                item_unlink_q(it);
                item_link_q(it);
            }
        }
    }
}

int do_item_replace(item *it, item *new_it, const uint32_t hv) {
    MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nkey, it->nbytes,
                           ITEM_key(new_it), new_it->nkey, new_it->nbytes);
    assert((it->it_flags & ITEM_SLABBED) == 0);

    do_item_unlink(it, hv);
    return do_item_link(new_it, hv);
}

​ 下面是LRU管理线程的工作,主要在各个LRU之间进行数据的移动和清理。

/* Will crawl all slab classes a minimum of once per hour */
#define MAX_MAINTCRAWL_WAIT 60 * 60

/* 这个方法定义了LRU爬虫的触发机制 */
static void lru_maintainer_crawler_check(void) {
    int i;
    static rel_time_t last_crawls[MAX_NUMBER_OF_SLAB_CLASSES];
    static rel_time_t next_crawl_wait[MAX_NUMBER_OF_SLAB_CLASSES];
    for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) {
        crawlerstats_t *s = &crawlerstats[i];
        /* We've not successfully kicked off a crawl yet. */
        if (last_crawls[i] == 0) {
            /* 这一步往LRU尾部加入爬虫的Item,并设置可以进行爬虫的执行 */
            if (lru_crawler_start(i, 0) > 0) {
                last_crawls[i] = current_time;
            }
        }
        pthread_mutex_lock(&lru_crawler_stats_lock);
        /* 爬虫执行完成 */
        if (s->run_complete) {
            int x;
            /* Should we crawl again? */
            uint64_t possible_reclaims = s->seen - s->noexp;
            uint64_t available_reclaims = 0;
            /* Need to think we can free at least 1% of the items before
             * crawling. */
            /* FIXME: Configurable? */
            uint64_t low_watermark = (s->seen / 100) + 1;
            rel_time_t since_run = current_time - s->end_time;
            /* Don't bother if the payoff is too low. */
            if (settings.verbose > 1)
                fprintf(stderr, "maint crawler: low_watermark: %llu, possible_reclaims: %llu, since_run: %u\n",
                        (unsigned long long)low_watermark, (unsigned long long)possible_reclaims,
                        (unsigned int)since_run);
            for (x = 0; x < 60; x++) {
                if (since_run < (x * 60) + 60)
                    break;
                available_reclaims += s->histo[x];
            }
            if (available_reclaims > low_watermark) {
                last_crawls[i] = 0;
                if (next_crawl_wait[i] > 60)
                    next_crawl_wait[i] -= 60;
            } else if (since_run > 5 && since_run > next_crawl_wait[i]) {
                last_crawls[i] = 0;
                if (next_crawl_wait[i] < MAX_MAINTCRAWL_WAIT)
                    next_crawl_wait[i] += 60;
            }
            if (settings.verbose > 1)
                fprintf(stderr, "maint crawler: available reclaims: %llu, next_crawl: %u\n", (unsigned long long)available_reclaims, next_crawl_wait[i]);
        }
        pthread_mutex_unlock(&lru_crawler_stats_lock);
    }
}

static pthread_t lru_maintainer_tid;

#define MAX_LRU_MAINTAINER_SLEEP 1000000
#define MIN_LRU_MAINTAINER_SLEEP 1000

static void *lru_maintainer_thread(void *arg) {
    int i;
    useconds_t to_sleep = MIN_LRU_MAINTAINER_SLEEP;
    rel_time_t last_crawler_check = 0;

    pthread_mutex_lock(&lru_maintainer_lock);
    if (settings.verbose > 2)
        fprintf(stderr, "Starting LRU maintainer background thread\n");
    while (do_run_lru_maintainer_thread) {
        int did_moves = 0;
        pthread_mutex_unlock(&lru_maintainer_lock);
        usleep(to_sleep);
        pthread_mutex_lock(&lru_maintainer_lock);

        STATS_LOCK();
        stats.lru_maintainer_juggles++;
        STATS_UNLOCK();
        /* We were asked to immediately wake up and poke a particular slab
         * class due to a low watermark being hit */
        if (lru_maintainer_check_clsid != 0) {
            did_moves = lru_maintainer_juggle(lru_maintainer_check_clsid);
            lru_maintainer_check_clsid = 0;
        } else {
            for (i = POWER_SMALLEST; i < MAX_NUMBER_OF_SLAB_CLASSES; i++) {
                did_moves += lru_maintainer_juggle(i);
            }
        }
        if (did_moves == 0) {
            if (to_sleep < MAX_LRU_MAINTAINER_SLEEP)
                to_sleep += 1000;
        } else {
            to_sleep /= 2;
            if (to_sleep < MIN_LRU_MAINTAINER_SLEEP)
                to_sleep = MIN_LRU_MAINTAINER_SLEEP;
        }
        /* Once per second at most */
        if (settings.lru_crawler && last_crawler_check != current_time) {
            /* 爬虫线程的检查,满足条件会促发爬虫线程的执行 */
            lru_maintainer_crawler_check();
            last_crawler_check = current_time;
        }
    }
    pthread_mutex_unlock(&lru_maintainer_lock);
    if (settings.verbose > 2)
        fprintf(stderr, "LRU maintainer thread stopping\n");

    return NULL;
}
int stop_lru_maintainer_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_maintainer_lock);
    /* LRU thread is a sleep loop, will die on its own */
    do_run_lru_maintainer_thread = 0;
    pthread_mutex_unlock(&lru_maintainer_lock);
    if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
        return -1;
    }
    /*lru_maintainer_thread控制了lru_pull_tail的策略*/
    settings.lru_maintainer_thread = false;
    return 0;
}

int start_lru_maintainer_thread(void) {
    int ret;

    pthread_mutex_lock(&lru_maintainer_lock);
    do_run_lru_maintainer_thread = 1;
    settings.lru_maintainer_thread = true;
    if ((ret = pthread_create(&lru_maintainer_tid, NULL,
        lru_maintainer_thread, NULL)) != 0) {
        fprintf(stderr, "Can't create LRU maintainer thread: %s\n",
            strerror(ret));
        pthread_mutex_unlock(&lru_maintainer_lock);
        return -1;
    }
    pthread_mutex_unlock(&lru_maintainer_lock);

    return 0;
}

/* If we hold this lock, crawler can't wake up or move */
void lru_maintainer_pause(void) {
    pthread_mutex_lock(&lru_maintainer_lock);
}

void lru_maintainer_resume(void) {
    pthread_mutex_unlock(&lru_maintainer_lock);
}

int init_lru_maintainer(void) {
    if (lru_maintainer_initialized == 0) {
        pthread_mutex_init(&lru_maintainer_lock, NULL);
        lru_maintainer_initialized = 1;
    }
    return 0;
}

​ 除了L�RU管理线程,我们还需要看看Memcached的LRU爬虫机制的实现:

/*** ITEM CRAWLER THREAD ***/

/*把爬虫的Item的加到了最末尾*/
static void crawler_link_q(item *it) { /* item is the new tail */
    item **head, **tail;
    assert(it->it_flags == 1);
    assert(it->nbytes == 0);

    head = &heads[it->slabs_clsid];
    tail = &tails[it->slabs_clsid];
    assert(*tail != 0);
    assert(it != *tail);
    assert((*head && *tail) || (*head == 0 && *tail == 0));
    it->prev = *tail;
    it->next = 0;
    if (it->prev) {
        assert(it->prev->next == 0);
        it->prev->next = it;
    }
    *tail = it;
    if (*head == 0) *head = it;
    return;
}

/*把爬虫的Item去掉*/
static void crawler_unlink_q(item *it) {
    item **head, **tail;
    head = &heads[it->slabs_clsid];
    tail = &tails[it->slabs_clsid];

    if (*head == it) {
        assert(it->prev == 0);
        *head = it->next;
    }
    if (*tail == it) {
        assert(it->next == 0);
        *tail = it->prev;
    }
    assert(it->next != it);
    assert(it->prev != it);

    if (it->next) it->next->prev = it->prev;
    if (it->prev) it->prev->next = it->next;
    return;
}

static void *item_crawler_thread(void *arg) {
    int i;
    int crawls_persleep = settings.crawls_persleep;

    pthread_mutex_lock(&lru_crawler_lock);
    if (settings.verbose > 2)
        fprintf(stderr, "Starting LRU crawler background thread\n");
    while (do_run_lru_crawler_thread) {
    pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);

    while (crawler_count) {
        item *search = NULL;
        void *hold_lock = NULL;

        for (i = POWER_SMALLEST; i < LARGEST_ID; i++) {
            if (crawlers[i].it_flags != 1) {
                continue;
            }
            pthread_mutex_lock(&lru_locks[i]);
            search = crawler_crawl_q((item *)&crawlers[i]);
            /* 满足该条件意味着爬虫已经爬到链表头或者没有剩余数*/
            if (search == NULL ||
                (crawlers[i].remaining && --crawlers[i].remaining < 1)) {
                if (settings.verbose > 2)
                    fprintf(stderr, "Nothing left to crawl for %d\n", i);
                crawlers[i].it_flags = 0;
                crawler_count--;
                /* 移除爬虫Item */
                crawler_unlink_q((item *)&crawlers[i]);
                pthread_mutex_unlock(&lru_locks[i]);
                pthread_mutex_lock(&lru_crawler_stats_lock);
                crawlerstats[CLEAR_LRU(i)].end_time = current_time;
                crawlerstats[CLEAR_LRU(i)].run_complete = true;
                pthread_mutex_unlock(&lru_crawler_stats_lock);
                continue;
            }
            uint32_t hv = hash(ITEM_key(search), search->nkey);
            /* Attempt to hash item lock the "search" item. If locked, no
             * other callers can incr the refcount
             */
            if ((hold_lock = item_trylock(hv)) == NULL) {
                pthread_mutex_unlock(&lru_locks[i]);
                continue;
            }
            /* Now see if the item is refcount locked */
            if (refcount_incr(&search->refcount) != 2) {
                refcount_decr(&search->refcount);
                if (hold_lock)
                    item_trylock_unlock(hold_lock);
                pthread_mutex_unlock(&lru_locks[i]);
                continue;
            }

            /* Frees the item or decrements the refcount. */
            /* Interface for this could improve: do the free/decr here
             * instead? */
            pthread_mutex_lock(&lru_crawler_stats_lock);
            item_crawler_evaluate(search, hv, i);
            pthread_mutex_unlock(&lru_crawler_stats_lock);

            if (hold_lock)
                item_trylock_unlock(hold_lock);
            pthread_mutex_unlock(&lru_locks[i]);

            if (crawls_persleep <= 0 && settings.lru_crawler_sleep) {
                usleep(settings.lru_crawler_sleep);
                crawls_persleep = settings.crawls_persleep;
            }
        }
    }
    if (settings.verbose > 2)
        fprintf(stderr, "LRU crawler thread sleeping\n");
    STATS_LOCK();
    stats.lru_crawler_running = false;
    STATS_UNLOCK();
    }
    pthread_mutex_unlock(&lru_crawler_lock);
    if (settings.verbose > 2)
        fprintf(stderr, "LRU crawler thread stopping\n");

    return NULL;
}

static pthread_t item_crawler_tid;

int stop_item_crawler_thread(void) {
    int ret;
    pthread_mutex_lock(&lru_crawler_lock);
    do_run_lru_crawler_thread = 0;
    pthread_cond_signal(&lru_crawler_cond);
    pthread_mutex_unlock(&lru_crawler_lock);
    if ((ret = pthread_join(item_crawler_tid, NULL)) != 0) {
        fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(ret));
        return -1;
    }
    settings.lru_crawler = false;
    return 0;
}

int start_item_crawler_thread(void) {
    int ret;

    if (settings.lru_crawler)
        return -1;
    pthread_mutex_lock(&lru_crawler_lock);
    do_run_lru_crawler_thread = 1;
    settings.lru_crawler = true;
    if ((ret = pthread_create(&item_crawler_tid, NULL,
        item_crawler_thread, NULL)) != 0) {
        fprintf(stderr, "Can't create LRU crawler thread: %s\n",
            strerror(ret));
        pthread_mutex_unlock(&lru_crawler_lock);
        return -1;
    }
    pthread_mutex_unlock(&lru_crawler_lock);

    return 0;
}

/* 'remaining' is passed in so the LRU maintainer thread can scrub the whole
 * LRU every time.
 */
static int do_lru_crawler_start(uint32_t id, uint32_t remaining) {
    int i;
    uint32_t sid;
    uint32_t tocrawl[3];
    int starts = 0;
    tocrawl[0] = id | HOT_LRU;
    tocrawl[1] = id | WARM_LRU;
    tocrawl[2] = id | COLD_LRU;

    for (i = 0; i < 3; i++) {
        sid = tocrawl[i];
        pthread_mutex_lock(&lru_locks[sid]);
        if (tails[sid] != NULL) {
            if (settings.verbose > 2)
                fprintf(stderr, "Kicking LRU crawler off for LRU %d\n", sid);
            crawlers[sid].nbytes = 0;
            crawlers[sid].nkey = 0;
            crawlers[sid].it_flags = 1; /* For a crawler, this means enabled. */
            crawlers[sid].next = 0;
            crawlers[sid].prev = 0;
            crawlers[sid].time = 0;
            crawlers[sid].remaining = remaining;
            crawlers[sid].slabs_clsid = sid;
            crawler_link_q((item *)&crawlers[sid]);
            crawler_count++;
            starts++;
        }
        pthread_mutex_unlock(&lru_locks[sid]);
    }
    if (starts) {
        STATS_LOCK();
        stats.lru_crawler_running = true;
        stats.lru_crawler_starts++;
        STATS_UNLOCK();
        pthread_mutex_lock(&lru_crawler_stats_lock);
        memset(&crawlerstats[id], 0, sizeof(crawlerstats_t));
        crawlerstats[id].start_time = current_time;
        pthread_mutex_unlock(&lru_crawler_stats_lock);
    }
    return starts;
}

static int lru_crawler_start(uint32_t id, uint32_t remaining) {
    int starts;
    if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
        return 0;
    }
    starts = do_lru_crawler_start(id, remaining);
    if (starts) {
        pthread_cond_signal(&lru_crawler_cond);
    }
    pthread_mutex_unlock(&lru_crawler_lock);
    return starts;
}

/* FIXME: Split this into two functions: one to kick a crawler for a sid, and one to
 * parse the string. LRU maintainer code is generating a string to set up a
 * sid.
 * Also only clear the crawlerstats once per sid.
 */
enum crawler_result_type lru_crawler_crawl(char *slabs) {
    char *b = NULL;
    uint32_t sid = 0;
    int starts = 0;
    uint8_t tocrawl[MAX_NUMBER_OF_SLAB_CLASSES];
    if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
        return CRAWLER_RUNNING;
    }

    /* FIXME: I added this while debugging. Don't think it's needed? */
    memset(tocrawl, 0, sizeof(uint8_t) * MAX_NUMBER_OF_SLAB_CLASSES);
    if (strcmp(slabs, "all") == 0) {
        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            tocrawl[sid] = 1;
        }
    } else {
        for (char *p = strtok_r(slabs, ",", &b);
             p != NULL;
             p = strtok_r(NULL, ",", &b)) {

            if (!safe_strtoul(p, &sid) || sid < POWER_SMALLEST
                    || sid >= MAX_NUMBER_OF_SLAB_CLASSES-1) {
                pthread_mutex_unlock(&lru_crawler_lock);
                return CRAWLER_BADCLASS;
            }
            tocrawl[sid] = 1;
        }
    }

    for (sid = POWER_SMALLEST; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
        if (tocrawl[sid])
            starts += do_lru_crawler_start(sid, settings.lru_crawler_tocrawl);
    }
    if (starts) {
        pthread_cond_signal(&lru_crawler_cond);
        pthread_mutex_unlock(&lru_crawler_lock);
        return CRAWLER_OK;
    } else {
        pthread_mutex_unlock(&lru_crawler_lock);
        return CRAWLER_NOTSTARTED;
    }
}

/* If we hold this lock, crawler can't wake up or move */
void lru_crawler_pause(void) {
    pthread_mutex_lock(&lru_crawler_lock);
}

void lru_crawler_resume(void) {
    pthread_mutex_unlock(&lru_crawler_lock);
}

int init_lru_crawler(void) {
    if (lru_crawler_initialized == 0) {
        memset(&crawlerstats, 0, sizeof(crawlerstats_t) * MAX_NUMBER_OF_SLAB_CLASSES);
        if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {
            fprintf(stderr, "Can't initialize lru crawler condition\n");
            return -1;
        }
        pthread_mutex_init(&lru_crawler_lock, NULL);
        lru_crawler_initialized = 1;
    }
    return 0;
}
    原文作者:吕宗胜ZJU
    原文地址: https://www.jianshu.com/p/a99ecc052756
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞