Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,33 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
cdcStream.SetRetentionPeriodSeconds(topic.retention_period().seconds());
}

auto tableDesc = GetTableDescription(ss, dstPath->PathId);
Y_ABORT_UNLESS(!tableDesc.GetKeyColumnIds().empty());
const auto& keyId = tableDesc.GetKeyColumnIds()[0];
bool isPartitioningAvailable = false;

// Explicit specification of the number of partitions when creating CDC
// is possible only if the first component of the primary key
// of the source table is Uint32 or Uint64
for (const auto& column : tableDesc.GetColumns()) {
if (column.GetId() == keyId) {
isPartitioningAvailable = column.GetType() == "Uint32" || column.GetType() == "Uint64";
break;
}
}

if (topic.has_partitioning_settings()) {
i64 minActivePartitions =
topic.partitioning_settings().min_active_partitions();
if (minActivePartitions < 0) {
error = "minActivePartitions must be >= 0";
return nullptr;
} else if (minActivePartitions == 0) {
minActivePartitions = 1;
if (isPartitioningAvailable) {
i64 minActivePartitions =
topic.partitioning_settings().min_active_partitions();
if (minActivePartitions < 0) {
error = "minActivePartitions must be >= 0";
return nullptr;
} else if (minActivePartitions == 0) {
minActivePartitions = 1;
}
cdcStream.SetTopicPartitions(minActivePartitions);
}
cdcStream.SetTopicPartitions(minActivePartitions);

if (topic.partitioning_settings().has_auto_partitioning_settings()) {
auto& partitioningSettings = topic.partitioning_settings().auto_partitioning_settings();
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,18 @@ void NoMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record)
UNIT_ASSERT(!record.GetPathDescription().GetTable().GetPartitionConfig().GetPartitioningPolicy().HasMaxPartitionsCount());
}

TCheckFunc MinTopicPartitionsCountEqual(ui32 count) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().GetMinPartitionCount(), count);
};
}

TCheckFunc MaxTopicPartitionsCountEqual(ui32 count) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().GetMaxPartitionCount(), count);
};
}

TCheckFunc PartitioningByLoadStatus(bool status) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetTable().GetPartitionConfig().GetPartitioningPolicy().GetSplitByLoadSettings().GetEnabled(), status);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ namespace NLs {
TCheckFunc MaxPartitionsCountEqual(ui32 count);
void HasMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record);
void NoMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record);
TCheckFunc MinTopicPartitionsCountEqual(ui32 count);
TCheckFunc MaxTopicPartitionsCountEqual(ui32 count);
TCheckFunc PartitioningByLoadStatus(bool status);
TCheckFunc ColumnFamiliesCount(ui32 size);
TCheckFunc ColumnFamiliesHas(ui32 familyId);
Expand Down
79 changes: 47 additions & 32 deletions ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5238,7 +5238,7 @@ Y_UNIT_TEST_SUITE(TImportTests) {
std::function<void(TTestBasicRuntime&)> Checker;
};

