Cassandra海量时序数据表结构设计

前言:看到一篇老外写的讨论时序数据模型设计的博文,非常好,就试着翻译一下。Cassandra很适合存储时间序列数据。这里有个前提,数据模型要设计好,设计得好,轻轻松松处理TB级的数据,设计不好你会很痛苦。(原文链接:http://thelastpickle.com/blog/2017/08/02/time-series-data-modeling-massive-scale.html

理解数据写入量级对集群的影响,是使用Cassandra进行时序数据处理的挑战之一。单一分区的快速写入将产生热点问题,进而影响到集群伸缩性。大分区影响repair、streaming和读性能。从一个大分区读取数据,将极大增加系统负载,并造成GC压力。Cassandra 4.0能改善大分区的性能,但是不能完全解决上述问题。未来很长时间内我们必须考虑大分区性能问题,并早做计划。

这篇博文里,我们将讨论一个称作“桶”的通用的Cassandra数据模型设计技巧。桶是一个很有效的策略,能帮助我们控制分区大小,并将写请求分散到集群各个节点。这里介绍两种桶。这两种桶可以组合使用,以实现更进一步的伸缩性。假设读者已具备分区的基本概念,并了解基本的CQL指令。

先看一个简单的数据模型设计例子:

CREATE TABLE raw_data (
    sensor text,
    ts timeuuid,
    readint int,
    primary key(sensor, ts)
) WITH CLUSTERING ORDER BY (ts DESC) 
  AND compaction = {'class': 'TimeWindowCompactionStrategy', 
                    'compaction_window_size': 1, 
                    'compaction_window_unit': 'DAYS'};

这个数据模型只保存简单的传感器数据。实际中我们采集的数据远远比一个整数复杂,但是这里我们只关注键设计。这里使用TWCS作为压紧策略(compaction strategy)。TWCS能帮助我们降低压紧(compact)大分区的负载,使CPU和I/O可控。但是光有TWCS还不够,如果不使用TTL,日积月累,分区会越来越大。大分区在repairing、streaming和读取数据时,会造成系统的极大负载。

为了打破大分区,首先看看第一种桶。我们基于时间片将大分区划分成小分区。理想情况下,分区大小应不要超过100MB。举例来说,如果单个传感器每天的数据在50MB-75MB之间,那么可以按传感器按天进行分区,就是一个传感器一天一个分区。当然只要分区大小不超过100MB,也可以按周、按月、按年进行分区。不管怎么分,为未来业务增长预留点空间。

下面修改数据模型,增加一个字段day,并修改分区键:

CREATE TABLE raw_data_by_day (
sensor text,
day text,
ts timeuuid,
reading int,
primary key((sensor, day), ts)
) WITH CLUSTERING ORDER BY (ts DESC) 
       AND COMPACTION = {'class': 'TimeWindowCompactionStrategy', 
                     'compaction_window_unit': 'DAYS', 
                     'compaction_window_size': 1};

下面是插入数据(这里使用now(),也可由应用产生TimeUUID):

INSERT INTO raw_data_by_day (sensor, day, ts, reading) 
VALUES ('mysensor', '2017-01-01', now(), 10);

这是限制分区大小的一种方式。如果要获取多天的数据,就要按天发起读请求,每天一个读请求,最后由应用将返回数据合并。这种获取方式的好处是,请求被分散到集群的各个节点,而不是单一的一个节点。我们也可以依赖客户端驱动发起并行请求。Python甚至提供了一个简便的函数帮助实现这种并发请求。下面是Python示例:

from itertools import product
from cassandra.concurrent import execute_concurrent_with_args

days = ["2017-07-01", "2017-07-12", "2017-07-03"]  # collecting three days worth of data
session  = Cluster(["127.0.0.1"]).connect("blog")
prepared = session.prepare("SELECT day, ts, reading FROM raw_data_by_day WHERE sensor = ? and day = ?")

args = product(["mysensor"], days) 
# args: ('test', '2017-07-01'), ('test', '2017-07-12'), ('test', '2017-07-03')

# driver handles concurrency for you
results = execute_concurrent_with_args(session, prepared, args)

# Results:
#[ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36750>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36a90>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36550>)]

这个方案的一个变例,是为每个时间片创建一张表。比如一个月一张表,那么一年就有12张表:

CREATE TABLE raw_data_may_2017 (
    sensor text,
    ts timeuuid,
    reading int,
    primary key(sensor, ts)
) WITH COMPACTION = {'class': 'TimeWindowCompactionStrategy', 
                     'compaction_window_unit': 'DAYS', 
                     'compaction_window_size': 1};

这个方案的一个主要优点是便于归档,以及快速删除数据。比如,在每月的开始,我们可以把上个月的数据用parquet格式归档到HDFS或者S3(译者按:S3是AWS上的一种存储服务),这样可以充分利用便宜的存储实现数据分析功能。当Cassandra集群不再需要某些数据时,只要把表删除就完了,非常简单。你也许注意到,这个方案有些额外的运维工作,就是创建和删除表,所以当有归档的需求时,这个方案是合适的。也有其他归档的方法,因此这种形式的桶不是必须的。

上面的方案关注点在于避免一段较长的时间段内分区变得特别大。这个方案适用的业务场景是流量可预测的,并且变化很小。有可能遇到例外的情况,大部分分区比较正常,但是个别分区数据量特别大,以致超过单个节点的存储,或者某个分区流量瞬间变得很大,超出单个节点的处理能力。Twitter是个很好的例子,有些人拥有数千万的粉丝。程序需要对这些人使用不同的代码路径,以实现海量伸缩性。

另一种技巧是在任何时间点使用多分区方式,从而将数据分布到整个集群。好处是,我们可以用单个分区存储小数据,而使用多个分区处理大数据量的情况。

这个方案也有缺点,我们必须做好折衷。缺点是当读取数据时,我们必须使用聚合,这会带来额外的工作。另外让分页也很困难。我们需要实时监控分区的数据量,以便我们为每一块数据选择合适的分区数量。如果我们有太多的桶,结果我们读取一点点数据可能要访问很多分区。太少的桶,会导致大分区,进而造成压紧、repair、stream困难,并影响性能。

在下面的例子里,我们设想一个理想的模型,有人在一个像Twitter一样的社交网络里关注了很多人。大部分账户只用一个分区就能处理所有进来的消息。但是有人关注了数百万的账户。
声明:我并不知道Twitter怎么存储数据。这只是一个简单的例子用来讨论问题。

CREATE TABLE tweet_stream (
    account text,
    day text,
    bucket int,
    ts timeuuid,
    message text,
    primary key((account, day, bucket), ts)
) WITH CLUSTERING ORDER BY (ts DESC) 
         AND COMPACTION = {'class': 'TimeWindowCompactionStrategy', 
                       'compaction_window_unit': 'DAYS', 
                       'compaction_window_size': 1};

通过在分区键里增加bucket字段,我们扩展了前面的数据模型。现在一天可以进一步分成多个桶。当获取数据时,我们需要访问这些桶,并把数据聚合一起。我们插入一些数据演示一下:

cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 0, now(), 'hi');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 1, now(), 'hi2');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 2, now(), 'hi3');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 3, now(), 'hi4');

