Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
0883e88
async ca/task runner: get rid of passing channel/sources/sinks ids in…
yumkam Jul 23, 2025
07926ab
TEvContinueRun: pass inputChannels as r-value ref as before
yumkam Jul 23, 2025
5d972f9
avoid Inputs/ev->InputChannel copy
yumkam Jul 23, 2025
7a20aea
fix validation
yumkam Jul 23, 2025
4452a0d
missing changes
yumkam Jul 30, 2025
6fe8fe0
Merge remote-tracking branch 'origin/main' into refactor-async-ca
yumkam Aug 1, 2025
b244efc
async ca tests: refactor: reduce code duplication
yumkam Jul 10, 2025
533493f
more logs
yumkam Jul 10, 2025
c9edf66
move WEAK_UNIT_ASSERT* definition
yumkam Jul 10, 2025
7939339
validate with varied watermark granularity
yumkam Jul 10, 2025
53466fd
Replace watermarks flow
yumkam Jul 15, 2025
bc6ebf1
stabilize tests
yumkam Jul 28, 2025
6abbfe6
add tests for all stats collect levels
yumkam Jul 28, 2025
e40b21d
dump mon page on timeout
yumkam Jul 29, 2025
11ecd94
defragilize tests (move mon page test before final ack)
yumkam Jul 30, 2025
4dc082a
fuzzy tests
yumkam Jul 30, 2025
3a3499c
dq input: Fix Pending status stuck in Finished state
yumkam Aug 4, 2025
fa7ea81
propagate source/channel watermarks mode into inputTransform
yumkam Aug 4, 2025
a22004a
add multi-channel tests
yumkam Aug 4, 2025
5165661
reduce test repetitions
yumkam Aug 4, 2025
08722f3
move seed initialization to common function
yumkam Aug 4, 2025
370c062
Revert "[YQ-4354] Drop legacy HoppingWindow tests (#19915)"
yumkam Jul 29, 2025
2f27726
avoid copy
yumkam Aug 4, 2025
51e836e
cosmetics
yumkam Aug 4, 2025
56f4f13
fix log message mismatch
yumkam Aug 4, 2025
d14932a
fixup! move seed initialization to common function
yumkam Aug 4, 2025
ca58c86
Merge remote-tracking branch 'fork/refactor-async-ca' into fix-inputt…
yumkam Aug 4, 2025
335ab05
convert basic test to be multi-channel
yumkam Aug 4, 2025
7eab89d
remove unused variables
yumkam Aug 4, 2025
1eaaf85
Fix pausing on input transform attached to input without watermarks/c…
yumkam Aug 4, 2025
b31178a
cosmetics: reindent
yumkam Aug 6, 2025
59cbcd2
fixup! Replace watermarks flow
yumkam Aug 6, 2025
cb8f5f2
test: unweaken assert
yumkam Aug 8, 2025
e5e9900
More comments
yumkam Aug 8, 2025
a90c919
remove excessive and clumsy debug prints from dq_input_impl
yumkam Aug 8, 2025
4952aa0
rename and mark as private SquashWatermarks
yumkam Aug 8, 2025
4217aa8
refactor: move checkpoint/watermarks handling to channels
yumkam Aug 11, 2025
25c2ffe
fix condition on watermark ejection
yumkam Aug 13, 2025
79cfd73
simplify code
yumkam Aug 13, 2025
bc0cb0b
Merge remote-tracking branch 'origin/main' into fix-inputtransform-wi…
yumkam Aug 13, 2025
0123d1b
Revert "Revert "[YQ-4354] Drop legacy HoppingWindow tests (#19915)""
yumkam Aug 13, 2025
d4a5d63
Stop watching for watermarks in Finished channels/sources
yumkam Aug 14, 2025
14ee10a
Wait for Source buffer to be empty before starting checkpoint/watermark
yumkam Aug 14, 2025
1ecaaf7
async ca tests: minor improvements
yumkam Aug 14, 2025
d5a876f
WatermarkTracker: minor optimization
yumkam Aug 14, 2025
ae0b6b8
ca: drop confusing unused code
yumkam Aug 14, 2025
a8ee043
CA: fix thinko in WatermarksTracker interaction
yumkam Aug 15, 2025
cdd61d8
track multiple intermediate watermarks
yumkam Aug 15, 2025
365907b
fix checkpoint interaction
yumkam Aug 15, 2025
cd05867
Merge remote-tracking branch 'origin/main' into fix-inputtransform-wi…
yumkam Aug 15, 2025
948088b
test: remove unused code
yumkam Aug 15, 2025
b48c933
drop unused methods
yumkam Aug 15, 2025
70e2fd7
fix copypaste
yumkam Aug 15, 2025
3066cae
duplicated log
yumkam Aug 15, 2025
07db6bd
revert move pausing to TDqInputChannel
yumkam Aug 19, 2025
1ab1f50
cleanup
yumkam Aug 19, 2025
34305fc
Ensure CA will resume execution after unpausing by watermark
yumkam Aug 19, 2025
620f097
revert unneeded changes
yumkam Aug 26, 2025
2288503
drop unwanted logs
yumkam Aug 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 43 additions & 73 deletions ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
DUMP(ProcessOutputsState, LastPopReturnedNoData);

html << "<h3>Watermarks</h3>";
for (const auto& [time, id]: WatermarkTakeInputChannelDataRequests) {
html << "WatermarkTakeInputChannelDataRequests: " << time.ToString() << " " << id << "<br />";
}
DUMP(WatermarksTracker, GetPendingWatermark, ());

html << "<h3>CPU Quota</h3>";
html << "QuoterServiceActorId: " << QuoterServiceActorId.ToString() << "<br />";
Expand Down Expand Up @@ -250,7 +248,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
DUMP(info, ChannelId);
DUMP(info, SrcStageId);
DUMP(info, HasPeer);
html << "PendingWatermarks: " << !info.PendingWatermarks.empty() << " " << (info.PendingWatermarks.empty() ? TString{} : info.PendingWatermarks.back().ToString()) << "<br />";
html << "WatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />";
html << "PendingCheckpoint: " << info.PendingCheckpoint.has_value() << " " << (info.PendingCheckpoint ? TStringBuilder{} << info.PendingCheckpoint->GetId() << " " << info.PendingCheckpoint->GetGeneration() : TString{}) << "<br />";
html << "CheckpointingMode: " << NDqProto::ECheckpointingMode_Name(info.CheckpointingMode) << "<br />";
Expand Down Expand Up @@ -309,7 +306,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
html << "DqInputBuffer.InputType: " << (buffer->GetInputType() ? buffer->GetInputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
html << "DqInputBuffer.InputWidth: " << (buffer->GetInputWidth() ? ToString(*buffer->GetInputWidth()) : TString{"unknown"}) << "<br />";
html << "DqInputBuffer.IsFinished: " << buffer->IsFinished() << "<br />";
html << "DqInputBuffer.IsPaused: " << buffer->IsPaused() << "<br />";
html << "DqInputBuffer.IsPending: " << buffer->IsPending() << "<br />";

const auto& popStats = buffer->GetPopStats();
Expand Down Expand Up @@ -524,7 +520,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
}

const bool wasFinished = outputChannel.Finished;
auto channelId = outputChannel.ChannelId;
const auto channelId = outputChannel.ChannelId;

const auto& peerState = Channels->GetOutputChannelInFlightState(channelId);

Expand Down Expand Up @@ -619,18 +615,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
TMaybe<TInstant> watermark;
if (channelData.HasWatermark()) {
watermark = TInstant::MicroSeconds(channelData.GetWatermark().GetTimestampUs());

const bool channelWatermarkChanged = WatermarksTracker.NotifyInChannelWatermarkReceived(
inputChannel->ChannelId,
*watermark
);

if (channelWatermarkChanged) {
CA_LOG_T("Pause input channel " << channelData.GetChannelId() << " bacause of watermark " << *watermark);
inputChannel->Pause(*watermark);
}

WatermarkTakeInputChannelDataRequests[*watermark]++;
}

TDqSerializedBatch batch;
Expand All @@ -646,7 +630,8 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
channelData.GetChannelId(),
batch.RowCount() ? std::optional{std::move(batch)} : std::nullopt,
finished,
channelData.HasCheckpoint()
channelData.HasCheckpoint(),
watermark
);

