Skip to content

Commit 37fce7c

Browse files
Gagan6164tandonks
authored andcommitted
Refactor WarmDiskThresholdDecider and DiskThresholdMonitor to use warm disk usage (opensearch-project#18808)
Signed-off-by: Gagan Singh Saini <[email protected]>
1 parent 1109770 commit 37fce7c

File tree

5 files changed

+111
-203
lines changed

5 files changed

+111
-203
lines changed

server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -690,28 +690,18 @@ private List<String> startTestNodes(int nodeCount, Settings additionalSettings)
690690
* Helper method to simulate disk pressure for both hot and warm indices
691691
*/
692692
private void simulateDiskPressure(MockInternalClusterInfoService clusterInfoService) {
693-
boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings);
694-
if (isWarmIndex) {
695-
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> TOTAL_SPACE_BYTES - WATERMARK_BYTES + 10);
696-
} else {
697-
clusterInfoService.setDiskUsageFunctionAndRefresh(
698-
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1)
699-
);
700-
}
693+
clusterInfoService.setDiskUsageFunctionAndRefresh(
694+
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1)
695+
);
701696
}
702697

703698
/**
704699
* Helper method to release disk pressure for both hot and warm indices
705700
*/
706701
private void releaseDiskPressure(MockInternalClusterInfoService clusterInfoService) {
707-
boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings);
708-
if (isWarmIndex) {
709-
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 100L);
710-
} else {
711-
clusterInfoService.setDiskUsageFunctionAndRefresh(
712-
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
713-
);
714-
}
702+
clusterInfoService.setDiskUsageFunctionAndRefresh(
703+
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
704+
);
715705
}
716706

