Skip to content

Commit f4251a1

Browse files
committed
Introduce FixedTimeWindow intermediary class
1 parent b1f9f5b commit f4251a1

File tree

5 files changed

+79
-180
lines changed

5 files changed

+79
-180
lines changed

quixstreams/dataframe/windows/definitions.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@
3333
SlidingWindowSingleAggregation,
3434
)
3535
from .time_based import (
36+
FixedTimeWindowMultiAggregation,
37+
FixedTimeWindowSingleAggregation,
3638
TimeWindow,
37-
TimeWindowMultiAggregation,
38-
TimeWindowSingleAggregation,
3939
)
4040

4141
if TYPE_CHECKING:
@@ -310,10 +310,11 @@ def _create_window(
310310
) -> TimeWindow:
311311
if func_name:
312312
window_type: Union[
313-
type[TimeWindowSingleAggregation], type[TimeWindowMultiAggregation]
314-
] = TimeWindowSingleAggregation
313+
type[FixedTimeWindowSingleAggregation],
314+
type[FixedTimeWindowMultiAggregation],
315+
] = FixedTimeWindowSingleAggregation
315316
else:
316-
window_type = TimeWindowMultiAggregation
317+
window_type = FixedTimeWindowMultiAggregation
317318

318319
return window_type(
319320
duration_ms=self._duration_ms,
@@ -359,10 +360,11 @@ def _create_window(
359360
) -> TimeWindow:
360361
if func_name:
361362
window_type: Union[
362-
type[TimeWindowSingleAggregation], type[TimeWindowMultiAggregation]
363-
] = TimeWindowSingleAggregation
363+
type[FixedTimeWindowSingleAggregation],
364+
type[FixedTimeWindowMultiAggregation],
365+
] = FixedTimeWindowSingleAggregation
364366
else:
365-
window_type = TimeWindowMultiAggregation
367+
window_type = FixedTimeWindowMultiAggregation
366368

