Structured Streaming

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine.
You can express your streaming computation the same way you would express a batch computation on static data.
The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive.
You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc.
The computation is executed on the same optimized Spark SQL engine.
Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs.
In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees.
However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees.
Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements.

In this guide, we are going to walk you through the programming model and the APIs.
We are going to explain the concepts mostly using the default micro-batch processing model, and then later discuss the Continuous Processing model.
First, let’s start with a simple example of a Structured Streaming query – a streaming word count.

Quick Example
Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket.
Let’s see how you can express this using Structured Streaming. You can see the full code in Scala/Java/Python/R. And if you download Spark, you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.

This lines DataFrame represents an unbounded table containing the streaming text data.
This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table.
Note that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it.
Next, we have converted the DataFrame to a Dataset of String using .as(Encoders.STRING()), so that we can apply the flatMap operation to split each line into multiple words.
The resultant words Dataset contains all the words.
Finally, we have defined the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them.
Note that this is a streaming DataFrame which represents the running word counts of the stream.

We have now set up the query on the streaming data.
All that is left is to actually start receiving data and computing the counts.
To do this, we set it up to print the complete set of counts (specified by outputMode("complete")) to the console every time they are updated.
And then start the streaming computation using start().
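
The query the walkthrough above refers to can be sketched in Scala roughly as follows; it assumes a socket server is listening on localhost:9999 (for example, started with nc -lk 9999), and uses the Scala form .as[String] in place of the Java .as(Encoders.STRING()) mentioned above.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._

// lines is an unbounded table with one string column named "value"
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split each line into words
val words = lines.as[String].flatMap(_.split(" "))

// Running word count, grouping by the unique values
val wordCounts = words.groupBy("value").count()

// Print the complete set of counts to the console every time they are updated
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()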

Programming Model
The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended.
This leads to a new stream processing model that is very similar to a batch processing model.
You will express your streaming computation as standard batch-like query as on a static table, and Spark runs it as an incremental query on the unbounded input table.
Let’s understand this model in more detail.

Basic Concepts
Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.

The “Output” is defined as what gets written out to the external storage. The output can be defined in a different mode:
Complete Mode – The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.
Append Mode – Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
Update Mode – Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1).
Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger.
If the query doesn’t contain aggregations, it will be equivalent to Append mode.

Note that each mode is applicable on certain types of queries. This is discussed in detail later.

To illustrate the use of this model, let’s understand the model in context of the Quick Example above.
The first lines DataFrame is the input table, and the final wordCounts DataFrame is the result table.
Note that the query on streaming lines DataFrame to generate wordCounts is exactly the same as it would be a static DataFrame.
However, when this query is started, Spark will continuously check for new data from the socket connection.
If there is new data, Spark will run an “incremental” query that combines the previous running counts with the new data to compute updated counts, as shown below.

Note that Structured Streaming does not materialize the entire table.
It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data.
It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).
This model is significantly different from many other stream processing engines.
Many streaming systems require the user to maintain running aggregations themselves, thus having to reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-once).
In this model, Spark is responsible for updating the Result Table when there is new data, thus relieving the users from reasoning about it.
As an example, let’s see how this model handles event-time based processing and late arriving data.

Handling Event-time and Late Data
Event-time is the time embedded in the data itself.
For many applications, you may want to operate on this event-time.
For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them.
This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row.
This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column – each time window is a group and each row can belong to multiple windows/groups.
Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.
Furthermore, this model naturally handles data that has arrived later than expected based on its event-time.
Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data.
Since Spark 2.1, we have support for watermarking which allows the user to specify the threshold of late data, and allows the engine to accordingly clean up old state.
These are explained later in more detail in the Window Operations section.

Fault Tolerance Semantics
Delivering end-to-end exactly-once semantics was one of the key goals behind the design of Structured Streaming.
To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing.
Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position in the stream.
The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger.
The streaming sinks are designed to be idempotent for handling reprocessing.
Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.

API using Datasets and DataFrames
Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data.
Similar to static Datasets/DataFrames, you can use the common entry point SparkSession (Scala/Java/Python/R docs) to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as static DataFrames/Datasets.
If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the DataFrame/Dataset Programming Guide.

Creating streaming DataFrames and streaming Datasets
Streaming DataFrames can be created through the DataStreamReader interface (Scala/Java/Python docs) returned by SparkSession.readStream().
In R, with the read.stream() method. Similar to the read interface for creating static DataFrame, you can specify the details of the source – data format, schema, options, etc.

Input Sources
There are a few built-in sources.

File source – Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.

Kafka source – Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher. See the Kafka Integration Guide for more details.

Socket source (for testing) – Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.

Rate source (for testing) – Generates data at the specified number of rows per second, each output row contains a timestamp and value. Where timestamp is a Timestamp type containing the time of message dispatch, and value is of Long type containing the message count, starting from 0 as the first row. This source is intended for testing and benchmarking.

Some sources are not fault-tolerant because they do not guarantee that data can be replayed using checkpointed offsets after a failure. See the earlier section on fault-tolerance semantics. Here are the details of all the sources in Spark.
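
As a rough illustration of the file and rate sources above, a hedged Scala sketch (the directory, separator and rate are placeholders, and spark is an existing SparkSession):

import org.apache.spark.sql.types.StructType

// File source: the schema must be provided explicitly (see the next subsection)
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream
  .option("sep", ";")
  .schema(userSchema)
  .csv("/path/to/directory")   // placeholder directory to monitor

// Rate source (testing/benchmarking): rows of (timestamp, value)
val rateDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()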

These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted.
Some operations like map, flatMap, etc. need the type to be known at compile time.
To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame.
See the SQL Programming Guide for more details. Additionally, more details on the supported streaming sources are discussed later in the document.

Schema inference and partition of streaming DataFrames/Datasets
By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically.
This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures.
For ad-hoc use cases, you can re-enable schema inference by setting spark.sql.streaming.schemaInference to true.

Partition discovery does occur when subdirectories that are named /key=value/ are present and listing will automatically recurse into these directories.
If these columns appear in the user-provided schema, they will be filled in by Spark based on the path of the file being read.
The directories that make up the partitioning scheme must be present when the query starts and must remain static.

For example, it is okay to add /data/year=2016/ when /data/year=2015/ was present, but it is invalid to change the partitioning column (i.e. by creating the directory /data/date=2016-04-17/).
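
A brief sketch of both points, with a hypothetical base directory /data containing year=... subdirectories:

// Ad-hoc use: re-enable schema inference for file sources (disabled by default)
spark.conf.set("spark.sql.streaming.schemaInference", "true")

// Partition discovery: a "year" column declared in the user-provided schema
// is filled in by Spark from the /data/year=.../ path of each file
import org.apache.spark.sql.types.StructType
val schemaWithPartition = new StructType().add("value", "string").add("year", "integer")
val jsonDF = spark.readStream
  .schema(schemaWithPartition)
  .json("/data")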

Operations on streaming DataFrames/Datasets
You can apply all kinds of operations on streaming DataFrames/Datasets – ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming guide for more details. Let’s take a look at a few example operations that you can use.

Basic Operations – Selection, Projection, Aggregation
Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are discussed later in this section.
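
As an illustration, a hedged Scala sketch of a few such operations; deviceDF below is a hypothetical stream built from the rate source purely so the snippet is self-contained (a real job would read from files, Kafka, etc.):

import org.apache.spark.sql.functions._

// Hypothetical streaming DataFrame with columns: device, deviceType, signal, time
val deviceDF = spark.readStream
  .format("rate").option("rowsPerSecond", "5").load()
  .selectExpr(
    "cast(value % 10 as string) as device",
    "case when value % 2 = 0 then 'sensor' else 'meter' end as deviceType",
    "cast(value % 20 as double) as signal",
    "timestamp as time")

// Untyped, SQL-like operations (selection and projection)
val strongSignals = deviceDF.select("device", "signal").where("signal > 10")

// Grouped aggregation: running average signal per device type
val avgSignals = deviceDF.groupBy("deviceType").agg(avg("signal"))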

Window Operations on Event Time
Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations.
In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column.
In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into.
Let’s understand this with an illustration.

Imagine our quick example is modified and the stream now contains lines along with the time when the line was generated.
Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes.
That is, word counts in words received between 10 minute windows 12:00 – 12:10, 12:05 – 12:15, 12:10 – 12:20, etc. Note that 12:00 – 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 – 12:10 and 12:05 – 12:15. So the counts will be indexed by both, the grouping key (i.e. the word) and the window (can be calculated from the event-time).
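
In code, such windowing is just a grouping on the window() function. A sketch, assuming a streaming Dataset named words with columns timestamp (event time) and word, and a SparkSession named spark:

import org.apache.spark.sql.functions.window
import spark.implicits._

// Count words within 10 minute windows, sliding every 5 minutes
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()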

The result tables would look something like the following.
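
The watermarked query that the next paragraph walks through can be sketched as follows, on the same hypothetical words stream:

import org.apache.spark.sql.functions.window
import spark.implicits._

// Allow data up to 10 minutes late, measured against the "timestamp" column
val windowedCounts = words
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"word")
  .count()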

In this example, we are defining the watermark of the query on the value of the column “timestamp”, and also defining “10 minutes” as the threshold of how late the data is allowed to be.
If this query is run in Update output mode (discussed later in the Output Modes section), the engine will keep updating counts of a window in the Result Table until the window is older than the watermark, which lags behind the current event time in column “timestamp” by 10 minutes. Here is an illustration.

As shown in the illustration, the maximum event time tracked by the engine is the blue dashed line, and the watermark set as (max event time – ’10 mins’) at the beginning of every trigger is the red line.
For example, when the engine observes the data (12:14, dog), it sets the watermark for the next trigger as 12:04.
This watermark lets the engine maintain intermediate state for an additional 10 minutes to allow late data to be counted.
For example, the data (12:09, cat) is out of order and late, and it falls in windows 12:00 – 12:10 and 12:05 – 12:15.
Since it is still ahead of the watermark 12:04 in the trigger, the engine still maintains the intermediate counts as state and correctly updates the counts of the related windows.
However, when the watermark is updated to 12:11, the intermediate state for window (12:00 – 12:10) is cleared, and all subsequent data (e.g. (12:04, donkey)) is considered “too late” and therefore ignored.
Note that after every trigger, the updated counts (i.e. purple rows) are written to sink as the trigger output, as dictated by the Update mode.
Some sinks (e.g. files) may not support the fine-grained updates that Update Mode requires.
To work with them, we also support Append Mode, where only the final counts are written to sink. This is illustrated below.
Note that using withWatermark on a non-streaming Dataset is a no-op. As the watermark should not affect any batch query in any way, we will ignore it directly.

Similar to the Update Mode earlier, the engine maintains intermediate counts for each window.
However, the partial counts are not updated to the Result Table and not written to the sink.
The engine waits for “10 mins” for late data to be counted, then drops intermediate state of a window < watermark, and appends the final counts to the Result Table/sink.
For example, the final counts of window 12:00 – 12:10 are appended to the Result Table only after the watermark is updated to 12:11.

Conditions for watermarking to clean aggregation state
It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries (as of Spark 2.1.1, subject to change in the future).

Output mode must be Append or Update. Complete mode requires all aggregate data to be preserved, and hence cannot use watermarking to drop intermediate state.
See the Output Modes section for detailed explanation of the semantics of each output mode.

The aggregation must have either the event-time column, or a window on the event-time column.

withWatermark must be called on the same column as the timestamp column used in the aggregate.
For example, df.withWatermark("time", "1 min").groupBy("time2").count() is invalid in Append output mode, as watermark is defined on a different column from the aggregation column.

withWatermark must be called before the aggregation for the watermark details to be used.
For example, df.groupBy("time").count().withWatermark("time", "1 min") is invalid in Append output mode.

Semantic Guarantees of Aggregation with Watermarking
A watermark delay (set with withWatermark) of “2 hours” guarantees that the engine will never drop any data that is less than 2 hours delayed.
In other words, any data less than 2 hours behind (in terms of event-time) the latest data processed till then is guaranteed to be aggregated.

However, the guarantee is strict only in one direction.
Data delayed by more than 2 hours is not guaranteed to be dropped; it may or may not get aggregated.
The more delayed the data is, the less likely the engine is to process it.
Join Operations
Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame as well as another streaming Dataset/DataFrame.
The result of the streaming join is generated incrementally, similar to the results of streaming aggregations in the previous section.
In this section we will explore what type of joins (i.e. inner, outer, etc.) are supported in the above cases.
Note that in all the supported join types, the result of the join with a streaming Dataset/DataFrame will be exactly the same as if it was with a static Dataset/DataFrame containing the same data in the stream.

Since the introduction in Spark 2.0, Structured Streaming has supported joins (inner join and some type of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.
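
A hedged Scala sketch of such a stream-static join, assuming both sides share a join column named type (the paths and formats are placeholders):

// Static lookup data and a streaming DataFrame sharing a "type" column
val staticDf = spark.read.parquet("/path/to/static-data")
val streamingDf = spark.readStream.schema(staticDf.schema).parquet("/path/to/streaming-dir")

streamingDf.join(staticDf, "type")                 // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")   // left outer join with a static DF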

Stream-stream Joins
In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming Datasets/DataFrames.
The challenge of generating join results between two data streams is that, at any point of time, the view of the dataset is incomplete for both sides of the join, making it much harder to find matches between inputs.
Any row received from one input stream can match with any future, yet-to-be-received row from the other input stream.
Hence, for both the input streams, we buffer past input as streaming state, so that we can match every future input with past input and accordingly generate joined results.
Furthermore, similar to streaming aggregations, we automatically handle late, out-of-order data and can limit the state using watermarks. Let’s discuss the different types of supported stream-stream joins and how to use them.

Inner Joins with optional Watermarking
Inner joins on any kind of columns along with any kind of join conditions are supported.
However, as the stream runs, the size of streaming state will keep growing indefinitely as all past input must be saved, since any new input can match with any input from the past.
To avoid unbounded state, you have to define additional join conditions such that indefinitely old inputs cannot match with future inputs and therefore can be cleared from the state.
In other words, you will have to do the following additional steps in the join.

Define watermark delays on both inputs such that the engine knows how delayed the input can be (similar to streaming aggregations).

Define a constraint on event-time across the two inputs such that the engine can figure out when old rows of one input are not going to be required (i.e. will not satisfy the time constraint) for matches with the other input. This constraint can be defined in one of the two ways.

Time range join conditions (e.g. …JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),

Join on event-time windows (e.g. …JOIN ON leftTimeWindow = rightTimeWindow).

Let’s understand this with an example.

Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with another stream of user clicks on advertisements to correlate when impressions led to monetizable clicks.

To allow the state cleanup in this stream-stream join, you will have to specify the watermarking delays and the time constraints as follows.

Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order in event-time by at most 2 and 3 hours, respectively.

Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour after the corresponding impression.

The code would look like this.

// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour")
);

Semantic Guarantees of Stream-stream Inner Joins with Watermarking
This is similar to the guarantees provided by watermarking on aggregations.
A watermark delay of “2 hours” guarantees that the engine will never drop any data that is less than 2 hours delayed.
But data delayed by more than 2 hours may or may not get processed.

Outer Joins with Watermarking
While the watermark + event-time constraints are optional for inner joins, for left and right outer joins they must be specified.
This is because for generating the NULL results in outer join, the engine must know when an input row is not going to match with anything in future.
Hence, the watermark + event-time constraints must be specified for generating correct results.
Therefore, a query with outer-join will look quite like the ad-monetization example earlier, except that there will be an additional parameter specifying it to be an outer-join.

impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour"),
  "leftOuter" // can be "inner", "leftOuter", "rightOuter"
);

Semantic Guarantees of Stream-stream Outer Joins with Watermarking
Outer joins have the same guarantees as inner joins regarding watermark delays and whether data will be dropped or not.

Caveats
There are a few important characteristics to note regarding how the outer results are generated.

The outer NULL results will be generated with a delay that depends on the specified watermark delay and the time range condition.
This is because the engine has to wait for that long to ensure there were no matches and there will be no more matches in future.

In the current implementation in the micro-batch engine, watermarks are advanced at the end of a micro-batch, and the next micro-batch uses the updated watermark to clean up state and output outer results.
Since we trigger a micro-batch only when there is new data to be processed, the generation of the outer result may get delayed if there is no new data being received in the stream.
In short, if any of the two input streams being joined does not receive data for a while, the outer (both cases, left or right) output may get delayed.

Stream-stream joins do not support full outer joins; inner joins and left/right outer joins are supported, with outer joins requiring the watermark and time constraints described above.

Additional details on supported joins:
Joins can be cascaded, that is, you can do df1.join(df2, …).join(df3, …).join(df4, ….).

As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.

As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used.

Cannot use streaming aggregations before joins.

Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.

Streaming Deduplication
You can deduplicate records in data streams using a unique identifier in the events.
This is exactly the same as deduplication on static data using a unique identifier column.
The query will store the necessary amount of data from previous records such that it can filter duplicate records.
Similar to aggregations, you can use deduplication with or without watermarking.

With watermark – If there is an upper bound on how late a duplicate record may arrive, then you can define a watermark on an event time column and deduplicate using both the guid and the event time columns.
The query will use the watermark to remove old state data from past records that are not expected to get any duplicates any more.
This bounds the amount of the state the query has to maintain.

Without watermark – Since there are no bounds on when a duplicate record may arrive, the query stores the data from all the past records as state.

Dataset<Row> streamingDf = spark.readStream(). …; // columns: guid, eventTime, …

// Without watermark using guid column
streamingDf.dropDuplicates("guid");

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime");

Policy for handling multiple watermarks
A streaming query can have multiple input streams that are unioned or joined together.
Each of the input streams can have a different threshold of late data that needs to be tolerated for stateful operations.
You specify these thresholds using withWatermarks("eventTime", delay) on each of the input streams.
For example, consider a query with stream-stream joins between inputStream1 and inputStream2.

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

While executing the query, Structured Streaming individually tracks the maximum event time seen in each input stream, calculates watermarks based on the corresponding delay, and chooses a single global watermark with them to be used for stateful operations.
By default, the minimum is chosen as the global watermark because it ensures that no data is accidentally dropped as too late if one of the streams falls behind the others (for example, one of the streams stops receiving data due to upstream failures).
In other words, the global watermark will safely move at the pace of the slowest stream and the query output will be delayed accordingly.

However, in some cases, you may want to get faster results even if it means dropping data from the slowest stream.
Since Spark 2.4, you can set the multiple watermark policy to choose the maximum value as the global watermark by setting the SQL configuration spark.sql.streaming.multipleWatermarkPolicy to max (default is min).
This lets the global watermark move at the pace of the fastest stream.
However, as a side effect, data from the slower streams will be aggressively dropped. Hence, use this configuration judiciously.
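
For example, switching the policy is a one-line configuration change (a sketch; the default remains min):

// Use the maximum watermark across inputs as the global watermark (Spark 2.4+)
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")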

Arbitrary Stateful Operations
Many use cases require more advanced stateful operations than aggregations.
For example, in many use cases, you have to track sessions from data streams of events.
For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger.
Since Spark 2.2, this can be done using the operation mapGroupsWithState and the more powerful operation flatMapGroupsWithState.
Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state.
For more concrete details, take a look at the API documentation (Scala/Java) and the examples (Scala/Java).
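
As a rough, hypothetical illustration (not the official sessionization example), the following Scala sketch keeps a running per-user event count as arbitrary state; the Event class and the events streaming Dataset are assumptions, and such queries run in Update output mode:

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import spark.implicits._

case class Event(user: String, action: String)

// events: a hypothetical streaming Dataset[Event]
val countsPerUser = events
  .groupByKey(_.user)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout) {
    (user: String, newEvents: Iterator[Event], state: GroupState[Long]) =>
      val updated = state.getOption.getOrElse(0L) + newEvents.size
      state.update(updated)   // arbitrary user-defined state kept across triggers
      (user, updated)
  }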

Unsupported Operations
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.

Limit and take first N rows are not supported on streaming Datasets.

Distinct operations on streaming Datasets are not supported.

Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.

In addition, there are some Dataset methods that will not work on streaming Datasets.
They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset.
Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

count() – Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.

foreach() – Instead use ds.writeStream.foreach(…) (see next section).

show() – Instead use the console sink (see next section).

If you try any of these operations, you will see an AnalysisException like “operation XYZ is not supported with streaming DataFrames/Datasets”. While some of them may be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting on the input stream is not supported, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently.

Starting Streaming Queries
Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation.
To do that, you have to use the DataStreamWriter (Scala/Java/Python docs) returned through Dataset.writeStream(). You will have to specify one or more of the following in this interface.

Details of the output sink: Data format, location, etc.

Output mode: Specify what gets written to the output sink.

Query name: Optionally, specify a unique name of the query for identification.

Trigger interval: Optionally, specify the trigger interval. If it is not specified, the system will check for availability of new data as soon as the previous processing has completed. If a trigger time is missed because the previous processing has not completed, then the system will trigger processing immediately.

Checkpoint location: For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing is discussed in more detail in the next section.
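
Putting these options together, a hedged sketch using the console sink (the query name, trigger interval and checkpoint path are placeholders, and wordCounts is the streaming DataFrame from the quick example):

import org.apache.spark.sql.streaming.Trigger

val query = wordCounts.writeStream
  .format("console")                                         // output sink details
  .outputMode("complete")                                    // output mode
  .queryName("running_word_counts")                          // optional query name
  .trigger(Trigger.ProcessingTime("10 seconds"))             // optional trigger interval
  .option("checkpointLocation", "/path/to/checkpoint/dir")   // checkpoint location
  .start()

query.awaitTermination()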

Output Modes
There are a few types of output modes.

Append mode (default) – This is the default mode, where only the new rows added to the Result Table since the last trigger will be outputted to the sink. This is supported for only those queries where rows added to the Result Table is never going to change. Hence, this mode guarantees that each row will be output only once (assuming fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, etc. will support Append mode.

Complete mode – The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.

Update mode – (Available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger will be outputted to the sink. More information to be added in future releases.

Different types of streaming queries support different output modes. Here is the compatibility matrix.

Using Foreach and ForeachBatch
The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query.
They have slightly different use cases – while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch. Let’s understand their usages in more detail.

ForeachBatch
foreachBatch(…) allows you to specify a function that is executed on the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Transform and write batchDF 
}.start()

With foreachBatch, you can do the following.

1. Reuse existing batch data sources – For many storage systems, there may not be a streaming sink available yet, but there may already exist a data writer for batch queries.
Using foreachBatch, you can use the batch data writers on the output of each micro-batch.

2. Write to multiple locations – If you want to write the output of a streaming query to multiple locations, then you can simply write the output DataFrame/Dataset multiple times.
However, each attempt to write can cause the output data to be recomputed (including possible re-reading of the input data).
To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.

  streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    batchDF.write.format(…).save(…)  // location 1
    batchDF.write.format(…).save(…)  // location 2
    batchDF.unpersist()
  }

3. Apply additional DataFrame operations – Many DataFrame and Dataset operations are not supported in streaming DataFrames because Spark does not support generating incremental plans in those cases.
Using foreachBatch, you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.

Note:
1. By default, foreachBatch provides only at-least-once write guarantees. However, you can use the batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
2. foreachBatch does not work with the continuous processing mode as it fundamentally relies on the micro-batch execution of a streaming query. If you write data in the continuous mode, use foreach instead.

Foreach
If foreachBatch is not an option (for example, corresponding batch data writer does not exist, or continuous processing mode), then you can express your custom writer logic using foreach.
Specifically, you can express the data writing logic by dividing it into three methods: open, process, and close. Since Spark 2.4, foreach is available in Scala, Java and Python.

streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter<String>() {

    @Override public boolean open(long partitionId, long version) {
      // Open connection
      return true; // true means this partition should be processed
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();

Execution semantics When the streaming query is started, Spark calls the function or the object’s methods in the following way:

A single copy of this object is responsible for all the data generated by a single task in a query. In other words, one instance is responsible for processing one partition of the data generated in a distributed manner.

This object must be serializable, because each task will get a fresh serialized-deserialized copy of the provided object. Hence, it is strongly recommended that any initialization for writing data (for example, opening a connection or starting a transaction) is done after the open() method has been called, which signifies that the task is ready to generate data.

The lifecycle of the methods are as follows:

For each partition with partition_id:

For each batch/epoch of streaming data with epoch_id:

Method open(partitionId, epochId) is called.

If open(…) returns true, for each row in the partition and batch/epoch, method process(row) is called.

Method close(error) is called with error (if any) seen while processing rows.

The close() method (if it exists) is called if an open() method exists and returns successfully (irrespective of the return value), except if the JVM or Python process crashes in the middle.

Note: The partitionId and epochId in the open() method can be used to deduplicate generated data when failures cause reprocessing of some input data. This depends on the execution mode of the query. If the streaming query is being executed in the micro-batch mode, then every partition represented by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit data and achieve exactly-once guarantees. However, if the streaming query is being executed in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication.

Triggers
The trigger settings of a streaming query defines the timing of streaming data processing, whether the query is going to executed as micro-batch query with a fixed batch interval or as a continuous processing query. Here are the different kinds of triggers that are supported.

1. Unspecified (default) – If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing.

2. Fixed interval micro-batches – The query will be executed in micro-batch mode, where micro-batches will be kicked off at the user-specified intervals.
If the previous micro-batch completes within the interval, then the engine will wait until the interval is over before kicking off the next micro-batch.
If the previous micro-batch takes longer than the interval to complete (i.e. if an interval boundary is missed), then the next micro-batch will start as soon as the previous one completes (i.e., it will not wait for the next interval boundary).
If no new data is available, then no micro-batch will be kicked off.

3. One-time micro-batch – The query will execute only one micro-batch to process all the available data and then stop on its own. This is useful in scenarios where you want to periodically spin up a cluster, process everything that is available since the last period, and then shut down the cluster. In some cases, this may lead to significant cost savings.

4. Continuous with fixed checkpoint interval (experimental) – The query will be executed in the new low-latency, continuous processing mode. Read more about this in the Continuous Processing section below.
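
A sketch of how each trigger type might be specified, assuming an existing streaming DataFrame df and the console sink:

import org.apache.spark.sql.streaming.Trigger

// 1. Default: micro-batch mode, next batch starts as soon as the previous one finishes
df.writeStream.format("console").start()

// 2. Fixed-interval micro-batches, kicked off every 2 seconds
df.writeStream.format("console").trigger(Trigger.ProcessingTime("2 seconds")).start()

// 3. One-time micro-batch, then the query stops on its own
df.writeStream.format("console").trigger(Trigger.Once()).start()

// 4. Continuous processing with a 1-second checkpoint interval (experimental)
df.writeStream.format("console").trigger(Trigger.Continuous("1 second")).start()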

Original author: 黄药师ii
Original article: https://www.jianshu.com/p/be64e344474f