Skip to content

Support cancellation and rejection states for building columns with default values #22558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,7 @@ message TIndexBuildControl {
optional string IndexName = 2;
optional uint64 SnapshotTxId = 3;
optional uint64 BuildIndexId = 4;
repeated NKikimrIndexBuilder.TColumnBuildSetting ColumnsToDrop = 5;
}

message TLockConfig {
Expand Down Expand Up @@ -1716,6 +1717,12 @@ message TAlterShards {
}
}

message TDropColumnBuild {
optional NKikimrIndexBuilder.TColumnBuildSettings Settings = 1;
optional uint64 SnapshotTxId = 2;
optional uint64 BuildIndexId = 3;
}

// Request for scheme modification
// Has only one of the operations
message TModifyScheme {
Expand Down Expand Up @@ -1788,6 +1795,7 @@ message TModifyScheme {
optional TDropBlockStoreVolume DropBlockStoreVolume = 60;

optional NKikimrIndexBuilder.TColumnBuildSettings InitiateColumnBuild = 61;
optional TDropColumnBuild DropColumnBuild = 86;

optional bool SuccessOnNotExist = 62;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/schemeshard/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ enum EOperationType {
ESchemeOpAlterExternalDataSource = 91;

ESchemeOpCreateColumnBuild = 92;
ESchemeOpDropColumnBuild = 125;

// View
ESchemeOpCreateView = 93;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__monitoring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,10 @@ struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBas
<< "UnlockTxStatus: " << NKikimrScheme::EStatus_Name(info.UnlockTxStatus) << Endl
<< "UnlockTxDone: " << (info.UnlockTxDone ? "DONE" : "not done") << Endl

<< "DropColumnsTxId: " << info.DropColumnsTxId << Endl
<< "DropColumnsTxStatus: " << NKikimrScheme::EStatus_Name(info.DropColumnsTxStatus) << Endl
<< "DropColumnsTxDone: " << (info.DropColumnsTxDone ? "DONE" : "not done") << Endl

<< "SnapshotStep: " << info.SnapshotStep << Endl
<< "SnapshotTxId: " << info.SnapshotTxId << Endl;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard__op_traits.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ EOperationClass GetOperationClass(NKikimrSchemeOp::EOperationType op) {
case NKikimrSchemeOp::EOperationType::ESchemeOpDropReplicationCascade:
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExtSubDomainCreateHive:
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnBuild:
case NKikimrSchemeOp::EOperationType::ESchemeOpDropColumnBuild:
case NKikimrSchemeOp::EOperationType::ESchemeOpRestoreMultipleIncrementalBackups:
case NKikimrSchemeOp::EOperationType::ESchemeOpRestoreIncrementalBackupAtTable:
case NKikimrSchemeOp::EOperationType::ESchemeOpBackupBackupCollection:
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,8 @@ TVector<ISubOperation::TPtr> TDefaultOperationFactory::MakeOperationParts(
return {CreateUpgradeSubDomainDecision(op.NextPartId(), tx)};
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnBuild:
return CreateBuildColumn(op.NextPartId(), tx, context);
case NKikimrSchemeOp::EOperationType::ESchemeOpDropColumnBuild:
return {DropBuildColumn(op.NextPartId(), tx, context)};
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexBuild:
return CreateBuildIndex(op.NextPartId(), tx, context);
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,11 @@ TVector<ISubOperation::TPtr> CancelBuildIndex(TOperationId nextId, const TTxTran
TPath table = TPath::Resolve(tablePath, context.SS);

TVector<ISubOperation::TPtr> result;

{
auto finalize = TransactionTemplate(table.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpFinalizeBuildIndexMainTable);
*finalize.MutableLockGuard() = tx.GetLockGuard();

auto op = finalize.MutableFinalizeBuildIndexMainTable();
op->SetTableName(table.LeafName());
op->SetSnapshotTxId(config.GetSnapshotTxId());
Expand All @@ -148,10 +150,7 @@ TVector<ISubOperation::TPtr> CancelBuildIndex(TOperationId nextId, const TTxTran
operation->SetName(index.Base()->Name);

result.push_back(CreateDropTableIndex(NextPartId(nextId, result), tableIndexDropping));
}

if (!indexName.empty()) {
TPath index = table.Child(indexName);
Y_ABORT_UNLESS(index.Base()->GetChildren().size() >= 1);
for (auto& indexChildItems : index.Base()->GetChildren()) {
const auto partId = NextPartId(nextId, result);
Expand All @@ -167,5 +166,26 @@ TVector<ISubOperation::TPtr> CancelBuildIndex(TOperationId nextId, const TTxTran
return result;
}

ISubOperation::TPtr DropBuildColumn(TOperationId id, const TTxTransaction& tx, TOperationContext& context) {
Y_ABORT_UNLESS(tx.GetOperationType() == NKikimrSchemeOp::EOperationType::ESchemeOpDropColumnBuild);

auto config = tx.GetDropColumnBuild();
TString tablePath = config.GetSettings().GetTable();

TPath table = TPath::Resolve(tablePath, context.SS);

auto mainTableAlter = TransactionTemplate(table.Parent().PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable);
*mainTableAlter.MutableLockGuard() = tx.GetLockGuard();
auto op = mainTableAlter.MutableAlterTable();
op->SetName(table.LeafName());

for (const auto& col : config.GetSettings().Getcolumn()) {
auto colInfo = op->AddDropColumns();
colInfo->SetName(col.GetColumnName());
}

return CreateAlterTable(id, mainTableAlter);
}

}
}
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation_part.h
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ ISubOperation::TPtr CreateDropTable(TOperationId id, TTxState::ETxState state);
bool CreateDropTable(TOperationId id, const TTxTransaction& tx, TOperationContext& context, TVector<ISubOperation::TPtr>& result);

TVector<ISubOperation::TPtr> CreateBuildColumn(TOperationId id, const TTxTransaction& tx, TOperationContext& context);
ISubOperation::TPtr DropBuildColumn(TOperationId id, const TTxTransaction& tx, TOperationContext& context);

TVector<ISubOperation::TPtr> CreateBuildIndex(TOperationId id, const TTxTransaction& tx, TOperationContext& context);
TVector<ISubOperation::TPtr> ApplyBuildIndex(TOperationId id, const TTxTransaction& tx, TOperationContext& context);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_audit_log_fragment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ TString DefineUserOperationName(const NKikimrSchemeOp::TModifyScheme& tx) {
return "ALTER EXTERNAL DATA SOURCE";
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnBuild:
return "ALTER TABLE ADD COLUMN DEFAULT";
case NKikimrSchemeOp::EOperationType::ESchemeOpDropColumnBuild:
return "ALTER TABLE ADD COLUMN CANCEL";
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateView:
return "CREATE VIEW";
case NKikimrSchemeOp::EOperationType::ESchemeOpAlterView:
Expand Down Expand Up @@ -590,6 +592,9 @@ TVector<TString> ExtractChangingPaths(const NKikimrSchemeOp::TModifyScheme& tx)
case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnBuild:
result.emplace_back(tx.GetInitiateColumnBuild().GetTable());
break;
case NKikimrSchemeOp::EOperationType::ESchemeOpDropColumnBuild:
result.emplace_back(tx.GetDropColumnBuild().GetSettings().GetTable());
break;

case NKikimrSchemeOp::EOperationType::ESchemeOpCreateView:
result.emplace_back(NKikimr::JoinPath({tx.GetWorkingDir(), tx.GetCreateView().GetName()}));
Expand Down
16 changes: 16 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,21 @@ void TSchemeShard::PersistBuildIndexUnlockTxId(NIceDb::TNiceDb& db, const TIndex
NIceDb::TUpdate<Schema::IndexBuild::UnlockTxId>(indexInfo.UnlockTxId));
}

void TSchemeShard::PersistBuildIndexDropColumnsTxId(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) {
db.Table<Schema::IndexBuild>().Key(indexInfo.Id).Update(
NIceDb::TUpdate<Schema::IndexBuild::DropColumnsTxId>(indexInfo.DropColumnsTxId));
}

void TSchemeShard::PersistBuildIndexDropColumnsTxStatus(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) {
db.Table<Schema::IndexBuild>().Key(indexInfo.Id).Update(
NIceDb::TUpdate<Schema::IndexBuild::DropColumnsTxStatus>(indexInfo.DropColumnsTxStatus));
}

void TSchemeShard::PersistBuildIndexDropColumnsTxDone(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) {
db.Table<Schema::IndexBuild>().Key(indexInfo.Id).Update(
NIceDb::TUpdate<Schema::IndexBuild::DropColumnsTxDone>(indexInfo.DropColumnsTxDone));
}

void TSchemeShard::PersistBuildIndexProcessed(NIceDb::TNiceDb& db, const TIndexBuildInfo& indexInfo) {
db.Table<Schema::IndexBuild>().Key(indexInfo.Id).Update(
NIceDb::TUpdate<Schema::IndexBuild::UploadRowsProcessed>(indexInfo.Processed.GetUploadRows()),
Expand Down Expand Up @@ -423,6 +438,7 @@ void TSchemeShard::SetupRouting(const TDeque<TIndexBuildId>& indexIds, const TAc
handle(buildInfo.InitiateTxId);
handle(buildInfo.ApplyTxId);
handle(buildInfo.UnlockTxId);
handle(buildInfo.DropColumnsTxId);
}
}

Expand Down
122 changes: 117 additions & 5 deletions ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,33 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CancelPropose(
return propose;
}

THolder<TEvSchemeShard::TEvModifySchemeTransaction> DropColumnsPropose(
TSchemeShard* ss, const TIndexBuildInfo& buildInfo)
{
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(buildInfo.DropColumnsTxId), ss->TabletID());
propose->Record.SetFailOnExist(true);

NKikimrSchemeOp::TModifyScheme& modifyScheme = *propose->Record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropColumnBuild);
modifyScheme.SetInternal(true);
modifyScheme.SetWorkingDir(TPath::Init(buildInfo.DomainPathId, ss).PathString());
modifyScheme.MutableLockGuard()->SetOwnerTxId(ui64(buildInfo.LockTxId));

auto* columnBuild = modifyScheme.MutableDropColumnBuild();
columnBuild->SetSnapshotTxId(ui64(buildInfo.InitiateTxId));
columnBuild->SetBuildIndexId(ui64(buildInfo.Id));

auto* settings = columnBuild->MutableSettings();
settings->SetTable(TPath::Init(buildInfo.TablePathId, ss).PathString());

buildInfo.SerializeToProto(ss, settings);

LOG_DEBUG_S((TlsActivationContext->AsActorContext()), NKikimrServices::BUILD_INDEX,
"DropColumnsPropose " << buildInfo.Id << " " << buildInfo.State << " " << propose->Record.ShortDebugString());

return propose;
}

using namespace NTabletFlatExecutor;

struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuilder::TTxBase {
Expand Down Expand Up @@ -1266,9 +1293,13 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
break;
case TIndexBuildInfo::EState::Filling: {
if (buildInfo.IsCancellationRequested() || FillIndex(txc, buildInfo)) {
auto cancelState = buildInfo.IsBuildColumns()
? TIndexBuildInfo::EState::Cancellation_DroppingColumns
: TIndexBuildInfo::EState::Cancellation_Applying;

ClearAfterFill(ctx, buildInfo);
ChangeState(BuildId, buildInfo.IsCancellationRequested()
? TIndexBuildInfo::EState::Cancellation_Applying
? cancelState
: TIndexBuildInfo::EState::Applying);
Progress(BuildId);

Expand Down Expand Up @@ -1377,6 +1408,18 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
SendNotificationsIfFinished(buildInfo);
// stay calm keep status/issues
break;
case TIndexBuildInfo::EState::Cancellation_DroppingColumns:
if (buildInfo.DropColumnsTxId == InvalidTxId) {
AllocateTxId(BuildId);
} else if (buildInfo.DropColumnsTxStatus == NKikimrScheme::StatusSuccess) {
Send(Self->SelfId(), DropColumnsPropose(Self, buildInfo), 0, ui64(BuildId));
} else if (!buildInfo.DropColumnsTxDone) {
Send(Self->SelfId(), MakeHolder<TEvSchemeShard::TEvNotifyTxCompletion>(ui64(buildInfo.DropColumnsTxId)));
} else {
ChangeState(BuildId, TIndexBuildInfo::EState::Cancellation_Applying);
Progress(BuildId);
}
break;
case TIndexBuildInfo::EState::Cancellation_Applying:
if (buildInfo.ApplyTxId == InvalidTxId) {
AllocateTxId(BuildId);
Expand Down Expand Up @@ -1405,6 +1448,18 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
SendNotificationsIfFinished(buildInfo);
// stay calm keep status/issues
break;
case TIndexBuildInfo::EState::Rejection_DroppingColumns:
if (buildInfo.DropColumnsTxId == InvalidTxId) {
AllocateTxId(BuildId);
} else if (buildInfo.DropColumnsTxStatus == NKikimrScheme::StatusSuccess) {
Send(Self->SelfId(), DropColumnsPropose(Self, buildInfo), 0, ui64(BuildId));
} else if (!buildInfo.DropColumnsTxDone) {
Send(Self->SelfId(), MakeHolder<TEvSchemeShard::TEvNotifyTxCompletion>(ui64(buildInfo.DropColumnsTxId)));
} else {
ChangeState(BuildId, TIndexBuildInfo::EState::Rejection_Applying);
Progress(BuildId);
}
break;
case TIndexBuildInfo::EState::Rejection_Applying:
if (buildInfo.ApplyTxId == InvalidTxId) {
AllocateTxId(BuildId);
Expand Down Expand Up @@ -1457,7 +1512,14 @@ struct TSchemeShard::TIndexBuilder::TTxProgress: public TSchemeShard::TIndexBuil
return;
}

buildInfo->State = TIndexBuildInfo::EState::Rejection_Applying;
if (buildInfo->IsBuildIndex()) {
buildInfo->State = TIndexBuildInfo::EState::Rejection_Applying;
} else if (buildInfo->IsBuildColumns()) {
buildInfo->State = TIndexBuildInfo::EState::Rejection_DroppingColumns;
} else {
Y_ENSURE(false, "TTxBuildProgress: OnUnhandledException: unknown build type");
}

Self->PersistBuildIndexState(db, *buildInfo);
Self->Execute(Self->CreateTxProgress(buildInfo->Id), ctx);
}
Expand Down Expand Up @@ -1598,7 +1660,14 @@ struct TSchemeShard::TIndexBuilder::TTxReply: public TSchemeShard::TIndexBuilder
return;
}

buildInfo->State = TIndexBuildInfo::EState::Rejection_Applying;
if (buildInfo->IsBuildIndex()) {
buildInfo->State = TIndexBuildInfo::EState::Rejection_Applying;
} else if (buildInfo->IsBuildColumns()) {
buildInfo->State = TIndexBuildInfo::EState::Rejection_DroppingColumns;
} else {
Y_ENSURE(false, "TTxReply : OnUnhandledException: unknown build type");
}

Self->PersistBuildIndexState(db, *buildInfo);
Self->Execute(Self->CreateTxProgress(buildInfo->Id), ctx);
}
Expand Down Expand Up @@ -1770,7 +1839,15 @@ struct TTxShardReply: public TSchemeShard::TIndexBuilder::TTxReply {
<< ", shardIdx: " << shardIdx);
Self->PersistBuildIndexShardStatus(db, BuildId, shardIdx, shardStatus);
Self->IndexBuildPipes.Close(BuildId, shardId, ctx);
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);

if (buildInfo.IsBuildIndex()) {
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
} else if (buildInfo.IsBuildColumns()) {
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_DroppingColumns);
} else {
Y_ENSURE(false, "TTxShardReply : DoExecute: unknown build type");
}

Progress(BuildId);
return true;
}
Expand Down Expand Up @@ -1944,7 +2021,15 @@ struct TSchemeShard::TIndexBuilder::TTxReplyUploadSample: public TSchemeShard::T
NYql::TIssues issues;
NYql::IssuesFromMessage(record.GetIssues(), issues);
Self->PersistBuildIndexAddIssue(db, buildInfo, issues.ToString());
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);

if (buildInfo.IsBuildIndex()) {
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_Applying);
} else if (buildInfo.IsBuildColumns()) {
ChangeState(buildInfo.Id, TIndexBuildInfo::EState::Rejection_DroppingColumns);
} else {
Y_ENSURE(false, "TTxReplyUploadSample : DoExecute: unknown build type");
}

Progress(BuildId);
}

Expand Down Expand Up @@ -2057,6 +2142,15 @@ struct TSchemeShard::TIndexBuilder::TTxReplyCompleted: public TSchemeShard::TInd
Self->PersistBuildIndexInitiateTxDone(db, buildInfo);
break;
}
case TIndexBuildInfo::EState::Cancellation_DroppingColumns:
case TIndexBuildInfo::EState::Rejection_DroppingColumns:
{
Y_ENSURE(txId == buildInfo.DropColumnsTxId);

buildInfo.DropColumnsTxDone = true;
Self->PersistBuildIndexDropColumnsTxDone(db, buildInfo);
break;
}
case TIndexBuildInfo::EState::DropBuild:
case TIndexBuildInfo::EState::CreateBuild:
case TIndexBuildInfo::EState::LockBuild:
Expand Down Expand Up @@ -2243,6 +2337,17 @@ struct TSchemeShard::TIndexBuilder::TTxReplyModify: public TSchemeShard::TIndexB
ifErrorMoveTo(TIndexBuildInfo::EState::Rejection_Unlocking);
break;
}
case TIndexBuildInfo::EState::Cancellation_DroppingColumns:
case TIndexBuildInfo::EState::Rejection_DroppingColumns:
{
Y_ENSURE(txId == buildInfo.DropColumnsTxId);

buildInfo.DropColumnsTxStatus = record.GetStatus();
Self->PersistBuildIndexDropColumnsTxStatus(db, buildInfo);

ifErrorMoveTo(TIndexBuildInfo::EState::Cancellation_Applying);
break;
}
case TIndexBuildInfo::EState::Cancellation_Applying:
{
Y_ENSURE(txId == buildInfo.ApplyTxId);
Expand Down Expand Up @@ -2349,6 +2454,13 @@ struct TSchemeShard::TIndexBuilder::TTxReplyAllocate: public TSchemeShard::TInde
Self->PersistBuildIndexApplyTxId(db, buildInfo);
}
break;
case TIndexBuildInfo::EState::Cancellation_DroppingColumns:
case TIndexBuildInfo::EState::Rejection_DroppingColumns:
if (!buildInfo.DropColumnsTxId) {
buildInfo.DropColumnsTxId = txId;
Self->PersistBuildIndexDropColumnsTxId(db, buildInfo);
}
break;
case TIndexBuildInfo::EState::Unlocking:
case TIndexBuildInfo::EState::Cancellation_Unlocking:
case TIndexBuildInfo::EState::Rejection_Unlocking:
Expand Down
Loading
Loading