1. 前言
1.1 PipelineDB 介绍
偶然发现了个流式数据库PipelineDB,它是基于PostgreSQL数据库改造的,允许我们通过sql的方式,对数据流做操作,并把操作结果储存起来。
这年头,真是SQL on everything。
其基本的过程是:
- 创建PipelineDB Stream。
- 编写SQL,对Stream做操作。
- 操作结果被保存到 continuous view,其背后是物理表在支撑。
1.2 安装PipelineDB
我们的安装是在Centos 7上面进行的。 PipelineDB不让用root权限的用户操作,请提前创建用户。
#下载
wget https://s3-us-west-2.amazonaws.com/download.pipelinedb.com/pipelinedb-0.9.6-centos7-x86_64.rpm
# 安装
rmp -ivh ----prefix=/opt/pipelinedb
# 初始化 pipeline-init -D <data directory>
pipeline-init -D /opt/pipelinedb/dbdata
pipelinedb -D /opt/pipelinedb/dbdata
# 激活 continuous query(仅需执行一次,后续重启不用再做)
psql -h localhost -p 5432 -d pipeline -c "ACTIVATE"
2. Quick Start例子
本例是关于 Wikipedia页面访问数据的统计。每一条访问记录,包括以下字段,以英文逗号分割。
hour | project | page title | view count | bytes served |
---|
2.1 创建continuous视图
首先,我们创建一个continuous view,使用psql工具。从sql里,我们能够看到统计方法和访问记录的对应关系。
psql -h localhost -p 5432 -d pipeline -c "
CREATE STREAM wiki_stream (hour timestamp, project text, title text, view_count bigint, size bigint);
CREATE CONTINUOUS VIEW wiki_stats AS
SELECT hour, project,
count(*) AS total_pages,
sum(view_count) AS total_views,
min(view_count) AS min_views,
max(view_count) AS max_views,
avg(view_count) AS avg_views,
percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
sum(size) AS total_bytes_served
FROM wiki_stream
GROUP BY hour, project;"
2.2 创建Stream
我们通过curl工具,获取wiki的数据集,并压缩数据,作为一个Stream写入到stdin。因为数据集比较大,当我们执行了几秒钟之后,可以使用ctrl+c中断curl操作。
curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \
psql -h localhost -p 5432 -d pipeline -c "
COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"
2.3 查看结果
通过下面的命令,从视图(continuous view)读取streaming的统计计算结果。
psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM wiki_stats ORDER BY total_views DESC";
3. PipelineDB和kafka的集成
3.1 pipeline_kafka组件安装
PipelineDB默认是没有pipeline_kafka扩展组件的,需要我们自己安装。安装需要git,如果没有git,请使用yum -y install git 安装git。
1.安装librdkafka
pipeline_kafka依赖librdkafka,需要先安装librdkafka。
git clone -b 0.9.1 https://github.com/edenhill/librdkafka.git ~/librdkafka
cd ~/librdkafka
./configure --prefix=/usr
make
sudo make install
2.安装pipeline_kafka
编译安装pipeline_kafk。如果有编译依赖的缺失,请根据缺失补充安装依赖。
./configure
make
make install
配置pipeline_kafka
# 编辑配置文件
vi /opt/pipelinedb/dbdata/pipelinedb.conf
# 在结尾输入以下内容并保存(:wq)
# Add settings for extensions here
shared_preload_libraries = 'pipeline_kafka'
重启数据库,使得扩展组件生效
# pipeline-ctl -D <data directory> start|stop|restart
pipeline-ctl -D /opt/pipelinedb/dbdata restart
3.2 Stream SQL开发过程
# 连接数据库
psql -h localhost -p 5432 -d pipeline
# 创建pipeline_kafka
CREATE EXTENSION pipeline_kafka;
# 配置kafka broker
SELECT pipeline_kafka.add_broker('192.168.1.170:9092');
# 创建Stream,从kafka里接受三个参数
CREATE STREAM msg_stream (sjc varchar, thread_name varchar, msg varchar);
# 创建CONTINUOUS VIEW
CREATE CONTINUOUS VIEW msg_result AS SELECT sjc,thread_name,msg FROM msg_stream;
# 开始消费kafka消息
# topic是my-topic,连接PipelineDB Stream名是msg_stream,消息类型是text,消息以英文逗号分割。
SELECT pipeline_kafka.consume_begin ( 'my-topic', 'msg_stream', format := 'text',
delimiter := ',', quote := NULL, escape := NULL, batchsize := 1000,
maxbytes := 32000000, parallelism := 1, start_offset := NULL );
# 如果要停止Stream,请使用以下命令。
SELECT pipeline_kafka.consume_end('my-topic', 'msg_stream');
3.3 验证
1.向kafka发送消息
登录kafka节点的服务器,进入到kafka home路径,使用以下命令发送消息
# 启动producer
bin/kafka-console-producer.sh --broker-list 192.168.1.90:9092 --topic my-topic
# 输入以下数据
a,b,c
2.在PipelineDB中查询收到的消息
从CONTINUOUS VIEW中 查询数据,可以看到有一条记录,即[a,b,c]。
psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM msg_result";
ps: 当我们连接到PipelineDB,我们可以使用PostgreSQL的命令,来查看有那些数据库对象生成。例如通过 \d 可以查看到,当我们创建CONTINUOUS VIEW的时候,额外创建了msg_result_mrel、msg_result_seq和msg_result_osrel,实际的数据就存储在msg_result_mrel中。
Schema | Name | Type | Owner |
---|---|---|---|
public | msg_result | continuous view | pipelinedb |
public | msg_result_mrel | table | pipelinedb |
public | msg_result_osrel | stream | pipelinedb |
public | msg_result_seq | sequence | pipelinedb |
public | msg_stream | stream | pipelinedb |
(完)