Skip to content

Commit 4b8dc32

Browse files
committed
Correct heartbeat ignore
1 parent 06d48cb commit 4b8dc32

File tree

8 files changed

+78
-45
lines changed

8 files changed

+78
-45
lines changed

quixstreams/core/stream/functions/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from .apply import *
33
from .base import *
44
from .filter import *
5+
from .heartbeat import *
56
from .transform import *
67
from .types import *
78
from .update import *
8-
from .heartbeat import *

quixstreams/core/stream/functions/apply.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing import Any, Literal, Union, overload
22

33
from .base import StreamFunction
4-
from .heartbeat import ignore_heartbeat
4+
from .heartbeat import is_heartbeat_message
55
from .types import (
66
ApplyCallback,
77
ApplyExpandedCallback,
@@ -49,6 +49,10 @@ def wrapper(
4949
timestamp: int,
5050
headers: Any,
5151
) -> None:
52+
# Pass heartbeat messages downstream
53+
if is_heartbeat_message(key, value):
54+
child_executor(value, key, timestamp, headers)
55+
5256
# Execute a function on a single value and wrap results into a list
5357
# to expand them downstream
5458
result = func(value)
@@ -63,11 +67,13 @@ def wrapper(
6367
timestamp: int,
6468
headers: Any,
6569
) -> None:
66-
# Execute a function on a single value and return its result
67-
result = func(value)
68-
child_executor(result, key, timestamp, headers)
70+
# Pass heartbeat messages downstream or execute
71+
# a function on a single value and return its result
72+
if not is_heartbeat_message(key, value):
73+
value = func(value)
74+
child_executor(value, key, timestamp, headers)
6975

70-
return ignore_heartbeat(wrapper)
76+
return wrapper
7177

7278

7379
class ApplyWithMetadataFunction(StreamFunction):
@@ -111,6 +117,10 @@ def wrapper(
111117
timestamp: int,
112118
headers: Any,
113119
):
120+
# Pass heartbeat messages downstream
121+
if is_heartbeat_message(key, value):
122+
child_executor(value, key, timestamp, headers)
123+
114124
# Execute a function on a single value and wrap results into a list
115125
# to expand them downstream
116126
result = func(value, key, timestamp, headers)
@@ -125,8 +135,10 @@ def wrapper(
125135
timestamp: int,
126136
headers: Any,
127137
):
128-
# Execute a function on a single value and return its result
129-
result = func(value, key, timestamp, headers)
130-
child_executor(result, key, timestamp, headers)
138+
# Pass heartbeat messages downstream or execute
139+
# a function on a single value and return its result
140+
if not is_heartbeat_message(key, value):
141+
value = func(value, key, timestamp, headers)
142+
child_executor(value, key, timestamp, headers)
131143

132-
return ignore_heartbeat(wrapper)
144+
return wrapper

quixstreams/core/stream/functions/filter.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Any
22

3+
from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message
4+
35
from .base import StreamFunction
46
from .types import FilterCallback, FilterWithMetadataCallback, VoidExecutor
57

@@ -30,7 +32,7 @@ def wrapper(
3032
headers: Any,
3133
):
3234
# Filter a single value
33-
if func(value):
35+
if is_heartbeat_message(key, value) or func(value):
3436
child_executor(value, key, timestamp, headers)
3537

3638
return wrapper
@@ -62,7 +64,7 @@ def wrapper(
6264
headers: Any,
6365
):
6466
# Filter a single value
65-
if func(value, key, timestamp, headers):
67+
if is_heartbeat_message(key, value) or func(value, key, timestamp, headers):
6668
child_executor(value, key, timestamp, headers)
6769

6870
return wrapper
Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
1-
from functools import wraps
21
from typing import Any
32

43
from quixstreams.context import message_context
54

65
from .base import StreamFunction
7-
from .types import VoidExecutor
6+
from .types import HeartbeatCallback, VoidExecutor
87

9-
__all__ = ("HeartbeatFunction", "is_heartbeat_message", "ignore_heartbeat")
8+
__all__ = ("HeartbeatFunction", "is_heartbeat_message")
109

1110

1211
class HeartbeatFunction(StreamFunction):
12+
def __init__(self, func: HeartbeatCallback) -> None:
13+
super().__init__(func)
14+
self.func: HeartbeatCallback
15+
1316
def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
1417
child_executor = self._resolve_branching(*child_executors)
1518

@@ -21,32 +24,13 @@ def wrapper(
2124
timestamp: int,
2225
headers: Any,
2326
):
24-
if is_heartbeat_message(key, value) and (result := func(timestamp)):
25-
for new_value, new_key, new_timestamp, new_headers in result:
26-
child_executor(new_value, new_key, new_timestamp, new_headers)
27-
27+
if is_heartbeat_message(key, value):
28+
# TODO: Heartbeats may return values (like expired windows)
29+
func(timestamp)
2830
child_executor(value, key, timestamp, headers)
2931

3032
return wrapper
3133

3234

3335
def is_heartbeat_message(key: Any, value: Any) -> bool:
3436
return message_context().heartbeat and key is None and value is None
35-
36-
37-
def ignore_heartbeat(func):
38-
"""
39-
Decorator that wraps a function to return early if the message is a heartbeat.
40-
41-
The decorated function should expect (value, key, timestamp, headers) parameters.
42-
If is_heartbeat_message(key, value) returns True, the function returns early
43-
without executing the wrapped function.
44-
"""
45-
46-
@wraps(func)
47-
def wrapper(value: Any, key: Any, timestamp: int, headers: Any):
48-
if is_heartbeat_message(key, value):
49-
return
50-
return func(value, key, timestamp, headers)
51-
52-
return wrapper

quixstreams/core/stream/functions/transform.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Any, Literal, Union, cast, overload
22

3+
from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message
4+
35
from .base import StreamFunction
46
from .types import TransformCallback, TransformExpandedCallback, VoidExecutor
57

@@ -53,6 +55,10 @@ def wrapper(
5355
timestamp: int,
5456
headers: Any,
5557
):
58+
# Pass heartbeat messages downstream
59+
if is_heartbeat_message(key, value):
60+
child_executor(value, key, timestamp, headers)
61+
5662
result = expanded_func(value, key, timestamp, headers)
5763
for new_value, new_key, new_timestamp, new_headers in result:
5864
child_executor(new_value, new_key, new_timestamp, new_headers)
@@ -66,10 +72,12 @@ def wrapper(
6672
timestamp: int,
6773
headers: Any,
6874
):
69-
# Execute a function on a single value and return its result
70-
new_value, new_key, new_timestamp, new_headers = func(
71-
value, key, timestamp, headers
72-
)
73-
child_executor(new_value, new_key, new_timestamp, new_headers)
75+
# Pass heartbeat messages downstream or execute
76+
# a function on a single value and return its result
77+
if is_heartbeat_message(key, value):
78+
value, key, timestamp, headers = func(
79+
value, key, timestamp, headers
80+
)
81+
child_executor(value, key, timestamp, headers)
7482

7583
return wrapper

quixstreams/core/stream/functions/types.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,15 @@ def __bool__(self) -> bool: ...
3636
[Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]]
3737
]
3838

