Memcache-哈希表-源码分析

《Memcache-哈希表-源码分析》 memcached-version-1.4.25

介绍

memcache 的 hashtable 就是一块保存 item* 固定大小的内存区域,也就是一个固定大小的指针数组,在程序启动的时候会初始化这个 hashtable 数组的大小,主要核心涉及到的点就是 hashtable动态扩容、hashtable段锁等,对应的 hashtable 处理的文件为 hash.chash.hassoc.cassoc.h

Memcache 哈希表

《Memcache-哈希表-源码分析》 memcache 哈希表

源码实现

初始化设置 hash 算法,hash_init ()

// 目前只提供2种hash算法 jenkins 和 murmur3
enum hashfunc_type {
    JENKINS_HASH=0, MURMUR3_HASH  
};
//启动memcache的时候调用
int hash_init(enum hashfunc_type type) {
    switch(type) {
        case JENKINS_HASH:
            hash = jenkins_hash; // jenkins_hash 函数指针
            settings.hash_algorithm = "jenkins";
            break;
        case MURMUR3_HASH:
            hash = MurmurHash3_x86_32;  // murmur3_hash 函数指针
            settings.hash_algorithm = "murmur3"; 
            break;
        default:
            return -1;
    }
    return 0;
}

初始化 hashtable 表,assoc_init()

settings.hashpower_init //hashtable的基准值,启动memcache的时候设定

void assoc_init(const int hashtable_init) {
    // 如果存在则赋值 hashpower 没有的话就用默认的 hashpower = 16
    if (hashtable_init) {
        hashpower = hashtable_init;
    }
    // hashsize 根据传入的 hashpower 算出最终的 hashtable 长度
    // hashsize(n) -> #define hashsize(n) ((ub4)1<<(n)) 
    // 实际上就是进行右移运算,右移 hashpower 位 ( 1 << 16 = 65536 )
    // 申请内存 item** primary_hashtable -> hashtable头指针 
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, "Failed to init hashtable.\n");
        exit(EXIT_FAILURE);
    }
    STATS_LOCK();
    stats.hash_power_level = hashpower;
    stats.hash_bytes = hashsize(hashpower) * sizeof(void *); // hashtable 占了多少字节内存
    STATS_UNLOCK();
}

hashtable 段锁

在 memcache 初始化线程那部分,还会初始化一些互斥锁, 其中就包括了 hashtable 的段锁,什么是段锁,就是有可能会多个key对应一把锁,而不是每一个key都对应一把锁,因为不同的key可能hash出来的值一样,那么就都会对应这一把锁。

初始化 hashtable 锁

// memcached_thread_init 函数其中一段代码
  
void memcached_thread_init(int nthreads, struct event_base *main_base) {
    int         i;
    int         power;
    
    //.........
    
    //根据线程数设置 hashtable 锁的启动值 power 跟上面的 hashpower 同理
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }
    // power 小于 hashpower 因为 hashtable锁的数量 和 hashtable的数量 并不是一一对应的
    // 也就是说并不是 每一个hashtable的key都对应一把锁,memcache为了省内存,采用多个key
    // 对应一把锁,也就是段锁
    if (power >= hashpower) {
        fprintf(stderr, "Hash table power size (%d) cannot be equal to or less than item lock table (%d)\n", hashpower, power);
        fprintf(stderr, "Item lock table grows with `-t N` (worker threadcount)\n");
        fprintf(stderr, "Hash table grows with `-o hashpower=N` \n");
        exit(1);
    }

    //计算hashtable锁的长度, 默认4个线程 power = 12 、 1 << 12 = 4096/锁
    item_lock_count = hashsize(power); 
    item_lock_hashpower = power;
    
    //申请锁
    //之后这个item_locks锁并不会随着hashtable的扩容而扩容
    //也就是说无论之后hashtable变成多大,都会对应这 hashsize(power) -> 4096/锁 
    //这也会导致越来越多的key对应一把锁
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    //循环初始化item_locks锁
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }
    
    //.........
}