717707
/**

server/src/main/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import org.opensearch.common.util.set.Sets;
5454
import org.opensearch.core.action.ActionListener;
5555
import org.opensearch.core.common.Strings;
56-
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
5756
import org.opensearch.transport.client.Client;
5857

5958
import java.util.ArrayList;
@@ -70,7 +69,6 @@
7069
import java.util.stream.StreamSupport;
7170

7271
import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
73-
import static org.opensearch.cluster.routing.RoutingPool.getIndexPool;
7472
import static org.opensearch.cluster.routing.RoutingPool.getNodePool;
7573

7674
/**
@@ -86,7 +84,6 @@ public class DiskThresholdMonitor {
8684
private final DiskThresholdSettings diskThresholdSettings;
8785
private final Client client;
8886
private final Supplier<ClusterState> clusterStateSupplier;
89-
private final Supplier<Double> dataToFileCacheSizeRatioSupplier;
9087
private final LongSupplier currentTimeMillisSupplier;
9188
private final RerouteService rerouteService;
9289
private final NodeDiskEvaluator nodeDiskEvaluator;
@@ -126,7 +123,6 @@ public DiskThresholdMonitor(
126123
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
127124
this.client = client;
128125
this.nodeDiskEvaluator = new NodeDiskEvaluator(diskThresholdSettings, dataToFileCacheSizeRatioSupplier);
129-
this.dataToFileCacheSizeRatioSupplier = dataToFileCacheSizeRatioSupplier;
130126
}
131127

132128
private void checkFinished() {
@@ -181,10 +177,6 @@ public void onNewInfo(ClusterInfo info) {
181177
// Only for Dedicated Warm Nodes
182178
final boolean isWarmNode = REMOTE_CAPABLE.equals(getNodePool(routingNode));
183179
nodeDiskEvaluator.setNodeType(isWarmNode);
184-
if (isWarmNode) {
185-
// Create DiskUsage for Warm Nodes based on total Addressable Space
186-
usage = getWarmDiskUsage(usage, info, routingNode, state);
187-
}
188180

189181
if (nodeDiskEvaluator.isNodeExceedingFloodStageWatermark(usage)) {
190182

@@ -431,29 +423,6 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste
431423
);
432424
}
433425

434-
private DiskUsage getWarmDiskUsage(DiskUsage diskUsage, ClusterInfo info, RoutingNode node, ClusterState state) {
435-
double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get();
436-
AggregateFileCacheStats fileCacheStats = info.getNodeFileCacheStats().getOrDefault(diskUsage.getNodeId(), null);
437-
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
438-
long totalAddressableSpace = (long) dataToFileCacheSizeRatio * nodeCacheSize;
439-
final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
440-
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getIndexPool(state.metadata().getIndexSafe(shard.index()))))
441-
.collect(Collectors.toList());
442-
443-
long remoteShardSize = 0L;
444-
for (ShardRouting shard : remoteShardsOnNode) {
445-
remoteShardSize += DiskThresholdDecider.getExpectedShardSize(shard, 0L, info, null, state.metadata(), state.getRoutingTable());
446-
}
447-
final DiskUsage warmDiskUsage = new DiskUsage(
448-
diskUsage.getNodeId(),
449-
diskUsage.getNodeName(),
450-
diskUsage.getPath(),
451-
totalAddressableSpace,
452-
Math.max(0, totalAddressableSpace - remoteShardSize)
453-
);
454-
return warmDiskUsage;
455-
}
456-
457426
private void markNodesMissingUsageIneligibleForRelease(
458427
RoutingNodes routingNodes,
459428
Map<String, DiskUsage> usages,

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/WarmDiskThresholdDecider.java

Lines changed: 46 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.logging.log4j.Logger;
3737
import org.opensearch.Version;
3838
import org.opensearch.cluster.ClusterInfo;
39+
import org.opensearch.cluster.DiskUsage;
3940
import org.opensearch.cluster.routing.RoutingNode;
4041
import org.opensearch.cluster.routing.ShardRouting;
4142
import org.opensearch.cluster.routing.allocation.DiskThresholdEvaluator;
@@ -49,6 +50,7 @@
4950
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
5051

5152
import java.util.List;
53+
import java.util.Map;
5254
import java.util.stream.Collectors;
5355
import java.util.stream.StreamSupport;
5456

@@ -111,11 +113,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
111113
return Decision.ALWAYS;
112114
}
113115

114-
final Decision decision = earlyTerminate(node, allocation);
116+
ClusterInfo clusterInfo = allocation.clusterInfo();
117+
Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
118+
final Decision decision = earlyTerminate(node, allocation, usages);
115119
if (decision != null) {
116120
return decision;
117121
}
118122

123+
DiskUsage usage = usages.get(node.nodeId());
119124
final long shardSize = DiskThresholdDecider.getExpectedShardSize(
120125
shardRouting,
121126
0L,
@@ -125,18 +130,21 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
125130
allocation.routingTable()
126131
);
127132

128-
final long totalAddressableSpace = calculateTotalAddressableSpace(node, allocation);
129-
final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, false);
130-
final long freeSpace = Math.max(totalAddressableSpace - currentNodeRemoteShardSize, 0);
131-
final long freeSpaceAfterAllocation = Math.max(freeSpace - shardSize, 0);
132-
final long freeSpaceLowThreshold = diskThresholdEvaluator.getFreeSpaceLowThreshold(totalAddressableSpace);
133+
final DiskUsage usageAfterShardAssigned = new DiskUsage(
134+
usage.getNodeId(),
135+
usage.getNodeName(),
136+
usage.getPath(),
137+
usage.getTotalBytes(),
138+
Math.max(0, usage.getFreeBytes() - shardSize)
139+
);
140+
final long freeSpaceLowThreshold = diskThresholdEvaluator.getFreeSpaceLowThreshold(usage.getTotalBytes());
133141

134142
final ByteSizeValue freeSpaceLowThresholdInByteSize = new ByteSizeValue(freeSpaceLowThreshold);
135-
final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace);
136-
final ByteSizeValue freeSpaceAfterAllocationInByteSize = new ByteSizeValue(freeSpaceAfterAllocation);
143+
final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(usage.getFreeBytes());
144+
final ByteSizeValue freeSpaceAfterAllocationInByteSize = new ByteSizeValue(usageAfterShardAssigned.getFreeBytes());
137145
final ByteSizeValue shardSizeInByteSize = new ByteSizeValue(shardSize);
138146

139-
if (freeSpaceAfterAllocation < freeSpaceLowThreshold) {
147+
if (diskThresholdEvaluator.isNodeExceedingLowWatermark(usageAfterShardAssigned)) {
140148
logger.warn(
141149
"after allocating [{}] node [{}] would have less than the required threshold of "
142150
+ "{} free (currently {} free, estimated shard size is {}), preventing allocation",
@@ -180,21 +188,29 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
180188
return Decision.ALWAYS;
181189
}
182190

183-
final Decision decision = earlyTerminate(node, allocation);
191+
ClusterInfo clusterInfo = allocation.clusterInfo();
192+
Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
193+
final Decision decision = earlyTerminate(node, allocation, usages);
184194
if (decision != null) {
185195
return decision;
186196
}
187197

188-
final long totalAddressableSpace = calculateTotalAddressableSpace(node, allocation);
189-
final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, true);
190-
final long freeSpace = Math.max(totalAddressableSpace - currentNodeRemoteShardSize, 0);
198+
final long leavingRemoteShardSize = calculateCurrentNodeLeavingRemoteShardSize(node, allocation);
199+
final DiskUsage usage = usages.get(node.nodeId());
200+
final DiskUsage usageAfterSubtractingLeavingShard = new DiskUsage(
201+
usage.getNodeId(),
202+
usage.getNodeName(),
203+
usage.getPath(),
204+
usage.getTotalBytes(),
205+
Math.min(usage.getFreeBytes() + leavingRemoteShardSize, usage.getTotalBytes())
206+
);
191207

192-
final long freeSpaceHighThreshold = diskThresholdEvaluator.getFreeSpaceHighThreshold(totalAddressableSpace);
208+
final long freeSpaceHighThreshold = diskThresholdEvaluator.getFreeSpaceHighThreshold(usage.getTotalBytes());
193209

194210
final ByteSizeValue freeSpaceHighThresholdInByteSize = new ByteSizeValue(freeSpaceHighThreshold);
195-
final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace);
211+
final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(usageAfterSubtractingLeavingShard.getFreeBytes());
196212

197-
if (freeSpace < freeSpaceHighThreshold) {
213+
if (diskThresholdEvaluator.isNodeExceedingHighWatermark(usageAfterSubtractingLeavingShard)) {
198214
logger.warn(
199215
"less than the required {} of free remote addressable space threshold left ({} free) on node [{}], shard cannot remain",
200216
freeSpaceHighThresholdInByteSize,
@@ -220,18 +236,14 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
220236
);
221237
}
222238

223-
private long calculateCurrentNodeRemoteShardSize(RoutingNode node, RoutingAllocation allocation, boolean subtractLeavingShards) {
224-
final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
225-
.filter(
226-
shard -> shard.primary()
227-
&& REMOTE_CAPABLE.equals(getShardPool(shard, allocation))
228-
&& (subtractLeavingShards == false || shard.relocating() == false)
229-
)
239+
private long calculateCurrentNodeLeavingRemoteShardSize(RoutingNode node, RoutingAllocation allocation) {
240+
final List<ShardRouting> leavingRemoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
241+
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)) && (shard.relocating() == true))
230242
.collect(Collectors.toList());
231243

232-
var remoteShardSize = 0L;
233-
for (ShardRouting shard : remoteShardsOnNode) {
234-
remoteShardSize += DiskThresholdDecider.getExpectedShardSize(
244+
var leavingRemoteShardSize = 0L;
245+
for (ShardRouting shard : leavingRemoteShardsOnNode) {
246+
leavingRemoteShardSize += DiskThresholdDecider.getExpectedShardSize(
235247
shard,
236248
0L,
237249
allocation.clusterInfo(),
@@ -241,19 +253,10 @@ private long calculateCurrentNodeRemoteShardSize(RoutingNode node, RoutingAlloca
241253
);
242254
}
243255

244-
return remoteShardSize;
245-
}
246-
247-
private long calculateTotalAddressableSpace(RoutingNode node, RoutingAllocation allocation) {
248-
ClusterInfo clusterInfo = allocation.clusterInfo();
249-
// TODO: Change the default value to 5 instead of 0
250-
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
251-
final AggregateFileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
252-
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
253-
return (long) dataToFileCacheSizeRatio * nodeCacheSize;
256+
return leavingRemoteShardSize;
254257
}
255258

256-
private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation) {
259+
private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation, final Map<String, DiskUsage> usages) {
257260
// Always allow allocation if the decider is disabled
258261
if (diskThresholdSettings.isWarmThresholdEnabled() == false) {
259262
return allocation.decision(Decision.YES, NAME, "the warm disk threshold decider is disabled");
@@ -285,9 +288,12 @@ private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation)
285288
return allocation.decision(Decision.YES, NAME, "File Cache Stat is unavailable");
286289
}
287290

288-
double remoteDataRatio = fileCacheSettings.getRemoteDataRatio();
289-
if (remoteDataRatio == 0) {
290-
return allocation.decision(Decision.YES, NAME, "Remote data ratio is set to 0, no limit on allocation");
291+
// Fail open if there are no disk usages available
292+
if (usages.isEmpty() || usages.containsKey(node.nodeId()) == false) {
293+
if (logger.isTraceEnabled()) {
294+
logger.trace("unable to determine disk usages for disk-aware allocation, allowing allocation");
295+
}
296+
return allocation.decision(Decision.YES, NAME, "disk usages are unavailable");
291297
}
292298

293299
return null;

server/src/test/java/org/opensearch/cluster/routing/allocation/DiskThresholdMonitorTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -901,7 +901,7 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
901901
// High stage threshold (50%) = 200 * 0.15 = 30
902902
// Free space = 28 < 30, so should exceed low stage
903903
Map<String, DiskUsage> diskUsages = new HashMap<>();
904-
diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 8));
904+
diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 28));
905905

906906
Map<String, AggregateFileCacheStats> fileCacheStats = new HashMap<>();
907907
fileCacheStats.put("warm_node", createAggregateFileCacheStats(100));
@@ -982,7 +982,7 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
982982
// High stage threshold (10%) = 200 * 0.1 = 20
983983
// Free space = 18 < 20, so should exceed high stage
984984
Map<String, DiskUsage> diskUsages = new HashMap<>();
985-
diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 8));
985+
diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 18));
986986

987987
Map<String, AggregateFileCacheStats> fileCacheStats = new HashMap<>();
988988
fileCacheStats.put("warm_node", createAggregateFileCacheStats(100));

0 commit comments

Comments
 (0)