Window版本ElasticSearch同步数据库数据

我使用的ElasticSearch是2.3.3版本,同步数据库使用插件是elasticsearch-jdbc-2.3.3.0,这里请注意,针对ElasticSearch版本需要使用对应的插件.下面是插件下载地址.
https://github.com/jprante/elasticsearch-jdbc

环境变量配置

安装好ElasticSearch后,将下载的插件放到你喜欢的任意盘中(不需要安装).
然后配置环境变量,在系统变量中新增一个变量,名字自定义(路径指向你的插件地址)
《Window版本ElasticSearch同步数据库数据》
这个插件有点坑的地方就是你同步数据库的时候需要使用jdk1.8,然后同步完后可以将环境变量改成你原来的jdk版本。但是不能卸载,不然同步就会停止。

脚本文件编写

安装完JDK1.8后在任意盘创建一个文件夹,创建一个.sh后缀文件,同步执行的就是这个shell脚本文件。内容如下(贴上的代码显示不正常,就使用图片了):
《Window版本ElasticSearch同步数据库数据》
图片橘黄色部分是你新增环境变量名字。最后一行的statefile.json文件新增在这个执行文件同一路径下。代码内容如下:
{
“type” : “jdbc”,
“jdbc”: {
“schedule”: “0/20 0-59 0-23 ? * *”,
“elasticsearch.autodiscover”:true,
“elasticsearch.cluster”:”elasticsearch”,
“driver”:”com.mysql.jdbc.Driver”,
“url”:”jdbc:mysql:localhost:3306/topic_test2”,
“user”:”“,
“password”:”“,
“sql”:”select v.id as _id,v.id as id,v.name as name,unix_timestamp(v.createTime) as createTime from videoInfo v”,
“elasticsearch” : {
“host” : “10.0.1.2”,
“port” : 9300
},
“index” : “original”,
“type” : “original”
}
}
其中schedule指的是同步刷新的时间间隔,可以指定多长时间同步一次。有关schedule的参数配置可以点击下面路径查看:
http://www.cnblogs.com/lihaiming93/p/6619124.html
elasticsearch.cluster 这个会自动查找你的elasticsearch是否有集群,我做测试是没有搭集群,可以在你安装的elasticsearch2.3.3/config/elasticsearch.yml文件配置,修改这个文件,找到cluster.name,去掉前面的#号,将值改为elasticsearch。
《Window版本ElasticSearch同步数据库数据》

这里建议在写sql时把ES上的 _id与数据库id对应。这样方便搜索对应的数据。

然后填写你的数据库基本信息,我是用的mysql这里写sql语句的时间类型是个坑,最好可以转成时间戳,这样方便搜索是对时间排序。当时我需要同步的表设计时时间类型定为了varchar,同步到ES中就认定成了字符串(这里可以测试一下如果数据库是正常时间类型,同步过来是否可以正常排序)。

下面有关ES的配置就可以按照你自己的需求来填写。index和type填写后会自动为你创建,不需要你主动创建。到这里基本上已经成功配置完了。

脚本文件运行

在window下运行shell文件需要下载Git Bash。下载完成后运行.sh文件。运行后会在.sh文件路径下产生一个日志文件,如果失败可以在日志中查看问题。
到这里基本已经完成了对数据库的同步,这里要记住,运行.sh文件的窗口不能关闭,关闭就停止同步了(关闭后重新执行这个文件就可以恢复同步)。
然后这个插件坑点还有不能同步删除数据库信息(数据库删除后,ES上不能删除),能同步增改。

删除ES上冗余数据(java代码实现)

由于ES不能同步删除这个坑,我自己写了一个工具方法()用来删除ES上存在但数据库中却删除了的冗余数据(可以自己弄个定时器什么的每天删除)。
首先如果你的同步的数据库数据量比较大的话,在java中,如果使用set.size()方法的话,每次最多能搜索出1万条数据,这样对较大数据量时就显得不够用。所以在这里使用的是滚动搜索。代码如下:
这个方法是设置每次滚动搜索的数量,然后需要对id进行排序好与数据库中数据做判断。
public Map<String,Object> searchByScroll() {
Map<String,Object> map = new HashMap<String,Object>();
List<Integer> list = new ArrayList<Integer>();
String index = "original";
String type = "original";
// 搜索条件
SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
searchRequestBuilder.setIndices(index);
searchRequestBuilder.setTypes(type);
searchRequestBuilder.addSort("id", SortOrder.ASC);
searchRequestBuilder.setSize(1000);//设置每次滚动搜索数量
searchRequestBuilder.setScroll(new TimeValue(30000));//设置滚动搜索有效时长
// 执行
SearchResponse searchResponse = searchRequestBuilder.get();
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();
for (SearchHit searchHit : searchHits) {
list.add(Integer.valueOf(searchHit.getSource().get("id") + ""));
}
map.put("list", list);
map.put("scrollId", scrollId);
return map;
}

根据上面方法传过来的scrollId来获取所有的数据

public List<Integer> searchByScrollId(Map<String,Object> map) {
        List<Integer> list = (List<Integer>) map.get("list");
        String scrollId = (String) map.get("scrollId");
        TimeValue timeValue = new TimeValue(10000);
        SearchScrollRequestBuilder searchScrollRequestBuilder;
        SearchResponse response;
        // 结果
        while (true) {
            searchScrollRequestBuilder = client.prepareSearchScroll(scrollId);
            // 重新设定滚动时间
            searchScrollRequestBuilder.setScroll(timeValue);
            // 请求
            response = searchScrollRequestBuilder.get();
            // 每次返回下一个批次结果 直到没有结果返回时停止 即hits数组空时
            if (response.getHits().getHits().length == 0) {
                break;
            } // if
                // 这一批次结果
            SearchHit[] searchHits = response.getHits().getHits();
            System.out.println(searchHits.length);
            for (SearchHit searchHit : searchHits) {
                list.add(Integer.valueOf(searchHit.getSource().get("id") + ""));
            } // for
                // 只有最近的滚动ID才能被使用
            scrollId = response.getScrollId();
        } // while
        clearScroll(scrollId);
        return list;
    }

这个方法就是同步删除的工具方法,需要ES和数据库数据id排序一致,原理是id循环比较,id相同就继续,不同就跳过。

public boolean dropCommon() throws UnknownHostException{
        ESUtil es = new ESUtil();
        List<String> original = new Infomation().originalInfo();//获取数据库所有数据id,需要根据id排序
        List<Integer> error = new ArrayList<Integer>();
        int size = 0;
        while(size==0){
            int index = 0;
            int original_index = 0;
            List<Integer> all = es.searchByScrollId(es.searchByScroll());//获取ES所有数据id,需要根据id排序
            for(int i=0;i<all.size();i++){
                index++;
                if(all.get(i)==Integer.parseInt(original.get(original_index))){
                    original_index++;
                }else{
                    error.add(all.get(i));//这里就是冗余数据id
                }
            }
            System.out.println(error.size());
            break;
        }
        return false;
    }

文章就到这里了,希望能对想要了解这方面的有帮助。
参考文献http://blog.csdn.net/laoyang360/article/details/51694519

    原文作者:LGHS-向阳者
    原文地址: https://blog.csdn.net/qq_28779087/article/details/72782248
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