mmkv框架源码浅析(上)

这部分主要是分析微信开源的mmkv框架,它的介绍在这里github,大概看了下设计原理,比较感觉兴趣的是以多进程的形式操作共享内存(最快的),还有文件锁的改造。之所有分析它,一方面代码量小,可以花两天的样子弄明白,踩坑不知,当然会多思考下为什么这么设计;另一方面,以往自己的相关经验只停留在多线程,虽然用过共享内存,但只是单进程处理为了热更新重启时数据不丢失,并不是多个进程操作一份数据。还有文件锁,之前分析过leveldb部分源码实现时,了解到里面使用文件锁来防止启动多个进程,却没有更深入的使用过文件锁,这里会学到怎么使用和改造它。

这两个是比较感觉兴趣的,当然从这两点,可以延伸到各种已知的同步互斥的特点和效率,以及适用场合,另一方面怎么根据业务对现有的实现代码去改造,这才是重要的。主要会分析这两部分的源码,其他使用方法和一些utils实现如可变整型大小节省内存空间(leveldb中Varint)可以参考。

之前看过一些开源的实现,最大的感觉是分析到自己从不知道的实现设计,比如互斥锁的用在多进程里,一直以为只能用在线程间。使用哪种锁的时候,需要考虑健壮性,性能,是否递归加锁引起的死锁和性能问题,锁的升级降级问题等,还有些属性设置,可能当时的业务仅仅是停留在最基本的使用层面,知道锁的实现原理和区别,但灵活应用到其他方面,也许是更完美的解决方案,但可能想不到这个,这可能是欠缺的经验。

这里会涉及到一些其他的知识点(并不会详细解释基础知识,粗略列举下),如进程间通信,同步互斥,文件结构及文件锁,怎么设计可重入锁(线程级的),死锁的调试及定位等,可能会填上之前说的优化锁的读写性能第七小节(多线程偏底层的性能优化思考)。

以下是大概的小点:
一) 进程线程的同步互斥
二) leveldb中使用mutex和cond条件变量高效实现读写
三) 设计一个可重入锁
四) 进程线程间的通信方式
五) 文件结构和文件锁
六) leveldb中怎么防止同一个进程被启动多次
七) 改造优化文件锁
八) 使用文件锁来调度多进程通过共享内存协作

其中一是原理介绍及使用场合,相关源码可以参考之间写的内容,二三是应用,当然是分析leveldb源码,这里记录下;四是原理介绍,进程间通信用的最多的是tcp或者本机域套接字,共享内存,以及多线程的pipe,那些通过stl加锁和条件变量写的消息队列不算在里面;另外sendfile这个零拷贝可以研究下;五是文件锁介绍,六七是应用;八是mmkv的运用,结合前面介绍的技术。嗯,写完这些,差不多又回顾巩固了下知识面。

当然,底层方面的知识是比较难的,需要掌握的知识较多和深入,且有很多坑要踩。

一) 进程线程的同步互斥

讲真,这些年仅工作方面,线程间用的最多的是mutex和条件变量,spinlock。原子指令和读写锁用过也没几次,项目中dpdk那边有些接口实现用到,其他的零经验;进程间的没有相关经验,文件锁知道些,信号量零使用,包括mutex用于进程间的情况(linux 多线程编程书上第二章节有些内容也可以看看)。

一般使用mutex是通过RAII方式,这样就不会忘记解锁,另外一些对性能要求较高的场景,需要好好选用哪种锁,mutex加锁的速度很快,看过ibm的资料貌似是25ns左右,有竞争存在就不止了。“mutex获取锁分为两阶段,第一阶段在用户态采用spinlock锁总线的方式获取一次锁,如果成功立即返回;否则进入第二阶段,调用系统的futex锁去sleep,当锁可用后被唤醒,继续竞争锁。”另外brpc对相关数据结构做了优化[不同的读之间没有竞争,高度并发;如果没有写,读总是能无竞争地获取和释放thread-local锁,一般小于25ns,对延时基本无影响。如果有写,由于其临界区极小(拿到立刻释放),读在大部分时候仍能快速地获得锁,少数时候释放锁时可能有唤醒写线程的代价。由于写本身就是少数情况,读整体上几乎不会碰到竞争锁。]。

