Skip to content

Commit 910acf1

Browse files
Fix bug when importing a table with a non Uint primary key and a changefeed (#25528)
1 parent 7a8af56 commit 910acf1

File tree

4 files changed

+86
-40
lines changed

4 files changed

+86
-40
lines changed

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -288,16 +288,33 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
288288
cdcStream.SetRetentionPeriodSeconds(topic.retention_period().seconds());
289289
}
290290

291+
auto tableDesc = GetTableDescription(ss, dstPath->PathId);
292+
Y_ABORT_UNLESS(!tableDesc.GetKeyColumnIds().empty());
293+
const auto& keyId = tableDesc.GetKeyColumnIds()[0];
294+
bool isPartitioningAvailable = false;
295+
296+
// Explicit specification of the number of partitions when creating CDC
297+
// is possible only if the first component of the primary key
298+
// of the source table is Uint32 or Uint64
299+
for (const auto& column : tableDesc.GetColumns()) {
300+
if (column.GetId() == keyId) {
301+
isPartitioningAvailable = column.GetType() == "Uint32" || column.GetType() == "Uint64";
302+
break;
303+
}
304+
}
305+
291306
if (topic.has_partitioning_settings()) {
292-
i64 minActivePartitions =
293-
topic.partitioning_settings().min_active_partitions();
294-
if (minActivePartitions < 0) {
295-
error = "minActivePartitions must be >= 0";
296-
return nullptr;
297-
} else if (minActivePartitions == 0) {
298-
minActivePartitions = 1;
307+
if (isPartitioningAvailable) {
308+
i64 minActivePartitions =
309+
topic.partitioning_settings().min_active_partitions();
310+
if (minActivePartitions < 0) {
311+
error = "minActivePartitions must be >= 0";
312+
return nullptr;
313+
} else if (minActivePartitions == 0) {
314+
minActivePartitions = 1;
315+
}
316+
cdcStream.SetTopicPartitions(minActivePartitions);
299317
}
300-
cdcStream.SetTopicPartitions(minActivePartitions);
301318

302319
if (topic.partitioning_settings().has_auto_partitioning_settings()) {
303320
auto& partitioningSettings = topic.partitioning_settings().auto_partitioning_settings();

ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,6 +1181,18 @@ void NoMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record)
11811181
UNIT_ASSERT(!record.GetPathDescription().GetTable().GetPartitionConfig().GetPartitioningPolicy().HasMaxPartitionsCount());
11821182
}
11831183

1184+
TCheckFunc MinTopicPartitionsCountEqual(ui32 count) {
1185+
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
1186+
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().GetMinPartitionCount(), count);
1187+
};
1188+
}
1189+
1190+
TCheckFunc MaxTopicPartitionsCountEqual(ui32 count) {
1191+
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
1192+
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().GetMaxPartitionCount(), count);
1193+
};
1194+
}
1195+
11841196
TCheckFunc PartitioningByLoadStatus(bool status) {
11851197
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
11861198
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetTable().GetPartitionConfig().GetPartitioningPolicy().GetSplitByLoadSettings().GetEnabled(), status);

ydb/core/tx/schemeshard/ut_helpers/ls_checks.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ namespace NLs {
138138
TCheckFunc MaxPartitionsCountEqual(ui32 count);
139139
void HasMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record);
140140
void NoMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record);
141+
TCheckFunc MinTopicPartitionsCountEqual(ui32 count);
142+
TCheckFunc MaxTopicPartitionsCountEqual(ui32 count);
141143
TCheckFunc PartitioningByLoadStatus(bool status);
142144
TCheckFunc ColumnFamiliesCount(ui32 size);
143145
TCheckFunc ColumnFamiliesHas(ui32 familyId);

ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5238,7 +5238,7 @@ Y_UNIT_TEST_SUITE(TImportTests) {
52385238
std::function<void(TTestBasicRuntime&)> Checker;
52395239
};
52405240

