MySQL数据库数据导入ES中

需要安装       

       mysql数据导入ES中,用到的有logstash,ES,mysql的连接包。链接:https://pan.baidu.com/s/1xopMMUtPir12zrQfYcwBZg 
提取码:me8g。这里的logstash和ES我用的版本是6.4.3的,还有mysql连接包,ik分词器6.4.3版本,数据库数据。

一. 安装ES

1.把elasticsearch-6.4.3.tar.gz安装包解压到自己熟悉的路径。

  tar -zxvf elasticsearch-6.4.3.tar.gz -C /usr/local/

《MySQL数据库数据导入ES中》

2.修改 elasticsearch-6.4.3/config文件夹下的elasticsearch.yml文件

cluster.name: an-es  # 集群名字,在ES集群中用的到
 node.name: es-node1 # ES的节点名称,ES集群中,每个几点名称不一样
 path.data: /usr/local/elasticsearch-6.4.3/data # data文件夹自行创建,在elasticsearch-7.4.2文件夹下
 path.logs: /usr/local/elasticsearch-6.4.3/logs # 日志文件存储位置
 network.host: 0.0.0.0 # 测试时可供提供所有访问
 http.port: 9200 # 端口号
 cluster.initial_master_nodes: ["es-node1"] # ES集群情况下节点名称

3.修改elasticsearch-6.4.3/config文件夹下jvm.xml配置文件(根据自己服务器情况配置,测试环境小的就行)

 -Xms128m
 -Xmx128m

4.添加用户

由于ES启动不能用root用户启动,所以要创建一个用户

 adduser esuser

  同时为elasticsearch-6.4.3文件夹下的所有文件改成esuser权限

chown -R esuser:esuser elasticsearch-6.4.3/

5.修改limits.conf 和sysctl.conf配置文件

root用户的权限修改limits.conf

 vim /etc/security/limits.conf
# 文本末尾添加这样的配置
* hard nofile 65536
* soft nofile 131072
* hard nproc 4096
* soft nproc 2048 

修改limits.conf

vim /etc/sysctl.conf
#添加配置
vm.max_map_count=655360

最后刷新应用配置

sysctl -p

6.启动ES

以非root用户后台启动

/usr/local/elasticsearch-6.4.3/bin/elasticsearch -d

7.安装ES head

我直接是是在google浏览器安装的,这样的安装方式比较方便。

1.打开扩展程序

《MySQL数据库数据导入ES中》

2.找到应用商店搜索

《MySQL数据库数据导入ES中》

直接安装就可以了

最后连接到自己的ES就可以了

《MySQL数据库数据导入ES中》

 

8.ik分词器安装

   解压ik分词器

unzip /opt/software/elasticsearch-analysis-ik-7.4.2.zip -d /usr/local/elasticsearch-7.4.2/plugins/ik

重启ES,完成设置

二.logstash安装

1.把logstash解压到熟悉的路径

tar -zxvf logstash-6.4.3.tar.gz -C /usr/local/

2.在logstash-6.4.3文件下创建sync文件夹

mkdir sync

3.数据库数据准备

 在文章开头,云盘里的foodie-shop-dev.sql文件,导入到自己的数据库即可。

4.完成logstash和数据库连接的配置文件logstash-db-sync.conf

在sync创建文件logstash-db-sync.conf,内容如下

input{
stdin{}
jdbc{
#数据库连接地址
jdbc_connection_string=>"jdbc:mysql://192.168.1.101:3306/foodie-shop-dev?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true"
##数据库连接账号密码;
jdbc_user=>"root"
jdbc_password=>"Aa_123456"
##MySQL依赖包路径;
jdbc_driver_library=>"/opt/software/mysql-connector-java-5.1.27-bin.jar"
##thenameofthedriverclassformysql
jdbc_driver_class=>"com.mysql.jdbc.Driver"
##数据库重连尝试次数
connection_retry_attempts=>"3"
##判断数据库连接是否可用,默认false不开启
jdbc_validate_connection=>"true"
##数据库连接可用校验超时时间,默认3600S
jdbc_validation_timeout=>"3600"
##开启分页查询(默认false不开启);
jdbc_paging_enabled=>"true"
##单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值);
jdbc_page_size=>"500"
##statement为查询数据sql,如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径;
##sql_last_value为内置的变量,存放上次查询结果中最后一条数据tracking_column的值,此处即为ModifyTime;
##执行SQL文件的路径
statement_filepath=>"/usr/local/logstash-6.4.3/sync/foodie-items.sql"
##statement=>"SELECTKeyId,TradeTime,OrderUserName,ModifyTimeFROM`DetailTab`WHEREModifyTime>=:sql_last_valueorderbyModifyTimeasc"
##是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
lowercase_column_names=>false
##Valuecanbeanyof:fatal,error,warn,info,debug,默认info;
sql_log_level=>warn
##
##是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
record_last_run=>true
##需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值;
use_column_value=>true
##需要记录的字段,用于增量同步,需是数据库字段
tracking_column=>"updated_time"
##Valuecanbeanyof:numeric,timestamp,Defaultvalueis"numeric"
tracking_column_type=>timestamp
##record_last_run上次数据存放位置;
last_run_metadata_path=>"/usr/local/logstash-6.4.3/sync/track_time"
##是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
clean_run=>false
##
##同步频率(分时天月年),默认每分钟同步一次;
schedule=>"* * * * *"
##索引类型
type=>"_doc"
}
}
output{
elasticsearch{
##host=>"192.168.1.1"
##port=>"9200"
##配置ES地址
hosts=>["192.168.1.101:9200"]
##索引名字,必须小写
index=>"foodie-items-ik"
##数据唯一索引(建议使用数据库KeyID)
document_id=>"%{itemId}"
# 定义模板名称
template_name=>"myik"
# 模板所在位置
template=>"/usr/local/logstash-6.4.3/sync/logstash-ik.json"
# 重写模板
template_overwrite=>true
# 默认为true,false关闭ogstash自动管理模板功能,如果自定义模板,则设置为false
manage_template=>false
}
stdout{
codec=>json_lines
}
}

