Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -690,28 +690,18 @@ private List<String> startTestNodes(int nodeCount, Settings additionalSettings)
* Helper method to simulate disk pressure for both hot and warm indices
*/
private void simulateDiskPressure(MockInternalClusterInfoService clusterInfoService) {
// NOTE(review): this text appears to come from a rendered diff with the +/- markers stripped.
// The warm/hot branch below and the unconditional call that follows it pass identical arguments
// to setDiskUsageFunctionAndRefresh, so they are presumably the removed and the added version of
// the same body -- confirm against the real file before relying on either.
boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings);
if (isWarmIndex) {
// Warm path: every shard is reported with a size of TOTAL_SPACE_BYTES - WATERMARK_BYTES + 10.
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> TOTAL_SPACE_BYTES - WATERMARK_BYTES + 10);
} else {
// Hot path: disk usage is built from TOTAL_SPACE_BYTES and WATERMARK_BYTES - 1
// (presumably total and free bytes -- verify against setDiskUsage).
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1)
);
}
// Unconditional variant: same disk-usage function as the hot path above, applied to every node.
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, WATERMARK_BYTES - 1)
);
}

/**
* Helper method to release disk pressure for both hot and warm indices
*/
private void releaseDiskPressure(MockInternalClusterInfoService clusterInfoService) {
// NOTE(review): this text appears to come from a rendered diff with the +/- markers stripped.
// The warm/hot branch below and the unconditional call that follows it pass identical arguments
// to setDiskUsageFunctionAndRefresh, so they are presumably the removed and the added version of
// the same body -- confirm against the real file before relying on either.
boolean isWarmIndex = WRITABLE_WARM_INDEX_SETTING.get(settings);
if (isWarmIndex) {
// Warm path: every shard is reported with a fixed size of 100 bytes.
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 100L);
} else {
// Hot path: disk usage is built with TOTAL_SPACE_BYTES for both size arguments
// (presumably total == free, i.e. an empty disk -- verify against setDiskUsage).
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
);
}
// Unconditional variant: same disk-usage function as the hot path above, applied to every node.
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.index.store.remote.filecache.AggregateFileCacheStats;
import org.opensearch.transport.client.Client;

import java.util.ArrayList;
Expand All @@ -70,7 +69,6 @@
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
import static org.opensearch.cluster.routing.RoutingPool.getIndexPool;
import static org.opensearch.cluster.routing.RoutingPool.getNodePool;

/**
Expand All @@ -86,7 +84,6 @@ public class DiskThresholdMonitor {
private final DiskThresholdSettings diskThresholdSettings;
private final Client client;
private final Supplier<ClusterState> clusterStateSupplier;
private final Supplier<Double> dataToFileCacheSizeRatioSupplier;
private final LongSupplier currentTimeMillisSupplier;
private final RerouteService rerouteService;
private final NodeDiskEvaluator nodeDiskEvaluator;
Expand Down Expand Up @@ -126,7 +123,6 @@ public DiskThresholdMonitor(
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
this.client = client;
this.nodeDiskEvaluator = new NodeDiskEvaluator(diskThresholdSettings, dataToFileCacheSizeRatioSupplier);
this.dataToFileCacheSizeRatioSupplier = dataToFileCacheSizeRatioSupplier;
}

private void checkFinished() {
Expand Down Expand Up @@ -181,10 +177,6 @@ public void onNewInfo(ClusterInfo info) {
// Only for Dedicated Warm Nodes
final boolean isWarmNode = REMOTE_CAPABLE.equals(getNodePool(routingNode));
nodeDiskEvaluator.setNodeType(isWarmNode);
if (isWarmNode) {
// Create DiskUsage for Warm Nodes based on total Addressable Space
usage = getWarmDiskUsage(usage, info, routingNode, state);
}

if (nodeDiskEvaluator.isNodeExceedingFloodStageWatermark(usage)) {

Expand Down Expand Up @@ -431,29 +423,6 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste
);
}

/**
 * Builds a {@link DiskUsage} for a warm node that is expressed in terms of the node's total
 * addressable space instead of the values carried by the incoming {@code diskUsage}.
 * <p>
 * Total addressable space is the data-to-file-cache size ratio multiplied by the node's total file
 * cache size in bytes. Free space is that total minus the summed expected size of every primary
 * shard on the node whose index belongs to the {@code REMOTE_CAPABLE} pool, floored at zero.
 * When no file cache stats are recorded for the node, the cache size is taken as 0, so both the
 * total and the free bytes of the result are 0.
 *
 * @param diskUsage usage entry of the node; only its node id, node name and path are carried over
 * @param info      cluster info supplying file cache stats and shard sizes
 * @param node      routing node whose shards are inspected
 * @param state     cluster state used to resolve index metadata and the routing table
 * @return a new {@link DiskUsage} with total = addressable space and free = remaining addressable space
 */
