Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/bsc_cache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
#include <ydb/core/blobstorage/ut_blobstorage/lib/ut_helpers.h>

Y_UNIT_TEST_SUITE(GroupConfigurationPropagation) {

struct TTestCtx : public TTestCtxBase {
TTestCtx(const TBlobStorageGroupType& erasure)
: TTestCtxBase(TEnvironmentSetup::TSettings{
.NodeCount = erasure.BlobSubgroupSize() * 2,
.Erasure = erasure,
.ControllerNodeId = erasure.BlobSubgroupSize() * 2,
})
{}

static bool BlockBscResponses(ui32, std::unique_ptr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvBlobStorage::TEvControllerNodeServiceSetUpdate::EventType) {
return false;
}

return true;
}

void AllocateEdgeActorOnNodeWOConfig() {
std::set<ui32> nodesWOConfig = GetComplementNodeSet(NodesWithConfig);
UNIT_ASSERT(!NodesWithConfig.empty());
AllocateEdgeActorOnSpecificNode(*nodesWOConfig.begin());
NodeWithProxy = Edge.NodeId();
}

void CheckStatus() {
AllocateEdgeActorOnNodeWOConfig();
auto res = GetGroupStatus(GroupId, WaitTime);
UNIT_ASSERT(res);
UNIT_ASSERT_VALUES_EQUAL_C(res->Get()->Status, NKikimrProto::OK, res->Get()->ErrorReason);
}

void Setup() {
Initialize();
NodesWithConfig = GetNodesWithVDisks();
NodesWithConfig.insert(Env->Settings.ControllerNodeId);
AllocateEdgeActorOnNodeWOConfig();
UNIT_ASSERT(NodeWithProxy != Env->Settings.ControllerNodeId);
Env->Sim(TDuration::Minutes(10));
Env->Runtime->FilterFunction = BlockBscResponses;
}

void StopNode() {
Env->StopNode(NodeWithProxy);
}

void StartNode() {
Env->StartNode(NodeWithProxy);
AllocateEdgeActorOnNodeWOConfig();
}

void RestartNode() {
StopNode();
StartNode();
}

void UpdateGroupConfiguration() {
NKikimrBlobStorage::TConfigRequest request;
auto* reassign = request.AddCommand()->MutableReassignGroupDisk();
for (const auto& slot : BaseConfig.GetVSlot()) {
if (slot.GetGroupId() == GroupId) {
reassign->SetGroupId(slot.GetGroupId());
reassign->SetGroupGeneration(slot.GetGroupGeneration());
reassign->SetFailRealmIdx(slot.GetFailRealmIdx());
reassign->SetFailDomainIdx(slot.GetFailDomainIdx());
reassign->SetVDiskIdx(slot.GetVDiskIdx());
break;
}
}
auto response = Env->Invoke(request);
BaseConfig = Env->FetchBaseConfig();

std::set<ui32> nodesWithVDisks = GetNodesWithVDisks();
NodesWithConfig.insert(nodesWithVDisks.begin(), nodesWithVDisks.end());
}

const TDuration WaitTime = TDuration::Seconds(1);
ui32 NodeWithProxy;
std::set<ui32> NodesWithConfig;
};

Y_UNIT_TEST(Simple) {
TTestCtx ctx(TBlobStorageGroupType::Erasure4Plus2Block);
ctx.Setup();
ctx.CheckStatus();
}

Y_UNIT_TEST(Reassign) {
TTestCtx ctx(TBlobStorageGroupType::Erasure4Plus2Block);
ctx.Setup();
ctx.UpdateGroupConfiguration();
ctx.CheckStatus();
}

Y_UNIT_TEST(NodeRestart) {
TTestCtx ctx(TBlobStorageGroupType::Erasure4Plus2Block);
ctx.Setup();
ctx.RestartNode();
ctx.CheckStatus();
}

Y_UNIT_TEST(NodeRestartAndUpdate) {
TTestCtx ctx(TBlobStorageGroupType::Erasure4Plus2Block);
ctx.Setup();
ctx.StopNode();
ctx.UpdateGroupConfiguration();
ctx.StartNode();
ctx.CheckStatus();
}

Y_UNIT_TEST(BscRestart) {
TTestCtx ctx(TBlobStorageGroupType::Erasure4Plus2Block);
ctx.Setup();
ctx.Env->RestartNode(ctx.Env->Settings.ControllerNodeId);
ctx.CheckStatus();
}

}
46 changes: 30 additions & 16 deletions ydb/core/blobstorage/ut_blobstorage/lib/ut_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,33 +287,29 @@ struct TTestCtxBase {
if (!findNodeWithoutVDisks) {
chosenNodeId = NodeCount;
} else {
std::set<ui32> nodesWithoutVDisks;
for (ui32 nodeId = 1; nodeId <= NodeCount; ++nodeId) {
nodesWithoutVDisks.insert(nodeId);
}

for (const auto& vslot : BaseConfig.GetVSlot()) {
nodesWithoutVDisks.erase(vslot.GetVSlotId().GetNodeId());
}

if (!nodesWithoutVDisks.empty()) {
chosenNodeId = *nodesWithoutVDisks.begin();
}
std::set<ui32> nodesWOVDisks = GetComplementNodeSet(GetNodesWithVDisks());
chosenNodeId = *nodesWOVDisks.begin();
}

Y_VERIFY_S(chosenNodeId != 0, "No available nodes to allocate");
Edge = Env->Runtime->AllocateEdgeActor(NodeCount);
Edge = Env->Runtime->AllocateEdgeActor(chosenNodeId);
}

