Skip to content

Commit d5c70ce

Browse files
authored
fix compaction max inflight (#21234)
1 parent 084dfdd commit d5c70ce

File tree

1 file changed

+13
-4
lines changed

1 file changed

+13
-4
lines changed

ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ namespace NKikimr {
194194
// is this the first key?
195195
bool IsFirstKey = true;
196196

197+
// max inflight request to pdisk
198+
ui32 MaxInFlightWrites;
199+
ui32 MaxInFlightReads;
200+
197201
public:
198202
struct TStatistics {
199203
THullCtxPtr HullCtx;
@@ -296,6 +300,9 @@ namespace NKikimr {
296300
ReadsInFlight = &LevelIndex->HullCompReadsInFlight;
297301
WritesInFlight = &LevelIndex->HullCompWritesInFlight;
298302
}
303+
304+
MaxInFlightWrites = GetMaxInFlightWrites();
305+
MaxInFlightReads = GetMaxInFlightReads();
299306
}
300307

301308
void Prepare(THandoffMapPtr hmp, TGcMapIterator gcmpIt) {
@@ -308,6 +315,8 @@ namespace NKikimr {
308315
// when there is more work to do, return false; MUST NOT return true unless all pending requests are finished
309316
bool MainCycle(TVector<std::unique_ptr<IEventBase>>& msgsForYard) {
310317
for (;;) {
318+
MaxInFlightWrites = GetMaxInFlightWrites();
319+
MaxInFlightReads = GetMaxInFlightReads();
311320
switch (State) {
312321
case EState::Invalid:
313322
Y_ABORT("unexpected state");
@@ -401,7 +410,7 @@ namespace NKikimr {
401410

402411
case EState::FlushingSST:
403412
// do not continue processing if there are too many writes in flight
404-
if (InFlightWrites >= GetMaxInFlightWrites()) {
413+
if (InFlightWrites >= MaxInFlightWrites) {
405414
return false;
406415
}
407416
// try to flush SST
@@ -663,7 +672,7 @@ namespace NKikimr {
663672
bool FlushSST(TVector<std::unique_ptr<IEventBase>>& msgsForYard) {
664673
// try to flush some more data; if the flush fails, it means that we have reached in flight write limit and
665674
// there is nothing to do here now, so we return
666-
const bool flushDone = WriterPtr->FlushNext(FirstLsn, LastLsn, GetMaxInFlightWrites() - InFlightWrites);
675+
const bool flushDone = WriterPtr->FlushNext(FirstLsn, LastLsn, MaxInFlightWrites - InFlightWrites);
667676
ProcessPendingMessages(msgsForYard);
668677
if (!flushDone) {
669678
return false;
@@ -687,7 +696,7 @@ namespace NKikimr {
687696

688697
// send new messages until we reach in flight limit
689698
std::unique_ptr<NPDisk::TEvChunkWrite> msg;
690-
while (InFlightWrites < GetMaxInFlightWrites() && (msg = WriterPtr->GetPendingMessage())) {
699+
while (InFlightWrites < MaxInFlightWrites && (msg = WriterPtr->GetPendingMessage())) {
691700
HullCtx->VCtx->CountCompactionCost(*msg);
692701
Statistics.Update(msg.get());
693702
msgsForYard.push_back(std::move(msg));
@@ -696,7 +705,7 @@ namespace NKikimr {
696705
}
697706

698707
std::unique_ptr<NPDisk::TEvChunkRead> readMsg;
699-
while (InFlightReads < GetMaxInFlightReads() && (readMsg = ReadBatcher.GetPendingMessage(
708+
while (InFlightReads < MaxInFlightReads && (readMsg = ReadBatcher.GetPendingMessage(
700709
PDiskCtx->Dsk->Owner, PDiskCtx->Dsk->OwnerRound, NPriRead::HullComp))) {
701710
HullCtx->VCtx->CountCompactionCost(*readMsg);
702711
Statistics.Update(readMsg.get());

0 commit comments

Comments
 (0)