-
Notifications
You must be signed in to change notification settings - Fork 25.4k
Add shard write-load to cluster info #131496
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
6d1093f
4fcea56
db99e30
b24aa23
62d47f2
529cba3
1bb0203
cd3169f
45020c5
c5ceb92
6f2ea34
7886659
cfe5759
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; | ||
import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.cluster.metadata.ProjectId; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.node.DiscoveryNodeUtils; | ||
import org.elasticsearch.cluster.routing.RecoverySource; | ||
|
@@ -104,6 +105,7 @@ | |
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.IntStream; | ||
import java.util.stream.Stream; | ||
|
||
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength; | ||
|
@@ -355,6 +357,62 @@ public void testNodeWriteLoadsArePresent() { | |
} | ||
} | ||
|
||
public void testShardWriteLoadsArePresent() { | ||
// Create some indices and some write-load | ||
final int numIndices = randomIntBetween(1, 5); | ||
final String indexPrefix = randomIdentifier(); | ||
IntStream.range(0, numIndices).forEach(i -> { | ||
final String indexName = indexPrefix + "_" + i; | ||
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)).build()); | ||
IntStream.range(0, randomIntBetween(1, 500)) | ||
.forEach(j -> prepareIndex(indexName).setSource("foo", randomIdentifier(), "bar", randomIdentifier()).get()); | ||
}); | ||
|
||
final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); | ||
|
||
// Not collecting stats yet because allocation write load stats collection is disabled by default. | ||
{ | ||
ClusterInfoServiceUtils.refresh(clusterInfoService); | ||
final Map<ShardId, Double> shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads(); | ||
assertNotNull(shardWriteLoads); | ||
assertTrue(shardWriteLoads.isEmpty()); | ||
} | ||
|
||
// Enable collection for node write loads. | ||
updateClusterSettings( | ||
Settings.builder() | ||
.put( | ||
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), | ||
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED | ||
) | ||
.build() | ||
); | ||
|
||
try { | ||
// Force a ClusterInfo refresh to run collection of the node thread pool usage stats. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. copy-paste failed you here :) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed in 7886659 |
||
ClusterInfoServiceUtils.refresh(clusterInfoService); | ||
final Map<ShardId, Double> shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads(); | ||
|
||
// Verify that each shard has write-load reported. | ||
final ClusterState state = getInstanceFromNode(ClusterService.class).state(); | ||
assertEquals(state.projectState(ProjectId.DEFAULT).metadata().getTotalNumberOfShards(), shardWriteLoads.size()); | ||
double maximumLoadRecorded = 0; | ||
for (IndexMetadata indexMetadata : state.projectState(ProjectId.DEFAULT).metadata()) { | ||
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { | ||
final ShardId shardId = new ShardId(indexMetadata.getIndex(), i); | ||
assertTrue(shardWriteLoads.containsKey(shardId)); | ||
maximumLoadRecorded = Math.max(shardWriteLoads.get(shardId), maximumLoadRecorded); | ||
} | ||
} | ||
// And that at least one is greater than zero | ||
assertThat(maximumLoadRecorded, greaterThan(0.0)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. shouldn't this assert be per index? Since all the indices received writes. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No we insert between 1 and 500 documents in indices with between 1 and 3 shards, so it's possible some shards will not be written to. This is just a sanity check anyhow. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think I follow. Some shards won't receive writes, I agree. But all indices will have writes (to at least one shard), and thus maximumLoadRecorded would have a value per index. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oh sorry, you're right. For some reason I read that as per shard. Indeed it could be tightened up to be per index. I'll put up a small PR to do that. |
||
} finally { | ||
updateClusterSettings( | ||
Settings.builder().putNull(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey()).build() | ||
); | ||
} | ||
} | ||
|
||
public void testIndexCanChangeCustomDataPath() throws Exception { | ||
final String index = "test-custom-data-path"; | ||
final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10)); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,9 +59,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { | |
final Map<NodeAndPath, ReservedSpace> reservedSpace; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The class comment further up ^ could use an update at this point. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed in cfe5759 |
||
final Map<String, EstimatedHeapUsage> estimatedHeapUsages; | ||
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools; | ||
final Map<ShardId, Double> shardWriteLoads; | ||
|
||
protected ClusterInfo() { | ||
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); | ||
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); | ||
} | ||
|
||
/** | ||
|
@@ -85,7 +86,8 @@ public ClusterInfo( | |
Map<NodeAndShard, String> dataPath, | ||
Map<NodeAndPath, ReservedSpace> reservedSpace, | ||
Map<String, EstimatedHeapUsage> estimatedHeapUsages, | ||
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools | ||
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools, | ||
Map<ShardId, Double> shardWriteLoads | ||
) { | ||
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage); | ||
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage); | ||
|
@@ -95,6 +97,7 @@ public ClusterInfo( | |
this.reservedSpace = Map.copyOf(reservedSpace); | ||
this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages); | ||
this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools); | ||
this.shardWriteLoads = Map.copyOf(shardWriteLoads); | ||
} | ||
|
||
public ClusterInfo(StreamInput in) throws IOException { | ||
|
@@ -116,6 +119,11 @@ public ClusterInfo(StreamInput in) throws IOException { | |
} else { | ||
this.nodeUsageStatsForThreadPools = Map.of(); | ||
} | ||
if (in.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) { | ||
this.shardWriteLoads = in.readImmutableMap(ShardId::new, StreamInput::readDouble); | ||
} else { | ||
this.shardWriteLoads = Map.of(); | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -136,6 +144,9 @@ public void writeTo(StreamOutput out) throws IOException { | |
if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) { | ||
out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable); | ||
} | ||
if (out.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) { | ||
out.writeMap(this.shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble); | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -255,6 +266,16 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() { | |
return this.mostAvailableSpaceUsage; | ||
} | ||
|
||
/** | ||
* Returns a map of shard IDs to the write-loads for use in balancing. The write-loads can be interpreted | ||
* as the average number of threads that ingestion to the shard will consume. | ||
* This information may be partial or missing altogether under some circumstances. The absence of a shard | ||
* write load from the map should be interpreted as "unknown". | ||
*/ | ||
public Map<ShardId, Double> getShardWriteLoads() { | ||
return shardWriteLoads; | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. update the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't like that we add equals and hashCode just for testing, but I updated them in cd3169f |
||
/** | ||
* Returns the shard size for the given shardId or <code>null</code> if that metric is not available. | ||
*/ | ||
|
@@ -466,6 +487,7 @@ public static class Builder { | |
private Map<NodeAndPath, ReservedSpace> reservedSpace = Map.of(); | ||
private Map<String, EstimatedHeapUsage> estimatedHeapUsages = Map.of(); | ||
private Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = Map.of(); | ||
private Map<ShardId, Double> shardWriteLoads = Map.of(); | ||
|
||
public ClusterInfo build() { | ||
return new ClusterInfo( | ||
|
@@ -476,7 +498,8 @@ public ClusterInfo build() { | |
dataPath, | ||
reservedSpace, | ||
estimatedHeapUsages, | ||
nodeUsageStatsForThreadPools | ||
nodeUsageStatsForThreadPools, | ||
shardWriteLoads | ||
); | ||
} | ||
|
||
|
@@ -519,5 +542,10 @@ public Builder nodeUsageStatsForThreadPools(Map<String, NodeUsageStatsForThreadP | |
this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools; | ||
return this; | ||
} | ||
|
||
public Builder shardWriteLoads(Map<ShardId, Double> shardWriteLoads) { | ||
this.shardWriteLoads = shardWriteLoads; | ||
return this; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -159,7 +159,8 @@ public ClusterInfo getClusterInfo() { | |
dataPath, | ||
Map.of(), | ||
estimatedHeapUsages, | ||
nodeThreadPoolUsageStats | ||
nodeThreadPoolUsageStats, | ||
allocation.clusterInfo().getShardWriteLoads() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this be saved as a private variable initialized in the ClusterInfoSimulator constructor? Similar to estimatedHeapUsages and nodeThreadPoolUsageStats There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No we won't use it at this level in the |
||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ | |
import org.elasticsearch.common.unit.ByteSizeValue; | ||
import org.elasticsearch.common.util.concurrent.EsExecutors; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.index.shard.IndexingStats; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.index.store.StoreStats; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
|
@@ -215,7 +216,7 @@ void execute() { | |
logger.trace("starting async refresh"); | ||
|
||
try (var ignoredRefs = fetchRefs) { | ||
maybeFetchIndicesStats(diskThresholdEnabled); | ||
maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED); | ||
maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled); | ||
maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled); | ||
maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled); | ||
|
@@ -301,7 +302,14 @@ public void onFailure(Exception e) { | |
private void fetchIndicesStats() { | ||
final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); | ||
indicesStatsRequest.clear(); | ||
indicesStatsRequest.store(true); | ||
if (diskThresholdEnabled) { | ||
// This returns the shard sizes on disk | ||
indicesStatsRequest.store(true); | ||
} | ||
if (writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED) { | ||
// This returns the shard write-loads | ||
indicesStatsRequest.indexing(true); | ||
} | ||
indicesStatsRequest.indicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_CLOSED_HIDDEN); | ||
indicesStatsRequest.timeout(fetchTimeout); | ||
client.admin() | ||
|
@@ -350,13 +358,15 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { | |
} | ||
|
||
final ShardStats[] stats = indicesStatsResponse.getShards(); | ||
final Map<ShardId, Double> shardWriteLoadByIdentifierBuilder = new HashMap<>(); | ||
final Map<String, Long> shardSizeByIdentifierBuilder = new HashMap<>(); | ||
final Map<ShardId, Long> shardDataSetSizeBuilder = new HashMap<>(); | ||
final Map<ClusterInfo.NodeAndShard, String> dataPath = new HashMap<>(); | ||
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceBuilders = | ||
new HashMap<>(); | ||
buildShardLevelInfo( | ||
adjustShardStats(stats), | ||
shardWriteLoadByIdentifierBuilder, | ||
shardSizeByIdentifierBuilder, | ||
shardDataSetSizeBuilder, | ||
dataPath, | ||
|
@@ -370,7 +380,8 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { | |
Map.copyOf(shardSizeByIdentifierBuilder), | ||
Map.copyOf(shardDataSetSizeBuilder), | ||
Map.copyOf(dataPath), | ||
Map.copyOf(reservedSpace) | ||
Map.copyOf(reservedSpace), | ||
Map.copyOf(shardWriteLoadByIdentifierBuilder) | ||
); | ||
} | ||
|
||
|
@@ -527,8 +538,6 @@ public ClusterInfo getClusterInfo() { | |
estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage)); | ||
} | ||
}); | ||
final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats = new HashMap<>(); | ||
nodeThreadPoolUsageStatsPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeThreadPoolUsageStats.put(nodeId, nodeWriteLoad); }); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This appeared to be just a copy, which already happens in the |
||
return new ClusterInfo( | ||
leastAvailableSpaceUsages, | ||
mostAvailableSpaceUsages, | ||
|
@@ -537,7 +546,8 @@ public ClusterInfo getClusterInfo() { | |
indicesStatsSummary.dataPath, | ||
indicesStatsSummary.reservedSpace, | ||
estimatedHeapUsages, | ||
nodeThreadPoolUsageStats | ||
nodeThreadPoolUsageStatsPerNode, | ||
indicesStatsSummary.shardWriteLoads() | ||
); | ||
} | ||
|
||
|
@@ -567,6 +577,7 @@ public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) { | |
|
||
static void buildShardLevelInfo( | ||
ShardStats[] stats, | ||
Map<ShardId, Double> shardWriteLoads, | ||
Map<String, Long> shardSizes, | ||
Map<ShardId, Long> shardDataSetSizeBuilder, | ||
Map<ClusterInfo.NodeAndShard, String> dataPathByShard, | ||
|
@@ -577,25 +588,31 @@ static void buildShardLevelInfo( | |
dataPathByShard.put(ClusterInfo.NodeAndShard.from(shardRouting), s.getDataPath()); | ||
|
||
final StoreStats storeStats = s.getStats().getStore(); | ||
if (storeStats == null) { | ||
continue; | ||
} | ||
final long size = storeStats.sizeInBytes(); | ||
final long dataSetSize = storeStats.totalDataSetSizeInBytes(); | ||
final long reserved = storeStats.reservedSizeInBytes(); | ||
|
||
final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting); | ||
logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved); | ||
shardSizes.put(shardIdentifier, size); | ||
if (dataSetSize > shardDataSetSizeBuilder.getOrDefault(shardRouting.shardId(), -1L)) { | ||
shardDataSetSizeBuilder.put(shardRouting.shardId(), dataSetSize); | ||
if (storeStats != null) { | ||
final long size = storeStats.sizeInBytes(); | ||
final long dataSetSize = storeStats.totalDataSetSizeInBytes(); | ||
final long reserved = storeStats.reservedSizeInBytes(); | ||
|
||
final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting); | ||
logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved); | ||
shardSizes.put(shardIdentifier, size); | ||
if (dataSetSize > shardDataSetSizeBuilder.getOrDefault(shardRouting.shardId(), -1L)) { | ||
shardDataSetSizeBuilder.put(shardRouting.shardId(), dataSetSize); | ||
} | ||
if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) { | ||
final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent( | ||
new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()), | ||
t -> new ClusterInfo.ReservedSpace.Builder() | ||
); | ||
reservedSpaceBuilder.add(shardRouting.shardId(), reserved); | ||
} | ||
} | ||
if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) { | ||
final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent( | ||
new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()), | ||
t -> new ClusterInfo.ReservedSpace.Builder() | ||
); | ||
reservedSpaceBuilder.add(shardRouting.shardId(), reserved); | ||
final IndexingStats indexingStats = s.getStats().getIndexing(); | ||
if (indexingStats != null) { | ||
final double shardWriteLoad = indexingStats.getTotal().getPeakWriteLoad(); | ||
if (shardWriteLoad > shardWriteLoads.getOrDefault(shardRouting.shardId(), -1.0)) { | ||
shardWriteLoads.put(shardRouting.shardId(), shardWriteLoad); | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -623,9 +640,10 @@ private record IndicesStatsSummary( | |
Map<String, Long> shardSizes, | ||
Map<ShardId, Long> shardDataSetSizes, | ||
Map<ClusterInfo.NodeAndShard, String> dataPath, | ||
Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace | ||
Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace, | ||
Map<ShardId, Double> shardWriteLoads | ||
) { | ||
static final IndicesStatsSummary EMPTY = new IndicesStatsSummary(Map.of(), Map.of(), Map.of(), Map.of()); | ||
static final IndicesStatsSummary EMPTY = new IndicesStatsSummary(Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -135,9 +135,17 @@ public void testFillShardLevelInfo() { | |
0 | ||
) }; | ||
Map<String, Long> shardSizes = new HashMap<>(); | ||
HashMap<ShardId, Double> shardWriteLoads = new HashMap<>(); | ||
Map<ShardId, Long> shardDataSetSizes = new HashMap<>(); | ||
Map<ClusterInfo.NodeAndShard, String> routingToPath = new HashMap<>(); | ||
InternalClusterInfoService.buildShardLevelInfo(stats, shardSizes, shardDataSetSizes, routingToPath, new HashMap<>()); | ||
InternalClusterInfoService.buildShardLevelInfo( | ||
stats, | ||
shardWriteLoads, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Below there are checks on the results. Are there some checks that can be added for There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added in c5ceb92 |
||
shardSizes, | ||
shardDataSetSizes, | ||
routingToPath, | ||
new HashMap<>() | ||
); | ||
|
||
assertThat( | ||
shardSizes, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -621,6 +621,7 @@ public void testUnassignedAllocationPredictsDiskUsage() { | |
ImmutableOpenMap.of(), | ||
ImmutableOpenMap.of(), | ||
ImmutableOpenMap.of(), | ||
ImmutableOpenMap.of(), | ||
ImmutableOpenMap.of() | ||
); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looks like I missed a handful of these in the builder conversion 😏 Oops.. |
||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.