void AllocateEdgeActorOnSpecificNode(ui32 nodeId) {
Edge = Env->Runtime->AllocateEdgeActor(nodeId);
}

void FetchBaseConfig() {
BaseConfig = Env->FetchBaseConfig();
}

TAutoPtr<TEventHandle<TEvBlobStorage::TEvStatusResult>> GetGroupStatus(ui32 groupId) {
TAutoPtr<TEventHandle<TEvBlobStorage::TEvStatusResult>> GetGroupStatus(ui32 groupId,
TDuration waitTime = TDuration::Max()) {
TInstant ts = (waitTime == TDuration::Max()) ? TInstant::Max() : Env->Now() + waitTime;
Env->Runtime->WrapInActorContext(Edge, [&] {
SendToBSProxy(Edge, groupId, new TEvBlobStorage::TEvStatus(TInstant::Max()));
SendToBSProxy(Edge, groupId, new TEvBlobStorage::TEvStatus(ts));
});
return Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvStatusResult>(Edge, false, TInstant::Max());
return Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvStatusResult>(Edge, false, ts);
}

virtual void Initialize() {
Expand Down Expand Up @@ -431,6 +427,24 @@ struct TTestCtxBase {
return blobs;
}

std::set<ui32> GetNodesWithVDisks() {
std::set<ui32> res;
for (const auto& vslot : BaseConfig.GetVSlot()) {
res.insert(vslot.GetVSlotId().GetNodeId());
}
return res;
}

std::set<ui32> GetComplementNodeSet(std::set<ui32> nodes) {
std::set<ui32> res;
for (ui32 nodeId = 1; nodeId <= NodeCount; ++nodeId) {
if (!nodes.contains(nodeId)) {
res.insert(nodeId);
}
}
return res;
}

public:
ui32 NodeCount;
TBlobStorageGroupType Erasure;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_blobstorage/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ SRCS(
assimilation.cpp
backpressure.cpp
block_race.cpp
bsc_cache.cpp
counting_events.cpp
deadlines.cpp
decommit_3dc.cpp
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/mind/bscontroller/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace NKikimr::NBsController {

if (CacheUpdate.KeyValuePairsSize()) {
State.Outbox.emplace_back(Self->SelfId().NodeId(), std::make_unique<NStorage::TEvNodeWardenUpdateCache>(
std::move(CacheUpdate)), 0);
std::move(CacheUpdate)), 0, true);
}
}

Expand Down Expand Up @@ -722,8 +722,12 @@ namespace NKikimr::NBsController {
}

ui64 TBlobStorageController::TConfigState::ApplyConfigUpdates() {
for (auto& [nodeId, ev, cookie] : Outbox) {
Self.SendToWarden(nodeId, std::move(ev), cookie);
for (TOutgoingMessage& msg : Outbox) {
if (msg.ToLocalWarden) {
Self.Send(MakeBlobStorageNodeWardenID(Self.SelfId().NodeId()), std::move(msg.Event), 0, msg.Cookie);
} else {
Self.SendToWarden(msg.NodeId, std::move(msg.Event), msg.Cookie);
}
}
for (auto& ev : StatProcessorOutbox) {
Self.SelfId().Send(Self.StatProcessorActorId, ev.release());
Expand Down
17 changes: 16 additions & 1 deletion ydb/core/mind/bscontroller/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,22 @@ namespace NKikimr {
THashSet<TPDiskId> PDisksToRemove;

// outgoing messages
std::deque<std::tuple<TNodeId, std::unique_ptr<IEventBase>, ui64>> Outbox;
struct TOutgoingMessage {
TNodeId NodeId;
std::unique_ptr<IEventBase> Event;
ui64 Cookie;
bool ToLocalWarden;

TOutgoingMessage(TNodeId nodeId, std::unique_ptr<IEventBase>&& event,
ui64 cookie, bool toLocalWarden = false)
: NodeId(nodeId)
, Event(std::forward<std::unique_ptr<IEventBase>>(event))
, Cookie(cookie)
, ToLocalWarden(toLocalWarden)
{}
};

std::deque<TOutgoingMessage> Outbox;
std::deque<std::unique_ptr<IEventBase>> StatProcessorOutbox;
std::deque<std::unique_ptr<IEventBase>> NodeWhiteboardOutbox;
THolder<TEvControllerUpdateSelfHealInfo> UpdateSelfHealInfoMsg;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/mind/bscontroller/register_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ void TBlobStorageController::EraseKnownDrivesOnDisconnected(TNodeInfo *nodeInfo)

void TBlobStorageController::SendToWarden(TNodeId nodeId, std::unique_ptr<IEventBase> ev, ui64 cookie) {
Y_ABORT_UNLESS(nodeId);
if (auto *node = FindNode(nodeId); node && node->ConnectedServerId) {
if (TNodeInfo* node = FindNode(nodeId); node && node->ConnectedServerId) {
auto h = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), SelfId(), ev.release(), 0, cookie);
if (node->InterconnectSessionId) {
h->Rewrite(TEvInterconnect::EvForward, node->InterconnectSessionId);
Expand Down
Loading