至此通过上面的代码已经将 memcache hashtable 初始化完毕

插入 、 扩容 hashtable

memcache 的 hashtable 扩容处理的方式是,在程序启动阶段的时候开一个线程处于待命状态,当需要扩容的时候触发该线程,动态的进行扩容处理,而且在扩容操作的时候也不是表锁,而是利用上面说的段锁

扩容流程:

在 hashtable 扩容的时候 memcache 会把当前 primary_hashtable (哈希表)复制一份给 old_hashtable(哈希表),然后对 primary_hashtable 进行扩容 (1 << hashpower + 1),如果当前正处于 hashtable 扩容阶段, 同时有请求要访问 key,则会判断当前的 key – hash 之后的索引位置是否小于当前迁移的索引位置,如果小于则代表已经迁移到新的 primary_hashtable 的索引位置了,如果大于则代表还未迁移到则在新的 primary_hashtable,所以就还在老的 old_hashtable 位置操作,扩容完成之后会释放 old_hashtable,然后就全部都在 primary_hashtable 操作了。

说明:

在迁移 hashtable 的时候,会从小到大一个一个索引位置进行迁移,而这个索引位置就是hash之后的值,所以在迁移每一个索引位置的时候都会对当前的索引位置加锁,这个锁用的就是上面说的段锁,但可能锁的数量有限,会出现很多索引位置共用同一个锁的情况。
例如:
0 & (4096-1) = 0 、4096 & (4096-1) = 0 、8192 & (4096-1) = 0 这几个hash索引位置都会用到 0 这把锁
如果这个时候一个key正好要访问,同时这个key-hash之后的值正好跟我们迁移的这个索引位置对应则会堵塞,因为已经加锁,其他的key如果不命中我们正在迁移的这个位置则正常访问,至于访问 primary_hashtable 还是 old_hashtable 则依据上面(扩容流程)说的条件。

插入 hashtable 函数, assoc_insert()

// hv = hash(key, nkey); 哈希值

int assoc_insert(item *it, const uint32_t hv) {
    unsigned int oldbucket;
    
    //判断是否正在扩容
    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        //插入到hashtable
        it->h_next = old_hashtable[oldbucket];
        old_hashtable[oldbucket] = it;
    } else {
       //插入到hashtable
       it->h_next = primary_hashtable[hv & hashmask(hashpower)];
       primary_hashtable[hv & hashmask(hashpower)] = it;
    }
    
    // 加锁,防止多个线程同时触发扩容操作
    pthread_mutex_lock(&hash_items_counter_lock);
    hash_items++;
    // 判断是否需要扩容
    if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
        //触发扩容操作
        assoc_start_expand();
    }
    pthread_mutex_unlock(&hash_items_counter_lock);

    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
    return 1;
}

触发扩容操作 , assoc_start_expand()

static void assoc_start_expand(void) {
    if (started_expanding)
        return;

    started_expanding = true;
    // 唤醒 hashtable 扩容线程
    pthread_cond_signal(&maintenance_cond);
}

hashtable 扩容线程函数,assoc_maintenance_thread()

// 默认会堵塞在这个位置 pthread_cond_wait(&maintenance_cond, &maintenance_lock);
// 直到上面 pthread_cond_signal 唤醒操作, 然后往下执行.

