Analysis of Event Time in Structured Streaming

surendranadh kurra
4 min read · Feb 22, 2021


Event Time!

There were unofficial attempts to support event time back in the Spark Streaming era [1]; in Structured Streaming, its successor, event time is supported natively.

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
// Please note: we'll revise this example in <Structured Streaming Watermark Analysis>
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

The execution process here is as follows.

  • We have a series of arriving records
  • First, a window() operation is applied to the timestamp column, producing windows of length 10m that slide every 5m
  • For example (the upper-right portion of the figure), when the record 12:22|dog arrives, its timestamp 12:22 falls into the two windows 12:15-12:25 and 12:20-12:30, so two records are generated: 12:15-12:25|dog and 12:20-12:30|dog; similarly, the record 12:24|dog owl generates two records: 12:15-12:25|dog owl and 12:20-12:30|dog owl
  • So the essence of the window() operation here is explode(): one input record can generate multiple records (see the sketch after this list)
  • Then, taking the window column and the word column as the key, a groupBy().count() operation is performed on the result of window()
  • The aggregation in this operation is incremental (with the help of the StateStore)
  • This finally yields a state set of (window, word, count) triples
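To make the explode() analogy concrete, here is a minimal sketch (not Spark's actual implementation; windowsFor is a hypothetical helper) of how a single event time maps to every sliding window that contains it:

// A minimal sketch of sliding-window assignment: given an event time,
// return every (start, end) window of the given length/slide containing it.
// Times are plain integers (e.g. minutes since midnight) for readability.
def windowsFor(ts: Long, length: Long, slide: Long): Seq[(Long, Long)] = {
  val lastStart = ts - (ts % slide)           // latest window start <= ts
  Iterator.iterate(lastStart)(_ - slide)      // walk back one slide at a time
    .takeWhile(start => start > ts - length)  // window must still contain ts
    .map(start => (start, start + length))
    .toSeq
}

// 12:22 expressed as 742 minutes since midnight, 10m windows sliding by 5m:
windowsFor(742, 10, 5)
// => Seq((740, 750), (735, 745))  i.e. 12:20-12:30 and 12:15-12:25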

Processing Late Data

We still use the previous window() + groupBy().count() example, but note that there is a late record, 12:06|cat:

As you can see, the late record has been correctly merged into where it belongs in the State.
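This works because window assignment depends only on the record's own event time. Using the hypothetical windowsFor sketch from above, the late record still maps to its original windows:

windowsFor(726, 10, 5)   // 12:06 as minutes since midnight
// => Seq((725, 735), (720, 730))  i.e. 12:05-12:15 and 12:00-12:10,
// so 12:06|cat is merged into the counts of exactly those two windows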

OutputModes

Let’s continue with the previous window() + groupBy().count() example; now we consider outputting the result, that is, we consider the OutputModes:

(a) Complete

The output of Complete mode is exactly the same as the State:
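As a sketch of what starting such a Complete-mode query might look like (the console sink and the 5-minute trigger interval are illustrative choices, not part of the original example):

import org.apache.spark.sql.streaming.Trigger

// Every trigger re-emits the entire State: one row per (window, word) pair
val completeQuery = windowedCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 minutes"))
  .start()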

(b) Append

The semantics of Append ensure that once a key has been output, the same key will never be output again.

Therefore, directly outputting 12:00-12:10|cat|1 and 12:05-12:15|cat|1 in the 12:10 batch would be a mistake: at 12:20 the result is updated to 12:00-12:10|cat|2, but Append mode cannot output 12:00-12:10|cat|2 again, because the result 12:00-12:10|cat|1 for the same key 12:00-12:10|cat was already output earlier.

To solve this problem, in Append mode Structured Streaming needs to know when the result for a given key will no longer be updated. Once it is confirmed that a result will not be updated again (the next article will explain how this confirmation relies on the watermark), the result can be output.

As shown above, if at 12:30 we determine that no subsequent batch will update the 12:00-12:10 window, then we can output the 12:00-12:10 result in the 12:30 batch, and we are also guaranteed that no later batch will output the 12:00-12:10 window again, so the semantics of Append mode are maintained.
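A minimal sketch of such an Append-mode query, assuming a watermark on the timestamp column (the 10-minute watermark delay and the console sink are illustrative choices; the watermark mechanism itself is covered in the next article):

// Append requires a watermark so Spark can tell when a window is final
val appendQuery = words
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word")
  .count()
  .writeStream
  .outputMode("append")   // each window is emitted once, after finalization
  .format("console")
  .start()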

(c) Update

Update mode has been officially supported in Spark 2.1.1 and later versions.

As shown in the figure above, in Update mode only the items updated in the State during this execution batch are output:

  • In the execution batch at 12:10, both items in the State are new (and therefore updated), so both items are output;
  • In the execution batch at 12:20, 2 items in the State were updated and 4 items were newly added (and therefore all 6 count as updated), so all 6 items were output;
  • In the execution batch at 12:30, 4 items in the State were updated, so 4 items were output. Note in particular that, as in Append mode, this execution batch confirms (via the watermark mechanism) that the 12:00-12:10 window will not be updated again and therefore removes it from the State; unlike in Append mode, however, this removal produces no output.
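A minimal sketch of the same query in Update mode (the sink choice is again illustrative):

// Each trigger emits only the State entries changed by that batch
val updateQuery = windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()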