还是觉得好好设计算法和数据结构,根据自己的业务特点拆分,框架设计好,如果为了点性能去重新造轮子,一方面可能引入风险和成本,另一方面要验证正确性和稳定性,花更多的时间去调试。

struct mutex 
{   
    /* 1: unlocked, 0: locked, negative: locked, possible waiters */
    atomic_t count;
    spinlock_t wait_lock;
    struct list_head    wait_list;
}

通过文件锁来同步进程在下面小节中介绍,这里重点介绍下mutex用于进程间,截取下phxqueue中的部分代码,这里是rwlock,如下:

140         pthread_rwlockattr_t attr;
141 
142         if (0 != pthread_rwlockattr_init(&attr)) {
144             return comm::RetCode::RET_ERR_SYS;
145         }
146         if (0 != pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) {
148             return comm::RetCode::RET_ERR_SYS;
149         }
150 
151         int shm_id = shm_open(lockpath.c_str(), O_RDWR|O_CREAT, 0666);
152         if (shm_id < 0) {
154             return comm::RetCode::RET_ERR_SYS;
155         }
156         ftruncate(shm_id, sizeof(pthread_rwlock_t));
157 
158         impl_->rwlock = (pthread_rwlock_t *)mmap(NULL, sizeof(pthread_rwlock_t),
159                                                  PROT_READ|PROT_WRITE, MAP_SHARED, shm_id, 0);
160         if (MAP_FAILED == impl_->rwlock) {
162             return comm::RetCode::RET_ERR_SYS;
163         }
164 
165         if (0 != pthread_rwlock_init(impl_->rwlock, &attr)) {
167             return comm::RetCode::RET_ERR_SYS;
168         }

通过设置PTHREAD_PROCESS_SHARED属性,并在共享内存中建立它,就达到了同步的手段,但是这里是有一些问题的,比如说到的robust,“说到进程锁第一个想到的就是 pthread 库的 pthread_mutex,创建于共享内存的 pthread_mutex 是可以用作进程锁的,然而 Android 版的 pthread_mutex 并不保证robust,亦即对 pthread_mutex 加了锁的进程被 kill,系统不会进行清理工作,这个锁会一直存在下去,那么其他等锁的进程就会永远饿死。其他的 IPC 组件,例如信号量、条件变量,也有同样问题。”[这个属性PTHREAD_MUTEX_ERRORCHECK_NP可以检测是否重复加锁]

基于共享内存的进程间的mutex,当持有mutex锁的一方挂了就会造成另外的进程无法获取锁,通过指定属性PTHREAD_MUTEX_ROBUST即可解决这种情况,但需要较高版本的GCC,但对于线程间的这种情况,可以在lock的时候检查下是否为EOWNERDEAD并清除。

通过demo多进程和多线程两种情况,来测试以上说到的问题是否可行。情况一先后启动两个进程,第一个进程负责初始化共享内存的mutex锁,并加锁sleep一段时间,第二个进程尝试加锁等逻辑,包括kill第一个进程,观察不加PTHREAD_MUTEX_ROBUST属性时的效果。简化起见不涉及谁先初始化等情况,这儿的细节可以参考下面的链接,也不贴示例代码,也就几行和上面代码差不多。。情况二是多线程,需要处理EOWNERDEAD错误等。

结论:
对于多进程,在不加PTHREAD_MUTEX_ROBUST时,第一个进程加锁时被kill,第二个进程加锁是死锁,被杀的进程没办法再释放锁。
加PTHREAD_MUTEX_ROBUST时,没问题,这里为了看清测试打印的信息,先trylock捕获错误信息为“lock failed file exists”,之后在用lock阻塞。多线程这个就没有测试。

