Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,19 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateChangefeedPropose(
cdcStream.SetRetentionPeriodSeconds(topic.retention_period().seconds());
}

if (topic.has_partitioning_settings()) {
auto tableDesc = GetTableDescription(ss, dstPath->PathId);
const auto& keyIds = tableDesc.GetKeyColumnIds()[0];
bool isPartitioningAvailable = false;

// ydb.tech/docs/ru/concepts/cdc#topic-partitions
for (const auto& column : tableDesc.GetColumns()) {
if (column.GetId() == keyIds) {
isPartitioningAvailable = column.GetType() == "Uint32" || column.GetType() == "Uint64";
break;
}
}

if (topic.has_partitioning_settings() && isPartitioningAvailable) {
i64 minActivePartitions =
topic.partitioning_settings().min_active_partitions();
if (minActivePartitions < 0) {
Expand Down
28 changes: 28 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,34 @@ void NoMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record)
UNIT_ASSERT(!record.GetPathDescription().GetTable().GetPartitionConfig().GetPartitioningPolicy().HasMaxPartitionsCount());
}

TCheckFunc MinTopicPartitionsCountEqual(ui32 count) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().GetMinPartitionCount(), count);
};
}

void HasMinTopicPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT(record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().HasMinPartitionCount());
}

void NoMinTopicPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT(!record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().HasMinPartitionCount());
}

TCheckFunc MaxTopicPartitionsCountEqual(ui32 count) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().GetMaxPartitionCount(), count);
};
}

void HasMaxTopicPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT(record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().HasMaxPartitionCount());
}

void NoMaxTopicPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT(!record.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetPartitionStrategy().HasMaxPartitionCount());
}

TCheckFunc PartitioningByLoadStatus(bool status) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
UNIT_ASSERT_VALUES_EQUAL(record.GetPathDescription().GetTable().GetPartitionConfig().GetPartitioningPolicy().GetSplitByLoadSettings().GetEnabled(), status);
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ namespace NLs {
TCheckFunc MaxPartitionsCountEqual(ui32 count);
void HasMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record);
void NoMaxPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record);
TCheckFunc MinTopicPartitionsCountEqual(ui32 count);
void HasMinTopicPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record);
void NoMinTopicPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record);
TCheckFunc MaxTopicPartitionsCountEqual(ui32 count);
void HasMaxTopicPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record);
void NoMaxTopicPartitionsCount(const NKikimrScheme::TEvDescribeSchemeResult& record);
TCheckFunc PartitioningByLoadStatus(bool status);
TCheckFunc ColumnFamiliesCount(ui32 size);
TCheckFunc ColumnFamiliesHas(ui32 familyId);
Expand Down
87 changes: 54 additions & 33 deletions ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5238,7 +5238,7 @@ Y_UNIT_TEST_SUITE(TImportTests) {
std::function<void(TTestBasicRuntime&)> Checker;
};

