前言:看到一篇老外写的讨论时序数据模型设计的博文,非常好,就试着翻译一下。Cassandra很适合存储时间序列数据。这里有个前提,数据模型要设计好,设计得好,轻轻松松处理TB级的数据,设计不好你会很痛苦。(原文链接:http://thelastpickle.com/blog/2017/08/02/time-series-data-modeling-massive-scale.html)
理解数据写入量级对集群的影响,是使用Cassandra进行时序数据处理的挑战之一。单一分区的快速写入将产生热点问题,进而影响到集群伸缩性。大分区影响repair、streaming和读性能。从一个大分区读取数据,将极大增加系统负载,并造成GC压力。Cassandra 4.0能改善大分区的性能,但是不能完全解决上述问题。未来很长时间内我们必须考虑大分区性能问题,并早做计划。
这篇博文里,我们将讨论一个称作“桶”的通用的Cassandra数据模型设计技巧。桶是一个很有效的策略,能帮助我们控制分区大小,并将写请求分散到集群各个节点。这里介绍两种桶。这两种桶可以组合使用,以实现更进一步的伸缩性。假设读者已具备分区的基本概念,并了解基本的CQL指令。
先看一个简单的数据模型设计例子:
CREATE TABLE raw_data (
sensor text,
ts timeuuid,
readint int,
primary key(sensor, ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_size': 1,
'compaction_window_unit': 'DAYS'};
这个数据模型只保存简单的传感器数据。实际中我们采集的数据远远比一个整数复杂,但是这里我们只关注键设计。这里使用TWCS作为压紧策略(compaction strategy)。TWCS能帮助我们降低压紧(compact)大分区的负载,使CPU和I/O可控。但是光有TWCS还不够,如果不使用TTL,日积月累,分区会越来越大。大分区在repairing、streaming和读取数据时,会造成系统的极大负载。
为了打破大分区,首先看看第一种桶。我们基于时间片将大分区划分成小分区。理想情况下,分区大小应不要超过100MB。举例来说,如果单个传感器每天的数据在50MB-75MB之间,那么可以按传感器按天进行分区,就是一个传感器一天一个分区。当然只要分区大小不超过100MB,也可以按周、按月、按年进行分区。不管怎么分,为未来业务增长预留点空间。
下面修改数据模型,增加一个字段day,并修改分区键:
CREATE TABLE raw_data_by_day (
sensor text,
day text,
ts timeuuid,
reading int,
primary key((sensor, day), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
下面是插入数据(这里使用now(),也可由应用产生TimeUUID):
INSERT INTO raw_data_by_day (sensor, day, ts, reading)
VALUES ('mysensor', '2017-01-01', now(), 10);
这是限制分区大小的一种方式。如果要获取多天的数据,就要按天发起读请求,每天一个读请求,最后由应用将返回数据合并。这种获取方式的好处是,请求被分散到集群的各个节点,而不是单一的一个节点。我们也可以依赖客户端驱动发起并行请求。Python甚至提供了一个简便的函数帮助实现这种并发请求。下面是Python示例:
from itertools import product
from cassandra.concurrent import execute_concurrent_with_args
days = ["2017-07-01", "2017-07-12", "2017-07-03"] # collecting three days worth of data
session = Cluster(["127.0.0.1"]).connect("blog")
prepared = session.prepare("SELECT day, ts, reading FROM raw_data_by_day WHERE sensor = ? and day = ?")
args = product(["mysensor"], days)
# args: ('test', '2017-07-01'), ('test', '2017-07-12'), ('test', '2017-07-03')
# driver handles concurrency for you
results = execute_concurrent_with_args(session, prepared, args)
# Results:
#[ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36750>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36a90>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36550>)]
这个方案的一个变例,是为每个时间片创建一张表。比如一个月一张表,那么一年就有12张表:
CREATE TABLE raw_data_may_2017 (
sensor text,
ts timeuuid,
reading int,
primary key(sensor, ts)
) WITH COMPACTION = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
这个方案的一个主要优点是便于归档,以及快速删除数据。比如,在每月的开始,我们可以把上个月的数据用parquet格式归档到HDFS或者S3(译者按:S3是AWS上的一种存储服务),这样可以充分利用便宜的存储实现数据分析功能。当Cassandra集群不再需要某些数据时,只要把表删除就完了,非常简单。你也许注意到,这个方案有些额外的运维工作,就是创建和删除表,所以当有归档的需求时,这个方案是合适的。也有其他归档的方法,因此这种形式的桶不是必须的。
上面的方案关注点在于避免一段较长的时间段内分区变得特别大。这个方案适用的业务场景是流量可预测的,并且变化很小。有可能遇到例外的情况,大部分分区比较正常,但是个别分区数据量特别大,以致超过单个节点的存储,或者某个分区流量瞬间变得很大,超出单个节点的处理能力。Twitter是个很好的例子,有些人拥有数千万的粉丝。程序需要对这些人使用不同的代码路径,以实现海量伸缩性。
另一种技巧是在任何时间点使用多分区方式,从而将数据分布到整个集群。好处是,我们可以用单个分区存储小数据,而使用多个分区处理大数据量的情况。
这个方案也有缺点,我们必须做好折衷。缺点是当读取数据时,我们必须使用聚合,这会带来额外的工作。另外让分页也很困难。我们需要实时监控分区的数据量,以便我们为每一块数据选择合适的分区数量。如果我们有太多的桶,结果我们读取一点点数据可能要访问很多分区。太少的桶,会导致大分区,进而造成压紧、repair、stream困难,并影响性能。
在下面的例子里,我们设想一个理想的模型,有人在一个像Twitter一样的社交网络里关注了很多人。大部分账户只用一个分区就能处理所有进来的消息。但是有人关注了数百万的账户。
声明:我并不知道Twitter怎么存储数据。这只是一个简单的例子用来讨论问题。
CREATE TABLE tweet_stream (
account text,
day text,
bucket int,
ts timeuuid,
message text,
primary key((account, day, bucket), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
通过在分区键里增加bucket字段,我们扩展了前面的数据模型。现在一天可以进一步分成多个桶。当获取数据时,我们需要访问这些桶,并把数据聚合一起。我们插入一些数据演示一下:
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 0, now(), 'hi');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 1, now(), 'hi2');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 2, now(), 'hi3');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 3, now(), 'hi4');
如果需要获取最新的消息,我们可以这样做:
from itertools import chain
from cassandra.util import unix_time_from_uuid1
prepared = session.prepare("SELECT ts, message FROM tweet_stream WHERE account = ? and day = ? and bucket = ? LIMIT 10")
# let's get 10 buckets
partitions = range(10)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
args = product(["jon_haddad"], ["2017-07-01"], partitions)
result = execute_concurrent_with_args(session, prepared, args)
# [ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e6d0>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d710>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d4d0>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d950>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1db10>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dfd0>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dd90>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d290>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e250>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e490>)]
results = [x.result_or_exc for x in result]
# append all the results together
data = chain(*results)
sorted_results = sorted(data, key=lambda x: unix_time_from_uuid1(x.ts), reverse=True)
# newest stuff first
# [Row(ts=UUID('e1c59e60-7406-11e7-9458-897782c5d96c'), message=u'hi4'),
# Row(ts=UUID('dd6ddd00-7406-11e7-9458-897782c5d96c'), message=u'hi3'),
# Row(ts=UUID('d4422560-7406-11e7-9458-897782c5d96c'), message=u'hi2'),
# Row(ts=UUID('d17dae30-7406-11e7-9458-897782c5d96c'), message=u'hi')]
这个例子仅仅使用LIMIT语句限制获取10条消息,懒惰的程序员把这些列表合并在一起,然后排序,然后就大功告成。如果要获取更多的数据,我们需要使用一个K-way合并算法。未来当我们对这个话题进一步阐释的时候,再做讨论。
现在你应该对如何在集群里分发数据和请求有了更好的理解。记住每个问题都是不同的,没有银弹。