<?xml version="1.0" encoding="UTF-8" standalone="no"?><rss xmlns:atom="http://www.w3.org/2005/Atom" xmlns:itunes="http://www.itunes.com/dtds/podcast-1.0.dtd" version="2.0">
  <channel>
    <title>codingjunkie-feed</title>
    <description>Personal blog for Bill Bejeck's technical rambling
</description>
    <link>https://codingjunkie.net</link>
    <atom:link href="https://codingjunkie.net/feed.xml" rel="self" type="application/rss+xml"/>
    <author>
      <name/>
      <email/>
      <uri/>
    </author>
    
      <language>en-us</language><itunes:explicit>no</itunes:explicit><itunes:summary>Personal blog for Bill Bejeck's technical rambling </itunes:summary><itunes:subtitle>Personal blog for Bill Bejeck's technical rambling </itunes:subtitle><item>
        <title>Lessons Learned Writing a Book </title>
        <description>&lt;p&gt;Earlier this year, I completed the 2nd edition of &lt;a href="https://www.amazon.com/Kafka-Streams-Action-Second-Bejeck/dp/1617298689/"&gt;Kafka Streams in Action&lt;/a&gt;.  Even though it’s a second edition, there were many things I wanted to change from the first edition, so it ended up being largely a complete rewrite.  Writing a book is a significant undertaking, and while it’s probably the most challenging thing I’ve ever done, it was also gratifying.  I don’t regret the time I spent working on it.  I learned several lessons along the way that I’d like to share in the hopes that someone else will benefit from my experience (if nothing else, writing about them will help solidify the information for myself).  Here are the points I’d like to discuss:&lt;/p&gt;

&lt;ul&gt;
  &lt;li&gt;Yes, Virginia, you must have a plan&lt;/li&gt;
  &lt;li&gt;Have a good yardstick&lt;/li&gt;
  &lt;li&gt;The magic of 100 words&lt;/li&gt;
  &lt;li&gt;Objects in the mirror are larger than they appear&lt;/li&gt;
  &lt;li&gt;No matter what, never block&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;With the introduction out of the way, let’s get started.&lt;/p&gt;

&lt;h1 id="yes-virginia-you-must-have-a-plan"&gt;Yes, Virginia, you must have a plan&lt;/h1&gt;

&lt;p&gt;The need for planning was probably my biggest surprise during the process.  While the need for planning might seem obvious to some, to me, it was a revelation.  I held the mistaken belief that I would simply sit down with the intent to write, the inspiration would start flowing, and my fingers would begin to furiously pound on the keyboard.&lt;/p&gt;

&lt;p&gt;Additionally, I was sure that my productivity was linear: the more time I had, the more I’d get written.  I quickly found those beliefs couldn’t be further from the truth.  In reality, I’d sit down distracted, not knowing what I wanted to say.  I’d get a few words down, my mind would drift, and inevitably I’d end up surfing the internet, looking at things that had nothing to do with what I was writing about (“What happened to the cast of &lt;a href="https://en.wikipedia.org/wiki/Gilligan%27s_Island"&gt;Gilligan’s Island&lt;/a&gt;?”).  It seemed the more time I had, the &lt;em&gt;&lt;strong&gt;less&lt;/strong&gt;&lt;/em&gt; I accomplished.&lt;/p&gt;

&lt;p&gt;This scenario, a large block of time paired with low productivity, is a textbook example of Parkinson’s Law:&lt;/p&gt;

&lt;blockquote&gt;
  &lt;p&gt;Work expands so as to fill the time available for its completion
– &lt;cite&gt;C. Northcote Parkinson&lt;/cite&gt;&lt;sup id="fnref:1" role="doc-noteref"&gt;&lt;a href="#fn:1" class="footnote" rel="footnote"&gt;1&lt;/a&gt;&lt;/sup&gt;&lt;/p&gt;

&lt;/blockquote&gt;

&lt;p&gt;In other words, if you allow yourself a large block of time without specific plans or goals, there’s a high chance you’ll use the entire time and achieve very little.&lt;/p&gt;

&lt;p&gt;So what’s the solution to this less-than-ideal working situation?  Planning and setting a schedule with deadlines.  While many of us &lt;a href="https://theengineeringmanager.substack.com/p/parkinsons-law-its-real-so-use-it"&gt;resist setting deadlines, the bottom line is they work&lt;/a&gt;.  Consider the following illustration:&lt;/p&gt;
&lt;figure&gt;
  &lt;img src="../assets/images/no-schedule-vs-schedule.png" style="" /&gt;
  &lt;figcaption&gt;Scheduled blocks of time for specific tasks vs. large blocks with no goal&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;The main idea is that instead of one big block of time to get something done, you set &lt;em&gt;smaller&lt;/em&gt; blocks of time, each with a defined outcome.  The sizes of the work blocks and breaks shown here are arbitrary, but I would recommend blocks of &lt;a href="https://www.supernormal.com/blog/how-to-schedule-focus-time#:~:text=How%20long%20should%20focus%20sessions,to%20deliver%20the%20best%20results."&gt;60-90 minutes&lt;/a&gt;.  It’s also essential to allow some time at the beginning of each block to get into the “flow”; it’s nearly impossible to sit down to write and be productive immediately.  A good resource for establishing a work-break process is the &lt;a href="https://www.pomodorotechnique.com/"&gt;Pomodoro® technique&lt;/a&gt;.&lt;/p&gt;

&lt;h1 id="have-a-good-yardstick"&gt;Have a good yardstick&lt;/h1&gt;

&lt;p&gt;Along with having a good schedule, I found concrete markers for measuring my progress invaluable.  Being able to gauge your progress goes hand-in-hand with establishing a good schedule.  For me, that marker was the number of words written (excluding code examples).  For &lt;a href="https://www.amazon.com/Kafka-Streams-Action-Second-Bejeck/dp/1617298689/"&gt;Kafka Streams in Action&lt;/a&gt;, the publisher guidelines set each chapter at roughly 30 pages, which works out to about 9,000 words.  My goal was to produce a new chapter every 3 weeks.  To achieve that pace, I determined I’d need to write 500 words a day, 6 days a week (one day off is essential).  At that pace, I’d produce 3,000 words a week, resulting in a new chapter every 3 weeks.  While my progress was not always linear, this yardstick allowed me to assess my productivity accurately.  More importantly, it helped me adjust my schedule when life’s inevitable interruptions occurred.  Not able to write for 2 days this week?  It’s not a big deal.  I could raise my word count to 1,000 words on two of the remaining days, or to 750 words on all four, and still be on track.  Of all my heuristics, standardizing how I measured progress was one of the most important.  The ability to “commoditize” your time into blocks allows you to schedule more effectively and fit writing into your schedule vs. having writing take over your schedule.&lt;/p&gt;
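&lt;p&gt;To make the catch-up arithmetic concrete, here’s a small Java sketch.  The &lt;code class="language-plaintext highlighter-rouge"&gt;WordBudget&lt;/code&gt; class and its methods are hypothetical, written only to illustrate the numbers above.  They compute how many weeks a chapter takes at a steady daily pace, and how many words per remaining day you need to hit a weekly target after losing some days.&lt;/p&gt;

```java
/**
 * Hypothetical helper illustrating the word-count yardstick.
 * All counts are in words; "days" means writing days.
 */
final class WordBudget {

    private WordBudget() { }

    /** Weeks needed to finish a chapter at a steady daily pace, rounded up. */
    static int weeksPerChapter(int chapterWords, int wordsPerDay, int writingDaysPerWeek) {
        int weeklyWords = wordsPerDay * writingDaysPerWeek;
        // Ceiling division: a partial week still counts as a week.
        return (chapterWords + weeklyWords - 1) / weeklyWords;
    }

    /** Words needed on each remaining writing day to reach the weekly target, rounded up. */
    static int wordsPerDay(int weeklyTarget, int wordsWritten, int daysLeft) {
        if (!(daysLeft > 0)) {
            throw new IllegalArgumentException("daysLeft must be positive");
        }
        // Nothing left to write if the target has already been met.
        int remaining = Math.max(0, weeklyTarget - wordsWritten);
        // Ceiling division: round a fractional word up to a whole word.
        return (remaining + daysLeft - 1) / daysLeft;
    }
}
```

&lt;p&gt;With the numbers from this post, &lt;code class="language-plaintext highlighter-rouge"&gt;weeksPerChapter(9000, 500, 6)&lt;/code&gt; returns 3.  After missing two days, &lt;code class="language-plaintext highlighter-rouge"&gt;wordsPerDay(3000, 0, 4)&lt;/code&gt; returns 750 words for each of the four remaining days.&lt;/p&gt;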

&lt;h1 id="the-magic-of-100-words"&gt;The magic of 100 words&lt;/h1&gt;

&lt;p&gt;No matter how motivated you are as a writer or how excited you are about your topic, you’ll inevitably face times when you flat out don’t feel like writing.  While it’s natural to take a break here and there, if you give in to the urge to skip writing too often, you can quickly fall further behind schedule, which can lead to even more discouragement.  The problem with waiting until you feel like doing something is that the feeling may never come.  Willpower can sometimes help, but it’s not a long-term solution.  I tried a few different approaches, and what ended up working for me was setting a very small goal: just write 100 words.  Setting this small goal helped me overcome my writing inertia and have a productive session.  It doesn’t always need to be a word-count goal, but the idea is to set a small, achievable goal before giving in to the “I just don’t feel like writing” vibe.  More often than not, this little trick will help you get going when motivation is lacking.&lt;/p&gt;

&lt;h1 id="objects-in-the-mirror-are-larger-than-they-appear"&gt;Objects in the mirror are larger than they appear&lt;/h1&gt;

&lt;p&gt;Writing a book is a considerable project.  There’s so much ground to cover, and the work can span several months or even years.  With something of this size, it’s easy to get overwhelmed by the volume of work that needs to be done.  But thinking about the entire scope of work is counterproductive; it’s easy to slip into “deer in the headlights” mode, where you’re overwhelmed and not making progress.  This is true of any large project, not only writing a book.  As with much of what we’ve discussed so far, the key is to break things down into smaller, manageable pieces.  Focusing on one small part at a time makes the whole effort far less daunting.  Then, before you know it, you’ll have strung together several smaller parts into a larger whole.&lt;/p&gt;

&lt;h1 id="no-matter-what-never-block"&gt;No matter what, never block&lt;/h1&gt;

&lt;figure&gt;
  &lt;img src="../assets/images/dont_block_the_box.png" style="max-width: 35%;" /&gt;
&lt;/figure&gt;

&lt;p&gt;This section could alternatively be called “Always make progress.”  Of all the topics I’ve touched on in this blog post, this one could be the most important.  Over time, you’ll find that even when applying different strategies, there will be periods when it’s tough to get anything done.  Whether it’s work or family obligations or something else, life will get in the way.  When you encounter such a period, it’s imperative to keep making progress.  The real danger with stopping is that it’s much harder to get going again.&lt;/p&gt;

&lt;p&gt;So when those tough times come, it is far better to write something small every day and keep making progress until you can devote more time to writing again.  Other times, you’ll be the obstacle.  Let’s face it: you can apply all the strategies and still be out of steam.  At those moments, it’s important to remember that you can still make progress without writing.  Maybe you have illustrations to work on or additional research to do.  Since I was writing a technical book, I would write code for examples or lay out a concept for something I wanted to cover later.  What you do isn’t that important as long as you continue to move forward.&lt;/p&gt;

&lt;h2 id="acknowledgements"&gt;Acknowledgements&lt;/h2&gt;

&lt;p&gt;I would be remiss in my writing if I didn’t acknowledge some sources for this blog post.  First, I must mention my excellent editor &lt;a href="https://www.franceslefkowitz.net/"&gt;Frances Lefkowitz&lt;/a&gt;, who taught me a lot about these writing concepts.  Secondly, the book &lt;a href="https://www.amazon.com/Clockwork-Muse-Practical-Writing-Dissertations/dp/0674135865"&gt;The Clockwork Muse&lt;/a&gt; is an invaluable resource.&lt;/p&gt;

&lt;div class="footnotes" role="doc-endnotes"&gt;
  &lt;ol&gt;
    &lt;li id="fn:1" role="doc-endnote"&gt;
      &lt;p&gt;&lt;a href="https://www.economist.com/news/1955/11/19/parkinsons-law"&gt;Parkinson, Cyril Northcote (19 November 1955). “Parkinson’s Law”. The Economist. London.&lt;/a&gt; &lt;a href="#fnref:1" class="reversefootnote" role="doc-backlink"&gt;&amp;#8617;&lt;/a&gt;&lt;/p&gt;
    &lt;/li&gt;
  &lt;/ol&gt;
&lt;/div&gt;
</description>
        <pubDate>Sun, 27 Oct 2024 22:46:00 +0000</pubDate>
        <link>https://codingjunkie.net//writing-lessons-learned</link>
        <link href="https://codingjunkie.net/writing-lessons-learned"/>
        <guid isPermaLink="true">https://codingjunkie.net/writing-lessons-learned</guid>
      </item>
    
      <item>
        <title>Mastering Stream Processing - Testing Flink SQL windowed applications</title>
        <description>&lt;p&gt;We’ve covered a lot of territory in this blog series about windowing aggregations. Here are the previous posts:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/introduction-to-windowing"&gt;Introduction to windowing&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-hopping-tumbling-windows"&gt;Hopping and Tumbling windows&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-sliding-windows"&gt;Sliding windows and OVER aggregation&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-session-cumulating-windows"&gt;Session windows Cumulating windows&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-time-semantics"&gt;Window time semantics&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-viewing-results"&gt;Viewing and analyzing results&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-testing-kafka-streams"&gt;Testing Kafka Streams windowed applications&lt;/a&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;In this final installment, we will cover testing a windowed application. While it may seem obvious, tests are essential to validate your logic. Usually, when testing an application, you’ll assert that N input records produce an expected output. The &lt;a href="https://www.codingjunkie.net/mastering-stream-processing-time-semantics"&gt;time semantics&lt;/a&gt; post emphasized that event timestamps drive window behavior. So, it’s not enough to feed the application records and assert results; you must also provide timestamps that advance the windows correctly. In this post, we will cover how to test Apache Flink® SQL to ensure your streaming windowed applications produce correct results.&lt;/p&gt;

&lt;p&gt;Now, let’s move on to testing Flink SQL windowed queries.&lt;/p&gt;

&lt;h1 id="testing-flink-sql-windowed-aggregations"&gt;Testing Flink SQL windowed aggregations&lt;/h1&gt;

&lt;p&gt;The Flink SQL client provides an interactive environment for trying different queries, but it isn’t suited for automated tests. Fortunately, Flink’s &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/#table-api"&gt;Table API&lt;/a&gt; allows you to execute Flink SQL statements programmatically, so we’ll use it to create integration tests you can run in JUnit. The first step to running Flink SQL in a test is to create a &lt;code class="language-plaintext highlighter-rouge"&gt;StreamTableEnvironment&lt;/code&gt; that you’ll use to drive the test:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Setting up Flink SQL test with the Table API&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;
    &lt;span class="nc"&gt;StreamTableEnvironment&lt;/span&gt; &lt;span class="n"&gt;streamTableEnv&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="nc"&gt;StreamExecutionEnvironment&lt;/span&gt; &lt;span class="n"&gt;env&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;StreamExecutionEnvironment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getExecutionEnvironment&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="n"&gt;env&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setParallelism&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;env&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getConfig&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setRestartStrategy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RestartStrategies&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;noRestart&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;env&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setStateBackend&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;EmbeddedRocksDBStateBackend&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;streamTableEnv&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;StreamTableEnvironment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;create&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;env&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;EnvironmentSettings&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newInstance&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;inStreamingMode&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Now that you have the &lt;code class="language-plaintext highlighter-rouge"&gt;StreamTableEnvironment&lt;/code&gt;, the following steps are to create a table, populate it with data, execute the SQL statement under test, and assert the results. First, let’s make the table for the test:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Table under test&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;table&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;format&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"CREATE TABLE iot_readings (\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"     device_id STRING,\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"     reading DOUBLE,\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"     reading_time TIMESTAMP(3),\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"     WATERMARK FOR reading_time AS reading_time\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;") WITH (\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    'connector' = 'kafka',\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    'topic' = 'iot_readings',\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    'properties.bootstrap.servers' = 'localhost:%s',\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
    &lt;span class="s"&gt;"    'scan.startup.mode' = 'earliest-offset',\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    'scan.bounded.mode' = 'latest-offset',\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    'key.format' = 'raw',\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    'key.fields' = 'device_id',\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    'value.format' = 'json',\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    'value.fields-include' = 'EXCEPT_KEY'\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;");"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;kafkaPort&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;A placeholder for String.format to set the Apache Kafka® port&lt;/li&gt;
  &lt;li&gt;The variable containing the Kafka port determined by the KafkaContainer&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Here, the &lt;code class="language-plaintext highlighter-rouge"&gt;WITH&lt;/code&gt; clause specifies to use the &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kafka/#apache-kafka-sql-connector"&gt;Flink Kafka connector&lt;/a&gt;. The test will also use &lt;a href="https://java.testcontainers.org/modules/kafka/"&gt;Testcontainers&lt;/a&gt; to provide the running Kafka instance. I won’t go into using Testcontainers in a test, but you can get the details from &lt;a href="https://www.atomicjar.com/2023/06/testing-kafka-applications-with-testcontainers/"&gt;this blog post by Atomic Jar&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Now that you have the table definition along with the Kafka SQL connector configuration, the next step is to execute this statement to create the table:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Running the CREATE TABLE statement&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="n"&gt;streamTableEnv&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;executeSql&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;table&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;await&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;You’ll use the &lt;code class="language-plaintext highlighter-rouge"&gt;await()&lt;/code&gt; method to ensure that the test doesn’t progress until Flink completes creating the table. The next step is to populate the table with data:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Inserting the testing sensor data&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;insertStatement&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"INSERT INTO iot_readings VALUES\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    ('south_sensor', 5.0, TO_TIMESTAMP('2024-04-09 01:01:00')),\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    ('south_sensor', 10.0, TO_TIMESTAMP('2024-04-09 01:03:00')),\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    ('south_sensor', 9.7, TO_TIMESTAMP('2024-04-09 01:04:00')),\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    ('south_sensor', 12.0, TO_TIMESTAMP('2024-04-09 01:06:00')),\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    ('south_sensor', 11.9, TO_TIMESTAMP('2024-04-09 01:07:00')),\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    ('south_sensor', 8.2, TO_TIMESTAMP('2024-04-09 01:09:00'));"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="n"&gt;streamTableEnv&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;executeSql&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;insertStatement&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;await&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Here, we’re simply using the familiar SQL &lt;code class="language-plaintext highlighter-rouge"&gt;INSERT&lt;/code&gt; statement to get data into the table, and again, we see the &lt;code class="language-plaintext highlighter-rouge"&gt;await&lt;/code&gt; method in use to make sure the test only progresses after the data inserts are finished. With the data inserted into the table, the next step is to execute the windowed query and compare the results against what we expect them to be:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Executing the query and comparing results&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"SELECT device_id,\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    MAX(reading) AS max_reading,\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    window_start,\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"    window_end\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"FROM TABLE(TUMBLE(TABLE iot_readings, DESCRIPTOR(reading_time), INTERVAL '5' MINUTES))\n"&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt;
    &lt;span class="s"&gt;"GROUP BY device_id, window_start, window_end;"&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="nc"&gt;TableResult&lt;/span&gt; &lt;span class="n"&gt;tableResult&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;streamTableEnv&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;executeSql&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;  &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;

    &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;actualResults&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;rowObjectsFromTableResult&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tableResult&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;

    &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;expectedRowResults&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;asList&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofKind&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;           &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
                                                          &lt;span class="nc"&gt;RowKind&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;INSERT&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                                          &lt;span class="s"&gt;"south_sensor"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                                          &lt;span class="mf"&gt;10.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                                          &lt;span class="n"&gt;parseTS&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2024-04-09 01:00:00"&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
                                                          &lt;span class="n"&gt;parseTS&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2024-04-09 01:05:00"&lt;/span&gt;&lt;span class="o"&gt;)),&lt;/span&gt;
                                                 &lt;span class="nc"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofKind&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                                                          &lt;span class="nc"&gt;RowKind&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;INSERT&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                                          &lt;span class="s"&gt;"south_sensor"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                                          &lt;span class="mf"&gt;12.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                                          &lt;span class="n"&gt;parseTS&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2024-04-09 01:05:00"&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
                                                          &lt;span class="n"&gt;parseTS&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"2024-04-09 01:10:00"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
                                                       &lt;span class="o"&gt;);&lt;/span&gt;
       &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;expectedRowResults&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;actualResults&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;Executing the query&lt;/li&gt;
  &lt;li&gt;Extracting the results into an ArrayList&lt;/li&gt;
  &lt;li&gt;Creating the expected results&lt;/li&gt;
  &lt;li&gt;Asserting the actual results match the expected ones.&lt;/li&gt;
&lt;/ol&gt;
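&lt;p&gt;To see where the expected rows come from, here’s a plain-Java sketch (no Flink involved, Java 16+ for records) that mimics what the query computes.  It assigns each reading to a 5-minute tumbling window aligned to the epoch, as &lt;code class="language-plaintext highlighter-rouge"&gt;TUMBLE&lt;/code&gt; does when no offset is given, and keeps the &lt;code class="language-plaintext highlighter-rouge"&gt;MAX&lt;/code&gt; reading per device and window.  The class, record, and method names are hypothetical.  Results come back as comma-separated strings to keep the sketch short.&lt;/p&gt;

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.stream.Collectors;

/** Hypothetical, Flink-free model of a TUMBLE window with MAX(reading). */
final class TumblingMaxSketch {

    record Reading(String deviceId, double reading, LocalDateTime readingTime) { }

    record WindowKey(String deviceId, LocalDateTime windowStart, LocalDateTime windowEnd) { }

    private TumblingMaxSketch() { }

    /** Start of the epoch-aligned tumbling window containing ts (whole seconds). */
    static LocalDateTime windowStart(LocalDateTime ts, Duration size) {
        long seconds = ts.toEpochSecond(ZoneOffset.UTC);
        long start = seconds - Math.floorMod(seconds, size.getSeconds());
        return LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC);
    }

    /** One "device,max,windowStart,windowEnd" string per window, in first-seen order. */
    static String[] maxPerWindow(Reading[] readings, Duration size) {
        var maxByWindow = Arrays.stream(readings)
            .collect(Collectors.toMap(
                r -> {
                    LocalDateTime start = windowStart(r.readingTime(), size);
                    return new WindowKey(r.deviceId(), start, start.plus(size));
                },
                Reading::reading,
                Math::max,             // two readings in the same window: keep the larger
                LinkedHashMap::new));  // keep windows in the order they were first seen
        return maxByWindow.entrySet().stream()
            .map(e -> e.getKey().deviceId() + "," + e.getValue() + ","
                + e.getKey().windowStart() + "," + e.getKey().windowEnd())
            .toArray(String[]::new);
    }
}
```

&lt;p&gt;Feeding it the six readings from the test with a 5-minute size yields two rows.  The 01:00 to 01:05 window has a max of 10.0, and the 01:05 to 01:10 window has a max of 12.0, matching the expected &lt;code class="language-plaintext highlighter-rouge"&gt;Row&lt;/code&gt; objects.&lt;/p&gt;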

&lt;p&gt;So, the final step is straightforward: the test executes the query and compares the returned results to what it expects. Notice that since we’ve inserted simple data, it’s trivial to construct the expected list of results. For completeness, here’s the code for the &lt;code class="language-plaintext highlighter-rouge"&gt;rowObjectsFromTableResult&lt;/code&gt; method:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Extracting the Row objects&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;     &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;rowObjectsFromTableResult&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TableResult&lt;/span&gt; &lt;span class="n"&gt;tableResult&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;CloseableIterator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;closeableIterator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;tableResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
          &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Row&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;rows&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ArrayList&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;
          &lt;span class="n"&gt;closeableIterator&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;forEachRemaining&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;rows:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;add&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
          &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;rows&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
      &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;
&lt;p&gt;The method calls &lt;code class="language-plaintext highlighter-rouge"&gt;TableResult.collect()&lt;/code&gt;, which returns a &lt;code class="language-plaintext highlighter-rouge"&gt;CloseableIterator&lt;/code&gt; over the query results. It copies each row into a &lt;code class="language-plaintext highlighter-rouge"&gt;List&lt;/code&gt;, a container better suited for comparison at the end of the test, and the try-with-resources block closes the iterator afterward.&lt;/p&gt;

&lt;p&gt;There’s one last point I’d like to make before we move on. The last record of the test data has a timestamp of 01:09:00. Since it’s the last record, the watermark never reaches 01:10:00, so how can the second window produce results that satisfy the test? The answer lies in the configuration of the Flink SQL Kafka connector, specifically the &lt;code class="language-plaintext highlighter-rouge"&gt;scan.bounded.mode=latest-offset&lt;/code&gt; setting. The &lt;code class="language-plaintext highlighter-rouge"&gt;scan.bounded.mode&lt;/code&gt; option determines when the stream is complete: with &lt;code class="language-plaintext highlighter-rouge"&gt;latest-offset&lt;/code&gt;, the source stops once it has consumed up to the latest offsets, evaluated when consumption of each partition starts. In other words, it places a bound on the stream from Kafka; without it, the table is treated as unbounded. Bounded mode is especially useful in tests because, once the input ends, Flink flushes all pending results from windows, joins, etc., that would otherwise hang waiting for watermarks that will never arrive.&lt;/p&gt;
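&lt;p&gt;To make the effect of a bounded source concrete, here is a small, self-contained Java simulation of event-time tumbling windows. It is a toy model, not Flink’s implementation, and every name in it is illustrative. It assumes 5-minute windows aligned to midnight, a watermark equal to the largest timestamp seen (no out-of-orderness delay), and hypothetical record times ending at 01:09:00. A window fires once the watermark reaches its last millisecond, and reaching the end of a bounded input is modeled as one final maximal watermark that flushes whatever is still pending.&lt;/p&gt;

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.StringJoiner;

final class BoundedWatermarkSketch {
    // Assumption: 5-minute tumbling windows aligned to midnight (hypothetical query)
    static final long WINDOW_MS = Duration.ofMinutes(5).toMillis();

    // Returns the end times of the windows that fire, e.g. "[01:05, 01:10]".
    // Timestamps are millis since midnight, assumed to arrive in ascending order.
    static String firedWindowEnds(long[] timestamps, boolean bounded) {
        StringJoiner fired = new StringJoiner(", ", "[", "]");
        long[] pendingStarts = new long[timestamps.length]; // open window starts, oldest first
        int head = 0;
        int tail = 0;
        for (long ts : timestamps) {
            long start = ts - ts % WINDOW_MS;
            if (head == tail || pendingStarts[tail - 1] != start) {
                pendingStarts[tail++] = start;
            }
            // Assumption: the watermark equals the latest timestamp (no out-of-orderness delay)
            head = fireReady(pendingStarts, head, tail, ts, fired);
        }
        if (bounded) {
            // End of bounded input: a final maximal watermark flushes every pending window
            fireReady(pendingStarts, head, tail, Long.MAX_VALUE, fired);
        }
        return fired.toString();
    }

    // Fires windows whose last millisecond (end - 1) the watermark has reached; returns the new head
    private static int fireReady(long[] pendingStarts, int head, int tail, long watermark, StringJoiner fired) {
        while (head != tail) {
            long end = pendingStarts[head] + WINDOW_MS;
            if (end - 1 > watermark) {
                break;
            }
            fired.add(LocalTime.ofNanoOfDay(end * 1_000_000L).toString());
            head++;
        }
        return head;
    }

    // Millis since midnight for a given hour and minute
    static long at(int hour, int minute) {
        return Duration.ofHours(hour).plusMinutes(minute).toMillis();
    }

    public static void main(String[] args) {
        long[] timestamps = {at(1, 2), at(1, 4), at(1, 6), at(1, 9)};
        System.out.println("unbounded: " + firedWindowEnds(timestamps, false));
        System.out.println("bounded:   " + firedWindowEnds(timestamps, true));
    }
}
```

&lt;p&gt;With these inputs, the unbounded run only emits the window ending at 01:05, while the bounded run also emits the window ending at 01:10, mirroring why the second window in the test still produces results.&lt;/p&gt;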

&lt;p&gt;This blog post has been a quick tour of testing Flink SQL windowed queries, but it can serve as the basis for writing future tests against other windowed SQL queries.&lt;/p&gt;

&lt;h1 id="resources"&gt;Resources&lt;/h1&gt;

&lt;ul&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.manning.com/books/kafka-streams-in-action-second-edition"&gt;Kafka Streams in Action 2nd Edition&lt;/a&gt; book&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.confluent.io/product/flink/"&gt;Apache Flink® on Confluent Cloud&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-tvf/#windowing-table-valued-functions-windowing-tvfs"&gt;Flink SQL Windows&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#windowing"&gt;Kafka Streams windowing documentation&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
&lt;/ul&gt;
</description>
        <pubDate>Tue, 07 May 2024 16:46:00 +0000</pubDate>
        <link>https://codingjunkie.net//mastering-stream-processing-testing-flink-sql</link>
        <link href="https://codingjunkie.net/mastering-stream-processing-testing-flink-sql"/>
        <guid isPermaLink="true">https://codingjunkie.net/mastering-stream-processing-testing-flink-sql</guid>
      </item>
    
      <item>
        <title>Mastering Stream Processing - Testing Kafka Streams windowed applications</title>
        <description>&lt;p&gt;In this blog series about windowing aggregations, we’ve covered a lot of territory. Here are the previous posts:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/introduction-to-windowing"&gt;Introduction to windowing&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-hopping-tumbling-windows"&gt;Hopping and Tumbling windows&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-sliding-windows"&gt;Sliding windows and OVER aggregation&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-session-cumulating-windows"&gt;Session windows Cumulating windows&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-time-semantics"&gt;Window time semantics&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-viewing-results"&gt;Viewing and analyzing results&lt;/a&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;In this final part of the series, we will cover testing a windowed application. Testing is essential to validate your logic. Typically, when testing an application, you assert that a given set of input records produces an expected result. The &lt;a href="https://www.codingjunkie.net/mastering-stream-processing-time-semantics"&gt;time semantics&lt;/a&gt; post emphasized that event timestamps drive window behavior. So, for testing, it’s not enough to feed the application records and assert on the results; you must also provide timestamps that advance the window correctly. Across this post and the next, we will cover how to test both Kafka Streams and Flink SQL to ensure your streaming windowed applications produce correct results. I originally planned to cover testing in one post, but it grew too long, so I split it in half. This post focuses on testing in Kafka Streams.&lt;/p&gt;

&lt;h1 id="testing-kafka-streams-windowed-aggregations"&gt;Testing Kafka Streams windowed aggregations&lt;/h1&gt;

&lt;p&gt;Kafka Streams provides the &lt;a href="https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/TopologyTestDriver.html"&gt;TopologyTestDriver&lt;/a&gt; (TTD) for &lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/test-streams.html#testing-a-streams-application"&gt;testing a topology&lt;/a&gt; without needing a live broker. As a result, TTD tests execute very fast and allow you to thoroughly test all topologies, from the simple to the complex. Generally speaking, a TTD test uses a &lt;a href="https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/TestInputTopic.html"&gt;TestInputTopic&lt;/a&gt; to push some records through your topology and then captures the results with a &lt;a href="https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/TestOutputTopic.html"&gt;TestOutputTopic&lt;/a&gt;. Such a test looks something like the following code listing (I’ve left out several details for clarity):&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;A TopologyTestDriver test of a Kafka Streams application&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TopologyTestDriver&lt;/span&gt; &lt;span class="n"&gt;driver&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;TopologyTestDriver&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topology&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

       &lt;span class="nc"&gt;TestInputTopic&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;inputTopic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;driver&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;createInputTopic&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;INPUT_TOPIC&lt;/span&gt;&lt;span class="o"&gt;...);&lt;/span&gt;

       &lt;span class="nc"&gt;TestOutputTopic&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;outputTopic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;driver&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;createOutputTopic&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;OUTPUT_TOPIC&lt;/span&gt;&lt;span class="o"&gt;...);&lt;/span&gt;

       &lt;span class="n"&gt;inputTopic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;pipeInput&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"foo"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
       &lt;span class="n"&gt;inputTopic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;pipeInput&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"bar"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

       &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;expectedOutput&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;asList&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"FOO"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"BAR"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
       &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;actualOutput&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;outputTopic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readValuesToList&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
       &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;expectedOutput&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;actualOutput&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;As the listing shows, testing a Kafka Streams application is straightforward. One thing that’s not obvious from this example is the role of timestamps. Under the covers, the TTD assigns a timestamp to every input record. This default is acceptable for a topology without windowing, since such topologies don’t depend on timestamps to calculate their results.&lt;/p&gt;

&lt;p&gt;But once you have a topology that requires advancing timestamps, such as a windowed aggregation, you’ll want to take another approach and supply custom time values. A unit test should use just enough records to validate your application, and by default the &lt;code class="language-plaintext highlighter-rouge"&gt;TestInputTopic&lt;/code&gt; stamps records with the wall-clock time at which it was created, without advancing that time between records. As a result, a handful of records won’t move event time forward enough to drive the window behavior. To solve this issue, the &lt;code class="language-plaintext highlighter-rouge"&gt;TestInputTopic&lt;/code&gt; provides &lt;code class="language-plaintext highlighter-rouge"&gt;pipeInput&lt;/code&gt; &lt;a href="https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/TestInputTopic.html#pipeInput(K,V,java.time.Instant)"&gt;method overloads&lt;/a&gt; accepting a &lt;a href="https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/TestInputTopic.html#pipeInput(K,V,java.time.Instant)"&gt;java.time.Instant&lt;/a&gt;, allowing you to effectively advance a windowed operation with a small number of input records.&lt;/p&gt;

&lt;p&gt;For example, assume you have a one-minute tumbling window aggregation (no grace period) of string keys and double values. Let’s look at test code that sets explicit timestamps so that each window contains a small, known set of values:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Providing timestamps to drive a windowed operation&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TopologyTestDriver&lt;/span&gt; &lt;span class="n"&gt;driver&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;TopologyTestDriver&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tumblingWindows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;topology&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;)))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;TestInputTopic&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;testInputTopic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;driver&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;createInputTopic&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;inputTopic&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                                                               &lt;span class="n"&gt;stringSerializer&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                                                               &lt;span class="n"&gt;doubleSerializer&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

       &lt;span class="nc"&gt;LocalDate&lt;/span&gt; &lt;span class="n"&gt;localDate&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LocalDate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofInstant&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Instant&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;now&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="nc"&gt;ZoneId&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;systemDefault&lt;/span&gt;&lt;span class="o"&gt;());&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
       &lt;span class="nc"&gt;LocalDateTime&lt;/span&gt; &lt;span class="n"&gt;localDateTime&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LocalDateTime&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;localDate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getYear&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
                                                      &lt;span class="n"&gt;localDate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMonthValue&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
                                                      &lt;span class="n"&gt;localDate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getDayOfMonth&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="mi"&gt;12&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;18&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

       &lt;span class="nc"&gt;Instant&lt;/span&gt; &lt;span class="n"&gt;instant&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;localDateTime&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toInstant&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ZoneOffset&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;UTC&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 

       &lt;span class="n"&gt;testInputTopic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;pipeInput&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"deviceOne"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mf"&gt;10.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;instant&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
       &lt;span class="n"&gt;testInputTopic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;pipeInput&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"deviceOne"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mf"&gt;35.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;instant&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;plusSeconds&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; 
       &lt;span class="n"&gt;testInputTopic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;pipeInput&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"deviceOne"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mf"&gt;45.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;instant&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;plusSeconds&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;40&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
       &lt;span class="n"&gt;testInputTopic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;pipeInput&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"deviceOne"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mf"&gt;15.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;instant&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;plusSeconds&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;70&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;

    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;
&lt;p&gt;Let’s walk through this code:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;You create an &lt;code class="language-plaintext highlighter-rouge"&gt;Instant&lt;/code&gt; from a &lt;code class="language-plaintext highlighter-rouge"&gt;LocalDateTime&lt;/code&gt; object set to 12:00:18 on the current date.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Next, you advance the time by 20 and 40 seconds with the second and third records.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;With the fourth record, you advance the time by 70 seconds, to 12:01:28, which is past the end of the first window at 12:01:00. Since there’s no grace period, Kafka Streams closes the first window, containing records 1-3, and creates a new one containing the fourth record.&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;I want to discuss the block of code that sets the &lt;code class="language-plaintext highlighter-rouge"&gt;LocalDateTime&lt;/code&gt;:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Setting current date time for the test&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="nc"&gt;LocalDate&lt;/span&gt; &lt;span class="n"&gt;localDate&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LocalDate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofInstant&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Instant&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;now&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="nc"&gt;ZoneId&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;systemDefault&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="nc"&gt;LocalDateTime&lt;/span&gt; &lt;span class="n"&gt;localDateTime&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;LocalDateTime&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;localDate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getYear&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
                                                   &lt;span class="n"&gt;localDate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getMonthValue&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
                                                   &lt;span class="n"&gt;localDate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getDayOfMonth&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="mi"&gt;12&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;18&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;
&lt;p&gt;In this case, the first record’s timestamp is 12:00:18, so it lands in a window starting at 12:00:00 (tumbling windows are aligned to the epoch, not to the first record). The 2nd and 3rd records advance the timestamp value, not the window start time. Choosing the initial timestamp deliberately is essential because you’ll want to leave enough room within the window for subsequent advances that align with your test assertions.&lt;/p&gt;
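&lt;p&gt;A quick way to see the epoch alignment is to compute the window each test record lands in. The sketch below is a simplified model of tumbling-window assignment (the window start is the timestamp rounded down to a multiple of the window size), not Kafka Streams’ own code. The class and method names are illustrative, and it uses a hypothetical fixed date in place of today’s date so the output is repeatable.&lt;/p&gt;

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

final class TumblingWindowAssignment {
    // Epoch-aligned start of the tumbling window that contains the timestamp
    static Instant windowStart(Instant timestamp, Duration windowSize) {
        long millis = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, windowSize.toMillis()));
    }

    public static void main(String[] args) {
        // Hypothetical fixed date in place of "today"; 12:00:18 UTC as in the test
        Instant first = LocalDateTime.of(2024, 3, 16, 12, 0, 18).toInstant(ZoneOffset.UTC);
        Duration oneMinute = Duration.ofMinutes(1);
        // Same offsets as the test: 0, +20, +40 and +70 seconds
        for (long offsetSeconds : new long[] {0, 20, 40, 70}) {
            Instant ts = first.plusSeconds(offsetSeconds);
            Instant start = windowStart(ts, oneMinute);
            System.out.println(ts + " -> [" + start + ", " + start.plus(oneMinute) + ")");
        }
    }
}
```

&lt;p&gt;Running it should map the first three timestamps to [2024-03-16T12:00:00Z, 2024-03-16T12:01:00Z) and the fourth (12:01:28) to [2024-03-16T12:01:00Z, 2024-03-16T12:02:00Z), which is the split the test relies on.&lt;/p&gt;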

&lt;p&gt;If your record payloads contain timestamps and you’re using a custom &lt;a href="https://kafka.apache.org/37/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html"&gt;TimestampExtractor&lt;/a&gt;, you’ll follow the same approach, placing the timestamps in the values you’re piping through the test.&lt;/p&gt;

&lt;p&gt;Now, let’s walk through asserting windowed results. The result of the windowed aggregation is an &lt;code class="language-plaintext highlighter-rouge"&gt;IotSensorAggregation&lt;/code&gt; object that tracks the number of readings taken, the highest value seen, and the sum of readings used to calculate an average. We’ll use this information to validate our aggregation code:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Validating the windowed aggregation&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;KeyValue&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Windowed&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;,&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;results&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;testOutputTopic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;readKeyValuesToList&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt; &lt;span class="n"&gt;firstWindowAggregation&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;results&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt; &lt;span class="n"&gt;lastWindowAggregation&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;results&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt; &lt;span class="n"&gt;secondWindowAggregation&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;results&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;10.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;firstWindowAggregation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;highestSeen&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;firstWindowAggregation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;numberReadings&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;10.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;firstWindowAggregation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;averageReading&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;

    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;45.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;lastWindowAggregation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;highestSeen&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;lastWindowAggregation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;numberReadings&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;30.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;lastWindowAggregation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;averageReading&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;

    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;15.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;secondWindowAggregation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;highestSeen&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;secondWindowAggregation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;numberReadings&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;assertEquals&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;15.0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;secondWindowAggregation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;averageReading&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;This testing code asserts that our windowed aggregation is operating correctly. The output topic holds an updated aggregate for each input record, so the test selects results by index. The first result, produced by the first record, should contain only 1 reading, and its average should equal the highest seen value since it’s the only record so far. The third result is the last update for the first 1-minute window, so it should contain 3 readings and an average of 30.0 ((10.0 + 35.0 + 45.0) / 3). Finally, the fourth result comes from the last input record, which lands in a new window because its timestamp crossed the 1-minute boundary; like the first result, it contains a single reading.&lt;/p&gt;
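&lt;p&gt;The expected values follow directly from the aggregation arithmetic. As a rough sketch, the following plain-Java model replays the four test records through a hypothetical stand-in for &lt;code class="language-plaintext highlighter-rouge"&gt;IotSensorAggregation&lt;/code&gt;. The real class isn’t shown in this post; only its accessor names are borrowed from the test, and the remaining names are illustrative. It emits one updated aggregate per record, matching how the test reads results by index, and assumes records arrive in time order.&lt;/p&gt;

```java
final class WindowedAggregationSketch {
    // Hypothetical stand-in for the post's IotSensorAggregation; accessor names mirror the test
    record IotSensorAggregation(long numberReadings, double highestSeen, double readingSum) {
        static IotSensorAggregation empty() {
            return new IotSensorAggregation(0, Double.NEGATIVE_INFINITY, 0.0);
        }

        IotSensorAggregation add(double reading) {
            return new IotSensorAggregation(numberReadings + 1,
                    Math.max(highestSeen, reading), readingSum + reading);
        }

        double averageReading() {
            return numberReadings == 0 ? 0.0 : readingSum / numberReadings;
        }
    }

    static final long WINDOW_SECONDS = 60;

    // Emits one updated aggregate per input record (no suppression); input assumed in time order
    static IotSensorAggregation[] aggregate(long[] secondsOfDay, double[] readings) {
        IotSensorAggregation[] updates = new IotSensorAggregation[readings.length];
        IotSensorAggregation current = IotSensorAggregation.empty();
        long currentWindowStart = -1;
        int i = 0;
        for (double reading : readings) {
            long windowStart = secondsOfDay[i] - secondsOfDay[i] % WINDOW_SECONDS;
            if (windowStart != currentWindowStart) {
                // A new tumbling window starts with a fresh aggregate
                current = IotSensorAggregation.empty();
                currentWindowStart = windowStart;
            }
            current = current.add(reading);
            updates[i++] = current;
        }
        return updates;
    }

    public static void main(String[] args) {
        long t0 = 12 * 3600 + 18; // 12:00:18 as seconds of the day
        IotSensorAggregation[] results = aggregate(
                new long[] {t0, t0 + 20, t0 + 40, t0 + 70},
                new double[] {10.0, 35.0, 45.0, 15.0});
        for (IotSensorAggregation result : results) {
            System.out.println(result + " average=" + result.averageReading());
        }
    }
}
```

&lt;p&gt;The third update holds 3 readings summing to 90.0, so its average is 30.0, and the fourth update starts over in the 12:01:00 window with a single reading of 15.0.&lt;/p&gt;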

&lt;h1 id="resources"&gt;Resources&lt;/h1&gt;

&lt;ul&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.manning.com/books/kafka-streams-in-action-second-edition"&gt;Kafka Streams in Action 2nd Edition&lt;/a&gt; book.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.confluent.io/product/flink/"&gt;Apache Flink® on Confluent Cloud&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-tvf/#windowing-table-valued-functions-windowing-tvfs"&gt;Flink SQL Windows&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#windowing"&gt;Kafka Streams windowing documentation&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
&lt;/ul&gt;
</description>
        <pubDate>Sat, 16 Mar 2024 20:55:00 +0000</pubDate>
        <link>https://codingjunkie.net//mastering-stream-processing-testing-kafka-streams</link>
        <link href="https://codingjunkie.net/mastering-stream-processing-testing-kafka-streams"/>
        <guid isPermaLink="true">https://codingjunkie.net/mastering-stream-processing-testing-kafka-streams</guid>
      </item>
    
      <item>
        <title>Mastering Stream Processing - Viewing and analyzing results</title>
        <description>&lt;p&gt;This is the sixth post in a series on windowing in event stream processing. Here’s a list of the previous posts:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.codingjunkie.net/introduction-to-windowing/"&gt;Introduction to windowing&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-hopping-tumbling-windows/"&gt;Hopping and Tumbling windows&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-sliding-windows/"&gt;Sliding windows and OVER aggregation&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-session-cumulating-windows/"&gt;Session windows Cumulating windows&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-time-semantics"&gt;Window time semantics&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;In this post, we’ll move on from covering specific window implementations and discuss techniques for viewing and analyzing windowed results.&lt;/p&gt;

&lt;h1 id="flink-sql-results"&gt;Flink SQL results&lt;/h1&gt;

&lt;p&gt;Given that Flink SQL renders results in a table, viewing the details of windowed operations is straightforward. Flink SQL adds 3 columns to windowing-TVF queries: &lt;code class="language-plaintext highlighter-rouge"&gt;window_start&lt;/code&gt;, &lt;code class="language-plaintext highlighter-rouge"&gt;window_end&lt;/code&gt;, and &lt;code class="language-plaintext highlighter-rouge"&gt;window_time&lt;/code&gt; (not shown here for brevity). Flink SQL calculates the &lt;code class="language-plaintext highlighter-rouge"&gt;window_time&lt;/code&gt; field by subtracting 1 ms from the &lt;code class="language-plaintext highlighter-rouge"&gt;window_end&lt;/code&gt; value. So, results from a query with the window columns will look similar to the following:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Table results of the query&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-plaintext highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;+------------------+-------+------+------------------+------------------+
|          bidtime | price | item |     window_start |       window_end |
+------------------+-------+------+------------------+------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 |
+------------------+-------+------+------------------+------------------+
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;
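&lt;p&gt;The window columns follow directly from each bid time. Flink SQL aligns tumbling windows to the epoch by default, so for a 10-minute window the start is the bid time rounded down to a multiple of 10 minutes, the end is the start plus 10 minutes (exclusive), and &lt;code class="language-plaintext highlighter-rouge"&gt;window_time&lt;/code&gt; is the end minus 1 ms. The plain-Java sketch below reproduces the window columns for the six bids in the table. It isn’t Flink code: the class and method names are hypothetical, and it assumes UTC timestamps and no window offset.&lt;/p&gt;

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TumblingWindowSketch {

    // Start of the epoch-aligned tumbling window containing the timestamp.
    // Assumes a non-negative epoch-millisecond timestamp and no window offset.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Exclusive end of the window: the start plus the window size.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    // window_time is window_end minus 1 ms.
    static long windowTime(long timestampMs, long sizeMs) {
        return windowEnd(timestampMs, sizeMs) - 1;
    }

    // Interprets the table's bidtime values as UTC (an assumption of this sketch).
    static long toEpochMs(String localDateTime) {
        return LocalDateTime.parse(localDateTime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    static LocalDateTime fromEpochMs(long epochMs) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMs), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(10).toMillis();
        String[] bidTimes = {
            "2020-04-15T08:05", "2020-04-15T08:07", "2020-04-15T08:09",
            "2020-04-15T08:11", "2020-04-15T08:13", "2020-04-15T08:17"
        };
        for (String bidTime : bidTimes) {
            long ts = toEpochMs(bidTime);
            System.out.println(bidTime
                    + " window_start=" + fromEpochMs(windowStart(ts, size))
                    + " window_end=" + fromEpochMs(windowEnd(ts, size))
                    + " window_time=" + fromEpochMs(windowTime(ts, size)));
        }
    }
}
```

&lt;p&gt;Bids at 08:05, 08:07, and 08:09 land in the 08:00 to 08:10 window, and bids at 08:11, 08:13, and 08:17 land in the 08:10 to 08:20 window, matching the table.&lt;/p&gt;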

&lt;p&gt;The results plainly show which window each event falls into. But running a windowing query directly and displaying the results in the console has one drawback: the results aren’t shared, so anyone else interested in them would need to write and run their own queries. Running queries from a console is fine for prototyping and testing different SQL statements, but it doesn’t lend itself to distributing results across an organization. For that, a better approach is to store the results of a windowed query in another table whose schema and existence can be circulated within the organization.&lt;/p&gt;

&lt;p&gt;For example, consider a query that generates an alert when an average reading exceeds a given threshold. In the post on &lt;a href="https://www.codingjunkie.net/mastering-stream-processing-sliding-windows/"&gt;sliding windows and OVER aggregations&lt;/a&gt;, we first wrote a query that performs an aggregation per row:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;SQL OVER aggregation with aggregated results per row&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;
    &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;device_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;temp_reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;OVER&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
         &lt;span class="k"&gt;PARTITION&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;
         &lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;
          &lt;span class="k"&gt;RANGE&lt;/span&gt; &lt;span class="k"&gt;BETWEEN&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'1'&lt;/span&gt; &lt;span class="k"&gt;MINUTE&lt;/span&gt; &lt;span class="k"&gt;PRECEDING&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="k"&gt;CURRENT&lt;/span&gt; &lt;span class="k"&gt;ROW&lt;/span&gt;
     &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;one_minute_location_temp_averages&lt;/span&gt;
    &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;readings&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;This query does all the work of computing a one-minute sliding-range average per location from the &lt;code class="language-plaintext highlighter-rouge"&gt;readings&lt;/code&gt; table, but we still have some work to do to capture the alerting state we’re interested in. At this point, we have two options: create a new table with all the aggregations, or create a new table with only the alert data.&lt;/p&gt;
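&lt;p&gt;To make the &lt;code class="language-plaintext highlighter-rouge"&gt;RANGE&lt;/code&gt; frame concrete, here is a small in-memory Java sketch of what the query computes for each row, plus the threshold check an alert needs. It illustrates the semantics only and is not how Flink evaluates the query incrementally. The &lt;code class="language-plaintext highlighter-rouge"&gt;Reading&lt;/code&gt; record, the sample rows, and the 22.5 threshold are hypothetical.&lt;/p&gt;

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class OverAggregationSketch {

    // Hypothetical stand-in for a row of the readings table.
    record Reading(String location, String deviceId, long reportTimeMs, double tempReading) {}

    // For each row, averages temp_reading over rows in the same location whose
    // report_time lies between (current report_time - rangeMs) and the current
    // report_time, both inclusive, mirroring PARTITION BY location
    // ORDER BY report_time RANGE BETWEEN ... PRECEDING AND CURRENT ROW.
    static double[] rangeAverages(Reading[] rows, long rangeMs) {
        double[] averages = new double[rows.length];
        for (int i = 0; rows.length > i; i++) {
            Reading current = rows[i];
            long lowerBound = current.reportTimeMs() - rangeMs;
            double sum = 0.0;
            int count = 0;
            for (Reading other : rows) {
                if (other.location().equals(current.location())) {
                    if (other.reportTimeMs() >= lowerBound) {
                        if (current.reportTimeMs() >= other.reportTimeMs()) {
                            sum += other.tempReading();
                            count++;
                        }
                    }
                }
            }
            // count is at least 1: the current row is always inside its own frame
            averages[i] = sum / count;
        }
        return averages;
    }

    // Indexes of the rows whose average exceeds the threshold (the alert rows).
    static int[] alertIndexes(double[] averages, double threshold) {
        return IntStream.range(0, averages.length)
                .filter(i -> averages[i] > threshold)
                .toArray();
    }

    public static void main(String[] args) {
        Reading[] rows = {
            new Reading("kitchen", "d1", 0L, 20.0),
            new Reading("kitchen", "d2", 30_000L, 22.0),
            new Reading("garage", "d3", 40_000L, 10.0),
            new Reading("kitchen", "d1", 90_000L, 24.0)
        };
        double[] averages = rangeAverages(rows, 60_000L);
        System.out.println("averages: " + Arrays.toString(averages));
        System.out.println("alert rows: " + Arrays.toString(alertIndexes(averages, 22.5)));
    }
}
```

&lt;p&gt;With these rows, the averages are 20.0, 21.0, 10.0, and 23.0. The frame for the last kitchen reading (30 s to 90 s) no longer includes the reading at 0 s, so only that row crosses the threshold.&lt;/p&gt;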

&lt;p&gt;Let’s create a table with all the aggregations generated by this query:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Create a Table with all aggregations&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;reading_averages&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;location&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                   &lt;span class="n"&gt;device_id&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                   &lt;span class="n"&gt;report_time&lt;/span&gt; &lt;span class="nb"&gt;TIMESTAMP&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                                   &lt;span class="n"&gt;reading_averages&lt;/span&gt; &lt;span class="nb"&gt;DOUBLE&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Now you’ll write a persistent query that will continually update this table with the results of our OVER aggregation:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Populating a table with OVER aggregation results&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;reading_averages&lt;/span&gt;
      &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;device_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;temp_reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;OVER&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
         &lt;span class="k"&gt;PARTITION&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;
         &lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;
          &lt;span class="k"&gt;RANGE&lt;/span&gt; &lt;span class="k"&gt;BETWEEN&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'1'&lt;/span&gt; &lt;span class="k"&gt;MINUTE&lt;/span&gt; &lt;span class="k"&gt;PRECEDING&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="k"&gt;CURRENT&lt;/span&gt; &lt;span class="k"&gt;ROW&lt;/span&gt;
         &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;one_minute_location_temp_averages&lt;/span&gt;
      &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;readings&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Here, we’ve wrapped the OVER aggregation in an &lt;code class="language-plaintext highlighter-rouge"&gt;INSERT&lt;/code&gt; statement to push its results into the table for later analysis. To take this further, imagine we want a table containing only the alert records. We can reuse the same schema and change only the table name and the name of the last column:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Create a Table with only alert aggregations&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;reading_alerts&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;location&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                 &lt;span class="n"&gt;device_id&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                                 &lt;span class="n"&gt;report_time&lt;/span&gt; &lt;span class="nb"&gt;TIMESTAMP&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                                 &lt;span class="n"&gt;reading_alerts&lt;/span&gt; &lt;span class="nb"&gt;DOUBLE&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Next, you’ll follow a similar approach, but with a nested query: the inner query performs the OVER aggregation, and the outer query selects the records that exceed the alerting threshold:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Extracting only the alert results&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;device_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;avg_temps&lt;/span&gt;
     &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;device_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;temp_reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                 &lt;span class="n"&gt;OVER&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;
                        &lt;span class="k"&gt;RANGE&lt;/span&gt; &lt;span class="k"&gt;BETWEEN&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'15'&lt;/span&gt; &lt;span class="k"&gt;MINUTE&lt;/span&gt; &lt;span class="k"&gt;PRECEDING&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="k"&gt;CURRENT&lt;/span&gt; &lt;span class="k"&gt;ROW&lt;/span&gt;
                     &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;avg_temps&lt;/span&gt;
            &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;readings&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;avg_temps&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;SOME_VALUE&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;We’re almost there; now we need to get this data into a table. As before, we’ll wrap the &lt;code class="language-plaintext highlighter-rouge"&gt;SELECT&lt;/code&gt; statement in an &lt;code class="language-plaintext highlighter-rouge"&gt;INSERT&lt;/code&gt; to push the results to a table:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Pushing only alert results to a table&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;reading_alerts&lt;/span&gt;
        &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;device_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;avg_temps&lt;/span&gt;
           &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
                  &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;device_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;temp_reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                     &lt;span class="n"&gt;OVER&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;
                           &lt;span class="k"&gt;RANGE&lt;/span&gt; &lt;span class="k"&gt;BETWEEN&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'15'&lt;/span&gt; &lt;span class="k"&gt;MINUTE&lt;/span&gt; &lt;span class="k"&gt;PRECEDING&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="k"&gt;CURRENT&lt;/span&gt; &lt;span class="k"&gt;ROW&lt;/span&gt;
                      &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;avg_temps&lt;/span&gt;
              &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;readings&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
           &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;avg_temps&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;SOME_VALUE&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Now the alert results live in their own table that anyone can access.&lt;/p&gt;

&lt;h1 id="kafka-streams-results"&gt;Kafka Streams results&lt;/h1&gt;

&lt;p&gt;With Kafka Streams, you have a couple of choices when it comes to evaluating the windowed results. Let’s revisit the sliding window example:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Sliding windows in Kafka Streams&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="nc"&gt;Serde&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Windowed&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;windowedSerde&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
            &lt;span class="nc"&gt;WindowedSerdes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timeWindowedSerdeFrom&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                                  &lt;span class="mi"&gt;60_000L&lt;/span&gt;
                                                &lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="nc"&gt;KStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
      &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"heat-sensor-input"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
        &lt;span class="nc"&gt;Consumed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;doubleSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

    &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;groupByKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
          &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;windowedBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                      &lt;span class="nc"&gt;SlidingWindows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofTimeDifferenceWithNoGrace&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMinutes&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; 
                      &lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tempThreshold&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
             &lt;span class="n"&gt;aggregator&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
             &lt;span class="nc"&gt;Materialized&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
             &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toStream&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sensor-agg-output"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="nc"&gt;Produced&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowedSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;As written, this topology produces its results to Apache Kafka®. To analyze the windowed results, i.e., consume from the output topic, you’d need the &lt;code class="language-plaintext highlighter-rouge"&gt;Serde&amp;lt;Windowed&amp;lt;String&amp;gt;&amp;gt;&lt;/code&gt; to get the deserializer for the key, which leaks implementation details of the streaming application to every consumer. I also find it awkward to have the window start and end in the key, separate from the aggregation in the value. Kafka Streams needs to keep the window in the key during processing so it can handle the window correctly as time advances, but once it emits a windowed aggregation, there’s no need to keep the window in the key.&lt;/p&gt;

&lt;p&gt;Instead, I’d propose mapping to a new value that contains the start and end times. To make that simple, I suggest adding two long fields to your aggregation to hold the window times. From there, you’ll update the topology to use a map operation that extracts the window information and places it in the aggregation value. But before we do that, let’s create a &lt;a href="https://javadoc.io/doc/org.apache.kafka/kafka-streams/latest/org/apache/kafka/streams/kstream/KeyValueMapper.html"&gt;KeyValueMapper&lt;/a&gt; that knows how to extract the window start and end:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;KeyValueMapper to get window start and end&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;WindowTimeToAggregateMapper&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;KeyValueMapper&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Windowed&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;,&lt;/span&gt;
                                                                      &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                                                      &lt;span class="nc"&gt;KeyValue&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nd"&gt;@Override&lt;/span&gt;
        &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;KeyValue&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Windowed&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;windowed&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                                                            &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt; &lt;span class="n"&gt;iotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;start&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;windowed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;window&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;start&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; &lt;span class="c1"&gt;// 1&lt;/span&gt;
            &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;end&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;windowed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;window&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;end&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

            &lt;span class="n"&gt;iotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setWindowStart&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;start&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; &lt;span class="c1"&gt;// 2&lt;/span&gt;
            &lt;span class="n"&gt;iotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setWindowEnd&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;end&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;KeyValue&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;pair&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;key&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;iotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;Extracting the window start and end times&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Setting the window start and end time on the aggregation object&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;
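&lt;p&gt;The post doesn’t show &lt;code class="language-plaintext highlighter-rouge"&gt;IotSensorAggregation&lt;/code&gt; itself. As a hypothetical sketch, an aggregation value carrying the two long window fields might look like the class below. The name &lt;code class="language-plaintext highlighter-rouge"&gt;SensorAggregationSketch&lt;/code&gt;, the running-average fields, and the threshold check are assumptions; only &lt;code class="language-plaintext highlighter-rouge"&gt;setWindowStart&lt;/code&gt; and &lt;code class="language-plaintext highlighter-rouge"&gt;setWindowEnd&lt;/code&gt; mirror calls made by the mapper above. A real version would also need a matching serde, such as the &lt;code class="language-plaintext highlighter-rouge"&gt;aggregationSerde&lt;/code&gt; used in the topology.&lt;/p&gt;

```java
public class SensorAggregationSketch {

    private final double tempThreshold;
    private double sum;
    private long count;
    // The two long fields that hold the window bounds copied from the Windowed key
    private long windowStart;
    private long windowEnd;

    public SensorAggregationSketch(double tempThreshold) {
        this.tempThreshold = tempThreshold;
    }

    // Folds one reading into the running sum and count
    public SensorAggregationSketch add(double reading) {
        sum += reading;
        count++;
        return this;
    }

    public double average() {
        return count == 0 ? 0.0 : sum / count;
    }

    public boolean overThreshold() {
        return average() > tempThreshold;
    }

    public void setWindowStart(long windowStart) { this.windowStart = windowStart; }
    public void setWindowEnd(long windowEnd) { this.windowEnd = windowEnd; }
    public long getWindowStart() { return windowStart; }
    public long getWindowEnd() { return windowEnd; }

    @Override
    public String toString() {
        return "avg=" + average() + " windowStart=" + windowStart + " windowEnd=" + windowEnd;
    }

    public static void main(String[] args) {
        SensorAggregationSketch agg = new SensorAggregationSketch(100.0).add(98.0).add(104.0);
        // The same two calls the mapper makes with windowed.window().start() and end()
        agg.setWindowStart(0L);
        agg.setWindowEnd(60_000L);
        System.out.println(agg + " alert=" + agg.overThreshold());
    }
}
```

&lt;p&gt;Because the value is a plain object, you can unit test the window fields and the threshold logic without starting a Kafka Streams application.&lt;/p&gt;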

&lt;p&gt;Since &lt;code class="language-plaintext highlighter-rouge"&gt;KeyValueMapper&lt;/code&gt; is a single abstract method (SAM) interface, we could define it inline as a lambda in the Kafka Streams topology, but a concrete class is easier to unit test. Now you need to plug it into the Kafka Streams DSL:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Adding the KeyValueMapper to the topology&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="nc"&gt;KStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
      &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"heat-sensor-input"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
        &lt;span class="nc"&gt;Consumed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;doubleSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

    &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;groupByKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
          &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;windowedBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                      &lt;span class="nc"&gt;SlidingWindows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofTimeDifferenceWithNoGrace&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMinutes&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
                      &lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tempThreshold&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
             &lt;span class="n"&gt;aggregator&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
             &lt;span class="nc"&gt;Materialized&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
             &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toStream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
             &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;WindowTimeToAggregateMapper&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="c1"&gt;// 1&lt;/span&gt;
             &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sensor-agg-output"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="nc"&gt;Produced&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;Applying the &lt;code class="language-plaintext highlighter-rouge"&gt;KeyValueMapper&lt;/code&gt; to extract the window start and end times.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;With the addition of the &lt;code class="language-plaintext highlighter-rouge"&gt;KStream.map&lt;/code&gt; operator and the new &lt;code class="language-plaintext highlighter-rouge"&gt;KeyValueMapper&lt;/code&gt;, your aggregation now includes the start and end of the window. Since you’ve also replaced the windowed key with the underlying key, you switch the key Serde in &lt;code class="language-plaintext highlighter-rouge"&gt;Produced&lt;/code&gt; to &lt;code class="language-plaintext highlighter-rouge"&gt;stringSerde&lt;/code&gt; to reflect the change in types. When you analyze the aggregation results, you’ll have direct access to the window start and end times.&lt;/p&gt;
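&lt;p&gt;Once the window bounds travel in the value, a downstream consumer can work with them as plain epoch milliseconds, without a windowed deserializer. A minimal sketch, assuming the consumer has already read the two long fields from the deserialized value; the &lt;code class="language-plaintext highlighter-rouge"&gt;describe&lt;/code&gt; helper and the sample times are hypothetical:&lt;/p&gt;

```java
import java.time.Duration;
import java.time.Instant;

public class WindowTimesSketch {

    // Turns the epoch-millisecond window bounds carried in the value into readable instants
    static String describe(long windowStartMs, long windowEndMs) {
        Instant start = Instant.ofEpochMilli(windowStartMs);
        Instant end = Instant.ofEpochMilli(windowEndMs);
        Duration length = Duration.between(start, end);
        return "start=" + start + " end=" + end + " length=" + length;
    }

    public static void main(String[] args) {
        // Hypothetical bounds for a one-minute window
        long start = Instant.parse("2024-03-15T16:40:00Z").toEpochMilli();
        long end = start + Duration.ofMinutes(1).toMillis();
        System.out.println(describe(start, end));
    }
}
```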

&lt;p&gt;Kafka Streams also lets you observe the aggregation results directly from its state store via &lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html#kstreams-interactive-queries-for-cp"&gt;Interactive Queries&lt;/a&gt;. I won’t go into those details here, but you can view a presentation on building an &lt;a href="https://www.confluent.io/events/current-2022/building-an-interactive-query-service-in-kafka-streams/"&gt;Interactive Query service from Current 2022&lt;/a&gt; and take a look at &lt;a href="https://github.com/bbejeck/KafkaStreamsInteractiveQueries"&gt;the accompanying source code&lt;/a&gt;.&lt;/p&gt;

&lt;h1 id="resouces"&gt;Resources&lt;/h1&gt;

&lt;ul&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.manning.com/books/kafka-streams-in-action-second-edition"&gt;Kafka Streams in Action 2nd Edition&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.confluent.io/product/flink/"&gt;Apache Flink® on Confluent Cloud&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-tvf/#windowing-table-valued-functions-windowing-tvfs"&gt;Flink SQL Windows&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#windowing"&gt;Kafka Streams windowing documentation&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
&lt;/ul&gt;
</description>
        <pubDate>Fri, 15 Mar 2024 16:40:00 +0000</pubDate>
        <link>https://codingjunkie.net//mastering-stream-processing-viewing-results/</link>
        <link href="https://codingjunkie.net/mastering-stream-processing-viewing-results/"/>
        <guid isPermaLink="true">https://codingjunkie.net/mastering-stream-processing-viewing-results/</guid>
      </item>
    
      <item>
        <title>Mastering Stream Processing - Time semantics</title>
        <description>&lt;p&gt;In the previous post in this series, we wrapped up our coverage of the different window types. Here are the earlier installments in this series:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.codingjunkie.net/introduction-to-windowing/"&gt;Introduction to windowing&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-hopping-tumbling-windows/"&gt;Hopping and Tumbling windows&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-sliding-windows/"&gt;Sliding windows and OVER aggregation&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.codingjunkie.net/mastering-stream-processing-session-cumulating-windows/"&gt;Session windows and Cumulating windows&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;We’ve now covered the different window types, how they function, and their potential best use cases, but we’ve left some crucial questions unanswered. In this post, we’ll move on from specific code examples and discuss the time semantics of window advancement and the forwarding of results, addressing the following questions:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;Which time semantics apply, and which timestamp is used?&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;What determines when a window starts and ends?&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;How are timestamps extracted, and how does time advance?&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;How do you handle out-of-order records?&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;h1 id="determining-the-timestamp-to-use"&gt;Determining the timestamp to use&lt;/h1&gt;

&lt;p&gt;Kafka Streams and Flink SQL use event timestamps, so their windowing is based on event time rather than system time (both systems can be configured to use system time, but I won’t discuss that here). An event timestamp is the time the event occurred, and for this post we’ll assume it’s carried as part of the record. System time is the current wall-clock time of the stream processing engine and provides processing-time semantics. We’ll focus on event-time semantics; later in this post, we’ll discuss how the stream processing systems extract the event time, along with some related details. For now, it’s enough to say that event timestamps drive windowing operations.&lt;/p&gt;

&lt;h1 id="window-start-and-end-time"&gt;Window start and end time&lt;/h1&gt;

&lt;p&gt;Record event timestamps are at the heart of windowing, but how they’re involved depends on the window type. Hopping, tumbling, and cumulating windows are aligned to the epoch, which starts at 00:00:00 UTC on January 1, 1970. What does aligned to the epoch mean exactly? Let’s answer that question with the help of an illustration:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/aligned_to_epoch_definition.png" alt="aligned to epoch definition" /&gt;
&lt;figcaption&gt;Windows aligned to the epoch collect records that fit into the correct slot&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;As this illustration shows, a five-minute tumbling window starts at 00:00:00 on January 1, 1970. Then, every five minutes, a new window is (logically) created, all the way up to the present moment. So when a new record arrives, its event timestamp doesn’t set the window start time; instead, it determines &lt;em&gt;which&lt;/em&gt; window the record belongs to. Time advances in these windows as the event timestamps of the incoming records increase. The following graphic helps demonstrate this process:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/timestamp_advancement.png" alt="timestamp advancement" /&gt;
&lt;figcaption&gt;Time for a window advances as event timestamps increase&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;So, once a record arrives with a timestamp at or beyond the current window’s end, a new window is started, shifted forward by the advance interval for a hopping window or by the window size for a tumbling window. This description of time advancement doesn’t account for out-of-order records; we’ll get to those later.&lt;/p&gt;
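
&lt;p&gt;To make epoch alignment more concrete, here’s a minimal sketch in plain Java. It isn’t Kafka Streams or Flink code: &lt;code class="language-plaintext highlighter-rouge"&gt;EpochAlignedWindows&lt;/code&gt; and its methods are hypothetical, and the sketch assumes half-open windows of the form [start, start + size) whose starts are multiples of the window size (tumbling) or of the advance interval (hopping), counted from the epoch.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Epoch-aligned window assignment (hypothetical sketch)&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not Kafka Streams or Flink code) showing how
// epoch-aligned windows are chosen for a record. Windows are assumed to be
// half-open: [start, start + size).
final class EpochAlignedWindows {

    // Start of the one tumbling window that contains the timestamp
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result aligned even for pre-1970 (negative) timestamps
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Starts of every hopping window that contains the timestamp, oldest first.
    // Window starts are multiples of advanceMs, and a window contains the
    // record when start &amp;lt;= timestamp &amp;lt; start + size.
    static long[] hoppingWindowStarts(long timestampMs, long sizeMs, long advanceMs) {
        long latestStart = timestampMs - Math.floorMod(timestampMs, advanceMs);
        int count = 0;
        for (long s = latestStart; s &amp;gt; timestampMs - sizeMs; s -= advanceMs) {
            count++;
        }
        long[] starts = new long[count];
        int i = count - 1;
        for (long s = latestStart; s &amp;gt; timestampMs - sizeMs; s -= advanceMs) {
            starts[i--] = s;
        }
        return starts;
    }

    public static void main(String[] args) {
        long fiveMinutes = Duration.ofMinutes(5).toMillis();
        long oneMinute = Duration.ofMinutes(1).toMillis();
        long ts = Instant.parse("2024-02-29T16:42:10Z").toEpochMilli();

        // Tumbling: the record falls into the 16:40:00 to 16:45:00 window
        System.out.println(Instant.ofEpochMilli(tumblingWindowStart(ts, fiveMinutes)));

        // Hopping (size 5 min, advance 1 min): five overlapping windows,
        // starting at 16:38, 16:39, 16:40, 16:41, and 16:42
        for (long start : hoppingWindowStarts(ts, fiveMinutes, oneMinute)) {
            System.out.println(Instant.ofEpochMilli(start));
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Because window starts are simply multiples of the size or advance interval counted from the epoch, every record with a timestamp from 16:40:00 up to (but not including) 16:45:00 lands in the same five-minute tumbling window, no matter when the application started.&lt;/p&gt;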

&lt;p&gt;Session windows and sliding windows (the Kafka Streams version of sliding windows) are more behavior-driven and have different semantics. Session windows use event timestamps to start and close windows. When the first record for a session arrives, its timestamp becomes the start of the window. Once a record arrives whose timestamp is further from the session than the inactivity gap (accounting for any grace period), a new session starts. The following picture will help in understanding this process:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/session_window_example.png" alt="session window example" /&gt;
&lt;figcaption&gt;Session windows use event timestamps for opening and closing times&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;From this illustration, the session start time is the timestamp of the first record, and the ending time is the timestamp of the last record included in the session. To be more precise, with session windows in Kafka Streams, each new record initially creates a new session window of its own. Kafka Streams then looks to merge that new session with existing ones: if the new record’s timestamp falls within the inactivity gap of an existing session (for an in-order record, no later than that session’s ending timestamp plus the inactivity gap), Kafka Streams merges the new session into the existing one. This merging is how sessions keep growing as events arrive inside the inactivity gap. It also accounts for out-of-order records, which can connect two older sessions into a single, larger one.&lt;/p&gt;
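
&lt;p&gt;The merge rule can be sketched as a simplified, single-key model in plain Java. This is an illustration rather than the Kafka Streams implementation: &lt;code class="language-plaintext highlighter-rouge"&gt;SessionMerger&lt;/code&gt; is a hypothetical class, it assumes a record joins every existing session that ends no earlier than its timestamp minus the inactivity gap and starts no later than its timestamp plus the gap, and it uses a Java record, so it needs Java 16 or later.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Session merging (hypothetical sketch)&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;import java.util.ArrayList;
import java.util.List;

// Simplified, single-key model of session merging, written for illustration.
// It is not the Kafka Streams implementation; the merge rule is an assumption
// based on the description above.
final class SessionMerger {

    // A session spans [start, end], both inclusive, and counts its records
    record Session(long start, long end, int count) {}

    private final long inactivityGapMs;
    private final List&amp;lt;Session&amp;gt; sessions = new ArrayList&amp;lt;&amp;gt;();

    SessionMerger(long inactivityGapMs) {
        this.inactivityGapMs = inactivityGapMs;
    }

    // The record first becomes its own session [ts, ts]. Every existing session
    // ending no earlier than ts - gap and starting no later than ts + gap is
    // merged into it, so an out-of-order record can bridge two older sessions.
    void add(long ts) {
        long start = ts;
        long end = ts;
        int count = 1;
        List&amp;lt;Session&amp;gt; remaining = new ArrayList&amp;lt;&amp;gt;();
        for (Session s : sessions) {
            boolean withinGap = s.end() &amp;gt;= ts - inactivityGapMs
                    &amp;amp;&amp;amp; s.start() &amp;lt;= ts + inactivityGapMs;
            if (withinGap) {
                start = Math.min(start, s.start());
                end = Math.max(end, s.end());
                count += s.count();
            } else {
                remaining.add(s);
            }
        }
        remaining.add(new Session(start, end, count));
        remaining.sort((a, b) -&amp;gt; Long.compare(a.start(), b.start()));
        sessions.clear();
        sessions.addAll(remaining);
    }

    List&amp;lt;Session&amp;gt; sessions() {
        return List.copyOf(sessions);
    }

    public static void main(String[] args) {
        SessionMerger merger = new SessionMerger(5);
        merger.add(0);
        merger.add(3);   // within the gap of [0, 0]: merged into [0, 3]
        merger.add(20);  // too far from [0, 3]: starts a new session
        merger.add(12);  // too far from both: a third session
        merger.add(8);   // out of order: bridges [0, 3] and [12, 12] into [0, 12]
        System.out.println(merger.sessions());
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;The last call shows the out-of-order case: the record at 8 is within five units of both [0, 3] and [12, 12], so all three collapse into a single [0, 12] session, while [20, 20] stays separate.&lt;/p&gt;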

&lt;p&gt;Sliding windows in Kafka Streams have a fixed size, specified as the maximum time difference between records in the same window. Like session windows, however, they use record event timestamps, rather than epoch alignment, to determine the window start and end.&lt;/p&gt;

&lt;p&gt;Now that we’ve discussed how windows open and close, let’s move on to time advancement.&lt;/p&gt;

&lt;h1 id="time-advancement"&gt;Time Advancement&lt;/h1&gt;

&lt;p&gt;For a window’s time to advance, there needs to be some mechanism to extract an event timestamp from a record and apply it so that time will move forward. Kafka Streams and Flink SQL handle this differently, but the results are the same.&lt;/p&gt;

&lt;h2 id="kafka-streams-time-advancement"&gt;Kafka Streams time advancement&lt;/h2&gt;

&lt;p&gt;Kafka Streams uses a &lt;a href="https://kafka.apache.org/36/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html"&gt;TimestampExtractor&lt;/a&gt; to get the event timestamp. By default, it uses the timestamp on the consumed record, which is typically set by the producer. If you prefer to use a timestamp embedded in the record payload, you can write a custom &lt;code class="language-plaintext highlighter-rouge"&gt;TimestampExtractor&lt;/code&gt; that “knows” which field to use for the timestamp. Kafka Streams keeps track of the highest observed timestamp on a per-partition basis. This current highest timestamp is known as “stream time”, and it only moves forward: when an out-of-order record arrives, stream time remains unchanged. We’ll discuss out-of-order records later on. Let’s look at an illustration of the concept of stream time:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/streamtime-in-action.png" alt="streamtime in action" /&gt;
&lt;figcaption&gt;Kafka Streams keeps track of timestamps known as stream time&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;So, as Kafka Streams consumes records, it checks the timestamp of the current record, and if that timestamp exceeds the current stream time, Kafka Streams advances stream time to it. Kafka Streams shares the timestamp of the current record via a record context that accompanies each record as it flows through the topology. Each window operator keeps track of stream time itself, and when stream time advances due to the event timestamps, Kafka Streams closes existing windows that time has advanced beyond and creates new windows as needed. For session windows, if the new record’s timestamp is further than the inactivity gap from an existing session, Kafka Streams won’t merge the new session into the existing one; instead, the record starts a new session.&lt;/p&gt;
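
&lt;p&gt;Here’s a hypothetical model of that bookkeeping in plain Java. &lt;code class="language-plaintext highlighter-rouge"&gt;StreamTimeTracker&lt;/code&gt; isn’t part of Kafka Streams; it assumes one stream time value per input partition and uses -1 as a made-up sentinel for a partition that hasn’t seen any records yet.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Tracking stream time per partition (hypothetical sketch)&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;import java.util.HashMap;
import java.util.Map;

// Hypothetical model of stream time, for illustration only: one value per
// partition, equal to the highest event timestamp seen so far. It never
// moves backward, so out-of-order records leave it unchanged.
final class StreamTimeTracker {

    // Sentinel for "no records seen yet on this partition"
    static final long UNKNOWN = -1L;

    private final Map&amp;lt;Integer, Long&amp;gt; streamTimeByPartition = new HashMap&amp;lt;&amp;gt;();

    // Records an event timestamp and returns the partition's stream time afterward
    long observe(int partition, long eventTimestampMs) {
        return streamTimeByPartition.merge(partition, eventTimestampMs, Long::max);
    }

    long streamTime(int partition) {
        return streamTimeByPartition.getOrDefault(partition, UNKNOWN);
    }

    // A record is out of order if its timestamp is behind the current stream time
    boolean isOutOfOrder(int partition, long eventTimestampMs) {
        return eventTimestampMs &amp;lt; streamTime(partition);
    }

    public static void main(String[] args) {
        StreamTimeTracker tracker = new StreamTimeTracker();
        tracker.observe(0, 1_000L);
        tracker.observe(0, 5_000L);
        // A record at 3_000 arrives late: it's out of order, and stream time stays at 5_000
        System.out.println(tracker.isOutOfOrder(0, 3_000L)); // true
        System.out.println(tracker.observe(0, 3_000L));      // 5000
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;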

&lt;p&gt;For all stateful operations in Kafka Streams, including windowed ones, results are buffered and forwarded incrementally, either on commit or when the local cache is full. If you only want to receive a final result, you can set the &lt;a href="https://kafka.apache.org/36/javadoc/org/apache/kafka/streams/kstream/EmitStrategy.html"&gt;EmitStrategy&lt;/a&gt; on the window to &lt;code class="language-plaintext highlighter-rouge"&gt;ON_WINDOW_CLOSE&lt;/code&gt;. Here’s the tumbling window example configured to emit only a final result:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Tumbling window with only final results&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="nc"&gt;KStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
      &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"heat-sensor-input"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
        &lt;span class="nc"&gt;Consumed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;doubleSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;groupByKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
          &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;windowedBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimeWindows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofSizeWithNoGrace&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMinutes&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)))&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;emitStrategy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;EmitStrategy&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onWindowClose&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;  &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tempThreshold&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
             &lt;span class="n"&gt;aggregator&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
             &lt;span class="nc"&gt;Materialized&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
             &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toStream&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sensor-agg-output"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="nc"&gt;Produced&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowedSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;Only emit results after the window closes&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Using &lt;code class="language-plaintext highlighter-rouge"&gt;EmitStrategy.onWindowClose()&lt;/code&gt; is especially efficient with sliding windows, which produce many intermediate updates because they conceptually advance one millisecond at a time.&lt;/p&gt;

&lt;p&gt;Now, let’s look at time advancement in Flink SQL.&lt;/p&gt;

&lt;h2 id="flink-sql-time-advancement"&gt;Flink SQL time advancement&lt;/h2&gt;

&lt;p&gt;Logically, time advancement in Flink works the same way: Flink extracts the event timestamp from the incoming records, and to do this extraction, you provide a &lt;code class="language-plaintext highlighter-rouge"&gt;TimestampAssigner&lt;/code&gt;. But instead of stream time, Flink uses watermarks to track event-time progress. A watermark is an assertion that the stream is now complete up through the timestamp the watermark carries. Here’s an illustration showing the watermark process:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/flink_sql_watermarks.png" alt="flink sql watermarks" /&gt;
&lt;figcaption&gt;Flink SQL uses watermarks to indicate to downstream operators what the current event time is&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;As you can see in this illustration, Flink operators use the watermark timestamp to advance windows, closing existing windows and opening new ones. Flink only emits windowed results after a window closes, that is, once a watermark advances past the end of the window.&lt;/p&gt;

&lt;p&gt;In the &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/"&gt;Flink DataStream API&lt;/a&gt;, you set the &lt;code class="language-plaintext highlighter-rouge"&gt;TimestampAssigner&lt;/code&gt; and &lt;code class="language-plaintext highlighter-rouge"&gt;WatermarkGenerator&lt;/code&gt; together with a &lt;code class="language-plaintext highlighter-rouge"&gt;WatermarkStrategy&lt;/code&gt;. But since we’re focusing on Flink SQL, you’ll instead specify a watermark strategy as a clause in the &lt;code class="language-plaintext highlighter-rouge"&gt;CREATE TABLE&lt;/code&gt; statement.&lt;/p&gt;

&lt;p&gt;Consider this table definition tracking movie ratings entered by a user on a review site:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Flink SQL table definition with watermark strategy&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;ratings&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;rating_id&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;title&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;release_year&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;rating&lt;/span&gt; &lt;span class="nb"&gt;DOUBLE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;rating_time&lt;/span&gt; &lt;span class="nb"&gt;TIMESTAMP&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; 
        &lt;span class="n"&gt;WATERMARK&lt;/span&gt; &lt;span class="k"&gt;FOR&lt;/span&gt; &lt;span class="n"&gt;rating_time&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;rating_time&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;Timestamp of the movie rating event&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Specifying the &lt;code class="language-plaintext highlighter-rouge"&gt;rating_time&lt;/code&gt; column as the source of the watermark timestamp&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;In Flink SQL, a watermark strategy takes the form &lt;code class="language-plaintext highlighter-rouge"&gt;WATERMARK FOR&lt;/code&gt; &amp;lt;column with timestamp&amp;gt; &lt;code class="language-plaintext highlighter-rouge"&gt;AS&lt;/code&gt; &amp;lt;watermark strategy expression&amp;gt;. The strategy shown here is &lt;code class="language-plaintext highlighter-rouge"&gt;Strictly ascending timestamps&lt;/code&gt;, which forwards the maximum observed timestamp. Flink SQL evaluates the watermark strategy expression for each incoming record and periodically emits a watermark at the interval set by &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#pipeline-auto-watermark-interval"&gt;pipeline.auto-watermark-interval&lt;/a&gt;, which defaults to 200ms. Flink only emits a watermark if it is larger than the previous one; it doesn’t forward one that is smaller (or null). It’s also worth noting that the timestamp column needs to be of type &lt;code class="language-plaintext highlighter-rouge"&gt;TIMESTAMP(3)&lt;/code&gt; or &lt;code class="language-plaintext highlighter-rouge"&gt;TIMESTAMP_LTZ(3)&lt;/code&gt;, where the 3 represents the precision of fractional seconds, in this case, millisecond precision.&lt;/p&gt;
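
&lt;p&gt;The emission rule for the strictly ascending strategy can be sketched as follows. &lt;code class="language-plaintext highlighter-rouge"&gt;AscendingWatermarks&lt;/code&gt; is a hypothetical class, not one of Flink’s internal classes, and it assumes the periodic tick is driven externally at the configured interval.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Periodic emission of ascending watermarks (hypothetical sketch)&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;import java.util.OptionalLong;

// Hypothetical model (not Flink's internal classes) of the strictly ascending
// strategy, WATERMARK FOR rating_time AS rating_time: each record updates the
// maximum observed timestamp, and a periodic tick forwards it as a watermark
// only when it is larger than the last watermark emitted.
final class AscendingWatermarks {

    private long maxObservedTimestamp = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    // Evaluated for every incoming record
    void onRecord(long rowtimeMs) {
        maxObservedTimestamp = Math.max(maxObservedTimestamp, rowtimeMs);
    }

    // Called once per pipeline.auto-watermark-interval (200 ms by default)
    OptionalLong onPeriodicEmit() {
        if (maxObservedTimestamp &amp;gt; lastEmittedWatermark) {
            lastEmittedWatermark = maxObservedTimestamp;
            return OptionalLong.of(lastEmittedWatermark);
        }
        // Not larger than the previous watermark, so nothing is forwarded
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        AscendingWatermarks watermarks = new AscendingWatermarks();
        watermarks.onRecord(1_000L);
        watermarks.onRecord(3_000L);
        watermarks.onRecord(2_000L); // out of order, doesn't lower the maximum
        System.out.println(watermarks.onPeriodicEmit()); // a watermark at 3_000
        watermarks.onRecord(2_500L);
        System.out.println(watermarks.onPeriodicEmit()); // empty: no progress since the last tick
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;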

&lt;p&gt;Stream time and watermarks have another thing in common: advancing time affects all keys in a partition (Kafka Streams) or task slot (Flink SQL). Let’s take a look at an illustration to help understand what this means:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/advancing_time_all_keys.png" alt="advancing time all keys" /&gt;
&lt;figcaption&gt;Advancing time affects all keys per partition or task slot&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;As you can see, we have three keys, A, B, and C, with roughly the same start time for their respective windows. When an incoming record for key C advances time, it closes C’s window and also closes the windows for A and B, even though they haven’t received any new records. This is an example of time advancement influencing all keys in a partition or task slot.&lt;/p&gt;
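
&lt;p&gt;The effect in this illustration can be reproduced with a small, hypothetical Java model (&lt;code class="language-plaintext highlighter-rouge"&gt;SharedTimeWindows&lt;/code&gt; is invented for illustration and isn’t Kafka Streams or Flink code). It assumes a single partition, one-minute tumbling windows, no grace period, and that a window closes as soon as time reaches its end.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Shared time closing windows for all keys (hypothetical sketch)&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-partition model, for illustration only: one shared
// time value closes tumbling windows for every key, not just for the key
// of the record that advanced time. No grace period is applied.
final class SharedTimeWindows {

    private final long sizeMs;
    private long streamTime = Long.MIN_VALUE;
    // key -&amp;gt; start of that key's currently open window (aggregates omitted)
    private final Map&amp;lt;String, Long&amp;gt; openWindowStart = new TreeMap&amp;lt;&amp;gt;();

    SharedTimeWindows(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Processes one record and returns the keys whose windows it closed
    List&amp;lt;String&amp;gt; process(String key, long ts) {
        streamTime = Math.max(streamTime, ts);
        List&amp;lt;String&amp;gt; closed = new ArrayList&amp;lt;&amp;gt;();
        // Close every open window whose end time has been reached,
        // regardless of which key the current record belongs to
        openWindowStart.entrySet().removeIf(e -&amp;gt; {
            boolean expired = e.getValue() + sizeMs &amp;lt;= streamTime;
            if (expired) {
                closed.add(e.getKey());
            }
            return expired;
        });
        long start = ts - Math.floorMod(ts, sizeMs);
        // Only open a window for this record if that window hasn't already closed
        if (start + sizeMs &amp;gt; streamTime) {
            openWindowStart.putIfAbsent(key, start);
        }
        return closed;
    }

    public static void main(String[] args) {
        SharedTimeWindows windows = new SharedTimeWindows(60_000L);
        windows.process("A", 1_000L);
        windows.process("B", 2_000L);
        windows.process("C", 3_000L);
        // A record for C in the next minute closes the windows for A, B, and C
        System.out.println(windows.process("C", 61_000L));
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Only key C received a new record, yet the last call reports A, B, and C as closed, because all three keys share the same notion of time.&lt;/p&gt;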

&lt;p&gt;Under ideal circumstances (an evenly distributed key space and a consistent flow of records), this shared time advancement is fine because all the keys drive time forward equally. However, an uneven distribution of keys combined with out-of-order records can cause problems: busier keys advance time and close the window before valid records for other keys arrive, so those records are left out. Configuring a grace period is one way to prevent this; even though those records arrive out of order, the grace period keeps the window open long enough to include them. We’ll cover grace periods and out-of-order data in the next section.&lt;/p&gt;

&lt;h1 id="out-of-order-data"&gt;Out of order data&lt;/h1&gt;

&lt;p&gt;So far, with our discussion of windowing, I’ve assumed the happy path of records arriving in order, that is, strictly ascending timestamps. But in practice, a record may arrive out-of-order. Let’s look at an illustration to help explain what an out-of-order record is:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/out-order-record.png" alt="out order record" /&gt;
&lt;figcaption&gt;An out-of-order record would have been included in a window if it arrived in order&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;This illustration shows that an out-of-order record would have been included in a now-closed window had it arrived in order. Since we want our windowed results to present as complete a picture as possible, making allowances for out-of-order data makes sense. You can do this with a grace period, during which a window still accepts records it would otherwise reject. In Kafka Streams, you explicitly add a grace period to a window definition:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Adding grace to Kafka Streams window operator&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;
    &lt;span class="nc"&gt;KStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
      &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"heat-sensor-input"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
        &lt;span class="nc"&gt;Consumed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;doubleSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;groupByKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
          &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;windowedBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimeWindows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofSizeAndGrace&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMinutes&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofSeconds&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="o"&gt;)))&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;emitStrategy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;EmitStrategy&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onWindowClose&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tempThreshold&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
             &lt;span class="n"&gt;aggregator&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
             &lt;span class="nc"&gt;Materialized&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
             &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toStream&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sensor-agg-output"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="nc"&gt;Produced&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowedSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;Defining a tumbling window of one minute with thirty seconds grace.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The grace period works like this: when a window operator evaluates whether it should include the current record, it subtracts the grace period from the current stream time. If the end of the record’s window is still later than this grace-adjusted time, the window is still open and the record is included. Here’s a quick depiction of a grace period in action:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/grace-in-action.png" alt="grace in action" /&gt;
&lt;figcaption&gt;Grace period in action&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;So, by defining a grace period, you can include records that arrive out of order. Any records arriving after the grace period expiration are considered late and are discarded.&lt;/p&gt;
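
&lt;p&gt;The grace rule can be expressed as a short check, shown here for the one-minute tumbling window with thirty seconds of grace from the example above. &lt;code class="language-plaintext highlighter-rouge"&gt;GraceCheck&lt;/code&gt; is a hypothetical helper rather than a Kafka Streams class, and it assumes epoch-aligned windows whose end is exclusive.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Applying a grace period (hypothetical sketch)&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;import java.time.Duration;

// Hypothetical helper, not a Kafka Streams class: applies the grace rule
// described above to an epoch-aligned, end-exclusive tumbling window.
final class GraceCheck {

    static boolean accepts(long recordTs, long streamTime, Duration size, Duration grace) {
        long sizeMs = size.toMillis();
        // End of the tumbling window the record belongs to
        long windowEnd = recordTs - Math.floorMod(recordTs, sizeMs) + sizeMs;
        // Subtract the grace period from stream time...
        long graceAdjustedTime = streamTime - grace.toMillis();
        // ...and keep the record only if its window ends after that point
        return windowEnd &amp;gt; graceAdjustedTime;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(1);
        Duration grace = Duration.ofSeconds(30);
        long recordTs = 50_000L; // 00:00:50, in the [00:00, 00:01) window
        System.out.println(accepts(recordTs, 75_000L, size, grace)); // true, stream time 00:01:15
        System.out.println(accepts(recordTs, 95_000L, size, grace)); // false, stream time 00:01:35
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;With stream time at 00:01:15, the grace-adjusted time is 00:00:45, which is still before the window’s 00:01:00 end, so the record is included; at a stream time of 00:01:35, the same record would be dropped as late.&lt;/p&gt;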

&lt;p&gt;Flink SQL also makes provisions for out-of-order records, and they work in a similar manner. You adjust the watermark strategy expression to allow for out-of-order records:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Flink SQL table definition with watermark strategy expression with a grace period&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;ratings&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;rating_id&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;title&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;release_year&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;rating&lt;/span&gt; &lt;span class="nb"&gt;DOUBLE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;rating_time&lt;/span&gt; &lt;span class="nb"&gt;TIMESTAMP&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;WATERMARK&lt;/span&gt; &lt;span class="k"&gt;FOR&lt;/span&gt; &lt;span class="n"&gt;rating_time&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;rating_time&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'30'&lt;/span&gt; &lt;span class="k"&gt;SECOND&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;This watermark strategy allows records in the ratings table to be as much as 30 seconds out-of-order.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Updating the watermark strategy this way makes it a “Bounded out of orderness timestamps” strategy: the watermark trails the maximum observed timestamp by 30 seconds. So when a record arrives out of order, it’s still included in the windowed calculation as long as the watermark hasn’t yet passed the end of the window the record belongs to.&lt;/p&gt;
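
&lt;p&gt;Here’s a hypothetical Java model of that bounded strategy. &lt;code class="language-plaintext highlighter-rouge"&gt;BoundedOutOfOrderness&lt;/code&gt; is invented for illustration and isn’t a Flink class, and it simplifies real behavior: it recomputes the watermark on every record instead of on the periodic interval, assumes epoch-aligned tumbling windows, and treats a window as closed once the watermark reaches its end.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Bounded out-of-orderness watermarks (hypothetical sketch)&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;// Hypothetical model of WATERMARK FOR rating_time AS rating_time - INTERVAL '30' SECOND,
// not Flink code. Simplifications: the watermark is recomputed on every record
// instead of on the periodic interval, windows are epoch-aligned tumbling
// windows, and a window counts as closed once the watermark reaches its end.
final class BoundedOutOfOrderness {

    private final long maxDelayMs;
    private final long windowSizeMs;
    private long maxObserved = Long.MIN_VALUE;

    BoundedOutOfOrderness(long maxDelayMs, long windowSizeMs) {
        this.maxDelayMs = maxDelayMs;
        this.windowSizeMs = windowSizeMs;
    }

    // The watermark trails the highest timestamp seen so far by the delay
    long currentWatermark() {
        return maxObserved == Long.MIN_VALUE ? Long.MIN_VALUE : maxObserved - maxDelayMs;
    }

    // Returns true if the record is included in its window, false if it's late
    boolean offer(long ts) {
        long windowEnd = ts - Math.floorMod(ts, windowSizeMs) + windowSizeMs;
        boolean included = currentWatermark() &amp;lt; windowEnd;
        maxObserved = Math.max(maxObserved, ts);
        return included;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness ratings = new BoundedOutOfOrderness(30_000L, 60_000L);
        ratings.offer(10_000L);
        ratings.offer(70_000L);                      // watermark is now 40_000
        System.out.println(ratings.offer(50_000L));  // true: [0, 60_000) is still open
        ratings.offer(95_000L);                      // watermark is now 65_000
        System.out.println(ratings.offer(55_000L));  // false: [0, 60_000) has closed
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;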

&lt;h1 id="time-and-low-traffic-partitions"&gt;Time and low-traffic partitions&lt;/h1&gt;

&lt;p&gt;There’s one more aspect of time semantics that applies equally to Kafka Streams (stream time) and Flink SQL (watermarks), and I’d like to cover it before wrapping up: behavior with low-traffic partitions. As I mentioned earlier, with both Kafka Streams and Flink SQL, the event time of the incoming records drives the progress of time.&lt;/p&gt;

&lt;p&gt;But what happens when you have low or infrequent traffic? Without new events to push time forward, you won’t see windowed results regularly. Flink SQL has the concept of “idleness”, which allows time to advance when a task slot isn’t receiving new events regularly. The &lt;a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/config/#table-exec-source-idle-timeout"&gt;table.exec.source.idle-timeout&lt;/a&gt; configuration (or &lt;a href="https://docs.confluent.io/cloud/current/flink/reference/statements/set.html#available-set-options"&gt;sql.tables.scan.idle-timeout&lt;/a&gt; on &lt;a href="https://www.confluent.io/product/flink/"&gt;Confluent Cloud&lt;/a&gt;) lets you specify an upper bound on how long to wait for new records before considering a task slot idle. Setting this configuration (the default is 0, which turns off idleness detection) allows downstream operators to advance watermarks, providing windowed results without waiting for new records from the event source.&lt;/p&gt;

&lt;p&gt;You can offer similar functionality in Kafka Streams with a bit of manual work using the &lt;a href="https://javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.api.ProcessorSupplier-java.lang.String...-"&gt;KStream.process&lt;/a&gt; method. This method lets you mix in the &lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process"&gt;Processor API&lt;/a&gt;, opening the door to scheduling a &lt;a href="https://kafka.apache.org/36/javadoc/org/apache/kafka/streams/processor/ProcessorContext.html#schedule(java.time.Duration,org.apache.kafka.streams.processor.PunctuationType,org.apache.kafka.streams.processor.Punctuator)"&gt;punctuation&lt;/a&gt;, which would allow you to retrieve and forward windowed results from an idle partition.&lt;/p&gt;
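
&lt;p&gt;To see why idleness matters for Flink SQL, consider this simplified, hypothetical model in plain Java (&lt;code class="language-plaintext highlighter-rouge"&gt;IdleAwareWatermark&lt;/code&gt; isn’t a Flink class, and wall-clock time is passed in explicitly to keep it deterministic). It follows the rule that an operator’s watermark is the minimum of its inputs’ watermarks, and it leaves out any input that has been silent for at least the idle timeout, with a timeout of 0 disabling idleness detection.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Idle inputs and the combined watermark (hypothetical sketch)&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;import java.util.HashMap;
import java.util.Map;

// Hypothetical model (not Flink code) of how idleness lets event time advance.
// An operator's watermark is the minimum of its inputs' watermarks, but inputs
// silent for at least the idle timeout are left out. A timeout of 0 disables
// idleness detection.
final class IdleAwareWatermark {

    private final long idleTimeoutMs;
    private final Map&amp;lt;Integer, Long&amp;gt; watermarkByInput = new HashMap&amp;lt;&amp;gt;();
    private final Map&amp;lt;Integer, Long&amp;gt; lastActivityByInput = new HashMap&amp;lt;&amp;gt;();
    private long combined = Long.MIN_VALUE;

    IdleAwareWatermark(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // An input reports a watermark at the given wall-clock time
    void onWatermark(int input, long watermark, long wallClockMs) {
        watermarkByInput.merge(input, watermark, Long::max);
        lastActivityByInput.put(input, wallClockMs);
    }

    // Minimum watermark across active inputs; never moves backward
    long combinedWatermark(long wallClockMs) {
        long min = Long.MAX_VALUE;
        for (Map.Entry&amp;lt;Integer, Long&amp;gt; e : watermarkByInput.entrySet()) {
            long silentForMs = wallClockMs - lastActivityByInput.get(e.getKey());
            boolean idle = idleTimeoutMs &amp;gt; 0 &amp;amp;&amp;amp; silentForMs &amp;gt;= idleTimeoutMs;
            if (!idle) {
                min = Math.min(min, e.getValue());
            }
        }
        if (min != Long.MAX_VALUE) {
            combined = Math.max(combined, min);
        }
        return combined;
    }

    public static void main(String[] args) {
        IdleAwareWatermark operator = new IdleAwareWatermark(5_000L);
        operator.onWatermark(0, 10_000L, 0L);
        operator.onWatermark(1, 2_000L, 0L);
        // Input 0 keeps receiving data; input 1 goes quiet
        operator.onWatermark(0, 20_000L, 6_000L);
        // After 5 seconds of silence, input 1 is idle and no longer holds time back
        System.out.println(operator.combinedWatermark(6_000L)); // 20000
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Without the idle timeout, the quiet input would pin the operator’s watermark at 2000 and no windows past that point would close.&lt;/p&gt;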

&lt;h1 id="resources"&gt;Resources&lt;/h1&gt;

&lt;ul&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.manning.com/books/kafka-streams-in-action-second-edition"&gt;Kafka Streams in Action 2nd Edition&lt;/a&gt; book.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.confluent.io/product/flink/"&gt;Apache Flink® on Confluent Cloud&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-tvf/#windowing-table-valued-functions-windowing-tvfs"&gt;Flink SQL Windows&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#windowing"&gt;Kafka Streams windowing documentation&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
&lt;/ul&gt;
</description>
        <pubDate>Thu, 29 Feb 2024 16:40:00 +0000</pubDate>
        <link>https://codingjunkie.net//mastering-stream-processing-time-semantics/</link>
        <link href="https://codingjunkie.net/mastering-stream-processing-time-semantics/"/>
        <guid isPermaLink="true">https://codingjunkie.net/mastering-stream-processing-time-semantics/</guid>
      </item>
    
      <item>
        <title>Mastering Stream Processing - Session and Cumulating windows</title>
        <description>&lt;p&gt;In the third installment of this windowing blog series, you’ll learn about cumulating and session windows. In previous posts, we’ve covered &lt;a href="https://www.codingjunkie.net/mastering-stream-processing-hopping-tumbling-windows/"&gt;hopping and tumbling windows&lt;/a&gt; and &lt;a href="https://www.codingjunkie.net/mastering-stream-processing-sliding-windows/"&gt;sliding windows and the Flink SQL equivalent - OVER aggregations&lt;/a&gt;. The cumulate window is unique to Flink SQL. The session window has been available in Kafka Streams since version 0.10.2 and is going to be available in the newest version (1.19) of Flink SQL as part of its stable windowing table-valued functions (TVFs).&lt;/p&gt;

&lt;p&gt;Before jumping in, you may be asking yourself what cumulating means and how it relates to accumulating. The difference is that accumulate implies a more intentional gathering, while cumulate means to gather together what you already have.&lt;/p&gt;

&lt;p&gt;Now, let’s get into cumulating windows.&lt;/p&gt;

&lt;h2 id="cumulating-windows"&gt;Cumulating windows&lt;/h2&gt;

&lt;p&gt;The cumulate window is also one of Flink SQL’s windowing TVFs. It has a fixed size and a step that advances it. Each step includes the data from the window start through the end of that step, so it covers everything in the previous steps too. Once the steps reach the window size, the data resets to only what is available at the beginning of the new window. This concept is probably best understood with an illustration:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/cumulate_windows.png" alt="cumulate windows" /&gt;
&lt;figcaption&gt;Cumulating windows have a fixed size with advances smaller than the length of the window&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;From this picture, you can see that each advance of the window includes all records from the window start up to the end of that step, so the results accumulate until the window reaches its maximum size. Then the next window begins, and accumulation restarts from its start. Another way to think about the cumulate window is as a tumbling window that gives you updated results at regular intervals.&lt;/p&gt;

&lt;p&gt;This explanation could still leave some doubt about what the cumulating window does, so let’s look at one more illustration with values:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/cumulate_with_data.png" alt="cumulate with data" /&gt;
&lt;figcaption&gt;A Cumulate window with a sum function&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;Each event in the above illustration represents a purchase transaction, and for simplicity, let’s say each transaction is $5. There is a transaction at the window start, and each step adds one more. Since we have a window size of 1 minute with a 15-second step, our results are w1 = $5, w2 = $10, w3 = $15, and w4 = $20. Each step includes all previous events from the window start. Once the window reaches its maximum size, the results reset at the beginning of the next window.&lt;/p&gt;
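&lt;p&gt;To make the arithmetic concrete, here’s a minimal Java sketch that models a single cumulating window in memory. It is not how Flink implements &lt;code class="language-plaintext highlighter-rouge"&gt;CUMULATE&lt;/code&gt;; the class, method, and parameter names are hypothetical, and it assumes timestamps in seconds with each step’s end being exclusive.&lt;/p&gt;

```java
import java.util.stream.IntStream;

// Hypothetical, conceptual model of one cumulating window (not Flink's implementation).
// Each step reports the running total from the window start up to that step's end.
final class CumulateSketch {

    // timestamps and amounts are parallel arrays describing the events (seconds, dollars).
    // Returns one running total per step; a step's end is exclusive.
    static long[] cumulativeTotals(long[] timestamps, long[] amounts,
                                   long windowStart, long step, long size) {
        int steps = (int) (size / step);
        return IntStream.rangeClosed(1, steps)
                .mapToLong(k -> {
                    long stepEnd = windowStart + k * step;
                    return IntStream.range(0, timestamps.length)
                            .filter(i -> timestamps[i] >= windowStart) // at or after the window start
                            .filter(i -> stepEnd > timestamps[i])      // before this step's end
                            .mapToLong(i -> amounts[i])
                            .sum();
                })
                .toArray();
    }
}
```

&lt;p&gt;Calling &lt;code class="language-plaintext highlighter-rouge"&gt;cumulativeTotals(new long[] {0, 15, 30, 45}, new long[] {5, 5, 5, 5}, 0, 15, 60)&lt;/code&gt; returns &lt;code class="language-plaintext highlighter-rouge"&gt;{5, 10, 15, 20}&lt;/code&gt;, the w1 through w4 values above. An event at second 60 would belong to the next window and wouldn’t change these totals.&lt;/p&gt;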

&lt;p&gt;To use this functionality, specify the window type by using the reserved function name &lt;code class="language-plaintext highlighter-rouge"&gt;CUMULATE&lt;/code&gt; inside the &lt;code class="language-plaintext highlighter-rouge"&gt;TABLE&lt;/code&gt; function:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Specifying the CUMULATE function&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;window_start&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="n"&gt;window_end&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="n"&gt;user_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="k"&gt;sum&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;page_view&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;page_views&lt;/span&gt;
    &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CUMULATE&lt;/span&gt;   &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
                   &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;user_visits&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;   &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
                         &lt;span class="k"&gt;DESCRIPTOR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;visit_time&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;   
                         &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'15'&lt;/span&gt; &lt;span class="n"&gt;SECONDS&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
                         &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'1'&lt;/span&gt; &lt;span class="k"&gt;MINUTE&lt;/span&gt;   &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
                   &lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;window_start&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
             &lt;span class="n"&gt;window_end&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
             &lt;span class="n"&gt;user_id&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Let’s break this query down:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;Using the &lt;code class="language-plaintext highlighter-rouge"&gt;CUMULATE&lt;/code&gt; function&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Specifying the table for the function source&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Timestamp column providing time attributes for the windows&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The step size of each advance&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Maximum size of the window&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;
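&lt;p&gt;Another way to look at the step and size parameters is from a single record’s point of view: a record shows up in every cumulative result from its own step through the end of its window. The following Java sketch computes those windows. It assumes windows are aligned to timestamp 0 (no offset), and the names are hypothetical rather than part of any Flink API.&lt;/p&gt;

```java
import java.util.stream.LongStream;

// Hypothetical, conceptual model of cumulate window assignment (not Flink's implementation).
// Assumes windows are aligned to timestamp 0, i.e. no window offset.
final class CumulateAssignmentSketch {

    // Returns the {windowStart, windowEnd} pairs a record belongs to.
    // windowEnd is exclusive, and every pair shares the same windowStart.
    static long[][] assignWindows(long timestamp, long step, long size) {
        long windowStart = timestamp - Math.floorMod(timestamp, size);
        // First step whose end lies after the record's timestamp.
        long firstStep = Math.floorDiv(timestamp - windowStart, step) + 1;
        long lastStep = size / step;
        return LongStream.rangeClosed(firstStep, lastStep)
                .mapToObj(k -> new long[] {windowStart, windowStart + k * step})
                .toArray(long[][]::new);
    }
}
```

&lt;p&gt;With a 15-second step and a 1-minute size, &lt;code class="language-plaintext highlighter-rouge"&gt;assignWindows(20, 15, 60)&lt;/code&gt; returns the windows [0, 30), [0, 45), and [0, 60), so a record at second 20 contributes to the second, third, and fourth updates of its window.&lt;/p&gt;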

&lt;p&gt;In an earlier post, I discussed how Kafka Streams windows emit updates regularly, while Flink SQL windows only emit when they close. A cumulating window is logically similar to the Kafka Streams windowing model since it emits updates before the final result. But there’s a difference: &lt;strong&gt;Kafka Streams updates are tied to committing or cache eviction events and are not configurable&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Now, let’s move on to session windows.&lt;/p&gt;

&lt;h2 id="session-windows"&gt;Session windows&lt;/h2&gt;

&lt;p&gt;Session windows differ significantly from the windows we’ve seen so far in that they don’t have a fixed size. Instead, session windows define an &lt;em&gt;inactivity period&lt;/em&gt;, and as long as records arrive within the inactivity period, a session window continues to grow. A new window starts only when a record arrives whose timestamp is more than the inactivity period past the end timestamp of the current session. Because of this, the record timestamps determine the start and end of each session window.&lt;/p&gt;

&lt;p&gt;Let’s review this process in the following depiction of session windows:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/session_window_example.png" alt="session window example" /&gt;
&lt;figcaption&gt;Session windows continue to grow until the gap between the latest and incoming timestamps exceeds the inactivity period&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;Following along with this illustration, the session window continues to grow until a record arrives with a timestamp more than 1 minute after the ending timestamp of the current session. Because that record falls outside the inactivity gap, it starts a new session.&lt;/p&gt;
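&lt;p&gt;Here’s a minimal Java sketch of the session logic for a single key. It assumes sorted timestamps, so it skips the session merging an engine has to do when out-of-order records arrive, and the names are hypothetical. It treats a gap of exactly the inactivity period as still part of the session; engines can differ on that boundary, so check your engine’s documentation if it matters for your use case.&lt;/p&gt;

```java
import java.util.Arrays;

// Hypothetical, conceptual model of session windows for a single key (not the
// Kafka Streams or Flink implementation). Assumes timestamps are sorted, so there
// are no out-of-order or late records that would force sessions to merge.
final class SessionSketch {

    // Returns {sessionStart, sessionEnd} pairs, both taken from event timestamps.
    static long[][] sessionize(long[] sortedTimestamps, long inactivityGap) {
        long[][] sessions = new long[sortedTimestamps.length][];
        int count = 0;
        for (long ts : sortedTimestamps) {
            // A gap larger than the inactivity period closes the current session.
            boolean startNew = count == 0 || ts - sessions[count - 1][1] > inactivityGap;
            if (startNew) {
                sessions[count++] = new long[] {ts, ts}; // open a new session
            } else {
                sessions[count - 1][1] = ts;             // extend the current session's end
            }
        }
        return Arrays.copyOf(sessions, count);
    }
}
```

&lt;p&gt;With a 60-second gap, &lt;code class="language-plaintext highlighter-rouge"&gt;sessionize(new long[] {0, 30, 80, 200, 250}, 60)&lt;/code&gt; produces two sessions, [0, 80] and [200, 250], because the jump from 80 to 200 exceeds the inactivity period.&lt;/p&gt;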

&lt;h2 id="kafka-streams-session-window"&gt;Kafka Streams Session Window&lt;/h2&gt;

&lt;p&gt;Here’s how you define a session window in Kafka Streams:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Session windows in Kafka Streams&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;     &lt;span class="nc"&gt;Serde&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Windowed&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;sessionWindowSerde&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
          &lt;span class="nc"&gt;WindowedSerdes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;sessionWindowedSerdeFrom&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
     &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;inputTopic&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Consumed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Serdes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;String&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;clicksSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;groupByKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;windowedBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;SessionWindows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofInactivityGapWithNoGrace&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
                                                      &lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMinutes&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
                                                  &lt;span class="o"&gt;)&lt;/span&gt;
                        &lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;count&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toStream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outputTopic&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Produced&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sessionWindowSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Serdes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;()));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Let’s break it down step by step (a &lt;code class="language-plaintext highlighter-rouge"&gt;Serde&lt;/code&gt; is the Kafka Streams combination of a serializer and a deserializer):&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;Creating a &lt;code class="language-plaintext highlighter-rouge"&gt;Serde&lt;/code&gt; for session windows&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Specifying session windows for the aggregation. Here, we’re choosing not to use a grace period, so Kafka Streams will drop late records that arrive after their session window has closed. We’ll discuss grace periods in the time semantics post of this series.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The amount of inactivity, 1 minute, between events before the current session terminates and a new session starts.&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;h2 id="session-windows-in-flink-sql"&gt;Session Windows in Flink SQL&lt;/h2&gt;

&lt;p&gt;To define a &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/window-tvf/#session"&gt;session window in Flink SQL&lt;/a&gt;, you’ll use the windowing TVF syntax. Here’s an example that tracks a click stream on a website:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Session Windows in Flink SQL&lt;/strong&gt;&lt;/p&gt;
&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;    &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;window_start&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="n"&gt;window_end&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
           &lt;span class="k"&gt;COUNT&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;click&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;total_clicks&lt;/span&gt;
      &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;SESSION&lt;/span&gt;     &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
                   &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;page_views&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
                    &lt;span class="k"&gt;DESCRIPTOR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;click_time&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
                    &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'1'&lt;/span&gt; &lt;span class="n"&gt;MINUTES&lt;/span&gt;    &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
                    &lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;window_start&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;window_end&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Let’s break it down step by step:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;Specifying the SESSION windowing TVF&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Source table for events&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Time attribute column&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The inactivity gap for defining when to start a new session&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;h2 id="use-cases"&gt;Use cases&lt;/h2&gt;

&lt;h3 id="cumulating-window"&gt;Cumulating Window&lt;/h3&gt;

&lt;p&gt;For the cumulate window, any windowed aggregation that computes a count or sum is a candidate use case.&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/cumulate_use_case.png" alt="cumulate use case" /&gt;
&lt;/figure&gt;

&lt;p&gt;We can generalize the cumulating window use case as &lt;strong&gt;“Give me &amp;lt;aggregate&amp;gt; over the last N period, updated every Y.”&lt;/strong&gt; Instead of waiting for the window to close, you can get updates to help you understand the trends leading up to the final window result.&lt;/p&gt;

&lt;h3 id="session-window"&gt;Session Window&lt;/h3&gt;

&lt;p&gt;For the session window, we could say, &lt;strong&gt;“Show me &amp;lt;aggregate&amp;gt; of events occurring within &amp;lt;inactivity period&amp;gt;.”&lt;/strong&gt; Since the session window starts and ends with event timestamps and continues to grow with incoming records within the inactivity gap, it lends itself well to tracking behavior.&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/session_use_case.png" alt="session use case" /&gt;
&lt;/figure&gt;

&lt;p&gt;Things like tracking a click stream come to mind. The first click event starts the window, and the session continues to grow until it doesn’t receive more events within the inactivity timeout. Then, when more events come in, a new session starts.&lt;/p&gt;

&lt;h1 id="resources"&gt;Resources&lt;/h1&gt;

&lt;ul&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.manning.com/books/kafka-streams-in-action-second-edition"&gt;Kafka Streams in Action 2nd Edition&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.confluent.io/product/flink/"&gt;Apache Flink® on Confluent Cloud&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/window-tvf/#windowing-table-valued-functions-windowing-tvfs"&gt;Flink SQL Windows&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#windowing"&gt;Kafka Streams windowing documentation&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
&lt;/ul&gt;
</description>
        <pubDate>Mon, 26 Feb 2024 21:00:00 +0000</pubDate>
        <link>https://codingjunkie.net//mastering-stream-processing-session-cumulating-windows/</link>
        <link href="https://codingjunkie.net/mastering-stream-processing-session-cumulating-windows/"/>
        <guid isPermaLink="true">https://codingjunkie.net/mastering-stream-processing-session-cumulating-windows/</guid>
      </item>
    
      <item>
        <title>Mastering Stream Processing - Sliding windows and OVER aggregations</title>
        <description>&lt;p&gt;In the third installment of this windowing blog series, you’ll learn about sliding windows and a bit of SQL. In the previous post, we covered hopping and tumbling windows, both of which Kafka Streams and Flink SQL provide. In this installment, we will discuss sliding windows, supported by Kafka Streams and Flink SQL, or the logical equivalent in both. Let’s jump into sliding windows.&lt;/p&gt;

&lt;h1 id="sliding-windows"&gt;Sliding windows&lt;/h1&gt;

&lt;p&gt;&lt;a href="https://kafka.apache.org/36/javadoc/org/apache/kafka/streams/kstream/SlidingWindows.html"&gt;Sliding windows&lt;/a&gt; in Kafka Streams combine attributes of the previous windows we’ve seen in this blog series. Like the hopping or tumbling variants, a sliding window has a fixed size determined by the maximum time difference between records. But record timestamps determine the start and end times of the window, like a session window. Another difference with the sliding window is that both start and end times are inclusive as opposed to only the start time as with the other windows.&lt;/p&gt;

&lt;p&gt;You can think of a sliding window as continually "sliding" over an event stream, with new records entering the front and older records falling out the back.&lt;/p&gt;

&lt;p&gt;While you could emulate sliding windows in Kafka Streams by defining a hopping window with a 1 ms advance, the sliding window has some distinct advantages. First, the sliding window start and end times are both inclusive, unlike the hopping window, where only the start time is inclusive. Second, sliding windows are more efficient because they only calculate &lt;strong&gt;&lt;em&gt;distinct&lt;/em&gt;&lt;/strong&gt; windows: a new window is created only when a record enters or drops out of the window. A hopping window with a small advance is less efficient because it performs its calculation for every window, regardless of whether consecutive windows contain different events.&lt;/p&gt;

&lt;p&gt;As each record arrives, Kafka Streams creates a new window, including any previous records that fit within the maximum time difference defined by the window. This "look back" feature is unique to the sliding window behavior. Let’s look at an illustration of sliding windows in action:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/sliding_window_example.png" alt="sliding window example" /&gt;
&lt;figcaption&gt;Sliding windows create a new window for new records, and when records drop out of a window&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;So, from our simple illustration, we can see how each incoming record creates a new window that includes the previous records within the (inclusive) time difference.&lt;/p&gt;
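&lt;p&gt;The look-back behavior can be modeled in a few lines of Java. This sketch uses hypothetical names and only covers the window created when a record arrives, not the windows Kafka Streams also creates when records drop out. It assumes records arrive in timestamp order and counts the records in each new window, with inclusive bounds on both ends.&lt;/p&gt;

```java
import java.util.stream.IntStream;

// Hypothetical, conceptual model of the sliding-window "look back" (not Kafka Streams' code).
// Only models the window created when a record arrives; Kafka Streams also creates
// windows when records drop out, which this sketch leaves out.
final class SlidingLookBackSketch {

    // For each record, in arrival order (assumed to be timestamp order), count the
    // records whose timestamps fall in the inclusive range [ts - timeDifference, ts].
    static long[] countsPerArrival(long[] timestamps, long timeDifference) {
        return IntStream.range(0, timestamps.length)
                .mapToLong(current -> IntStream.rangeClosed(0, current)
                        .filter(i -> timestamps[i] >= timestamps[current] - timeDifference)
                        .count())
                .toArray();
    }
}
```

&lt;p&gt;With a 60-second time difference, &lt;code class="language-plaintext highlighter-rouge"&gt;countsPerArrival(new long[] {0, 30, 60, 61}, 60)&lt;/code&gt; returns &lt;code class="language-plaintext highlighter-rouge"&gt;{1, 2, 3, 3}&lt;/code&gt;: the record at 60 still sees the record at 0 because both bounds are inclusive, while the record at 61 no longer does.&lt;/p&gt;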

&lt;p&gt;Here’s how you define a Kafka Streams sliding window:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Sliding windows in Kafka Streams&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="nc"&gt;KStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"heat-sensor-input"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;Consumed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;doubleSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

&lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;groupByKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;windowedBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                  &lt;span class="nc"&gt;SlidingWindows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofTimeDifferenceWithNoGrace&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMinutes&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; 
                  &lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tempThreshold&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
         &lt;span class="n"&gt;aggregator&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
         &lt;span class="nc"&gt;Materialized&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
         &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toStream&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sensor-agg-output"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
           &lt;span class="nc"&gt;Produced&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowedSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;A sliding window with a one-minute time difference: when a new record arrives, any previous records within that time difference are included in its window.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Now, let’s move on to Flink SQL.&lt;/p&gt;

&lt;h1 id="over-aggregation"&gt;OVER Aggregation&lt;/h1&gt;

&lt;p&gt;While Flink SQL doesn’t have an exact one-to-one match with the Kafka Streams sliding window, it does provide essentially the same functionality with &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/over-agg/#over-aggregation"&gt;OVER aggregations&lt;/a&gt;. The &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; clause in Flink SQL lets you perform an aggregation over a range of rows, but what makes it unique is that, unlike a &lt;code class="language-plaintext highlighter-rouge"&gt;GROUP BY&lt;/code&gt; aggregation, the &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation does not collapse the rows; it returns a result for each row, computed over that row’s aggregation range. Note that you could do something similar in Kafka Streams using the &lt;a href="https://kafka.apache.org/36/documentation/streams/developer-guide/processor-api.html"&gt;Processor API&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;There’s a subtle difference in the results of &lt;code class="language-plaintext highlighter-rouge"&gt;GROUP BY&lt;/code&gt; and an &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation with a &lt;code class="language-plaintext highlighter-rouge"&gt;PARTITION BY&lt;/code&gt;. The easiest way to show the differences between the two will be with illustrations. Consider the following table of data as the basis for our comparison:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/temp_readings_table.png" alt="temp readings table" /&gt;
&lt;figcaption&gt;Table of temperature readings&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;Now let’s look at the results of a &lt;code class="language-plaintext highlighter-rouge"&gt;GROUP BY&lt;/code&gt; aggregation first:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/group_by_results.png" alt="group by results" /&gt;
&lt;figcaption&gt;GROUP BY Aggregates collapse the details into singular results&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;The results here are what we’ve all come to expect: the original rows are reduced into a single row per location with the average reading. Now contrast that with the &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; approach:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/over_aggregate_results.png" alt="over aggregate results" /&gt;
&lt;figcaption&gt;OVER Aggregates return all rows in the range&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;The results of an &lt;code class="language-plaintext highlighter-rouge"&gt;OVER (PARTITION BY…)&lt;/code&gt; aggregation contain all the rows of the range. Each row contains the same value for the average by location, but all the other information is still available to view. This demonstrates the difference between &lt;code class="language-plaintext highlighter-rouge"&gt;GROUP BY&lt;/code&gt; and &lt;code class="language-plaintext highlighter-rouge"&gt;OVER (PARTITION BY…)&lt;/code&gt; aggregations. Both clauses group rows together, but &lt;code class="language-plaintext highlighter-rouge"&gt;PARTITION BY&lt;/code&gt; does not combine the rows in the results; each row remains distinct. It’s important to note that although the illustration shows a result for each row in the table, that’s only for demonstration purposes: an &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation computes each row’s result from only the rows that fall into the specified range.&lt;/p&gt;

&lt;p&gt;So, in what may be an oversimplification, an &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation lets you compute aggregates over groups of rows while still viewing the individual rows, whereas a &lt;code class="language-plaintext highlighter-rouge"&gt;GROUP BY&lt;/code&gt; collapses the rows into a single result row per group.&lt;/p&gt;
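&lt;p&gt;To see the difference in code rather than tables, here’s a minimal Java sketch over a small in-memory table. It ignores time ranges entirely and only contrasts the shape of the results; the class, method names, and sample data are hypothetical.&lt;/p&gt;

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Hypothetical, conceptual contrast of GROUP BY vs OVER (PARTITION BY ...) over an
// in-memory table, ignoring time ranges entirely (not Flink's implementation).
final class GroupByVsOverSketch {

    // Average of every row whose location matches the given one.
    static double partitionAverage(String[] locations, double[] temps, String location) {
        return IntStream.range(0, locations.length)
                .filter(i -> locations[i].equals(location))
                .mapToDouble(i -> temps[i])
                .average()
                .orElse(Double.NaN);
    }

    // GROUP BY location: one result per distinct location (rows collapse).
    static double[] groupBy(String[] locations, double[] temps) {
        return Arrays.stream(locations)
                .distinct()
                .mapToDouble(loc -> partitionAverage(locations, temps, loc))
                .toArray();
    }

    // OVER (PARTITION BY location): one result per input row (rows stay distinct).
    static double[] over(String[] locations, double[] temps) {
        return Arrays.stream(locations)
                .mapToDouble(loc -> partitionAverage(locations, temps, loc))
                .toArray();
    }
}
```

&lt;p&gt;For locations &lt;code class="language-plaintext highlighter-rouge"&gt;{"oven", "oven", "cooler"}&lt;/code&gt; with readings &lt;code class="language-plaintext highlighter-rouge"&gt;{200.0, 210.0, 5.0}&lt;/code&gt;, &lt;code class="language-plaintext highlighter-rouge"&gt;groupBy&lt;/code&gt; returns two values, &lt;code class="language-plaintext highlighter-rouge"&gt;{205.0, 5.0}&lt;/code&gt;, while &lt;code class="language-plaintext highlighter-rouge"&gt;over&lt;/code&gt; returns one value per input row, &lt;code class="language-plaintext highlighter-rouge"&gt;{205.0, 205.0, 5.0}&lt;/code&gt;.&lt;/p&gt;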

&lt;p&gt;Let’s jump into an example query now. Let’s say you have a fleet of IoT sensors deployed in different parts of a manufacturing process, and monitoring the temperature is essential to spot problems and keep the process running smoothly. So you’ll want a query that will give you the average temps per location over the last minute:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;OVER Aggregation in Flink SQL&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;device_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
   &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;temp_reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;OVER&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
     &lt;span class="k"&gt;PARTITION&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;  &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
     &lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;   &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
      &lt;span class="k"&gt;RANGE&lt;/span&gt; &lt;span class="k"&gt;BETWEEN&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'1'&lt;/span&gt; &lt;span class="k"&gt;MINUTE&lt;/span&gt; &lt;span class="k"&gt;PRECEDING&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="k"&gt;CURRENT&lt;/span&gt; &lt;span class="k"&gt;ROW&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
 &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;one_minute_location_temp_averages&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;readings&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;The OVER clause&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Partitioning by the location&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Ordering results by the &lt;code class="language-plaintext highlighter-rouge"&gt;report_time&lt;/code&gt; column&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;A range definition that covers the rows from 1 minute before the current row up to the current row&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The name of the average calculation column&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;
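&lt;p&gt;Here’s a minimal in-memory Java sketch of what the query computes for each row. It is not how Flink evaluates &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregations. It assumes rows sorted by ascending time (in seconds) with distinct timestamps per location, and its parallel arrays are hypothetical stand-ins for the &lt;code class="language-plaintext highlighter-rouge"&gt;location&lt;/code&gt;, &lt;code class="language-plaintext highlighter-rouge"&gt;report_time&lt;/code&gt;, and temperature columns.&lt;/p&gt;

```java
import java.util.stream.IntStream;

// Hypothetical, conceptual model of
//   AVG(temp) OVER (PARTITION BY location ORDER BY time
//                   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW)
// over an in-memory table (not Flink's implementation). Assumes rows are sorted
// by ascending time with distinct timestamps per location.
final class RangeOverSketch {

    // Returns one average per input row, computed over that row's time range.
    static double[] rangeAverages(String[] locations, long[] times, double[] temps,
                                  long range) {
        return IntStream.range(0, temps.length)
                .mapToDouble(current -> IntStream.rangeClosed(0, current)
                        .filter(j -> locations[j].equals(locations[current])) // same partition
                        .filter(j -> times[j] >= times[current] - range)       // inside the range
                        .mapToDouble(j -> temps[j])
                        .average()
                        .orElse(Double.NaN))
                .toArray();
    }
}
```

&lt;p&gt;For locations &lt;code class="language-plaintext highlighter-rouge"&gt;{"A", "A", "B", "A"}&lt;/code&gt; at times &lt;code class="language-plaintext highlighter-rouge"&gt;{0, 30, 40, 90}&lt;/code&gt; with readings &lt;code class="language-plaintext highlighter-rouge"&gt;{10, 20, 100, 30}&lt;/code&gt;, a 60-second range yields &lt;code class="language-plaintext highlighter-rouge"&gt;{10.0, 15.0, 100.0, 25.0}&lt;/code&gt;: every row gets its own result, and the reading at time 0 has aged out by the time the row at 90 is processed.&lt;/p&gt;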

&lt;p&gt;So, this query gives us a running one-minute average of temperatures per location while still returning every row. You can also specify the range as a count of rows preceding the current row. In Flink SQL, the &lt;code class="language-plaintext highlighter-rouge"&gt;ORDER BY&lt;/code&gt; is required and only works with ascending time attributes. The &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/over-agg/#range-definitions"&gt;range definitions&lt;/a&gt; come in two forms:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;A &lt;code class="language-plaintext highlighter-rouge"&gt;RANGE&lt;/code&gt; interval dependant on the time attribute defined by the &lt;code class="language-plaintext highlighter-rouge"&gt;ORDER BY&lt;/code&gt; column&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;A &lt;code class="language-plaintext highlighter-rouge"&gt;ROW&lt;/code&gt; interval, which is count-based and specifies how many rows the result will contain. A &lt;code class="language-plaintext highlighter-rouge"&gt;ROW&lt;/code&gt; interval looks like this: &lt;code class="language-plaintext highlighter-rouge"&gt;ROWS BETWEEN N and CURRENT ROW&lt;/code&gt;, including N+1 result rows (the N preceding rows plus the current row). The &lt;code class="language-plaintext highlighter-rouge"&gt;CURRENT ROW&lt;/code&gt; is the starting point for a specified range determined by the &lt;code class="language-plaintext highlighter-rouge"&gt;PARTITION BY&lt;/code&gt; clause.&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The choice of which range definition to apply depends on your use case. With a &lt;code class="language-plaintext highlighter-rouge"&gt;RANGE&lt;/code&gt; interval, older records drop out as new records advance the window. As a result, the number of records in each aggregation varies, but you’ll always know they fall within the given time span. A &lt;code class="language-plaintext highlighter-rouge"&gt;ROWS&lt;/code&gt; interval ensures that, once enough records have arrived, every computation covers the same number of records (N preceding plus the current one), regardless of how far apart in time they are.&lt;/p&gt;
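&lt;p&gt;To make the difference between the two range definitions concrete, here is a minimal Java sketch that computes the aggregate for the current row both ways. It is not Flink’s implementation. The &lt;code class="language-plaintext highlighter-rouge"&gt;OverRangeSketch&lt;/code&gt; class and its methods are hypothetical. The sketch assumes the rows of a single partition arrive with ascending, distinct timestamps, mirroring Flink’s requirement of an ascending time attribute in the &lt;code class="language-plaintext highlighter-rouge"&gt;ORDER BY&lt;/code&gt;.&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;// Hypothetical sketch: OVER-style averages for the rows of ONE partition.
// Assumes rows are sorted by ascending, distinct timestamps (millis).
public class OverRangeSketch {

    // RANGE BETWEEN INTERVAL ... PRECEDING AND CURRENT ROW:
    // average every row whose timestamp is within rangeMillis of the current row.
    static double rangeAverage(long[] ts, double[] readings, int current, long rangeMillis) {
        double sum = 0;
        int count = 0;
        for (int i = current; i &amp;gt;= 0; i--) {
            if (ts[current] - ts[i] &amp;gt; rangeMillis) {
                break; // this row and all earlier ones fall outside the time range
            }
            sum += readings[i];
            count++;
        }
        return sum / count;
    }

    // ROWS BETWEEN n PRECEDING AND CURRENT ROW:
    // average the current row plus up to n preceding rows, however far apart in time.
    static double rowsAverage(double[] readings, int current, int n) {
        int first = Math.max(0, current - n);
        double sum = 0;
        for (int i = first; i &amp;lt;= current; i++) {
            sum += readings[i];
        }
        return sum / (current - first + 1);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] ts = {0, 5 * minute, 20 * minute, 21 * minute};
        double[] readings = {10.0, 20.0, 30.0, 40.0};
        // A 15-minute RANGE at the last row only sees the rows at minutes 20 and 21.
        System.out.println(rangeAverage(ts, readings, 3, 15 * minute)); // 35.0
        // ROWS with n = 2 sees three rows once enough have arrived.
        System.out.println(rowsAverage(readings, 3, 2)); // 30.0
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;With this sample data, the &lt;code class="language-plaintext highlighter-rouge"&gt;RANGE&lt;/code&gt; average at the last row ignores the readings at minutes 0 and 5 because they are more than 15 minutes old. The &lt;code class="language-plaintext highlighter-rouge"&gt;ROWS&lt;/code&gt; average covers exactly three rows no matter how their timestamps are spaced.&lt;/p&gt;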

&lt;p&gt;Another point to consider is that the &lt;code class="language-plaintext highlighter-rouge"&gt;PARTITION BY&lt;/code&gt; clause is optional. If you leave it off, you get a single aggregation over all records in the range rather than separate aggregations for each value of the partition column.&lt;/p&gt;
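&lt;p&gt;Here is a minimal Java sketch of what &lt;code class="language-plaintext highlighter-rouge"&gt;PARTITION BY&lt;/code&gt; changes. To isolate the partitioning, it ignores the range bounds and keeps an unbounded running average, similar in effect to &lt;code class="language-plaintext highlighter-rouge"&gt;ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW&lt;/code&gt;. The &lt;code class="language-plaintext highlighter-rouge"&gt;PartitionSketch&lt;/code&gt; class, its methods, and the sample locations are hypothetical.&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: unbounded running averages, with and without a partition key.
public class PartitionSketch {

    // Running sum and count for one group of rows.
    static final class Totals {
        double sum;
        long count;

        double add(double reading) {
            sum += reading;
            count++;
            return sum / count;
        }
    }

    private final Map&amp;lt;String, Totals&amp;gt; byLocation = new HashMap&amp;lt;&amp;gt;();
    private final Totals overall = new Totals();

    // Like PARTITION BY location: each location keeps its own running average.
    double partitioned(String location, double reading) {
        return byLocation.computeIfAbsent(location, k -&amp;gt; new Totals()).add(reading);
    }

    // Like omitting PARTITION BY: a single running average across every row.
    double unpartitioned(double reading) {
        return overall.add(reading);
    }

    public static void main(String[] args) {
        PartitionSketch sketch = new PartitionSketch();
        String[] locations = {"north", "south", "north"};
        double[] readings = {10.0, 30.0, 20.0};
        for (int i = 0; i &amp;lt; readings.length; i++) {
            double byLoc = sketch.partitioned(locations[i], readings[i]);
            double all = sketch.unpartitioned(readings[i]);
            System.out.println(locations[i] + " partitioned=" + byLoc + " overall=" + all);
        }
        // north partitioned=10.0 overall=10.0
        // south partitioned=30.0 overall=20.0
        // north partitioned=15.0 overall=20.0
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;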

&lt;p&gt;There’s another way to express an &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation in Flink SQL using the &lt;code class="language-plaintext highlighter-rouge"&gt;WINDOW&lt;/code&gt; clause. Let’s rework our &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation example to use this format.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;OVER aggregation using the WINDOW clause&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;reading&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="k"&gt;Avg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;OVER&lt;/span&gt; &lt;span class="n"&gt;win&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;avg_temps&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
 &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;readings&lt;/span&gt;
 &lt;span class="k"&gt;WINDOW&lt;/span&gt; &lt;span class="n"&gt;win&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;   
    &lt;span class="k"&gt;PARTITION&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;
    &lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;
    &lt;span class="k"&gt;RANGE&lt;/span&gt; &lt;span class="k"&gt;BETWEEN&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'15'&lt;/span&gt; &lt;span class="k"&gt;MINUTE&lt;/span&gt; &lt;span class="k"&gt;PRECEDING&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="k"&gt;CURRENT&lt;/span&gt; &lt;span class="k"&gt;ROW&lt;/span&gt;
 &lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;The Avg aggregation function over the readings&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Using the &lt;code class="language-plaintext highlighter-rouge"&gt;WINDOW&lt;/code&gt; clause to specify the window over a range of data&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;This query is the functional equivalent of the previous &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation example. So the question of "which one?" naturally comes to mind, and there are a couple of answers. First, the &lt;code class="language-plaintext highlighter-rouge"&gt;WINDOW&lt;/code&gt; form has a more explicit window definition, making it easier to understand. Second, defining the &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation this way opens the door to reusing the window definition for multiple aggregates. For example, suppose you want to track the maximum temperature as well as the average. You could do so with this query:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;OVER aggregation with a WINDOW clause and multiple aggregations&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;reading&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="k"&gt;Avg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;OVER&lt;/span&gt; &lt;span class="n"&gt;win&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;avg_temps&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
 &lt;span class="k"&gt;MAX&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;OVER&lt;/span&gt; &lt;span class="n"&gt;win&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;max_temp&lt;/span&gt;
 &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;readings&lt;/span&gt;
 &lt;span class="k"&gt;WINDOW&lt;/span&gt; &lt;span class="n"&gt;win&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="k"&gt;PARTITION&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;
    &lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;
    &lt;span class="k"&gt;RANGE&lt;/span&gt; &lt;span class="k"&gt;BETWEEN&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'15'&lt;/span&gt; &lt;span class="k"&gt;MINUTE&lt;/span&gt; &lt;span class="k"&gt;PRECEDING&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="k"&gt;CURRENT&lt;/span&gt; &lt;span class="k"&gt;ROW&lt;/span&gt;
 &lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;So, by using the explicit &lt;code class="language-plaintext highlighter-rouge"&gt;WINDOW&lt;/code&gt; form, you can easily add more aggregations, but keep in mind that each one increases the state Flink SQL has to keep.&lt;/p&gt;

&lt;p&gt;Finally, the &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation query is the basis for other analytical queries like the &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/topn/#top-n"&gt;Top-N&lt;/a&gt; query. I won’t go into more detail about the &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation type of query now, but I’ll have a post that goes deeper into it and other analytical queries soon.&lt;/p&gt;

&lt;h2 id="comparing-sliding-windows-to-over-aggregations"&gt;Comparing Sliding windows to OVER aggregations&lt;/h2&gt;
&lt;p&gt;At the beginning of this post, I mentioned that the Kafka Streams sliding window and the Flink SQL &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation are logically similar. With the sliding window, when a new record arrives, Kafka Streams creates a new window for it. It then "looks back" to find the records whose timestamps fall within the maximum time difference. As records continue to arrive and the windows advance, new records come in the front, and older records drop out the back. Much the same can be said of the &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation: each new record produces a new row, and the &lt;code class="language-plaintext highlighter-rouge"&gt;RANGE&lt;/code&gt; includes the records within the time range. Over time, new records enter at the top, and older records drop off the back of the range.&lt;/p&gt;

&lt;h1 id="use-cases"&gt;Use cases&lt;/h1&gt;

&lt;h2 id="sliding-windows-1"&gt;Sliding Windows&lt;/h2&gt;

&lt;p&gt;Logically, a sliding window flows continually over an event stream, which makes it an excellent fit for a running average.&lt;/p&gt;

&lt;p&gt;&lt;img src="../assets/images/sliding_use_case.png" alt="sliding use case" /&gt;&lt;/p&gt;

&lt;p&gt;Also, a sliding window could be used for alerting when a given event occurs N times within the timeframe of one window.&lt;/p&gt;
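&lt;p&gt;The alerting idea can be sketched in plain Java, independent of any Kafka Streams API. The approach is to keep the timestamps of recent events, evict those further back than the maximum time difference, and alert once enough remain. The &lt;code class="language-plaintext highlighter-rouge"&gt;SlidingAlertSketch&lt;/code&gt; class is hypothetical. It assumes events arrive in timestamp order, with no out-of-order records and no grace period.&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: alert when at least threshold events fall within
// maxDifferenceMillis of the newest event. Assumes in-order timestamps.
public class SlidingAlertSketch {

    private final Deque&amp;lt;Long&amp;gt; timestamps = new ArrayDeque&amp;lt;&amp;gt;();
    private final long maxDifferenceMillis;
    private final int threshold;

    SlidingAlertSketch(long maxDifferenceMillis, int threshold) {
        this.maxDifferenceMillis = maxDifferenceMillis;
        this.threshold = threshold;
    }

    // Records an event and returns true if the look-back window now holds enough events.
    boolean onEvent(long timestamp) {
        // Older events slide out of the back of the window.
        while (!timestamps.isEmpty() &amp;amp;&amp;amp; timestamp - timestamps.peekFirst() &amp;gt; maxDifferenceMillis) {
            timestamps.pollFirst();
        }
        timestamps.addLast(timestamp);
        return timestamps.size() &amp;gt;= threshold;
    }

    public static void main(String[] args) {
        // Alert on three events within ten seconds.
        SlidingAlertSketch alerts = new SlidingAlertSketch(10_000L, 3);
        long[] events = {0L, 4_000L, 15_000L, 18_000L, 22_000L};
        for (long ts : events) {
            System.out.println(ts + " alert=" + alerts.onEvent(ts));
        }
        // 0 alert=false, 4000 alert=false, 15000 alert=false,
        // 18000 alert=false, 22000 alert=true
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;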

&lt;h2 id="over-aggregations"&gt;OVER Aggregations&lt;/h2&gt;

&lt;p&gt;Similarly, an &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; aggregation can provide the same type of functionality, a running average or count, watching for a value to exceed a given threshold.&lt;/p&gt;

&lt;p&gt;&lt;img src="../assets/images/over_aggregate_use_case.png" alt="over aggregate use case" /&gt;&lt;/p&gt;

&lt;p&gt;You can also wrap your &lt;code class="language-plaintext highlighter-rouge"&gt;OVER&lt;/code&gt; query with an outer one to only select values that meet your alerting criteria:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Selecting only rows whose average exceeds a threshold&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;reading&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;avg_temps&lt;/span&gt;
 &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;reading&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;Avg&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
             &lt;span class="n"&gt;OVER&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;report_time&lt;/span&gt;
                    &lt;span class="k"&gt;RANGE&lt;/span&gt; &lt;span class="k"&gt;BETWEEN&lt;/span&gt; &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'15'&lt;/span&gt; &lt;span class="k"&gt;MINUTE&lt;/span&gt; &lt;span class="k"&gt;PRECEDING&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="k"&gt;CURRENT&lt;/span&gt; &lt;span class="k"&gt;ROW&lt;/span&gt;
                 &lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;avg_temps&lt;/span&gt;
        &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;readings&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;avg_temps&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;N&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;
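&lt;p&gt;The following Java sketch mirrors the query above for readers who think in code. For each new row, it computes a running average over the last 15 minutes, and the outer &lt;code class="language-plaintext highlighter-rouge"&gt;WHERE&lt;/code&gt; clause becomes a simple &lt;code class="language-plaintext highlighter-rouge"&gt;if&lt;/code&gt; check. It illustrates the logic and is not how Flink executes the query. The &lt;code class="language-plaintext highlighter-rouge"&gt;ThresholdFilterSketch&lt;/code&gt; class, the sample data, and the threshold standing in for &lt;code class="language-plaintext highlighter-rouge"&gt;N&lt;/code&gt; are hypothetical, and readings are assumed to arrive in ascending timestamp order.&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: a running average over the last rangeMillis
// (RANGE ... PRECEDING AND CURRENT ROW), keeping only the rows whose average
// exceeds the threshold (the outer WHERE avg_temps &amp;gt; N).
// Assumes readings arrive in ascending timestamp order.
public class ThresholdFilterSketch {

    // One input row: a timestamp and a temperature reading.
    static final class Reading {
        final long ts;
        final double value;

        Reading(long ts, double value) {
            this.ts = ts;
            this.value = value;
        }
    }

    static List&amp;lt;Double&amp;gt; alerts(long[] ts, double[] values, long rangeMillis, double threshold) {
        Deque&amp;lt;Reading&amp;gt; range = new ArrayDeque&amp;lt;&amp;gt;();
        double sum = 0;
        List&amp;lt;Double&amp;gt; matches = new ArrayList&amp;lt;&amp;gt;();
        for (int i = 0; i &amp;lt; ts.length; i++) {
            // Evict readings that fall outside the RANGE interval.
            while (!range.isEmpty() &amp;amp;&amp;amp; ts[i] - range.peekFirst().ts &amp;gt; rangeMillis) {
                sum -= range.pollFirst().value;
            }
            range.addLast(new Reading(ts[i], values[i]));
            sum += values[i];
            double avg = sum / range.size();
            // Outer query: keep the row only when its running average exceeds the threshold.
            if (avg &amp;gt; threshold) {
                matches.add(avg);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] ts = {0, 5 * minute, 10 * minute, 30 * minute};
        double[] values = {20.0, 24.0, 28.0, 21.0};
        // Running averages are 20.0, 22.0, 24.0, 21.0; only 24.0 exceeds 22.0.
        System.out.println(alerts(ts, values, 15 * minute, 22.0)); // [24.0]
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;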

&lt;h1 id="resouces"&gt;Resouces&lt;/h1&gt;

&lt;ul&gt;
  &lt;li&gt;&lt;a href="https://www.manning.com/books/kafka-streams-in-action-second-edition"&gt;Kafka Streams in Action 2nd Edition&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://www.confluent.io/product/flink/"&gt;Apache Flink®  on Confluent Cloud&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-tvf/#windowing-table-valued-functions-windowing-tvfs"&gt;Flink SQL Windows&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#windowing"&gt;Kafka Streams windowing documentation&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
        <pubDate>Wed, 14 Feb 2024 13:23:23 +0000</pubDate>
        <link>https://codingjunkie.net//mastering-stream-processing-sliding-windows/</link>
        <link href="https://codingjunkie.net/mastering-stream-processing-sliding-windows/"/>
        <guid isPermaLink="true">https://codingjunkie.net/mastering-stream-processing-sliding-windows/</guid>
      </item>
    
      <item>
        <title>Mastering Stream Processing - Hopping and Tumbling windows</title>
        <description>&lt;p&gt;In the first post of this series, we discussed what event streaming windowing is, and we examined in detail the structure of a windowed aggregate in Kafka Streams and Flink SQL.&lt;/p&gt;

&lt;p&gt;In this post, we’ll dive into two specific windowing implementations: hopping and tumbling windows.&lt;/p&gt;

&lt;h2 id="hopping-windows"&gt;Hopping windows&lt;/h2&gt;

&lt;p&gt;A hopping window has a fixed time length, and it moves forward or "hops" at a time interval smaller than the window’s length. For example, a hopping window can be one minute long and advance every ten seconds. The following illustration demonstrates the concept of a hopping window:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/hopping_windows.png" alt="hopping windows" /&gt;
&lt;figcaption&gt;Hopping windows have a fixed size with advances smaller than the length of the window&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;As the illustration above shows, hopping windows can overlap: after a hop forward, the new window can contain records that were also in the previous window. Let’s look at another illustration demonstrating this concept:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/hopping_window_overlap.png" alt="hopping window overlap" /&gt;
&lt;figcaption&gt;Hopping windows of one minute with a 30-second advance will share 30 seconds of data with the following window&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;Walking through the picture:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;Window one starts at 12:00:00 PM and will collect data until 12:01:00 PM (end time is exclusive).&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;At 12:00:30 PM, due to the thirty-second advance, window two starts gathering data.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Window one and window two will share data for thirty seconds from the start of window two until the end of window one. The process continues with each window advance.&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;
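&lt;p&gt;The walkthrough above boils down to a small calculation: a record belongs to every window whose start is at or before its timestamp and whose exclusive end is after it. The Java sketch below shows one way to compute that. It assumes windows are aligned to the epoch, so their start times are multiples of the advance. The &lt;code class="language-plaintext highlighter-rouge"&gt;HoppingWindowSketch&lt;/code&gt; class and method are hypothetical, and for readability the timestamps are milliseconds counted from 12:00:00 PM.&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: which hopping windows [start, end) contain a timestamp?
// Assumes epoch-aligned windows (starts are multiples of the advance)
// and non-negative timestamps in milliseconds.
public class HoppingWindowSketch {

    static List&amp;lt;long[]&amp;gt; windowsFor(long timestamp, long sizeMillis, long advanceMillis) {
        List&amp;lt;long[]&amp;gt; windows = new ArrayList&amp;lt;&amp;gt;();
        // Earliest aligned start whose exclusive end is still after the timestamp.
        long start = Math.max(0, timestamp - sizeMillis + advanceMillis) / advanceMillis * advanceMillis;
        while (start &amp;lt;= timestamp) {
            windows.add(new long[] {start, start + sizeMillis});
            start += advanceMillis;
        }
        return windows;
    }

    public static void main(String[] args) {
        // One-minute windows advancing every 30 seconds; a record at 12:00:45 PM
        // (45 seconds after 12:00:00 PM) lands in two overlapping windows.
        for (long[] w : windowsFor(45_000L, 60_000L, 30_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // [0, 60000)
        // [30000, 90000)
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;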

&lt;p&gt;Let’s show how you would implement a hopping window in Kafka Streams and Flink SQL.&lt;/p&gt;

&lt;h2 id="kafka-streams-hopping-window"&gt;Kafka Streams hopping window&lt;/h2&gt;

&lt;p&gt;For a hopping windowed aggregation in Kafka Streams, you’ll use one of the factory methods in the &lt;a href="https://www.javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/TimeWindows.html"&gt;TimeWindows&lt;/a&gt; class:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;A Kafka Streams hopping window example&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="nc"&gt;KStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"heat-sensor-input"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;Consumed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;doubleSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;


&lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;groupByKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;windowedBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                  &lt;span class="nc"&gt;TimeWindows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofSizeWithNoGrace&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMinutes&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; 
                             &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;advanceBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofSeconds&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; 
                  &lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tempThreshold&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
         &lt;span class="n"&gt;aggregator&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
         &lt;span class="nc"&gt;Materialized&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
         &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toStream&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sensor-agg-output"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
           &lt;span class="nc"&gt;Produced&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowedSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;By using &lt;code class="language-plaintext highlighter-rouge"&gt;TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))&lt;/code&gt; sets the window size at one minute, the &lt;code class="language-plaintext highlighter-rouge"&gt;withNoGrace&lt;/code&gt; means Kafka Streams will drop any out-of-order records that would have been included in the window had they arrived in order. We’ll get into grace periods more in the blog post on windowing time semantics.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The &lt;code class="language-plaintext highlighter-rouge"&gt;.advanceBy(Duration.ofSeconds(30)&lt;/code&gt; call makes this a hopping window. It creates a window that is one minute in size and advances every ten seconds.&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Next, let’s move on to hopping windows with Flink SQL.&lt;/p&gt;

&lt;h1 id="flink-sql-hopping-window"&gt;Flink SQL hopping window&lt;/h1&gt;

&lt;p&gt;Note that Flink hopping windows are also referred to as sliding windows. Kafka Streams offers a sliding window variant that behaves differently from its hopping windows, so for clarity, we’ll refer to Flink windows with an advance smaller than the window size only as hopping windows.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Hopping window average with Flink SQL&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;window_start&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;window_end&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;device_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;avg_reading&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;HOP&lt;/span&gt; 
               &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;device_readings&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;   
                     &lt;span class="k"&gt;DESCRIPTOR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;    
                     &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'30'&lt;/span&gt; &lt;span class="n"&gt;SECONDS&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  
                     &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'1'&lt;/span&gt; &lt;span class="n"&gt;MINUTES&lt;/span&gt;  
               &lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;window_start&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
         &lt;span class="n"&gt;window_end&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
         &lt;span class="n"&gt;device_id&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;Specifying hopping windows by passing the &lt;code class="language-plaintext highlighter-rouge"&gt;HOP&lt;/code&gt; function to the &lt;code class="language-plaintext highlighter-rouge"&gt;TABLE&lt;/code&gt; function.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The table you’ll use as the source for the hopping window aggregation.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The &lt;code class="language-plaintext highlighter-rouge"&gt;DESCRIPTOR&lt;/code&gt; is the column with the time attribute used for the window.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;This first &lt;code class="language-plaintext highlighter-rouge"&gt;INTERVAL&lt;/code&gt; is the amount of "hop" or advance of the window.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The second &lt;code class="language-plaintext highlighter-rouge"&gt;INTERVAL&lt;/code&gt; is the size of the window.&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Now, let’s move on to tumbling windows.&lt;/p&gt;

&lt;h1 id="tumbling-windows"&gt;Tumbling windows&lt;/h1&gt;

&lt;p&gt;A tumbling window has a fixed size and an advance equal to that size. Because the advance equals the window size, tumbling windows are considered a special case of hopping windows.&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/tumbling_windows.png" alt="tumbling windows" /&gt;
&lt;figcaption&gt;A tumbling window collects data for the window size, then "tumbles over" to start a new window.&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;Since a tumbling window starts a new one when the previous one ends, they don’t share any data. You won’t find records from one window in another one; the following illustration helps clarify this process:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/tumbling_window_no_overlap.png" alt="tumbling window no overlap" /&gt;
&lt;figcaption&gt;Tumbling windows have an advance equal to the size of the window and tumble to start a new one with no overlap in data&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;Stepping through this illustration:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;Window one starts at 12:00:00 PM and will collect data until 12:01:00 PM (end time is exclusive).&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;At 12:01:00 PM, window two starts collecting data.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Since each window starts collecting data after the previous window ended, there are no shared results.&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;
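&lt;p&gt;Because the advance equals the size, finding a record’s tumbling window takes a single calculation. Here is a minimal Java sketch, assuming epoch-aligned windows and non-negative timestamps. The &lt;code class="language-plaintext highlighter-rouge"&gt;TumblingWindowSketch&lt;/code&gt; class is hypothetical, and timestamps are milliseconds counted from 12:00:00 PM.&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;// Hypothetical sketch: with advance == size, each timestamp maps to exactly one
// epoch-aligned tumbling window [start, start + size). Assumes non-negative timestamps.
public class TumblingWindowSketch {

    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    public static void main(String[] args) {
        long size = 60_000L; // one-minute windows
        // Timestamps in milliseconds from 12:00:00 PM
        long[] events = {0L, 59_999L, 60_000L, 125_000L};
        for (long ts : events) {
            long start = windowStart(ts, size);
            System.out.println(ts + " belongs to [" + start + ", " + (start + size) + ")");
        }
        // 0 belongs to [0, 60000)
        // 59999 belongs to [0, 60000)
        // 60000 belongs to [60000, 120000)
        // 125000 belongs to [120000, 180000)
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Note how the record at exactly 60000 milliseconds (12:01:00 PM) starts window two rather than joining window one, because window end times are exclusive.&lt;/p&gt;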

&lt;h1 id="kafka-streams-tumbling-window"&gt;Kafka Streams tumbling window&lt;/h1&gt;

&lt;p&gt;For tumbling windows in Kafka Streams, you’ll also use the &lt;a href="https://www.javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/TimeWindows.html"&gt;TimeWindows&lt;/a&gt; class:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;A Kafka Streams tumbling window example&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="nc"&gt;KStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"heat-sensor-input"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;Consumed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;doubleSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
  
&lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;groupByKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;windowedBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                  &lt;span class="nc"&gt;TimeWindows&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofSizeWithNoGrace&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Duration&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofMinutes&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; 
                  &lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tempThreshold&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
         &lt;span class="n"&gt;aggregator&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
         &lt;span class="nc"&gt;Materialized&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
         &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toStream&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sensor-agg-output"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
           &lt;span class="nc"&gt;Produced&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowedSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;Using &lt;code class="language-plaintext highlighter-rouge"&gt;TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))&lt;/code&gt; without the &lt;code class="language-plaintext highlighter-rouge"&gt;.advanceBy&lt;/code&gt; clause automatically makes this a tumbling window. Since you didn’t specify an advance, Kafka Streams will add one equal to the size. You could add the &lt;code class="language-plaintext highlighter-rouge"&gt;advanceBy&lt;/code&gt; clause with the same amount of time if you choose to skip the shortened version.&lt;/li&gt;
&lt;/ol&gt;

&lt;h1 id="flink-sql-tumbling-window"&gt;Flink SQL tumbling window&lt;/h1&gt;

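&lt;p&gt;Tumbling windows in Flink SQL are defined much like the hopping variety, with two differences: you use the &lt;code class="language-plaintext highlighter-rouge"&gt;TUMBLE&lt;/code&gt; function, and you supply only one &lt;code class="language-plaintext highlighter-rouge"&gt;INTERVAL&lt;/code&gt;.&lt;/p&gt;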

&lt;p&gt;&lt;strong&gt;Tumbling window average with Flink SQL&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;window_start&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;window_end&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;device_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;avg_reading&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TUMBLE&lt;/span&gt; 
               &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;device_readings&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;   
                     &lt;span class="k"&gt;DESCRIPTOR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;timestamp&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;    
                     &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'1'&lt;/span&gt; &lt;span class="n"&gt;MINUTES&lt;/span&gt;  
               &lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;window_start&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
         &lt;span class="n"&gt;window_end&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
         &lt;span class="n"&gt;device_id&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;Specifying tumbling windows by passing the &lt;code class="language-plaintext highlighter-rouge"&gt;TUMBLE&lt;/code&gt; function to the &lt;code class="language-plaintext highlighter-rouge"&gt;TABLE&lt;/code&gt; function.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The table you’ll use as the source for the tumbling window aggregation.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The &lt;code class="language-plaintext highlighter-rouge"&gt;DESCRIPTOR&lt;/code&gt; is the column with the time attribute used for the window.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;By passing a single &lt;code class="language-plaintext highlighter-rouge"&gt;INTERVAL&lt;/code&gt; parameter, Flink SQL will utilize this as the advance and the size.&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;You define tumbling windows in Flink SQL similarly to Kafka Streams in that you only provide one time parameter.&lt;/p&gt;

&lt;h1 id="use-cases"&gt;Use cases&lt;/h1&gt;

&lt;h2 id="hopping-windows-1"&gt;Hopping Windows&lt;/h2&gt;

&lt;p&gt;Let’s look at an illustration for the use case of hopping windows:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/hopping_window_use_case.png" alt="hopping window use case" /&gt;
&lt;figcaption&gt;Hopping window use case: collecting data for an aggregation over a window and reporting it at intervals shorter than the window size&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;So, from looking at this image, we could generalize hopping windows as "every &amp;lt;time-period of window advance&amp;gt; give me &amp;lt;aggregate&amp;gt; over the last &amp;lt;window size&amp;gt; period". From our examples here, it would be "every 30 seconds, give me the average temp reading over the last minute". So, any problem domain where you want to closely monitor changes over time or compare changes relative to the previous reading could be a good fit for the hopping window.&lt;/p&gt;
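&lt;p&gt;Because the advance is smaller than the size, a single reading lands in more than one hopping window. The following hypothetical plain-Java sketch computes which windows contain a timestamp for the “every 30 seconds, over the last minute” example. It assumes non-negative epoch-millisecond timestamps, epoch-aligned windows, and an advance no larger than the size. As far as I know, this is the same arithmetic Kafka Streams uses in &lt;code class="language-plaintext highlighter-rouge"&gt;TimeWindows.windowsFor&lt;/code&gt;, but treat that as an assumption.&lt;/p&gt;

```java
import java.util.Arrays;
import java.util.stream.LongStream;

// Hypothetical helper: which hopping windows contain a given event timestamp?
// Assumes epoch-aligned windows, non-negative timestamps, and advance no larger than size.
final class HoppingWindowSketch {

    static long[] windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        // Earliest window start that still covers the timestamp, rounded down to an advance boundary.
        long first = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        // Every advance step up to (and including) the timestamp starts another overlapping window.
        return LongStream.iterate(first, start -> timestampMs >= start, start -> start + advanceMs)
                .toArray();
    }

    public static void main(String[] args) {
        // "Every 30 seconds, give me the average over the last minute":
        // a reading at 125 s lands in two overlapping one-minute windows.
        long[] starts = windowStartsFor(125_000L, 60_000L, 30_000L);
        System.out.println(Arrays.toString(starts)); // [90000, 120000]
    }
}
```

&lt;p&gt;With a one-minute window advancing every 30 seconds, each reading contributes to two windows, which is what gives hopping windows their rolling, overlapping view of the data.&lt;/p&gt;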

&lt;p&gt;It’s worth noting that Kafka Streams will emit periodic results before a window closes, while Flink SQL will only emit results once the window closes. I’ll go into more detail about this in the fourth installment of this blog series.&lt;/p&gt;

&lt;h2 id="tumbling-windows-1"&gt;Tumbling Windows&lt;/h2&gt;

&lt;p&gt;For the tumbling window, we have another illustration:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/tumbling_window_use_case.png" alt="tumbling window use case" /&gt;
&lt;figcaption&gt;Tumbling window use case collecting data for aggregation and reporting it at regular intervals&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;This illustration for tumbling windows can be summarized as "give me &amp;lt;aggregate&amp;gt; every &amp;lt;window size period&amp;gt;" or, restated to fit the examples in this post, "every minute, give me the average temp reading for that minute". Since tumbling windows don’t share any records, any situation requiring a unique count of events per window period, where each event is counted exactly once, is a good reason to use a tumbling window.&lt;/p&gt;
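&lt;p&gt;The “each event counted once” property is easy to check in code. Here is a hypothetical plain-Java sketch that counts events per one-minute tumbling window. Because every timestamp maps to exactly one window index, the per-window counts always add up to the number of events. It assumes non-negative epoch-millisecond timestamps and epoch-aligned windows, and the sample timestamps are made up.&lt;/p&gt;

```java
import java.util.Arrays;
import java.util.stream.LongStream;

// Hypothetical sketch: count events per tumbling window, indexed from the epoch.
// Assumes non-negative epoch-millisecond timestamps and epoch-aligned windows.
final class TumblingCountSketch {

    // counts[i] holds the number of events in the window [i * size, (i + 1) * size).
    static long[] countsPerWindow(long[] timestampsMs, long sizeMs) {
        if (timestampsMs.length == 0) {
            return new long[0];
        }
        long latest = LongStream.of(timestampsMs).max().getAsLong();
        long[] counts = new long[(int) (latest / sizeMs) + 1];
        for (long ts : timestampsMs) {
            // Each event maps to exactly one window, so it is counted exactly once.
            counts[(int) (ts / sizeMs)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] events = {5_000L, 42_000L, 61_000L, 119_000L, 130_000L}; // sample values
        long[] counts = countsPerWindow(events, 60_000L);
        System.out.println(Arrays.toString(counts));                      // [2, 2, 1]
        System.out.println(LongStream.of(counts).sum() == events.length); // true
    }
}
```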

&lt;h1 id="resouces"&gt;Resouces&lt;/h1&gt;

&lt;ul&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.manning.com/books/kafka-streams-in-action-second-edition"&gt;Kafka Streams in Action 2nd Edition&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.confluent.io/product/flink/"&gt;Apache Flink®  on Confluent Cloud&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-tvf/#windowing-table-valued-functions-windowing-tvfs"&gt;Flink SQL Windows&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#windowing"&gt;Kafka Streams windowing documentation&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
&lt;/ul&gt;

</description>
        <pubDate>Thu, 08 Feb 2024 00:23:23 +0000</pubDate>
        <link>https://codingjunkie.net//mastering-stream-processing-hopping-tumbling-windows/</link>
        <link href="https://codingjunkie.net/mastering-stream-processing-hopping-tumbling-windows/"/>
        <guid isPermaLink="true">https://codingjunkie.net/mastering-stream-processing-hopping-tumbling-windows/</guid>
      </item>
    
      <item>
        <title>Mastering Stream Processing - Introduction to windowing.</title>
        <description>&lt;p&gt;Stream processing is the best way to work with event data. While batch
processing still has its use cases, and probably always will, only
stream processing offers the ability to respond in real-time to events.&lt;/p&gt;

&lt;p&gt;But if we zoom in, what does it look like to respond to events? By now,
I’m sure you’re familiar with the oft-quoted fraud scenario: a person
with nefarious intent gets hold of an unaware consumer’s credit card
number, but thanks to the bank’s responsive processing system, the
fraudulent charge gets declined.&lt;/p&gt;

&lt;p&gt;Other uses of stream processing require an immediate response but are
not tied to one single event. Consider monitoring the heat of a
manufacturing process; if the average temperature reaches a certain
threshold in a given period, then the monitoring process should generate
an alert. But this isn’t about one temperature spike. It’s about a
consistent upward trend. In other words, what are the temperature
readings doing during a fixed period?&lt;/p&gt;

&lt;p&gt;I’m talking about windowing in event streams, if you have not guessed by
now. While aggregations (an aggregation is a grouping of events by a
common attribute) are a vital tool to leverage an event stream, an
aggregation over all time doesn’t shed any light on specific periods of
activity. Consider the following illustration:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/ave_temp_over_time.png" alt="ave temp over time" /&gt;
&lt;figcaption&gt;Coarse-grained average temperature readings&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;The average temperature reading has increased somewhat over time,
but that doesn’t tell the whole story. Now let’s look at capturing
the average temp readings over specific intervals:&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/ave_temp_windowed.png" alt="ave temp windowed" /&gt;
&lt;figcaption&gt;Windowed average temp readings&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;Now, by getting readings at specific intervals (windows), you can spot the
issue: a large jump in the average value.&lt;/p&gt;

&lt;p&gt;This is not to say that an aggregation over all time isn’t helpful, but
that, in many cases, you’ll want to aggregate over specific intervals.
In other cases, you’ll want an aggregation not defined by fixed time
boundaries but by behavior, e.g., session windows whose boundaries are
based on periods of &lt;em&gt;inactivity.&lt;/em&gt; We’ll get into session windows in a
post later in the blog series.&lt;/p&gt;

&lt;p&gt;This blog post marks the first in a series about windowing in the two
dominant stream processing technologies today: &lt;a href="https://kafka.apache.org/36/documentation/streams/developer-guide/"&gt;Kafka
Streams&lt;/a&gt;
and &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/overview/"&gt;Flink, specifically Flink
SQL&lt;/a&gt;).
It’s important to note that the point of this blog series is not a
direct comparison between the two APIs. Instead, it is a resource for
windowed operations in Kafka Streams and Flink SQL. While comparing the
two in a competitive analysis is natural, it’s not the main focus here.&lt;/p&gt;

&lt;p&gt;The blog series will discuss:&lt;/p&gt;

&lt;ul&gt;
  &lt;li&gt;
    &lt;p&gt;The different types of windowing, semantics, and potential use
cases.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Time semantics&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Interpretation of the results&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Testing windowed applications&lt;/p&gt;
  &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I will assume basic familiarity with Kafka Streams and Flink SQL, so the
examples will start by covering windowing.&lt;/p&gt;

&lt;p&gt;But before we get into windowing, let’s discuss how Kafka Streams and
Flink SQL structure windowing applications. We’ll only cover this level
of detail in this initial post, and subsequent ones will assume
knowledge of how to assemble the program and focus on the windowing
aspect.&lt;/p&gt;

&lt;h1 id="kafka-streams-windowing"&gt;Kafka Streams windowing&lt;/h1&gt;

&lt;p&gt;You’ll need to specify an
&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#aggregating"&gt;aggregation&lt;/a&gt;
to do any windowing in Kafka Streams. An aggregation is a function that
combines smaller components into a larger composition, clustered around
some attribute, which in Kafka Streams is the key in the key-value
pairs. You can also perform a reduce, which is a specialized form of
aggregation: a reduce returns the same type as its input components,
whereas an aggregation can return a completely different type from its
inputs. Since windowing works the same way for both a reduce and an
aggregation, we’ll use an aggregation for our examples throughout the
blog series.&lt;/p&gt;
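&lt;p&gt;To make the reduce versus aggregate distinction concrete without pulling in Kafka, here is a plain-Java analogy built on &lt;code class="language-plaintext highlighter-rouge"&gt;DoubleStream&lt;/code&gt;. It is only an analogy, not the Kafka Streams API, and the class name is hypothetical: the reduce keeps the input type, while the aggregate starts from an initializer and folds each reading into a &lt;code class="language-plaintext highlighter-rouge"&gt;DoubleSummaryStatistics&lt;/code&gt;, a different type than its inputs.&lt;/p&gt;

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.DoubleStream;

// Plain-Java analogy (not the Kafka Streams API) for reduce versus aggregate.
// ReduceVsAggregateSketch is a hypothetical name used only for illustration.
final class ReduceVsAggregateSketch {

    // Reduce: the result has the same type as the inputs (double in, double out).
    static double maxReading(double[] readings) {
        return DoubleStream.of(readings).reduce(Double.NEGATIVE_INFINITY, Math::max);
    }

    // Aggregate: the result type differs from the inputs (double in, DoubleSummaryStatistics out).
    static DoubleSummaryStatistics summarize(double[] readings) {
        return DoubleStream.of(readings).collect(
                DoubleSummaryStatistics::new,      // initializer: creates the empty starting value
                DoubleSummaryStatistics::accept,   // aggregator: folds one reading into the result
                DoubleSummaryStatistics::combine); // combiner: merges partial results (parallel streams)
    }

    public static void main(String[] args) {
        double[] readings = {70.0, 72.0, 80.0};
        System.out.println(maxReading(readings));             // 80.0
        System.out.println(summarize(readings).getAverage()); // 74.0
    }
}
```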

&lt;p&gt;&lt;strong&gt;A Kafka Streams windowed aggregation&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="nc"&gt;KStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"heat-sensor-input"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;Consumed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;doubleSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;groupByKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; 
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;windowedBy&lt;/span&gt;&lt;span class="o"&gt;(&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;window&lt;/span&gt; &lt;span class="n"&gt;specificatation&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;)&lt;/span&gt; 
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tempThreshold&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt; 
         &lt;span class="n"&gt;aggregator&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
         &lt;span class="nc"&gt;Materialized&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
         &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toStream&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sensor-agg-output"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
           &lt;span class="nc"&gt;Produced&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowedSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sensorAggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Let’s walk through the essential points of setting up the Kafka Streams
window aggregation:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;The first step is to group all records by key; this is required
before performing any aggregation. Here you’re using
&lt;a href="https://javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/KStream.html#groupByKey"&gt;KStream.groupByKey&lt;/a&gt;
which assumes the underlying key-value pairs have the correct keys
needed for clustering together. If not, you could use the
&lt;a href="https://javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/KStream.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-"&gt;KStream.groupBy&lt;/a&gt;
function where you pass a
&lt;a href="https://javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/KeyValueMapper.html"&gt;KeyValueMapper&lt;/a&gt;
instance that maps the current key-value pair into a new one which
allows you to create a new key suitable for the aggregation
grouping. Note that changing the key for a group-by will lead to a
re-partitioning of the records.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Next, you specify the windowing; we’ll cover the specific types in
later posts.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Point three is where you specify how to aggregate records. The
first parameter is an Initializer, represented as a lambda function,
which provides the initial value. The second parameter is the
Aggregator instance, which performs the aggregation action you
specify. Here, it computes a simple average and tracks the highest and
lowest values seen. The third parameter is a Materialized instance
specifying how to store the aggregation. Since the aggregate value type
differs from the incoming value type, you must provide the appropriate
Serde instance for Kafka Streams to use when (de)serializing
records.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The final point is where you provide the Serde instances for
producing the results back to Kafka. The key Serde is a different
type because Kafka Streams wraps the incoming record key in a Windowed
instance.&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;
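&lt;p&gt;The post doesn’t show the &lt;code class="language-plaintext highlighter-rouge"&gt;IotSensorAggregation&lt;/code&gt; class or the &lt;code class="language-plaintext highlighter-rouge"&gt;aggregator&lt;/code&gt; instance, so here is a hypothetical, dependency-free sketch of what the initializer and aggregator from point three might look like. It mirrors the shape of Kafka Streams’ &lt;code class="language-plaintext highlighter-rouge"&gt;Aggregator.apply(key, value, aggregate)&lt;/code&gt; with a local interface rather than the real one. The class name and the “readings over threshold” counter (my guess at what &lt;code class="language-plaintext highlighter-rouge"&gt;tempThreshold&lt;/code&gt; is for) are assumptions.&lt;/p&gt;

```java
// Hypothetical stand-in for the post's IotSensorAggregation class (its real code isn't shown).
// Tracks a running average plus the highest and lowest readings; counting readings above
// tempThreshold is an assumption made for illustration.
final class SensorAggregationSketch {

    // Local interface mirroring the shape of Kafka Streams' Aggregator.apply(key, value, aggregate).
    interface SensorAggregator {
        SensorAggregationSketch apply(String deviceId, double reading, SensorAggregationSketch aggregate);
    }

    final double tempThreshold;
    long count;
    double sum;
    double highest = Double.NEGATIVE_INFINITY;
    double lowest = Double.POSITIVE_INFINITY;
    long readingsOverThreshold;

    SensorAggregationSketch(double tempThreshold) {
        this.tempThreshold = tempThreshold;
    }

    double average() {
        return count == 0 ? Double.NaN : sum / count;
    }

    // The aggregator folds one reading into the existing aggregate and returns it.
    static final SensorAggregator AGGREGATOR = (deviceId, reading, aggregate) -> {
        aggregate.count++;
        aggregate.sum += reading;
        aggregate.highest = Math.max(aggregate.highest, reading);
        aggregate.lowest = Math.min(aggregate.lowest, reading);
        if (reading > aggregate.tempThreshold) {
            aggregate.readingsOverThreshold++;
        }
        return aggregate;
    };

    public static void main(String[] args) {
        // The initializer: a fresh aggregate, like () -> new IotSensorAggregation(tempThreshold).
        SensorAggregationSketch aggregate = new SensorAggregationSketch(100.0);
        for (double reading : new double[] {90.0, 110.0, 100.0}) {
            aggregate = AGGREGATOR.apply("device-1", reading, aggregate);
        }
        System.out.println(aggregate.average() + " " + aggregate.highest + " "
                + aggregate.lowest + " " + aggregate.readingsOverThreshold); // 100.0 110.0 90.0 1
    }
}
```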

&lt;p&gt;What’s not apparent from this aggregation example is where the
timestamps for the window are. But there’s a big hint in the explanation
of the aggregation example. At point four of the aggregation
description, Kafka Streams wraps the original key in a
&lt;a href="https://www.javadoc.io/doc/org.apache.kafka/kafka-streams/latest/org/apache/kafka/streams/kstream/Windowed.html"&gt;Windowed&lt;/a&gt;
object.&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/windowed_example.png" alt="windowed example" /&gt;
&lt;figcaption&gt;Windowed object&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;As shown in this illustration, the Windowed object contains the original
key and the
&lt;a href="https://www.javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/Window.html"&gt;Window&lt;/a&gt;
instance for the aggregation values. The Window object has the start and
end time for the aggregation window. It doesn’t contain the window size,
but you can easily calculate the size by subtracting the start time from
the end. We’ll cover reporting and analyzing the aggregation window
times in a follow-on blog post.&lt;/p&gt;

&lt;p&gt;Wrapping the original key in a Windowed object changes its type, meaning
you’ll have to tell Kafka Streams how to serialize the results.
Fortunately, Kafka Streams provides the
&lt;a href="https://www.javadoc.io/static/org.apache.kafka/kafka-streams/3.6.1/org/apache/kafka/streams/kstream/WindowedSerdes.html"&gt;WindowedSerdes&lt;/a&gt;
utility class, making it easy to get the correct Serde for producing
results back to Kafka:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Using the WindowedSerdes class to get a Serde for Windowed keys&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-java highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="nc"&gt;Serde&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Windowed&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;windowedSerde&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
        &lt;span class="nc"&gt;WindowedSerdes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timeWindowedSerdeFrom&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; 
                                              &lt;span class="mi"&gt;60_000L&lt;/span&gt; 
                                            &lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="nc"&gt;KStream&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="nc"&gt;Double&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"heat-sensor-input"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
    &lt;span class="nc"&gt;Consumed&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;doubleSerde&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
  &lt;span class="n"&gt;iotHeatSensorStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;groupByKey&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; 
         &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;windowedBy&lt;/span&gt;&lt;span class="o"&gt;(&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;window&lt;/span&gt; &lt;span class="n"&gt;specificatation&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;)&lt;/span&gt;
         &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;IotSensorAggregation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tempThreshold&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
              &lt;span class="n"&gt;aggregator&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
               &lt;span class="nc"&gt;Materialized&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stringSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;aggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
         &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toStream&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;to&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"sensor-agg-output"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
           &lt;span class="nc"&gt;Produced&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;with&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;windowedSerde&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;sensorAggregationSerde&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;The class type for the original key&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The size of the window in milliseconds&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Providing the Serde for the Windowed key&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;
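&lt;p&gt;Why does &lt;code class="language-plaintext highlighter-rouge"&gt;timeWindowedSerdeFrom&lt;/code&gt; need the window size? Here is a simplified, dependency-free sketch under one stated assumption: that the serialized windowed key holds the original key bytes followed by only the 8-byte window start. That is my understanding of how Kafka Streams’ time-windowed serde works, but treat the exact byte layout as an assumption. Under it, the deserializer has to recompute the window end as start plus window size. The class and record names are hypothetical.&lt;/p&gt;

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified, hypothetical illustration of why a time-windowed key serde needs the window size.
// Assumption: the serialized form is the key bytes followed by only the 8-byte window start.
final class WindowedKeyCodecSketch {

    record WindowedKey(String key, long startMs, long endMs) {
        long sizeMs() {
            return endMs - startMs; // size = end - start
        }
    }

    static byte[] serialize(WindowedKey windowed) {
        byte[] keyBytes = windowed.key().getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowed.startMs()) // the end time is not written
                .array();
    }

    static WindowedKey deserialize(byte[] data, long windowSizeMs) {
        int keyLength = data.length - Long.BYTES;
        String key = new String(data, 0, keyLength, StandardCharsets.UTF_8);
        long start = ByteBuffer.wrap(data).getLong(keyLength);
        // Without the window size, the end of the window could not be recovered.
        return new WindowedKey(key, start, start + windowSizeMs);
    }

    public static void main(String[] args) {
        WindowedKey original = new WindowedKey("device-1", 120_000L, 180_000L);
        WindowedKey roundTrip = deserialize(serialize(original), 60_000L);
        System.out.println(roundTrip); // WindowedKey[key=device-1, startMs=120000, endMs=180000]
    }
}
```

&lt;p&gt;Passing the wrong size to the deserializer would still produce a key, just with the wrong end time, which is one reason downstream consumers need to know the window size used upstream.&lt;/p&gt;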

&lt;p&gt;So, by using the WindowedSerdes class, you provide the proper
serialization strategy for Kafka Streams to produce windowed results
back to Kafka. Producing windowed results to a topic implies that downstream
consumers will also need to know how to handle windowed keys. We’ll
cover that situation in a later post in this series on reporting.&lt;/p&gt;

&lt;p&gt;Now, let’s move on to Flink SQL aggregation windows.&lt;/p&gt;

&lt;h1 id="flink-sql-windowing"&gt;Flink SQL windowing&lt;/h1&gt;

&lt;p&gt;Flink offers windowing for event stream data as windowing table-valued
functions (TVF). The Flink TVFs implement the &lt;a href="https://sigmodrecord.org/publications/sigmodRecord/1806/pdfs/08_Industry_Michels.pdf"&gt;&lt;em&gt;SQL 2016 standard
Polymorphic Table
Functions&lt;/em&gt;&lt;/a&gt;
(PTF). In a nutshell, PTFs are user-defined functions that take a table
as input and return a table.&lt;/p&gt;

&lt;figure&gt;
&lt;img src="../assets/images/ptf_in_action.png" alt="ptf in action" /&gt;
&lt;figcaption&gt;PTF table function returning a table&lt;/figcaption&gt;
&lt;/figure&gt;

&lt;p&gt;The exciting thing about PTF is that the schema of the table returned by
the function is dynamic; it’s determined at runtime by the function
output. So, the PTFs enable windowing and aggregation functions on
existing tables, precisely what we get with the Flink SQL windowing. The
windowing TVFs in Flink replace the now deprecated &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-agg/#group-window-aggregation"&gt;Group Window
Functions&lt;/a&gt;.
Window TVFs provide more powerful window-based calculations like &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-topn/"&gt;Window
TopN&lt;/a&gt;
and &lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-deduplication/"&gt;Window
Deduplication&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Now, let’s move on to how you execute a windowed aggregation in Flink
SQL. As with the Kafka Streams example, we’ll review the structure of a
windowed aggregation, with specific window implementations covered in
later posts.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Structure of Flink SQL windowed aggregation&lt;/strong&gt;&lt;/p&gt;

&lt;div class="language-sql highlighter-rouge"&gt;&lt;div class="highlight"&gt;&lt;pre class="highlight"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;window_start&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;window_end&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;device_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;reading&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;avg_reading&lt;/span&gt;   

&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
           &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="k"&gt;Window&lt;/span&gt; &lt;span class="k"&gt;Function&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt; 
                              &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;device_readings&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;   
                              &lt;span class="k"&gt;DESCRIPTOR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ts&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;    
                              &lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'5'&lt;/span&gt; &lt;span class="n"&gt;MINUTES&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  
                              &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;INTERVAL&lt;/span&gt; &lt;span class="s1"&gt;'10'&lt;/span&gt; &lt;span class="n"&gt;MINUTES&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
                            &lt;span class="p"&gt;)&lt;/span&gt;
           &lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;window_start&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
         &lt;span class="n"&gt;window_end&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
         &lt;span class="n"&gt;device_id&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;&lt;/div&gt;

&lt;p&gt;Here’s the breakdown of the query:&lt;/p&gt;

&lt;ol&gt;
  &lt;li&gt;
    &lt;p&gt;Selecting the columns and the aggregation using the Flink SQL AVG
function and providing a descriptive name; these columns form the
schema of the returned table.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The TABLE function&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Here, you give a specific window function: HOP, TUMBLE, or
CUMULATE. Support for a SESSION type is coming soon. We’ll cover the
specific types in later posts.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Next are the parameters for the window function, starting with the
table to use for the input&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;The DESCRIPTOR is the time attribute column the function uses for
the window.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;Depending on the window function, the following 1 or 2 parameters
determine the window advance and size or just the size.&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;As with standard SQL aggregate functions, the non-aggregated columns
in the SELECT clause must also appear in the GROUP BY clause.&lt;/p&gt;
  &lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The windowing TVFs add three columns to the table they return:
window_start, window_end, and window_time. Flink SQL determines
window_time by subtracting 1 ms from the window_end value.&lt;/p&gt;
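&lt;p&gt;Here is a hypothetical plain-Java sketch of what a tumbling windowing TVF does to each row of its input table: it keeps the original columns and appends &lt;code class="language-plaintext highlighter-rouge"&gt;window_start&lt;/code&gt;, &lt;code class="language-plaintext highlighter-rouge"&gt;window_end&lt;/code&gt;, and &lt;code class="language-plaintext highlighter-rouge"&gt;window_time&lt;/code&gt;, with &lt;code class="language-plaintext highlighter-rouge"&gt;window_time&lt;/code&gt; set 1 ms before &lt;code class="language-plaintext highlighter-rouge"&gt;window_end&lt;/code&gt;. It is not Flink code: the record names are made up, timestamps are modeled as epoch milliseconds, and windows are assumed to be aligned to the epoch.&lt;/p&gt;

```java
import java.util.Arrays;

// Hypothetical sketch (not Flink code) of a tumbling window TVF: table in, table out,
// with three window columns appended to every row. Timestamps are epoch milliseconds.
final class TumbleTvfSketch {

    // A row of the input table, e.g. device_readings.
    record Reading(String deviceId, long ts, double reading) {}

    // A row of the output table: the original columns plus the window columns.
    record WindowedReading(String deviceId, long ts, double reading,
                           long windowStart, long windowEnd, long windowTime) {}

    static WindowedReading tumbleRow(Reading row, long sizeMs) {
        long start = row.ts() - (row.ts() % sizeMs); // assumes epoch-aligned windows
        long end = start + sizeMs;                    // exclusive end
        return new WindowedReading(row.deviceId(), row.ts(), row.reading(),
                start, end, end - 1);                 // window_time = window_end - 1 ms
    }

    // Applies the function to every row, returning a new "table".
    static WindowedReading[] tumble(Reading[] table, long sizeMs) {
        return Arrays.stream(table)
                .map(row -> tumbleRow(row, sizeMs))
                .toArray(WindowedReading[]::new);
    }

    public static void main(String[] args) {
        Reading[] deviceReadings = {new Reading("device-1", 125_000L, 71.5)};
        for (WindowedReading row : tumble(deviceReadings, 60_000L)) {
            System.out.println(row);
        }
    }
}
```

&lt;p&gt;Running &lt;code class="language-plaintext highlighter-rouge"&gt;main&lt;/code&gt; prints one row whose window columns are 120000, 180000, and 179999, alongside the untouched original columns.&lt;/p&gt;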

&lt;p&gt;This concludes our introduction to the structure of windowing
applications in Kafka Streams and Flink SQL. In the next edition, we’ll
cover hopping and tumbling windows.&lt;/p&gt;

&lt;h1 id="resources"&gt;Resources&lt;/h1&gt;

&lt;ul&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.confluent.io/product/flink/"&gt;Apache Flink®  on Confluent
Cloud&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-tvf/#windowing-table-valued-functions-windowing-tvfs"&gt;Flink SQL
Windows&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#windowing"&gt;Kafka Streams windowing
documentation&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
  &lt;li&gt;
    &lt;p&gt;&lt;a href="https://www.manning.com/books/kafka-streams-in-action-second-edition"&gt;Kafka Streams in Action 2nd
Edition&lt;/a&gt;&lt;/p&gt;
  &lt;/li&gt;
&lt;/ul&gt;
</description>
        <pubDate>Fri, 02 Feb 2024 00:23:23 +0000</pubDate>
        <link>https://codingjunkie.net//introduction-to-windowing/</link>
        <link href="https://codingjunkie.net/introduction-to-windowing/"/>
        <guid isPermaLink="true">https://codingjunkie.net/introduction-to-windowing/</guid>
      </item>
    
      <item>
        <title>Completable Futures - Error Handling.</title>
        <description>&lt;p&gt;Some time ago, over 2 years, I started a 3 part series on the &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt;.  I’m just now getting around to doing part two now.  My long time delay in completing this series was due to working in my book &lt;a href="URL"&gt;Kafka Streams in Action&lt;/a&gt;.  But now that’s done I can get back to doing some blogging again.&lt;/p&gt;

&lt;p&gt;I started this series on a new class introduced in Java 8, the &lt;a href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html"&gt;CompletableFuture&lt;/a&gt; class.  Since the &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt; is such a feature-rich class, I decided to break the coverage up into three stages.  The &lt;a href="http://codingjunkie.net/completable-futures-part1/"&gt;first post&lt;/a&gt; covered the creation of &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt; tasks and how to specify follow-up tasks to execute when the original one completes.  The examples in the first post only dealt with the happy path scenarios, however.  Today we are going to go over dealing with failures and errors, including specifying actions to take when an error is encountered.&lt;br /&gt;
&lt;!-- more --&gt;&lt;/p&gt;

&lt;h3 id="why-having-separate-error-handling-methods"&gt;Why Having Separate Error Handling Methods&lt;/h3&gt;
&lt;p&gt;CompletableFutures let you define functionality that executes asynchronously, and then you can come back later and magically extract the result of your asynchronous task. But there is one drawback: what do you do when errors occur? You can use try-catch blocks, but you lose the conciseness of lambda syntax.  You also lose flexibility, because you have to handle every error the same way; you can’t use a CompletableFuture with error handling strategy A and then, five lines later, use a different strategy (assuming you are passing the same lambda with different parameters).  What we need is a pluggable solution, where you can specify a different function at any point to handle errors in the way that best fits the specific situation.&lt;/p&gt;

&lt;h3 id="error-handling-strategies"&gt;Error Handling Strategies&lt;/h3&gt;
&lt;p&gt;When it comes to handling errors with &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt;, there are two approaches.  The first is to run a given function only when an &lt;code class="language-plaintext highlighter-rouge"&gt;Exception&lt;/code&gt; occurs.  The second is a &lt;a href="URL"&gt;BiFunction&lt;/a&gt; that takes the expected result type and a &lt;code class="language-plaintext highlighter-rouge"&gt;Throwable&lt;/code&gt; as parameters.  If an error occurred, the &lt;code class="language-plaintext highlighter-rouge"&gt;Throwable&lt;/code&gt; instance is not null and you can take action at that point.  Although they aren’t error handling strategies, two other methods force an exception to be thrown when &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture.get&lt;/code&gt; is called.  The two approaches differ in that &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture.exceptionally&lt;/code&gt; is used when you don’t want to take any further action with the result: if the future completes normally, the returned result is good enough, and the provided function executes only in the event of an error.  The &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture.handle&lt;/code&gt; method is different.  If the future completes without error, the result is available for extra processing.  Otherwise, the &lt;code class="language-plaintext highlighter-rouge"&gt;Throwable&lt;/code&gt; parameter is not null and you can react at that point.  The key point here is that the &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture.handle&lt;/code&gt; method is &lt;em&gt;always&lt;/em&gt; executed.&lt;/p&gt;

&lt;h4 id="functions-that-run-on-error"&gt;Functions That Run On Error&lt;/h4&gt;
&lt;p&gt;The first strategy for handling errors is the &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture.exceptionally&lt;/code&gt; method.  The &lt;code class="language-plaintext highlighter-rouge"&gt;exceptionally&lt;/code&gt; method takes a &lt;a href="URL"&gt;Function&lt;/a&gt; that receives an instance of &lt;a href="URL"&gt;Throwable&lt;/a&gt; and returns the same type as the original &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt;.  In the case of normal completion, the result of the &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt; is returned to the caller.  But if there are any errors, the supplied function is executed and its result is returned instead.
In other words, &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture.exceptionally&lt;/code&gt; only runs when there is an exception.&lt;/p&gt;
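&lt;p&gt;Here is a minimal sketch (not from the original series) of &lt;code class="language-plaintext highlighter-rouge"&gt;exceptionally&lt;/code&gt; in action. It assumes a hypothetical task that parses a string asynchronously; the recovery function supplies a fallback of &lt;code class="language-plaintext highlighter-rouge"&gt;-1&lt;/code&gt; only when parsing fails, and a successful result passes through untouched.&lt;/p&gt;

```java
import java.util.concurrent.CompletableFuture;

// Minimal sketch of CompletableFuture.exceptionally: the recovery function runs only on failure.
// ExceptionallySketch and parseOrDefault are hypothetical names used only for illustration.
final class ExceptionallySketch {

    static int parseOrDefault(String input) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(input))
                // Called only if parseInt threw; the function must return the same type
                // as the original future (Integer), here a fallback of -1.
                .exceptionally(ex -> -1)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("42"));   // 42
        System.out.println(parseOrDefault("oops")); // -1
    }
}
```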

&lt;h4 id="handling-success-or-failure"&gt;Handling Success or Failure&lt;/h4&gt;

&lt;p&gt;The second method we have for error handling is &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture.handle&lt;/code&gt;.  In contrast to &lt;code class="language-plaintext highlighter-rouge"&gt;exceptionally&lt;/code&gt;, the &lt;code class="language-plaintext highlighter-rouge"&gt;handle&lt;/code&gt; method &lt;em&gt;always&lt;/em&gt; executes its function parameter.  We can see the difference in execution from the types the two methods accept.  The &lt;code class="language-plaintext highlighter-rouge"&gt;exceptionally&lt;/code&gt; method requires a &lt;code class="language-plaintext highlighter-rouge"&gt;Function&lt;/code&gt; that takes only a &lt;code class="language-plaintext highlighter-rouge"&gt;Throwable&lt;/code&gt; and returns the same type as the &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt;.  On the other hand, the &lt;code class="language-plaintext highlighter-rouge"&gt;handle&lt;/code&gt; method takes a &lt;code class="language-plaintext highlighter-rouge"&gt;BiFunction&lt;/code&gt; where the first parameter &lt;em&gt;is&lt;/em&gt; the result of the &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt; computation and the second parameter is a &lt;code class="language-plaintext highlighter-rouge"&gt;Throwable&lt;/code&gt;.  So in our function, if the &lt;code class="language-plaintext highlighter-rouge"&gt;Throwable&lt;/code&gt; is not null, we know an error occurred (and the result parameter is null), and we take the appropriate action.  Otherwise, we return the result of the &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt;.  Of course, we can also perform additional operations on a successful result.  The function may even return a different type, because &lt;code class="language-plaintext highlighter-rouge"&gt;handle&lt;/code&gt; produces a new &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt; typed by whatever the function returns.&lt;/p&gt;

&lt;h3 id="choosing-a-strategy"&gt;Choosing a Strategy&lt;/h3&gt;

&lt;p&gt;We have two strategies for asynchronous error handling, so the question is which one to use.  While there are no hard rules, here’s some quick advice:&lt;/p&gt;

&lt;ul&gt;
  &lt;li&gt;When the result stands alone, use &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture.exceptionally&lt;/code&gt;.&lt;/li&gt;
  &lt;li&gt;If the result requires more processing, use &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture.handle&lt;/code&gt; instead.&lt;/li&gt;
&lt;/ul&gt;
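&lt;p&gt;To see why the advice above works, the following sketch (Java 10 or later, with hypothetical class and method names) runs one succeeding and one failing task and counts how often each callback fires. The &lt;code class="language-plaintext highlighter-rouge"&gt;exceptionally&lt;/code&gt; callback fires only for the failing task, while &lt;code class="language-plaintext highlighter-rouge"&gt;handle&lt;/code&gt; fires for both. This is why &lt;code class="language-plaintext highlighter-rouge"&gt;handle&lt;/code&gt; is the natural place for work that must happen on every outcome.&lt;/p&gt;

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class StrategyComparison {

    // Runs one task and records which error-handling callbacks were invoked
    static void run(boolean fail, AtomicInteger exceptionallyCalls, AtomicInteger handleCalls) {
        var source = CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalArgumentException("simulated failure");
            }
            return "ok";
        });

        // exceptionally: invoked only when the source future failed
        source.exceptionally(throwable -> {
            exceptionallyCalls.incrementAndGet();
            return "recovered";
        }).join();

        // handle: invoked on success and on failure alike
        source.handle((result, throwable) -> {
            handleCalls.incrementAndGet();
            return throwable == null ? result : "recovered";
        }).join();
    }

    public static void main(String[] args) {
        var exceptionallyCalls = new AtomicInteger();
        var handleCalls = new AtomicInteger();
        run(false, exceptionallyCalls, handleCalls);
        run(true, exceptionallyCalls, handleCalls);
        System.out.println("exceptionally ran " + exceptionallyCalls.get() + " time(s)"); // prints 1 time(s)
        System.out.println("handle ran " + handleCalls.get() + " time(s)");               // prints 2 time(s)
    }
}
```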

&lt;h3 id="conclusion"&gt;Conclusion&lt;/h3&gt;

&lt;p&gt;We have reached the end of our coverage of &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt; error handling.  The takeaway here is not to add your own error handling inside the tasks your &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt; runs.  Instead, rely on the error-handling methods the class provides.  In the next post on &lt;code class="language-plaintext highlighter-rouge"&gt;CompletableFuture&lt;/code&gt;, we’ll cover canceling and forcing completion.&lt;/p&gt;

&lt;h3 id="resources"&gt;Resources&lt;/h3&gt;
&lt;ul&gt;
  &lt;li&gt;&lt;a href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html"&gt;CompletableFuture&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html"&gt;CompletionStage&lt;/a&gt;&lt;/li&gt;
  &lt;li&gt;&lt;a href="https://github.com/bbejeck/Java-8/blob/master/src/test/java/bbejeck/concurrent/CompletableFutureTest.java"&gt;Source Code&lt;/a&gt; for this post.&lt;/li&gt;
&lt;/ul&gt;
</description>
        <pubDate>Wed, 31 Oct 2018 00:23:23 +0000</pubDate>
        <link>https://codingjunkie.net//completable-futures-part2/</link>
        <link href="https://codingjunkie.net/completable-futures-part2/"/>
        <guid isPermaLink="true">https://codingjunkie.net/completable-futures-part2/</guid>
      </item>
    
  </channel>
</rss>