基于mongodb oplog的一点思考

什么是oplog

capped collection: 一个固定大小的集合, 当集合大小达到最大值时, 老的内容会被新的自动覆盖.

oplog正是一种特殊的 capped collection, 用于记录mongodb的全部操作. 对于一个复制集来说, 所有的写操作作用于主节点(primary), 而oplog记录这些操作, secondary member同步读取oplog并应用其上的改变来做到与主节点同步.

关于oplog的更多详细介绍这里并不会多说(因为我也没仔细研究过.~). 可以阅读官方文档以了解更多.

oplog的结构

一条oplog的数据内容如下:

{
    "ts" : Timestamp(1466585597, 1),
    "h" : NumberLong("-9079402863358495470"),
    "v" : 2,
    "op" : "u",
    "ns" : "test.user",
    "o2" : {
        "_id" : ObjectId("576a51c7e8696f07b2b0b272")
    },
    "o" : {
        "_id" : ObjectId("576a51c7e8696f07b2b0b272"),
        "name" : "mikewang"
    }
}

其中:

  • ts: 操作发生的时间
  • h: 记录的唯一ID
  • v: 版本信息
  • op: 写操作的类型
    • n: no-op
    • c: db cmd
    • i: insert
    • u: update
    • d: delete
  • ns: 操作的namespace,即: 数据库.集合
  • o: 操作所对应的文档
  • o2: 更新时所对应的where条件,更新时才有

oplog能干嘛

对于后端开发来讲,如何提高系统的吞吐量,缩短系统的响应时间是进阶的关键(不然你将一直停留在CURD的水平).我们知道在一种常见的解决方案就是缓存策略: 以空间换时间.这是一种很有效的方案,因为硬件成本在不断降低,64G/128G内存的服务器已经不是什么高配了.

但是缓存策略有一个常见的问题:缓存过期.对于静态资源一般很少更新,而且即使更新我们完全可控,只要在更新时去更新缓存中的内容即可.但是对于一些常见的用户资料类的资源,由于更新操作较多,我们需要在每一处涉及更新操作的代码处做响应的缓存过期处理(比较简单的方式就是删除缓存键).这样代码比较混乱,业务逻辑代码与缓存操作混杂,且容易出现过期的缓存内容(哪处的修改未写到缓存中).

因此为了处理这种问题,我们需要在统一的地方,把对DB的操作同步到缓存中,与mongodb复制集同步十分类似,因此可以利用oplog来做缓存同步.

另外我们还可以将DB的操作同步到elasticsearch中,这样我们就可以利用elasticsearch强大的分布式搜索能力,来提高站内搜索能力.

oplog的小实验

这里我会利用nodejsmongo-oplog来读取oplog, 并将内容同步到elasticsearch中.

首先需要有一个复制集架构,这里假设你应该能够配置(不会的可以参考我的文章).
并且需要安装了elasticsearch.

var MongoOplog = require('mongo-oplog');
var oplog = MongoOplog('mongodb://127.0.0.1:30001/local').tail();
var producer = require("./producer.js");

oplog.on('op', function (data) {
  console.log(data);
});

oplog.on('insert', function (doc) {
  console.log(doc.op);
  var data = {}
  ns = doc.ns.split('.');
  data.index = ns[0];
  data.type = ns[1];
  data.body = replaceWithId(doc.o);
  console.log(data);
  producer.post(data);
});

oplog.on('update', function (doc) {
  console.log(doc.op);
});

oplog.on('delete', function (doc) {
  console.log(doc.op._id);
});

oplog.on('error', function (error) {
  console.log(error);
});

oplog.on('end', function () {
  console.log('Stream ended');
});

oplog.stop(function () {
  console.log('server stopped');
});

var replaceWithId = function(json) {
    json.id = json._id
    delete json._id;
    return json;
}

这里有几点需要注意:

  1. mongodb的连接必须指定的是复制集的local database
  2. 读取oplog类型于unix命令 tail -f
  3. mongo-oplog可以过滤oplog的具体操作

集成elasticsearch:

var elasticsearch = require('elasticsearch');

var client = new elasticsearch.Client({
  host: 'localhost:9200',
  log: 'trace'
});


exports.post = function(data) {
    console.log(data);
    client.create(data, function (error, response){
        console.log(response);
    });
};

我们知道mongodb中对于每一条记录都有一个_id的唯一标识, 但是这个字段却与elasticsearch中的_id冲突,因为我们在存储的时候需要把mongodb的_id转化成别的字段.

关于elasticsearch的一些概念我会在接下来的文章中做一些介绍.

在同步的时候考虑到系统的处理能力,我们还需要一个MQ(消息队列),producer用于将oplog的操作写入到MQ, consumer用于将MQ中的内容同步到elasticsearch中.

文中的知识比较浅显,因为很多内容需要很深入的研究才能应用到具体的生产环境中,但至少也是一种可能的解决方案.当然你说你用的是MySQL, 没有oplog,不用担心,binlog一样可以帮助到你.

PS:快忘了上一次是什么时候写了,好惭愧啊, ~

    原文作者:痕无落
    原文地址: https://www.jianshu.com/p/be90c9f05e4d
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