流式数据库PipelineDB(集成Kafka)

1. 前言

1.1 PipelineDB 介绍

偶然发现了个流式数据库PipelineDB,它是基于PostgreSQL数据库改造的,允许我们通过sql的方式,对数据流做操作,并把操作结果储存起来。

这年头,真是SQL on everything。

其基本的过程是:

  • 创建PipelineDB Stream。
  • 编写SQL,对Stream做操作。
  • 操作结果被保存到 continuous view,其背后是物理表在支撑。

1.2 安装PipelineDB

我们的安装是在Centos 7上面进行的。 PipelineDB不让用root权限的用户操作,请提前创建用户。

#下载
wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.9.6-centos7-x86_64.rpm
# 安装
rmp -ivh ----prefix=/opt/pipelinedb
# 初始化 pipeline-init -D <data directory>
pipeline-init -D /opt/pipelinedb/dbdata
pipelinedb -D /opt/pipelinedb/dbdata
# 激活 continuous query(仅需执行一次,后续重启不用再做)
psql -h localhost -p 5432 -d pipeline -c "ACTIVATE"

2. Quick Start例子

本例是关于 Wikipedia页面访问数据的统计。每一条访问记录,包括以下字段,以英文逗号分割。

hourprojectpage titleview countbytes served

2.1 创建continuous视图

首先,我们创建一个continuous view,使用psql工具。从sql里,我们能够看到统计方法和访问记录的对应关系。

psql -h localhost -p 5432 -d pipeline -c "
CREATE STREAM wiki_stream (hour timestamp, project text, title text, view_count bigint, size bigint);
CREATE CONTINUOUS VIEW wiki_stats AS
SELECT hour, project,
        count(*) AS total_pages,
        sum(view_count) AS total_views,
        min(view_count) AS min_views,
        max(view_count) AS max_views,
        avg(view_count) AS avg_views,
        percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
        sum(size) AS total_bytes_served
FROM wiki_stream
GROUP BY hour, project;"

2.2 创建Stream

我们通过curl工具,获取wiki的数据集,并压缩数据,作为一个Stream写入到stdin。因为数据集比较大,当我们执行了几秒钟之后,可以使用ctrl+c中断curl操作。

curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \
        psql -h localhost -p 5432 -d pipeline -c "
        COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"

2.3 查看结果

通过下面的命令,从视图(continuous view)读取streaming的统计计算结果。

psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM wiki_stats ORDER BY total_views DESC";

3. PipelineDB和kafka的集成

3.1 pipeline_kafka组件安装

PipelineDB默认是没有pipeline_kafka扩展组件的,需要我们自己安装。安装需要git,如果没有git,请使用yum -y install git 安装git。

1.安装librdkafka

pipeline_kafka依赖librdkafka,需要先安装librdkafka。

git clone -b 0.9.1 https://github.com/edenhill/librdkafka.git ~/librdkafka
cd ~/librdkafka
./configure --prefix=/usr
make
sudo make install

2.安装pipeline_kafka

编译安装pipeline_kafk。如果有编译依赖的缺失,请根据缺失补充安装依赖。

./configure
make
make install

配置pipeline_kafka

# 编辑配置文件
vi /opt/pipelinedb/dbdata/pipelinedb.conf
# 在结尾输入以下内容并保存(:wq)
# Add settings for extensions here
shared_preload_libraries = 'pipeline_kafka'

重启数据库,使得扩展组件生效

# pipeline-ctl -D <data directory> start|stop|restart
pipeline-ctl -D /opt/pipelinedb/dbdata restart

3.2 Stream SQL开发过程

# 连接数据库
psql -h localhost -p 5432 -d pipeline
# 创建pipeline_kafka
CREATE EXTENSION pipeline_kafka;
# 配置kafka broker
SELECT pipeline_kafka.add_broker('192.168.1.170:9092');
# 创建Stream,从kafka里接受三个参数
CREATE STREAM msg_stream (sjc varchar, thread_name varchar, msg varchar);
# 创建CONTINUOUS VIEW
CREATE CONTINUOUS VIEW msg_result AS SELECT sjc,thread_name,msg FROM msg_stream;
# 开始消费kafka消息
# topic是my-topic,连接PipelineDB Stream名是msg_stream,消息类型是text,消息以英文逗号分割。
SELECT pipeline_kafka.consume_begin ( 'my-topic', 'msg_stream', format := 'text', 
            delimiter := ',', quote := NULL, escape := NULL, batchsize := 1000,
            maxbytes := 32000000, parallelism := 1, start_offset := NULL );


# 如果要停止Stream,请使用以下命令。
SELECT pipeline_kafka.consume_end('my-topic', 'msg_stream');

3.3 验证

1.向kafka发送消息

登录kafka节点的服务器,进入到kafka home路径,使用以下命令发送消息

# 启动producer
bin/kafka-console-producer.sh --broker-list 192.168.1.90:9092 --topic my-topic
# 输入以下数据
a,b,c

2.在PipelineDB中查询收到的消息

从CONTINUOUS VIEW中 查询数据,可以看到有一条记录,即[a,b,c]。

psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM msg_result";

ps: 当我们连接到PipelineDB,我们可以使用PostgreSQL的命令,来查看有那些数据库对象生成。例如通过 \d 可以查看到,当我们创建CONTINUOUS VIEW的时候,额外创建了msg_result_mrel、msg_result_seq和msg_result_osrel,实际的数据就存储在msg_result_mrel中。

SchemaNameTypeOwner
publicmsg_resultcontinuous viewpipelinedb
publicmsg_result_mreltablepipelinedb
publicmsg_result_osrelstreampipelinedb
publicmsg_result_seqsequencepipelinedb
publicmsg_streamstreampipelinedb

http://docs.pipelinedb.com/quickstart.html

https://github.com/pipelinedb/pipeline_kafka

(完)

    原文作者:郭寻抚
    原文地址: https://www.jianshu.com/p/29b5c2a642ee
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