这里有一篇不错的文章 关于在 Linux 下多个不相干的进程互斥访问同一片共享内存的问题

二) leveldb中使用mutex和cond条件变量高效实现读写

当时看到这段代码有点不明白,后来经过查资料,再回去看代码和调试,发现这儿的设计很巧妙,可以自行分析代码和资料。
部分代码如下:

1203 Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
1204   Writer w(&mutex_);
1205   w.batch = my_batch;
1206   w.sync = options.sync;
1207   w.done = false;
1208 
1209   MutexLock l(&mutex_);
1210   writers_.push_back(&w);
1211   while (!w.done && &w != writers_.front()) {
1212     w.cv.Wait();
1213   }
1214   if (w.done) {
1215     return w.status;
1216   }
1217     //more code...
1231     {
1232       mutex_.Unlock();

这里通过std::mutex和std::condition_variable把要写的线程push到队列中,然后根据两个条件,判断是否挂在条件变量的等待队列中。被唤醒后判断是否已完成,因为这里可能由其他写线程帮忙完成了这个任务,即生产者线程可能充当消费者角色,虽然写这块看似是多线程,实际是串行化了。写流程的剩下的逻辑可以参考源码分析,包括什么时候unlock,什么时候又lock,修改共享资源等。

然后读和写之间的同步,主要看读时涉及到的几个数据结构,哪几个会被写线程操作,用了什么手段来避免竞争以及如何做到高效,以下是读的实现代码:

1117 Status DBImpl::Get(const ReadOptions& options,
1118                    const Slice& key,
1119                    std::string* value) {
1120   Status s;
1121   MutexLock l(&mutex_);
1122   SequenceNumber snapshot;
1123   if (options.snapshot != nullptr) {
1124     snapshot =
1125         static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
1126   } else {
1127     snapshot = versions_->LastSequence();
1128   }
1129 
1130   MemTable* mem = mem_;
1131   MemTable* imm = imm_;
1132   Version* current = versions_->current();
1133   mem->Ref();
1134   if (imm != nullptr) imm->Ref();
1135   current->Ref();
1136 
1137   bool have_stat_update = false;
1138   Version::GetStats stats;
1140   // Unlock while reading from files and memtables
1141   {
1142     mutex_.Unlock();
1143     // First look in the memtable, then in the immutable memtable (if any).
1144     LookupKey lkey(key, snapshot);
1145     if (mem->Get(lkey, value, &s)) {
1146       // Done
1147     } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
1148       // Done
1149     } else {
1150       s = current->Get(options, lkey, value, &stats);
1151       have_stat_update = true;
1152     }
1153     mutex_.Lock();
1154   }
1155 
1156   if (have_stat_update && current->UpdateStats(stats)) {
1157     MaybeScheduleCompaction();
1158   }
1159   mem->Unref();
1160   if (imm != nullptr) imm->Unref();
1161   current->Unref();
1162   return s;
1163 }

除了imm是const类型的,不会被修改等,其他涉及到的mem和sstable文件可能会存在竞争,其中使用了引用计数和原子操作。以当前snapshot快照查找mem。以上分析的比较粗略,只是介绍跟主题相关的一些不错的设计点,具体还是以参考源码为主。(也准备详细分析下leveldb源码)

三) 设计一个可重入锁

可重入锁也叫递归锁,即一个线程获取锁a,然后调用某个函数,又对a加锁,此时不会造成死锁,如果其他线程尝试加锁a则会等待锁可用,其实有些书籍和资料不建议使用递归锁。

一般递归锁有个计数器和线程id,加锁时判断id是否相同,相同直接计数加1,不同则加锁,加锁成功则计数,否则等待。
这里摘取dpdk中的spinlock recursive,部分代码如下:

 55 /**
 56  * The rte_spinlock_t type.
 57  */
 58 typedef struct {
 59     volatile int locked; /**< lock status 0 = unlocked, 1 = locked */
 60 } rte_spinlock_t;

