简介
工作中我们遇到了把Hive数据同步到Clickhouse的业务需求,一开始我们写Spark任务,用SparkSQL读Hive,再用JDBC写入到Clickhouse。
后来,随着要同步的表越来越多,每次都写Spark任务,成本就显得有些高了。于是,写了一个通用的Spark任务,指定Hive表、字段,指定Clickhouse表、字段,每次指定不同的参数。
再后来,业务越来越复杂,不仅是简单的同步,而是要支持更复杂的SQL,结果进行数据类型转换、值转化等,然后再插入Clickhouse。
这不是ETL要干的事儿吗?!
当然,继续增强之前的Spark,完全可以实现这个功能。但是说到ETL,不是有专业强大的Logstash吗,为什么要重复造轮子?
经过一番调研,还真有人写了Logstash插件,用来导出数据到Clickhouse:logstash-output-clickhouse
输出端搞定了,输入端怎么搞呢?很建达,用JDBC插件就可以了。
输入
input {
jdbc {
jdbc_driver_library => "/data/jars/hive-jdbc-1.1.0-cdh5.7.4.jar,/data/jars/libthrift-0.9.2.jar,/data/jars/httpclient-4.2.5,/data/jars/httpcore-4.2.5.jar,/data/jars/hive-service-1.1.0-cdh5.7.4.jar,/data/jars/hive-common-1.1.0-cdh5.7.4.jar,/data/jars/hive-metastore-1.1.0-cdh5.7.4.jar,/data/jars/hadoop-common-2.6.0-cdh5.7.4.jar,/data/jars/commons-logging-1.2.jar,/data/jars/slf4j-api-1.7.5.jar,/data/jars/slf4j-log4j12-1.7.5.jar,/data/jars/log4j-1.2.17.jar"
jdbc_driver_class => "org.apache.hive.jdbc.HiveDriver"
jdbc_connection_string => "jdbc:hive2://ip:port/db
jdbc_user => ""
jdbc_password => ""
parameters => {"n" => 100}
statement => "select id, reg_time from db.table limit :n"
#schedule => "* * * * *"
}
}
如上,配置jdbc连接信息即可。
需要说明的是,相关的jar包比较多,需要给全了,否则会有各种ClassNotFoundException。完整的jar列表为:
-rw-r--r-- 1 root root 61829 Sep 21 2016 commons-logging-1.2.jar
-rw-r--r-- 1 root root 3438469 Sep 21 2016 hadoop-common-2.6.0-cdh5.7.4.jar
-rw-r--r-- 1 root root 302545 Sep 21 2016 hive-common-1.1.0-cdh5.7.4.jar
-rw-r--r-- 1 root root 96208 Sep 21 2016 hive-jdbc-1.1.0-cdh5.7.4.jar
-rw-r--r-- 1 root root 5498220 Sep 21 2016 hive-metastore-1.1.0-cdh5.7.4.jar
-rw-r--r-- 1 root root 2050189 Sep 21 2016 hive-service-1.1.0-cdh5.7.4.jar
-rw-r--r-- 1 root root 433368 Sep 21 2016 httpclient-4.2.5.jar
-rw-r--r-- 1 root root 227708 Sep 21 2016 httpcore-4.2.5.jar
-rw-r--r-- 1 root root 227712 Sep 21 2016 libthrift-0.9.2.jar
-rw-r--r-- 1 root root 489884 Sep 21 2016 log4j-1.2.17.jar
-rw-r--r-- 1 root root 26084 Sep 21 2016 slf4j-api-1.7.5.jar
-rw-r--r-- 1 root root 8869 Sep 21 2016 slf4j-log4j12-1.7.5.jar
这些jar最好与hive环境版本一致,我们用的是CDH版,所以都是从CDH目录下找到的jar。
输出
Clickhouse插件使用说明参考:https://github.com/mikechris/logstash-output-clickhouse
主要说下安装过程。
说明文档里说的bin/logstash-plugin install logstash-output-clickhouse
方式,没有安装成功,所以只能自己编译安装。
先clone源码,然后进入源码根路径:
git clone https://github.com/mikechris/logstash-output-clickhouse.git
cd logstash-output-clickhouse
编译:
gem build logstash-output-clickhouse.gemspec
此时,若没有安装ruby环境,按照提示安装一下,再编译。
编译成功后,会多出一个文件
-rw-r--r-- 1 root root 8704 Aug 28 11:27 logstash-output-clickhouse-0.1.0.gem
安装:
logstash-plugin install logstash-output-clickhouse-0.1.0.gem
logstash的安装就不多说了,按照logstash官方文档安装就可以了。
此时,如果logstash版本是5.x,可能会遇到一个错误:
Validating ./logstash-output-clickhouse/logstash-output-clickhouse-0.1.0.gem
Installing logstash-output-clickhouse
Plugin version conflict, aborting
ERROR: Installation Aborted, message: Bundler could not find compatible versions for gem "logstash-mixin-http_client":
In snapshot (Gemfile.lock):
logstash-mixin-http_client (= 5.2.0)
In Gemfile:
logstash-mixin-http_client (< 6.0.0, >= 5.0.0) java
logstash-mixin-http_client (< 6.0.0, >= 5.1.0) java
logstash-output-clickhouse (= 0.1.0) java depends on
logstash-mixin-http_client (< 7.0.0, >= 6.0.0) java
Running `bundle update` will rebuild your snapshot from scratch, using only
the gems in your Gemfile, which may resolve the conflict.
按照提示,修改gemfile:
vim logstash-output-clickhouse/logstash-output-clickhouse.gemspec
修改logstash-mixin-http_client的版本:
# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
#s.add_runtime_dependency "logstash-mixin-http_client", ">= 6.0.0", "< 7.0.0"
s.add_runtime_dependency "logstash-mixin-http_client", ">= 5.0.0", "< 6.0.0"
原来是>6且<7,改成>5且<6。
然后,再次编译、安装,就可以了。
按照文档中的使用说明,配置Clickhouse连接信息即可:
output {
#stdout { codec => json }
#stdout { codec => json_lines }
clickhouse {
http_hosts => ["http://127.0.0.1:8123/"]
table => "default.table"
request_tolerance => 1
#flush_size => 1000
#pool_max => 1000
mutations => {
id => id
reg_time => reg_time
}
}
}
数据转化
这部分工作可以放在filter里处理,各种filter插件就不说了,参考logstash官方文档吧。