可定制的 elasticsearch 数据导入东西:mysql_2_elasticsearch

近来为了es导库的题目,费了一些周折。于是乎做了一个小东西(用过npm的一些jdbc的导库东西,觉得还不够好用),这里举一反三,自荐一下下,迎接同志兄弟吐槽和介入,我会不定时的更新这个扩大。

A customizable importer from mysql to elasticsearch.

可定制的 elasticsearch 数据导入东西 ——基于 elasticsearch 的 JS API

主要功能

  1. 完全运用 JS 完成数据从 MySQL 到 elasticsearch 的迁徙;

  2. 可批量导入多张 MySQL 表;

  3. 可自定义的数据迁徙的划定规矩(数据表/字段关联、字段过滤、运用正则举行非常处置惩罚);

  4. 可自定义的异步分片导入体式格局,数据导入效力更高。

一键装置

npm install mysql_2_elasticsearch

疾速最先(简单用例)

var esMysqlRiver = require('mysql_2_elasticsearch');

var river_config = {
  mysql: {
    host: '127.0.0.1',
    user: 'root',
    password: 'root',
    database: 'users',
    port: 3306
  },
  elasticsearch: {
    host_config: {               // es客户端的设置参数
      host: 'localhost:9200',
      // log: 'trace'
    },
    index: 'myIndex'
  },
  riverMap: {
    'users => users': {}         // 将数据表 users 导入到 es 范例: /myIndex/users
  }
};


/*
** 以下代码内容:
** 经由过程 esMysqlRiver 要领举行数据传输,要领的回调参数(一个JSON对象) obj 包括此次数据传输的效果
** 个中:
** 1. obj.total    => 须要传输的数据表数目
** 2. obj.success  => 传输胜利的数据表数目
** 3. obj.failed   => 传输失利的数据表数目
** 4. obj.result   => 本次数据传输的结论
*/

esMysqlRiver(river_config, function(obj) {
  /* 将传输效果打印到终端 */
  console.log('\n---------------------------------');
  console.log('总传送:' + obj.total + '项');
  console.log('胜利:' + obj.success + '项');
  console.log('失利:' + obj.failed + '项');
  if (obj.result == 'success') {
    console.log('\n结论:悉数数据传送完成!');
  } else {
    console.log('\n结论:传送未胜利...');
  }
  console.log('---------------------------------');
  /* 将传输效果打印到终端 */
});

最好实践(完全用例)

var esMysqlRiver = require('mysql_2_elasticsearch');

/*
** mysql_2_elasticsearch 的相干参数设置(概况见解释)
*/

var river_config = {

  /* [必须] MySQL数据库的相干参数(依据实际状况举行修正) */
  mysql: {
    host: '127.0.0.1',
    user: 'root',
    password: 'root',
    database: 'users',
    port: 3306
  },

  /* [必须] es 相干参数(依据实际状况举行修正) */
  elasticsearch: {
    host_config: {               // [必须] host_config 即 es客户端的设置参数,细致设置参考 es官方文档
      host: 'localhost:9200',
      log: 'trace',
      // Other options...
    },
    index: 'myIndex',            // [必须] es 索引名
    chunkSize: 8000,             // [非必须] 单分片最大数据量,默以为 5000 (条数据)
    timeout: '2m'                // [非必须] 单次分片要求的超时时候,默以为 1m
    //(注重:此 timeout 并不是es客户端要求的timeout,后者请在 host_config 中设置)
  },

  /* [必须] 数据传送的划定规矩 */
  riverMap: {
    'users => users': {            // [必须] 'a => b' 示意将 mysql数据库中名为 'a' 的 table 的一切数据 输送到 es中名为 'b' 的 type 中去
      filter_out: [                // [非必须] 须要过滤的字段名,即 filter_out 中的设置的一切字段将不会被导入 elasticsearch 的数据中
        'password',
        'age'
      ],
      exception_handler: {           // [非必须] 非常处置惩罚器,运用JS正则表达式处置惩罚非常数据,防止 es 入库时因为范例不合法形成数据缺失
        'birthday': [                // [示例] 对 users 表的 birthday 字段的非常数据举行处置惩罚
          {
            match: /NaN/gi,          // [示例] 正则前提(此例婚配字段值为 "NaN" 的状况)
            writeAs: null            // [示例] 将 "NaN" 重写为 null
          },
          {
            match: /(\d{4})年/gi,    // [示例] 正则表达式(此例婚配字段值为形如 "2016年" 的状况)
            writeAs: '$1.1'          // [示例] 将 "2015年" 款式的数据重写为 "2016.1" 款式的数据
          }
        ]
      }
    },
    // Other fields' options...
  }

};


/*
** 将传输效果打印到终端
*/

esMysqlRiver(river_config, function(obj) {
  console.log('\n---------------------------------');
  console.log('总传送:' + obj.total + '项');
  console.log('胜利:' + obj.success + '项');
  console.log('失利:' + obj.failed + '项');
  if (obj.result == 'success') {
    console.log('\n结论:悉数数据传送完成!');
  } else {
    console.log('\n结论:传送未胜利...');
  }
  console.log('---------------------------------');
});

注重事项及参考

  1. elasticsearch数据导入前请先设置好数据的 mapping;

  2. host_config 更多参数设置详见 es官方API文档

  3. mysql 表的自增 id 自动替换为 表名+_id 的花样,如:users_id

  4. 如涌现数据缺失状况,请注重检察 elasticsearch 终端历程或日记,找出未胜利导入的数据,经由过程设置 exception_handler 参数处置惩罚它。

github 项目地点

https://github.com/parksben/m…

    原文作者:parksben
    原文地址: https://segmentfault.com/a/1190000007792941
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