Analysis of Event Time of Structured Streaming
Event Time!
There have been unofficial event time support attempts in the Spark Streaming era [1], and in the evolved Structured Streaming, native support for event time has been added.
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
// Please note: we'll revise this example in <Structured Streaming Watermark Analysis>
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
The execution process here is as follows.

- We have a series of arriving records.
- A window() operation is applied to the first column, timestamp, with a window length of 10 minutes and a sliding interval of 5 minutes. For example (the dashed box in the upper-right part of the figure), when the record 12:22|dog arrives, its timestamp 12:22 falls into two windows, 12:15-12:25 and 12:20-12:30, so two records are generated: 12:15-12:25|dog and 12:20-12:30|dog. Similarly, the record 12:24|dog owl generates two records: 12:15-12:25|dog owl and 12:20-12:30|dog owl.
- So the essence of the window() operation here is an explode(): multiple records can be generated from one record.
- Then, on the result of the window() operation, we take the window column and the word column together as the key and do a groupBy().count() operation. The aggregation process of this operation is incremental (with the help of the StateStore).
- The final result is a state set of (window, word, count) triples.
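The explode-then-aggregate process above can be sketched in plain Scala (no Spark; the object and method names, the minute-of-day timestamps, and the hard-coded 10-minute/5-minute parameters are all illustrative assumptions, not Spark internals):

```scala
// A minimal sketch of what window() + groupBy().count() does conceptually.
// Times are minutes of the day for simplicity (12:22 is 742).
object WindowSketch {
  // A 10-minute window sliding every 5 minutes: each event falls into
  // length / slide = 2 windows. This is the explode() step.
  def windowsFor(ts: Int, length: Int = 10, slide: Int = 5): Seq[(Int, Int)] = {
    val lastStart = ts - ts % slide // latest window start at or before ts
    (lastStart until (lastStart - length) by -slide)
      .map(start => (start, start + length))
      .filter { case (s, e) => s <= ts && ts < e }
      .sorted
  }

  // Incremental aggregation: fold a batch of (ts, word) records into the
  // running state, mimicking the StateStore-backed count().
  def updateState(state: Map[((Int, Int), String), Long],
                  batch: Seq[(Int, String)]): Map[((Int, Int), String), Long] =
    batch.foldLeft(state) { case (st, (ts, word)) =>
      windowsFor(ts).foldLeft(st) { (acc, w) =>
        acc.updated((w, word), acc.getOrElse((w, word), 0L) + 1L)
      }
    }
}
```

For the records above, 12:22|dog (ts = 742) lands in the windows (735, 745) and (740, 750), i.e. 12:15-12:25 and 12:20-12:30, and the state ends up keyed by (window, word).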
Processing Late Data
Still using the previous window() + groupBy().count() example, but note that there is a late record, 12:06|cat:
As you can see, the late data here has been correctly updated to where it should be in the State.
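Why the late record lands in the right place can be checked with a tiny sketch (plain Scala; the object name and minute-of-day timestamps are illustrative): a record is assigned to windows by its event time alone, regardless of when it arrives.

```scala
object LateDataSketch {
  // Windows of length 10 sliding by 5 that contain the event time `ts`
  // (times are minutes of the day; 12:06 is 726).
  def windowsFor(ts: Int): Seq[(Int, Int)] = {
    val last = ts - ts % 5 // latest window start at or before ts
    Seq((last - 5, last + 5), (last, last + 10))
      .filter { case (s, e) => s <= ts && ts < e }
  }
}
```

Even though 12:06|cat arrives during a much later batch, windowsFor(726) still yields the windows 720-730 and 725-735, i.e. 12:00-12:10 and 12:05-12:15, so its count is folded into the correct entries of the State.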
OutputModes
Let’s continue with the previous window() + groupBy().count() example; now we consider outputting the result, that is, we consider the OutputModes:
(a) Complete
The output of Complete mode is exactly the same as the State:
(b) Append
The semantics of Append will ensure that once a key is output, the same key will not be output in the future.
Therefore, for the batch at 12:10, directly outputting 12:00-12:10|cat|1 and 12:05-12:15|cat|1 would be a mistake: in the batch at 12:20, the result is updated to 12:00-12:10|cat|2, but Append mode cannot output 12:00-12:10|cat|2 again, because the same key, 12:00-12:10|cat, was already output earlier with the result 12:00-12:10|cat|1.
To solve this problem, in Append mode Structured Streaming needs to know when the result for a given key will no longer be updated. Once it is confirmed that the result will not change again (the next article will explain how this confirmation relies on the watermark), the result can be output.
As shown above, if at 12:30 we can determine that no subsequent batch will update the 12:00-12:10 window, then we can output the result for 12:00-12:10 in the 12:30 batch, with the guarantee that no later batch will output the 12:00-12:10 window again. Thus the semantics of Append mode are maintained.
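The Append-mode rule described above can be sketched as follows (plain Scala; AppendSketch, Key, and the minute-of-day window ends are illustrative assumptions, not Spark's actual implementation): a (window, word) result may be emitted only once, so it is emitted only after the watermark guarantees its window can no longer be updated, and it is then dropped from the state.

```scala
object AppendSketch {
  // A state key: the end of the event-time window (minutes of the day)
  // plus the grouped word.
  final case class Key(windowEnd: Int, word: String)

  // Given the current state and the watermark, split it into
  // (emitted, retained): windows ending at or before the watermark are
  // finalized, output exactly once, and removed from the state.
  def emit(state: Map[Key, Long], watermark: Int): (Map[Key, Long], Map[Key, Long]) =
    state.partition { case (k, _) => k.windowEnd <= watermark }
}
```

For example, once the watermark reaches 12:10 (minute 730), the entry for the 12:00-12:10 window is emitted exactly once and dropped, while 12:05-12:15 is retained for possible further updates.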
(c) Update
Update mode has been officially supported in Spark 2.1.1 and later versions.
As shown in the figure above, in the Update mode, only the updated items in the State of this execution batch will be output:
- In the execution batch at 12:10, all 2 items in State are new (and therefore are updated), so all 2 items are output;
- In the execution batch at 12:20, 2 items in the State were updated and 4 items were newly added (and therefore all were updated), so all 6 items were output;
- In the execution batch at 12:30, 4 items in State were updated, so 4 items were output. Note that, just as in Append mode, this batch confirms (via the watermark mechanism) that the 12:00-12:10 window will not be updated again; the window is therefore removed from the State, but in Update mode its removal does not produce any output.
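The Update-mode behavior above can be sketched as a diff between the previous and current state (plain Scala; UpdateSketch and the string keys are illustrative assumptions): each batch emits only the entries that were newly added or whose count changed.

```scala
object UpdateSketch {
  // Entries of `curr` that are new or whose value differs from `prev`:
  // exactly what Update mode outputs for one execution batch.
  def updatedItems(prev: Map[String, Long], curr: Map[String, Long]): Map[String, Long] =
    curr.filter { case (k, v) => !prev.get(k).contains(v) }
}
```

For instance, if a batch changes 1 existing count and adds 1 new key while another entry stays the same, updatedItems returns exactly those 2 changed entries; in the 12:20 batch above, 2 updated plus 4 new items give the 6 outputs.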