203 typedef struct {
204     rte_spinlock_t sl; /**< the actual spinlock */
205     volatile int user; /**< core id using lock, -1 for unused */
206     volatile int count; /**< count of time this lock has been called */
207 } rte_spinlock_recursive_t;

220 static inline void rte_spinlock_recursive_init(rte_spinlock_recursive_t *slr)
221 {
222     rte_spinlock_init(&slr->sl);
223     slr->user = -1;
224     slr->count = 0;
225 }

233 static inline void rte_spinlock_recursive_lock(rte_spinlock_recursive_t *slr)
234 {
235     int id = rte_gettid();
236 
237     if (slr->user != id) {
238         rte_spinlock_lock(&slr->sl);
239         slr->user = id; //获取锁成功设置线程id并计数
240     }
241     slr->count++;
242 }

249 static inline void rte_spinlock_recursive_unlock(rte_spinlock_recursive_t *slr)
250 {
251     if (--(slr->count) == 0) {
252         slr->user = -1;
253         rte_spinlock_unlock(&slr->sl);
254     }
255 
256 }

266 static inline int rte_spinlock_recursive_trylock(rte_spinlock_recursive_t *slr)
267 {   
268     int id = rte_gettid();
269     
270     if (slr->user != id) {
271         if (rte_spinlock_trylock(&slr->sl) == 0)
272             return 0;
273         slr->user = id;
274     }
275     slr->count++;
276     return 1;
277 }

以上的代码实现也比较容易,就不分析了。

四) 进程线程间的通信方式

本小节主要粗略介绍几种使用过的通信方式,线程间一般使用pipe来间接的通知另一个线程,一般的都是鉴听某个文件描述符可读,再加上epoll/select网络框架,这样在某个线程要通知另一个线程时,以skynet框架里的实现为例:

 331     int fd[2];
 337     if (pipe(fd)) {
 341     }
 342     if (sp_add(efd, fd[0], NULL)) {   //注册读事件
 349     }
 353     ss->recvctrl_fd = fd[0];
 354     ss->sendctrl_fd = fd[1];
         //发送命令字
1462     ssize_t n = write(ss->sendctrl_fd, &request->header[6], len+2);
         //有事件发生,可读
1008     retval = select(ss->recvctrl_fd+1, &ss->rfds, NULL, NULL, &tv); 

这块还是比较简单,虽然数据要经过内核且拷贝两次,但就一个字节,通常是命令,我在多个开源框架中都看到这种实现,当然还有那种生产者消费者队列,但感觉不是一回事,关注点不一样。线程间通信的其他方式见过的少,也没有相关经验。

进程间的通信方式挺多的,如上面的pipe,系统消息队列等,但用过的就mmap共享内存和套接字,其他的没有相关经验。

跨机器使用tcp,具体使用方式不多介绍。同一台机器上一般是使用共享内存或者不经过协议栈的域套接字。但通过共享内存就需要某种形式的同步,如上文介绍的,而域套接字就不需要,也不需要加报头,校验,没有确认号等操作。

关于pipe,共享内存,tcp等实现原理和注意点,可以自己查一下资料,这块内容比较深入。

以上大部分是文字介绍,实际代码和使用中踩过的坑,还是以工作经历及总结为主,不过多说明。还有一块就是fork和锁的问题,这块内容在网上资料和书上见过,实际中都是独立的进程,也不会在多线程中fork进程做事情,不过多介绍。下篇是mmkv主题。

下周要去深圳出差一个月多,估计没什么时间更新博客,准备把mmkv分析完暂时停一段时间。去那边还是跟这边节奏一样,开发测试协调…

以下是一些参考
MMKV github
MMKV for Android 多进程设计与实现
微信终端跨平台组件 mars 系列(一):高性能日志模块xlog
Mutex 源码
认真分析mmap:是什么 为什么 怎么用
谨防fork与锁之间的深坑

    原文作者:fooboo
    原文地址: https://www.jianshu.com/p/12506cf67e7b
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