From e1cf56e9962be9c4f005c4c1a42160862d2091d6 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Wed, 30 Jul 2025 15:55:49 +0200 Subject: [PATCH 1/5] Heartbeat basics --- quixstreams/app.py | 54 +++++++++++ .../core/stream/functions/transform.py | 89 +++++++++++++++++-- quixstreams/core/stream/functions/types.py | 6 ++ quixstreams/core/stream/stream.py | 50 ++++++++++- quixstreams/dataframe/dataframe.py | 4 + quixstreams/dataframe/registry.py | 29 +++++- 6 files changed, 219 insertions(+), 13 deletions(-) diff --git a/quixstreams/app.py b/quixstreams/app.py index 30d71ab4e..1a17f7176 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -7,6 +7,7 @@ import uuid import warnings from collections import defaultdict +from datetime import datetime from pathlib import Path from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union, cast @@ -30,6 +31,8 @@ from .logging import LogLevel, configure_logging from .models import ( DeserializerType, + MessageContext, + Row, SerializerType, TimestampExtractor, Topic, @@ -152,6 +155,7 @@ def __init__( topic_create_timeout: float = 60, processing_guarantee: ProcessingGuarantee = "at-least-once", max_partition_buffer_size: int = 10000, + heartbeat_interval: float = 0.0, ): """ :param broker_address: Connection settings for Kafka. @@ -220,6 +224,12 @@ def __init__( It is a soft limit, and the actual number of buffered messages can be up to x2 higher. Lower value decreases the memory use, but increases the latency. Default - `10000`. + :param heartbeat_interval: the interval (seconds) at which to send heartbeat messages. + The heartbeat timing starts counting from application start. + TODO: Save and respect last heartbeat timestamp. + The heartbeat is sent for every partition of every topic with registered heartbeat streams. + If the value is 0, no heartbeat messages will be sent. + Default - `0.0`.

***Error Handlers***
To handle errors, `Application` accepts callbacks triggered when @@ -371,6 +381,10 @@ def __init__( recovery_manager=recovery_manager, ) + self._heartbeat_active = heartbeat_interval > 0 + self._heartbeat_interval = heartbeat_interval + self._heartbeat_last_sent = datetime.now().timestamp() + self._source_manager = SourceManager() self._sink_manager = SinkManager() self._dataframe_registry = DataFrameRegistry() @@ -900,6 +914,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None): processing_context = self._processing_context source_manager = self._source_manager process_message = self._process_message + process_heartbeat = self._process_heartbeat printer = self._processing_context.printer run_tracker = self._run_tracker consumer = self._consumer @@ -912,6 +927,9 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None): ) dataframes_composed = self._dataframe_registry.compose_all(sink=sink) + heartbeats_composed = self._dataframe_registry.compose_heartbeats() + if not heartbeats_composed: + self._heartbeat_active = False processing_context.init_checkpoint() run_tracker.set_as_running() @@ -923,6 +941,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None): run_tracker.timeout_refresh() else: process_message(dataframes_composed) + process_heartbeat(heartbeats_composed) processing_context.commit_checkpoint() consumer.resume_backpressured() source_manager.raise_for_error() @@ -1006,6 +1025,41 @@ def _process_message(self, dataframe_composed): if self._on_message_processed is not None: self._on_message_processed(topic_name, partition, offset) + def _process_heartbeat(self, heartbeats_composed): + if not self._heartbeat_active: + return + + now = datetime.now().timestamp() + if self._heartbeat_last_sent > now - self._heartbeat_interval: + return + + value, key, timestamp, headers = None, None, int(now * 1000), {} + + for tp in self._consumer.assignment(): + if executor := heartbeats_composed.get(tp.topic): + row = Row( + value=value, + key=key, + timestamp=timestamp, + context=MessageContext( + topic=tp.topic, + partition=tp.partition, + offset=-1, # TODO: get correct offsets + size=-1, + ), + headers=headers, + ) + context = copy_context() + context.run(set_message_context, row.context) + try: + context.run(executor, value, key, timestamp, headers) + except Exception as exc: + to_suppress = self._on_processing_error(exc, row, logger) + if not to_suppress: + raise + + self._heartbeat_last_sent = now + def _on_assign(self, _, topic_partitions: List[TopicPartition]): """ Assign new topic partitions to consumer and state. diff --git a/quixstreams/core/stream/functions/transform.py b/quixstreams/core/stream/functions/transform.py index 219662b6b..a8744e219 100644 --- a/quixstreams/core/stream/functions/transform.py +++ b/quixstreams/core/stream/functions/transform.py @@ -1,7 +1,13 @@ from typing import Any, Literal, Union, cast, overload from .base import StreamFunction -from .types import TransformCallback, TransformExpandedCallback, VoidExecutor +from .types import ( + TransformCallback, + TransformExpandedCallback, + TransformHeartbeatCallback, + TransformHeartbeatExpandedCallback, + VoidExecutor, +) __all__ = ("TransformFunction",) @@ -23,28 +29,81 @@ class TransformFunction(StreamFunction): @overload def __init__( - self, func: TransformCallback, expand: Literal[False] = False + self, + func: TransformCallback, + expand: Literal[False] = False, + heartbeat: Literal[False] = False, + ) -> None: ... 
+ + @overload + def __init__( + self, + func: TransformExpandedCallback, + expand: Literal[True], + heartbeat: Literal[False] = False, + ) -> None: ... + + @overload + def __init__( + self, + func: TransformHeartbeatCallback, + expand: Literal[False] = False, + heartbeat: Literal[True] = True, ) -> None: ... @overload def __init__( - self, func: TransformExpandedCallback, expand: Literal[True] + self, + func: TransformHeartbeatExpandedCallback, + expand: Literal[True], + heartbeat: Literal[True], ) -> None: ... def __init__( self, - func: Union[TransformCallback, TransformExpandedCallback], + func: Union[ + TransformCallback, + TransformExpandedCallback, + TransformHeartbeatCallback, + TransformHeartbeatExpandedCallback, + ], expand: bool = False, + heartbeat: bool = False, ): super().__init__(func) - self.func: Union[TransformCallback, TransformExpandedCallback] + self.func: Union[ + TransformCallback, + TransformExpandedCallback, + TransformHeartbeatCallback, + TransformHeartbeatExpandedCallback, + ] self.expand = expand + self.heartbeat = heartbeat def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor: child_executor = self._resolve_branching(*child_executors) - if self.expand: + if self.expand and self.heartbeat: + heartbeat_expanded_func = cast( + TransformHeartbeatExpandedCallback, self.func + ) + + def wrapper( + value: Any, + key: Any, + timestamp: int, + headers: Any, + ): + for ( + new_value, + new_key, + new_timestamp, + new_headers, + ) in heartbeat_expanded_func(timestamp): + child_executor(new_value, new_key, new_timestamp, new_headers) + + elif self.expand: expanded_func = cast(TransformExpandedCallback, self.func) def wrapper( @@ -57,8 +116,22 @@ def wrapper( for new_value, new_key, new_timestamp, new_headers in result: child_executor(new_value, new_key, new_timestamp, new_headers) + elif self.heartbeat: + heartbeat_func = cast(TransformHeartbeatCallback, self.func) + + def wrapper( + value: Any, + key: Any, + timestamp: int, + headers: Any, + ): + new_value, new_key, new_timestamp, new_headers = heartbeat_func( + timestamp + ) + child_executor(new_value, new_key, new_timestamp, new_headers) + else: - func = cast(TransformCallback, self.func) + regular_func = cast(TransformCallback, self.func) def wrapper( value: Any, @@ -67,7 +140,7 @@ def wrapper( headers: Any, ): # Execute a function on a single value and return its result - new_value, new_key, new_timestamp, new_headers = func( + new_value, new_key, new_timestamp, new_headers = regular_func( value, key, timestamp, headers ) child_executor(new_value, new_key, new_timestamp, new_headers) diff --git a/quixstreams/core/stream/functions/types.py b/quixstreams/core/stream/functions/types.py index 504299b53..7b7516d61 100644 --- a/quixstreams/core/stream/functions/types.py +++ b/quixstreams/core/stream/functions/types.py @@ -14,6 +14,8 @@ "FilterWithMetadataCallback", "TransformCallback", "TransformExpandedCallback", + "TransformHeartbeatCallback", + "TransformHeartbeatExpandedCallback", ) @@ -35,6 +37,10 @@ def __bool__(self) -> bool: ... 
TransformExpandedCallback = Callable[ [Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]] ] +TransformHeartbeatCallback = Callable[[int], Tuple[Any, Any, int, Any]] +TransformHeartbeatExpandedCallback = Callable[ + [int], Iterable[Tuple[Any, Any, int, Any]] +] StreamCallback = Union[ ApplyCallback, diff --git a/quixstreams/core/stream/stream.py b/quixstreams/core/stream/stream.py index f538f5307..07e291c41 100644 --- a/quixstreams/core/stream/stream.py +++ b/quixstreams/core/stream/stream.py @@ -30,6 +30,8 @@ TransformCallback, TransformExpandedCallback, TransformFunction, + TransformHeartbeatCallback, + TransformHeartbeatExpandedCallback, UpdateCallback, UpdateFunction, UpdateWithMetadataCallback, @@ -249,18 +251,56 @@ def add_update( return self._add(update_func) @overload - def add_transform(self, func: TransformCallback, *, expand: Literal[False] = False): + def add_transform( + self, + func: TransformCallback, + *, + expand: Literal[False] = False, + heartbeat: Literal[False] = False, + ): pass @overload - def add_transform(self, func: TransformExpandedCallback, *, expand: Literal[True]): + def add_transform( + self, + func: TransformExpandedCallback, + *, + expand: Literal[True], + heartbeat: Literal[False] = False, + ): pass + @overload def add_transform( self, - func: Union[TransformCallback, TransformExpandedCallback], + func: TransformHeartbeatCallback, + *, + expand: Literal[False] = False, + heartbeat: Literal[True], + ): + pass + + @overload + def add_transform( + self, + func: TransformHeartbeatExpandedCallback, + *, + expand: Literal[True], + heartbeat: Literal[True], + ): + pass + + def add_transform( + self, + func: Union[ + TransformCallback, + TransformExpandedCallback, + TransformHeartbeatCallback, + TransformHeartbeatExpandedCallback, + ], *, expand: bool = False, + heartbeat: bool = False, ) -> "Stream": """ Add a "transform" function to the Stream, that will mutate the input value. @@ -276,9 +316,11 @@ def add_transform( :param expand: if True, expand the returned iterable into individual items downstream. If returned value is not iterable, `TypeError` will be raised. Default - `False`. + :param heartbeat: if True, the callback is expected to accept timestamp only. + Default - `False`. 
:return: a new Stream derived from the current one """ - return self._add(TransformFunction(func, expand=expand)) # type: ignore[call-overload] + return self._add(TransformFunction(func, expand=expand, heartbeat=heartbeat)) # type: ignore[call-overload] def merge(self, other: "Stream") -> "Stream": """ diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py index 53e90c767..8c5d46828 100644 --- a/quixstreams/dataframe/dataframe.py +++ b/quixstreams/dataframe/dataframe.py @@ -1696,6 +1696,10 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame": stream=merged_stream, stream_id=merged_stream_id ) + def concat_heartbeat(self, stream: Stream) -> "StreamingDataFrame": + self._registry.register_heartbeat(self, stream) + return self.__dataframe_clone__(stream=self.stream.merge(stream)) + def join_asof( self, right: "StreamingDataFrame", diff --git a/quixstreams/dataframe/registry.py b/quixstreams/dataframe/registry.py index dd7138e0b..eb0286a18 100644 --- a/quixstreams/dataframe/registry.py +++ b/quixstreams/dataframe/registry.py @@ -22,6 +22,7 @@ class DataFrameRegistry: def __init__(self) -> None: self._registry: dict[str, Stream] = {} + self._heartbeat_registry: dict[str, Stream] = {} self._topics: list[Topic] = [] self._repartition_origins: set[str] = set() self._topics_to_stream_ids: dict[str, set[str]] = {} @@ -69,6 +70,20 @@ def register_root( self._topics.append(topic) self._registry[topic.name] = dataframe.stream + def register_heartbeat( + self, dataframe: "StreamingDataFrame", stream: Stream + ) -> None: + """ + Register a heartbeat Stream for the given topic. + """ + topics = dataframe.topics + if len(topics) > 1: + raise ValueError( + f"Expected a StreamingDataFrame with one topic, got {len(topics)}" + ) + topic = topics[0] + self._heartbeat_registry[topic.name] = stream + def register_groupby( self, source_sdf: "StreamingDataFrame", @@ -113,9 +128,21 @@ def compose_all( :param sink: callable to accumulate the results of the execution, optional. :return: a {topic_name: composed} dict, where composed is a callable """ + return self._compose(registry=self._registry, sink=sink) + + def compose_heartbeats(self) -> dict[str, VoidExecutor]: + """ + Composes all the heartbeat Streams and returns a dict of format {<topic_name>: <executor>} + :return: a {topic_name: composed} dict, where composed is a callable + """ + return self._compose(registry=self._heartbeat_registry) + + def _compose( + self, registry: dict[str, Stream], sink: Optional[VoidExecutor] = None + ) -> dict[str, VoidExecutor]: executors = {} # Go over the registered topics with root Streams and compose them - for topic, root_stream in self._registry.items(): + for topic, root_stream in registry.items(): # If a root stream is connected to other roots, ".compose()" will # return them all. # Use the registered root Stream to filter them out.
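
The pieces above compose end to end: `Application(heartbeat_interval=...)` drives `_process_heartbeat()`, `Stream.add_transform(heartbeat=True)` wraps a timestamp-only callback, and `StreamingDataFrame.concat_heartbeat()` registers the heartbeat stream for the dataframe's topic. A minimal usage sketch, assuming a local broker and a hypothetical topic name (the API is renamed to "wall clock" in the next patch):

    from quixstreams import Application
    from quixstreams.core.stream import Stream

    app = Application(
        broker_address="localhost:9092",  # assumption: local dev broker
        heartbeat_interval=5.0,  # emit a tick roughly every 5 seconds
    )
    sdf = app.dataframe(app.topic("input-topic"))  # hypothetical topic

    # A heartbeat callback receives only the timestamp (ms) and returns a
    # (value, key, timestamp, headers) tuple, per TransformHeartbeatCallback.
    def on_heartbeat(timestamp: int):
        return {"heartbeat": True}, None, timestamp, None

    # Root Stream carrying the heartbeat transform; concat_heartbeat() merges
    # it into the dataframe and registers it so _process_heartbeat() finds it.
    heartbeat_stream = Stream().add_transform(on_heartbeat, heartbeat=True)
    sdf = sdf.concat_heartbeat(heartbeat_stream)

    app.run()
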
From 9be8da8b4b958776fec300c359fe4f8b8873995c Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Thu, 31 Jul 2025 10:25:03 +0200 Subject: [PATCH 2/5] Rename heartbeat to wall clock --- quixstreams/app.py | 38 ++++++++--------- .../core/stream/functions/transform.py | 42 +++++++++---------- quixstreams/core/stream/functions/types.py | 8 ++-- quixstreams/core/stream/stream.py | 26 ++++++------ quixstreams/dataframe/dataframe.py | 4 +- quixstreams/dataframe/registry.py | 14 +++---- 6 files changed, 66 insertions(+), 66 deletions(-) diff --git a/quixstreams/app.py b/quixstreams/app.py index 1a17f7176..8a36cee5d 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -155,7 +155,7 @@ def __init__( topic_create_timeout: float = 60, processing_guarantee: ProcessingGuarantee = "at-least-once", max_partition_buffer_size: int = 10000, - heartbeat_interval: float = 0.0, + wall_clock_interval: float = 0.0, ): """ :param broker_address: Connection settings for Kafka. @@ -224,11 +224,11 @@ def __init__( It is a soft limit, and the actual number of buffered messages can be up to x2 higher. Lower value decreases the memory use, but increases the latency. Default - `10000`. - :param heartbeat_interval: the interval (seconds) at which to send heartbeat messages. - The heartbeat timing starts counting from application start. - TODO: Save and respect last heartbeat timestamp. - The heartbeat is sent for every partition of every topic with registered heartbeat streams. - If the value is 0, no heartbeat messages will be sent. + :param wall_clock_interval: the interval (seconds) at which to invoke + the registered wall clock logic. + The wall clock timing starts counting from application start. + TODO: Save and respect last wall clock timestamp. + If the value is 0, no wall clock logic will be invoked. Default - `0.0`.

***Error Handlers***
@@ -381,9 +381,9 @@ def __init__( recovery_manager=recovery_manager, ) - self._heartbeat_active = heartbeat_interval > 0 - self._heartbeat_interval = heartbeat_interval - self._heartbeat_last_sent = datetime.now().timestamp() + self._wall_clock_active = wall_clock_interval > 0 + self._wall_clock_interval = wall_clock_interval + self._wall_clock_last_sent = datetime.now().timestamp() self._source_manager = SourceManager() self._sink_manager = SinkManager() @@ -914,7 +914,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None): processing_context = self._processing_context source_manager = self._source_manager process_message = self._process_message - process_heartbeat = self._process_heartbeat + process_wall_clock = self._process_wall_clock printer = self._processing_context.printer run_tracker = self._run_tracker consumer = self._consumer @@ -927,9 +927,9 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None): ) dataframes_composed = self._dataframe_registry.compose_all(sink=sink) - heartbeats_composed = self._dataframe_registry.compose_heartbeats() - if not heartbeats_composed: - self._heartbeat_active = False + wall_clock_executors = self._dataframe_registry.compose_wall_clock() + if not wall_clock_executors: + self._wall_clock_active = False processing_context.init_checkpoint() run_tracker.set_as_running() @@ -941,7 +941,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None): run_tracker.timeout_refresh() else: process_message(dataframes_composed) - process_heartbeat(heartbeats_composed) + process_wall_clock(wall_clock_executors) processing_context.commit_checkpoint() consumer.resume_backpressured() source_manager.raise_for_error() @@ -1025,18 +1025,18 @@ def _process_message(self, dataframe_composed): if self._on_message_processed is not None: self._on_message_processed(topic_name, partition, offset) - def _process_heartbeat(self, heartbeats_composed): - if not self._heartbeat_active: + def _process_wall_clock(self, wall_clock_executors): + if not self._wall_clock_active: return now = datetime.now().timestamp() - if self._heartbeat_last_sent > now - self._heartbeat_interval: + if self._wall_clock_last_sent > now - self._wall_clock_interval: return value, key, timestamp, headers = None, None, int(now * 1000), {} for tp in self._consumer.assignment(): - if executor := heartbeats_composed.get(tp.topic): + if executor := wall_clock_executors.get(tp.topic): row = Row( value=value, key=key, @@ -1058,7 +1058,7 @@ def _process_heartbeat(self, heartbeats_composed): if not to_suppress: raise - self._heartbeat_last_sent = now + self._wall_clock_last_sent = now def _on_assign(self, _, topic_partitions: List[TopicPartition]): """ diff --git a/quixstreams/core/stream/functions/transform.py b/quixstreams/core/stream/functions/transform.py index a8744e219..6f5b2f41f 100644 --- a/quixstreams/core/stream/functions/transform.py +++ b/quixstreams/core/stream/functions/transform.py @@ -4,8 +4,8 @@ from .types import ( TransformCallback, TransformExpandedCallback, - TransformHeartbeatCallback, - TransformHeartbeatExpandedCallback, + TransformWallClockCallback, + TransformWallClockExpandedCallback, VoidExecutor, ) @@ -32,7 +32,7 @@ def __init__( self, func: TransformCallback, expand: Literal[False] = False, - heartbeat: Literal[False] = False, + wall_clock: Literal[False] = False, ) -> None: ... 
@overload @@ -40,23 +40,23 @@ def __init__( self, func: TransformExpandedCallback, expand: Literal[True], - heartbeat: Literal[False] = False, + wall_clock: Literal[False] = False, ) -> None: ... @overload def __init__( self, - func: TransformHeartbeatCallback, + func: TransformWallClockCallback, expand: Literal[False] = False, - heartbeat: Literal[True] = True, + wall_clock: Literal[True] = True, ) -> None: ... @overload def __init__( self, - func: TransformHeartbeatExpandedCallback, + func: TransformWallClockExpandedCallback, expand: Literal[True], - heartbeat: Literal[True], + wall_clock: Literal[True], ) -> None: ... def __init__( @@ -64,29 +64,29 @@ def __init__( func: Union[ TransformCallback, TransformExpandedCallback, - TransformHeartbeatCallback, - TransformHeartbeatExpandedCallback, + TransformWallClockCallback, + TransformWallClockExpandedCallback, ], expand: bool = False, - heartbeat: bool = False, + wall_clock: bool = False, ): super().__init__(func) self.func: Union[ TransformCallback, TransformExpandedCallback, - TransformHeartbeatCallback, - TransformHeartbeatExpandedCallback, + TransformWallClockCallback, + TransformWallClockExpandedCallback, ] self.expand = expand - self.heartbeat = heartbeat + self.wall_clock = wall_clock def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor: child_executor = self._resolve_branching(*child_executors) - if self.expand and self.heartbeat: - heartbeat_expanded_func = cast( - TransformHeartbeatExpandedCallback, self.func + if self.expand and self.wall_clock: + wall_clock_expanded_func = cast( + TransformWallClockExpandedCallback, self.func ) def wrapper( @@ -100,7 +100,7 @@ def wrapper( new_key, new_timestamp, new_headers, - ) in heartbeat_expanded_func(timestamp): + ) in wall_clock_expanded_func(timestamp): child_executor(new_value, new_key, new_timestamp, new_headers) elif self.expand: @@ -116,8 +116,8 @@ def wrapper( for new_value, new_key, new_timestamp, new_headers in result: child_executor(new_value, new_key, new_timestamp, new_headers) - elif self.heartbeat: - heartbeat_func = cast(TransformHeartbeatCallback, self.func) + elif self.wall_clock: + wall_clock_func = cast(TransformWallClockCallback, self.func) def wrapper( value: Any, @@ -125,7 +125,7 @@ def wrapper( timestamp: int, headers: Any, ): - new_value, new_key, new_timestamp, new_headers = heartbeat_func( + new_value, new_key, new_timestamp, new_headers = wall_clock_func( timestamp ) child_executor(new_value, new_key, new_timestamp, new_headers) diff --git a/quixstreams/core/stream/functions/types.py b/quixstreams/core/stream/functions/types.py index 7b7516d61..d8e2c3693 100644 --- a/quixstreams/core/stream/functions/types.py +++ b/quixstreams/core/stream/functions/types.py @@ -14,8 +14,8 @@ "FilterWithMetadataCallback", "TransformCallback", "TransformExpandedCallback", - "TransformHeartbeatCallback", - "TransformHeartbeatExpandedCallback", + "TransformWallClockCallback", + "TransformWallClockExpandedCallback", ) @@ -37,8 +37,8 @@ def __bool__(self) -> bool: ... 
TransformExpandedCallback = Callable[ [Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]] ] -TransformHeartbeatCallback = Callable[[int], Tuple[Any, Any, int, Any]] -TransformHeartbeatExpandedCallback = Callable[ +TransformWallClockCallback = Callable[[int], Tuple[Any, Any, int, Any]] +TransformWallClockExpandedCallback = Callable[ [int], Iterable[Tuple[Any, Any, int, Any]] ] diff --git a/quixstreams/core/stream/stream.py b/quixstreams/core/stream/stream.py index 07e291c41..6d1c99bc0 100644 --- a/quixstreams/core/stream/stream.py +++ b/quixstreams/core/stream/stream.py @@ -30,8 +30,8 @@ TransformCallback, TransformExpandedCallback, TransformFunction, - TransformHeartbeatCallback, - TransformHeartbeatExpandedCallback, + TransformWallClockCallback, + TransformWallClockExpandedCallback, UpdateCallback, UpdateFunction, UpdateWithMetadataCallback, @@ -256,7 +256,7 @@ def add_transform( func: TransformCallback, *, expand: Literal[False] = False, - heartbeat: Literal[False] = False, + wall_clock: Literal[False] = False, ): pass @@ -266,27 +266,27 @@ def add_transform( func: TransformExpandedCallback, *, expand: Literal[True], - heartbeat: Literal[False] = False, + wall_clock: Literal[False] = False, ): pass @overload def add_transform( self, - func: TransformHeartbeatCallback, + func: TransformWallClockCallback, *, expand: Literal[False] = False, - heartbeat: Literal[True], + wall_clock: Literal[True], ): pass @overload def add_transform( self, - func: TransformHeartbeatExpandedCallback, + func: TransformWallClockExpandedCallback, *, expand: Literal[True], - heartbeat: Literal[True], + wall_clock: Literal[True], ): pass @@ -295,12 +295,12 @@ def add_transform( func: Union[ TransformCallback, TransformExpandedCallback, - TransformHeartbeatCallback, - TransformHeartbeatExpandedCallback, + TransformWallClockCallback, + TransformWallClockExpandedCallback, ], *, expand: bool = False, - heartbeat: bool = False, + wall_clock: bool = False, ) -> "Stream": """ Add a "transform" function to the Stream, that will mutate the input value. @@ -316,11 +316,11 @@ def add_transform( :param expand: if True, expand the returned iterable into individual items downstream. If returned value is not iterable, `TypeError` will be raised. Default - `False`. - :param heartbeat: if True, the callback is expected to accept timestamp only. + :param wall_clock: if True, the callback is expected to accept timestamp only. Default - `False`. 
:return: a new Stream derived from the current one """ - return self._add(TransformFunction(func, expand=expand, heartbeat=heartbeat)) # type: ignore[call-overload] + return self._add(TransformFunction(func, expand=expand, wall_clock=wall_clock)) # type: ignore[call-overload] def merge(self, other: "Stream") -> "Stream": """ diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py index 8c5d46828..946d05d0b 100644 --- a/quixstreams/dataframe/dataframe.py +++ b/quixstreams/dataframe/dataframe.py @@ -1696,8 +1696,8 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame": stream=merged_stream, stream_id=merged_stream_id ) - def concat_heartbeat(self, stream: Stream) -> "StreamingDataFrame": - self._registry.register_heartbeat(self, stream) + def concat_wall_clock(self, stream: Stream) -> "StreamingDataFrame": + self._registry.register_wall_clock(self, stream) return self.__dataframe_clone__(stream=self.stream.merge(stream)) def join_asof( diff --git a/quixstreams/dataframe/registry.py b/quixstreams/dataframe/registry.py index eb0286a18..0221a2222 100644 --- a/quixstreams/dataframe/registry.py +++ b/quixstreams/dataframe/registry.py @@ -22,7 +22,7 @@ class DataFrameRegistry: def __init__(self) -> None: self._registry: dict[str, Stream] = {} - self._heartbeat_registry: dict[str, Stream] = {} + self._wall_clock_registry: dict[str, Stream] = {} self._topics: list[Topic] = [] self._repartition_origins: set[str] = set() self._topics_to_stream_ids: dict[str, set[str]] = {} @@ -70,11 +70,11 @@ def register_root( self._topics.append(topic) self._registry[topic.name] = dataframe.stream - def register_heartbeat( + def register_wall_clock( self, dataframe: "StreamingDataFrame", stream: Stream ) -> None: """ - Register a heartbeat Stream for the given topic. + Register a wall clock stream for the given topic. 
""" topics = dataframe.topics if len(topics) > 1: @@ -82,7 +82,7 @@ def register_heartbeat( f"Expected a StreamingDataFrame with one topic, got {len(topics)}" ) topic = topics[0] - self._heartbeat_registry[topic.name] = stream + self._wall_clock_registry[topic.name] = stream def register_groupby( self, @@ -130,12 +130,12 @@ def compose_all( """ return self._compose(registry=self._registry, sink=sink) - def compose_heartbeats(self) -> dict[str, VoidExecutor]: + def compose_wall_clock(self) -> dict[str, VoidExecutor]: """ - Composes all the heartbeat Streams and returns a dict of format {: } + Composes all the wall clock streams and returns a dict of format {: } :return: a {topic_name: composed} dict, where composed is a callable """ - return self._compose(registry=self._heartbeat_registry) + return self._compose(registry=self._wall_clock_registry) def _compose( self, registry: dict[str, Stream], sink: Optional[VoidExecutor] = None From 920f3ec52b22128d9a293c97c9787fc317120dd2 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Tue, 12 Aug 2025 09:46:15 +0200 Subject: [PATCH 3/5] Rollback changes to TransformFunction --- .../core/stream/functions/transform.py | 89 ++----------------- quixstreams/core/stream/functions/types.py | 6 -- quixstreams/core/stream/stream.py | 36 +------- 3 files changed, 10 insertions(+), 121 deletions(-) diff --git a/quixstreams/core/stream/functions/transform.py b/quixstreams/core/stream/functions/transform.py index 6f5b2f41f..219662b6b 100644 --- a/quixstreams/core/stream/functions/transform.py +++ b/quixstreams/core/stream/functions/transform.py @@ -1,13 +1,7 @@ from typing import Any, Literal, Union, cast, overload from .base import StreamFunction -from .types import ( - TransformCallback, - TransformExpandedCallback, - TransformWallClockCallback, - TransformWallClockExpandedCallback, - VoidExecutor, -) +from .types import TransformCallback, TransformExpandedCallback, VoidExecutor __all__ = ("TransformFunction",) @@ -29,81 +23,28 @@ class TransformFunction(StreamFunction): @overload def __init__( - self, - func: TransformCallback, - expand: Literal[False] = False, - wall_clock: Literal[False] = False, - ) -> None: ... - - @overload - def __init__( - self, - func: TransformExpandedCallback, - expand: Literal[True], - wall_clock: Literal[False] = False, - ) -> None: ... - - @overload - def __init__( - self, - func: TransformWallClockCallback, - expand: Literal[False] = False, - wall_clock: Literal[True] = True, + self, func: TransformCallback, expand: Literal[False] = False ) -> None: ... @overload def __init__( - self, - func: TransformWallClockExpandedCallback, - expand: Literal[True], - wall_clock: Literal[True], + self, func: TransformExpandedCallback, expand: Literal[True] ) -> None: ... 
def __init__( self, - func: Union[ - TransformCallback, - TransformExpandedCallback, - TransformWallClockCallback, - TransformWallClockExpandedCallback, - ], + func: Union[TransformCallback, TransformExpandedCallback], expand: bool = False, - wall_clock: bool = False, ): super().__init__(func) - self.func: Union[ - TransformCallback, - TransformExpandedCallback, - TransformWallClockCallback, - TransformWallClockExpandedCallback, - ] + self.func: Union[TransformCallback, TransformExpandedCallback] self.expand = expand - self.wall_clock = wall_clock def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor: child_executor = self._resolve_branching(*child_executors) - if self.expand and self.wall_clock: - wall_clock_expanded_func = cast( - TransformWallClockExpandedCallback, self.func - ) - - def wrapper( - value: Any, - key: Any, - timestamp: int, - headers: Any, - ): - for ( - new_value, - new_key, - new_timestamp, - new_headers, - ) in wall_clock_expanded_func(timestamp): - child_executor(new_value, new_key, new_timestamp, new_headers) - - elif self.expand: + if self.expand: expanded_func = cast(TransformExpandedCallback, self.func) def wrapper( @@ -116,22 +57,8 @@ def wrapper( for new_value, new_key, new_timestamp, new_headers in result: child_executor(new_value, new_key, new_timestamp, new_headers) - elif self.wall_clock: - wall_clock_func = cast(TransformWallClockCallback, self.func) - - def wrapper( - value: Any, - key: Any, - timestamp: int, - headers: Any, - ): - new_value, new_key, new_timestamp, new_headers = wall_clock_func( - timestamp - ) - child_executor(new_value, new_key, new_timestamp, new_headers) - else: - regular_func = cast(TransformCallback, self.func) + func = cast(TransformCallback, self.func) def wrapper( value: Any, @@ -140,7 +67,7 @@ def wrapper( headers: Any, ): # Execute a function on a single value and return its result - new_value, new_key, new_timestamp, new_headers = regular_func( + new_value, new_key, new_timestamp, new_headers = func( value, key, timestamp, headers ) child_executor(new_value, new_key, new_timestamp, new_headers) diff --git a/quixstreams/core/stream/functions/types.py b/quixstreams/core/stream/functions/types.py index d8e2c3693..504299b53 100644 --- a/quixstreams/core/stream/functions/types.py +++ b/quixstreams/core/stream/functions/types.py @@ -14,8 +14,6 @@ "FilterWithMetadataCallback", "TransformCallback", "TransformExpandedCallback", - "TransformWallClockCallback", - "TransformWallClockExpandedCallback", ) @@ -37,10 +35,6 @@ def __bool__(self) -> bool: ... 
TransformExpandedCallback = Callable[ [Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]] ] -TransformWallClockCallback = Callable[[int], Tuple[Any, Any, int, Any]] -TransformWallClockExpandedCallback = Callable[ - [int], Iterable[Tuple[Any, Any, int, Any]] -] StreamCallback = Union[ ApplyCallback, diff --git a/quixstreams/core/stream/stream.py b/quixstreams/core/stream/stream.py index 6d1c99bc0..d472b9379 100644 --- a/quixstreams/core/stream/stream.py +++ b/quixstreams/core/stream/stream.py @@ -30,8 +30,6 @@ TransformCallback, TransformExpandedCallback, TransformFunction, - TransformWallClockCallback, - TransformWallClockExpandedCallback, UpdateCallback, UpdateFunction, UpdateWithMetadataCallback, @@ -256,7 +254,6 @@ def add_transform( func: TransformCallback, *, expand: Literal[False] = False, - wall_clock: Literal[False] = False, ): pass @@ -266,41 +263,14 @@ def add_transform( func: TransformExpandedCallback, *, expand: Literal[True], - wall_clock: Literal[False] = False, ): pass - @overload - def add_transform( - self, - func: TransformWallClockCallback, - *, - expand: Literal[False] = False, - wall_clock: Literal[True], - ): - pass - - @overload def add_transform( self, - func: TransformWallClockExpandedCallback, - *, - expand: Literal[True], - wall_clock: Literal[True], - ): - pass - - def add_transform( - self, - func: Union[ - TransformCallback, - TransformExpandedCallback, - TransformWallClockCallback, - TransformWallClockExpandedCallback, - ], + func: Union[TransformCallback, TransformExpandedCallback], *, expand: bool = False, - wall_clock: bool = False, ) -> "Stream": """ Add a "transform" function to the Stream, that will mutate the input value. @@ -316,11 +286,9 @@ def add_transform( :param expand: if True, expand the returned iterable into individual items downstream. If returned value is not iterable, `TypeError` will be raised. Default - `False`. - :param wall_clock: if True, the callback is expected to accept timestamp only. - Default - `False`. 
:return: a new Stream derived from the current one """ - return self._add(TransformFunction(func, expand=expand, wall_clock=wall_clock)) # type: ignore[call-overload] + return self._add(TransformFunction(func, expand=expand)) # type: ignore[call-overload] def merge(self, other: "Stream") -> "Stream": """ From 72ab7a647564a27ff56865ad42db25120eb531a6 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Thu, 14 Aug 2025 10:26:33 +0200 Subject: [PATCH 4/5] Refactor wall clock processing --- quixstreams/app.py | 40 +++++++++++++++++++++---- quixstreams/checkpointing/checkpoint.py | 12 +++++++- quixstreams/dataframe/registry.py | 40 +++++++++++-------------- 3 files changed, 64 insertions(+), 28 deletions(-) diff --git a/quixstreams/app.py b/quixstreams/app.py index 8a36cee5d..0ed34f481 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -1035,16 +1035,45 @@ def _process_wall_clock(self, wall_clock_executors): value, key, timestamp, headers = None, None, int(now * 1000), {} - for tp in self._consumer.assignment(): - if executor := wall_clock_executors.get(tp.topic): + # Offsets processed in the current, open checkpoint (in-flight) + tp_offsets = self._processing_context.checkpoint.tp_offsets + assignment = self._consumer.assignment() + + for topics, executor in wall_clock_executors.items(): + seen_partitions: set[int] = set() + selected_partitions: list[tuple[str, int, int]] = [] + + for tp in assignment: + if tp.topic in topics and tp.partition not in seen_partitions: + offset = tp_offsets.get((tp.topic, tp.partition)) + if offset is None: + # TODO: We can call only once for all required partitions + committed_tp = self._consumer.committed([tp], timeout=30)[0] + if committed_tp.error: + raise RuntimeError( + "Failed to get committed offsets for " + f'"{committed_tp.topic}[{committed_tp.partition}]" ' + f"from the broker: {committed_tp.error}" + ) + if committed_tp.offset >= 0: + offset = committed_tp.offset - 1 + + # TODO: Handle the case when the offset is None + # This means that the wall clock is triggered before any messages + if offset is not None: + seen_partitions.add(tp.partition) + selected_partitions.append((tp.topic, tp.partition, offset)) + + # Execute callback for each selected topic-partition with its offset + for topic, partition, offset in selected_partitions: row = Row( value=value, key=key, timestamp=timestamp, context=MessageContext( - topic=tp.topic, - partition=tp.partition, - offset=-1, # TODO: get correct offsets + topic=topic, + partition=partition, + offset=offset, size=-1, ), headers=headers, @@ -1058,6 +1087,7 @@ def _process_wall_clock(self, wall_clock_executors): if not to_suppress: raise + # TODO: should we use a "new" now or the one from before the processing? 
self._wall_clock_last_sent = now def _on_assign(self, _, topic_partitions: List[TopicPartition]): """ diff --git a/quixstreams/checkpointing/checkpoint.py b/quixstreams/checkpointing/checkpoint.py index 7bdb09044..0dc0c4bdb 100644 --- a/quixstreams/checkpointing/checkpoint.py +++ b/quixstreams/checkpointing/checkpoint.py @@ -1,7 +1,8 @@ import logging import time from abc import abstractmethod -from typing import Dict, Tuple +from types import MappingProxyType +from typing import Dict, Mapping, Tuple from confluent_kafka import KafkaException, TopicPartition @@ -55,6 +56,15 @@ def __init__( self._commit_every = commit_every self._total_offsets_processed = 0 + @property + def tp_offsets(self) -> Mapping[Tuple[str, int], int]: + """ + Read-only view of processed (but not yet committed) offsets in the current checkpoint. + + :return: a read-only mapping {(topic, partition): last_processed_offset} + """ + return MappingProxyType(self._tp_offsets) + def expired(self) -> bool: """ Returns `True` if checkpoint deadline has expired OR diff --git a/quixstreams/dataframe/registry.py b/quixstreams/dataframe/registry.py index 0221a2222..f2abd178a 100644 --- a/quixstreams/dataframe/registry.py +++ b/quixstreams/dataframe/registry.py @@ -22,7 +22,7 @@ class DataFrameRegistry: def __init__(self) -> None: self._registry: dict[str, Stream] = {} - self._wall_clock_registry: dict[str, Stream] = {} + self._wall_clock_registry: dict[str, tuple[tuple[Topic, ...], Stream]] = {} self._topics: list[Topic] = [] self._repartition_origins: set[str] = set() self._topics_to_stream_ids: dict[str, set[str]] = {} @@ -74,15 +74,11 @@ def register_root( def register_wall_clock( self, dataframe: "StreamingDataFrame", stream: Stream ) -> None: """ - Register a wall clock stream for the given topic. + Register a wall clock stream root for the given dataframe. + Stores the Stream itself to be composed later with an optional sink. """ - topics = dataframe.topics - if len(topics) > 1: - raise ValueError( - f"Expected a StreamingDataFrame with one topic, got {len(topics)}" - ) - topic = topics[0] - self._wall_clock_registry[topic.name] = stream + # TODO: What if there are more wall clock streams for the same stream_id? + self._wall_clock_registry[dataframe.stream_id] = (dataframe.topics, stream) def register_groupby( self, @@ -128,21 +124,9 @@ def compose_all( :param sink: callable to accumulate the results of the execution, optional. :return: a {topic_name: composed} dict, where composed is a callable """ - return self._compose(registry=self._registry, sink=sink) - - def compose_wall_clock(self) -> dict[str, VoidExecutor]: - """ - Composes all the wall clock streams and returns a dict of format {<topic_name>: <executor>} - :return: a {topic_name: composed} dict, where composed is a callable - """ - return self._compose(registry=self._wall_clock_registry) - - def _compose( - self, registry: dict[str, Stream], sink: Optional[VoidExecutor] = None - ) -> dict[str, VoidExecutor]: executors = {} # Go over the registered topics with root Streams and compose them - for topic, root_stream in registry.items(): + for topic, root_stream in self._registry.items(): # If a root stream is connected to other roots, ".compose()" will # return them all. # Use the registered root Stream to filter them out. @@ -150,6 +134,18 @@ executors[topic] = root_executors[root_stream] return executors + def compose_wall_clock(self) -> dict[tuple[str, ...], VoidExecutor]: + """ + Compose all wall clock Streams and return executors keyed by stream_id.
+ Returns mapping: {stream_id: (topics, executor)} + """ + executors = {} + for _, (topics, root_stream) in self._wall_clock_registry.items(): + root_executors = root_stream.compose() + _topics = tuple({t.name for t in topics}) + executors[_topics] = root_executors[root_stream] + return executors + def register_stream_id(self, stream_id: str, topic_names: list[str]): """ Register a mapping between the stream_id and topic names. From da7d1357d65ccd9a7ce049765e91d0d68a0efb6a Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Thu, 14 Aug 2025 16:26:52 +0200 Subject: [PATCH 5/5] Refactor --- quixstreams/app.py | 91 ++++++++++++++++++++----------- quixstreams/dataframe/registry.py | 19 +++---- 2 files changed, 66 insertions(+), 44 deletions(-) diff --git a/quixstreams/app.py b/quixstreams/app.py index 0ed34f481..7a7e3a81c 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -8,6 +8,7 @@ import warnings from collections import defaultdict from datetime import datetime +from itertools import chain from pathlib import Path from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union, cast @@ -1026,46 +1027,72 @@ def _process_message(self, dataframe_composed): self._on_message_processed(topic_name, partition, offset) def _process_wall_clock(self, wall_clock_executors): + # Emit time-based "ticks" when the wall-clock interval elapses. + # For each executor (grouped by topics), select one partition per partition id + # and determine an offset to include in MessageContext. if not self._wall_clock_active: return + # Rate-limit by interval; skip until enough time has elapsed since last send. now = datetime.now().timestamp() if self._wall_clock_last_sent > now - self._wall_clock_interval: return + # Synthetic "tick" payload (no value/key, headers empty, timestamp in ms). value, key, timestamp, headers = None, None, int(now * 1000), {} - # Offsets processed in the current, open checkpoint (in-flight) - tp_offsets = self._processing_context.checkpoint.tp_offsets - assignment = self._consumer.assignment() - - for topics, executor in wall_clock_executors.items(): - seen_partitions: set[int] = set() - selected_partitions: list[tuple[str, int, int]] = [] - - for tp in assignment: - if tp.topic in topics and tp.partition not in seen_partitions: - offset = tp_offsets.get((tp.topic, tp.partition)) - if offset is None: - # TODO: We can call only once for all required partitions - committed_tp = self._consumer.committed([tp], timeout=30)[0] - if committed_tp.error: - raise RuntimeError( - "Failed to get committed offsets for " - f'"{committed_tp.topic}[{committed_tp.partition}]" ' - f"from the broker: {committed_tp.error}" - ) - if committed_tp.offset >= 0: - offset = committed_tp.offset - 1 - - # TODO: Handle the case when the offset is None - # This means that the wall clock is triggered before any messages - if offset is not None: - seen_partitions.add(tp.partition) - selected_partitions.append((tp.topic, tp.partition, offset)) - - # Execute callback for each selected topic-partition with its offset - for topic, partition, offset in selected_partitions: + # In-flight processed offsets within the current (open) checkpoint. + processed_offsets = self._processing_context.checkpoint.tp_offsets + # Only consider currently assigned topic-partitions. + assigned_tps = self._consumer.assignment() + # Cache known offsets to avoid resolving them multiple times for different executors. + # Keyed by (topic, partition) to avoid relying on TopicPartition instance identity. 
+ known_offsets: dict[tuple[str, int], int] = {} + + for topics, executor in wall_clock_executors: + # candidate_partitions: partitions still needing an offset resolved + candidate_partitions: dict[int, set[TopicPartition]] = defaultdict(set) + # selected_partitions: final partition_id -> (topic, offset) + selected_partitions: dict[int, tuple[str, int]] = {} + + for tp in assigned_tps: + # Check topic membership first: the offset cache is shared across + # executors, and a cached entry for another executor's topic must + # not select a partition here. + if tp.topic not in topics or tp.partition in selected_partitions: + continue + + known_offset = known_offsets.get((tp.topic, tp.partition)) + if known_offset is not None: + # Reuse an offset already resolved for a previous executor. + selected_partitions[tp.partition] = (tp.topic, known_offset) + # Prefer the most recent known processed offset if available. + # Compare against None explicitly: offset 0 is a valid offset. + elif ( + processed_offset := processed_offsets.get((tp.topic, tp.partition)) + ) is not None: + # Use offset from the in-flight checkpoint. + selected_partitions[tp.partition] = (tp.topic, processed_offset) + known_offsets[(tp.topic, tp.partition)] = processed_offset + else: + # Will resolve via committed offsets below. + candidate_partitions[tp.partition].add(tp) + + if candidate_partitions: + # Best-effort: fetch committed offsets in batch for unresolved partitions. + committed_tps = self._consumer.committed( + list(chain(*candidate_partitions.values())), timeout=30 + ) + for tp in committed_tps: + if tp.error: + raise RuntimeError( + f"Failed to get committed offsets for " + f'"{tp.topic}[{tp.partition}]" from the broker: {tp.error}' + ) + if tp.partition not in selected_partitions: + # Committed offset is "next to consume"; last processed is offset - 1. + # The "invalid/unset" broker offset is negative. + offset = tp.offset - 1 if tp.offset >= 0 else tp.offset + selected_partitions[tp.partition] = (tp.topic, offset) + known_offsets[(tp.topic, tp.partition)] = offset + + # Execute callback for each selected topic-partition with its offset. + for partition, (topic, offset) in selected_partitions.items(): row = Row( value=value, key=key, timestamp=timestamp, context=MessageContext( topic=topic, partition=partition, offset=offset, size=-1, ), headers=headers, ) context = copy_context() context.run(set_message_context, row.context) try: context.run(executor, value, key, timestamp, headers) except Exception as exc: to_suppress = self._on_processing_error(exc, row, logger) if not to_suppress: raise - # TODO: should we use a "new" now or the one from before the processing? + # Record the emission time for rate-limiting. self._wall_clock_last_sent = now def _on_assign(self, _, topic_partitions: List[TopicPartition]): """ diff --git a/quixstreams/dataframe/registry.py b/quixstreams/dataframe/registry.py index f2abd178a..0e2a187c9 100644 --- a/quixstreams/dataframe/registry.py +++ b/quixstreams/dataframe/registry.py @@ -22,7 +22,7 @@ class DataFrameRegistry: def __init__(self) -> None: self._registry: dict[str, Stream] = {} - self._wall_clock_registry: dict[str, tuple[tuple[Topic, ...], Stream]] = {} + self._wall_clock_registry: dict[Stream, tuple[str, ...]] = {} self._topics: list[Topic] = [] self._repartition_origins: set[str] = set() self._topics_to_stream_ids: dict[str, set[str]] = {} @@ -73,12 +73,8 @@ def register_root( def register_wall_clock( self, dataframe: "StreamingDataFrame", stream: Stream ) -> None: - """ - Register a wall clock stream root for the given dataframe. - Stores the Stream itself to be composed later with an optional sink. - """ - # TODO: What if there are more wall clock streams for the same stream_id?
- self._wall_clock_registry[dataframe.stream_id] = (dataframe.topics, stream) + # Store the topic names as an immutable tuple for stable typing + self._wall_clock_registry[stream] = tuple(t.name for t in dataframe.topics) def register_groupby( self, @@ -134,16 +130,15 @@ def compose_all( executors[topic] = root_executors[root_stream] return executors - def compose_wall_clock(self) -> dict[tuple[str, ...], VoidExecutor]: + def compose_wall_clock(self) -> list[tuple[tuple[str, ...], VoidExecutor]]: """ - Compose all wall clock Streams and return executors keyed by stream_id. - Returns mapping: {stream_id: (topics, executor)} + Compose all wall clock Streams and return a list of + (topic_names, executor) pairs, one per registered wall clock stream. """ - executors = {} + executors = [] - for _, (topics, root_stream) in self._wall_clock_registry.items(): + for root_stream, topics in self._wall_clock_registry.items(): root_executors = root_stream.compose() - _topics = tuple({t.name for t in topics}) - executors[_topics] = root_executors[root_stream] + executors.append((topics, root_executors[root_stream])) return executors def register_stream_id(self, stream_id: str, topic_names: list[str]):
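
Across patches 4 and 5, the offset attached to each wall clock tick's MessageContext follows a single rule: prefer the partition's last offset processed in the open checkpoint (Checkpoint.tp_offsets), otherwise fall back to the broker's committed offset minus one, since the committed offset points at the next message to consume. A standalone sketch of that rule; the function name and sample values are illustrative, not part of the patch:

    # Offset resolution as in _process_wall_clock: in-flight checkpoint first,
    # then committed - 1; a negative (unset) broker offset passes through.
    def resolve_tick_offset(
        in_flight: dict[tuple[str, int], int],  # Checkpoint.tp_offsets snapshot
        committed: int,  # broker committed offset; negative means unset
        topic: str,
        partition: int,
    ) -> int:
        offset = in_flight.get((topic, partition))
        if offset is not None:
            return offset  # processed but not yet committed
        return committed - 1 if committed >= 0 else committed

    assert resolve_tick_offset({("t", 0): 41}, -1001, "t", 0) == 41  # -1001: unset sentinel
    assert resolve_tick_offset({}, 10, "t", 0) == 9
    assert resolve_tick_offset({}, -1001, "t", 0) == -1001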