Skip to content

Commit 321dc7b

Browse files
GrigoriyPA and Copilot authored
YQ-4314 supported script executions retries (#21592)
Co-authored-by: Copilot <[email protected]>
1 parent 1e43a9d commit 321dc7b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+3018
-899
lines changed

ydb/core/fq/libs/control_plane_storage/config.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPl
5454
auto retryPeriod = GetDuration(retryPolicy.GetRetryPeriod(), TDuration::Hours(1));
5555
auto backoffPeriod = GetDuration(retryPolicy.GetBackoffPeriod(), TDuration::Zero());
5656
for (const auto statusCode: mapping.GetStatusCode()) {
57-
RetryPolicies.emplace(statusCode, TRetryPolicyItem(retryCount, retryLimit, retryPeriod, backoffPeriod));
57+
RetryPolicies.emplace(statusCode, NKikimr::NKqp::TRetryPolicyItem(retryCount, retryLimit, retryPeriod, backoffPeriod));
5858
}
5959
}
6060

ydb/core/fq/libs/control_plane_storage/config.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <ydb/core/fq/libs/config/protos/common.pb.h>
66
#include <ydb/core/fq/libs/config/protos/control_plane_storage.pb.h>
7+
#include <ydb/core/kqp/proxy_service/kqp_script_execution_retries.h>
78
#include <yql/essentials/providers/common/proto/gateways_config.pb.h>
89
#include <ydb/public/api/protos/draft/fq.pb.h>
910

@@ -26,8 +27,8 @@ struct TControlPlaneStorageConfig {
2627
TSet<FederatedQuery::ConnectionSetting::ConnectionCase> AvailableConnections;
2728
TSet<FederatedQuery::BindingSetting::BindingCase> AvailableBindings;
2829
ui64 GeneratorPathsLimit;
29-
THashMap<ui64, TRetryPolicyItem> RetryPolicies;
30-
TRetryPolicyItem TaskLeaseRetryPolicy;
30+
THashMap<ui64, NKikimr::NKqp::TRetryPolicyItem> RetryPolicies;
31+
NKikimr::NKqp::TRetryPolicyItem TaskLeaseRetryPolicy;
3132
TDuration QuotaTtl;
3233
TDuration MetricsTtl;
3334
TSet<FederatedQuery::ConnectionSetting::ConnectionCase> AvailableStreamingConnections;

ydb/core/fq/libs/control_plane_storage/in_memory_control_plane_storage.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class TInMemoryControlPlaneStorageActor : public NActors::TActor<TInMemoryContro
4343
};
4444