static void *assoc_maintenance_thread(void *arg) {

    mutex_lock(&maintenance_lock);
    
    // 死循环
    while (do_run_maintenance_thread) {
        int ii = 0;

        /* 循环迁移 old_hashtable ->  primary_hashtable */
        for (ii = 0; ii < hash_bulk_move && expanding; ++ii) {
            item *it, *next;
            int bucket;
            void *item_lock = NULL;
            
            // 把当前正在迁移的索引位置加锁
            if ((item_lock = item_trylock(expand_bucket))) {
                    // 循环处理当前索引位置的所有item
                    // 因为在当前索引位置可能会存在多个item
                    // 就是hash冲突链式解决
                    for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
                        // 获取下一个item
                        next = it->h_next;
                        // 按照新的hashtable长度重新定位一个索引位置
                        bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
                        // 赋值保存
                        it->h_next = primary_hashtable[bucket];
                        primary_hashtable[bucket] = it;
                    }
                    // 当前的索引位置item都迁移完毕之后,将之前的hashtable索引位置至空
                    old_hashtable[expand_bucket] = NULL;
                    // 更新当前索引位置
                    expand_bucket++;
                    // 判断是否全部迁移完毕
                    if (expand_bucket == hashsize(hashpower - 1)) {
                        expanding = false; // 关闭正在扩容hashtable状态
                        free(old_hashtable); // 释放 old_hashtable
                        STATS_LOCK();
                        stats.hash_bytes -= hashsize(hashpower - 1) * sizeof(void *);
                        stats.hash_is_expanding = 0;
                        STATS_UNLOCK();
                        if (settings.verbose > 1)
                            fprintf(stderr, "Hash table expansion done\n");
                    }

            } else {
                // 如果加锁失败,可能存在别的线程正在操作hashtable当前索引位置的item
                // 所以延时等待,直到抢到锁为止
                usleep(10*1000);
            }
            // 释放锁
            if (item_lock) {
                item_trylock_unlock(item_lock);
                item_lock = NULL;
            }
        }

        if (!expanding) {
            /* We are done expanding.. just wait for next invocation */
            started_expanding = false;
            // 等待唤醒
            pthread_cond_wait(&maintenance_cond, &maintenance_lock);
            pause_threads(PAUSE_ALL_THREADS);
            // 扩容 hashtable 操作
            assoc_expand();
            //往下执行while循环
            pause_threads(RESUME_ALL_THREADS);
        }
    }
    return NULL;
}

扩容 hashtable 函数,assoc_expand()

static void assoc_expand(void) {
    // 保存一份当前的 hashtable
    old_hashtable = primary_hashtable;
    
    // 扩容,每次扩容 hashpower + 1 
    // (1 << 16) = 65536、(1 << 17) = 131072
    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
    if (primary_hashtable) {
        if (settings.verbose > 1)
            fprintf(stderr, "Hash table expansion starting\n");
        hashpower++; // hashpower++
        expanding = true; // 表示当前正在扩容
        expand_bucket = 0; // 当前迁移到primary_hashtable索引位置
        STATS_LOCK();
        stats.hash_power_level = hashpower;
        stats.hash_bytes += hashsize(hashpower) * sizeof(void *);
        stats.hash_is_expanding = 1;
        STATS_UNLOCK();
    } else {
        primary_hashtable = old_hashtable;
        /* Bad news, but we can keep running. */
    }
}

查找 hashtable 函数,assoc_find()

item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
    item *it;
    unsigned int oldbucket;
    
    // 是否正处于扩容操作
    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        // 获取item
        it = old_hashtable[oldbucket];
    } else {
        // 获取item
        it = primary_hashtable[hv & hashmask(hashpower)];
    }

    item *ret = NULL;
    int depth = 0;
    //循环判断,因为hash冲突采用链式解决法,所以需要遍历这个链,找到key相等的
    while (it) {
        if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
            ret = it;
            break;
        }
        it = it->h_next;
        ++depth;
    }
    MEMCACHED_ASSOC_FIND(key, nkey, depth);
    return ret;
}

结束

以上就是 Memcache 的哈希表以及对应的操作,由于是多线程模型,所以无论是 (插入 、查找、 扩容) 都会对当前操作的key对应的索引位置加锁 item_locks,所以随着 hashtable 不断的扩大,锁的争抢也会越来越大,性能可能也会存在一些影响.

点赞