private DiskUsage getWarmDiskUsage(DiskUsage diskUsage, ClusterInfo info, RoutingNode node, ClusterState state) {
    final double dataToFileCacheSizeRatio = dataToFileCacheSizeRatioSupplier.get();
    final AggregateFileCacheStats fileCacheStats = info.getNodeFileCacheStats().getOrDefault(diskUsage.getNodeId(), null);
    final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
    // Cast the product, not the ratio: "(long) ratio * size" truncates the ratio first, so a ratio
    // of 2.5 would be treated as 2 and any ratio below 1 would yield 0 addressable space.
    final long totalAddressableSpace = (long) (dataToFileCacheSizeRatio * nodeCacheSize);

    // Only primaries of remote-capable (warm) indices count against the addressable space.
    final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
        .filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getIndexPool(state.metadata().getIndexSafe(shard.index()))))
        .collect(Collectors.toList());

    long remoteShardSize = 0L;
    for (ShardRouting shard : remoteShardsOnNode) {
        remoteShardSize += DiskThresholdDecider.getExpectedShardSize(shard, 0L, info, null, state.metadata(), state.getRoutingTable());
    }

    return new DiskUsage(
        diskUsage.getNodeId(),
        diskUsage.getNodeName(),
        diskUsage.getPath(),
        totalAddressableSpace,
        // Never report negative free space when shards already exceed the addressable space.
        Math.max(0, totalAddressableSpace - remoteShardSize)
    );
}

private void markNodesMissingUsageIneligibleForRelease(
RoutingNodes routingNodes,
Map<String, DiskUsage> usages,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterInfo;
import org.opensearch.cluster.DiskUsage;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.DiskThresholdEvaluator;
Expand All @@ -49,6 +50,7 @@
import org.opensearch.index.store.remote.filecache.FileCacheSettings;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -111,11 +113,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return Decision.ALWAYS;
}

final Decision decision = earlyTerminate(node, allocation);
ClusterInfo clusterInfo = allocation.clusterInfo();
Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
final Decision decision = earlyTerminate(node, allocation, usages);
if (decision != null) {
return decision;
}

DiskUsage usage = usages.get(node.nodeId());
final long shardSize = DiskThresholdDecider.getExpectedShardSize(
shardRouting,
0L,
Expand All @@ -125,18 +130,21 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
allocation.routingTable()
);

final long totalAddressableSpace = calculateTotalAddressableSpace(node, allocation);
final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, false);
final long freeSpace = Math.max(totalAddressableSpace - currentNodeRemoteShardSize, 0);
final long freeSpaceAfterAllocation = Math.max(freeSpace - shardSize, 0);
final long freeSpaceLowThreshold = diskThresholdEvaluator.getFreeSpaceLowThreshold(totalAddressableSpace);
final DiskUsage usageAfterShardAssigned = new DiskUsage(
usage.getNodeId(),
usage.getNodeName(),
usage.getPath(),
usage.getTotalBytes(),
Math.max(0, usage.getFreeBytes() - shardSize)
);
final long freeSpaceLowThreshold = diskThresholdEvaluator.getFreeSpaceLowThreshold(usage.getTotalBytes());

final ByteSizeValue freeSpaceLowThresholdInByteSize = new ByteSizeValue(freeSpaceLowThreshold);
final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace);
final ByteSizeValue freeSpaceAfterAllocationInByteSize = new ByteSizeValue(freeSpaceAfterAllocation);
final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(usage.getFreeBytes());
final ByteSizeValue freeSpaceAfterAllocationInByteSize = new ByteSizeValue(usageAfterShardAssigned.getFreeBytes());
final ByteSizeValue shardSizeInByteSize = new ByteSizeValue(shardSize);