TGeneratedChangefeed GenChangefeed(ui64 num = 1) {
TGeneratedChangefeed GenChangefeed(ui64 num = 1, bool isPartitioningAvailable = true) {
const TString changefeedName = TStringBuilder() << "updates_feed" << num;
const auto changefeedPath = TStringBuilder() << "/" << changefeedName;

Expand All @@ -5251,10 +5251,10 @@ Y_UNIT_TEST_SUITE(TImportTests) {

const auto topicDesc = R"(
partitioning_settings {
min_active_partitions: 1
max_active_partitions: 1
min_active_partitions: 2
max_active_partitions: 3
auto_partitioning_settings {
strategy: AUTO_PARTITIONING_STRATEGY_DISABLED
strategy: AUTO_PARTITIONING_STRATEGY_SCALE_UP
partition_write_speed {
stabilization_window {
seconds: 300
Expand Down Expand Up @@ -5303,40 +5303,58 @@ Y_UNIT_TEST_SUITE(TImportTests) {
attr.emplace(NAttr::EKeys::TOPIC_DESCRIPTION, topicDesc);
return {
{changefeedPath, GenerateTestData({EPathTypeCdcStream, changefeedDesc, std::move(attr)})},
[changefeedPath = TString(changefeedPath)](TTestBasicRuntime& runtime) {
[changefeedPath = TString(changefeedPath), isPartitioningAvailable](TTestBasicRuntime& runtime) {
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath, false, false, true), {
NLs::PathExist,
});
TestDescribeResult(DescribePath(runtime, "/MyRoot/Table" + changefeedPath + "/streamImpl", false, false, true), {
NLs::ConsumerExist("my_consumer")
const auto topicDesc = DescribePath(runtime, "/MyRoot/Table" + changefeedPath + "/streamImpl", false, false, true);
TestDescribeResult(topicDesc, {
NLs::ConsumerExist("my_consumer"),
});
if (isPartitioningAvailable) {
TestDescribeResult(topicDesc, {
NLs::MinTopicPartitionsCountEqual(2),
NLs::MaxTopicPartitionsCountEqual(3),
});
} else {
NLs::NoMinTopicPartitionsCount(topicDesc);
NLs::NoMaxTopicPartitionsCount(topicDesc);
}
}
};
}

TVector<std::function<void(TTestBasicRuntime&)>> GenChangefeeds(THashMap<TString, TTestDataWithScheme>& bucketContent, ui64 count = 1) {
TVector<std::function<void(TTestBasicRuntime&)>> GenChangefeeds(
THashMap<TString, TTestDataWithScheme>& bucketContent,
ui64 count = 1,
bool isPartitioningAvailable = true)
{
TVector<std::function<void(TTestBasicRuntime&)>> checkers;
checkers.reserve(count);
for (ui64 i = 1; i <= count; ++i) {
auto genChangefeed = GenChangefeed(i);
auto genChangefeed = GenChangefeed(i, isPartitioningAvailable);
bucketContent.emplace(genChangefeed.Changefeed);
checkers.push_back(genChangefeed.Checker);
}
return checkers;
}

std::function<void(TTestBasicRuntime&)> AddedSchemeCommon(THashMap<TString, TTestDataWithScheme>& bucketContent, const TString& permissions) {
const auto data = GenerateTestData(R"(
std::function<void(TTestBasicRuntime&)> AddedSchemeCommon(
THashMap<TString, TTestDataWithScheme>& bucketContent,
const TString& permissions,
const TString& pkType)
{
const auto data = GenerateTestData(Sprintf(R"(
columns {
name: "key"
type { optional_type { item { type_id: UTF8 } } }
type { optional_type { item { type_id: %s } } }
}
columns {
name: "value"
type { optional_type { item { type_id: UTF8 } } }
}
primary_key: "key"
)", {{"a", 1}}, permissions);
)", pkType.c_str()), {{pkType == "UTF8" ? "a" : "", 1}}, permissions);

bucketContent.emplace("", data);
return [](TTestBasicRuntime& runtime) {
Expand All @@ -5346,11 +5364,17 @@ Y_UNIT_TEST_SUITE(TImportTests) {
};
}

std::function<void(TTestBasicRuntime&)> AddedScheme(THashMap<TString, TTestDataWithScheme>& bucketContent) {
return AddedSchemeCommon(bucketContent, "");
std::function<void(TTestBasicRuntime&)> AddedScheme(
THashMap<TString, TTestDataWithScheme>& bucketContent,
const TString& pkType)
{
return AddedSchemeCommon(bucketContent, "", pkType);
}

std::function<void(TTestBasicRuntime&)> AddedSchemeWithPermissions(THashMap<TString, TTestDataWithScheme>& bucketContent) {
std::function<void(TTestBasicRuntime&)> AddedSchemeWithPermissions(
THashMap<TString, TTestDataWithScheme>& bucketContent,
const TString& pkType)
{
const auto permissions = R"(
actions {
change_owner: "eve"
Expand All @@ -5374,34 +5398,22 @@ Y_UNIT_TEST_SUITE(TImportTests) {
}
}
)";
return AddedSchemeCommon(bucketContent, permissions);
return AddedSchemeCommon(bucketContent, permissions, pkType);
}

using SchemeFunction = std::function<std::function<void(TTestBasicRuntime&)>(THashMap<TString, TTestDataWithScheme>&)>;
using SchemeFunction = std::function<std::function<void(TTestBasicRuntime&)>(THashMap<TString, TTestDataWithScheme>&, const TString&)>;

void TestImportChangefeeds(ui64 countChangefeed, SchemeFunction addedScheme) {
void TestImportChangefeeds(ui64 countChangefeed, SchemeFunction addedScheme, const TString& pkType = "UTF8") {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
ui64 txId = 100;
runtime.GetAppData().FeatureFlags.SetEnableChangefeedsImport(true);
runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE);

const auto data = GenerateTestData(R"(
columns {
name: "key"
type { optional_type { item { type_id: UTF8 } } }
}
columns {
name: "value"
type { optional_type { item { type_id: UTF8 } } }
}
primary_key: "key"
)");

THashMap<TString, TTestDataWithScheme> bucketContent(countChangefeed + 1);

auto checkerTable = addedScheme(bucketContent);
auto checkersChangefeeds = GenChangefeeds(bucketContent, countChangefeed);
auto checkerTable = addedScheme(bucketContent, pkType);
auto checkersChangefeeds = GenChangefeeds(bucketContent, countChangefeed, pkType == "UINT32" || pkType == "UINT64");

TPortManager portManager;
const ui16 port = portManager.GetPort();
Expand Down Expand Up @@ -5431,6 +5443,15 @@ Y_UNIT_TEST_SUITE(TImportTests) {
TestImportChangefeeds(1, AddedScheme);
}

// ydb.tech/docs/ru/concepts/cdc#topic-partitions
Y_UNIT_TEST(ChangefeedWithPartitioning) {
TestImportChangefeeds(1, AddedScheme, "UINT32");
}

Y_UNIT_TEST(ChangefeedsWithPartitioning) {
TestImportChangefeeds(3, AddedScheme, "UINT64");
}

Y_UNIT_TEST(Changefeeds) {
TestImportChangefeeds(3, AddedScheme);
}
Expand Down
Loading