Skip to content

Commit f1c78aa

Browse files
authored
Add shared control KeyValueVolumeControls.ReadRequestsInFlightLimit (#22511)
1 parent 37ad5cb commit f1c78aa

File tree

5 files changed

+75
-19
lines changed

5 files changed

+75
-19
lines changed

ydb/core/control/lib/immediate_control_board_wrapper.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ class TMemorizableControlWrapper {
5656
{
5757
}
5858

59+
void ResetControl(const TControlWrapper &control) {
60+
Control = control;
61+
CurrentValue = control;
62+
}
63+
5964
i64 Update(TInstant now) {
6065
CheckingCounter--;
6166
if (now > CheckingRelevantDeadline || CheckingCounter <= 0) {

ydb/core/keyvalue/keyvalue_state.cpp

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,10 @@ void PrepareCreationUnixTime(const R& request, I& interm)
8181
// Guideline:
8282
// Check SetError calls: there must be no changes made to the DB before SetError call (!)
8383

84-
TKeyValueState::TKeyValueState() {
84+
TKeyValueState::TKeyValueState()
85+
: ReadRequestsInFlightLimit_Base(3, 1, 4096)
86+
, ReadRequestsInFlightLimit(ReadRequestsInFlightLimit_Base)
87+
{
8588
TabletCounters = nullptr;
8689
Clear();
8790
}
@@ -114,7 +117,6 @@ void TKeyValueState::Clear() {
114117

115118
Queue.clear();
116119
IntermediatesInFlight = 0;
117-
IntermediatesInFlightLimit = 3; // FIXME: Change to something like 10
118120
RoInlineIntermediatesInFlight = 0;
119121
DeletesPerRequestLimit = 100'000;
120122

@@ -551,6 +553,14 @@ void TKeyValueState::InitExecute(ui64 tabletId, TActorId keyValueActorId, ui32 e
551553
}
552554
ChannelBalancerActorId = ctx.Register(new TChannelBalancer(maxChannel + 1, KeyValueActorId));
553555

556+
TActorSystem *actorSystem = TActivationContext::ActorSystem();
557+
if (actorSystem && actorSystem->AppData<TAppData>() && actorSystem->AppData<TAppData>()->Icb) {
558+
const TIntrusivePtr<NKikimr::TControlBoard>& icb = actorSystem->AppData<TAppData>()->Icb;
559+
560+
icb->RegisterSharedControl(ReadRequestsInFlightLimit_Base, "KeyValueVolumeControls.ReadRequestsInFlightLimit");
561+
ReadRequestsInFlightLimit.ResetControl(ReadRequestsInFlightLimit_Base);
562+
}
563+
554564
// Issue hard barriers
555565
using TGroupChannel = std::tuple<ui32, ui8>;
556566
{
@@ -1831,7 +1841,8 @@ void TKeyValueState::OnRequestComplete(ui64 requestUid, ui64 generation, ui64 st
18311841
CountRequestComplete(status, stat, ctx);
18321842
ResourceMetrics->TryUpdate(ctx);
18331843

1834-
if (Queue.size() && IntermediatesInFlight < IntermediatesInFlightLimit) {
1844+
ui64 limit = ReadRequestsInFlightLimit.Update(ctx.Now());
1845+
if (Queue.size() && IntermediatesInFlight < limit) {
18351846
TRequestType::EType requestType = Queue.front()->Stat.RequestType;
18361847

18371848
CountLatencyQueue(Queue.front()->Stat);
@@ -2488,14 +2499,14 @@ bool TKeyValueState::PrepareCmdPatch(const TActorContext &ctx, NKikimrClient::TK
24882499
return true;
24892500
}
24902501
}
2491-
interm.Diffs[diffIdx].Offset = diff.GetOffset();
2502+
interm.Diffs[diffIdx].Offset = diff.GetOffset();
24922503
}
24932504

24942505
ui32 storageChannelIdx = BLOB_CHANNEL;
24952506
if (request.HasStorageChannel()) {
24962507
auto storageChannel = request.GetStorageChannel();
24972508
ui32 storageChannelOffset = (ui32)storageChannel;
2498-
2509+
24992510
if (storageChannelOffset == NKikimrClient::TKeyValueRequest::INLINE) {
25002511
TStringStream str;
25012512
str << "KeyValue# " << TabletId;
@@ -3188,14 +3199,15 @@ void TKeyValueState::OnEvReadRequest(TEvKeyValue::TEvRead::TPtr &ev, const TActo
31883199
RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
31893200
++RoInlineIntermediatesInFlight;
31903201
} else {
3191-
if (IntermediatesInFlight < IntermediatesInFlightLimit) {
3202+
ui64 limit = ReadRequestsInFlightLimit.Update(ctx.Now());
3203+
if (IntermediatesInFlight < limit) {
3204+
++IntermediatesInFlight;
31923205
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
3193-
<< " Create storage read request, Marker# KV54");
3206+
<< " Create storage read request, InFlight# " << IntermediatesInFlight << "/" << limit << ", Marker# KV54");
31943207
RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
3195-
++IntermediatesInFlight;
31963208
} else {
31973209
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
3198-
<< " Enqueue storage read request " << IntermediatesInFlight << '/' << IntermediatesInFlightLimit << ", Marker# KV56");
3210+
<< " Enqueue storage read request " << IntermediatesInFlight << '/' << limit << ", Marker# KV56");
31993211
PostponeIntermediate<TEvKeyValue::TEvRead>(std::move(intermediate));
32003212
}
32013213
}
@@ -3224,11 +3236,12 @@ void TKeyValueState::OnEvReadRangeRequest(TEvKeyValue::TEvReadRange::TPtr &ev, c
32243236
RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
32253237
++RoInlineIntermediatesInFlight;
32263238
} else {
3227-
if (IntermediatesInFlight < IntermediatesInFlightLimit) {
3239+
ui64 limit = ReadRequestsInFlightLimit.Update(ctx.Now());
3240+
if (IntermediatesInFlight < limit) {
3241+
++IntermediatesInFlight;
32283242
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
3229-
<< " Create storage read range request, Marker# KV66");
3243+
<< " Create storage read range request, InFlight# " << IntermediatesInFlight << "/" << limit << ", Marker# KV66");
32303244
RegisterReadRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
3231-
++IntermediatesInFlight;
32323245
} else {
32333246
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
32343247
<< " Enqueue storage read range request, Marker# KV59");
@@ -3365,11 +3378,12 @@ void TKeyValueState::OnEvRequest(TEvKeyValue::TEvRequest::TPtr &ev, const TActor
33653378
RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
33663379
++RoInlineIntermediatesInFlight;
33673380
} else {
3368-
if (IntermediatesInFlight < IntermediatesInFlightLimit) {
3381+
ui64 limit = ReadRequestsInFlightLimit.Update(ctx.Now());
3382+
if (IntermediatesInFlight < limit) {
3383+
++IntermediatesInFlight;
33693384
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
3370-
<< " Create storage request for RO/RW, Marker# KV43");
3385+
<< " Create storage request for RO/RW, InFlight# " << IntermediatesInFlight << "/" << limit << ", Marker# KV43");
33713386
RegisterRequestActor(ctx, std::move(intermediate), info, ExecutorGeneration);
3372-
++IntermediatesInFlight;
33733387
} else {
33743388
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << TabletId
33753389
<< " Enqueue storage request for RO/RW, Marker# KV44");

ydb/core/keyvalue/keyvalue_state.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,6 @@ class TKeyValueState {
275275

276276
TDeque<TAutoPtr<TIntermediate>> Queue;
277277
ui64 IntermediatesInFlight;
278-
ui64 IntermediatesInFlightLimit;
279278
ui64 RoInlineIntermediatesInFlight;
280279
ui64 DeletesPerRequestLimit;
281280

@@ -294,6 +293,9 @@ class TKeyValueState {
294293

295294
ui64 TotalTrashSize = 0;
296295

296+
TControlWrapper ReadRequestsInFlightLimit_Base;
297+
TMemorizableControlWrapper ReadRequestsInFlightLimit;
298+
297299
public:
298300
TKeyValueState();
299301
void Clear();

ydb/core/keyvalue/keyvalue_ut.cpp

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ void CmdPatch(const TString &originalKey, const TString &patchedKey, const TVect
237237
UNIT_ASSERT(result->Record.HasStatus());
238238
UNIT_ASSERT_EQUAL(result->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
239239
UNIT_ASSERT_VALUES_EQUAL(result->Record.PatchResultSize(), 1);
240-
240+
241241
const auto &patchResult = result->Record.GetPatchResult(0);
242242
UNIT_ASSERT(patchResult.HasStatus());
243243
UNIT_ASSERT_EQUAL(patchResult.GetStatus(), NKikimrProto::OK);
@@ -2656,7 +2656,7 @@ Y_UNIT_TEST(TestVacuumOnEmptyTablet) {
26562656
UNIT_ASSERT_EQUAL(response.status(), decltype(response)::STATUS_SUCCESS);
26572657
UNIT_ASSERT_EQUAL(response.generation(), 1);
26582658

2659-
2659+
26602660
NKikimrKeyValue::VacuumResponse responseAlreadyCompleted = SendVacuumRequest(1, tc);
26612661
UNIT_ASSERT_EQUAL(responseAlreadyCompleted.status(), decltype(responseAlreadyCompleted)::STATUS_ALREADY_COMPLETED);
26622662
UNIT_ASSERT_EQUAL(responseAlreadyCompleted.generation(), 1);
@@ -2671,7 +2671,7 @@ Y_UNIT_TEST(TestVacuumOnEmptyTablet) {
26712671

26722672
NKikimrKeyValue::VacuumResponse response3 = SendVacuumRequest(3, tc);
26732673
UNIT_ASSERT_EQUAL(response3.status(), decltype(response3)::STATUS_ALREADY_COMPLETED);
2674-
UNIT_ASSERT_EQUAL(response3.generation(), 3);
2674+
UNIT_ASSERT_EQUAL(response3.generation(), 3);
26752675
});
26762676
}
26772677

@@ -2818,5 +2818,30 @@ Y_UNIT_TEST(TestWriteAndRenameWithCreationUnixTime)
28182818
tc);
28192819
}
28202820

2821+
Y_UNIT_TEST(TestReadRequestInFlightLimit)
2822+
{
2823+
TTestContext tc;
2824+
TFinalizer finalizer(tc);
2825+
bool activeZone = false;
2826+
tc.Prepare(INITIAL_TEST_DISPATCH_NAME, [](TTestActorRuntime &){}, activeZone);
2827+
2828+
auto &icb = tc.Runtime->GetAppData().Icb;
2829+
TControlWrapper readRequestInFlightLimit(5, 1, 4096);
2830+
icb->RegisterSharedControl(readRequestInFlightLimit, "KeyValueVolumeControls.ReadRequestsInFlightLimit");
2831+
readRequestInFlightLimit = 5;
2832+
2833+
ui64 creationUnixTime = (TInstant::Now() - TDuration::Seconds(1000)).Seconds();
2834+
2835+
CmdWrite("key-1", "value",
2836+
NKikimrClient::TKeyValueRequest::MAIN,
2837+
NKikimrClient::TKeyValueRequest::REALTIME,
2838+
creationUnixTime,
2839+
tc);
2840+
CmdRead({"key-1"},
2841+
NKikimrClient::TKeyValueRequest::REALTIME,
2842+
{"value"}, {false}, {creationUnixTime},
2843+
tc);
2844+
}
2845+
28212846
} // TKeyValueTest
28222847
} // NKikimr

ydb/core/protos/config.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1733,6 +1733,15 @@ message TImmediateControlsConfig {
17331733
map<string, TRequestConfig> RequestConfigs = 1;
17341734
}
17351735

1736+
message TKeyValueVolumeControls {
1737+
optional uint64 ReadRequestsInFlightLimit = 1 [(ControlOptions) = {
1738+
Description: "Maximum number of simultaneous read requests",
1739+
MinValue: 1,
1740+
MaxValue: 4096,
1741+
DefaultValue: 3
1742+
}];
1743+
}
1744+
17361745
optional TDataShardControls DataShardControls = 1;
17371746
optional TTxLimitControls TxLimitControls = 2;
17381747
optional TCoordinatorControls CoordinatorControls = 3;
@@ -1747,6 +1756,7 @@ message TImmediateControlsConfig {
17471756
optional TTableServiceControls TableServiceControls = 12;
17481757
optional TTestShardControls TestShardControls = 13;
17491758
optional TGRpcControls GRpcControls = 14;
1759+
optional TKeyValueVolumeControls KeyValueVolumeControls = 15;
17501760
};
17511761

17521762
message TMeteringConfig {

0 commit comments

Comments
 (0)