Skip to content

YQ-4314 Support script execution retries #21592

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/control_plane_storage/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ TControlPlaneStorageConfig::TControlPlaneStorageConfig(const NConfig::TControlPl
auto retryPeriod = GetDuration(retryPolicy.GetRetryPeriod(), TDuration::Hours(1));
auto backoffPeriod = GetDuration(retryPolicy.GetBackoffPeriod(), TDuration::Zero());
for (const auto statusCode: mapping.GetStatusCode()) {
RetryPolicies.emplace(statusCode, TRetryPolicyItem(retryCount, retryLimit, retryPeriod, backoffPeriod));
RetryPolicies.emplace(statusCode, NKikimr::NKqp::TRetryPolicyItem(retryCount, retryLimit, retryPeriod, backoffPeriod));
}
}

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/control_plane_storage/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <ydb/core/fq/libs/config/protos/common.pb.h>
#include <ydb/core/fq/libs/config/protos/control_plane_storage.pb.h>
#include <ydb/core/kqp/proxy_service/kqp_script_execution_retries.h>
#include <yql/essentials/providers/common/proto/gateways_config.pb.h>
#include <ydb/public/api/protos/draft/fq.pb.h>

Expand All @@ -26,8 +27,8 @@ struct TControlPlaneStorageConfig {
TSet<FederatedQuery::ConnectionSetting::ConnectionCase> AvailableConnections;
TSet<FederatedQuery::BindingSetting::BindingCase> AvailableBindings;
ui64 GeneratorPathsLimit;
THashMap<ui64, TRetryPolicyItem> RetryPolicies;
TRetryPolicyItem TaskLeaseRetryPolicy;
THashMap<ui64, NKikimr::NKqp::TRetryPolicyItem> RetryPolicies;
NKikimr::NKqp::TRetryPolicyItem TaskLeaseRetryPolicy;
TDuration QuotaTtl;
TDuration MetricsTtl;
TSet<FederatedQuery::ConnectionSetting::ConnectionCase> AvailableStreamingConnections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TInMemoryControlPlaneStorageActor : public NActors::TActor<TInMemoryContro
};