4545
struct TValue {
46-
TRetryLimiter RetryLimiter;
46+
NKikimr::NKqp::TRetryLimiter RetryLimiter;
4747
TString Owner;
4848
TInstant AssignedUntil;
4949
TInstant LastSeenAt;
@@ -439,7 +439,7 @@ class TInMemoryControlPlaneStorageActor : public NActors::TActor<TInMemoryContro
439439
if (ctx.Request.execute_mode() != FederatedQuery::SAVE) {
440440
AddEntity(Jobs, {ctx.Scope, queryId, jobId}, {job});
441441

442-
TRetryLimiter retryLimiter;
442+
NKikimr::NKqp::TRetryLimiter retryLimiter;
443443
retryLimiter.Assign(0, ctx.StartTime, 0.0);
444444

445445
AddEntity(PendingQueries, {
@@ -1081,7 +1081,7 @@ class TInMemoryControlPlaneStorageActor : public NActors::TActor<TInMemoryContro
10811081
struct TTaskInternal {
10821082
TTask Task;
10831083
TString Owner;
1084-
TRetryLimiter RetryLimiter;
1084+
NKikimr::NKqp::TRetryLimiter RetryLimiter;
10851085
TString TenantName;
10861086
bool ShouldAbortTask;
10871087
};

ydb/core/fq/libs/control_plane_storage/internal/task_get.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace {
1818

1919
struct TTaskInternal {
2020
TTask Task;
21-
TRetryLimiter RetryLimiter;
21+
NKikimr::NKqp::TRetryLimiter RetryLimiter;
2222
bool ShouldAbortTask = false; // force ABORTED_BY_SYSTEM
2323
bool ShouldSkipTask = false; // tenant fetch denied or tenant must be changed
2424
TString TablePathPrefix;

ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ TYdbControlPlaneStorageActor::TPingTaskParams TYdbControlPlaneStorageActor::Cons
109109
jobId = *parser.ColumnParser(JOB_ID_COLUMN_NAME).GetOptionalString();
110110
}
111111

112-
TRetryLimiter retryLimiter;
112+
NKikimr::NKqp::TRetryLimiter retryLimiter;
113113
{
114114
TResultSetParser parser(resultSets[2]);
115115
if (!parser.TryNextRow()) {
@@ -330,7 +330,7 @@ NYql::TIssues TControlPlaneStorageBase::ValidateRequest(TEvControlPlaneStorage::
330330
void TControlPlaneStorageBase::UpdateTaskInfo(
331331
NActors::TActorSystem* actorSystem, Fq::Private::PingTaskRequest& request, const std::shared_ptr<TFinalStatus>& finalStatus, FederatedQuery::Query& query,
332332
FederatedQuery::Internal::QueryInternal& internal, FederatedQuery::Job& job, TString& owner,
333-
TRetryLimiter& retryLimiter, TDuration& backoff, TInstant& expireAt) const
333+
NKikimr::NKqp::TRetryLimiter& retryLimiter, TDuration& backoff, TInstant& expireAt) const
334334
{
335335
TMaybe<FederatedQuery::QueryMeta::ComputeStatus> queryStatus;
336336
if (request.status() != FederatedQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED) {
@@ -357,7 +357,7 @@ void TControlPlaneStorageBase::UpdateTaskInfo(
357357
internal.clear_operation_id();
358358
}
359359

360-
TRetryPolicyItem policy(0, 0, TDuration::Seconds(1), TDuration::Zero());
360+
NKikimr::NKqp::TRetryPolicyItem policy(0, 0, TDuration::Seconds(1), TDuration::Zero());
361361
auto it = Config->RetryPolicies.find(request.status_code());
362362
auto policyFound = it != Config->RetryPolicies.end();
363363
if (policyFound) {
@@ -376,7 +376,7 @@ void TControlPlaneStorageBase::UpdateTaskInfo(
376376
if (retryLimiter.UpdateOnRetry(now, policy) && now < executionDeadline) {
377377
queryStatus.Clear();
378378
// failing query is throttled for backoff period
379-
backoff = policy.BackoffPeriod * (retryLimiter.RetryRate + 1);
379+
backoff = retryLimiter.Backoff;
380380
owner = "";
381381
if (!transientIssues) {
382382
transientIssues.ConstructInPlace();
@@ -622,7 +622,7 @@ void TControlPlaneStorageBase::UpdateTaskInfo(
622622

623623
void TControlPlaneStorageBase::FillQueryStatistics(
624624
const std::shared_ptr<TFinalStatus>& finalStatus, const FederatedQuery::Query& query,
625-
const FederatedQuery::Internal::QueryInternal& internal, const TRetryLimiter& retryLimiter) const
625+
const FederatedQuery::Internal::QueryInternal& internal, const NKikimr::NKqp::TRetryLimiter& retryLimiter) const
626626
{
627627
finalStatus->FinalStatistics = ExtractStatisticsFromProtobuf(internal.statistics());
628628
finalStatus->FinalStatistics.push_back(std::make_pair("IsAutomatic", query.content().automatic()));

ydb/core/fq/libs/control_plane_storage/internal/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ PEERDIR(
88
library/cpp/testing/unittest
99
library/cpp/json/yson
1010
yql/essentials/public/udf/service/stub
11+
yql/essentials/sql/pg_dummy
1112
)
1213

1314
YQL_LAST_ABI_VERSION()

ydb/core/fq/libs/control_plane_storage/internal/ya.make

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,9 @@ SRCS(
1212

1313
PEERDIR(
1414
contrib/libs/fmt
15-
ydb/library/actors/core
1615
library/cpp/lwtrace/mon
1716
library/cpp/monlib/service/pages
1817
ydb/core/base
19-
ydb/core/metering
2018
ydb/core/fq/libs/common
2119
ydb/core/fq/libs/compute/common
2220
ydb/core/fq/libs/config
@@ -26,14 +24,18 @@ PEERDIR(
2624
ydb/core/fq/libs/quota_manager/events
2725
ydb/core/fq/libs/rate_limiter/events
2826
ydb/core/fq/libs/ydb
27+
ydb/core/kqp/opt
28+
ydb/core/kqp/proxy_service
29+
ydb/core/metering
2930
ydb/core/mon
31+
ydb/library/actors/core
3032
ydb/library/protobuf_printer
3133
ydb/library/security
32-
yql/essentials/public/issue
33-
yql/essentials/utils
3434
ydb/public/lib/fq
3535
ydb/public/sdk/cpp/src/client/scheme
3636
ydb/public/sdk/cpp/src/client/value
37+
yql/essentials/public/issue
38+
yql/essentials/utils
3739
)
3840

3941
YQL_LAST_ABI_VERSION()

ydb/core/fq/libs/control_plane_storage/util.cpp

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,46 +5,6 @@
55

66
namespace NFq {
77

8-
TRetryLimiter::TRetryLimiter(ui64 retryCount, const TInstant& retryCounterUpdatedAt, double retryRate)
9-
: RetryCount(retryCount), RetryCounterUpdatedAt(retryCounterUpdatedAt), RetryRate(retryRate) {
10-
}
11-
12-
void TRetryLimiter::Assign(ui64 retryCount, const TInstant& retryCounterUpdatedAt, double retryRate) {
13-
RetryCount = retryCount;
14-
RetryCounterUpdatedAt = retryCounterUpdatedAt;
15-
RetryRate = retryRate;
16-
}
17-
18-
bool TRetryLimiter::UpdateOnRetry(const TInstant& lastSeenAt, const TRetryPolicyItem& policy, const TInstant now) {
19-
auto lastPeriod = lastSeenAt - RetryCounterUpdatedAt;
20-
if (lastPeriod >= policy.RetryPeriod) {
21-
RetryRate = 0.0;
22-
} else {
23-
RetryRate += 1.0;
24-
auto rate = lastPeriod / policy.RetryPeriod * policy.RetryCount;
25-
if (RetryRate > rate) {
26-
RetryRate -= rate;
27-
} else {
28-
RetryRate = 0.0;
29-
}
30-
}
31-
32-
bool shouldRetry = true;
33-
if (RetryRate >= policy.RetryCount) {
34-
shouldRetry = false;
35-
LastError = TStringBuilder() << "failure rate " << RetryRate << " exceeds limit of " << policy.RetryCount;
36-
} else if (policy.RetryLimit && RetryCount >= policy.RetryLimit) {
37-
shouldRetry = false;
38-
LastError = TStringBuilder() << "retry count reached limit of " << policy.RetryLimit;
39-
}
40-
41-
if (shouldRetry) {
42-
RetryCount++;
43-
RetryCounterUpdatedAt = now;
44-
}
45-
return shouldRetry;
46-
}
47-
488
bool IsTerminalStatus(FederatedQuery::QueryMeta::ComputeStatus status)
499
{
5010
return IsIn({ FederatedQuery::QueryMeta::ABORTED_BY_USER, FederatedQuery::QueryMeta::ABORTED_BY_SYSTEM,

ydb/core/fq/libs/control_plane_storage/util.h

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,6 @@
1212

1313
namespace NFq {
1414

15-
class TRetryPolicyItem {
16-
public:
17-
TRetryPolicyItem() = default;
18-
TRetryPolicyItem(ui64 retryCount, ui64 retryLimit, const TDuration& retryPeriod, const TDuration& backoffPeriod)
19-
: RetryCount(retryCount), RetryLimit(retryLimit), RetryPeriod(retryPeriod), BackoffPeriod(backoffPeriod)
20-
{ }
21-
ui64 RetryCount = 0;
22-
ui64 RetryLimit = 0;
23-
TDuration RetryPeriod = TDuration::Zero();
24-
TDuration BackoffPeriod = TDuration::Zero();
25-
};
26-
27-
class TRetryLimiter {
28-
public:
29-
TRetryLimiter() = default;
30-
TRetryLimiter(ui64 retryCount, const TInstant& retryCounterUpdatedAt, double retryRate);
31-
void Assign(ui64 retryCount, const TInstant& retryCounterUpdatedAt, double retryRate);
32-
bool UpdateOnRetry(const TInstant& lastSeenAt, const TRetryPolicyItem& policy, const TInstant now = Now());
33-
ui64 RetryCount = 0;
34-
TInstant RetryCounterUpdatedAt = TInstant::Zero();
35-
double RetryRate = 0.0;
36-
TString LastError;
37-
};
38-
3915
bool IsTerminalStatus(FederatedQuery::QueryMeta::ComputeStatus status);
4016

4117
bool IsAbortedStatus(FederatedQuery::QueryMeta::ComputeStatus status);

ydb/core/fq/libs/control_plane_storage/ya.make

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,17 @@ PEERDIR(
3434
ydb/core/fq/libs/quota_manager/events
3535
ydb/core/fq/libs/shared_resources
3636
ydb/core/fq/libs/ydb
37+
ydb/core/kqp/opt
38+
ydb/core/kqp/proxy_service
3739
ydb/core/mon
3840
ydb/library/db_pool
3941
ydb/library/security
4042
ydb/library/yql/providers/s3/path_generator
41-
yql/essentials/public/issue
4243
ydb/public/api/protos
4344
ydb/public/sdk/cpp/adapters/issue
4445
ydb/public/sdk/cpp/src/client/scheme
4546
ydb/public/sdk/cpp/src/client/table
46-
ydb/library/db_pool
47-
ydb/library/yql/providers/s3/path_generator
47+
yql/essentials/public/issue
4848
)
4949

5050
YQL_LAST_ABI_VERSION()

0 commit comments

Comments
 (0)