TGeneratedChangefeed GenChangefeed(ui64 num = 1) {
TGeneratedChangefeed GenChangefeed(ui64 num = 1, bool isPartitioningAvailable = true) {
const TString changefeedName = TStringBuilder() << "updates_feed" << num;
const auto changefeedPath = TStringBuilder() << "/" << changefeedName;

Expand All @@ -5251,10 +5251,10 @@ Y_UNIT_TEST_SUITE(TImportTests) {

const auto topicDesc = R"(
partitioning_settings {
min_active_partitions: 1
max_active_partitions: 1
min_active_partitions: 2
max_active_partitions: 3
auto_partitioning_settings {
strategy: AUTO_PARTITIONING_STRATEGY_DISABLED
strategy: AUTO_PARTITIONING_STRATEGY_SCALE_UP
partition_write_speed {
stabilization_window {
seconds: 300
Expand Down Expand Up @@ -5303,40 +5303,50 @@ Y_UNIT_TEST_SUITE(TImportTests) {
attr.emplace(NAttr::EKeys::TOPIC_DESCRIPTION, topicDesc);
return {
{changefeedPath, GenerateTestData({EPathTypeCdcStream, changefeedDesc, std::move(attr)})},
[changefeedPath = TString(changefeedPath)](TTestBasicRuntime& runtime) {
[changefeedPath = TString(changefeedPath), isPartitioningAvailable](TTestBasicRuntime& runtime) {
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath, false, false, true), {
NLs::PathExist,
});
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath + "/streamImpl", false, false, true), {
NLs::ConsumerExist("my_consumer")
NLs::ConsumerExist("my_consumer"),
NLs::MinTopicPartitionsCountEqual(isPartitioningAvailable ? 2 : 1),
NLs::MaxTopicPartitionsCountEqual(3),
});
}
};
}

TVector<std::function<void(TTestBasicRuntime&)>> GenChangefeeds(THashMap<TString, TTestDataWithScheme>& bucketContent, ui64 count = 1) {
TVector<std::function<void(TTestBasicRuntime&)>> GenChangefeeds(
THashMap<TString, TTestDataWithScheme>& bucketContent,
ui64 count = 1,
bool isPartitioningAvailable = true)
{
TVector<std::function<void(TTestBasicRuntime&)>> checkers;
checkers.reserve(count);
for (ui64 i = 1; i <= count; ++i) {
auto genChangefeed = GenChangefeed(i);
auto genChangefeed = GenChangefeed(i, isPartitioningAvailable);
bucketContent.emplace(genChangefeed.Changefeed);
checkers.push_back(genChangefeed.Checker);
}
return checkers;
}

std::function<void(TTestBasicRuntime&)> AddedSchemeCommon(THashMap<TString, TTestDataWithScheme>& bucketContent, const TString& permissions) {
const auto data = GenerateTestData(R"(
std::function<void(TTestBasicRuntime&)> AddedSchemeCommon(
THashMap<TString, TTestDataWithScheme>& bucketContent,
const TString& permissions,
const TString& pkType)
{
const auto data = GenerateTestData(Sprintf(R"(
columns {
name: "key"
type { optional_type { item { type_id: UTF8 } } }
type { optional_type { item { type_id: %s } } }
}
columns {
name: "value"
type { optional_type { item { type_id: UTF8 } } }
}
primary_key: "key"
)", {{"a", 1}}, permissions);
)", pkType.c_str()), {{pkType == "UTF8" ? "a" : "", 1}}, permissions);

bucketContent.emplace("", data);
return [](TTestBasicRuntime& runtime) {
Expand All @@ -5346,11 +5356,17 @@ Y_UNIT_TEST_SUITE(TImportTests) {
};
}

std::function<void(TTestBasicRuntime&)> AddedScheme(THashMap<TString, TTestDataWithScheme>& bucketContent) {
return AddedSchemeCommon(bucketContent, "");
std::function<void(TTestBasicRuntime&)> AddedScheme(
THashMap<TString, TTestDataWithScheme>& bucketContent,
const TString& pkType)
{
return AddedSchemeCommon(bucketContent, "", pkType);
}

std::function<void(TTestBasicRuntime&)> AddedSchemeWithPermissions(THashMap<TString, TTestDataWithScheme>& bucketContent) {
std::function<void(TTestBasicRuntime&)> AddedSchemeWithPermissions(
THashMap<TString, TTestDataWithScheme>& bucketContent,
const TString& pkType)
{
const auto permissions = R"(
actions {
change_owner: "eve"
Expand All @@ -5374,34 +5390,22 @@ Y_UNIT_TEST_SUITE(TImportTests) {
}
}
)";
return AddedSchemeCommon(bucketContent, permissions);
return AddedSchemeCommon(bucketContent, permissions, pkType);
}

using SchemeFunction = std::function<std::function<void(TTestBasicRuntime&)>(THashMap<TString, TTestDataWithScheme>&)>;
using SchemeFunction = std::function<std::function<void(TTestBasicRuntime&)>(THashMap<TString, TTestDataWithScheme>&, const TString&)>;

void TestImportChangefeeds(ui64 countChangefeed, SchemeFunction addedScheme) {
void TestImportChangefeeds(ui64 countChangefeed, SchemeFunction addedScheme, const TString& pkType = "UTF8") {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
ui64 txId = 100;
runtime.GetAppData().FeatureFlags.SetEnableChangefeedsImport(true);
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);

const auto data = GenerateTestData(R"(
columns {
name: "key"
type { optional_type { item { type_id: UTF8 } } }
}
columns {
name: "value"
type { optional_type { item { type_id: UTF8 } } }
}
primary_key: "key"
)");

THashMap<TString, TTestDataWithScheme> bucketContent(countChangefeed + 1);

auto checkerTable = addedScheme(bucketContent);
auto checkersChangefeeds = GenChangefeeds(bucketContent, countChangefeed);
auto checkerTable = addedScheme(bucketContent, pkType);
auto checkersChangefeeds = GenChangefeeds(bucketContent, countChangefeed, pkType == "UINT32" || pkType == "UINT64");

TPortManager portManager;
const ui16 port = portManager.GetPort();
Expand Down Expand Up @@ -5431,6 +5435,17 @@ Y_UNIT_TEST_SUITE(TImportTests) {
TestImportChangefeeds(1, AddedScheme);
}

// Explicit specification of the number of partitions when creating CDC
// is possible only if the first component of the primary key
// of the source table is Uint32 or Uint64
Y_UNIT_TEST(ChangefeedWithPartitioning) {
TestImportChangefeeds(1, AddedScheme, "UINT32");
}

Y_UNIT_TEST(ChangefeedsWithPartitioning) {
TestImportChangefeeds(3, AddedScheme, "UINT64");
}

Y_UNIT_TEST(Changefeeds) {
TestImportChangefeeds(3, AddedScheme);
}
Expand Down
Loading