Send(TaskRunnerActorId, ev.Release(), 0, Cookie);
Expand Down Expand Up @@ -680,22 +665,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
TBase::PassAway();
}

TMaybe<TInstant> GetWatermarkRequest() {
if (!WatermarksTracker.HasPendingWatermark()) {
return Nothing();
}

const auto pendingWatermark = *WatermarksTracker.GetPendingWatermark();
if (WatermarkTakeInputChannelDataRequests.contains(pendingWatermark)) {
// Not all precending to watermark input channels data has been injected
return Nothing();
}

MetricsReporter.ReportInjectedToTaskRunnerWatermark(pendingWatermark);

return pendingWatermark;
}

TMaybe<NDqProto::TCheckpoint> GetCheckpointRequest() {
if (!CheckpointRequestedFromTaskRunner && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved()) {
CheckpointRequestedFromTaskRunner = true;
Expand Down Expand Up @@ -801,9 +770,24 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
}
}

if (ev->Get()->WatermarkInjectedToOutputs && !WatermarksTracker.HasOutputChannels()) {
ResumeInputsByWatermark(*WatermarksTracker.GetPendingWatermark());
WatermarksTracker.PopPendingWatermark();
if (auto watermark = ev->Get()->WatermarkInjectedToOutputs) {
ResumeInputsByWatermark(*watermark);
if (WatermarksTracker.NotifyWatermarkWasSent(*watermark)) {
MetricsReporter.ReportInjectedToOutputsWatermark(*watermark);
WatermarksTracker.PopPendingWatermark();
}
// sources or input channels was unpaused, trigger new poll
ResumeExecution(EResumeSource::CAWatermarkInject);
}

for (auto inputChannelId : ev->Get()->FinishedInputsWithWatermarks) {
CA_LOG_T("Unregister watermarked input channel " << inputChannelId);
WatermarksTracker.UnregisterInputChannel(inputChannelId);
}

for (auto sourceId : ev->Get()->FinishedSourcesWithWatermarks) {
CA_LOG_T("Unregister watermarked async input " << sourceId);
WatermarksTracker.UnregisterAsyncInput(sourceId);
}

ReadyToCheckpointFlag = (bool) ev->Get()->ProgramState;
Expand Down Expand Up @@ -922,18 +906,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
ProcessOutputsState.LastPopReturnedNoData = asyncData.Data.empty();

if (asyncData.Watermark.Defined()) {
const auto watermark = TInstant::MicroSeconds(asyncData.Watermark->GetTimestampUs());
const bool shouldResumeInputs = WatermarksTracker.NotifyOutputChannelWatermarkSent(
outputChannel.ChannelId,
watermark
);

if (shouldResumeInputs) {
ResumeInputsByWatermark(watermark);
}
}

if (!shouldSkipData) {
if (asyncData.Checkpoint.Defined()) {
ResumeInputsByCheckpoint();
Expand Down Expand Up @@ -980,28 +952,32 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
}

void OnInputChannelDataAck(NTaskRunnerActor::TEvInputChannelDataAck::TPtr& ev) {
auto it = TakeInputChannelDataRequests.find(ev->Cookie);
const auto it = TakeInputChannelDataRequests.find(ev->Cookie);
YQL_ENSURE(it != TakeInputChannelDataRequests.end());

const auto channelId = it->second.ChannelId;
const auto watermark = it->second.Watermark;

CA_LOG_T("Input data push finished. Cookie: " << ev->Cookie
<< " Watermark: " << it->second.Watermark
<< " Watermark: " << watermark
<< " Ack: " << it->second.Ack
<< " TakeInputChannelDataRequests: " << TakeInputChannelDataRequests.size()
<< " WatermarkTakeInputChannelDataRequests: " << WatermarkTakeInputChannelDataRequests.size());

if (it->second.Watermark.Defined()) {
auto& ct = WatermarkTakeInputChannelDataRequests.at(*it->second.Watermark);
if (--ct == 0) {
WatermarkTakeInputChannelDataRequests.erase(*it->second.Watermark);
}
}
);

TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(it->second.ChannelId);
TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(channelId);
Y_ABORT_UNLESS(inputChannel);

inputChannel->FreeSpace = ev->Get()->FreeSpace;

if (watermark) {
if (WatermarksTracker.NotifyInChannelWatermarkReceived( channelId, *watermark)) {
CA_LOG_T("Pause input channel " << channelId << " because of watermark");
inputChannel->Pause(*watermark); // XXX does nothing in async CA
}
}

if (it->second.Ack) {
Channels->SendChannelDataAck(it->second.ChannelId, inputChannel->FreeSpace);
Channels->SendChannelDataAck(channelId, inputChannel->FreeSpace);
}

TakeInputChannelDataRequests.erase(it);
Expand Down Expand Up @@ -1141,14 +1117,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
ContinueRunEvent = std::make_unique<NTaskRunnerActor::TEvContinueRun>();
}
ContinueRunEvent->CheckpointOnly = checkpointOnly;
if (TMaybe<TInstant> watermarkRequest = GetWatermarkRequest()) {
if (!ContinueRunEvent->WatermarkRequest) {
ContinueRunEvent->WatermarkRequest.ConstructInPlace();
ContinueRunEvent->WatermarkRequest->Watermark = *watermarkRequest;
} else {
ContinueRunEvent->WatermarkRequest->Watermark = Max(ContinueRunEvent->WatermarkRequest->Watermark, *watermarkRequest);
}
}
if (checkpointRequest) {
if (!ContinueRunEvent->CheckpointRequest) {
ContinueRunEvent->CheckpointRequest.ConstructInPlace(*checkpointRequest);
Expand All @@ -1157,6 +1125,11 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
Y_ABORT_UNLESS(ContinueRunEvent->CheckpointRequest->Checkpoint.GetId() == checkpointRequest->GetId());
}
}
if (auto watermarkRequest = WatermarksTracker.GetPendingWatermark()) {
Y_ENSURE(*watermarkRequest >= ContinueRunEvent->WatermarkRequest);
ContinueRunEvent->WatermarkRequest = *watermarkRequest;
MetricsReporter.ReportInjectedToTaskRunnerWatermark(*watermarkRequest);
}

if (!UseCpuQuota()) {
Send(TaskRunnerActorId, ContinueRunEvent.release());
Expand Down Expand Up @@ -1225,9 +1198,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
TMaybe<TInstant> Watermark;
};
THashMap<ui64, TTakeInputChannelData> TakeInputChannelDataRequests;
// Watermark should be injected to task runner only after all precending data is injected
// This hash map will help to track the right moment
THashMap<TInstant, ui32> WatermarkTakeInputChannelDataRequests;
ui64 Cookie = 0;
NDq::TDqTaskRunnerStatsView TaskRunnerStats;
bool ReadyToCheckpointFlag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ struct TComputeActorAsyncInputHelper {
}

void ResumeByWatermark(TInstant watermark) {
YQL_ENSURE(watermark == PendingWatermark);
PendingWatermark = Nothing();
if (watermark >= PendingWatermark) {
PendingWatermark = Nothing();
}
}

virtual i64 GetFreeSpace() const = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ enum class EResumeSource : ui32 {
CADataSent,
CAPendingOutput,
CATaskRunnerCreated,
CAWatermarkInject,

Last,
};
Expand Down
51 changes: 24 additions & 27 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
sourceInfo.ResumeByWatermark(watermark);
}

