-
Notifications
You must be signed in to change notification settings - Fork 716
[WIP] async ca: rework watermark handling and fix inputtransform and watermark interaction #22263
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[WIP] async ca: rework watermark handling and fix inputtransform and watermark interaction #22263
Conversation
Sinks and Sources are definitely unique, sort Inputs/Outputs before check
This reverts commit 7dabb05.
⚪ |
⚪ |
🟢 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR reworks watermark handling and fixes the interaction between input transforms and watermarks in the DQ (Data Query) system. The primary change is moving watermark and checkpoint handling logic from the base TDqInputImpl
class to the specific TDqInputChannelImpl
class to better manage barriers (watermarks/checkpoints) and their interaction with data processing.
Key changes:
- Refactored watermark and checkpoint handling to use a barrier-based approach in input channels
- Updated method names to distinguish between checkpoint-based and watermark-based pausing/resuming
- Simplified the base input interface by removing pause/resume methods
Reviewed Changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 5 comments.
Show a summary per file
File | Description |
---|---|
ydb/library/yql/dq/runtime/dq_input_impl.h | Removes checkpoint/watermark handling from base class, simplifies Empty() logic |
ydb/library/yql/dq/runtime/dq_input_channel.cpp | Implements new barrier-based watermark/checkpoint handling system |
ydb/library/yql/dq/runtime/dq_input_channel.h | Updates interface to separate checkpoint and watermark pause/resume operations |
ydb/library/yql/dq/runtime/dq_input.h | Removes pause/resume methods from base interface |
ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp | Updates to use new watermark handling and track finished inputs/sources |
ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp | Modifies watermark processing flow to work with new barrier system |
ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp | Simplifies watermark tracking by removing output channel tracking |
Comments suppressed due to low confidence (1)
ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp:142
- Magic number 9999 is used without explanation. Consider using a named constant to make the error simulation condition more explicit.
TActorId EdgeActor;
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
You can also share your feedback on Copilot code review for a chance to win a $100 gift card. Take the survey.
@@ -228,7 +228,7 @@ class TDqOutputChannel : public IDqOutputChannel { | |||
} | |||
|
|||
void Push(NDqProto::TWatermark&& watermark) override { | |||
YQL_ENSURE(!Watermark); | |||
YQL_ENSURE(!Watermark || Watermark->GetTimestampUs() <= watermark.GetTimestampUs()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The watermark ordering check allows equal timestamps, but the original code only allowed pushing when no watermark existed. This change may allow duplicate watermarks with the same timestamp, which could be incorrect behavior.
YQL_ENSURE(!Watermark || Watermark->GetTimestampUs() <= watermark.GetTimestampUs()); | |
YQL_ENSURE(!Watermark || Watermark->GetTimestampUs() < watermark.GetTimestampUs()); |
Copilot uses AI. Check for mistakes.
bool IsPaused() const override { | ||
return BatchesBeforePause; | ||
} | ||
TString LogPrefix() const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The LogPrefix() method is declared but not implemented in the base class. This will cause linker errors when called from derived classes that don't override it.
TString LogPrefix() const; | |
TString LogPrefix() const { | |
return TString("[TDqInputImpl]"); | |
} |
Copilot uses AI. Check for mistakes.
private: | ||
void Push(TDqSerializedBatch&&) override { | ||
Y_ABORT("Not implemented"); | ||
} | ||
|
||
struct TBarrier { | ||
static constexpr TInstant NoBarrier = TInstant::Zero(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Using TInstant::Zero() as a sentinel value for 'no barrier' could be confusing since zero is a valid timestamp. Consider using TInstant::Max() or a separate boolean flag to indicate absence of a barrier.
static constexpr TInstant NoBarrier = TInstant::Zero(); | |
// Use TInstant::Max() as a sentinel for both "no barrier" and "checkpoint barrier" | |
static constexpr TInstant NoBarrier = TInstant::Max(); |
Copilot uses AI. Check for mistakes.
|
||
auto sourcesMap = Sources; | ||
|
||
Invoker->Invoke([selfId, cookie, actorSystem, replyTo, taskRunner=TaskRunner, inputMap, sourcesMap, memLimit=ev->Get()->MemLimit, settings=Settings, stageId=StageId, runtimeData=RuntimeData]() mutable { | ||
Invoker->Invoke([selfId, cookie, actorSystem, replyTo, taskRunner=TaskRunner, inputMap=std::move(inputMap), sourcesMap=std::move(sourcesMap), memLimit=ev->Get()->MemLimit, settings=Settings, stageId=StageId, runtimeData=RuntimeData]() mutable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving ev->Get()->InputChannels directly into inputMap and then moving inputMap again in the lambda capture creates an unnecessary move operation. Consider moving directly in the lambda capture.
Copilot uses AI. Check for mistakes.
}; | ||
std::deque<TBarrier> PendingBarriers; // barrier and counts after barrier | ||
TBarrier BeforeBarrier; // counts before barrier | ||
TInstant PauseBarrier; // |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment for PauseBarrier is incomplete and doesn't explain what this member variable represents or how it's used.
TInstant PauseBarrier; // | |
// The barrier (watermark or checkpoint) up to which watermarks are paused or skipped. | |
// Used in SkipWatermarksBeforeBarrier() to determine which barriers to process or ignore. | |
TInstant PauseBarrier; |
Copilot uses AI. Check for mistakes.
…th-watermarks-5 Fixed Conflicts: ydb/library/yql/dq/runtime/dq_output_channel.cpp
⚪ Test history | Ya make output | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
⚪ Test history | Ya make output | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
1aff691
to
b48c933
Compare
⚪ Test history | Ya make output | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
⚪ Test history | Ya make output | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
should be reached anyway (sending watermark via output channel will eventually result in new resume event)
⚪ Test history | Ya make output | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
⚪ Test history | Ya make output | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR does not match RFC https://nda.ya.ru/t/I8zckxs17HpogX
⚪ Test history | Ya make output | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat
⚪ Test history | Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
⚪ Test history | Ya make output | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
Changelog entry
...
Changelog category
Description for reviewers
to be rebased on top of
#21546
(changes before 6fe8fe0 should be reviewed there)