PoC: Wall Clock #989

Draft · wants to merge 4 commits into base: main

84 changes: 84 additions & 0 deletions quixstreams/app.py
@@ -7,6 +7,7 @@
import uuid
import warnings
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union, cast

@@ -30,6 +31,8 @@
from .logging import LogLevel, configure_logging
from .models import (
DeserializerType,
MessageContext,
Row,
SerializerType,
TimestampExtractor,
Topic,
@@ -152,6 +155,7 @@ def __init__(
topic_create_timeout: float = 60,
processing_guarantee: ProcessingGuarantee = "at-least-once",
max_partition_buffer_size: int = 10000,
wall_clock_interval: float = 0.0,
):
"""
:param broker_address: Connection settings for Kafka.
@@ -220,6 +224,12 @@ def __init__(
It is a soft limit, and the actual number of buffered messages can be up to x2 higher.
Lower value decreases the memory use, but increases the latency.
Default - `10000`.
:param wall_clock_interval: the interval (in seconds) at which to invoke
the registered wall clock logic.
The interval is measured from application start.
TODO: Save and respect the last wall clock timestamp.
If the value is 0, no wall clock logic is invoked.
Default - `0.0`.

<br><br>***Error Handlers***<br>
To handle errors, `Application` accepts callbacks triggered when
@@ -371,6 +381,10 @@ def __init__(
recovery_manager=recovery_manager,
)

self._wall_clock_active = wall_clock_interval > 0
self._wall_clock_interval = wall_clock_interval
self._wall_clock_last_sent = datetime.now().timestamp()

self._source_manager = SourceManager()
self._sink_manager = SinkManager()
self._dataframe_registry = DataFrameRegistry()
@@ -900,6 +914,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
processing_context = self._processing_context
source_manager = self._source_manager
process_message = self._process_message
process_wall_clock = self._process_wall_clock
printer = self._processing_context.printer
run_tracker = self._run_tracker
consumer = self._consumer
@@ -912,6 +927,9 @@
)

dataframes_composed = self._dataframe_registry.compose_all(sink=sink)
wall_clock_executors = self._dataframe_registry.compose_wall_clock()
if not wall_clock_executors:
self._wall_clock_active = False

processing_context.init_checkpoint()
run_tracker.set_as_running()
@@ -923,6 +941,7 @@
run_tracker.timeout_refresh()
else:
process_message(dataframes_composed)
process_wall_clock(wall_clock_executors)
processing_context.commit_checkpoint()
consumer.resume_backpressured()
source_manager.raise_for_error()
@@ -1006,6 +1025,71 @@ def _process_message(self, dataframe_composed):
if self._on_message_processed is not None:
self._on_message_processed(topic_name, partition, offset)

def _process_wall_clock(self, wall_clock_executors):
if not self._wall_clock_active:
return

now = datetime.now().timestamp()
if self._wall_clock_last_sent > now - self._wall_clock_interval:
return

value, key, timestamp, headers = None, None, int(now * 1000), {}

# Offsets processed in the current, open checkpoint (in-flight)
tp_offsets = self._processing_context.checkpoint.tp_offsets
assignment = self._consumer.assignment()

for topics, executor in wall_clock_executors.items():
seen_partitions: set[int] = set()
selected_partitions: list[tuple[str, int, int]] = []

for tp in assignment:
if tp.topic in topics and tp.partition not in seen_partitions:
offset = tp_offsets.get((tp.topic, tp.partition))
if offset is None:
# TODO: This could be called once for all required partitions
committed_tp = self._consumer.committed([tp], timeout=30)[0]
if committed_tp.error:
raise RuntimeError(
"Failed to get committed offsets for "
f'"{committed_tp.topic}[{committed_tp.partition}]" '
f"from the broker: {committed_tp.error}"
)
if committed_tp.offset >= 0:
offset = committed_tp.offset - 1

# TODO: Handle the case when the offset is None.
# It means the wall clock fired before any message was processed or committed.
if offset is not None:
seen_partitions.add(tp.partition)
selected_partitions.append((tp.topic, tp.partition, offset))

# Execute callback for each selected topic-partition with its offset
for topic, partition, offset in selected_partitions:
row = Row(
value=value,
key=key,
timestamp=timestamp,
context=MessageContext(
topic=topic,
partition=partition,
offset=offset,
size=-1,
),
headers=headers,
)
context = copy_context()
context.run(set_message_context, row.context)
try:
context.run(executor, value, key, timestamp, headers)
except Exception as exc:
to_suppress = self._on_processing_error(exc, row, logger)
if not to_suppress:
raise

# TODO: should we use a "new" now or the one from before the processing?
self._wall_clock_last_sent = now

def _on_assign(self, _, topic_partitions: List[TopicPartition]):
"""
Assign new topic partitions to consumer and state.
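For reviewers, here is a small self-contained sketch of the two pieces of logic added to `_process_wall_clock` above: the interval gate at the top of the method and the committed-offset fallback used when a partition has no in-flight offset in the checkpoint. The helper names and sample values are illustrative only, not part of this PR:

```python
from datetime import datetime
from typing import Optional


def should_fire(last_sent: float, interval: float, now: Optional[float] = None) -> bool:
    # Mirrors the gate in _process_wall_clock: skip the tick while the
    # previous invocation is still within the configured interval.
    now = datetime.now().timestamp() if now is None else now
    return not (last_sent > now - interval)


def last_processed_offset(committed_offset: int) -> Optional[int]:
    # Kafka's committed offset is the *next* offset to consume, so the last
    # processed offset is committed - 1; negative values mean nothing was
    # committed yet and the partition is skipped for now (see the TODO above).
    return committed_offset - 1 if committed_offset >= 0 else None


assert should_fire(last_sent=0.0, interval=10.0, now=15.0)       # interval elapsed
assert not should_fire(last_sent=10.0, interval=10.0, now=15.0)  # still inside interval
assert last_processed_offset(42) == 41
assert last_processed_offset(-1001) is None  # e.g. OFFSET_INVALID
```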
12 changes: 11 additions & 1 deletion quixstreams/checkpointing/checkpoint.py
@@ -1,7 +1,8 @@
import logging
import time
from abc import abstractmethod
from typing import Dict, Tuple
from types import MappingProxyType
from typing import Dict, Mapping, Tuple

from confluent_kafka import KafkaException, TopicPartition

@@ -55,6 +56,15 @@ def __init__(
self._commit_every = commit_every
self._total_offsets_processed = 0

@property
def tp_offsets(self) -> Mapping[Tuple[str, int], int]:
"""
Read-only view of processed (but not yet committed) offsets in the current checkpoint.

:return: a read-only mapping {(topic, partition): last_processed_offset}
"""
return MappingProxyType(self._tp_offsets)

def expired(self) -> bool:
"""
Returns `True` if checkpoint deadline has expired OR
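To make the new `tp_offsets` property concrete: it hands callers a live but read-only view of the checkpoint's in-flight offsets. A quick stand-alone illustration with a plain dict standing in for `Checkpoint._tp_offsets` (the topic name is made up):

```python
from types import MappingProxyType

_tp_offsets = {("input-topic", 0): 41, ("input-topic", 1): 7}
tp_offsets = MappingProxyType(_tp_offsets)  # what the property returns

assert tp_offsets[("input-topic", 0)] == 41  # reads work as usual

try:
    tp_offsets[("input-topic", 0)] = 100  # writes are rejected
except TypeError:
    print("read-only: callers cannot mutate the checkpoint's offsets")

# The view is live: later updates to the underlying dict are visible through it.
_tp_offsets[("input-topic", 2)] = 0
assert ("input-topic", 2) in tp_offsets
```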
14 changes: 12 additions & 2 deletions quixstreams/core/stream/stream.py
@@ -249,11 +249,21 @@ def add_update(
return self._add(update_func)

@overload
def add_transform(self, func: TransformCallback, *, expand: Literal[False] = False):
def add_transform(
self,
func: TransformCallback,
*,
expand: Literal[False] = False,
):
pass

@overload
def add_transform(self, func: TransformExpandedCallback, *, expand: Literal[True]):
def add_transform(
self,
func: TransformExpandedCallback,
*,
expand: Literal[True],
):
pass

def add_transform(
4 changes: 4 additions & 0 deletions quixstreams/dataframe/dataframe.py
@@ -1696,6 +1696,10 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
stream=merged_stream, stream_id=merged_stream_id
)

def concat_wall_clock(self, stream: Stream) -> "StreamingDataFrame":
self._registry.register_wall_clock(self, stream)
return self.__dataframe_clone__(stream=self.stream.merge(stream))

def join_asof(
self,
right: "StreamingDataFrame",
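A hedged sketch of how `concat_wall_clock` might be driven from user code, assuming a bare `Stream` with a single `add_update` step is an acceptable wall clock branch; the PR does not yet show the intended public way to construct that branch, so treat the wiring below as an assumption:

```python
from quixstreams import Application
from quixstreams.core.stream import Stream

# wall_clock_interval is the new Application parameter added in this PR.
app = Application(broker_address="localhost:9092", wall_clock_interval=5.0)
sdf = app.dataframe(app.topic("input-topic"))

# Hypothetical wall clock branch: a Stream whose callback runs on every tick.
wall_clock_stream = Stream().add_update(lambda value: print("wall clock tick"))
sdf = sdf.concat_wall_clock(wall_clock_stream)

# app.run()  # ticks would then fire roughly every 5 seconds for each assigned partition
```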
23 changes: 23 additions & 0 deletions quixstreams/dataframe/registry.py
@@ -22,6 +22,7 @@ class DataFrameRegistry:

def __init__(self) -> None:
self._registry: dict[str, Stream] = {}
self._wall_clock_registry: dict[str, tuple[tuple[Topic, ...], Stream]] = {}
self._topics: list[Topic] = []
self._repartition_origins: set[str] = set()
self._topics_to_stream_ids: dict[str, set[str]] = {}
@@ -69,6 +70,16 @@ def register_root(
self._topics.append(topic)
self._registry[topic.name] = dataframe.stream

def register_wall_clock(
self, dataframe: "StreamingDataFrame", stream: Stream
) -> None:
"""
Register a wall clock stream root for the given dataframe.
Stores the Stream itself so it can be composed later by `compose_wall_clock()`.
"""
# TODO: What if there are more wall clock streams for the same stream_id?
self._wall_clock_registry[dataframe.stream_id] = (dataframe.topics, stream)

def register_groupby(
self,
source_sdf: "StreamingDataFrame",
@@ -123,6 +134,18 @@ def compose_all(
executors[topic] = root_executors[root_stream]
return executors

def compose_wall_clock(self) -> dict[tuple[str, ...], VoidExecutor]:
"""
Compose all wall clock Streams and return executors keyed by topic names.

:return: a mapping {(topic_name, ...): executor}
"""
executors = {}
for _, (topics, root_stream) in self._wall_clock_registry.items():
root_executors = root_stream.compose()
_topics = tuple({t.name for t in topics})
executors[_topics] = root_executors[root_stream]
return executors

def register_stream_id(self, stream_id: str, topic_names: list[str]):
"""
Register a mapping between the stream_id and topic names.
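To clarify the registry shapes: `register_wall_clock()` stores one `(topics, Stream)` pair per `stream_id`, and `compose_wall_clock()` re-keys the composed executors by the tuple of topic names, which is what `_process_wall_clock` matches against the consumer assignment. A minimal stand-alone illustration with strings standing in for `Topic`, `Stream`, and executor objects (the names are made up):

```python
# What register_wall_clock() stores: {stream_id: ((Topic, ...), Stream)}
wall_clock_registry = {
    "stream-id-1": (("orders", "payments"), "wall-clock-stream-1"),
    "stream-id-2": (("clicks",), "wall-clock-stream-2"),
}

# What compose_wall_clock() returns: {(topic_name, ...): executor},
# with duplicate topic names collapsed (the PR uses a set for that).
executors = {
    tuple(dict.fromkeys(topics)): f"executor({stream})"
    for topics, stream in wall_clock_registry.values()
}

for topic_names, executor in executors.items():
    print(topic_names, "->", executor)
```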