在这里需要数据库连接包,准备好放入到sync文件夹下。这里的主要对数据库中items表的数据进行导入,所以需要一个sql语句,在sync文件夹下创建foodie-items.sql。这里根据自己的需要写SQL

5.创建外部文件SQL

foodie-items.sql

SELECT
        i.id AS itemId,
        i.item_name AS itemName,
        i.sell_counts AS sellCounts,
        ii.url AS imgUrl,
        tempSpec.priceDiscount price,
        i.updated_time as updated_time
FROM
        items i
LEFT JOIN items_img ii ON i.id = ii.item_id
LEFT JOIN (
        SELECT
                item_id,
                MIN(price_discount) priceDiscount
        FROM
                items_spec
        GROUP BY
                item_id
) tempSpec ON i.id = tempSpec.item_id
WHERE
        ii.is_main = 1
 and i.updated_time >= :sql_last_value

这里的主键是itemId,要对应到输出的ES,document_id=>”%{itemId}”,是es的id。i.updated_time >= :sql_last_value这个是在更新数据库的时候同时ES中的数据也更新。ES的更新是根据items表中的updated_time更新而更新的。只有当updated_time大于sync文件夹下的track_time时,才进行ES中的数据更新。track_time文件内如如下

--- 2020-04-29 14:45:39.000000000 +08:00

所以,只有数据库表中的updated_time大于track_time里的时间,才进行 更新。

6.加入ik分词器

在logstash-db-sync.conf文件中,需要一个模板,在sync下创建

logstash-ik.json

{
        "order": 0,
        "version": 1,
        "index_patterns": ["*"],
        "settings": {
            "index": {
                "refresh_interval": "5s"
            }
        },
        "mappings": {
            "_default_": {
                "dynamic_templates": [
                    {
                        "message_field": {
                            "path_match": "message",
                            "match_mapping_type": "string",
                            "mapping": {
                                "type": "text",
                                "norms": false
                            }
                        }
                    },
                    {
                        "string_fields": {
                            "match": "*",
                            "match_mapping_type": "string",
                            "mapping": {
                                "type": "text",
                                "norms": false,
                                "analyzer": "ik_max_word",
                                "fields": {
                                    "keyword": {
                                        "type": "keyword",
                                        "ignore_above": 256
                                    }
                                }
                            }
                        }
                    }
                ],
                "properties": {
                    "@timestamp": {
                        "type": "date"
                    },
                    "@version": {
                        "type": "keyword"
                    },
                    "geoip": {
                        "dynamic": true,
                        "properties": {
                            "ip": {
                                "type": "ip"
                            },
                            "location": {
                                "type": "geo_point"
                            },
                            "latitude": {
                                "type": "half_float"
                            },
                            "longitude": {
                                "type": "half_float"
                            }
                        }
                    }
                }
            }
        },
        "aliases": {}
    }

这个文件是从ES中取来的,然后做一下修改

《MySQL数据库数据导入ES中》

 

修改

1.”index-patterns”:[“*”],对所有的索引进行配置;

2.”version”:1,版本从1开始

3.string_fields下的mapping中加入”analyzer”: “ik_max_word”,配置分词器

4.去掉最外层的{“logstash”:   }。

看一下sync下的都有什么文件

《MySQL数据库数据导入ES中》

7.启动logstash

进入目录

《MySQL数据库数据导入ES中》

./logstash -f /usr/local/logstash-6.4.3/sync/logstash-db-sync.conf

出现这样的情况,数据已经同步

《MySQL数据库数据导入ES中》

8.遇到的问题

再做ik分词器模板时,以上面的方式,一直ik分词器不能生效。我把ik分词器的文件放到postman里,手动的生成了一下。生成一个myik的模板,在制定ik分词器的时候,名字叫template_name: myik。有点头疼

 

《MySQL数据库数据导入ES中》

    原文作者:New灬soul
    原文地址: https://blog.csdn.net/qq_29963323/article/details/106573303
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