介绍
通过过去三年里部署、运维Spark Streaming的积累,我们在Catalyst和DataFrames的基础上重新架构了绰号为“Struct Streaming”的实时计算项目。
语义
我们提出来一个简单的模型“repeated queries(RQ)”。基于该模型用户可以从原静态表以及SQL/DataFrames的一些概念推广至Struct Streaming上。
工作原理:
逻辑上,每一个stream为append-only table(比如DataFrame),该表中记录可以以任意顺序到达
就像传统的SQL/DataFrame一样,用户可以定义queries在整张表上,同时返回一个新的表;当到达processing time时,开始执行该queries;
用户可以设定trigger来确定何时运行查询以及输出,当然,这些都是基于processing time;系统会尽快地满足该条件(as soon as possible);
最终,用户为每一个查询设置output mode,包括如下:
- Delta: 尽管逻辑上每次查询的结果总是一张表,但用户可以获取从上次trigger后的变化delet作为结果输出;
- 这些是物理意义上的deltas,而非逻辑上的;换句话说,哪些行被增减的变化,而非对于某些行的逻辑变化;
- 用户必须指定一个primary key(可以以组合的形式);输出的结果中包含了一个额外的status列,该列指定该记录是add/remove/update;
- Append: Deleta模式的一种特例,不过不包含remove的记录;同时,不需要指定primary key,结果也不包含status列;
- Update(-in-place): 直接更新结果 (比如说update 一个mysql表),类似于Delata,primary key是必须的;
- Complete: 对于每一次查询结果,创建一个完整的snapshot;
总之,在任意时刻每次RQ执行的结果是确定的,并且保存它的完整结果或者距离上次的变更,而每次查询的执行则类似于Spark SQL。
- Delta: 尽管逻辑上每次查询的结果总是一张表,但用户可以获取从上次trigger后的变化delet作为结果输出;
操作
Map-only ETC job
- Query为整个表的map操作;
- Trigger为processing time(比如说1minute一次,或者越快越好);
- Output类型为Append(每次执行都会新写入一个新的HDFS文件);
Infinite agrregation/landmark window
- Query为”select count(*) from visits group by page”;
- Trigger为processing time(比如说1minute一次,或者越快越好);
- Output类型为在mysql表中的实时更新(及时更新变化的记录);
- 针对相同的查询,我们可以查询已经写入HDFS的文件,而不是再次执行然后输出结果;
Sliding window aggregation by Event time
sliding window包含4个参数:
- 时间列;
- window的大小;
- sliding 间隔;
- window起始时间;
Tumbling window是一种特殊的sliding window,它的sliding间隔同window大小一样;针对这种查询,作如下处理:
- Query是一种带有特别(多个)groupby的count操作,将所有的event map到所属的window(类似于spark中的flatMap);
- Trigger为processing time;
- Output模型为及时替换;如果一个已经完成的window收到一个迟到的事件,我们依然可以更新老的记录;
Query on top of window(find most popular window in past hour)
- Query: 对之前的window counted查询添加一个top k操作;
- Trigger为processing time;
- Output模型为HDFS中的一个可更新文件或者可以写入top k的Kafka stream(Compeleted模式)
Session statistics(count number and average length of sessions)
- Query: 对每个记录分配一个session ID(包含session start/end/session grouping key)并做聚合操作,然后可以基于该sessionID做count(*)/max(time)/min(time);
- Trigger为processing time;
- Output模型为HDFS中的一个可更新文件
Output mode
Time和Trigger
过期数据
同其他模型的比较
Repeated Query模型的好处:
- 没有stream的概念–所有的都是table和sql 查询;
- 不同于Google Dataflow, triggers和outputs同查询本身是独立的;Dataflow中window(从sql的角度看就是一个groupby)必须确定一个outputmode 和trigger,而在RQ中,可以使用这些查询,而不一定使用window的概念;
- 同batch processing兼容性很好;
- 许多心仪的features(sessions/feedback loops等)很容易实现;
RQ主要的缺点为查询的渐增是由planner完成的,planner必须支持queries/output mode/triggers的结合,比如说必须支持什么时候可以删除老的数据或者状态,用户对此则没法控制。
同Storm的比较
- Storm暴露lower level api,需要用户自己设计low level的数据流拓扑;
- Storm为单调、不可变(processing)的时间度量,不容易处理event time,同时很难处理有状态的操作及容错等;
同Dstream的比较
- 同Storm类似,它也暴露的为一个单调、不可变的(processing)时间度量,很难支持event time;
- API绑定在micro-batch执行模型中,这导致变化性较差,batch间隔的变化会导致整个window大小的变化;
RQ相较于以上两种的优势在于:
- RQ同时支持processing time和event time;
- RQ API同底层执行模型是解耦的,可以实现不基于microbatch的执行模型;
- RQ 可以提供关系型数据库的查询优化同时能够处理更加复杂的查询语句;
CQL(Streams + Tables)
CQL,Calcite和其他的一些streaming DB也有streams和table的概念, 但这些都是嘉定一个单调、不可变的时间度量,对于这些系统,结果一旦差生就不可再变更。
参考:
- Structured Streaming (aka Streaming DataFrames): https://issues.apache.org/jira/browse/SPARK-8360