struct TValue {
TRetryLimiter RetryLimiter;
NKikimr::NKqp::TRetryLimiter RetryLimiter;
TString Owner;
TInstant AssignedUntil;
TInstant LastSeenAt;
Expand Down Expand Up @@ -439,7 +439,7 @@ class TInMemoryControlPlaneStorageActor : public NActors::TActor<TInMemoryContro
if (ctx.Request.execute_mode() != FederatedQuery::SAVE) {
AddEntity(Jobs, {ctx.Scope, queryId, jobId}, {job});

TRetryLimiter retryLimiter;
NKikimr::NKqp::TRetryLimiter retryLimiter;
retryLimiter.Assign(0, ctx.StartTime, 0.0);

AddEntity(PendingQueries, {
Expand Down Expand Up @@ -1081,7 +1081,7 @@ class TInMemoryControlPlaneStorageActor : public NActors::TActor<TInMemoryContro
struct TTaskInternal {
TTask Task;
TString Owner;
TRetryLimiter RetryLimiter;
NKikimr::NKqp::TRetryLimiter RetryLimiter;
TString TenantName;
bool ShouldAbortTask;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace {

struct TTaskInternal {
TTask Task;
TRetryLimiter RetryLimiter;
NKikimr::NKqp::TRetryLimiter RetryLimiter;
bool ShouldAbortTask = false; // force ABORTED_BY_SYSTEM
bool ShouldSkipTask = false; // tenant fetch denied or tenant must be changed
TString TablePathPrefix;
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ TYdbControlPlaneStorageActor::TPingTaskParams TYdbControlPlaneStorageActor::Cons
jobId = *parser.ColumnParser(JOB_ID_COLUMN_NAME).GetOptionalString();
}

TRetryLimiter retryLimiter;
NKikimr::NKqp::TRetryLimiter retryLimiter;
{
TResultSetParser parser(resultSets[2]);
if (!parser.TryNextRow()) {
Expand Down Expand Up @@ -330,7 +330,7 @@ NYql::TIssues TControlPlaneStorageBase::ValidateRequest(TEvControlPlaneStorage::
void TControlPlaneStorageBase::UpdateTaskInfo(
NActors::TActorSystem* actorSystem, Fq::Private::PingTaskRequest& request, const std::shared_ptr<TFinalStatus>& finalStatus, FederatedQuery::Query& query,
FederatedQuery::Internal::QueryInternal& internal, FederatedQuery::Job& job, TString& owner,
TRetryLimiter& retryLimiter, TDuration& backoff, TInstant& expireAt) const
NKikimr::NKqp::TRetryLimiter& retryLimiter, TDuration& backoff, TInstant& expireAt) const
{
TMaybe<FederatedQuery::QueryMeta::ComputeStatus> queryStatus;
if (request.status() != FederatedQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED) {
Expand All @@ -357,7 +357,7 @@ void TControlPlaneStorageBase::UpdateTaskInfo(
internal.clear_operation_id();
}

TRetryPolicyItem policy(0, 0, TDuration::Seconds(1), TDuration::Zero());
NKikimr::NKqp::TRetryPolicyItem policy(0, 0, TDuration::Seconds(1), TDuration::Zero());
auto it = Config->RetryPolicies.find(request.status_code());
auto policyFound = it != Config->RetryPolicies.end();
if (policyFound) {
Expand All @@ -376,7 +376,7 @@ void TControlPlaneStorageBase::UpdateTaskInfo(
if (retryLimiter.UpdateOnRetry(now, policy) && now < executionDeadline) {
queryStatus.Clear();
// failing query is throttled for backoff period
backoff = policy.BackoffPeriod * (retryLimiter.RetryRate + 1);
backoff = retryLimiter.Backoff;
owner = "";
if (!transientIssues) {
transientIssues.ConstructInPlace();
Expand Down Expand Up @@ -622,7 +622,7 @@ void TControlPlaneStorageBase::UpdateTaskInfo(

void TControlPlaneStorageBase::FillQueryStatistics(
const std::shared_ptr<TFinalStatus>& finalStatus, const FederatedQuery::Query& query,
const FederatedQuery::Internal::QueryInternal& internal, const TRetryLimiter& retryLimiter) const
const FederatedQuery::Internal::QueryInternal& internal, const NKikimr::NKqp::TRetryLimiter& retryLimiter) const
{
finalStatus->FinalStatistics = ExtractStatisticsFromProtobuf(internal.statistics());
finalStatus->FinalStatistics.push_back(std::make_pair("IsAutomatic", query.content().automatic()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ PEERDIR(
library/cpp/testing/unittest
library/cpp/json/yson
yql/essentials/public/udf/service/stub
yql/essentials/sql/pg_dummy
)

YQL_LAST_ABI_VERSION()
Expand Down
10 changes: 6 additions & 4 deletions ydb/core/fq/libs/control_plane_storage/internal/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ SRCS(

PEERDIR(
contrib/libs/fmt
ydb/library/actors/core
library/cpp/lwtrace/mon
library/cpp/monlib/service/pages
ydb/core/base
ydb/core/metering
ydb/core/fq/libs/common
ydb/core/fq/libs/compute/common
ydb/core/fq/libs/config
Expand All @@ -26,14 +24,18 @@ PEERDIR(
ydb/core/fq/libs/quota_manager/events
ydb/core/fq/libs/rate_limiter/events
ydb/core/fq/libs/ydb
ydb/core/kqp/opt
ydb/core/kqp/proxy_service
ydb/core/metering
ydb/core/mon
ydb/library/actors/core
ydb/library/protobuf_printer
ydb/library/security
yql/essentials/public/issue
yql/essentials/utils
ydb/public/lib/fq
ydb/public/sdk/cpp/src/client/scheme
ydb/public/sdk/cpp/src/client/value
yql/essentials/public/issue
yql/essentials/utils
)

YQL_LAST_ABI_VERSION()
Expand Down
40 changes: 0 additions & 40 deletions ydb/core/fq/libs/control_plane_storage/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,6 @@

namespace NFq {

// Builds a limiter from the given retry bookkeeping values.
// LastError is not touched here and stays default-constructed.
TRetryLimiter::TRetryLimiter(ui64 retryCount, const TInstant& retryCounterUpdatedAt, double retryRate)
    : RetryCount(retryCount)
    , RetryCounterUpdatedAt(retryCounterUpdatedAt)
    , RetryRate(retryRate)
{}

// Overwrites the three retry counters in place; LastError is left as it was.
void TRetryLimiter::Assign(ui64 retryCount, const TInstant& retryCounterUpdatedAt, double retryRate) {
    RetryRate = retryRate;
    RetryCounterUpdatedAt = retryCounterUpdatedAt;
    RetryCount = retryCount;
}

// Registers one more failure and decides whether another retry is allowed.
//
// lastSeenAt - moment compared against RetryCounterUpdatedAt to get the time
//              elapsed since the counters were last updated.
// policy     - limits to apply (RetryCount, RetryLimit, RetryPeriod).
// now        - timestamp stored in RetryCounterUpdatedAt when the retry is granted.
//
// Returns true when the retry is granted; RetryCount is then incremented.
// Returns false otherwise and stores the reason in LastError.
// RetryRate is updated in both cases.
bool TRetryLimiter::UpdateOnRetry(const TInstant& lastSeenAt, const TRetryPolicyItem& policy, const TInstant now) {
    const auto elapsed = lastSeenAt - RetryCounterUpdatedAt;

    if (elapsed < policy.RetryPeriod) {
        // Count this failure, then forgive the share of the budget that
        // corresponds to the part of the retry period that has already passed.
        RetryRate += 1.0;
        const auto forgiven = elapsed / policy.RetryPeriod * policy.RetryCount;
        RetryRate = RetryRate > forgiven ? RetryRate - forgiven : 0.0;
    } else {
        // A whole retry period has passed: start from a clean rate.
        RetryRate = 0.0;
    }

    if (RetryRate >= policy.RetryCount) {
        LastError = TStringBuilder() << "failure rate " << RetryRate << " exceeds limit of " << policy.RetryCount;
        return false;
    }

    // A zero RetryLimit disables the cap on the total number of retries.
    if (policy.RetryLimit && RetryCount >= policy.RetryLimit) {
        LastError = TStringBuilder() << "retry count reached limit of " << policy.RetryLimit;
        return false;
    }

    RetryCount++;
    RetryCounterUpdatedAt = now;
    return true;
}

bool IsTerminalStatus(FederatedQuery::QueryMeta::ComputeStatus status)
{
return IsIn({ FederatedQuery::QueryMeta::ABORTED_BY_USER, FederatedQuery::QueryMeta::ABORTED_BY_SYSTEM,
Expand Down
24 changes: 0 additions & 24 deletions ydb/core/fq/libs/control_plane_storage/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,6 @@

namespace NFq {

// Retry limits consumed by TRetryLimiter::UpdateOnRetry.
class TRetryPolicyItem {
public:
    // Ceiling for the limiter's RetryRate; a retry is refused once the rate reaches it.
    ui64 RetryCount = 0;
    // Cap on the limiter's total RetryCount; zero means the cap is not applied.
    ui64 RetryLimit = 0;
    // Window against which the time since the last counter update is compared.
    TDuration RetryPeriod = TDuration::Zero();
    // Base delay for backoff; not read by the limiter in this file
    // (NOTE(review): consumed by callers - confirm at the use site).
    TDuration BackoffPeriod = TDuration::Zero();

    TRetryPolicyItem() = default;

    TRetryPolicyItem(ui64 retryCount, ui64 retryLimit, const TDuration& retryPeriod, const TDuration& backoffPeriod)
        : RetryCount(retryCount)
        , RetryLimit(retryLimit)
        , RetryPeriod(retryPeriod)
        , BackoffPeriod(backoffPeriod)
    {}
};

// Tracks retry bookkeeping for one task and applies a TRetryPolicyItem to it.
class TRetryLimiter {
public:
    // --- state ---

    // Number of retries granted so far.
    ui64 RetryCount = 0;
    // Timestamp recorded when the last retry was granted.
    TInstant RetryCounterUpdatedAt = TInstant::Zero();
    // Current failure rate, compared against the policy's RetryCount.
    double RetryRate = 0.0;
    // Reason of the most recent refusal; written only when a retry is denied.
    TString LastError;

    // --- operations ---

    TRetryLimiter() = default;
    TRetryLimiter(ui64 retryCount, const TInstant& retryCounterUpdatedAt, double retryRate);

    // Replaces RetryCount, RetryCounterUpdatedAt and RetryRate with the given values.
    void Assign(ui64 retryCount, const TInstant& retryCounterUpdatedAt, double retryRate);

    // Accounts for one more failure; returns true when a retry is allowed.
    // `now` defaults to the current time and is stored on success.
    bool UpdateOnRetry(const TInstant& lastSeenAt, const TRetryPolicyItem& policy, const TInstant now = Now());
};

bool IsTerminalStatus(FederatedQuery::QueryMeta::ComputeStatus status);

bool IsAbortedStatus(FederatedQuery::QueryMeta::ComputeStatus status);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/fq/libs/control_plane_storage/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ PEERDIR(
ydb/core/fq/libs/quota_manager/events
ydb/core/fq/libs/shared_resources
ydb/core/fq/libs/ydb
ydb/core/kqp/opt
ydb/core/kqp/proxy_service
ydb/core/mon
ydb/library/db_pool
ydb/library/security
ydb/library/yql/providers/s3/path_generator
yql/essentials/public/issue
ydb/public/api/protos
ydb/public/sdk/cpp/adapters/issue
ydb/public/sdk/cpp/src/client/scheme
ydb/public/sdk/cpp/src/client/table
ydb/library/db_pool
ydb/library/yql/providers/s3/path_generator
yql/essentials/public/issue
)

YQL_LAST_ABI_VERSION()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <ydb/core/fq/libs/quota_manager/events/events.h>
#include <ydb/core/fq/libs/ydb/util.h>
#include <ydb/core/fq/libs/ydb/ydb.h>
#include <ydb/core/kqp/proxy_service/kqp_script_execution_retries.h>

namespace NFq {

Expand Down Expand Up @@ -690,11 +691,11 @@ class TControlPlaneStorageBase : public TControlPlaneStorageUtils {
void UpdateTaskInfo(
NActors::TActorSystem* actorSystem, Fq::Private::PingTaskRequest& request, const std::shared_ptr<TFinalStatus>& finalStatus, FederatedQuery::Query& query,
FederatedQuery::Internal::QueryInternal& internal, FederatedQuery::Job& job, TString& owner,
TRetryLimiter& retryLimiter, TDuration& backoff, TInstant& expireAt) const;
NKikimr::NKqp::TRetryLimiter& retryLimiter, TDuration& backoff, TInstant& expireAt) const;

void FillQueryStatistics(
const std::shared_ptr<TFinalStatus>& finalStatus, const FederatedQuery::Query& query,
const FederatedQuery::Internal::QueryInternal& internal, const TRetryLimiter& retryLimiter) const;
const FederatedQuery::Internal::QueryInternal& internal, const NKikimr::NKqp::TRetryLimiter& retryLimiter) const;

void Handle(TEvControlPlaneStorage::TEvFinalStatusReport::TPtr& ev);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/local_grpc/local_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class TContextBase : public NYdbGrpc::IRequestContextBase {
}

void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details = "") override {
NYql::TIssue issue{TStringBuilder() << "grpc code: " << code << ", msg: " << msg << " (" << details << ")"};
NYql::TIssue issue{TStringBuilder() << "grpc code: " << static_cast<i32>(code) << ", msg: " << msg << " (" << details << ")"};
issue.SetCode(code, NYql::ESeverity::TSeverityIds_ESeverityId_S_ERROR);
RaiseIssue(issue);
ReplyWithYdbStatus(Ydb::StatusIds::GENERIC_ERROR);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/query/rpc_execute_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class TExecuteScriptRPC : public TRpcRequestActor<TExecuteScriptRPC, TEvExecuteS
const auto& request = GetProtoRequest();

if (request->operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
issues.AddIssue("ExecuteScript must be asyncronous operation");
issues.AddIssue("ExecuteScript must be asynchronous operation");
return Reply(Ydb::StatusIds::BAD_REQUEST, issues);
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ struct TEvKqp {
TDuration ForgetAfter;
TDuration ResultsTtl;
TDuration ProgressStatsPeriod;
std::vector<NKikimrKqp::TScriptExecutionRetryState::TMapping> RetryMapping;
};

struct TEvScriptResponse : public TEventLocal<TEvScriptResponse, TKqpEvents::EvScriptResponse> {
Expand Down
Loading
Loading