diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cf0e78220f4a..a57338a3581a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568)) - Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005)) +- Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929)) + ### Changed - Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350)) - Refactor to move prepareIndex and prepareDelete methods to Engine class ([#19551](https://github.com/opensearch-project/OpenSearch/pull/19551)) diff --git a/libs/core/src/main/java/org/opensearch/core/action/ActionListener.java b/libs/core/src/main/java/org/opensearch/core/action/ActionListener.java index 4fd55898a2cb5..b93f841e3bda4 100644 --- a/libs/core/src/main/java/org/opensearch/core/action/ActionListener.java +++ b/libs/core/src/main/java/org/opensearch/core/action/ActionListener.java @@ -358,4 +358,8 @@ static void completeWith(ActionListener listener, CheckedSu throw ex; } } + + static ActionListener noOp() { + return ActionListener.wrap(response -> {}, exception -> {}); + } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml index ae94245cb65d3..b69516fd5abc6 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml @@ -1,13 +1,121 @@ "Help": - skip: - version: " - 3.2.99" + version: " - 3.3.99" + reason: merges.warmer stats added in 3.4.0 + features: node_selector + - do: + 
cat.shards: + help: true + node_selector: + version: "3.4.0 - " + + - match: + $body: | + /^ index .+ \n + shard .+ \n + prirep .+ \n + state .+ \n + docs .+ \n + store .+ \n + ip .+ \n + id .+ \n + node .+ \n + sync_id .+ \n + unassigned.reason .+ \n + unassigned.at .+ \n + unassigned.for .+ \n + unassigned.details .+ \n + recoverysource.type .+ \n + completion.size .+ \n + fielddata.memory_size .+ \n + fielddata.evictions .+ \n + query_cache.memory_size .+ \n + query_cache.evictions .+ \n + flush.total .+ \n + flush.total_time .+ \n + get.current .+ \n + get.time .+ \n + get.total .+ \n + get.exists_time .+ \n + get.exists_total .+ \n + get.missing_time .+ \n + get.missing_total .+ \n + indexing.delete_current .+ \n + indexing.delete_time .+ \n + indexing.delete_total .+ \n + indexing.index_current .+ \n + indexing.index_time .+ \n + indexing.index_total .+ \n + indexing.index_failed .+ \n + merges.current .+ \n + merges.current_docs .+ \n + merges.current_size .+ \n + merges.total .+ \n + merges.total_docs .+ \n + merges.total_size .+ \n + merges.total_time .+ \n + merges.warmer.total_invocations .+ \n + merges.warmer.total_time .+ \n + merges.warmer.ongoing_count .+ \n + merges.warmer.total_bytes_received .+ \n + merges.warmer.total_bytes_sent .+ \n + merges.warmer.total_receive_time .+ \n + merges.warmer.total_failure_count .+ \n + merges.warmer.total_send_time .+ \n + refresh.total .+ \n + refresh.time .+ \n + refresh.external_total .+ \n + refresh.external_time .+ \n + refresh.listeners .+ \n + search.fetch_current .+ \n + search.fetch_time .+ \n + search.fetch_total .+ \n + search.open_contexts .+ \n + search.query_current .+ \n + search.query_time .+ \n + search.query_total .+ \n + search.query_failed .+ \n + search.concurrent_query_current .+ \n + search.concurrent_query_time .+ \n + search.concurrent_query_total .+ \n + search.concurrent_avg_slice_count .+ \n + search.startree_query_current .+ \n + search.startree_query_time .+ \n + 
search.startree_query_total .+ \n + search.startree_query_failed .+ \n + search.scroll_current .+ \n + search.scroll_time .+ \n + search.scroll_total .+ \n + search.point_in_time_current .+ \n + search.point_in_time_time .+ \n + search.point_in_time_total .+ \n + search.search_idle_reactivate_count_total .+ \n + segments.count .+ \n + segments.memory .+ \n + segments.index_writer_memory .+ \n + segments.version_map_memory .+ \n + segments.fixed_bitset_memory .+ \n + seq_no.max .+ \n + seq_no.local_checkpoint .+ \n + seq_no.global_checkpoint .+ \n + warmer.current .+ \n + warmer.total .+ \n + warmer.total_time .+ \n + path.data .+ \n + path.state .+ \n + docs.deleted .+ \n + $/ +--- +"Help from 3.3.0 to 3.3.99": + - skip: + version: " - 3.2.99, 3.4.0 - " reason: search query failure stats is added in 3.3.0 features: node_selector - do: cat.shards: help: true node_selector: - version: "3.3.0 - " + version: "3.3.0 - 3.3.99" - match: $body: | @@ -398,81 +506,81 @@ - match: $body: | - /^ index .+ \n - shard .+ \n - prirep .+ \n - state .+ \n - docs .+ \n - store .+ \n - ip .+ \n - id .+ \n - node .+ \n - sync_id .+ \n - unassigned.reason .+ \n - unassigned.at .+ \n - unassigned.for .+ \n - unassigned.details .+ \n - recoverysource.type .+ \n - completion.size .+ \n - fielddata.memory_size .+ \n - fielddata.evictions .+ \n - query_cache.memory_size .+ \n - query_cache.evictions .+ \n - flush.total .+ \n - flush.total_time .+ \n - get.current .+ \n - get.time .+ \n - get.total .+ \n - get.exists_time .+ \n - get.exists_total .+ \n - get.missing_time .+ \n - get.missing_total .+ \n - indexing.delete_current .+ \n - indexing.delete_time .+ \n - indexing.delete_total .+ \n - indexing.index_current .+ \n - indexing.index_time .+ \n - indexing.index_total .+ \n - indexing.index_failed .+ \n - merges.current .+ \n - merges.current_docs .+ \n - merges.current_size .+ \n - merges.total .+ \n - merges.total_docs .+ \n - merges.total_size .+ \n - merges.total_time .+ \n - 
refresh.total .+ \n - refresh.time .+ \n - refresh.external_total .+ \n - refresh.external_time .+ \n - refresh.listeners .+ \n - search.fetch_current .+ \n - search.fetch_time .+ \n - search.fetch_total .+ \n - search.open_contexts .+ \n - search.query_current .+ \n - search.query_time .+ \n - search.query_total .+ \n - search.scroll_current .+ \n - search.scroll_time .+ \n - search.scroll_total .+ \n - search.point_in_time_current .+ \n - search.point_in_time_time .+ \n - search.point_in_time_total .+ \n - segments.count .+ \n - segments.memory .+ \n - segments.index_writer_memory .+ \n - segments.version_map_memory .+ \n - segments.fixed_bitset_memory .+ \n - seq_no.max .+ \n - seq_no.local_checkpoint .+ \n - seq_no.global_checkpoint .+ \n - warmer.current .+ \n - warmer.total .+ \n - warmer.total_time .+ \n - path.data .+ \n - path.state .+ \n - $/ + /^ index .+ \n + shard .+ \n + prirep .+ \n + state .+ \n + docs .+ \n + store .+ \n + ip .+ \n + id .+ \n + node .+ \n + sync_id .+ \n + unassigned.reason .+ \n + unassigned.at .+ \n + unassigned.for .+ \n + unassigned.details .+ \n + recoverysource.type .+ \n + completion.size .+ \n + fielddata.memory_size .+ \n + fielddata.evictions .+ \n + query_cache.memory_size .+ \n + query_cache.evictions .+ \n + flush.total .+ \n + flush.total_time .+ \n + get.current .+ \n + get.time .+ \n + get.total .+ \n + get.exists_time .+ \n + get.exists_total .+ \n + get.missing_time .+ \n + get.missing_total .+ \n + indexing.delete_current .+ \n + indexing.delete_time .+ \n + indexing.delete_total .+ \n + indexing.index_current .+ \n + indexing.index_time .+ \n + indexing.index_total .+ \n + indexing.index_failed .+ \n + merges.current .+ \n + merges.current_docs .+ \n + merges.current_size .+ \n + merges.total .+ \n + merges.total_docs .+ \n + merges.total_size .+ \n + merges.total_time .+ \n + refresh.total .+ \n + refresh.time .+ \n + refresh.external_total .+ \n + refresh.external_time .+ \n + refresh.listeners .+ \n + 
search.fetch_current .+ \n + search.fetch_time .+ \n + search.fetch_total .+ \n + search.open_contexts .+ \n + search.query_current .+ \n + search.query_time .+ \n + search.query_total .+ \n + search.scroll_current .+ \n + search.scroll_time .+ \n + search.scroll_total .+ \n + search.point_in_time_current .+ \n + search.point_in_time_time .+ \n + search.point_in_time_total .+ \n + segments.count .+ \n + segments.memory .+ \n + segments.index_writer_memory .+ \n + segments.version_map_memory .+ \n + segments.fixed_bitset_memory .+ \n + seq_no.max .+ \n + seq_no.local_checkpoint .+ \n + seq_no.global_checkpoint .+ \n + warmer.current .+ \n + warmer.total .+ \n + warmer.total_time .+ \n + path.data .+ \n + path.state .+ \n + $/ --- "Help before - 2.4.0": - skip: @@ -567,7 +675,7 @@ - match: $body: | - /^$/ + /^$/ - do: indices.create: index: index1 @@ -580,7 +688,7 @@ - match: $body: | - /^(index1 \s+ \d \s+ (p|r) \s+ ((STARTED|INITIALIZING|RELOCATING) \s+ (\d \s+ (\d+|\d+[.]\d+)(kb|b) \s+)? \d{1,3}.\d{1,3}.\d{1,3}.\d{1,3} \s+ .+|UNASSIGNED \s+) \n?){10}$/ + /^(index1 \s+ \d \s+ (p|r) \s+ ((STARTED|INITIALIZING|RELOCATING) \s+ (\d \s+ (\d+|\d+[.]\d+)(kb|b) \s+)? \d{1,3}.\d{1,3}.\d{1,3}.\d{1,3} \s+ .+|UNASSIGNED \s+) \n?){10}$/ - do: indices.create: @@ -594,14 +702,14 @@ cat.shards: {} - match: $body: | - /^(index(1|2) \s+ \d \s+ (p|r) \s+ ((STARTED|INITIALIZING|RELOCATING) \s+ (\d \s+ (\d+|\d+[.]\d+)(kb|b) \s+)? \d{1,3}.\d{1,3}.\d{1,3}.\d{1,3} \s+ .+|UNASSIGNED \s+) \n?){15}$/ + /^(index(1|2) \s+ \d \s+ (p|r) \s+ ((STARTED|INITIALIZING|RELOCATING) \s+ (\d \s+ (\d+|\d+[.]\d+)(kb|b) \s+)? \d{1,3}.\d{1,3}.\d{1,3}.\d{1,3} \s+ .+|UNASSIGNED \s+) \n?){15}$/ - do: cat.shards: index: index2 - match: $body: | - /^(index2 \s+ \d \s+ (p|r) \s+ ((STARTED|INITIALIZING|RELOCATING) \s+ (\d \s+ (\d+|\d+[.]\d+)(kb|b) \s+)? 
\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3} \s+ .+|UNASSIGNED \s+) \n?){5}$/ + /^(index2 \s+ \d \s+ (p|r) \s+ ((STARTED|INITIALIZING|RELOCATING) \s+ (\d \s+ (\d+|\d+[.]\d+)(kb|b) \s+)? \d{1,3}.\d{1,3}.\d{1,3}.\d{1,3} \s+ .+|UNASSIGNED \s+) \n?){5}$/ --- "Test cat shards using wildcards": @@ -638,7 +746,7 @@ - match: $body: | - /^(foo \n?)$/ + /^(foo \n?)$/ - do: cat.shards: @@ -648,7 +756,7 @@ - match: $body: | - /^(ba(r|z) \n?){2}$/ + /^(ba(r|z) \n?){2}$/ --- "Test cat shards sort": @@ -679,9 +787,9 @@ h: [index, docs] s: [docs] -# don't use the store here it's cached and might be stale + # don't use the store here it's cached and might be stale - match: $body: | - /^ foo \s+ 0\n - bar \s+ 1\n - $/ + /^ foo \s+ 0\n + bar \s+ 1\n + $/ diff --git a/server/src/internalClusterTest/java/org/opensearch/merge/MergeStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/merge/MergeStatsIT.java new file mode 100644 index 0000000000000..5346b1c6fc1b8 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/merge/MergeStatsIT.java @@ -0,0 +1,353 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.merge; + +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.admin.indices.stats.IndexStats; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.merge.MergeStats; +import org.opensearch.index.merge.MergedSegmentWarmerStats; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/* + * Integration tests asserting on MergeStats for remote store enabled domains. 
+ */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class MergeStatsIT extends RemoteStoreBaseIntegTestCase { + + private static final String INDEX_NAME = "test-idx"; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(RecoverySettings.INDICES_MERGED_SEGMENT_REPLICATION_WARMER_ENABLED_SETTING.getKey(), true) + .build(); + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .put(ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1) + .put(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 2) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + featureSettings.put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true); + return featureSettings.build(); + } + + private void setup() { + internalCluster().startNodes(2); + } + + public void testNodesStats() throws Exception { + setup(); + String[] indices = setupIndices(1); + + ClusterState state = getClusterState(); + List nodes = state.nodes().getNodes().values().stream().map(DiscoveryNode::getName).toList(); + + // ensure merge is executed + for (String index : indices) { + client().admin().indices().forceMerge(new ForceMergeRequest(index).maxNumSegments(2)); + } + final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true"); + nodesStatsRequest.indices(CommonStatsFlags.ALL); + for (String node : nodes) { + NodesStatsResponse response = client(node).admin().cluster().nodesStats(nodesStatsRequest).get(); + + // Shard stats + List allNodesStats = response.getNodes(); + assertEquals(2, allNodesStats.size()); + for (NodeStats nodeStats : allNodesStats) { + assertNotNull(nodeStats.getIndices()); + MergeStats 
mergeStats = nodeStats.getIndices().getMerge(); + assertNotNull(mergeStats); + assertMergeStats(mergeStats, StatsScope.AGGREGATED); + MergedSegmentWarmerStats mergedSegmentWarmerStats = mergeStats.getWarmerStats(); + assertNotNull(mergedSegmentWarmerStats); + assertMergedSegmentWarmerStats(mergedSegmentWarmerStats, StatsScope.AGGREGATED); + } + + assertEquals( + "Expected sent size by node 2 to be equal to received size by node 1.", + allNodesStats.get(0).getIndices().getMerge().getWarmerStats().getTotalReceivedSize(), + allNodesStats.get(1).getIndices().getMerge().getWarmerStats().getTotalSentSize() + ); + assertEquals( + "Expected sent size by node 1 to be equal to received size by node 2.", + allNodesStats.get(0).getIndices().getMerge().getWarmerStats().getTotalSentSize(), + allNodesStats.get(1).getIndices().getMerge().getWarmerStats().getTotalReceivedSize() + ); + } + } + + public void testShardStats() throws Exception { + setup(); + + String[] indices = setupIndices(1); + + ClusterState state = getClusterState(); + List nodes = state.nodes().getNodes().values().stream().map(DiscoveryNode::getName).toList(); + + // ensure merge is executed + for (String index : indices) { + client().admin().indices().forceMerge(new ForceMergeRequest(index).maxNumSegments(2)); + } + Map> shardsSentAndReceivedSize = new HashMap<>(); + + for (String node : nodes) { + IndicesStatsResponse response = client(node).admin().indices().stats(new IndicesStatsRequest()).get(); + + // Shard stats + ShardStats[] allShardStats = response.getShards(); + assertEquals(4, allShardStats.length); + + for (ShardStats shardStats : allShardStats) { + StatsScope type = shardStats.getShardRouting().primary() ? 
StatsScope.PRIMARY_SHARD : StatsScope.REPLICA_SHARD; + CommonStats commonStats = shardStats.getStats(); + assertNotNull(commonStats); + MergeStats mergeStats = commonStats.getMerge(); + assertNotNull(mergeStats); + assertMergeStats(mergeStats, type); + MergedSegmentWarmerStats mergedSegmentWarmerStats = mergeStats.getWarmerStats(); + assertNotNull(mergedSegmentWarmerStats); + assertMergedSegmentWarmerStats(mergedSegmentWarmerStats, type); + + String primaryOrReplica = type.equals(StatsScope.PRIMARY_SHARD) ? "[P]" : "[R]"; + shardsSentAndReceivedSize.put(shardStats.getShardRouting().shardId() + primaryOrReplica, new HashMap<>() { + { + put("RECEIVED", mergedSegmentWarmerStats.getTotalReceivedSize()); + put("SENT", mergedSegmentWarmerStats.getTotalSentSize()); + } + }); + } + } + + for (int shard = 0; shard <= 1; shard++) { + assertEquals( + "Expected sent size by primary shard to be equal to received size by replica shard.", + shardsSentAndReceivedSize.get("[" + indices[0] + "][" + shard + "][R]").get("RECEIVED"), + shardsSentAndReceivedSize.get("[" + indices[0] + "][" + shard + "][P]").get("SENT") + ); + assertEquals( + "Expected sent size by replica shard to be equal to received size by primary shard.", + shardsSentAndReceivedSize.get("[" + indices[0] + "][" + shard + "][R]").get("SENT"), + shardsSentAndReceivedSize.get("[" + indices[0] + "][" + shard + "][P]").get("RECEIVED") + ); + } + } + + public void testIndicesStats() throws Exception { + setup(); + String[] indices = setupIndices(1); + + ClusterState state = getClusterState(); + List nodes = state.nodes().getNodes().values().stream().map(DiscoveryNode::getName).toList(); + + // ensure merge is executed + for (String index : indices) { + client().admin().indices().forceMerge(new ForceMergeRequest(index).maxNumSegments(2)); + } + + for (String node : nodes) { + IndicesStatsResponse response = client(node).admin().indices().stats(new IndicesStatsRequest()).get(); + + // Shard stats + Map allIndicesStats = 
response.getIndices(); + assertEquals(1, allIndicesStats.size()); + for (String index : indices) { + IndexStats indexStats = allIndicesStats.get(index); + CommonStats totalStats = indexStats.getTotal(); + CommonStats priStats = indexStats.getPrimaries(); + assertNotNull(totalStats); + assertNotNull(priStats); + + MergeStats totalMergeStats = totalStats.getMerge(); + assertNotNull(totalMergeStats); + MergeStats priMergeStats = priStats.getMerge(); + assertNotNull(priMergeStats); + + assertMergeStats(priMergeStats, StatsScope.PRIMARY_SHARD); + assertMergeStats(totalMergeStats, StatsScope.AGGREGATED); + + MergedSegmentWarmerStats totalMergedSegmentWarmerStats = totalMergeStats.getWarmerStats(); + MergedSegmentWarmerStats priMergedSegmentWarmerStats = priMergeStats.getWarmerStats(); + + assertNotNull(totalMergedSegmentWarmerStats); + assertNotNull(priMergedSegmentWarmerStats); + + assertMergedSegmentWarmerStats(priMergedSegmentWarmerStats, StatsScope.PRIMARY_SHARD); + assertMergedSegmentWarmerStats(totalMergedSegmentWarmerStats, StatsScope.AGGREGATED); + } + } + } + + private void assertMergeStats(MergeStats stats, StatsScope type) { + if (Arrays.asList(StatsScope.PRIMARY_SHARD, StatsScope.AGGREGATED).contains(type)) { + assertTrue("Current merges should be >= 0", stats.getCurrent() >= 0); + assertTrue("Current merge docs should be >= 0", stats.getCurrentNumDocs() >= 0); + assertTrue("Current merge size should be >= 0", stats.getCurrentSizeInBytes() >= 0); + assertTrue("Total merges should be >= 1", stats.getTotal() >= 1); + assertTrue("Total merge time should be >= 1ms", stats.getTotalTimeInMillis() >= 1); + assertTrue("Total merge time should be >= 1ms", stats.getTotalTime().getMillis() >= 1); + assertTrue("Total merged docs should be >= 1", stats.getTotalNumDocs() >= 1); + assertTrue("Total merged size should be >= 1 byte", stats.getTotalSizeInBytes() >= 1); + assertTrue("Total merged size should be >= 1 byte", stats.getTotalSize().getBytes() >= 1); + 
assertTrue("Total stopped time should be >= 0", stats.getTotalStoppedTimeInMillis() >= 0); + assertTrue("Total stopped time should be >= 0", stats.getTotalStoppedTime().getMillis() >= 0); + assertTrue("Total throttled time should be >= 0", stats.getTotalThrottledTime().getMillis() >= 0); + assertTrue("Total throttled time should be >= 0", stats.getTotalThrottledTimeInMillis() >= 0); + } else if (type == StatsScope.REPLICA_SHARD) { + assertEquals("Replica shard current merges should be 0", 0, stats.getCurrent()); + assertEquals("Replica shard current merge docs should be 0", 0, stats.getCurrentNumDocs()); + assertEquals("Replica shard current merge size should be 0", 0, stats.getCurrentSizeInBytes()); + assertEquals("Replica shard total merges should be 0", 0, stats.getTotal()); + assertEquals("Replica shard total merge time should be 0", 0, stats.getTotalTimeInMillis()); + assertEquals("Replica shard total merge time should be 0", 0, stats.getTotalTime().getMillis()); + assertEquals("Replica shard total merged docs should be 0", 0, stats.getTotalNumDocs()); + assertEquals("Replica shard total merged size should be 0", 0, stats.getTotalSizeInBytes()); + assertEquals("Replica shard total merged size should be 0", 0, stats.getTotalSize().getBytes()); + assertEquals("Replica shard total stopped time should be 0", 0, stats.getTotalStoppedTimeInMillis()); + assertEquals("Replica shard total stopped time should be 0", 0, stats.getTotalStoppedTime().getMillis()); + assertEquals("Replica shard total throttled time should be 0", 0, stats.getTotalThrottledTime().getMillis()); + assertEquals("Replica shard total throttled time should be 0", 0, stats.getTotalThrottledTimeInMillis()); + } + } + + private void assertMergedSegmentWarmerStats(MergedSegmentWarmerStats stats, StatsScope type) { + if (type == StatsScope.PRIMARY_SHARD) { + assertTrue("Primary shard warm invocations should be >= 1", stats.getTotalInvocationsCount() >= 1); + assertTrue("Primary shard warm time should be 
>= 1ms", stats.getTotalTime().getMillis() >= 1); + assertEquals("Primary shard warm failures should be == 0", 0, stats.getTotalFailureCount()); + assertTrue("Primary shard sent size should be >= 0", stats.getTotalSentSize().getBytes() >= 0); + assertEquals("Primary shard received size should be 0", 0, stats.getTotalReceivedSize().getBytes()); + assertTrue("Primary shard send time should be >= 0", stats.getTotalSendTime().millis() >= 0); + assertEquals("Primary shard receive time should be 0", 0, stats.getTotalReceiveTime().millis()); + assertTrue("Primary shard ongoing warms should be >= 0", stats.getOngoingCount() >= 0); + } else if (type == StatsScope.REPLICA_SHARD) { + assertEquals("Replica shard warm invocations should be 0", 0, stats.getTotalInvocationsCount()); + assertEquals("Replica shard warm time should be 0", 0, stats.getTotalTime().getMillis()); + assertEquals("Replica shard warm failures should be 0", 0, stats.getTotalFailureCount()); + assertEquals("Replica shard sent size should be 0", 0, stats.getTotalSentSize().getBytes()); + assertTrue("Replica shard received size should be >= 1", stats.getTotalReceivedSize().getBytes() >= 1); + assertEquals("Replica shard send time should be 0", 0, stats.getTotalSendTime().millis()); + assertTrue("Replica shard receive time should be >= 1ms", stats.getTotalReceiveTime().millis() >= 1); + assertEquals("Replica shard ongoing warms should be 0", 0, stats.getOngoingCount()); + } else if (type == StatsScope.AGGREGATED) { + assertTrue("Expected warmerStats.getOngoingCount >= 0, found " + stats.getOngoingCount(), stats.getOngoingCount() >= 0); + assertTrue( + "Expected warmerStats.getTotalTime >= 1, found " + stats.getTotalTime().millis(), + stats.getTotalTime().getMillis() >= 1 + ); + assertTrue( + "Expected warmerStats.getTotalSendTime >= 1, found " + stats.getTotalSendTime().getMillis(), + stats.getTotalSendTime().getMillis() >= 1 + ); + assertTrue( + "Expected warmerStats.getTotalReceiveTime >= 1, found " + 
stats.getTotalReceiveTime().getMillis(), + stats.getTotalReceiveTime().getMillis() >= 1 + ); + assertTrue( + "Expected warmerStats.getTotalInvocationsCount >= 1, found " + stats.getTotalInvocationsCount(), + stats.getTotalInvocationsCount() >= 1 + ); + assertTrue( + "Expected warmerStats.getTotalReceivedSize >= 1, found " + stats.getTotalReceivedSize().getBytes(), + stats.getTotalReceivedSize().getBytes() >= 1 + ); + assertTrue( + "Expected warmerStats.getTotalSentSize >= 1, found " + stats.getTotalSentSize().getBytes(), + stats.getTotalSentSize().getBytes() >= 1 + ); + assertEquals( + "Expected warmerStats.getTotalFailureCount == 0, found " + stats.getTotalFailureCount(), + 0, + stats.getTotalFailureCount() + ); + } + } + + private void indexDocs(String... indexNames) { + for (String indexName : indexNames) { + for (int i = 0; i < randomIntBetween(25, 30); i++) { + if (randomBoolean()) { + flush(indexName); + } else { + refresh(indexName); + } + int numberOfOperations = randomIntBetween(25, 30); + indexBulk(indexName, numberOfOperations); + } + } + } + + private String[] setupIndices(int count) throws Exception { + if (count <= 0) { + return new String[0]; + } + String[] indices = new String[count]; + for (int i = 0; i < count; i++) { + indices[i] = INDEX_NAME + i; + } + createIndex(indices); + ensureGreen(indices); + for (String index : indices) { + indexDocs(index); + } + waitForDocsOnReplicas(indices); + return indices; + } + + private void waitForDocsOnReplicas(String... 
indices) throws Exception { + for (String index : indices) { + SearchRequest searchRequest = new SearchRequest(index); + searchRequest.preference("_replica"); + assertBusy(() -> { + long totalDocs = client().search(searchRequest).actionGet().getHits().getTotalHits().value(); + assertTrue("Docs should be searchable on replicas", totalDocs > 0); + }, 10, TimeUnit.SECONDS); + } + } + + private enum StatsScope { + PRIMARY_SHARD, + REPLICA_SHARD, + AGGREGATED + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index ebe7791aede8f..8020d4469a274 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -88,6 +88,7 @@ import org.opensearch.index.mapper.SourceToParse; import org.opensearch.index.mapper.Uid; import org.opensearch.index.merge.MergeStats; +import org.opensearch.index.merge.MergedSegmentTransferTracker; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.DocsStats; @@ -216,6 +217,10 @@ public MergeStats getMergeStats() { return new MergeStats(); } + public MergedSegmentTransferTracker getMergedSegmentTransferTracker() { + return engineConfig.getMergedSegmentTransferTracker(); + } + /** returns the history uuid for the engine */ public abstract String getHistoryUUID(); diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index cb4b6ec6161d0..00f6d7e670d61 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -57,6 +57,7 @@ import org.opensearch.index.codec.CodecSettings; import org.opensearch.index.mapper.DocumentMapperForType; import org.opensearch.index.mapper.ParsedDocument; +import 
org.opensearch.index.merge.MergedSegmentTransferTracker; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.store.Store; import org.opensearch.index.translog.InternalTranslogFactory; @@ -115,6 +116,7 @@ public final class EngineConfig { private final Comparator leafSorter; private final Supplier documentMapperForTypeSupplier; private final ClusterApplierService clusterApplierService; + private final MergedSegmentTransferTracker mergedSegmentTransferTracker; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -306,6 +308,7 @@ private EngineConfig(Builder builder) { this.documentMapperForTypeSupplier = builder.documentMapperForTypeSupplier; this.indexReaderWarmer = builder.indexReaderWarmer; this.clusterApplierService = builder.clusterApplierService; + this.mergedSegmentTransferTracker = builder.mergedSegmentTransferTracker; } /** @@ -625,6 +628,13 @@ public ClusterApplierService getClusterApplierService() { return this.clusterApplierService; } + /** + * Returns the MergedSegmentTransferTracker instance. 
+ */ + public MergedSegmentTransferTracker getMergedSegmentTransferTracker() { + return this.mergedSegmentTransferTracker; + } + /** * Builder for EngineConfig class * @@ -662,6 +672,7 @@ public static class Builder { Comparator leafSorter; private IndexWriter.IndexReaderWarmer indexReaderWarmer; private ClusterApplierService clusterApplierService; + private MergedSegmentTransferTracker mergedSegmentTransferTracker; public Builder shardId(ShardId shardId) { this.shardId = shardId; @@ -813,6 +824,11 @@ public Builder clusterApplierService(ClusterApplierService clusterApplierService return this; } + public Builder mergedSegmentTransferTracker(MergedSegmentTransferTracker mergedSegmentTransferTracker) { + this.mergedSegmentTransferTracker = mergedSegmentTransferTracker; + return this; + } + public EngineConfig build() { return new EngineConfig(this); } diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java index 2240b6e76eaac..0bafb1bf788ed 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java @@ -29,6 +29,7 @@ import org.opensearch.index.codec.CodecServiceFactory; import org.opensearch.index.mapper.DocumentMapperForType; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.merge.MergedSegmentTransferTracker; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.store.Store; import org.opensearch.index.translog.TranslogConfig; @@ -160,7 +161,8 @@ public EngineConfig newEngineConfig( Comparator leafSorter, Supplier documentMapperForTypeSupplier, IndexWriter.IndexReaderWarmer indexReaderWarmer, - ClusterApplierService clusterApplierService + ClusterApplierService clusterApplierService, + MergedSegmentTransferTracker mergedSegmentTransferTracker ) { CodecService codecServiceToUse = codecService; if 
(codecService == null && this.codecServiceFactory != null) { @@ -197,6 +199,7 @@ public EngineConfig newEngineConfig( .documentMapperForTypeSupplier(documentMapperForTypeSupplier) .indexReaderWarmer(indexReaderWarmer) .clusterApplierService(clusterApplierService) + .mergedSegmentTransferTracker(mergedSegmentTransferTracker) .build(); } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index fcc81335d4363..e154c69fabf81 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -104,6 +104,7 @@ import org.opensearch.index.mapper.SourceFieldMapper; import org.opensearch.index.mapper.Uid; import org.opensearch.index.merge.MergeStats; +import org.opensearch.index.merge.MergedSegmentTransferTracker; import org.opensearch.index.merge.OnGoingMerge; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; @@ -258,7 +259,11 @@ public TranslogManager translogManager() { boolean success = false; try { this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); - mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); + mergeScheduler = scheduler = new EngineMergeScheduler( + engineConfig.getShardId(), + engineConfig.getIndexSettings(), + getMergedSegmentTransferTracker() + ); throttle = new IndexThrottle(); try { store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath()); @@ -2475,8 +2480,8 @@ private final class EngineMergeScheduler extends OpenSearchConcurrentMergeSchedu private final AtomicInteger numMergesInFlight = new AtomicInteger(0); private final AtomicBoolean isThrottling = new AtomicBoolean(); - EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) { - super(shardId, indexSettings); + 
EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings, MergedSegmentTransferTracker mergedSegmentTransferTracker) { + super(shardId, indexSettings, mergedSegmentTransferTracker); } @Override diff --git a/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java b/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java index b3bec459eb1aa..e1246582b02f7 100644 --- a/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java +++ b/server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java @@ -16,6 +16,7 @@ import org.apache.lucene.index.SegmentReader; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.logging.Loggers; +import org.opensearch.index.merge.MergedSegmentTransferTracker; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.transport.TransportService; @@ -33,7 +34,7 @@ public class MergedSegmentWarmer implements IndexWriter.IndexReaderWarmer { private final RecoverySettings recoverySettings; private final ClusterService clusterService; private final IndexShard indexShard; - + private final MergedSegmentTransferTracker mergedSegmentTransferTracker; private final Logger logger; public MergedSegmentWarmer( @@ -46,23 +47,29 @@ public MergedSegmentWarmer( this.recoverySettings = recoverySettings; this.clusterService = clusterService; this.indexShard = indexShard; + this.mergedSegmentTransferTracker = indexShard.mergedSegmentTransferTracker(); this.logger = Loggers.getLogger(getClass(), indexShard.shardId()); } @Override public void warm(LeafReader leafReader) throws IOException { - try { - if (shouldWarm() == false) { - return; - } - // IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader. 
- assert leafReader instanceof SegmentReader; - assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled(); + if (shouldWarm() == false) { + return; + } - long startTime = System.currentTimeMillis(); + mergedSegmentTransferTracker.incrementTotalWarmInvocationsCount(); + mergedSegmentTransferTracker.incrementOngoingWarms(); + // IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader. + assert leafReader instanceof SegmentReader; + assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled(); + long startTime = System.currentTimeMillis(); + long elapsedTime = 0; + try { SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo(); - logger.info(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo)); + logger.trace(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo)); indexShard.publishMergedSegment(segmentCommitInfo); + elapsedTime = System.currentTimeMillis() - startTime; + long finalElapsedTime = elapsedTime; logger.trace(() -> { long segmentSize = -1; try { @@ -72,17 +79,15 @@ public void warm(LeafReader leafReader) throws IOException { "Completed segment warming for {}. Size: {}B, Timing: {}ms", segmentCommitInfo.info.name, segmentSize, - (System.currentTimeMillis() - startTime) + finalElapsedTime ); }); - } catch (Exception e) { - logger.warn( - () -> new ParameterizedMessage( - "Throw exception during merged segment warmer, skip merged segment {} warmer", - ((SegmentReader) leafReader).getSegmentName() - ), - e - ); + } catch (Throwable t) { + logger.warn(() -> new ParameterizedMessage("Failed to warm segment. Continuing. 
{}", leafReader), t); + mergedSegmentTransferTracker.incrementTotalWarmFailureCount(); + } finally { + mergedSegmentTransferTracker.addTotalWarmTimeMillis(elapsedTime); + mergedSegmentTransferTracker.decrementOngoingWarms(); } } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 1fab651078cc4..64cc076d97af5 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -21,6 +21,7 @@ import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.merge.MergeStats; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; @@ -499,6 +500,13 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { @Override public void maybePruneDeletes() {} + @Override + public MergeStats getMergeStats() { + MergeStats mergeStats = new MergeStats(); + mergeStats.add(engineConfig.getMergedSegmentTransferTracker().stats()); + return mergeStats; + } + @Override public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {} diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchConcurrentMergeScheduler.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchConcurrentMergeScheduler.java index a9c46759b96f8..e79ca86daef04 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchConcurrentMergeScheduler.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchConcurrentMergeScheduler.java @@ -47,6 +47,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.MergeSchedulerConfig; import org.opensearch.index.merge.MergeStats; +import 
org.opensearch.index.merge.MergedSegmentTransferTracker; import org.opensearch.index.merge.OnGoingMerge; import java.io.IOException; @@ -78,12 +79,18 @@ class OpenSearchConcurrentMergeScheduler extends ConcurrentMergeScheduler { private final Set onGoingMerges = ConcurrentCollections.newConcurrentSet(); private final Set readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges); private final MergeSchedulerConfig config; + private final MergedSegmentTransferTracker mergedSegmentTransferTracker; - OpenSearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings) { + OpenSearchConcurrentMergeScheduler( + ShardId shardId, + IndexSettings indexSettings, + MergedSegmentTransferTracker mergedSegmentTransferTracker + ) { this.config = indexSettings.getMergeSchedulerConfig(); this.shardId = shardId; this.indexSettings = indexSettings; this.logger = Loggers.getLogger(getClass(), shardId); + this.mergedSegmentTransferTracker = mergedSegmentTransferTracker; refreshConfig(); } @@ -211,7 +218,8 @@ MergeStats stats() { currentMergesSizeInBytes.count(), totalMergeStoppedTime.count(), totalMergeThrottledTime.count(), - config.isAutoThrottle() ? getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY + config.isAutoThrottle() ? 
getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY, + mergedSegmentTransferTracker.stats() ); return mergeStats; } diff --git a/server/src/main/java/org/opensearch/index/merge/MergeStats.java b/server/src/main/java/org/opensearch/index/merge/MergeStats.java index 7ecaed60735b4..a96e82a5ba6ea 100644 --- a/server/src/main/java/org/opensearch/index/merge/MergeStats.java +++ b/server/src/main/java/org/opensearch/index/merge/MergeStats.java @@ -70,7 +70,11 @@ public class MergeStats implements Writeable, ToXContentFragment { private long unreferencedFileCleanUpsPerformed; - public MergeStats() {} + private final MergedSegmentWarmerStats warmerStats; + + public MergeStats() { + this.warmerStats = new MergedSegmentWarmerStats(); + } public MergeStats(StreamInput in) throws IOException { total = in.readVLong(); @@ -87,6 +91,11 @@ public MergeStats(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_2_11_0)) { unreferencedFileCleanUpsPerformed = in.readOptionalVLong(); } + if (in.getVersion().onOrAfter(Version.V_3_4_0)) { + this.warmerStats = new MergedSegmentWarmerStats(in); + } else { + this.warmerStats = new MergedSegmentWarmerStats(); + } } public void add( @@ -100,6 +109,34 @@ public void add( long stoppedTimeMillis, long throttledTimeMillis, double mbPerSecAutoThrottle + ) { + add( + totalMerges, + totalMergeTime, + totalNumDocs, + totalSizeInBytes, + currentMerges, + currentNumDocs, + currentSizeInBytes, + stoppedTimeMillis, + throttledTimeMillis, + mbPerSecAutoThrottle, + new MergedSegmentWarmerStats() + ); + } + + public void add( + long totalMerges, + long totalMergeTime, + long totalNumDocs, + long totalSizeInBytes, + long currentMerges, + long currentNumDocs, + long currentSizeInBytes, + long stoppedTimeMillis, + long throttledTimeMillis, + double mbPerSecAutoThrottle, + MergedSegmentWarmerStats mergedSegmentWarmerStats ) { this.total += totalMerges; this.totalTimeInMillis += totalMergeTime; @@ -116,6 +153,15 @@ public void add( } else 
{ this.totalBytesPerSecAutoThrottle += bytesPerSecAutoThrottle; } + this.add(mergedSegmentWarmerStats); + } + + public void add(MergedSegmentWarmerStats warmerStats) { + if (this.warmerStats == null) { + return; + } + this.warmerStats.add(warmerStats); + this.warmerStats.addTotals(warmerStats); } public void add(MergeStats mergeStats) { @@ -126,6 +172,7 @@ public void add(MergeStats mergeStats) { this.currentNumDocs += mergeStats.currentNumDocs; this.currentSizeInBytes += mergeStats.currentSizeInBytes; + this.warmerStats.add(mergeStats.warmerStats); addTotals(mergeStats); } @@ -145,6 +192,7 @@ public void addTotals(MergeStats mergeStats) { } else { this.totalBytesPerSecAutoThrottle += mergeStats.totalBytesPerSecAutoThrottle; } + this.warmerStats.addTotals(mergeStats.warmerStats); } public void addUnreferencedFileCleanUpStats(long unreferencedFileCleanUpsPerformed) { @@ -239,6 +287,10 @@ public ByteSizeValue getCurrentSize() { return new ByteSizeValue(currentSizeInBytes); } + public MergedSegmentWarmerStats getWarmerStats() { + return warmerStats; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.MERGES); @@ -256,6 +308,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.field(Fields.TOTAL_THROTTLE_BYTES_PER_SEC_IN_BYTES, totalBytesPerSecAutoThrottle); builder.field(Fields.UNREFERENCED_FILE_CLEANUPS_PERFORMED, unreferencedFileCleanUpsPerformed); + this.warmerStats.toXContent(builder, params); builder.endObject(); return builder; } @@ -302,5 +355,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_11_0)) { out.writeOptionalVLong(unreferencedFileCleanUpsPerformed); } + if (out.getVersion().onOrAfter(Version.V_3_4_0)) { + this.warmerStats.writeTo(out); + } } } diff --git a/server/src/main/java/org/opensearch/index/merge/MergedSegmentTransferTracker.java 
b/server/src/main/java/org/opensearch/index/merge/MergedSegmentTransferTracker.java new file mode 100644 index 0000000000000..8ed92dc036adc --- /dev/null +++ b/server/src/main/java/org/opensearch/index/merge/MergedSegmentTransferTracker.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.merge; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.metrics.CounterMetric; + +/** + * A component that tracks stats related to merged segment replication operations. + * This includes metrics for pre-copy(warm) invocations, failures, bytes transferred, and timing information. + * + * @opensearch.experimental + */ +@ExperimentalApi +public class MergedSegmentTransferTracker { + + private final CounterMetric totalWarmInvocationsCount = new CounterMetric(); + private final CounterMetric totalWarmTimeMillis = new CounterMetric(); + private final CounterMetric totalWarmFailureCount = new CounterMetric(); + private final CounterMetric totalBytesSent = new CounterMetric(); + private final CounterMetric totalBytesReceived = new CounterMetric(); + private final CounterMetric totalSendTimeMillis = new CounterMetric(); + private final CounterMetric totalReceiveTimeMillis = new CounterMetric(); + private final CounterMetric ongoingWarms = new CounterMetric(); + + public void incrementTotalWarmInvocationsCount() { + totalWarmInvocationsCount.inc(); + } + + public void incrementOngoingWarms() { + ongoingWarms.inc(); + } + + public void decrementOngoingWarms() { + ongoingWarms.dec(); + } + + public void incrementTotalWarmFailureCount() { + totalWarmFailureCount.inc(); + } + + public void addTotalWarmTimeMillis(long time) { + totalWarmTimeMillis.inc(time); + } + + public void addTotalSendTimeMillis(long time) { + totalSendTimeMillis.inc(time); 
+ } + + public void addTotalReceiveTimeMillis(long time) { + totalReceiveTimeMillis.inc(time); + } + + public void addTotalBytesSent(long bytes) { + totalBytesSent.inc(bytes); + } + + public void addTotalBytesReceived(long bytes) { + totalBytesReceived.inc(bytes); + } + + public MergedSegmentWarmerStats stats() { + final MergedSegmentWarmerStats stats = new MergedSegmentWarmerStats(); + stats.add( + totalWarmInvocationsCount.count(), + totalWarmTimeMillis.count(), + totalWarmFailureCount.count(), + totalBytesSent.count(), + totalBytesReceived.count(), + totalSendTimeMillis.count(), + totalReceiveTimeMillis.count(), + ongoingWarms.count() + ); + return stats; + } +} diff --git a/server/src/main/java/org/opensearch/index/merge/MergedSegmentWarmerStats.java b/server/src/main/java/org/opensearch/index/merge/MergedSegmentWarmerStats.java new file mode 100644 index 0000000000000..cf407fe5572ef --- /dev/null +++ b/server/src/main/java/org/opensearch/index/merge/MergedSegmentWarmerStats.java @@ -0,0 +1,185 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.merge; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Stores stats about a merged segment warmer process + * + * @opensearch.experimental + */ +@ExperimentalApi +public class MergedSegmentWarmerStats implements Writeable, ToXContentFragment { + + // [PRIMARY SHARD] Number of times MergedSegmentWarmer.warm has been invoked + private long totalInvocationsCount; + + // [PRIMARY SHARD] Total time spent warming segments in milliseconds + private long totalTimeMillis; + + // [PRIMARY SHARD] Number of times segment warming has failed + private long totalFailureCount; + + // [PRIMARY SHARD] Total bytes sent during segment warming + private long totalBytesSent; + + // [REPLICA SHARD] Total bytes received during segment warming + private long totalBytesReceived; + + // [PRIMARY SHARD] Total time spent sending segments in milliseconds by a primary shard + private long totalSendTimeMillis; + + // [REPLICA SHARD] Total time spent receiving segments in milliseconds + private long totalReceiveTimeMillis; + + // [PRIMARY SHARD] Current number of ongoing segment warming operations + private long ongoingCount; + + public MergedSegmentWarmerStats() {} + + public MergedSegmentWarmerStats(StreamInput in) throws IOException { + totalInvocationsCount = in.readVLong(); + totalTimeMillis = in.readVLong(); + totalFailureCount = in.readVLong(); + totalBytesSent = in.readVLong(); + totalBytesReceived = in.readVLong(); + totalSendTimeMillis = in.readVLong(); + totalReceiveTimeMillis = in.readVLong(); + ongoingCount = 
in.readVLong(); + } + + public synchronized void add( + long totalInvocationsCount, + long totalTimeMillis, + long totalFailureCount, + long totalBytesSent, + long totalBytesReceived, + long totalSendTimeMillis, + long totalReceiveTimeMillis, + long ongoingCount + ) { + this.totalInvocationsCount += totalInvocationsCount; + this.totalTimeMillis += totalTimeMillis; + this.totalFailureCount += totalFailureCount; + this.totalBytesSent += totalBytesSent; + this.totalBytesReceived += totalBytesReceived; + this.totalSendTimeMillis += totalSendTimeMillis; + this.totalReceiveTimeMillis += totalReceiveTimeMillis; + this.ongoingCount += ongoingCount; + } + + public void add(MergedSegmentWarmerStats mergedSegmentWarmerStats) { + this.ongoingCount += mergedSegmentWarmerStats.ongoingCount; + } + + public synchronized void addTotals(MergedSegmentWarmerStats mergedSegmentWarmerStats) { + if (mergedSegmentWarmerStats == null) { + return; + } + this.totalInvocationsCount += mergedSegmentWarmerStats.totalInvocationsCount; + this.totalTimeMillis += mergedSegmentWarmerStats.totalTimeMillis; + this.totalFailureCount += mergedSegmentWarmerStats.totalFailureCount; + this.totalBytesSent += mergedSegmentWarmerStats.totalBytesSent; + this.totalBytesReceived += mergedSegmentWarmerStats.totalBytesReceived; + this.totalSendTimeMillis += mergedSegmentWarmerStats.totalSendTimeMillis; + this.totalReceiveTimeMillis += mergedSegmentWarmerStats.totalReceiveTimeMillis; + } + + public long getTotalInvocationsCount() { + return this.totalInvocationsCount; + } + + public TimeValue getTotalTime() { + return new TimeValue(totalTimeMillis); + } + + public long getOngoingCount() { + return ongoingCount; + } + + public ByteSizeValue getTotalReceivedSize() { + return new ByteSizeValue(totalBytesReceived); + } + + public ByteSizeValue getTotalSentSize() { + return new ByteSizeValue(totalBytesSent); + } + + public TimeValue getTotalReceiveTime() { + return new TimeValue(totalReceiveTimeMillis); + } + + public 
long getTotalFailureCount() { + return totalFailureCount; + } + + public TimeValue getTotalSendTime() { + return new TimeValue(totalSendTimeMillis); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.WARMER); + builder.field(Fields.WARM_INVOCATIONS_COUNT, totalInvocationsCount); + builder.humanReadableField(Fields.TOTAL_TIME_MILLIS, Fields.TOTAL_TIME, getTotalTime()); + builder.field(Fields.TOTAL_FAILURE_COUNT, totalFailureCount); + builder.humanReadableField(Fields.TOTAL_BYTES_SENT, Fields.TOTAL_SENT_SIZE, getTotalSentSize()); + builder.humanReadableField(Fields.TOTAL_BYTES_RECEIVED, Fields.TOTAL_RECEIVED_SIZE, getTotalReceivedSize()); + builder.humanReadableField(Fields.TOTAL_SEND_TIME_MILLIS, Fields.TOTAL_SEND_TIME, getTotalSendTime()); + builder.humanReadableField(Fields.TOTAL_RECEIVE_TIME_MILLIS, Fields.TOTAL_RECEIVE_TIME, getTotalReceiveTime()); + builder.field(Fields.ONGOING_COUNT, ongoingCount); + builder.endObject(); + return builder; + } + + /** + * Fields used for merge statistics + * + * @opensearch.internal + */ + static final class Fields { + static final String WARMER = "warmer"; + static final String WARM_INVOCATIONS_COUNT = "total_invocations_count"; + static final String TOTAL_TIME_MILLIS = "total_time_millis"; + static final String TOTAL_FAILURE_COUNT = "total_failure_count"; + static final String TOTAL_BYTES_SENT = "total_bytes_sent"; + static final String TOTAL_BYTES_RECEIVED = "total_bytes_received"; + static final String TOTAL_SEND_TIME_MILLIS = "total_send_time_millis"; + static final String TOTAL_RECEIVE_TIME_MILLIS = "total_receive_time_millis"; + static final String ONGOING_COUNT = "ongoing_count"; + + public static final String TOTAL_TIME = "total_time"; + public static final String TOTAL_SEND_TIME = "total_send_time"; + public static final String TOTAL_RECEIVE_TIME = "total_receive_time"; + public static final String TOTAL_SENT_SIZE = 
"total_sent_size"; + public static final String TOTAL_RECEIVED_SIZE = "total_received_size"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalInvocationsCount); + out.writeVLong(totalTimeMillis); + out.writeVLong(totalFailureCount); + out.writeVLong(totalBytesSent); + out.writeVLong(totalBytesReceived); + out.writeVLong(totalSendTimeMillis); + out.writeVLong(totalReceiveTimeMillis); + out.writeVLong(ongoingCount); + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 0a365e4d756d8..906cf079179b5 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -157,6 +157,7 @@ import org.opensearch.index.mapper.SourceToParse; import org.opensearch.index.mapper.Uid; import org.opensearch.index.merge.MergeStats; +import org.opensearch.index.merge.MergedSegmentTransferTracker; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; import org.opensearch.index.remote.RemoteSegmentStats; @@ -389,6 +390,7 @@ Runnable getGlobalCheckpointSyncer() { private final MergedSegmentPublisher mergedSegmentPublisher; private final ReferencedSegmentsPublisher referencedSegmentsPublisher; private final Set pendingMergedSegmentCheckpoints = Sets.newConcurrentHashSet(); + private final MergedSegmentTransferTracker mergedSegmentTransferTracker; @InternalApi public IndexShard( @@ -452,6 +454,7 @@ public IndexShard( indexSettings.isAssignedOnRemoteNode(), () -> getRemoteTranslogUploadBufferInterval(remoteStoreSettings::getClusterRemoteTranslogBufferInterval) ); + this.mergedSegmentTransferTracker = new MergedSegmentTransferTracker(); this.mapperService = mapperService; this.indexCache = indexCache; this.internalIndexingStats = new InternalIndexingStats(threadPool); @@ -2277,6 +2280,10 @@ public void 
resetToWriteableEngine() throws IOException, InterruptedException, T indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); }); } + public MergedSegmentTransferTracker mergedSegmentTransferTracker() { + return mergedSegmentTransferTracker; + } + /** * Wrapper for a non-closing reader * @@ -4322,7 +4329,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro // timeseries () -> docMapper(), mergedSegmentWarmerFactory.get(this), - clusterApplierService + clusterApplierService, + mergedSegmentTransferTracker ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/MergedSegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/MergedSegmentReplicationTarget.java index ead4007a1b62a..a4353828c89cf 100644 --- a/server/src/main/java/org/opensearch/indices/replication/MergedSegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/MergedSegmentReplicationTarget.java @@ -55,7 +55,14 @@ protected void getFilesFromSource( List filesToFetch, StepListener getFilesListener ) { - source.getMergedSegmentFiles(getId(), checkpoint, filesToFetch, indexShard, this::updateFileRecoveryBytes, getFilesListener); + source.getMergedSegmentFiles( + getId(), + checkpoint, + filesToFetch, + indexShard, + this::updateMergedSegmentFileRecoveryBytes, + getFilesListener + ); } @Override @@ -69,4 +76,9 @@ protected void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse public MergedSegmentReplicationTarget retryCopy() { return new MergedSegmentReplicationTarget(indexShard, checkpoint, source, listener); } + + protected void updateMergedSegmentFileRecoveryBytes(String fileName, long bytesRecovered) { + indexShard.mergedSegmentTransferTracker().addTotalBytesReceived(bytesRecovered); + updateFileRecoveryBytes(fileName, bytesRecovered); + } } diff --git 
a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index 2aedf9534abe3..b657fa9bcb4e8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -180,13 +180,19 @@ public void getMergedSegmentFiles( CountDownLatch latch = new CountDownLatch(1); indexShard.getFileDownloader() - .downloadAsync(cancellableThreads, remoteDirectory, storeDirectory, toDownloadSegmentNames, ActionListener.wrap(r -> { - latch.countDown(); - notifyOnceListener.onResponse(new GetSegmentFilesResponse(filesToFetch)); - }, e -> { - latch.countDown(); - notifyOnceListener.onFailure(e); - })); + .downloadAsync( + cancellableThreads, + remoteDirectory, + new ReplicationStatsDirectoryWrapper(storeDirectory, fileProgressTracker), + toDownloadSegmentNames, + ActionListener.wrap(r -> { + latch.countDown(); + notifyOnceListener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + }, e -> { + latch.countDown(); + notifyOnceListener.onFailure(e); + }) + ); try { if (latch.await( indexShard.getRecoverySettings().getMergedSegmentReplicationTimeout().millis(), diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/AbstractPublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/AbstractPublishCheckpointAction.java index ddf7a1f61030c..42d29d370929a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/AbstractPublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/AbstractPublishCheckpointAction.java @@ -44,6 +44,7 @@ import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Abstract base class 
for publish checkpoint. @@ -112,8 +113,10 @@ final void doPublish( TransportRequest request, String action, boolean waitForCompletion, - TimeValue waitTimeout + TimeValue waitTimeout, + ActionListener listener ) { + ActionListener notifyOnceListener = ActionListener.notifyOnce(listener); String primaryAllocationId = indexShard.routingEntry().allocationId().getId(); long primaryTerm = indexShard.getPendingPrimaryTerm(); final ThreadContext threadContext = threadPool.getThreadContext(); @@ -206,14 +209,22 @@ public void handleException(TransportException e) { ); if (waitForCompletion) { try { - latch.await(waitTimeout.seconds(), TimeUnit.SECONDS); + if (latch.await(waitTimeout.seconds(), TimeUnit.SECONDS) == false) { + notifyOnceListener.onFailure( + new TimeoutException("Timed out waiting for publish checkpoint to complete. Checkpoint: " + checkpoint) + ); + } } catch (InterruptedException e) { + notifyOnceListener.onFailure(e); logger.warn( () -> new ParameterizedMessage("Interrupted while waiting for publish checkpoint complete [{}]", checkpoint), e ); } } + notifyOnceListener.onResponse(null); + } catch (Exception e) { + notifyOnceListener.onFailure(e); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index 722abceb29a30..ba9ebfe4ad4b8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -98,7 +98,7 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) { * Publish checkpoint request to shard */ final void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) { - doPublish(indexShard, checkpoint, new PublishCheckpointRequest(checkpoint), TASK_ACTION_NAME, false, null); + doPublish(indexShard, checkpoint, new 
PublishCheckpointRequest(checkpoint), TASK_ACTION_NAME, false, null, ActionListener.noOp()); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishMergedSegmentAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishMergedSegmentAction.java index 73eb456fb8915..e397e75cdc75d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishMergedSegmentAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishMergedSegmentAction.java @@ -84,7 +84,8 @@ final public void publish(IndexShard indexShard, MergedSegmentCheckpoint checkpo new PublishMergedSegmentRequest(checkpoint), TASK_ACTION_NAME, true, - indexShard.getRecoverySettings().getMergedSegmentReplicationTimeout() + indexShard.getRecoverySettings().getMergedSegmentReplicationTimeout(), + ActionListener.noOp() ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishReferencedSegmentsAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishReferencedSegmentsAction.java index e9b2e6e411e7f..353bd16635ee6 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishReferencedSegmentsAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishReferencedSegmentsAction.java @@ -79,7 +79,8 @@ final void publish(IndexShard indexShard, ReferencedSegmentsCheckpoint checkpoin new PublishReferencedSegmentsRequest(checkpoint), TASK_ACTION_NAME, false, - indexShard.getRecoverySettings().getMergedSegmentReplicationTimeout() + indexShard.getRecoverySettings().getMergedSegmentReplicationTimeout(), + ActionListener.noOp() ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/RemoteStorePublishMergedSegmentAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/RemoteStorePublishMergedSegmentAction.java index 
da5560875db3f..23a4ca0c9ece1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/RemoteStorePublishMergedSegmentAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/RemoteStorePublishMergedSegmentAction.java @@ -83,8 +83,10 @@ public RemoteStorePublishMergedSegmentAction( protected void doReplicaOperation(RemoteStorePublishMergedSegmentRequest shardRequest, IndexShard replica) { RemoteStoreMergedSegmentCheckpoint checkpoint = shardRequest.getMergedSegment(); if (checkpoint.getShardId().equals(replica.shardId())) { + long startTime = System.currentTimeMillis(); replica.getRemoteDirectory().markMergedSegmentsPendingDownload(checkpoint.getLocalToRemoteSegmentFilenameMap()); replicationService.onNewMergedSegmentCheckpoint(checkpoint, replica); + replica.mergedSegmentTransferTracker().addTotalReceiveTimeMillis(System.currentTimeMillis() - startTime); } else { logger.warn( () -> new ParameterizedMessage( @@ -114,6 +116,7 @@ public final void publish(IndexShard indexShard, MergedSegmentCheckpoint checkpo long elapsedTimeMillis = endTimeMillis - startTimeMillis; long timeoutMillis = indexShard.getRecoverySettings().getMergedSegmentReplicationTimeout().millis(); long timeLeftMillis = Math.max(0, timeoutMillis - elapsedTimeMillis); + indexShard.mergedSegmentTransferTracker().addTotalSendTimeMillis(elapsedTimeMillis); if (timeLeftMillis > 0) { RemoteStoreMergedSegmentCheckpoint remoteStoreMergedSegmentCheckpoint = new RemoteStoreMergedSegmentCheckpoint( @@ -126,9 +129,19 @@ public final void publish(IndexShard indexShard, MergedSegmentCheckpoint checkpo new RemoteStorePublishMergedSegmentRequest(remoteStoreMergedSegmentCheckpoint), "segrep_remote_publish_merged_segment", true, - TimeValue.timeValueMillis(timeLeftMillis) + TimeValue.timeValueMillis(timeLeftMillis), + new ActionListener<>() { + @Override + public void onResponse(Void unused) {} + + @Override + public void onFailure(Exception e) { + 
indexShard.mergedSegmentTransferTracker().incrementTotalWarmFailureCount(); + } + } ); } else { + indexShard.mergedSegmentTransferTracker().incrementTotalWarmFailureCount(); logger.warn( () -> new ParameterizedMessage( "Unable to confirm upload of merged segment {} to remote store. Timeout of {}ms exceeded. Skipping pre-copy.", @@ -167,6 +180,7 @@ public void beforeUpload(String file) {} @Override public void onSuccess(String file) { localToRemoteStoreFilenames.put(file, indexShard.getRemoteDirectory().getExistingRemoteFilename(file)); + indexShard.mergedSegmentTransferTracker().addTotalBytesSent(checkpoint.getMetadataMap().get(file).length()); } @Override diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java index 6765423ffa65e..215ab36a79cdc 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java @@ -62,6 +62,7 @@ import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.Strings; import org.opensearch.index.IndexSettings; +import org.opensearch.index.merge.MergedSegmentWarmerStats; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestResponse; import org.opensearch.rest.action.RestResponseListener; @@ -591,6 +592,78 @@ protected Table getTableWithHeader(final RestRequest request, final PageToken pa ); table.addCell("pri.merges.total_time", "default:false;text-align:right;desc:time spent in merges"); + table.addCell( + "merges.warmer.total_invocations", + "alias:mswti,mergedSegmentWarmerTotalInvocations;default:false;text-align:right;desc:total invocations of merged segment warmer" + ); + table.addCell( + "pri.merges.warmer.total_invocations", + "default:false;text-align:right;desc:total invocations of merged segment warmer" + ); + + table.addCell( + "merges.warmer.total_time", + 
"alias:mswtt,mergedSegmentWarmerTotalTime;default:false;text-align:right;desc:total wallclock time spent in the warming operation" + ); + table.addCell( + "pri.merges.warmer.total_time", + "default:false;text-align:right;desc:total wallclock time spent in the warming operation" + ); + + table.addCell( + "merges.warmer.ongoing_count", + "alias:mswoc,mergedSegmentWarmerOngoingCount;default:false;text-align:right;desc:point-in-time metric for number of in-progress warm operations" + ); + table.addCell( + "pri.merges.warmer.ongoing_count", + "default:false;text-align:right;desc:point-in-time metric for number of in-progress warm operations" + ); + + table.addCell( + "merges.warmer.total_bytes_received", + "alias:mswtbr,mergedSegmentWarmerTotalBytesReceived;default:false;text-align:right;desc:total bytes received by a replica shard during the warm operation" + ); + table.addCell( + "pri.merges.warmer.total_bytes_received", + "default:false;text-align:right;desc:total bytes received by a replica shard during the warm operation" + ); + + table.addCell( + "merges.warmer.total_bytes_sent", + "alias:mswtbs,mergedSegmentWarmerTotalBytesSent;default:false;text-align:right;desc:total bytes sent by a primary shard during the warm operation" + ); + table.addCell( + "pri.merges.warmer.total_bytes_sent", + "default:false;text-align:right;desc:total bytes sent by a primary shard during the warm operation" + ); + + table.addCell( + "merges.warmer.total_receive_time", + "alias:mswtrt,mergedSegmentWarmerTotalReceiveTime;default:false;text-align:right;desc:total wallclock time spent receiving merged segments by a replica shard" + ); + table.addCell( + "pri.merges.warmer.total_receive_time", + "default:false;text-align:right;desc:total wallclock time spent receiving merged segments by a replica shard" + ); + + table.addCell( + "merges.warmer.total_failure_count", + "alias:mswtfc,mergedSegmentWarmerTotalFailureCount;default:false;text-align:right;desc:total failures in merged segment 
warmer" + ); + table.addCell( + "pri.merges.warmer.total_failure_count", + "default:false;text-align:right;desc:total failures in merged segment warmer" + ); + + table.addCell( + "merges.warmer.total_send_time", + "alias:mswtst,mergedSegmentWarmerTotalSendTime;default:false;text-align:right;desc:total wallclock time spent sending merged segments by a primary shard" + ); + table.addCell( + "pri.merges.warmer.total_send_time", + "default:false;text-align:right;desc:total wallclock time spent sending merged segments by a primary shard" + ); + table.addCell("refresh.total", "sibling:pri;alias:rto,refreshTotal;default:false;text-align:right;desc:total refreshes"); table.addCell("pri.refresh.total", "default:false;text-align:right;desc:total refreshes"); @@ -994,6 +1067,37 @@ protected Table buildTable( table.addCell(totalStats.getMerge() == null ? null : totalStats.getMerge().getTotalTime()); table.addCell(primaryStats.getMerge() == null ? null : primaryStats.getMerge().getTotalTime()); + MergedSegmentWarmerStats mergedSegmentWarmerTotalStats = totalStats.getMerge() == null + ? null + : totalStats.getMerge().getWarmerStats(); + MergedSegmentWarmerStats mergedSegmentWarmerPrimaryStats = primaryStats.getMerge() == null + ? null + : primaryStats.getMerge().getWarmerStats(); + + table.addCell(mergedSegmentWarmerTotalStats == null ? null : mergedSegmentWarmerTotalStats.getTotalInvocationsCount()); + table.addCell(mergedSegmentWarmerPrimaryStats == null ? null : mergedSegmentWarmerPrimaryStats.getTotalInvocationsCount()); + + table.addCell(mergedSegmentWarmerTotalStats == null ? null : mergedSegmentWarmerTotalStats.getTotalTime()); + table.addCell(mergedSegmentWarmerPrimaryStats == null ? null : mergedSegmentWarmerPrimaryStats.getTotalTime()); + + table.addCell(mergedSegmentWarmerTotalStats == null ? null : mergedSegmentWarmerTotalStats.getOngoingCount()); + table.addCell(mergedSegmentWarmerPrimaryStats == null ? 
null : mergedSegmentWarmerPrimaryStats.getOngoingCount()); + + table.addCell(mergedSegmentWarmerTotalStats == null ? null : mergedSegmentWarmerTotalStats.getTotalReceivedSize()); + table.addCell(mergedSegmentWarmerPrimaryStats == null ? null : mergedSegmentWarmerPrimaryStats.getTotalReceivedSize()); + + table.addCell(mergedSegmentWarmerTotalStats == null ? null : mergedSegmentWarmerTotalStats.getTotalSentSize()); + table.addCell(mergedSegmentWarmerPrimaryStats == null ? null : mergedSegmentWarmerPrimaryStats.getTotalSentSize()); + + table.addCell(mergedSegmentWarmerTotalStats == null ? null : mergedSegmentWarmerTotalStats.getTotalReceiveTime()); + table.addCell(mergedSegmentWarmerPrimaryStats == null ? null : mergedSegmentWarmerPrimaryStats.getTotalReceiveTime()); + + table.addCell(mergedSegmentWarmerTotalStats == null ? null : mergedSegmentWarmerTotalStats.getTotalFailureCount()); + table.addCell(mergedSegmentWarmerPrimaryStats == null ? null : mergedSegmentWarmerPrimaryStats.getTotalFailureCount()); + + table.addCell(mergedSegmentWarmerTotalStats == null ? null : mergedSegmentWarmerTotalStats.getTotalSendTime()); + table.addCell(mergedSegmentWarmerPrimaryStats == null ? null : mergedSegmentWarmerPrimaryStats.getTotalSendTime()); + table.addCell(totalStats.getRefresh() == null ? null : totalStats.getRefresh().getTotal()); table.addCell(primaryStats.getRefresh() == null ? 
null : primaryStats.getRefresh().getTotal()); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java index 428ba17aae5a7..a3fbff262957f 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java @@ -57,6 +57,7 @@ import org.opensearch.index.flush.FlushStats; import org.opensearch.index.get.GetStats; import org.opensearch.index.merge.MergeStats; +import org.opensearch.index.merge.MergedSegmentWarmerStats; import org.opensearch.index.refresh.RefreshStats; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.shard.IndexingStats; @@ -274,6 +275,39 @@ protected Table getTableWithHeader(final RestRequest request) { table.addCell("merges.total_size", "alias:mts,mergesTotalSize;default:false;text-align:right;desc:size merged"); table.addCell("merges.total_time", "alias:mtt,mergesTotalTime;default:false;text-align:right;desc:time spent in merges"); + table.addCell( + "merges.warmer.total_invocations", + "alias:mswti,mergedSegmentWarmerTotalInvocations;default:false;text-align:right;desc:total invocations of merged segment warmer" + ); + table.addCell( + "merges.warmer.total_time", + "alias:mswtt,mergedSegmentWarmerTotalTime;default:false;text-align:right;desc:total wallclock time spent in the warming operation" + ); + table.addCell( + "merges.warmer.ongoing_count", + "alias:mswoc,mergedSegmentWarmerOngoingCount;default:false;text-align:right;desc:point-in-time metric for number of in-progress warm operations" + ); + table.addCell( + "merges.warmer.total_bytes_received", + "alias:mswtbr,mergedSegmentWarmerTotalBytesReceived;default:false;text-align:right;desc:total bytes received by a replica shard during the warm operation" + ); + table.addCell( + "merges.warmer.total_bytes_sent", + 
"alias:mswtbs,mergedSegmentWarmerTotalBytesSent;default:false;text-align:right;desc:total bytes sent by a primary shard during the warm operation" + ); + table.addCell( + "merges.warmer.total_receive_time", + "alias:mswtrt,mergedSegmentWarmerTotalReceiveTime;default:false;text-align:right;desc:total wallclock time spent receiving merged segments by a replica shard" + ); + table.addCell( + "merges.warmer.total_failure_count", + "alias:mswtfc,mergedSegmentWarmerTotalFailureCount;default:false;text-align:right;desc:total failures in merged segment warmer" + ); + table.addCell( + "merges.warmer.total_send_time", + "alias:mswtst,mergedSegmentWarmerTotalSendTime;default:false;text-align:right;desc:total wallclock time spent sending merged segments by a primary shard" + ); + table.addCell("refresh.total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total refreshes"); table.addCell("refresh.time", "alias:rti,refreshTime;default:false;text-align:right;desc:time spent in refreshes"); table.addCell("refresh.external_total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total external refreshes"); @@ -548,6 +582,16 @@ Table buildTable( table.addCell(mergeStats == null ? null : mergeStats.getTotalSize()); table.addCell(mergeStats == null ? null : mergeStats.getTotalTime()); + MergedSegmentWarmerStats mergedSegmentWarmerStats = mergeStats == null ? null : mergeStats.getWarmerStats(); + table.addCell(mergedSegmentWarmerStats == null ? null : mergedSegmentWarmerStats.getTotalInvocationsCount()); + table.addCell(mergedSegmentWarmerStats == null ? null : mergedSegmentWarmerStats.getTotalTime()); + table.addCell(mergedSegmentWarmerStats == null ? null : mergedSegmentWarmerStats.getOngoingCount()); + table.addCell(mergedSegmentWarmerStats == null ? null : mergedSegmentWarmerStats.getTotalReceivedSize()); + table.addCell(mergedSegmentWarmerStats == null ? 
null : mergedSegmentWarmerStats.getTotalSentSize()); + table.addCell(mergedSegmentWarmerStats == null ? null : mergedSegmentWarmerStats.getTotalReceiveTime()); + table.addCell(mergedSegmentWarmerStats == null ? null : mergedSegmentWarmerStats.getTotalFailureCount()); + table.addCell(mergedSegmentWarmerStats == null ? null : mergedSegmentWarmerStats.getTotalSendTime()); + RefreshStats refreshStats = indicesStats == null ? null : indicesStats.getRefresh(); table.addCell(refreshStats == null ? null : refreshStats.getTotal()); table.addCell(refreshStats == null ? null : refreshStats.getTotalTime()); diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java index 51d11ce91c7d1..7a58c51d10cc9 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestShardsAction.java @@ -54,6 +54,7 @@ import org.opensearch.index.flush.FlushStats; import org.opensearch.index.get.GetStats; import org.opensearch.index.merge.MergeStats; +import org.opensearch.index.merge.MergedSegmentWarmerStats; import org.opensearch.index.refresh.RefreshStats; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.seqno.SeqNoStats; @@ -214,6 +215,39 @@ protected Table getTableWithHeader(final RestRequest request, final PageToken pa table.addCell("merges.total_size", "alias:mts,mergesTotalSize;default:false;text-align:right;desc:size merged"); table.addCell("merges.total_time", "alias:mtt,mergesTotalTime;default:false;text-align:right;desc:time spent in merges"); + table.addCell( + "merges.warmer.total_invocations", + "alias:mswti,mergedSegmentWarmerTotalInvocations;default:false;text-align:right;desc:total invocations of merged segment warmer" + ); + table.addCell( + "merges.warmer.total_time", + "alias:mswtt,mergedSegmentWarmerTotalTime;default:false;text-align:right;desc:total 
wallclock time spent in the warming operation" + ); + table.addCell( + "merges.warmer.ongoing_count", + "alias:mswoc,mergedSegmentWarmerOngoingCount;default:false;text-align:right;desc:point-in-time metric for number of in-progress warm operations" + ); + table.addCell( + "merges.warmer.total_bytes_received", + "alias:mswtbr,mergedSegmentWarmerTotalBytesReceived;default:false;text-align:right;desc:total bytes received by a replica shard during the warm operation" + ); + table.addCell( + "merges.warmer.total_bytes_sent", + "alias:mswtbs,mergedSegmentWarmerTotalBytesSent;default:false;text-align:right;desc:total bytes sent by a primary shard during the warm operation" + ); + table.addCell( + "merges.warmer.total_receive_time", + "alias:mswtrt,mergedSegmentWarmerTotalReceiveTime;default:false;text-align:right;desc:total wallclock time spent receiving merged segments by a replica shard" + ); + table.addCell( + "merges.warmer.total_failure_count", + "alias:mswtfc,mergedSegmentWarmerTotalFailureCount;default:false;text-align:right;desc:total failures in merged segment warmer" + ); + table.addCell( + "merges.warmer.total_send_time", + "alias:mswtst,mergedSegmentWarmerTotalSendTime;default:false;text-align:right;desc:total wallclock time spent sending merged segments by a primary shard" + ); + table.addCell("refresh.total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total refreshes"); table.addCell("refresh.time", "alias:rti,refreshTime;default:false;text-align:right;desc:time spent in refreshes"); table.addCell("refresh.external_total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total external refreshes"); @@ -450,6 +484,63 @@ Table buildTable( table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotalSize)); table.addCell(getOrNull(commonStats, CommonStats::getMerge, MergeStats::getTotalTime)); + table.addCell( + getOrNull( + commonStats, + (c) -> c.getMerge() == null ? 
null : c.getMerge().getWarmerStats(), + MergedSegmentWarmerStats::getTotalInvocationsCount + ) + ); + table.addCell( + getOrNull( + commonStats, + (c) -> c.getMerge() == null ? null : c.getMerge().getWarmerStats(), + MergedSegmentWarmerStats::getTotalTime + ) + ); + table.addCell( + getOrNull( + commonStats, + (c) -> c.getMerge() == null ? null : c.getMerge().getWarmerStats(), + MergedSegmentWarmerStats::getOngoingCount + ) + ); + table.addCell( + getOrNull( + commonStats, + (c) -> c.getMerge() == null ? null : c.getMerge().getWarmerStats(), + MergedSegmentWarmerStats::getTotalReceivedSize + ) + ); + table.addCell( + getOrNull( + commonStats, + (c) -> c.getMerge() == null ? null : c.getMerge().getWarmerStats(), + MergedSegmentWarmerStats::getTotalSentSize + ) + ); + table.addCell( + getOrNull( + commonStats, + (c) -> c.getMerge() == null ? null : c.getMerge().getWarmerStats(), + MergedSegmentWarmerStats::getTotalReceiveTime + ) + ); + table.addCell( + getOrNull( + commonStats, + (c) -> c.getMerge() == null ? null : c.getMerge().getWarmerStats(), + MergedSegmentWarmerStats::getTotalFailureCount + ) + ); + table.addCell( + getOrNull( + commonStats, + (c) -> c.getMerge() == null ? 
null : c.getMerge().getWarmerStats(), + MergedSegmentWarmerStats::getTotalSendTime + ) + ); + table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getTotal)); table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getTotalTime)); table.addCell(getOrNull(commonStats, CommonStats::getRefresh, RefreshStats::getExternalTotal)); diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java index cf184b29a14ff..ee77e6086f80c 100644 --- a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java @@ -73,6 +73,7 @@ public void testCreateEngineConfigFromFactory() { null, null, null, + null, null ); @@ -156,6 +157,7 @@ public void testCreateCodecServiceFromFactory() { null, null, null, + null, null ); assertNotNull(config.getCodec()); diff --git a/server/src/test/java/org/opensearch/index/engine/MergeRateLimitingTests.java b/server/src/test/java/org/opensearch/index/engine/MergeRateLimitingTests.java index 752c21c5e108c..bc535475e5ecd 100644 --- a/server/src/test/java/org/opensearch/index/engine/MergeRateLimitingTests.java +++ b/server/src/test/java/org/opensearch/index/engine/MergeRateLimitingTests.java @@ -19,6 +19,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; +import org.opensearch.index.merge.MergedSegmentTransferTracker; import org.opensearch.test.OpenSearchTestCase; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; @@ -77,7 +78,11 @@ public void testSettingPrecedence() { IndexSettings indexSettings = new IndexSettings(newIndexMeta("test_index", indexBuilder.build()), nodeSettings); ShardId shardId = new ShardId("test_index", "test_uuid", 0); - OpenSearchConcurrentMergeScheduler 
scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings); + OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler( + shardId, + indexSettings, + new MergedSegmentTransferTracker() + ); // Should use cluster-level setting assertThat(scheduler.getForceMergeMBPerSec(), equalTo(75.0)); @@ -85,7 +90,7 @@ public void testSettingPrecedence() { // Test with both index and cluster-level settings - index should take precedence indexBuilder.put(MAX_FORCE_MERGE_MB_PER_SEC_SETTING.getKey(), "25.0"); indexSettings = new IndexSettings(newIndexMeta("test_index", indexBuilder.build()), nodeSettings); - scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings); + scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings, new MergedSegmentTransferTracker()); // Should use index-level setting assertThat(scheduler.getForceMergeMBPerSec(), equalTo(25.0)); @@ -104,7 +109,11 @@ public void testDisabledRateLimiting() { IndexSettings indexSettings = new IndexSettings(newIndexMeta("test_index", builder.build()), Settings.EMPTY); ShardId shardId = new ShardId("test_index", "test_uuid", 0); - OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings); + OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler( + shardId, + indexSettings, + new MergedSegmentTransferTracker() + ); // Should have no rate limiting assertThat(scheduler.getForceMergeMBPerSec(), equalTo(Double.POSITIVE_INFINITY)); @@ -130,7 +139,11 @@ public void testDynamicRateLimitUpdates() throws Exception { IndexSettings indexSettings = new IndexSettings(newIndexMeta("test_index", builder.build()), Settings.EMPTY); ShardId shardId = new ShardId("test_index", "test_uuid", 0); - OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings); + OpenSearchConcurrentMergeScheduler scheduler = new 
OpenSearchConcurrentMergeScheduler( + shardId, + indexSettings, + new MergedSegmentTransferTracker() + ); assertThat(scheduler.getForceMergeMBPerSec(), equalTo(10.0)); // Update to a different rate limit @@ -190,7 +203,11 @@ public void testFallbackToClusterSettingWhenIndexSettingRemoved() throws Excepti IndexSettings indexSettings = new IndexSettings(newIndexMeta("test_index", builder.build()), nodeSettings); ShardId shardId = new ShardId("test_index", "test_uuid", 0); - OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler(shardId, indexSettings); + OpenSearchConcurrentMergeScheduler scheduler = new OpenSearchConcurrentMergeScheduler( + shardId, + indexSettings, + new MergedSegmentTransferTracker() + ); // Should initially use index-level setting assertThat(scheduler.getForceMergeMBPerSec(), equalTo(25.0)); diff --git a/server/src/test/java/org/opensearch/index/merge/MergeStatsTests.java b/server/src/test/java/org/opensearch/index/merge/MergeStatsTests.java new file mode 100644 index 0000000000000..aca196267724d --- /dev/null +++ b/server/src/test/java/org/opensearch/index/merge/MergeStatsTests.java @@ -0,0 +1,244 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.merge; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class MergeStatsTests extends OpenSearchTestCase { + + public void testDefaultConstructor() { + MergeStats stats = new MergeStats(); + assertEquals(0, stats.getTotal()); + assertEquals(0, stats.getTotalTimeInMillis()); + assertEquals(0, stats.getTotalNumDocs()); + assertEquals(0, stats.getTotalSizeInBytes()); + assertEquals(0, stats.getCurrent()); + assertEquals(0, stats.getCurrentNumDocs()); + assertEquals(0, stats.getCurrentSizeInBytes()); + assertEquals(0, stats.getTotalStoppedTimeInMillis()); + assertEquals(0, stats.getTotalThrottledTimeInMillis()); + assertEquals(0, stats.getUnreferencedFileCleanUpsPerformed()); + assertNotNull(stats.getWarmerStats()); + } + + public void testAdd() { + MergeStats stats = new MergeStats(); + MergedSegmentWarmerStats warmerStats = new MergedSegmentWarmerStats(); + warmerStats.add(1, 10, 0, 100, 200, 5, 15, 0); + + stats.add(5, 100, 50, 1024, 2, 25, 512, 10, 20, 1.5, warmerStats); + + assertEquals(5, stats.getTotal()); + assertEquals(100, stats.getTotalTimeInMillis()); + assertEquals(50, stats.getTotalNumDocs()); + assertEquals(1024, stats.getTotalSizeInBytes()); + assertEquals(2, stats.getCurrent()); + assertEquals(25, stats.getCurrentNumDocs()); + assertEquals(512, stats.getCurrentSizeInBytes()); + assertEquals(10, stats.getTotalStoppedTimeInMillis()); + assertEquals(20, stats.getTotalThrottledTimeInMillis()); + + assertEquals(1, stats.getWarmerStats().getTotalInvocationsCount()); + assertEquals(10, 
stats.getWarmerStats().getTotalTime().getMillis()); + assertEquals(0, stats.getWarmerStats().getTotalFailureCount()); + assertEquals(new ByteSizeValue(100), stats.getWarmerStats().getTotalSentSize()); + assertEquals(new ByteSizeValue(200), stats.getWarmerStats().getTotalReceivedSize()); + assertEquals(0, stats.getWarmerStats().getOngoingCount()); + assertEquals(5, stats.getWarmerStats().getTotalSendTime().getMillis()); + assertEquals(15, stats.getWarmerStats().getTotalReceiveTime().getMillis()); + } + + public void testAddWithoutMergedSegmentWarmer() { + MergeStats stats = new MergeStats(); + stats.add(5, 100, 50, 1024, 2, 25, 512, 10, 20, 1.5); + + assertEquals(5, stats.getTotal()); + assertEquals(100, stats.getTotalTimeInMillis()); + assertEquals(50, stats.getTotalNumDocs()); + assertEquals(1024, stats.getTotalSizeInBytes()); + assertEquals(2, stats.getCurrent()); + assertEquals(25, stats.getCurrentNumDocs()); + assertEquals(512, stats.getCurrentSizeInBytes()); + assertEquals(10, stats.getTotalStoppedTimeInMillis()); + assertEquals(20, stats.getTotalThrottledTimeInMillis()); + + assertNotNull(stats.getWarmerStats()); + assertEquals(0, stats.getWarmerStats().getTotalInvocationsCount()); + assertEquals(0, stats.getWarmerStats().getTotalTime().getMillis()); + assertEquals(0, stats.getWarmerStats().getTotalFailureCount()); + assertEquals(new ByteSizeValue(0), stats.getWarmerStats().getTotalSentSize()); + assertEquals(new ByteSizeValue(0), stats.getWarmerStats().getTotalReceivedSize()); + assertEquals(0, stats.getWarmerStats().getOngoingCount()); + assertEquals(0, stats.getWarmerStats().getTotalSendTime().getMillis()); + assertEquals(0, stats.getWarmerStats().getTotalReceiveTime().getMillis()); + } + + public void testAddMergeStats() { + MergeStats stats1 = new MergeStats(); + MergeStats stats2 = new MergeStats(); + + MergedSegmentWarmerStats warmerStats = new MergedSegmentWarmerStats(); + warmerStats.add(1, 10, 0, 100, 200, 5, 15, 0); + + stats1.add(5, 100, 50, 1024, 
2, 25, 512, 10, 20, 1.5, warmerStats); + stats2.add(3, 50, 30, 512, 1, 15, 256, 5, 10, 1.0, warmerStats); + + stats1.add(stats2); + + assertEquals(8, stats1.getTotal()); + assertEquals(3, stats1.getCurrent()); + assertEquals(40, stats1.getCurrentNumDocs()); + assertEquals(768, stats1.getCurrentSizeInBytes()); + + assertEquals(2, stats1.getWarmerStats().getTotalInvocationsCount()); + assertEquals(20, stats1.getWarmerStats().getTotalTime().getMillis()); + assertEquals(0, stats1.getWarmerStats().getTotalFailureCount()); + assertEquals(new ByteSizeValue(200), stats1.getWarmerStats().getTotalSentSize()); + assertEquals(new ByteSizeValue(400), stats1.getWarmerStats().getTotalReceivedSize()); + assertEquals(0, stats1.getWarmerStats().getOngoingCount()); + assertEquals(10, stats1.getWarmerStats().getTotalSendTime().getMillis()); + assertEquals(30, stats1.getWarmerStats().getTotalReceiveTime().getMillis()); + } + + public void testAddTotals() { + MergeStats stats1 = new MergeStats(); + MergeStats stats2 = new MergeStats(); + + MergedSegmentWarmerStats warmerStats = new MergedSegmentWarmerStats(); + warmerStats.add(1, 10, 0, 100, 200, 5, 15, 7); + + stats1.add(5, 100, 50, 1024, 2, 25, 512, 10, 20, 1.5, warmerStats); + stats2.add(3, 50, 30, 512, 1, 15, 256, 5, 10, 1.0, warmerStats); + + stats1.addTotals(stats2); + + assertEquals(8, stats1.getTotal()); + assertEquals(2, stats1.getCurrent()); // not expected to get added with addTotals + assertEquals(25, stats1.getCurrentNumDocs()); // not expected to get added with addTotals + assertEquals(512, stats1.getCurrentSizeInBytes()); // not expected to get added with addTotals + assertEquals(150, stats1.getTotalTimeInMillis()); + assertEquals(80, stats1.getTotalNumDocs()); + assertEquals(1536, stats1.getTotalSizeInBytes()); + assertEquals(15, stats1.getTotalStoppedTimeInMillis()); + assertEquals(30, stats1.getTotalThrottledTimeInMillis()); + + assertEquals(2, stats1.getWarmerStats().getTotalInvocationsCount()); + assertEquals(20, 
stats1.getWarmerStats().getTotalTime().getMillis()); + assertEquals(0, stats1.getWarmerStats().getTotalFailureCount()); + assertEquals(new ByteSizeValue(200), stats1.getWarmerStats().getTotalSentSize()); + assertEquals(new ByteSizeValue(400), stats1.getWarmerStats().getTotalReceivedSize()); + assertEquals(7, stats1.getWarmerStats().getOngoingCount()); // not expected to get added with addTotals + assertEquals(10, stats1.getWarmerStats().getTotalSendTime().getMillis()); + assertEquals(30, stats1.getWarmerStats().getTotalReceiveTime().getMillis()); + } + + public void testAddWithNull() { + MergeStats stats = new MergeStats(); + stats.add((MergeStats) null); + stats.addTotals(null); + + assertEquals(0, stats.getTotal()); + assertEquals(0, stats.getCurrent()); + } + + public void testUnreferencedFileCleanUpStats() { + MergeStats stats = new MergeStats(); + stats.addUnreferencedFileCleanUpStats(5); + assertEquals(5, stats.getUnreferencedFileCleanUpsPerformed()); + + stats.addUnreferencedFileCleanUpStats(3); + assertEquals(8, stats.getUnreferencedFileCleanUpsPerformed()); + } + + public void testGetters() { + MergeStats stats = new MergeStats(); + MergedSegmentWarmerStats warmerStats = new MergedSegmentWarmerStats(); + warmerStats.add(1, 10, 0, 100, 200, 5, 15, 0); + + stats.add(5, 100, 50, 1024, 2, 25, 512, 10, 20, 1.5, warmerStats); + + assertEquals(new TimeValue(100), stats.getTotalTime()); + assertEquals(new TimeValue(10), stats.getTotalStoppedTime()); + assertEquals(new TimeValue(20), stats.getTotalThrottledTime()); + assertEquals(new ByteSizeValue(1024), stats.getTotalSize()); + assertEquals(new ByteSizeValue(512), stats.getCurrentSize()); + assertTrue(stats.getTotalBytesPerSecAutoThrottle() > 0); + } + + public void testAutoThrottleMaxValue() { + MergeStats stats1 = new MergeStats(); + MergeStats stats2 = new MergeStats(); + + MergedSegmentWarmerStats warmerStats = new MergedSegmentWarmerStats(); + + stats1.add(1, 10, 5, 100, 0, 0, 0, 0, 0, Double.MAX_VALUE, 
warmerStats); + stats2.add(1, 10, 5, 100, 0, 0, 0, 0, 0, 1.0, warmerStats); + + stats1.addTotals(stats2); + assertEquals(Long.MAX_VALUE, stats1.getTotalBytesPerSecAutoThrottle()); + } + + public void testSerialization() throws IOException { + MergeStats original = new MergeStats(); + MergedSegmentWarmerStats warmerStats = new MergedSegmentWarmerStats(); + warmerStats.add(1, 10, 0, 100, 200, 5, 15, 0); + + original.add(5, 100, 50, 1024, 2, 25, 512, 10, 20, 1.5, warmerStats); + original.addUnreferencedFileCleanUpStats(3); + + BytesStreamOutput out = new BytesStreamOutput(); + original.writeTo(out); + + BytesReference bytes = out.bytes(); + StreamInput in = bytes.streamInput(); + MergeStats deserialized = new MergeStats(in); + + assertEquals(original.getTotal(), deserialized.getTotal()); + assertEquals(original.getTotalTimeInMillis(), deserialized.getTotalTimeInMillis()); + assertEquals(original.getTotalNumDocs(), deserialized.getTotalNumDocs()); + assertEquals(original.getTotalSizeInBytes(), deserialized.getTotalSizeInBytes()); + assertEquals(original.getCurrent(), deserialized.getCurrent()); + assertEquals(original.getCurrentNumDocs(), deserialized.getCurrentNumDocs()); + assertEquals(original.getCurrentSizeInBytes(), deserialized.getCurrentSizeInBytes()); + assertEquals(original.getTotalStoppedTimeInMillis(), deserialized.getTotalStoppedTimeInMillis()); + assertEquals(original.getTotalThrottledTimeInMillis(), deserialized.getTotalThrottledTimeInMillis()); + assertEquals(original.getTotalBytesPerSecAutoThrottle(), deserialized.getTotalBytesPerSecAutoThrottle()); + } + + public void testToXContent() throws IOException { + MergeStats stats = new MergeStats(); + MergedSegmentWarmerStats warmerStats = new MergedSegmentWarmerStats(); + warmerStats.add(1, 10, 0, 100, 200, 5, 15, 0); + + stats.add(5, 100, 50, 1024, 2, 25, 512, 10, 20, 1.5, warmerStats); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + stats.toXContent(builder, null); 
+ builder.endObject(); + + String json = builder.toString(); + assertTrue(json.contains("merges")); + assertTrue(json.contains("current")); + assertTrue(json.contains("total")); + assertTrue(json.contains("total_time_in_millis")); + assertTrue(json.contains("total_docs")); + assertTrue(json.contains("total_size_in_bytes")); + assertTrue(json.contains("warmer")); + } +} diff --git a/server/src/test/java/org/opensearch/index/merge/MergedSegmentTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/merge/MergedSegmentTransferTrackerTests.java new file mode 100644 index 0000000000000..26e895c9b05a5 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/merge/MergedSegmentTransferTrackerTests.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.merge; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.test.OpenSearchTestCase; + +public class MergedSegmentTransferTrackerTests extends OpenSearchTestCase { + + private MergedSegmentTransferTracker tracker; + + @Override + public void setUp() throws Exception { + super.setUp(); + tracker = new MergedSegmentTransferTracker(); + } + + public void testInitialStats() { + MergedSegmentWarmerStats stats = tracker.stats(); + assertEquals(0, stats.getTotalInvocationsCount()); + assertEquals(TimeValue.ZERO, stats.getTotalTime()); + assertEquals(0, stats.getTotalFailureCount()); + assertEquals(ByteSizeValue.class, stats.getTotalSentSize().getClass()); + assertEquals(0, stats.getTotalSentSize().getBytes()); + assertEquals(ByteSizeValue.class, stats.getTotalReceivedSize().getClass()); + assertEquals(0, stats.getTotalReceivedSize().getBytes()); + assertEquals(TimeValue.ZERO, stats.getTotalSendTime()); + assertEquals(TimeValue.ZERO, 
stats.getTotalReceiveTime()); + assertEquals(0, stats.getOngoingCount()); + } + + public void testIncrementCounters() { + tracker.incrementTotalWarmInvocationsCount(); + tracker.incrementTotalWarmFailureCount(); + + MergedSegmentWarmerStats stats = tracker.stats(); + assertEquals(1, stats.getTotalInvocationsCount()); + assertEquals(1, stats.getTotalFailureCount()); + } + + public void testOngoingWarms() { + tracker.incrementOngoingWarms(); + tracker.incrementOngoingWarms(); + assertEquals(2, tracker.stats().getOngoingCount()); + + tracker.decrementOngoingWarms(); + assertEquals(1, tracker.stats().getOngoingCount()); + } + + public void testAddTimeAndBytes() { + tracker.addTotalWarmTimeMillis(100); + tracker.addTotalSendTimeMillis(200); + tracker.addTotalReceiveTimeMillis(300); + tracker.addTotalBytesSent(1024); + tracker.addTotalBytesReceived(2048); + + MergedSegmentWarmerStats stats = tracker.stats(); + assertEquals(new TimeValue(100), stats.getTotalTime()); + assertEquals(new TimeValue(200), stats.getTotalSendTime()); + assertEquals(new TimeValue(300), stats.getTotalReceiveTime()); + assertEquals(1024, stats.getTotalSentSize().getBytes()); + assertEquals(2048, stats.getTotalReceivedSize().getBytes()); + } + + public void testCumulativeStats() { + tracker.addTotalWarmTimeMillis(100); + tracker.addTotalWarmTimeMillis(50); + assertEquals(new TimeValue(150), tracker.stats().getTotalTime()); + + tracker.addTotalBytesSent(1000); + tracker.addTotalBytesSent(500); + assertEquals(1500, tracker.stats().getTotalSentSize().getBytes()); + } +} diff --git a/server/src/test/java/org/opensearch/index/merge/MergedSegmentWarmerStatsTests.java b/server/src/test/java/org/opensearch/index/merge/MergedSegmentWarmerStatsTests.java new file mode 100644 index 0000000000000..199b7b6ff7cb1 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/merge/MergedSegmentWarmerStatsTests.java @@ -0,0 +1,156 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors 
require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.merge; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class MergedSegmentWarmerStatsTests extends OpenSearchTestCase { + + public void testDefaultConstructor() { + MergedSegmentWarmerStats stats = new MergedSegmentWarmerStats(); + assertEquals(0, stats.getTotalInvocationsCount()); + assertEquals(0, stats.getTotalTime().getMillis()); + assertEquals(0, stats.getTotalFailureCount()); + assertEquals(0, stats.getTotalSentSize().getBytes()); + assertEquals(0, stats.getTotalReceivedSize().getBytes()); + assertEquals(0, stats.getTotalSendTime().millis()); + assertEquals(0, stats.getTotalReceiveTime().millis()); + assertEquals(0, stats.getOngoingCount()); + } + + public void testAdd() { + MergedSegmentWarmerStats stats = new MergedSegmentWarmerStats(); + stats.add(5, 100, 2, 1024, 2048, 50, 75, 3); + + assertEquals(5, stats.getTotalInvocationsCount()); + assertEquals(100, stats.getTotalTime().getMillis()); + assertEquals(2, stats.getTotalFailureCount()); + assertEquals(1024, stats.getTotalSentSize().getBytes()); + assertEquals(2048, stats.getTotalReceivedSize().getBytes()); + assertEquals(50, stats.getTotalSendTime().millis()); + assertEquals(75, stats.getTotalReceiveTime().millis()); + assertEquals(3, stats.getOngoingCount()); + } + + public void testAddMultiple() { + MergedSegmentWarmerStats stats = new MergedSegmentWarmerStats(); + stats.add(5, 100, 2, 1024, 2048, 50, 75, 3); + stats.add(3, 
50, 1, 512, 1024, 25, 30, 1); + + assertEquals(8, stats.getTotalInvocationsCount()); + assertEquals(150, stats.getTotalTime().getMillis()); + assertEquals(3, stats.getTotalFailureCount()); + assertEquals(1536, stats.getTotalSentSize().getBytes()); + assertEquals(3072, stats.getTotalReceivedSize().getBytes()); + assertEquals(75, stats.getTotalSendTime().millis()); + assertEquals(105, stats.getTotalReceiveTime().millis()); + assertEquals(4, stats.getOngoingCount()); + } + + public void testAddStats() { + MergedSegmentWarmerStats stats1 = new MergedSegmentWarmerStats(); + stats1.add(5, 100, 2, 1024, 2048, 50, 75, 3); + + MergedSegmentWarmerStats stats2 = new MergedSegmentWarmerStats(); + stats2.add(3, 50, 1, 512, 1024, 25, 30, 1); + + stats1.add(stats2); + assertEquals(4, stats1.getOngoingCount()); + } + + public void testAddTotals() { + MergedSegmentWarmerStats stats1 = new MergedSegmentWarmerStats(); + stats1.add(5, 100, 2, 1024, 2048, 50, 75, 3); + + MergedSegmentWarmerStats stats2 = new MergedSegmentWarmerStats(); + stats2.add(3, 50, 1, 512, 1024, 25, 30, 1); + + stats1.addTotals(stats2); + assertEquals(8, stats1.getTotalInvocationsCount()); + assertEquals(150, stats1.getTotalTime().getMillis()); + assertEquals(3, stats1.getTotalFailureCount()); + assertEquals(1536, stats1.getTotalSentSize().getBytes()); + assertEquals(3072, stats1.getTotalReceivedSize().getBytes()); + assertEquals(75, stats1.getTotalSendTime().millis()); + assertEquals(105, stats1.getTotalReceiveTime().millis()); + } + + public void testAddTotalsWithNull() { + MergedSegmentWarmerStats stats = new MergedSegmentWarmerStats(); + stats.add(5, 100, 2, 1024, 2048, 50, 75, 3); + + stats.addTotals(null); + assertEquals(5, stats.getTotalInvocationsCount()); + } + + public void testSerialization() throws IOException { + MergedSegmentWarmerStats original = new MergedSegmentWarmerStats(); + original.add(5, 100, 2, 1024, 2048, 50, 75, 3); + + BytesStreamOutput out = new BytesStreamOutput(); + 
original.writeTo(out); + + BytesReference bytes = out.bytes(); + StreamInput in = bytes.streamInput(); + MergedSegmentWarmerStats deserialized = new MergedSegmentWarmerStats(in); + + assertEquals(original.getTotalInvocationsCount(), deserialized.getTotalInvocationsCount()); + assertEquals(original.getTotalTime().getMillis(), deserialized.getTotalTime().getMillis()); + assertEquals(original.getTotalFailureCount(), deserialized.getTotalFailureCount()); + assertEquals(original.getTotalSentSize().getBytes(), deserialized.getTotalSentSize().getBytes()); + assertEquals(original.getTotalReceivedSize().getBytes(), deserialized.getTotalReceivedSize().getBytes()); + assertEquals(original.getTotalSendTime().millis(), deserialized.getTotalSendTime().millis()); + assertEquals(original.getTotalReceiveTime().millis(), deserialized.getTotalReceiveTime().millis()); + assertEquals(original.getOngoingCount(), deserialized.getOngoingCount()); + } + + public void testToXContent() throws IOException { + MergedSegmentWarmerStats stats = new MergedSegmentWarmerStats(); + stats.add(5, 100, 2, 1024, 2048, 50, 75, 3); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + stats.toXContent(builder, null); + builder.endObject(); + + String json = builder.toString(); + assertTrue(json.contains("warmer")); + assertTrue(json.contains("total_invocations_count")); + assertTrue(json.contains("total_time_millis")); + assertTrue(json.contains("total_failure_count")); + assertTrue(json.contains("total_bytes_sent")); + assertTrue(json.contains("total_bytes_received")); + assertTrue(json.contains("total_send_time_millis")); + assertTrue(json.contains("total_receive_time_millis")); + assertTrue(json.contains("ongoing_count")); + } + + public void testGetters() { + MergedSegmentWarmerStats stats = new MergedSegmentWarmerStats(); + stats.add(5, 100, 2, 1024, 2048, 50, 75, 3); + + assertEquals(5, stats.getTotalInvocationsCount()); + assertEquals(new TimeValue(100), 
stats.getTotalTime()); + assertEquals(2, stats.getTotalFailureCount()); + assertEquals(new ByteSizeValue(1024), stats.getTotalSentSize()); + assertEquals(new ByteSizeValue(2048), stats.getTotalReceivedSize()); + assertEquals(new TimeValue(50), stats.getTotalSendTime()); + assertEquals(new TimeValue(75), stats.getTotalReceiveTime()); + assertEquals(3, stats.getOngoingCount()); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java index ca91bbef52296..bf6d2d9e65882 100644 --- a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java @@ -229,7 +229,7 @@ public void testGetMergedSegmentFiles() throws IOException, ExecutionException, GetSegmentFilesResponse response = res.get(); assertEquals(response.files.size(), filesToFetch.size()); assertTrue(response.files.containsAll(filesToFetch)); - closeShards(replicaShard); + closeShardWithRetry(replicaShard); } public void testGetMergedSegmentFilesDownloadTimeout() throws IOException, ExecutionException, InterruptedException { @@ -293,7 +293,7 @@ public void onFailure(Exception e) { observedException.getMessage() != null && observedException.getMessage().equals("Timed out waiting for merged segments download from remote store") ); - closeShards(replicaShard); + closeShardWithRetry(replicaShard); } public void testGetMergedSegmentFilesFailure() throws IOException, ExecutionException, InterruptedException { @@ -356,4 +356,18 @@ private void buildIndexShardBehavior(IndexShard mockShard, IndexShard indexShard FilterDirectory remoteStoreFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(remoteSegmentStoreDirectory)); when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); } + + private void 
closeShardWithRetry(IndexShard shard) { + try { + assertBusy(() -> { + try { + closeShards(shard); + } catch (RuntimeException e) { + throw new AssertionError("Failed to close shard", e); + } + }); + } catch (Exception e) { + logger.warn("Unable to close shard " + shard.shardId() + ". Exception: " + e); + } + } } diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/RemoteStorePublishMergedSegmentActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/RemoteStorePublishMergedSegmentActionTests.java index 338e1a44e7713..8ab597cde87fb 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/RemoteStorePublishMergedSegmentActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/RemoteStorePublishMergedSegmentActionTests.java @@ -29,6 +29,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexService; +import org.opensearch.index.merge.MergedSegmentTransferTracker; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.RemoteDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; @@ -115,6 +116,7 @@ public void testPublishMergedSegment() { final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.mergedSegmentTransferTracker()).thenReturn(new MergedSegmentTransferTracker()); when(indexService.getShard(id)).thenReturn(indexShard); final ShardId shardId = new ShardId(index, id); @@ -177,6 +179,7 @@ public void testPublishMergedSegmentWithNoTimeLeftAfterUpload() { final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.mergedSegmentTransferTracker()).thenReturn(new MergedSegmentTransferTracker()); when(indexService.getShard(id)).thenReturn(indexShard); final ShardId shardId = new ShardId(index, id); @@ -238,6 +241,7 @@ public void 
testPublishMergedSegmentActionOnPrimary() throws InterruptedExceptio final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.mergedSegmentTransferTracker()).thenReturn(new MergedSegmentTransferTracker()); when(indexService.getShard(id)).thenReturn(indexShard); final ShardId shardId = new ShardId(index, id); @@ -279,6 +283,7 @@ public void testPublishMergedSegmentActionOnReplica() throws IOException { when(indicesService.indexServiceSafe(index)).thenReturn(indexService); final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.mergedSegmentTransferTracker()).thenReturn(new MergedSegmentTransferTracker()); when(indexService.getShard(id)).thenReturn(indexShard); final ShardId shardId = new ShardId(index, id); final RemoteSegmentStoreDirectory remoteDirectory = new RemoteSegmentStoreDirectory( @@ -346,6 +351,7 @@ public void testPublishMergedSegmentActionOnReplicaWithMismatchedShardId() throw when(indicesService.indexServiceSafe(index)).thenReturn(indexService); final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.mergedSegmentTransferTracker()).thenReturn(new MergedSegmentTransferTracker()); when(indexService.getShard(id)).thenReturn(indexShard); final ShardId shardId = new ShardId(index, id); final RemoteSegmentStoreDirectory remoteDirectory = new RemoteSegmentStoreDirectory( @@ -425,6 +431,7 @@ public void testPublishMergedSegmentActionOnDocrepReplicaDuringMigration() throw when(indicesService.indexServiceSafe(index)).thenReturn(indexService); final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.mergedSegmentTransferTracker()).thenReturn(new MergedSegmentTransferTracker()); when(indexService.getShard(id)).thenReturn(indexShard); final ShardId shardId = new ShardId(index, id); @@ -461,6 +468,7 @@ public void 
testPublishMergedSegmentActionOnDocrepReplicaDuringMigration() throw public void testGetReplicationModeWithRemoteTranslog() { final RemoteStorePublishMergedSegmentAction action = createAction(); final IndexShard indexShard = mock(IndexShard.class); + when(indexShard.mergedSegmentTransferTracker()).thenReturn(new MergedSegmentTransferTracker()); when(indexShard.indexSettings()).thenReturn(createIndexSettings(true)); assertEquals(ReplicationMode.FULL_REPLICATION, action.getReplicationMode(indexShard)); } diff --git a/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java b/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java index 8bb4660d15155..708e0c7d3da77 100644 --- a/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/cat/RestShardsActionTests.java @@ -158,7 +158,7 @@ private void assertTable(Table table) { assertThat(headers.get(6).value, equalTo("ip")); assertThat(headers.get(7).value, equalTo("id")); assertThat(headers.get(8).value, equalTo("node")); - assertThat(headers.get(84).value, equalTo("docs.deleted")); + assertThat(headers.get(92).value, equalTo("docs.deleted")); final List> rows = table.getRows(); assertThat(rows.size(), equalTo(shardRoutings.size())); @@ -174,9 +174,9 @@ private void assertTable(Table table) { assertThat(row.get(4).value, equalTo(shardStats.getStats().getDocs().getCount())); assertThat(row.get(6).value, equalTo(localNode.getHostAddress())); assertThat(row.get(7).value, equalTo(localNode.getId())); - assertThat(row.get(82).value, equalTo(shardStats.getDataPath())); - assertThat(row.get(83).value, equalTo(shardStats.getStatePath())); - assertThat(row.get(84).value, equalTo(shardStats.getStats().getDocs().getDeleted())); + assertThat(row.get(90).value, equalTo(shardStats.getDataPath())); + assertThat(row.get(91).value, equalTo(shardStats.getStatePath())); + assertThat(row.get(92).value, 
equalTo(shardStats.getStats().getDocs().getDeleted())); } } }