Skip to content

PoC: Basic heartbeat support (rejected, superseded by #989) #956

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions quixstreams/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import warnings
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union

Expand All @@ -28,6 +29,8 @@
from .logging import LogLevel, configure_logging
from .models import (
DeserializerType,
MessageContext,
Row,
SerializerType,
TimestampExtractor,
Topic,
Expand Down Expand Up @@ -151,6 +154,7 @@ def __init__(
topic_create_timeout: float = 60,
processing_guarantee: ProcessingGuarantee = "at-least-once",
max_partition_buffer_size: int = 10000,
heartbeat_interval: float = 0.0,
):
"""
:param broker_address: Connection settings for Kafka.
Expand Down Expand Up @@ -220,6 +224,11 @@ def __init__(
It is a soft limit, and the actual number of buffered messages can be up to x2 higher.
Lower value decreases the memory use, but increases the latency.
Default - `10000`.
:param heartbeat_interval: the interval (in seconds) at which heartbeat messages are sent.
The heartbeat timer starts counting when the `Application` instance is created.
A heartbeat is sent for every assigned partition of every non-changelog topic.
If the value is 0 or negative, no heartbeat messages are sent.
Default - `0.0`.

<br><br>***Error Handlers***<br>
To handle errors, `Application` accepts callbacks triggered when
Expand Down Expand Up @@ -363,6 +372,10 @@ def __init__(
recovery_manager=recovery_manager,
)

self._heartbeat_active = heartbeat_interval > 0
self._heartbeat_interval = heartbeat_interval
self._heartbeat_last_sent = datetime.now().timestamp()

self._source_manager = SourceManager()
self._sink_manager = SinkManager()
self._dataframe_registry = DataFrameRegistry()
Expand Down Expand Up @@ -879,6 +892,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
processing_context = self._processing_context
source_manager = self._source_manager
process_message = self._process_message
process_heartbeat = self._process_heartbeat
printer = self._processing_context.printer
run_tracker = self._run_tracker
consumer = self._consumer
Expand All @@ -902,6 +916,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
run_tracker.timeout_refresh()
else:
process_message(dataframes_composed)
process_heartbeat(dataframes_composed)
processing_context.commit_checkpoint()
consumer.resume_backpressured()
source_manager.raise_for_error()
Expand Down Expand Up @@ -985,6 +1000,49 @@ def _process_message(self, dataframe_composed):
if self._on_message_processed is not None:
self._on_message_processed(topic_name, partition, offset)

def _process_heartbeat(self, dataframe_composed):
    """
    Push a synthetic heartbeat message through the pipeline of every
    assigned non-changelog topic partition once the heartbeat interval
    has elapsed.

    Does nothing when heartbeats are disabled, or when fewer than
    `self._heartbeat_interval` seconds have passed since the previous
    heartbeat was sent.

    :param dataframe_composed: a mapping indexed by topic name whose values
        are callables accepting `(value, key, timestamp, headers)`.
    :raises Exception: any processing error is re-raised unless
        `self._on_processing_error` returns a truthy value for it.
    """
    if not self._heartbeat_active:
        return

    now = datetime.now().timestamp()
    if self._heartbeat_last_sent > now - self._heartbeat_interval:
        return

    # Heartbeats carry no payload; `now` is in seconds, the message
    # timestamp is expressed in milliseconds.
    value, key, timestamp = None, None, int(now * 1000)
    non_changelog_topics = self._topic_manager.non_changelog_topics

    for tp in self._consumer.assignment():
        topic = tp.topic
        if topic not in non_changelog_topics:
            continue

        # A fresh headers dict per partition: the same object is handed to
        # the Row and to the pipeline, so sharing one dict across partitions
        # would let a mutation made downstream leak into the other heartbeats.
        headers = {}
        row = Row(
            value=value,
            key=key,
            timestamp=timestamp,
            context=MessageContext(
                topic=topic,
                partition=tp.partition,
                # There is no real Kafka message behind a heartbeat.
                offset=-1,
                size=-1,
                heartbeat=True,
            ),
            headers=headers,
        )
        # Run the pipeline in a copied context with the heartbeat's
        # message context set inside that copy.
        context = copy_context()
        context.run(set_message_context, row.context)
        try:
            context.run(
                dataframe_composed[topic],
                value,
                key,
                timestamp,
                headers,
            )
        except Exception as exc:
            to_suppress = self._on_processing_error(exc, row, logger)
            if not to_suppress:
                raise

    # Only reached when every partition was processed (or its error was
    # suppressed); an unsuppressed error leaves the timer untouched.
    self._heartbeat_last_sent = now

def _on_assign(self, _, topic_partitions: List[TopicPartition]):
"""
Assign new topic partitions to consumer and state.
Expand Down
1 change: 1 addition & 0 deletions quixstreams/core/stream/functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .apply import *
from .base import *
from .filter import *
from .heartbeat import *
from .transform import *
from .types import *
from .update import *
25 changes: 19 additions & 6 deletions quixstreams/core/stream/functions/apply.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any, Literal, Union, overload

from .base import StreamFunction
from .heartbeat import is_heartbeat_message
from .types import (
ApplyCallback,
ApplyExpandedCallback,
Expand Down Expand Up @@ -48,6 +49,10 @@ def wrapper(
timestamp: int,
headers: Any,
) -> None:
# Pass heartbeat messages downstream
if is_heartbeat_message(key, value):
child_executor(value, key, timestamp, headers)

# Execute a function on a single value and wrap results into a list
# to expand them downstream
result = func(value)
Expand All @@ -62,9 +67,11 @@ def wrapper(
timestamp: int,
headers: Any,
) -> None:
# Execute a function on a single value and return its result
result = func(value)
child_executor(result, key, timestamp, headers)
# Pass heartbeat messages downstream or execute
# a function on a single value and return its result
if not is_heartbeat_message(key, value):
value = func(value)
child_executor(value, key, timestamp, headers)

return wrapper

Expand Down Expand Up @@ -110,6 +117,10 @@ def wrapper(
timestamp: int,
headers: Any,
):
# Pass heartbeat messages downstream
if is_heartbeat_message(key, value):
child_executor(value, key, timestamp, headers)

# Execute a function on a single value and wrap results into a list
# to expand them downstream
result = func(value, key, timestamp, headers)
Expand All @@ -124,8 +135,10 @@ def wrapper(
timestamp: int,
headers: Any,
):
# Execute a function on a single value and return its result
result = func(value, key, timestamp, headers)
child_executor(result, key, timestamp, headers)
# Pass heartbeat messages downstream or execute
# a function on a single value and return its result
if not is_heartbeat_message(key, value):
value = func(value, key, timestamp, headers)
child_executor(value, key, timestamp, headers)

return wrapper
6 changes: 4 additions & 2 deletions quixstreams/core/stream/functions/filter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Any

from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message

from .base import StreamFunction
from .types import FilterCallback, FilterWithMetadataCallback, VoidExecutor

Expand Down Expand Up @@ -30,7 +32,7 @@ def wrapper(
headers: Any,
):
# Filter a single value
if func(value):
if is_heartbeat_message(key, value) or func(value):
child_executor(value, key, timestamp, headers)

return wrapper
Expand Down Expand Up @@ -62,7 +64,7 @@ def wrapper(
headers: Any,
):
# Filter a single value
if func(value, key, timestamp, headers):
if is_heartbeat_message(key, value) or func(value, key, timestamp, headers):
child_executor(value, key, timestamp, headers)

return wrapper
36 changes: 36 additions & 0 deletions quixstreams/core/stream/functions/heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Any

from quixstreams.context import message_context

from .base import StreamFunction
from .types import HeartbeatCallback, VoidExecutor

__all__ = ("HeartbeatFunction", "is_heartbeat_message")


class HeartbeatFunction(StreamFunction):
    """
    Wrap a callback that is invoked only for heartbeat messages.

    The callback receives the heartbeat timestamp. When it returns a truthy
    iterable of `(value, key, timestamp, headers)` tuples, each of them is
    sent downstream before the heartbeat itself.
    Every incoming message, heartbeat or not, is always forwarded downstream
    unchanged.
    """

    def __init__(self, func: HeartbeatCallback) -> None:
        super().__init__(func)
        self.func: HeartbeatCallback

    def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
        """
        Build the executor that runs the heartbeat callback and forwards
        messages to the resolved child executor.

        :param child_executors: downstream executors to forward messages to.
        :return: a callable accepting `(value, key, timestamp, headers)`.
        """
        downstream = self._resolve_branching(*child_executors)
        on_heartbeat = self.func

        def wrapper(
            value: Any,
            key: Any,
            timestamp: int,
            headers: Any,
        ):
            if is_heartbeat_message(key, value):
                emitted = on_heartbeat(timestamp)
                # The callback may return nothing (or an empty result),
                # in which case only the heartbeat is forwarded.
                if emitted:
                    for item in emitted:
                        new_value, new_key, new_timestamp, new_headers = item
                        downstream(new_value, new_key, new_timestamp, new_headers)
            downstream(value, key, timestamp, headers)

        return wrapper


def is_heartbeat_message(key: Any, value: Any) -> bool:
    """
    Tell whether the message currently being processed is a heartbeat.

    A message counts as a heartbeat only when both its key and its value are
    `None` and the current message context has a truthy `heartbeat` attribute.

    :param key: message key.
    :param value: message value.
    :return: `True` for a heartbeat message, `False` otherwise.
    """
    # Check the payload first: this runs for every message in every stream
    # function, and regular messages can be rejected without touching the
    # message context at all.
    if key is not None or value is not None:
        return False
    # Coerce so the declared `bool` return type holds even when the
    # attribute itself is not a bool.
    return bool(message_context().heartbeat)
25 changes: 17 additions & 8 deletions quixstreams/core/stream/functions/transform.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Any, Literal, Union, cast, overload

from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message

from .base import StreamFunction
from .types import TransformCallback, TransformExpandedCallback, VoidExecutor

Expand Down Expand Up @@ -53,9 +55,13 @@ def wrapper(
timestamp: int,
headers: Any,
):
result = expanded_func(value, key, timestamp, headers)
for new_value, new_key, new_timestamp, new_headers in result:
child_executor(new_value, new_key, new_timestamp, new_headers)
# Pass heartbeat messages downstream
if is_heartbeat_message(key, value):
child_executor(value, key, timestamp, headers)
else:
result = expanded_func(value, key, timestamp, headers)
for new_value, new_key, new_timestamp, new_headers in result:
child_executor(new_value, new_key, new_timestamp, new_headers)

else:
func = cast(TransformCallback, self.func)
Expand All @@ -66,10 +72,13 @@ def wrapper(
timestamp: int,
headers: Any,
):
# Execute a function on a single value and return its result
new_value, new_key, new_timestamp, new_headers = func(
value, key, timestamp, headers
)
child_executor(new_value, new_key, new_timestamp, new_headers)
# Pass heartbeat messages downstream or execute
# a function on a single value and return its result
if not is_heartbeat_message(key, value):
value, key, timestamp, headers = func(
value, key, timestamp, headers
)

child_executor(value, key, timestamp, headers)

return wrapper
4 changes: 3 additions & 1 deletion quixstreams/core/stream/functions/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Callable, Iterable, Protocol, Tuple, Union
from typing import Any, Callable, Iterable, Optional, Protocol, Tuple, Union

__all__ = (
"StreamCallback",
Expand Down Expand Up @@ -36,6 +36,8 @@ def __bool__(self) -> bool: ...
[Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]]
]

HeartbeatCallback = Callable[[int], Optional[Iterable[Tuple[Any, Any, int, Any]]]]

StreamCallback = Union[
ApplyCallback,
ApplyExpandedCallback,
Expand Down
8 changes: 6 additions & 2 deletions quixstreams/core/stream/functions/update.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Any

from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message

from .base import StreamFunction
from .types import UpdateCallback, UpdateWithMetadataCallback, VoidExecutor

Expand Down Expand Up @@ -28,7 +30,8 @@ def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:

def wrapper(value: Any, key: Any, timestamp: int, headers: Any):
# Update a single value and forward it
func(value)
if not is_heartbeat_message(key, value):
func(value)
child_executor(value, key, timestamp, headers)

return wrapper
Expand Down Expand Up @@ -56,7 +59,8 @@ def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:

def wrapper(value: Any, key: Any, timestamp: int, headers: Any):
# Update a single value and forward it
func(value, key, timestamp, headers)
if not is_heartbeat_message(key, value):
func(value, key, timestamp, headers)
child_executor(value, key, timestamp, headers)

return wrapper
5 changes: 5 additions & 0 deletions quixstreams/core/stream/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
FilterFunction,
FilterWithMetadataCallback,
FilterWithMetadataFunction,
HeartbeatFunction,
ReturningExecutor,
StreamFunction,
TransformCallback,
Expand Down Expand Up @@ -280,6 +281,10 @@ def add_transform(
"""
return self._add(TransformFunction(func, expand=expand)) # type: ignore[call-overload]

def add_heartbeat(self, func) -> "Stream":
    """
    Wrap the callback into a `HeartbeatFunction` and add it to the stream.

    :param func: the callback to wrap into a `HeartbeatFunction`.
    :return: whatever `self._add` returns for the wrapped function.
    """
    return self._add(HeartbeatFunction(func))

def merge(self, other: "Stream") -> "Stream":
"""
Merge two Streams together and return a new Stream with two parents
Expand Down
4 changes: 4 additions & 0 deletions quixstreams/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ def stream_id(self) -> str:
def topics(self) -> tuple[Topic, ...]:
return self._topics

def heartbeat(self, func) -> "StreamingDataFrame":
    """
    Register a heartbeat callback on the underlying stream.

    :param func: the callback passed to `self.stream.add_heartbeat`.
    :return: the result of `__dataframe_clone__` called with the stream
        returned by `add_heartbeat`.
    """
    stream_with_heartbeat = self.stream.add_heartbeat(func)
    return self.__dataframe_clone__(stream=stream_with_heartbeat)

@overload
def apply(
self,
Expand Down
Loading
Loading