// XXX Does nothing in async CA, not used (yet) in sync CA
for (auto& [id, channelInfo] : InputChannelsMap) {
if (channelInfo.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) {
continue;
Expand Down Expand Up @@ -839,7 +840,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
ui32 SrcStageId;
IDqInputChannel::TPtr Channel;
bool HasPeer = false;
std::queue<TInstant> PendingWatermarks;
const NDqProto::EWatermarksMode WatermarksMode;
std::optional<NDqProto::TCheckpoint> PendingCheckpoint;
const NDqProto::ECheckpointingMode CheckpointingMode;
Expand All @@ -860,38 +860,36 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
}

bool IsPaused() const {
return !PendingWatermarks.empty() || PendingCheckpoint.has_value();
return PendingCheckpoint.has_value();
}

void Pause(TInstant watermark) {
YQL_ENSURE(WatermarksMode != NDqProto::WATERMARKS_MODE_DISABLED);

PendingWatermarks.emplace(watermark);
if (Channel) {
Channel->AddWatermark(watermark);
}
}

void Pause(const NDqProto::TCheckpoint& checkpoint) {
YQL_ENSURE(!PendingCheckpoint);
YQL_ENSURE(CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED);
PendingCheckpoint = checkpoint;
if (Channel) { // async actor doesn't hold channels, so channel is paused in task runner actor
Channel->Pause();
Channel->PauseByCheckpoint();
}
}

void ResumeByWatermark(TInstant watermark) {
while (!PendingWatermarks.empty() && PendingWatermarks.front() <= watermark) {
if (PendingWatermarks.front() != watermark) {
CA_LOG_W("Input channel " << ChannelId <<
" watermarks were collapsed. See YQ-1441. Dropped watermark: " << PendingWatermarks.front());
}
PendingWatermarks.pop();
if (Channel) {
Channel->ResumeByWatermark(watermark);
}
}

void ResumeByCheckpoint() {
PendingCheckpoint.reset();
if (Channel) { // async actor doesn't hold channels, so channel is resumed in task runner actor
Channel->Resume();
Channel->ResumeByCheckpoint();
}
}

Expand Down Expand Up @@ -1609,37 +1607,42 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
void InitializeTask() {
for (ui32 i = 0; i < Task.InputsSize(); ++i) {
const auto& inputDesc = Task.GetInputs(i);
auto watermarksMode = NDqProto::WATERMARKS_MODE_DISABLED;
Y_ABORT_UNLESS(!inputDesc.HasSource() || inputDesc.ChannelsSize() == 0); // HasSource => no channels

if (inputDesc.HasTransform()) {
auto result = InputTransformsMap.emplace(
i,
TAsyncInputTransformHelper(LogPrefix, i, NDqProto::WATERMARKS_MODE_DISABLED)
);
YQL_ENSURE(result.second);
}

if (inputDesc.HasSource()) {
const auto watermarksMode = inputDesc.GetSource().GetWatermarksMode();
watermarksMode = inputDesc.GetSource().GetWatermarksMode();
auto result = SourcesMap.emplace(
i,
static_cast<TDerived*>(this)->CreateInputHelper(LogPrefix, i, watermarksMode)
);
YQL_ENSURE(result.second);
} else {
for (auto& channel : inputDesc.GetChannels()) {
auto channelWatermarksMode = channel.GetWatermarksMode();
if (channelWatermarksMode != NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) {
watermarksMode = channelWatermarksMode;
}
auto result = InputChannelsMap.emplace(
channel.GetId(),
TInputChannelInfo(
LogPrefix,
channel.GetId(),
channel.GetSrcStageId(),
channel.GetWatermarksMode(),
channelWatermarksMode,
channel.GetCheckpointingMode())
);
YQL_ENSURE(result.second);
}
}

if (inputDesc.HasTransform()) {
auto result = InputTransformsMap.emplace(
i,
TAsyncInputTransformHelper(LogPrefix, i, watermarksMode)
);
YQL_ENSURE(result.second);
}
}

for (ui32 i = 0; i < Task.OutputsSize(); ++i) {
Expand Down Expand Up @@ -1694,12 +1697,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
WatermarksTracker.RegisterInputChannel(id);
}
}

for (const auto& [id, channel] : OutputChannelsMap) {
if (channel.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) {
WatermarksTracker.RegisterOutputChannel(id);
}
}
}

virtual const NYql::NDq::TDqTaskRunnerStats* GetTaskRunnerStats() = 0;
Expand Down
16 changes: 10 additions & 6 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ void TDqComputeActorMetrics::ReportInjectedToOutputsWatermark(TInstant watermark
}

InjectedToOutputsWatermark->Set(watermark.MilliSeconds());
auto iter = WatermarkStartedAt.find(watermark);
if (iter != WatermarkStartedAt.end()) {
WatermarkCollectLatency->Collect((TInstant::Now() - iter->second).MilliSeconds());
WatermarkStartedAt.erase(iter);
auto iter = WatermarkStartedAt.upper_bound(watermark);
if (iter != WatermarkStartedAt.begin()) {
WatermarkCollectLatency->Collect((TInstant::Now() - WatermarkStartedAt.begin()->second).MilliSeconds());
WatermarkStartedAt.erase(WatermarkStartedAt.begin(), iter);
}
}

Expand All @@ -162,9 +162,13 @@ NMonitoring::TDynamicCounterPtr TDqComputeActorMetrics::GetInputChannelCounters(
}

void TDqComputeActorMetrics::ReportInputWatermarkMetrics(NMonitoring::TDynamicCounterPtr& counters, TInstant watermark) {
if (!Enable) {
return;
}
counters->GetCounter("watermark_ms")->Set(watermark.MilliSeconds());
if (!WatermarkStartedAt.contains(watermark)) {
WatermarkStartedAt[watermark] = TInstant::Now();
auto [it, inserted] = WatermarkStartedAt.emplace(watermark, TInstant::Zero());
if (inserted) {
it->second = TInstant::Now();
}
}

Expand Down
Loading
Loading