367369
return window_type(
368370
duration_ms=self._duration_ms,

quixstreams/dataframe/windows/session.py

Lines changed: 9 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,23 @@
22
import time
33
from typing import TYPE_CHECKING, Any, Iterable, Optional
44

5-
from quixstreams.context import message_context
65
from quixstreams.state import WindowedPartitionTransaction, WindowedState
76

87
from .base import (
98
MultiAggregationWindowMixin,
109
SingleAggregationWindowMixin,
11-
Window,
1210
WindowKeyResult,
1311
WindowOnLateCallback,
1412
)
15-
from .time_based import ClosingStrategy, ClosingStrategyValues
13+
from .time_based import ClosingStrategy, TimeWindow
1614

1715
if TYPE_CHECKING:
1816
from quixstreams.dataframe.dataframe import StreamingDataFrame
1917

2018
logger = logging.getLogger(__name__)
2119

2220

23-
class SessionWindow(Window):
21+
class SessionWindow(TimeWindow):
2422
"""
2523
Session window groups events that occur within a specified timeout period.
2624
@@ -40,77 +38,10 @@ def __init__(
4038
dataframe: "StreamingDataFrame",
4139
on_late: Optional[WindowOnLateCallback] = None,
4240
):
43-
super().__init__(
44-
name=name,
45-
dataframe=dataframe,
46-
)
41+
super().__init__(name=name, dataframe=dataframe, on_late=on_late)
4742

4843
self._timeout_ms = timeout_ms
4944
self._grace_ms = grace_ms
50-
self._on_late = on_late
51-
self._closing_strategy = ClosingStrategy.KEY
52-
53-
def final(
54-
self, closing_strategy: ClosingStrategyValues = "key"
55-
) -> "StreamingDataFrame":
56-
"""
57-
Apply the session window aggregation and return results only when the sessions
58-
are closed.
59-
60-
The format of returned sessions:
61-
```python
62-
{
63-
"start": <session start time in milliseconds>,
64-
"end": <session end time in milliseconds>,
65-
"value": <aggregated session value>,
66-
}
67-
```
68-
69-
The individual session is closed when the event time
70-
(the maximum observed timestamp across the partition) passes
71-
the last event timestamp + timeout + grace period.
72-
The closed sessions cannot receive updates anymore and are considered final.
73-
74-
:param closing_strategy: the strategy to use when closing sessions.
75-
Possible values:
76-
- `"key"` - messages advance time and close sessions with the same key.
77-
If some message keys appear irregularly in the stream, the latest sessions can remain unprocessed until a message with the same key is received.
78-
- `"partition"` - messages advance time and close sessions for the whole partition to which this message key belongs.
79-
If timestamps between keys are not ordered, it may increase the number of discarded late messages.
80-
Default - `"key"`.
81-
"""
82-
self._closing_strategy = ClosingStrategy.new(closing_strategy)
83-
return super().final()
84-
85-
def current(
86-
self, closing_strategy: ClosingStrategyValues = "key"
87-
) -> "StreamingDataFrame":
88-
"""
89-
Apply the session window transformation to the StreamingDataFrame to return results
90-
for each updated session.
91-
92-
The format of returned sessions:
93-
```python
94-
{
95-
"start": <session start time in milliseconds>,
96-
"end": <session end time in milliseconds>,
97-
"value": <aggregated session value>,
98-
}
99-
```
100-
101-
This method processes streaming data and returns results as they come,
102-
regardless of whether the session is closed or not.
103-
104-
:param closing_strategy: the strategy to use when closing sessions.
105-
Possible values:
106-
- `"key"` - messages advance time and close sessions with the same key.
107-
If some message keys appear irregularly in the stream, the latest sessions can remain unprocessed until a message with the same key is received.
108-
- `"partition"` - messages advance time and close sessions for the whole partition to which this message key belongs.
109-
If timestamps between keys are not ordered, it may increase the number of discarded late messages.
110-
Default - `"key"`.
111-
"""
112-
self._closing_strategy = ClosingStrategy.new(closing_strategy)
113-
return super().current()
11445

11546
def process_window(
11647
self,
@@ -140,7 +71,7 @@ def process_window(
14071
# Check if the event is too late
14172
if timestamp_ms < session_expiry_threshold:
14273
late_by_ms = session_expiry_threshold - timestamp_ms
143-
self._on_expired_session(
74+
self._on_expired_window(
14475
value=value,
14576
key=key,
14677
start=timestamp_ms,
@@ -216,17 +147,17 @@ def process_window(
216147

217148
# Expire old sessions
218149
if self._closing_strategy == ClosingStrategy.PARTITION:
219-
expired_windows = self.expire_sessions_by_partition(
150+
expired_windows = self.expire_by_partition(
220151
transaction, session_expiry_threshold, collect
221152
)
222153
else:
223-
expired_windows = self.expire_sessions_by_key(
154+
expired_windows = self.expire_by_key(
224155
key, state, session_expiry_threshold, collect
225156
)
226157

227158
return updated_windows, expired_windows
228159

229-
def expire_sessions_by_partition(
160+
def expire_by_partition(
230161
self,
231162
transaction: WindowedPartitionTransaction,
232163
expiry_threshold: int,
@@ -257,7 +188,7 @@ def expire_sessions_by_partition(
257188
for prefix in seen_prefixes:
258189
state = transaction.as_state(prefix=prefix)
259190
prefix_expired = list(
260-
self.expire_sessions_by_key(prefix, state, expiry_threshold, collect)
191+
self.expire_by_key(prefix, state, expiry_threshold, collect)
261192
)
262193
expired_results.extend(prefix_expired)
263194
count += len(prefix_expired)
@@ -271,7 +202,7 @@ def expire_sessions_by_partition(
271202

272203
return expired_results
273204

274-
def expire_sessions_by_key(
205+
def expire_by_key(
275206
self,
276207
key: Any,
277208
state: WindowedState,
@@ -318,43 +249,6 @@ def expire_sessions_by_key(
318249
round(time.monotonic() - start, 2),
319250
)
320251

321-
def _on_expired_session(
322-
self,
323-
value: Any,
324-
key: Any,
325-
start: int,
326-
end: int,
327-
timestamp_ms: int,
328-
late_by_ms: int,
329-
) -> None:
330-
ctx = message_context()
331-
to_log = True
332-
333-
# Trigger the "on_late" callback if provided
334-
if self._on_late:
335-
to_log = self._on_late(
336-
value,
337-
key,
338-
timestamp_ms,
339-
late_by_ms,
340-
start,
341-
end,
342-
self._name,
343-
ctx.topic,
344-
ctx.partition,
345-
ctx.offset,
346-
)
347-
if to_log:
348-
logger.warning(
349-
"Skipping session processing for the closed session "
350-
f"timestamp_ms={timestamp_ms} "
351-
f"session={(start, end)} "
352-
f"late_by_ms={late_by_ms} "
353-
f"store_name={self._name} "
354-
f"partition={ctx.topic}[{ctx.partition}] "
355-
f"offset={ctx.offset}"
356-
)
357-
358252

359253
class SessionWindowSingleAggregation(SingleAggregationWindowMixin, SessionWindow):
360254
pass

quixstreams/dataframe/windows/sliding.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
SingleAggregationWindowMixin,
88
WindowKeyResult,
99
)
10-
from .time_based import ClosingStrategyValues, TimeWindow
10+
from .time_based import ClosingStrategyValues, FixedTimeWindow
1111

1212
if TYPE_CHECKING:
1313
from quixstreams.dataframe.dataframe import StreamingDataFrame
1414

1515

16-
class SlidingWindow(TimeWindow):
16+
class SlidingWindow(FixedTimeWindow):
1717
def final(
1818
self, closing_strategy: ClosingStrategyValues = "key"
1919
) -> "StreamingDataFrame":

quixstreams/dataframe/windows/time_based.py

Lines changed: 57 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,12 @@ def new(cls, value: str) -> "ClosingStrategy":
4141
class TimeWindow(Window):
4242
def __init__(
4343
self,
44-
duration_ms: int,
45-
grace_ms: int,
4644
name: str,
4745
dataframe: "StreamingDataFrame",
48-
step_ms: Optional[int] = None,
4946
on_late: Optional[WindowOnLateCallback] = None,
5047
):
51-
super().__init__(
52-
name=name,
53-
dataframe=dataframe,
54-
)
55-
56-
self._duration_ms = duration_ms
57-
self._grace_ms = grace_ms
58-
self._step_ms = step_ms
48+
super().__init__(name=name, dataframe=dataframe)
5949
self._on_late = on_late
60-
6150
self._closing_strategy = ClosingStrategy.KEY
6251

6352
def final(
@@ -123,6 +112,60 @@ def current(
123112
self._closing_strategy = ClosingStrategy.new(closing_strategy)
124113
return super().current()
125114

115+
def _on_expired_window(
116+
self,
117+
value: Any,
118+
key: Any,
119+
start: int,
120+
end: int,
121+
timestamp_ms: int,
122+
late_by_ms: int,
123+
) -> None:
124+
ctx = message_context()
125+
to_log = True
126+
# Trigger the "on_late" callback if provided.
127+
# Log the lateness warning if the callback returns True
128+
if self._on_late:
129+
to_log = self._on_late(
130+
value,
131+
key,
132+
timestamp_ms,
133+
late_by_ms,
134+
start,
135+
end,
136+
self._name,
137+
ctx.topic,
138+
ctx.partition,
139+
ctx.offset,
140+
)
141+
if to_log:
142+
logger.warning(
143+
"Skipping window processing for the closed window "
144+
f"timestamp_ms={timestamp_ms} "
145+
f"window={(start, end)} "
146+
f"late_by_ms={late_by_ms} "
147+
f"store_name={self._name} "
148+
f"partition={ctx.topic}[{ctx.partition}] "
149+
f"offset={ctx.offset}"
150+
)
151+
152+
153+
class FixedTimeWindow(TimeWindow):
154+
def __init__(
155+
self,
156+
duration_ms: int,
157+
grace_ms: int,
158+
name: str,
159+
dataframe: "StreamingDataFrame",
160+
step_ms: Optional[int] = None,
161+
on_late: Optional[WindowOnLateCallback] = None,
162+
):
163+
super().__init__(name=name, dataframe=dataframe, on_late=on_late)
164+
165+
self._duration_ms = duration_ms
166+
self._grace_ms = grace_ms
167+
self._step_ms = step_ms
168+
126169
def process_window(
127170
self,
128171
value: Any,
@@ -252,47 +295,10 @@ def expire_by_key(
252295
"Expired %s windows in %ss", count, round(time.monotonic() - start, 2)
253296
)
254297

255-
def _on_expired_window(
256-
self,
257-
value: Any,
258-
key: Any,
259-
start: int,
260-
end: int,
261-
timestamp_ms: int,
262-
late_by_ms: int,
263-
) -> None:
264-
ctx = message_context()
265-
to_log = True
266-
# Trigger the "on_late" callback if provided.
267-
# Log the lateness warning if the callback returns True
268-
if self._on_late:
269-
to_log = self._on_late(
270-
value,
271-
key,
272-
timestamp_ms,
273-
late_by_ms,
274-
start,
275-
end,
276-
self._name,
277-
ctx.topic,
278-
ctx.partition,
279-
ctx.offset,
280-
)
281-
if to_log:
282-
logger.warning(
283-
"Skipping window processing for the closed window "
284-
f"timestamp_ms={timestamp_ms} "
285-
f"window={(start, end)} "
286-
f"late_by_ms={late_by_ms} "
287-
f"store_name={self._name} "
288-
f"partition={ctx.topic}[{ctx.partition}] "
289-
f"offset={ctx.offset}"
290-
)
291-
292298

293-
class TimeWindowSingleAggregation(SingleAggregationWindowMixin, TimeWindow):
299+
class FixedTimeWindowSingleAggregation(SingleAggregationWindowMixin, FixedTimeWindow):
294300
pass
295301

296302

297-
class TimeWindowMultiAggregation(MultiAggregationWindowMixin, TimeWindow):
303+
class FixedTimeWindowMultiAggregation(MultiAggregationWindowMixin, FixedTimeWindow):
298304
pass

0 commit comments

Comments
 (0)