if (freeSpaceAfterAllocation < freeSpaceLowThreshold) {
if (diskThresholdEvaluator.isNodeExceedingLowWatermark(usageAfterShardAssigned)) {
logger.warn(
"after allocating [{}] node [{}] would have less than the required threshold of "
+ "{} free (currently {} free, estimated shard size is {}), preventing allocation",
Expand Down Expand Up @@ -180,21 +188,29 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
return Decision.ALWAYS;
}

final Decision decision = earlyTerminate(node, allocation);
ClusterInfo clusterInfo = allocation.clusterInfo();
Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
final Decision decision = earlyTerminate(node, allocation, usages);
if (decision != null) {
return decision;
}

final long totalAddressableSpace = calculateTotalAddressableSpace(node, allocation);
final long currentNodeRemoteShardSize = calculateCurrentNodeRemoteShardSize(node, allocation, true);
final long freeSpace = Math.max(totalAddressableSpace - currentNodeRemoteShardSize, 0);
final long leavingRemoteShardSize = calculateCurrentNodeLeavingRemoteShardSize(node, allocation);
final DiskUsage usage = usages.get(node.nodeId());
final DiskUsage usageAfterSubtractingLeavingShard = new DiskUsage(
usage.getNodeId(),
usage.getNodeName(),
usage.getPath(),
usage.getTotalBytes(),
Math.min(usage.getFreeBytes() + leavingRemoteShardSize, usage.getTotalBytes())
);

final long freeSpaceHighThreshold = diskThresholdEvaluator.getFreeSpaceHighThreshold(totalAddressableSpace);
final long freeSpaceHighThreshold = diskThresholdEvaluator.getFreeSpaceHighThreshold(usage.getTotalBytes());

final ByteSizeValue freeSpaceHighThresholdInByteSize = new ByteSizeValue(freeSpaceHighThreshold);
final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(freeSpace);
final ByteSizeValue freeSpaceInByteSize = new ByteSizeValue(usageAfterSubtractingLeavingShard.getFreeBytes());

if (freeSpace < freeSpaceHighThreshold) {
if (diskThresholdEvaluator.isNodeExceedingHighWatermark(usageAfterSubtractingLeavingShard)) {
logger.warn(
"less than the required {} of free remote addressable space threshold left ({} free) on node [{}], shard cannot remain",
freeSpaceHighThresholdInByteSize,
Expand All @@ -220,18 +236,14 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
);
}

private long calculateCurrentNodeRemoteShardSize(RoutingNode node, RoutingAllocation allocation, boolean subtractLeavingShards) {
final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
.filter(
shard -> shard.primary()
&& REMOTE_CAPABLE.equals(getShardPool(shard, allocation))
&& (subtractLeavingShards == false || shard.relocating() == false)
)
private long calculateCurrentNodeLeavingRemoteShardSize(RoutingNode node, RoutingAllocation allocation) {
final List<ShardRouting> leavingRemoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)) && (shard.relocating() == true))
.collect(Collectors.toList());

var remoteShardSize = 0L;
for (ShardRouting shard : remoteShardsOnNode) {
remoteShardSize += DiskThresholdDecider.getExpectedShardSize(
var leavingRemoteShardSize = 0L;
for (ShardRouting shard : leavingRemoteShardsOnNode) {
leavingRemoteShardSize += DiskThresholdDecider.getExpectedShardSize(
shard,
0L,
allocation.clusterInfo(),
Expand All @@ -241,19 +253,10 @@ private long calculateCurrentNodeRemoteShardSize(RoutingNode node, RoutingAlloca
);
}

return remoteShardSize;
}

private long calculateTotalAddressableSpace(RoutingNode node, RoutingAllocation allocation) {
ClusterInfo clusterInfo = allocation.clusterInfo();
// TODO: Change the default value to 5 instead of 0
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
final AggregateFileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
return (long) dataToFileCacheSizeRatio * nodeCacheSize;
return leavingRemoteShardSize;
}

private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation) {
private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation, final Map<String, DiskUsage> usages) {
// Always allow allocation if the decider is disabled
if (diskThresholdSettings.isWarmThresholdEnabled() == false) {
return allocation.decision(Decision.YES, NAME, "the warm disk threshold decider is disabled");
Expand Down Expand Up @@ -285,9 +288,12 @@ private Decision earlyTerminate(RoutingNode node, RoutingAllocation allocation)
return allocation.decision(Decision.YES, NAME, "File Cache Stat is unavailable");
}

double remoteDataRatio = fileCacheSettings.getRemoteDataRatio();
if (remoteDataRatio == 0) {
return allocation.decision(Decision.YES, NAME, "Remote data ratio is set to 0, no limit on allocation");
// Fail open if there are no disk usages available
if (usages.isEmpty() || usages.containsKey(node.nodeId()) == false) {
if (logger.isTraceEnabled()) {
logger.trace("unable to determine disk usages for disk-aware allocation, allowing allocation");
}
return allocation.decision(Decision.YES, NAME, "disk usages are unavailable");
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
// Low stage threshold (15%) = 200 * 0.15 = 30
// Free space = 28 < 30, so should exceed low stage
Map<String, DiskUsage> diskUsages = new HashMap<>();
diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 8));
diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 28));

Map<String, AggregateFileCacheStats> fileCacheStats = new HashMap<>();
fileCacheStats.put("warm_node", createAggregateFileCacheStats(100));
Expand Down Expand Up @@ -982,7 +982,7 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
// High stage threshold (10%) = 200 * 0.1 = 20
// Free space = 18 < 20, so should exceed high stage
Map<String, DiskUsage> diskUsages = new HashMap<>();
diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 8));
diskUsages.put("warm_node", new DiskUsage("warm_node", "warm_node", "/foo/bar", 200, 18));

Map<String, AggregateFileCacheStats> fileCacheStats = new HashMap<>();
fileCacheStats.put("warm_node", createAggregateFileCacheStats(100));
Expand Down
Loading
Loading