39+
HeartbeatCallback = Callable[
40+
[int], # timestamp
41+
Union[
42+
None,
43+
Tuple[Any, Any, int, Any], # single value
44+
Iterable[Tuple[Any, Any, int, Any]], # expanded values
45+
],
46+
]
47+
3948
StreamCallback = Union[
4049
ApplyCallback,
4150
ApplyExpandedCallback,

quixstreams/core/stream/functions/update.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Any
22

3+
from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message
4+
35
from .base import StreamFunction
46
from .types import UpdateCallback, UpdateWithMetadataCallback, VoidExecutor
57

@@ -28,7 +30,8 @@ def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
2830

2931
def wrapper(value: Any, key: Any, timestamp: int, headers: Any):
3032
# Update a single value and forward it
31-
func(value)
33+
if not is_heartbeat_message(key, value):
34+
func(value)
3235
child_executor(value, key, timestamp, headers)
3336

3437
return wrapper
@@ -56,7 +59,8 @@ def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
5659

5760
def wrapper(value: Any, key: Any, timestamp: int, headers: Any):
5861
# Update a single value and forward it
59-
func(value, key, timestamp, headers)
62+
if not is_heartbeat_message(key, value):
63+
func(value, key, timestamp, headers)
6064
child_executor(value, key, timestamp, headers)
6165

6266
return wrapper

quixstreams/dataframe/windows/base.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from quixstreams.context import message_context
2020
from quixstreams.core.stream import TransformExpandedCallback
2121
from quixstreams.core.stream.exceptions import InvalidOperation
22+
from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message
2223
from quixstreams.models.topics.manager import TopicManager
2324
from quixstreams.state import WindowedPartitionTransaction
2425

@@ -83,6 +84,7 @@ def _apply_window(
8384
self,
8485
func: TransformRecordCallbackExpandedWindowed,
8586
name: str,
87+
heartbeat_func,
8688
) -> "StreamingDataFrame":
8789
self.register_store()
8890

@@ -91,6 +93,7 @@ def _apply_window(
9193
stream_id=self._dataframe.stream_id,
9294
processing_context=self._dataframe.processing_context,
9395
store_name=name,
96+
heartbeat_func=heartbeat_func,
9497
)
9598
# Manually modify the Stream and clone the source StreamingDataFrame
9699
# to avoid adding "transform" API to it.
@@ -140,9 +143,13 @@ def window_callback(
140143
for key, window in expired_windows:
141144
yield (window, key, window["start"], None)
142145

146+
def heartbeat_callback(timestamp: int) -> Iterable[Message]:
147+
return []
148+
143149
return self._apply_window(
144150
func=window_callback,
145151
name=self._name,
152+
heartbeat_func=heartbeat_callback,
146153
)
147154

148155
def current(self) -> "StreamingDataFrame":
@@ -188,7 +195,14 @@ def window_callback(
188195
for key, window in updated_windows:
189196
yield (window, key, window["start"], None)
190197

191-
return self._apply_window(func=window_callback, name=self._name)
198+
def heartbeat_callback(timestamp: int) -> Iterable[Message]:
199+
return []
200+
201+
return self._apply_window(
202+
func=window_callback,
203+
name=self._name,
204+
heartbeat_func=heartbeat_callback,
205+
)
192206

193207
# Implemented by SingleAggregationWindowMixin and MultiAggregationWindowMixin
194208
# Single aggregation and multi aggregation windows store aggregations and collections

0 commit comments

Comments
 (0)