如果需要获取最新的消息,我们可以这样做:

from itertools import chain
from cassandra.util import unix_time_from_uuid1

prepared = session.prepare("SELECT ts, message FROM tweet_stream WHERE account = ? and day = ? and bucket = ? LIMIT 10")
# let's get 10 buckets 
partitions = range(10)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

args = product(["jon_haddad"], ["2017-07-01"], partitions)

result = execute_concurrent_with_args(session, prepared, args)

# [ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e6d0>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d710>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d4d0>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d950>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1db10>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dfd0>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dd90>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d290>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e250>),
#  ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e490>)]

results = [x.result_or_exc for x in result]

# append all the results together
data = chain(*results)
            
sorted_results = sorted(data, key=lambda x: unix_time_from_uuid1(x.ts), reverse=True)            

# newest stuff first
# [Row(ts=UUID('e1c59e60-7406-11e7-9458-897782c5d96c'), message=u'hi4'),
#  Row(ts=UUID('dd6ddd00-7406-11e7-9458-897782c5d96c'), message=u'hi3'),
#  Row(ts=UUID('d4422560-7406-11e7-9458-897782c5d96c'), message=u'hi2'),
#  Row(ts=UUID('d17dae30-7406-11e7-9458-897782c5d96c'), message=u'hi')]

这个例子仅仅使用LIMIT语句限制获取10条消息,懒惰的程序员把这些列表合并在一起,然后排序,然后就大功告成。如果要获取更多的数据,我们需要使用一个K-way合并算法。未来当我们对这个话题进一步阐释的时候,再做讨论。

现在你应该对如何在集群里分发数据和请求有了更好的理解。记住每个问题都是不同的,没有银弹。

    原文作者:九仞
    原文地址: https://www.jianshu.com/p/b054ca9dad83
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