@@ -730,73 +730,6 @@ Also, specifying a grace period using `grace_ms` will increase the latency, beca
 
 You can use `final()` mode when some latency is allowed, but the emitted results must be complete and unique.
 
-## Closing strategies
-
-By default, windows use the **key** closing strategy.
-In this strategy, messages advance time and close only windows with the **same** message key.
-
-If some message keys appear irregularly in the stream, the latest windows can remain unprocessed until another message with the same key is received.
-
-```python
-from datetime import timedelta
-from quixstreams import Application
-from quixstreams.dataframe.windows import Sum
-
-app = Application(...)
-sdf = app.dataframe(...)
-
-# Calculate a sum of values over a window of 10 seconds
-# and use .final() to emit results only when the window is complete
-sdf = sdf.tumbling_window(timedelta(seconds=10)).agg(value=Sum()).final(closing_strategy="key")
-
-# Details:
-# -> Timestamp=100, Key="A", value=1 -> emit nothing (the window is not closed yet)
-# -> Timestamp=101, Key="B", value=2 -> emit nothing (the window is not closed yet)
-# -> Timestamp=105, Key="C", value=3 -> emit nothing (the window is not closed yet)
-# -> Timestamp=10100, Key="B", value=2 -> emit one message with key "B" and value {"start": 0, "end": 10000, "value": 2}, the time has progressed beyond the window end for the "B" key only.
-# -> Timestamp=8000, Key="A", value=1 -> emit nothing (the window is not closed yet)
-# -> Timestamp=10001, Key="A", value=1 -> emit one message with key "A" and value {"start": 0, "end": 10000, "value": 2}, the time has progressed beyond the window end for the "A" key.
-
-# Results:
-# (key="B", value={"start": 0, "end": 10000, "value": 2})
-# (key="A", value={"start": 0, "end": 10000, "value": 2})
-# No message for key "C": its window is never closed because no message with key "C" and a timestamp later than 10000 was received
-```
-
-An alternative is to use the **partition** closing strategy.
-In this strategy, messages advance time and close windows for the whole partition to which this key belongs.
-
-If messages aren't ordered across keys, some messages can be skipped because their windows are already closed.
-
-```python
-from datetime import timedelta
-from quixstreams import Application
-from quixstreams.dataframe.windows import Sum
-
-app = Application(...)
-sdf = app.dataframe(...)
-
-# Calculate a sum of values over a window of 10 seconds
-# and use .final() to emit results only when the window is complete
-sdf = sdf.tumbling_window(timedelta(seconds=10)).agg(value=Sum()).final(closing_strategy="partition")
-
-# Details:
-# -> Timestamp=100, Key="A", value=1 -> emit nothing (the window is not closed yet)
-# -> Timestamp=101, Key="B", value=2 -> emit nothing (the window is not closed yet)
-# -> Timestamp=105, Key="C", value=3 -> emit nothing (the window is not closed yet)
-# -> Timestamp=10100, Key="B", value=1 -> emit three messages, the time has progressed beyond the window end for all the keys in the partition
-#    1. first one with key "A" and value {"start": 0, "end": 10000, "value": 1}
-#    2. second one with key "B" and value {"start": 0, "end": 10000, "value": 2}
-#    3. third one with key "C" and value {"start": 0, "end": 10000, "value": 3}
-# -> Timestamp=8000, Key="A", value=1 -> emit nothing and value isn't part of the sum (the window is already closed)
-# -> Timestamp=10001, Key="A", value=1 -> emit nothing (the window is not closed yet)
-
-# Results:
-# (key="A", value={"start": 0, "end": 10000, "value": 1})
-# (key="B", value={"start": 0, "end": 10000, "value": 2})
-# (key="C", value={"start": 0, "end": 10000, "value": 3})
-```
-
 ## Transforming the result of a windowed aggregation
 Windowed aggregations return aggregated results in the following format/schema:
 
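Reviewer note: the difference between the two closing strategies described in the removed section can be checked with a small plain-Python model. This is only a sketch and not Quix Streams' implementation. The names `simulate`, `WINDOW_MS`, and `MESSAGES` are hypothetical, and the model assumes a window closes as soon as a timestamp at or after its end is observed, with no grace period. It uses the message sequence from the `key` example. The `partition` example gives the fourth message a value of 1 instead of 2, which does not change what is emitted.

```python
from collections import defaultdict

WINDOW_MS = 10_000  # tumbling window size, matching the 10-second example

# (timestamp_ms, key, value) tuples taken from the "key" example in the diff
MESSAGES = [
    (100, "A", 1),
    (101, "B", 2),
    (105, "C", 3),
    (10100, "B", 2),
    (8000, "A", 1),
    (10001, "A", 1),
]


def simulate(messages, closing_strategy="key"):
    """Toy model: return the (key, result) pairs emitted in final() mode."""
    if closing_strategy not in ("key", "partition"):
        raise ValueError(f"unknown closing strategy: {closing_strategy!r}")

    sums = {}                  # (key, window_start) -> running sum of an open window
    latest = defaultdict(int)  # clock id -> highest timestamp seen so far
    emitted = []

    for timestamp, key, value in messages:
        # "key": every key has its own clock; "partition": one shared clock
        clock = key if closing_strategy == "key" else None
        start = timestamp - timestamp % WINDOW_MS

        # Late message: its window is already closed, so it is skipped
        if start + WINDOW_MS <= latest[clock]:
            continue

        sums[(key, start)] = sums.get((key, start), 0) + value
        latest[clock] = max(latest[clock], timestamp)

        # Close every open window that this clock has moved past
        for k, s in sorted(sums, key=lambda ks: (ks[1], ks[0])):
            in_scope = closing_strategy == "partition" or k == key
            if in_scope and s + WINDOW_MS <= latest[clock]:
                result = {"start": s, "end": s + WINDOW_MS, "value": sums.pop((k, s))}
                emitted.append((k, result))

    return emitted


if __name__ == "__main__":
    for strategy in ("key", "partition"):
        print(strategy, simulate(MESSAGES, strategy))
```

With the `key` strategy the model emits windows for "B" and then "A" and never closes the window for "C". With the `partition` strategy it emits all three windows when the message at timestamp 10100 arrives and drops the late message at timestamp 8000.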