Skip to content

Commit f2c19c8

Browse files
authored
Add UT for group cache, improve SentToWarden for local messages (#22541)
1 parent 604a3ec commit f2c19c8

File tree

6 files changed

+177
-21
lines changed

6 files changed

+177
-21
lines changed
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
2+
#include <ydb/core/blobstorage/ut_blobstorage/lib/ut_helpers.h>
3+
4+
Y_UNIT_TEST_SUITE(GroupConfigurationPropagation) {
5+
6+
struct TTestCtx : public TTestCtxBase {
7+
TTestCtx(const TBlobStorageGroupType& erasure)
8+
: TTestCtxBase(TEnvironmentSetup::TSettings{
9+
.NodeCount = erasure.BlobSubgroupSize() * 2,
10+
.Erasure = erasure,
11+
.ControllerNodeId = erasure.BlobSubgroupSize() * 2,
12+
})
13+
{}
14+
15+
static bool BlockBscResponses(ui32, std::unique_ptr<IEventHandle>& ev) {
16+
if (ev->GetTypeRewrite() == TEvBlobStorage::TEvControllerNodeServiceSetUpdate::EventType) {
17+
return false;
18+
}
19+
20+
return true;
21+
}
22+
23+
void AllocateEdgeActorOnNodeWOConfig() {
24+
std::set<ui32> nodesWOConfig = GetComplementNodeSet(NodesWithConfig);
25+
UNIT_ASSERT(!NodesWithConfig.empty());
26+
AllocateEdgeActorOnSpecificNode(*nodesWOConfig.begin());
27+
NodeWithProxy = Edge.NodeId();
28+
}
29+
30+
void CheckStatus() {
31+
AllocateEdgeActorOnNodeWOConfig();
32+
auto res = GetGroupStatus(GroupId, WaitTime);
33+
UNIT_ASSERT(res);
34+
UNIT_ASSERT_VALUES_EQUAL_C(res->Get()->Status, NKikimrProto::OK, res->Get()->ErrorReason);
35+
}
36+
37+
void Setup() {
38+
Initialize();
39+
NodesWithConfig = GetNodesWithVDisks();
40+
NodesWithConfig.insert(Env->Settings.ControllerNodeId);
41+
AllocateEdgeActorOnNodeWOConfig();
42+
UNIT_ASSERT(NodeWithProxy != Env->Settings.ControllerNodeId);
43+
Env->Sim(TDuration::Minutes(10));
44+
Env->Runtime->FilterFunction = BlockBscResponses;
45+
}
46+
47+
void StopNode() {
48+
Env->StopNode(NodeWithProxy);
49+
}
50+
51+
void StartNode() {
52+
Env->StartNode(NodeWithProxy);
53+
AllocateEdgeActorOnNodeWOConfig();
54+
}
55+
56+
void RestartNode() {
57+
StopNode();
58+
StartNode();
59+
}
60+
61+
void UpdateGroupConfiguration() {
62+
NKikimrBlobStorage::TConfigRequest request;
63+
auto* reassign = request.AddCommand()->MutableReassignGroupDisk();
64+
for (const auto& slot : BaseConfig.GetVSlot()) {
65+
if (slot.GetGroupId() == GroupId) {
66+
reassign->SetGroupId(slot.GetGroupId());
67+
reassign->SetGroupGeneration(slot.GetGroupGeneration());
68+
reassign->SetFailRealmIdx(slot.GetFailRealmIdx());
69+
reassign->SetFailDomainIdx(slot.GetFailDomainIdx());
70+
reassign->SetVDiskIdx(slot.GetVDiskIdx());
71+
break;
72+
}
73+
}
74+
auto response = Env->Invoke(request);
75+
BaseConfig = Env->FetchBaseConfig();
76+
77+
std::set<ui32> nodesWithVDisks = GetNodesWithVDisks();
78+
NodesWithConfig.insert(nodesWithVDisks.begin(), nodesWithVDisks.end());
79+
}
80+
81+
const TDuration WaitTime = TDuration::Seconds(1);
82+
ui32 NodeWithProxy;
83+
std::set<ui32> NodesWithConfig;
84+
};
85+
86+
Y_UNIT_TEST(Simple) {
87+
TTestCtx ctx(TBlobStorageGroupType::Erasure4Plus2Block);
88+
ctx.Setup();
89+
ctx.CheckStatus();
90+
}
91+
92+
Y_UNIT_TEST(Reassign) {
93+
TTestCtx ctx(TBlobStorageGroupType::Erasure4Plus2Block);
94+
ctx.Setup();
95+
ctx.UpdateGroupConfiguration();
96+
ctx.CheckStatus();
97+
}
98+
99+
Y_UNIT_TEST(NodeRestart) {
100+
TTestCtx ctx(TBlobStorageGroupType::Erasure4Plus2Block);
101+
ctx.Setup();
102+
ctx.RestartNode();
103+
ctx.CheckStatus();
104+
}
105+
106+
Y_UNIT_TEST(NodeRestartAndUpdate) {
107+
TTestCtx ctx(TBlobStorageGroupType::Erasure4Plus2Block);
108+
ctx.Setup();
109+
ctx.StopNode();
110+
ctx.UpdateGroupConfiguration();
111+
ctx.StartNode();
112+
ctx.CheckStatus();
113+
}
114+
115+
Y_UNIT_TEST(BscRestart) {
116+
TTestCtx ctx(TBlobStorageGroupType::Erasure4Plus2Block);
117+
ctx.Setup();
118+
ctx.Env->RestartNode(ctx.Env->Settings.ControllerNodeId);
119+
ctx.CheckStatus();
120+
}
121+
122+
}

ydb/core/blobstorage/ut_blobstorage/lib/ut_helpers.h

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -287,33 +287,29 @@ struct TTestCtxBase {
287287
if (!findNodeWithoutVDisks) {
288288
chosenNodeId = NodeCount;
289289
} else {
290-
std::set<ui32> nodesWithoutVDisks;
291-
for (ui32 nodeId = 1; nodeId <= NodeCount; ++nodeId) {
292-
nodesWithoutVDisks.insert(nodeId);
293-
}
294-
295-
for (const auto& vslot : BaseConfig.GetVSlot()) {
296-
nodesWithoutVDisks.erase(vslot.GetVSlotId().GetNodeId());
297-
}
298-
299-
if (!nodesWithoutVDisks.empty()) {
300-
chosenNodeId = *nodesWithoutVDisks.begin();
301-
}
290+
std::set<ui32> nodesWOVDisks = GetComplementNodeSet(GetNodesWithVDisks());
291+
chosenNodeId = *nodesWOVDisks.begin();
302292
}
303293

304294
Y_VERIFY_S(chosenNodeId != 0, "No available nodes to allocate");
305-
Edge = Env->Runtime->AllocateEdgeActor(NodeCount);
295+
Edge = Env->Runtime->AllocateEdgeActor(chosenNodeId);
296+
}
297+
298+
void AllocateEdgeActorOnSpecificNode(ui32 nodeId) {
299+
Edge = Env->Runtime->AllocateEdgeActor(nodeId);
306300
}
307301

308302
void FetchBaseConfig() {
309303
BaseConfig = Env->FetchBaseConfig();
310304
}
311305

312-
TAutoPtr<TEventHandle<TEvBlobStorage::TEvStatusResult>> GetGroupStatus(ui32 groupId) {
306+
TAutoPtr<TEventHandle<TEvBlobStorage::TEvStatusResult>> GetGroupStatus(ui32 groupId,
307+
TDuration waitTime = TDuration::Max()) {
308+
TInstant ts = (waitTime == TDuration::Max()) ? TInstant::Max() : Env->Now() + waitTime;
313309
Env->Runtime->WrapInActorContext(Edge, [&] {
314-
SendToBSProxy(Edge, groupId, new TEvBlobStorage::TEvStatus(TInstant::Max()));
310+
SendToBSProxy(Edge, groupId, new TEvBlobStorage::TEvStatus(ts));
315311
});
316-
return Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvStatusResult>(Edge, false, TInstant::Max());
312+
return Env->WaitForEdgeActorEvent<TEvBlobStorage::TEvStatusResult>(Edge, false, ts);
317313
}
318314

319315
virtual void Initialize() {
@@ -431,6 +427,24 @@ struct TTestCtxBase {
431427
return blobs;
432428
}
433429

430+
std::set<ui32> GetNodesWithVDisks() {
431+
std::set<ui32> res;
432+
for (const auto& vslot : BaseConfig.GetVSlot()) {
433+
res.insert(vslot.GetVSlotId().GetNodeId());
434+
}
435+
return res;
436+
}
437+
438+
std::set<ui32> GetComplementNodeSet(std::set<ui32> nodes) {
439+
std::set<ui32> res;
440+
for (ui32 nodeId = 1; nodeId <= NodeCount; ++nodeId) {
441+
if (!nodes.contains(nodeId)) {
442+
res.insert(nodeId);
443+
}
444+
}
445+
return res;
446+
}
447+
434448
public:
435449
ui32 NodeCount;
436450
TBlobStorageGroupType Erasure;

ydb/core/blobstorage/ut_blobstorage/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ SRCS(
1818
assimilation.cpp
1919
backpressure.cpp
2020
block_race.cpp
21+
bsc_cache.cpp
2122
counting_events.cpp
2223
deadlines.cpp
2324
decommit_3dc.cpp

ydb/core/mind/bscontroller/config.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ namespace NKikimr::NBsController {
4040

4141
if (CacheUpdate.KeyValuePairsSize()) {
4242
State.Outbox.emplace_back(Self->SelfId().NodeId(), std::make_unique<NStorage::TEvNodeWardenUpdateCache>(
43-
std::move(CacheUpdate)), 0);
43+
std::move(CacheUpdate)), 0, true);
4444
}
4545
}
4646

@@ -722,8 +722,12 @@ namespace NKikimr::NBsController {
722722
}
723723

724724
ui64 TBlobStorageController::TConfigState::ApplyConfigUpdates() {
725-
for (auto& [nodeId, ev, cookie] : Outbox) {
726-
Self.SendToWarden(nodeId, std::move(ev), cookie);
725+
for (TOutgoingMessage& msg : Outbox) {
726+
if (msg.ToLocalWarden) {
727+
Self.Send(MakeBlobStorageNodeWardenID(Self.SelfId().NodeId()), std::move(msg.Event), 0, msg.Cookie);
728+
} else {
729+
Self.SendToWarden(msg.NodeId, std::move(msg.Event), msg.Cookie);
730+
}
727731
}
728732
for (auto& ev : StatProcessorOutbox) {
729733
Self.SelfId().Send(Self.StatProcessorActorId, ev.release());

ydb/core/mind/bscontroller/config.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,22 @@ namespace NKikimr {
9292
THashSet<TPDiskId> PDisksToRemove;
9393

9494
// outgoing messages
95-
std::deque<std::tuple<TNodeId, std::unique_ptr<IEventBase>, ui64>> Outbox;
95+
struct TOutgoingMessage {
96+
TNodeId NodeId;
97+
std::unique_ptr<IEventBase> Event;
98+
ui64 Cookie;
99+
bool ToLocalWarden;
100+
101+
TOutgoingMessage(TNodeId nodeId, std::unique_ptr<IEventBase>&& event,
102+
ui64 cookie, bool toLocalWarden = false)
103+
: NodeId(nodeId)
104+
, Event(std::forward<std::unique_ptr<IEventBase>>(event))
105+
, Cookie(cookie)
106+
, ToLocalWarden(toLocalWarden)
107+
{}
108+
};
109+
110+
std::deque<TOutgoingMessage> Outbox;
96111
std::deque<std::unique_ptr<IEventBase>> StatProcessorOutbox;
97112
std::deque<std::unique_ptr<IEventBase>> NodeWhiteboardOutbox;
98113
THolder<TEvControllerUpdateSelfHealInfo> UpdateSelfHealInfoMsg;

ydb/core/mind/bscontroller/register_node.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ void TBlobStorageController::EraseKnownDrivesOnDisconnected(TNodeInfo *nodeInfo)
667667

668668
void TBlobStorageController::SendToWarden(TNodeId nodeId, std::unique_ptr<IEventBase> ev, ui64 cookie) {
669669
Y_ABORT_UNLESS(nodeId);
670-
if (auto *node = FindNode(nodeId); node && node->ConnectedServerId) {
670+
if (TNodeInfo* node = FindNode(nodeId); node && node->ConnectedServerId) {
671671
auto h = std::make_unique<IEventHandle>(MakeBlobStorageNodeWardenID(nodeId), SelfId(), ev.release(), 0, cookie);
672672
if (node->InterconnectSessionId) {
673673
h->Rewrite(TEvInterconnect::EvForward, node->InterconnectSessionId);

0 commit comments

Comments
 (0)