Logstash同步Hive和Clickhouse

简介

工作中我们遇到了把Hive数据同步到Clickhouse的业务需求,一开始我们写Spark任务,用SparkSQL读Hive,再用JDBC写入到Clickhouse。

后来,随着要同步的表越来越多,每次都写Spark任务,成本就显得有些高了。于是,写了一个通用的Spark任务,指定Hive表、字段,指定Clickhouse表、字段,每次指定不同的参数。

再后来,业务越来越复杂,不仅是简单的同步,而是要支持更复杂的SQL,结果进行数据类型转换、值转化等,然后再插入Clickhouse。

这不是ETL要干的事儿吗?!

当然,继续增强之前的Spark,完全可以实现这个功能。但是说到ETL,不是有专业强大的Logstash吗,为什么要重复造轮子?

经过一番调研,还真有人写了Logstash插件,用来导出数据到Clickhouse:logstash-output-clickhouse

输出端搞定了,输入端怎么搞呢?很建达,用JDBC插件就可以了。

输入

input {
    jdbc {
        jdbc_driver_library => "/data/jars/hive-jdbc-1.1.0-cdh5.7.4.jar,/data/jars/libthrift-0.9.2.jar,/data/jars/httpclient-4.2.5,/data/jars/httpcore-4.2.5.jar,/data/jars/hive-service-1.1.0-cdh5.7.4.jar,/data/jars/hive-common-1.1.0-cdh5.7.4.jar,/data/jars/hive-metastore-1.1.0-cdh5.7.4.jar,/data/jars/hadoop-common-2.6.0-cdh5.7.4.jar,/data/jars/commons-logging-1.2.jar,/data/jars/slf4j-api-1.7.5.jar,/data/jars/slf4j-log4j12-1.7.5.jar,/data/jars/log4j-1.2.17.jar"
        jdbc_driver_class => "org.apache.hive.jdbc.HiveDriver"
        jdbc_connection_string => "jdbc:hive2://ip:port/db
        jdbc_user => ""
        jdbc_password => ""
        parameters => {"n" => 100}
        statement => "select id, reg_time from db.table limit :n"
        #schedule => "* * * * *"
    }
}

如上,配置jdbc连接信息即可。

需要说明的是,相关的jar包比较多,需要给全了,否则会有各种ClassNotFoundException。完整的jar列表为:

-rw-r--r--  1 root root   61829 Sep 21  2016 commons-logging-1.2.jar
-rw-r--r--  1 root root 3438469 Sep 21  2016 hadoop-common-2.6.0-cdh5.7.4.jar
-rw-r--r--  1 root root  302545 Sep 21  2016 hive-common-1.1.0-cdh5.7.4.jar
-rw-r--r--  1 root root   96208 Sep 21  2016 hive-jdbc-1.1.0-cdh5.7.4.jar
-rw-r--r--  1 root root 5498220 Sep 21  2016 hive-metastore-1.1.0-cdh5.7.4.jar
-rw-r--r--  1 root root 2050189 Sep 21  2016 hive-service-1.1.0-cdh5.7.4.jar
-rw-r--r--  1 root root  433368 Sep 21  2016 httpclient-4.2.5.jar
-rw-r--r--  1 root root  227708 Sep 21  2016 httpcore-4.2.5.jar
-rw-r--r--  1 root root  227712 Sep 21  2016 libthrift-0.9.2.jar
-rw-r--r--  1 root root  489884 Sep 21  2016 log4j-1.2.17.jar
-rw-r--r--  1 root root   26084 Sep 21  2016 slf4j-api-1.7.5.jar
-rw-r--r--  1 root root    8869 Sep 21  2016 slf4j-log4j12-1.7.5.jar

这些jar最好与hive环境版本一致,我们用的是CDH版,所以都是从CDH目录下找到的jar。

输出

Clickhouse插件使用说明参考:https://github.com/mikechris/logstash-output-clickhouse

主要说下安装过程。

说明文档里说的bin/logstash-plugin install logstash-output-clickhouse方式,没有安装成功,所以只能自己编译安装。

先clone源码,然后进入源码根路径:

git clone https://github.com/mikechris/logstash-output-clickhouse.git
cd logstash-output-clickhouse

编译:

gem build logstash-output-clickhouse.gemspec

此时,若没有安装ruby环境,按照提示安装一下,再编译。

编译成功后,会多出一个文件

-rw-r--r--  1 root root 8704 Aug 28 11:27 logstash-output-clickhouse-0.1.0.gem

安装:

logstash-plugin install logstash-output-clickhouse-0.1.0.gem

logstash的安装就不多说了,按照logstash官方文档安装就可以了。

此时,如果logstash版本是5.x,可能会遇到一个错误:

Validating ./logstash-output-clickhouse/logstash-output-clickhouse-0.1.0.gem
Installing logstash-output-clickhouse
Plugin version conflict, aborting
ERROR: Installation Aborted, message: Bundler could not find compatible versions for gem "logstash-mixin-http_client":
  In snapshot (Gemfile.lock):
    logstash-mixin-http_client (= 5.2.0)

  In Gemfile:
    logstash-mixin-http_client (< 6.0.0, >= 5.0.0) java

    logstash-mixin-http_client (< 6.0.0, >= 5.1.0) java

    logstash-output-clickhouse (= 0.1.0) java depends on
      logstash-mixin-http_client (< 7.0.0, >= 6.0.0) java

Running `bundle update` will rebuild your snapshot from scratch, using only
the gems in your Gemfile, which may resolve the conflict.

按照提示,修改gemfile:

vim logstash-output-clickhouse/logstash-output-clickhouse.gemspec

修改logstash-mixin-http_client的版本:

# Gem dependencies
  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
  #s.add_runtime_dependency "logstash-mixin-http_client", ">= 6.0.0", "< 7.0.0"
  s.add_runtime_dependency "logstash-mixin-http_client", ">= 5.0.0", "< 6.0.0"

原来是>6且<7,改成>5且<6。

然后,再次编译、安装,就可以了。

按照文档中的使用说明,配置Clickhouse连接信息即可:

output {
    #stdout { codec => json }
    #stdout { codec => json_lines }
    clickhouse {
        http_hosts => ["http://127.0.0.1:8123/"]
        table => "default.table"
        request_tolerance => 1
        #flush_size => 1000
        #pool_max => 1000
        mutations => {
            id => id
            reg_time => reg_time
        }
    }
}

数据转化

这部分工作可以放在filter里处理,各种filter插件就不说了,参考logstash官方文档吧。

    原文作者:小琪的大爷
    原文地址: https://www.jianshu.com/p/fb9e41026025
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