From 9b7e4a5a15209e32f493977432518fde1d6a0f5c Mon Sep 17 00:00:00 2001 From: guojialiang Date: Fri, 1 Aug 2025 15:01:15 +0800 Subject: [PATCH 1/6] remove listAll from computeReferencedSegmentsCheckpoint Signed-off-by: guojialiang --- .../replication/MergedSegmentWarmerIT.java | 80 +++++++++++++++++-- .../common/settings/IndexScopedSettings.java | 1 + .../org/opensearch/index/IndexSettings.java | 25 ++++++ .../opensearch/index/shard/IndexShard.java | 80 ++++++++++++++----- .../MergedSegmentReplicationTarget.java | 2 +- .../index/shard/RemoteIndexShardTests.java | 4 +- .../SegmentReplicationIndexShardTests.java | 43 +++++++++- 7 files changed, 203 insertions(+), 32 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java index 41e08690f22be..47494d27868bb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java @@ -182,8 +182,8 @@ public void testMergeSegmentWarmerWithInactiveReplica() throws Exception { assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size()); } - // Construct a case with redundant pending merge segments in replica shard, and finally delete these files - public void testCleanupRedundantPendingMergeFile() throws Exception { + // Construct a case with redundant merge segments in replica shard, and finally delete these files + public void testCleanupReplicaRedundantMergedSegment() throws Exception { final String primaryNode = internalCluster().startDataOnlyNode(); createIndex( INDEX_NAME, @@ -227,13 +227,13 @@ public void testCleanupRedundantPendingMergeFile() throws Exception { } IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME); - assertBusy(() -> 
assertFalse(replicaShard.getPendingMergedSegmentCheckpoints().isEmpty())); + assertBusy(() -> assertFalse(replicaShard.getReplicaMergedSegmentCheckpoints().isEmpty())); client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get(); forceMergeComplete.set(true); // Verify replica shard has pending merged segments - assertBusy(() -> { assertFalse(replicaShard.getPendingMergedSegmentCheckpoints().isEmpty()); }, 1, TimeUnit.MINUTES); + assertBusy(() -> { assertFalse(replicaShard.getReplicaMergedSegmentCheckpoints().isEmpty()); }, 1, TimeUnit.MINUTES); waitForSegmentCount(INDEX_NAME, 1, logger); primaryTransportService.clearAllRules(); @@ -245,20 +245,86 @@ public void testCleanupRedundantPendingMergeFile() throws Exception { .setSettings( Settings.builder() .put(IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1)) + .put(IndexSettings.INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME.getKey(), TimeValue.timeValueSeconds(1)) ) ); + waitForSameFilesInPrimaryAndReplica(INDEX_NAME, primaryNode, replicaNode); + assertBusy(() -> assertTrue(replicaShard.getReplicaMergedSegmentCheckpoints().isEmpty())); + } + + // Construct a case with redundant merged segment checkpoint in the primary shard and delete it based on the expiration time + public void testPrimaryMergedSegmentCheckpointRetentionTimeout() throws Exception { + final String primaryNode = internalCluster().startDataOnlyNode(); + // close auto refresh + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("index.refresh_interval", -1).build()); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + // generate segment _0.si + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get(); + refresh(INDEX_NAME); + // generate segment _1.si + client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").get(); + 
refresh(INDEX_NAME); + // generate segment _2.si + client().prepareIndex(INDEX_NAME).setId("3").setSource("abc", "def").get(); + refresh(INDEX_NAME); + + // force merge 3 segments to 2 segments, generate segment _3.si + // specify parameter flush as false to prevent triggering the refresh operation + logger.info("force merge segments to 2"); + client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).flush(false).maxNumSegments(2)).get(); + // since the refresh operation has not been performed, _3.si will remain in primaryMergedSegmentCheckpoints + waitForSameFilesInPrimaryAndReplica(INDEX_NAME, primaryNode, replicaNode); + + // force merge 2 segments to 1 segment, generate segment _4.si + // use the default value (true) of parameter flush to trigger the refresh operation + logger.info("force merge segments to 1"); + client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get(); + refresh(INDEX_NAME); + // since the refresh operation has been performed, _4.si will be removed from primaryMergedSegmentCheckpoints + waitForSegmentCount(INDEX_NAME, 1, logger); + + // Verify that primary shard and replica shard have non-empty merged segment checkpoints + IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); + assertBusy(() -> assertFalse(primaryShard.getPrimaryMergedSegmentCheckpoints().isEmpty())); + + IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME); + assertBusy(() -> assertFalse(replicaShard.getReplicaMergedSegmentCheckpoints().isEmpty())); + + // update the configuration to expire _3.si, and then remove it from primaryMergedSegmentCheckpoints + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings( + Settings.builder() + .put(IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1)) + .put(IndexSettings.INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME.getKey(), TimeValue.timeValueSeconds(1)) + ) + ); + 
+ waitForSameFilesInPrimaryAndReplica(INDEX_NAME, primaryNode, replicaNode); + + // Verify that primary shard and replica shard have empty merged segment checkpoints + assertBusy(() -> assertTrue(primaryShard.getPrimaryMergedSegmentCheckpoints().isEmpty())); + assertBusy(() -> assertTrue(replicaShard.getReplicaMergedSegmentCheckpoints().isEmpty())); + } + + public void waitForSameFilesInPrimaryAndReplica(String indexName, String primaryNode, String replicaNode) throws Exception { assertBusy(() -> { - IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); + IndexShard primaryShard = getIndexShard(primaryNode, indexName); Directory primaryDirectory = primaryShard.store().directory(); Set primaryFiles = Sets.newHashSet(primaryDirectory.listAll()); primaryFiles.removeIf(f -> f.startsWith("segment")); + IndexShard replicaShard = getIndexShard(replicaNode, indexName); Directory replicaDirectory = replicaShard.store().directory(); Set replicaFiles = Sets.newHashSet(replicaDirectory.listAll()); replicaFiles.removeIf(f -> f.startsWith("segment")); - // Verify replica shard does not have pending merged segments - assertEquals(0, replicaShard.getPendingMergedSegmentCheckpoints().size()); // Verify that primary shard and replica shard have the same file list + logger.info("primary files: {}, replica files: {}", primaryFiles, replicaFiles); assertEquals(primaryFiles, replicaFiles); }, 1, TimeUnit.MINUTES); } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index ba442aaddae82..b6aa5e3bf5aab 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -160,6 +160,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.MAX_NESTED_QUERY_DEPTH_SETTING, IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, 
IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING, + IndexSettings.INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME, IndexSettings.DEFAULT_FIELD_SETTING, IndexSettings.QUERY_STRING_LENIENT_SETTING, IndexSettings.ALLOW_UNMAPPED, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 1e94cad4e01d9..9b09843da00d1 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -179,6 +179,17 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); + /** + * Index setting describing the maximum retention time of merged segment checkpoints in the primary shard, used to prevent memory leaks. + */ + public static final Setting INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME = Setting.timeSetting( + "index.merged_segment_checkpoint.retention_time", + TimeValue.timeValueMinutes(15), + TimeValue.timeValueSeconds(0), + Property.Dynamic, + Property.IndexScope + ); + public static final Setting INDEX_SEARCH_IDLE_AFTER = Setting.timeSetting( "index.search.idle.after", TimeValue.timeValueSeconds(30), @@ -823,6 +834,7 @@ public static IndexMergePolicy fromString(String text) { private volatile Translog.Durability durability; private volatile TimeValue syncInterval; private volatile TimeValue publishReferencedSegmentsInterval; + private volatile TimeValue mergedSegmentCheckpointRetentionTime; private volatile TimeValue refreshInterval; private volatile ByteSizeValue flushThresholdSize; private volatile TimeValue translogRetentionAge; @@ -1036,6 +1048,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING); syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings); publishReferencedSegmentsInterval = INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.get(settings); + 
mergedSegmentCheckpointRetentionTime = INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME.get(settings); refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); @@ -1161,6 +1174,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING, this::setPublishReferencedSegmentsInterval ); + scopedSettings.addSettingsUpdateConsumer( + INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME, + this::setMergedSegmentCheckpointRetentionTime + ); scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow); scopedSettings.addSettingsUpdateConsumer(MAX_INNER_RESULT_WINDOW_SETTING, this::setMaxInnerResultWindow); scopedSettings.addSettingsUpdateConsumer(MAX_ADJACENCY_MATRIX_FILTERS_SETTING, this::setMaxAdjacencyMatrixFilters); @@ -1547,6 +1564,14 @@ public void setPublishReferencedSegmentsInterval(TimeValue publishReferencedSegm this.publishReferencedSegmentsInterval = publishReferencedSegmentsInterval; } + public TimeValue getMergedSegmentCheckpointRetentionTime() { + return mergedSegmentCheckpointRetentionTime; + } + + public void setMergedSegmentCheckpointRetentionTime(TimeValue mergedSegmentCheckpointRetentionTime) { + this.mergedSegmentCheckpointRetentionTime = mergedSegmentCheckpointRetentionTime; + } + /** * Returns the translog sync/upload buffer interval when remote translog store is enabled and index setting * {@code index.translog.durability} is set as {@code request}. 
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index de3f227b2d62c..7508fe8c3e6eb 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -97,8 +97,10 @@ import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.metrics.MeanMetric; import org.opensearch.common.settings.Settings; +import org.opensearch.common.time.DateUtils; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractAsyncTask; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.AsyncIOProcessor; @@ -222,6 +224,8 @@ import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -387,7 +391,12 @@ Runnable getGlobalCheckpointSyncer() { private final ClusterApplierService clusterApplierService; private final MergedSegmentPublisher mergedSegmentPublisher; private final ReferencedSegmentsPublisher referencedSegmentsPublisher; - private final Set pendingMergedSegmentCheckpoints = Sets.newConcurrentHashSet(); + + // Primary Shard: track merged segment checkpoints that have been published for pre-warm but not yet refreshed. + private final Set primaryMergedSegmentCheckpoints = Sets.newConcurrentHashSet(); + + // Replica Shard: record the pre-copied merged segment checkpoints, which are not yet refreshed. 
+ private final Set replicaMergedSegmentCheckpoints = Sets.newConcurrentHashSet(); @InternalApi public IndexShard( @@ -1780,7 +1789,7 @@ public void finalizeReplication(SegmentInfos infos) throws IOException { segmentCommitInfoName ) ); - pendingMergedSegmentCheckpoints.removeIf(s -> s.getSegmentName().equals(segmentCommitInfoName)); + replicaMergedSegmentCheckpoints.removeIf(s -> s.getSegmentName().equals(segmentCommitInfoName)); } } } @@ -1825,7 +1834,7 @@ public void finalizeReplication(SegmentInfos infos) throws IOException { */ public void cleanupRedundantPendingMergeSegment(ReferencedSegmentsCheckpoint referencedSegmentsCheckpoint) { List pendingDeleteCheckpoints = new ArrayList<>(); - for (MergedSegmentCheckpoint mergedSegmentCheckpoint : pendingMergedSegmentCheckpoints) { + for (MergedSegmentCheckpoint mergedSegmentCheckpoint : replicaMergedSegmentCheckpoints) { if (false == referencedSegmentsCheckpoint.getSegmentNames().contains(mergedSegmentCheckpoint.getSegmentName()) && referencedSegmentsCheckpoint.isAheadOf(mergedSegmentCheckpoint)) { logger.trace( @@ -1838,17 +1847,51 @@ public void cleanupRedundantPendingMergeSegment(ReferencedSegmentsCheckpoint ref } for (MergedSegmentCheckpoint mergedSegmentCheckpoint : pendingDeleteCheckpoints) { store.deleteQuiet(mergedSegmentCheckpoint.getMetadataMap().keySet().toArray(new String[0])); - pendingMergedSegmentCheckpoints.remove(mergedSegmentCheckpoint); + replicaMergedSegmentCheckpoints.remove(mergedSegmentCheckpoint); } } - public void addPendingMergeSegmentCheckpoint(MergedSegmentCheckpoint mergedSegmentCheckpoint) { - pendingMergedSegmentCheckpoints.add(mergedSegmentCheckpoint); + // Remove expired primary merged segment checkpoints to prevent memory leaks + public void removeExpiredPrimaryMergedSegmentCheckpoints() { + Set expiredMergedSegmentCheckpoints = primaryMergedSegmentCheckpoints.stream() + .filter( + m -> Duration.ofNanos(DateUtils.toLong(Instant.now()) - m.getCreatedTimeStamp()).toMillis() > 
indexSettings + .getMergedSegmentCheckpointRetentionTime() + .millis() + ) + .collect(Collectors.toSet()); + expiredMergedSegmentCheckpoints.forEach(primaryMergedSegmentCheckpoints::remove); + logger.trace("primary shard remove expired merged segment checkpoints {}", expiredMergedSegmentCheckpoints); + } + + // Remove the already refreshed segments from primary merged segment checkpoints. + public void updatePrimaryMergedSegmentCheckpoints(SegmentInfos segmentInfos) { + Set segmentNames = segmentInfos.asList().stream().map(s -> s.info.name).collect(Collectors.toSet()); + Set refreshedMergedSegmentCheckpoints = primaryMergedSegmentCheckpoints.stream() + .filter(m -> segmentNames.contains(m.getSegmentName())) + .collect(Collectors.toSet()); + refreshedMergedSegmentCheckpoints.forEach(primaryMergedSegmentCheckpoints::remove); + logger.trace("primary shard remove refreshed merged segment checkpoints {}", refreshedMergedSegmentCheckpoints); + } + + public void addPrimaryMergedSegmentCheckpoint(MergedSegmentCheckpoint mergedSegmentCheckpoint) { + logger.trace("primary shard add merged segment checkpoint {}", mergedSegmentCheckpoint); + primaryMergedSegmentCheckpoints.add(mergedSegmentCheckpoint); } // for tests - public Set getPendingMergedSegmentCheckpoints() { - return pendingMergedSegmentCheckpoints; + public Set getPrimaryMergedSegmentCheckpoints() { + return primaryMergedSegmentCheckpoints; + } + + public void addReplicaMergedSegmentCheckpoint(MergedSegmentCheckpoint mergedSegmentCheckpoint) { + logger.trace("replica shard add merged segment checkpoint {}", mergedSegmentCheckpoint); + replicaMergedSegmentCheckpoints.add(mergedSegmentCheckpoint); + } + + // for tests + public Set getReplicaMergedSegmentCheckpoints() { + return replicaMergedSegmentCheckpoints; } /** @@ -1944,6 +1987,7 @@ ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) th public void publishReferencedSegments() throws IOException { assert referencedSegmentsPublisher != 
null; referencedSegmentsPublisher.publish(this, computeReferencedSegmentsCheckpoint()); + removeExpiredPrimaryMergedSegmentCheckpoints(); } /** @@ -1955,16 +1999,11 @@ public void publishReferencedSegments() throws IOException { */ public ReferencedSegmentsCheckpoint computeReferencedSegmentsCheckpoint() throws IOException { try (GatedCloseable segmentInfosGatedCloseable = getSegmentInfosSnapshot()) { - String[] allFiles = store.directory().listAll(); Set segmentNames = Sets.newHashSet(); - for (String file : allFiles) { - // filter segment files - if (false == file.endsWith(".si")) { - continue; - } - String segmentName = IndexFileNames.parseSegmentName(file); - segmentNames.add(segmentName); - } + segmentNames.addAll(segmentInfosGatedCloseable.get().asList().stream().map(s -> s.info.name).collect(Collectors.toSet())); + segmentNames.addAll( + primaryMergedSegmentCheckpoints.stream().map(MergedSegmentCheckpoint::getSegmentName).collect(Collectors.toSet()) + ); return new ReferencedSegmentsCheckpoint( shardId, getOperationPrimaryTerm(), @@ -1979,7 +2018,9 @@ public ReferencedSegmentsCheckpoint computeReferencedSegmentsCheckpoint() throws public void publishMergedSegment(SegmentCommitInfo segmentCommitInfo) throws IOException { assert mergedSegmentPublisher != null; - mergedSegmentPublisher.publish(this, computeMergeSegmentCheckpoint(segmentCommitInfo)); + MergedSegmentCheckpoint mergedSegmentCheckpoint = computeMergeSegmentCheckpoint(segmentCommitInfo); + addPrimaryMergedSegmentCheckpoint(mergedSegmentCheckpoint); + mergedSegmentPublisher.publish(this, mergedSegmentCheckpoint); } /** @@ -5112,6 +5153,9 @@ private void updateReplicationCheckpoint() { final Tuple, ReplicationCheckpoint> tuple = getLatestSegmentInfosAndCheckpoint(); try (final GatedCloseable ignored = tuple.v1()) { replicationTracker.setLatestReplicationCheckpoint(tuple.v2()); + if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING)) { + 
updatePrimaryMergedSegmentCheckpoints(tuple.v1().get()); + } } catch (IOException e) { throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e); } diff --git a/server/src/main/java/org/opensearch/indices/replication/MergedSegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/MergedSegmentReplicationTarget.java index ead4007a1b62a..86c6d635ebb3c 100644 --- a/server/src/main/java/org/opensearch/indices/replication/MergedSegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/MergedSegmentReplicationTarget.java @@ -62,7 +62,7 @@ protected void getFilesFromSource( protected void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws Exception { assert checkpoint instanceof MergedSegmentCheckpoint; multiFileWriter.renameAllTempFiles(); - indexShard.addPendingMergeSegmentCheckpoint((MergedSegmentCheckpoint) checkpoint); + indexShard.addReplicaMergedSegmentCheckpoint((MergedSegmentCheckpoint) checkpoint); } @Override diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 43eda929b7aa4..20894f411ef3d 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -648,8 +648,8 @@ public void testMergedSegmentReplicationWithZeroReplica() throws Exception { @LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) @Override @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/18720") - public void testCleanupRedundantPendingMergeSegment() throws Exception { - super.testCleanupRedundantPendingMergeSegment(); + public void testCleanupReplicaRedundantMergedSegment() throws Exception { + super.testCleanupReplicaRedundantMergedSegment(); } private RemoteStoreReplicationSource getRemoteStoreReplicationSource(IndexShard shard, Runnable 
postGetFilesRunnable) { diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index f416a022ce3b7..8182ee0fe3e71 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -209,7 +209,7 @@ public void testMergedSegmentReplicationWithZeroReplica() throws Exception { } @LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) - public void testCleanupRedundantPendingMergeSegment() throws Exception { + public void testCleanupReplicaRedundantMergedSegment() throws Exception { try (ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory());) { shards.startAll(); final IndexShard primaryShard = shards.getPrimary(); @@ -241,16 +241,51 @@ public void testCleanupRedundantPendingMergeSegment() throws Exception { // verify primary segment count and replica pending merge segment count assertEquals(1, primaryShard.segments(false).size()); - assertEquals(2, replicaShard.getPendingMergedSegmentCheckpoints().size()); + assertEquals(2, replicaShard.getReplicaMergedSegmentCheckpoints().size()); // after segment replication, _4.si is removed from pending merge segments replicateSegments(primaryShard, List.of(replicaShard)); - assertEquals(1, replicaShard.getPendingMergedSegmentCheckpoints().size()); + assertEquals(1, replicaShard.getReplicaMergedSegmentCheckpoints().size()); // after cleanup redundant pending merge segment, _2.si is removed from pending merge segments ReferencedSegmentsCheckpoint referencedSegmentsCheckpoint = primaryShard.computeReferencedSegmentsCheckpoint(); replicaShard.cleanupRedundantPendingMergeSegment(referencedSegmentsCheckpoint); - assertEquals(0, replicaShard.getPendingMergedSegmentCheckpoints().size()); + assertEquals(0, 
replicaShard.getReplicaMergedSegmentCheckpoints().size()); + } + } + + @LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) + public void testPrimaryMergedSegmentCheckpointRetentionTimeout() throws Exception { + // disable auto refresh + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexSettings.INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME.getKey(), TimeValue.timeValueSeconds(0)) + .put("index.refresh_interval", -1) + .build(); + try (ReplicationGroup shards = createGroup(1, indexSettings, indexMapping, new NRTReplicationEngineFactory());) { + shards.startAll(); + final IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + + // generate segment _0.si and _1.si + int numDocs = 1; + shards.indexDocs(numDocs); + primaryShard.refresh("test"); + flushShard(primaryShard); + + shards.indexDocs(numDocs); + primaryShard.refresh("test"); + flushShard(primaryShard); + + // generate pending merge segment _2.si + // specify parameter flush as false to prevent triggering the refresh operation + primaryShard.forceMerge(new ForceMergeRequest("test").flush(false).maxNumSegments(1)); + assertEquals(1, primaryShard.getPrimaryMergedSegmentCheckpoints().size()); + primaryShard.removeExpiredPrimaryMergedSegmentCheckpoints(); + assertEquals(0, primaryShard.getPrimaryMergedSegmentCheckpoints().size()); + + primaryShard.refresh("test"); + replicateSegments(primaryShard, List.of(replicaShard)); } } From 0c063022e3fe4872a09051da1f91901c59664ad0 Mon Sep 17 00:00:00 2001 From: guojialiang Date: Fri, 1 Aug 2025 16:12:06 +0800 Subject: [PATCH 2/6] fix ut Signed-off-by: guojialiang --- .../org/opensearch/index/shard/RemoteIndexShardTests.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 
20894f411ef3d..7d971a32537a1 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -652,6 +652,13 @@ public void testCleanupReplicaRedundantMergedSegment() throws Exception { super.testCleanupReplicaRedundantMergedSegment(); } + @LockFeatureFlag(MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG) + @Override + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/18890") + public void testPrimaryMergedSegmentCheckpointRetentionTimeout() throws Exception { + super.testPrimaryMergedSegmentCheckpointRetentionTimeout(); + } + private RemoteStoreReplicationSource getRemoteStoreReplicationSource(IndexShard shard, Runnable postGetFilesRunnable) { return new RemoteStoreReplicationSource(shard) { @Override From 253c15f178beb622067d6f49b077080f08ae82e5 Mon Sep 17 00:00:00 2001 From: guojialiang Date: Fri, 1 Aug 2025 17:28:43 +0800 Subject: [PATCH 3/6] fix ut Signed-off-by: guojialiang --- server/src/main/java/org/opensearch/index/shard/IndexShard.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 7508fe8c3e6eb..8f9323783d71f 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1855,7 +1855,7 @@ public void cleanupRedundantPendingMergeSegment(ReferencedSegmentsCheckpoint ref public void removeExpiredPrimaryMergedSegmentCheckpoints() { Set expiredMergedSegmentCheckpoints = primaryMergedSegmentCheckpoints.stream() .filter( - m -> Duration.ofNanos(DateUtils.toLong(Instant.now()) - m.getCreatedTimeStamp()).toMillis() > indexSettings + m -> Duration.ofNanos(DateUtils.toLong(Instant.now()) - m.getCreatedTimeStamp()).toMillis() >= indexSettings .getMergedSegmentCheckpointRetentionTime() .millis() ) From 
9ba9cb68b479db8a2b4ba649c1c505b877184673 Mon Sep 17 00:00:00 2001 From: guojialiang Date: Sun, 3 Aug 2025 00:25:00 +0800 Subject: [PATCH 4/6] update Signed-off-by: guojialiang --- .../indices/replication/SegmentReplicationSourceService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index c748c46535e3d..7055007408a01 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -128,7 +128,7 @@ private class CheckpointInfoRequestHandler implements TransportRequestHandler Date: Mon, 4 Aug 2025 10:25:43 +0800 Subject: [PATCH 5/6] update Signed-off-by: guojialiang --- .../src/main/java/org/opensearch/index/shard/IndexShard.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 8f9323783d71f..9185c6f1961f7 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -5154,7 +5154,9 @@ private void updateReplicationCheckpoint() { try (final GatedCloseable ignored = tuple.v1()) { replicationTracker.setLatestReplicationCheckpoint(tuple.v2()); if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING)) { - updatePrimaryMergedSegmentCheckpoints(tuple.v1().get()); + if (isPrimaryMode() && routingEntry().active()) { + updatePrimaryMergedSegmentCheckpoints(tuple.v1().get()); + } } } catch (IOException e) { throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e); From 137195f81c7c1e3ce8c77f7bfa8bf53c8a320834 Mon Sep 17 00:00:00 2001 From: 
guojialiang Date: Tue, 12 Aug 2025 00:53:00 +0800 Subject: [PATCH 6/6] clear completedReplications when cancel Signed-off-by: guojialiang --- .../org/opensearch/indices/replication/SegmentReplicator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java index 72abd4f33e465..342b83679861a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicator.java @@ -395,6 +395,7 @@ void cancel(ShardId shardId, String reason) { onGoingMergedSegmentReplications.cancelForShard(shardId, reason); replicationCheckpointStats.remove(shardId); primaryCheckpoint.remove(shardId); + completedReplications.remove(shardId); } SegmentReplicationTarget get(ShardId shardId) {