5241-
TGeneratedChangefeed GenChangefeed(ui64 num = 1) {
5241+
TGeneratedChangefeed GenChangefeed(ui64 num = 1, bool isPartitioningAvailable = true) {
52425242
const TString changefeedName = TStringBuilder() << "updates_feed" << num;
52435243
const auto changefeedPath = TStringBuilder() << "/" << changefeedName;
52445244

@@ -5251,10 +5251,10 @@ Y_UNIT_TEST_SUITE(TImportTests) {
52515251

52525252
const auto topicDesc = R"(
52535253
partitioning_settings {
5254-
min_active_partitions: 1
5255-
max_active_partitions: 1
5254+
min_active_partitions: 2
5255+
max_active_partitions: 3
52565256
auto_partitioning_settings {
5257-
strategy: AUTO_PARTITIONING_STRATEGY_DISABLED
5257+
strategy: AUTO_PARTITIONING_STRATEGY_SCALE_UP
52585258
partition_write_speed {
52595259
stabilization_window {
52605260
seconds: 300
@@ -5303,40 +5303,50 @@ Y_UNIT_TEST_SUITE(TImportTests) {
53035303
attr.emplace(NAttr::EKeys::TOPIC_DESCRIPTION, topicDesc);
53045304
return {
53055305
{changefeedPath, GenerateTestData({EPathTypeCdcStream, changefeedDesc, std::move(attr)})},
5306-
[changefeedPath = TString(changefeedPath)](TTestBasicRuntime& runtime) {
5306+
[changefeedPath = TString(changefeedPath), isPartitioningAvailable](TTestBasicRuntime& runtime) {
53075307
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath, false, false, true), {
53085308
NLs::PathExist,
53095309
});
53105310
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath + "/streamImpl", false, false, true), {
5311-
NLs::ConsumerExist("my_consumer")
5311+
NLs::ConsumerExist("my_consumer"),
5312+
NLs::MinTopicPartitionsCountEqual(isPartitioningAvailable ? 2 : 1),
5313+
NLs::MaxTopicPartitionsCountEqual(3),
53125314
});
53135315
}
53145316
};
53155317
}
53165318

5317-
TVector<std::function<void(TTestBasicRuntime&)>> GenChangefeeds(THashMap<TString, TTestDataWithScheme>& bucketContent, ui64 count = 1) {
5319+
TVector<std::function<void(TTestBasicRuntime&)>> GenChangefeeds(
5320+
THashMap<TString, TTestDataWithScheme>& bucketContent,
5321+
ui64 count = 1,
5322+
bool isPartitioningAvailable = true)
5323+
{
53185324
TVector<std::function<void(TTestBasicRuntime&)>> checkers;
53195325
checkers.reserve(count);
53205326
for (ui64 i = 1; i <= count; ++i) {
5321-
auto genChangefeed = GenChangefeed(i);
5327+
auto genChangefeed = GenChangefeed(i, isPartitioningAvailable);
53225328
bucketContent.emplace(genChangefeed.Changefeed);
53235329
checkers.push_back(genChangefeed.Checker);
53245330
}
53255331
return checkers;
53265332
}
53275333

5328-
std::function<void(TTestBasicRuntime&)> AddedSchemeCommon(THashMap<TString, TTestDataWithScheme>& bucketContent, const TString& permissions) {
5329-
const auto data = GenerateTestData(R"(
5334+
std::function<void(TTestBasicRuntime&)> AddedSchemeCommon(
5335+
THashMap<TString, TTestDataWithScheme>& bucketContent,
5336+
const TString& permissions,
5337+
const TString& pkType)
5338+
{
5339+
const auto data = GenerateTestData(Sprintf(R"(
53305340
columns {
53315341
name: "key"
5332-
type { optional_type { item { type_id: UTF8 } } }
5342+
type { optional_type { item { type_id: %s } } }
53335343
}
53345344
columns {
53355345
name: "value"
53365346
type { optional_type { item { type_id: UTF8 } } }
53375347
}
53385348
primary_key: "key"
5339-
)", {{"a", 1}}, permissions);
5349+
)", pkType.c_str()), {{pkType == "UTF8" ? "a" : "", 1}}, permissions);
53405350

53415351
bucketContent.emplace("", data);
53425352
return [](TTestBasicRuntime& runtime) {
@@ -5346,11 +5356,17 @@ Y_UNIT_TEST_SUITE(TImportTests) {
53465356
};
53475357
}
53485358

5349-
std::function<void(TTestBasicRuntime&)> AddedScheme(THashMap<TString, TTestDataWithScheme>& bucketContent) {
5350-
return AddedSchemeCommon(bucketContent, "");
5359+
std::function<void(TTestBasicRuntime&)> AddedScheme(
5360+
THashMap<TString, TTestDataWithScheme>& bucketContent,
5361+
const TString& pkType)
5362+
{
5363+
return AddedSchemeCommon(bucketContent, "", pkType);
53515364
}
53525365

5353-
std::function<void(TTestBasicRuntime&)> AddedSchemeWithPermissions(THashMap<TString, TTestDataWithScheme>& bucketContent) {
5366+
std::function<void(TTestBasicRuntime&)> AddedSchemeWithPermissions(
5367+
THashMap<TString, TTestDataWithScheme>& bucketContent,
5368+
const TString& pkType)
5369+
{
53545370
const auto permissions = R"(
53555371
actions {
53565372
change_owner: "eve"
@@ -5374,34 +5390,22 @@ Y_UNIT_TEST_SUITE(TImportTests) {
53745390
}
53755391
}
53765392
)";
5377-
return AddedSchemeCommon(bucketContent, permissions);
5393+
return AddedSchemeCommon(bucketContent, permissions, pkType);
53785394
}
53795395

5380-
using SchemeFunction = std::function<std::function<void(TTestBasicRuntime&)>(THashMap<TString, TTestDataWithScheme>&)>;
5396+
using SchemeFunction = std::function<std::function<void(TTestBasicRuntime&)>(THashMap<TString, TTestDataWithScheme>&, const TString&)>;
53815397

5382-
void TestImportChangefeeds(ui64 countChangefeed, SchemeFunction addedScheme) {
5398+
void TestImportChangefeeds(ui64 countChangefeed, SchemeFunction addedScheme, const TString& pkType = "UTF8") {
53835399
TTestBasicRuntime runtime;
53845400
TTestEnv env(runtime);
53855401
ui64 txId = 100;
53865402
runtime.GetAppData().FeatureFlags.SetEnableChangefeedsImport(true);
53875403
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);
53885404

5389-
const auto data = GenerateTestData(R"(
5390-
columns {
5391-
name: "key"
5392-
type { optional_type { item { type_id: UTF8 } } }
5393-
}
5394-
columns {
5395-
name: "value"
5396-
type { optional_type { item { type_id: UTF8 } } }
5397-
}
5398-
primary_key: "key"
5399-
)");
5400-
54015405
THashMap<TString, TTestDataWithScheme> bucketContent(countChangefeed + 1);
54025406

5403-
auto checkerTable = addedScheme(bucketContent);
5404-
auto checkersChangefeeds = GenChangefeeds(bucketContent, countChangefeed);
5407+
auto checkerTable = addedScheme(bucketContent, pkType);
5408+
auto checkersChangefeeds = GenChangefeeds(bucketContent, countChangefeed, pkType == "UINT32" || pkType == "UINT64");
54055409

54065410
TPortManager portManager;
54075411
const ui16 port = portManager.GetPort();
@@ -5431,6 +5435,17 @@ Y_UNIT_TEST_SUITE(TImportTests) {
54315435
TestImportChangefeeds(1, AddedScheme);
54325436
}
54335437

5438+
// Explicit specification of the number of partitions when creating CDC
5439+
// is possible only if the first component of the primary key
5440+
// of the source table is Uint32 or Uint64
5441+
Y_UNIT_TEST(ChangefeedWithPartitioning) {
5442+
TestImportChangefeeds(1, AddedScheme, "UINT32");
5443+
}
5444+
5445+
Y_UNIT_TEST(ChangefeedsWithPartitioning) {
5446+
TestImportChangefeeds(3, AddedScheme, "UINT64");
5447+
}
5448+
54345449
Y_UNIT_TEST(Changefeeds) {
54355450
TestImportChangefeeds(3, AddedScheme);
54365451
}

0 commit comments

Comments
 (0)