@@ -182,8 +182,8 @@ public void testMergeSegmentWarmerWithInactiveReplica() throws Exception {
assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
}

- // Construct a case with redundant pending merge segments in replica shard, and finally delete these files
- public void testCleanupRedundantPendingMergeFile() throws Exception {
+ // Construct a case with redundant merge segments in replica shard, and finally delete these files
+ public void testCleanupReplicaRedundantMergedSegment() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
@@ -227,13 +227,13 @@ public void testCleanupRedundantPendingMergeFile() throws Exception {
}

IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
- assertBusy(() -> assertFalse(replicaShard.getPendingMergedSegmentCheckpoints().isEmpty()));
+ assertBusy(() -> assertFalse(replicaShard.getReplicaMergedSegmentCheckpoints().isEmpty()));

client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
forceMergeComplete.set(true);

// Verify replica shard has pending merged segments
- assertBusy(() -> { assertFalse(replicaShard.getPendingMergedSegmentCheckpoints().isEmpty()); }, 1, TimeUnit.MINUTES);
+ assertBusy(() -> { assertFalse(replicaShard.getReplicaMergedSegmentCheckpoints().isEmpty()); }, 1, TimeUnit.MINUTES);

waitForSegmentCount(INDEX_NAME, 1, logger);
primaryTransportService.clearAllRules();
@@ -245,20 +245,86 @@ public void testCleanupRedundantPendingMergeFile() throws Exception {
.setSettings(
Settings.builder()
.put(IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
+ .put(IndexSettings.INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME.getKey(), TimeValue.timeValueSeconds(1))
)
);

waitForSameFilesInPrimaryAndReplica(INDEX_NAME, primaryNode, replicaNode);
+ assertBusy(() -> assertTrue(replicaShard.getReplicaMergedSegmentCheckpoints().isEmpty()));
}

+ // Construct a case with a redundant merged segment checkpoint in the primary shard and delete it based on the expiration time
+ public void testPrimaryMergedSegmentCheckpointRetentionTimeout() throws Exception {
+ final String primaryNode = internalCluster().startDataOnlyNode();
+ // disable auto refresh
+ createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("index.refresh_interval", -1).build());
+ ensureYellowAndNoInitializingShards(INDEX_NAME);
+ final String replicaNode = internalCluster().startDataOnlyNode();
+ ensureGreen(INDEX_NAME);
+
+ // generate segment _0.si
+ client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
+ refresh(INDEX_NAME);
+ // generate segment _1.si
+ client().prepareIndex(INDEX_NAME).setId("2").setSource("bar", "baz").get();
+ refresh(INDEX_NAME);
+ // generate segment _2.si
+ client().prepareIndex(INDEX_NAME).setId("3").setSource("abc", "def").get();
+ refresh(INDEX_NAME);
+
+ // force merge 3 segments to 2 segments, generate segment _3.si
+ // specify parameter flush as false to prevent triggering the refresh operation
+ logger.info("force merge segments to 2");
+ client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).flush(false).maxNumSegments(2)).get();
+ // since the refresh operation has not been performed, _3.si will remain in primaryMergedSegmentCheckpoints
+ waitForSameFilesInPrimaryAndReplica(INDEX_NAME, primaryNode, replicaNode);
+
+ // force merge 2 segments to 1 segment, generate segment _4.si
+ // use the default value (true) of parameter flush to trigger the refresh operation
+ logger.info("force merge segments to 1");
+ client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
+ refresh(INDEX_NAME);
+ // since the refresh operation has been performed, _4.si will be removed from primaryMergedSegmentCheckpoints
+ waitForSegmentCount(INDEX_NAME, 1, logger);
+
+ // Verify that primary shard and replica shard have non-empty merged segment checkpoints
+ IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
+ assertBusy(() -> assertFalse(primaryShard.getPrimaryMergedSegmentCheckpoints().isEmpty()));
+
+ IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
+ assertBusy(() -> assertFalse(replicaShard.getReplicaMergedSegmentCheckpoints().isEmpty()));
+
+ // update the configuration to expire _3.si, and then remove it from primaryMergedSegmentCheckpoints
+ assertAcked(
+ client().admin()
+ .indices()
+ .prepareUpdateSettings(INDEX_NAME)
+ .setSettings(
+ Settings.builder()
+ .put(IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
+ .put(IndexSettings.INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME.getKey(), TimeValue.timeValueSeconds(1))
+ )
+ );
+
+ waitForSameFilesInPrimaryAndReplica(INDEX_NAME, primaryNode, replicaNode);
+
+ // Verify that primary shard and replica shard have empty merged segment checkpoints
+ assertBusy(() -> assertTrue(primaryShard.getPrimaryMergedSegmentCheckpoints().isEmpty()));
+ assertBusy(() -> assertTrue(replicaShard.getReplicaMergedSegmentCheckpoints().isEmpty()));
+ }

public void waitForSameFilesInPrimaryAndReplica(String indexName, String primaryNode, String replicaNode) throws Exception {
assertBusy(() -> {
- IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
+ IndexShard primaryShard = getIndexShard(primaryNode, indexName);
Directory primaryDirectory = primaryShard.store().directory();
Set<String> primaryFiles = Sets.newHashSet(primaryDirectory.listAll());
primaryFiles.removeIf(f -> f.startsWith("segment"));
IndexShard replicaShard = getIndexShard(replicaNode, indexName);
Directory replicaDirectory = replicaShard.store().directory();
Set<String> replicaFiles = Sets.newHashSet(replicaDirectory.listAll());
replicaFiles.removeIf(f -> f.startsWith("segment"));
- // Verify replica shard does not have pending merged segments
- assertEquals(0, replicaShard.getPendingMergedSegmentCheckpoints().size());
// Verify that primary shard and replica shard have the same file list
logger.info("primary files: {}, replica files: {}", primaryFiles, replicaFiles);
assertEquals(primaryFiles, replicaFiles);
}, 1, TimeUnit.MINUTES);
}
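Note: the tests above exercise the same pair of dynamic knobs. The referenced-segments publish interval controls how often the primary advertises which segments are still live, and the new retention time bounds how long an unrefreshed merged-segment checkpoint may be kept. A minimal sketch of tightening both on a live index, mirroring the updateSettings calls in the tests; the index name and one-second values are illustrative only, and the snippet assumes the integration-test harness (client() from OpenSearchIntegTestCase):

```java
// Sketch, not PR code: both settings are dynamic, so no index close/reopen is needed.
client().admin()
    .indices()
    .prepareUpdateSettings("my-index") // illustrative index name
    .setSettings(
        Settings.builder()
            .put(IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
            .put(IndexSettings.INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME.getKey(), TimeValue.timeValueSeconds(1))
    )
    .get();
```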
@@ -160,6 +160,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.MAX_NESTED_QUERY_DEPTH_SETTING,
IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING,
IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING,
+ IndexSettings.INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME,
IndexSettings.DEFAULT_FIELD_SETTING,
IndexSettings.QUERY_STRING_LENIENT_SETTING,
IndexSettings.ALLOW_UNMAPPED,
server/src/main/java/org/opensearch/index/IndexSettings.java (25 additions, 0 deletions)
@@ -179,6 +179,17 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);

+ /**
+ * Index setting describing the maximum retention time of merged segment checkpoints in the primary shard, used to prevent memory leaks.
+ */
+ public static final Setting<TimeValue> INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME = Setting.timeSetting(
+ "index.merged_segment_checkpoint.retention_time",
+ TimeValue.timeValueMinutes(15),
+ TimeValue.timeValueSeconds(0),
+ Property.Dynamic,
+ Property.IndexScope
+ );

public static final Setting<TimeValue> INDEX_SEARCH_IDLE_AFTER = Setting.timeSetting(
"index.search.idle.after",
TimeValue.timeValueSeconds(30),
@@ -838,6 +849,7 @@ public static IndexMergePolicy fromString(String text) {
private volatile Translog.Durability durability;
private volatile TimeValue syncInterval;
private volatile TimeValue publishReferencedSegmentsInterval;
+ private volatile TimeValue mergedSegmentCheckpointRetentionTime;
private volatile TimeValue refreshInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile TimeValue translogRetentionAge;
@@ -1056,6 +1068,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING);
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
publishReferencedSegmentsInterval = INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.get(settings);
+ mergedSegmentCheckpointRetentionTime = INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME.get(settings);
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
@@ -1184,6 +1197,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING,
this::setPublishReferencedSegmentsInterval
);
+ scopedSettings.addSettingsUpdateConsumer(
+ INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME,
+ this::setMergedSegmentCheckpointRetentionTime
+ );
scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow);
scopedSettings.addSettingsUpdateConsumer(MAX_INNER_RESULT_WINDOW_SETTING, this::setMaxInnerResultWindow);
scopedSettings.addSettingsUpdateConsumer(MAX_ADJACENCY_MATRIX_FILTERS_SETTING, this::setMaxAdjacencyMatrixFilters);
@@ -1575,6 +1592,14 @@ public void setPublishReferencedSegmentsInterval(TimeValue publishReferencedSegm
this.publishReferencedSegmentsInterval = publishReferencedSegmentsInterval;
}

+ public TimeValue getMergedSegmentCheckpointRetentionTime() {
+ return mergedSegmentCheckpointRetentionTime;
+ }
+
+ public void setMergedSegmentCheckpointRetentionTime(TimeValue mergedSegmentCheckpointRetentionTime) {
+ this.mergedSegmentCheckpointRetentionTime = mergedSegmentCheckpointRetentionTime;
+ }

/**
* Returns the translog sync/upload buffer interval when remote translog store is enabled and index setting
* {@code index.translog.durability} is set as {@code request}.
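Note: the setting added above defaults to 15 minutes with a floor of 0 seconds and is both dynamic and index-scoped; the addSettingsUpdateConsumer wiring keeps the volatile field in sync with live updates, and the one-line registration in IndexScopedSettings (previous file) is what makes index-level updates acceptable at all. A small sketch of how such a time setting resolves values (standard Setting behavior, not code from this PR):

```java
// Explicit value parses like any other index-scoped time setting.
Settings explicit = Settings.builder()
    .put("index.merged_segment_checkpoint.retention_time", "30s")
    .build();
TimeValue retention = IndexSettings.INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME.get(explicit); // 30s

// With the key absent, the declared default applies.
TimeValue fallback = IndexSettings.INDEX_MERGED_SEGMENT_CHECKPOINT_RETENTION_TIME.get(Settings.EMPTY); // 15m
```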
server/src/main/java/org/opensearch/index/shard/IndexShard.java (64 additions, 18 deletions)
@@ -98,8 +98,10 @@
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.settings.Settings;
+ import org.opensearch.common.time.DateUtils;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
+ import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AbstractAsyncTask;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
@@ -223,6 +225,8 @@
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
+ import java.time.Duration;
+ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -388,7 +392,12 @@ Runnable getGlobalCheckpointSyncer() {
private final ClusterApplierService clusterApplierService;
private final MergedSegmentPublisher mergedSegmentPublisher;
private final ReferencedSegmentsPublisher referencedSegmentsPublisher;
- private final Set<MergedSegmentCheckpoint> pendingMergedSegmentCheckpoints = Sets.newConcurrentHashSet();
+
+ // Primary shard: track merged segment checkpoints that have been published for pre-warming but not yet refreshed.
+ private final Set<MergedSegmentCheckpoint> primaryMergedSegmentCheckpoints = Sets.newConcurrentHashSet();
+
+ // Replica shard: record pre-copied merged segment checkpoints that have not yet been refreshed.
+ private final Set<MergedSegmentCheckpoint> replicaMergedSegmentCheckpoints = Sets.newConcurrentHashSet();

@InternalApi
public IndexShard(
@@ -1791,7 +1800,7 @@ public void finalizeReplication(SegmentInfos infos) throws IOException {
segmentCommitInfoName
)
);
- pendingMergedSegmentCheckpoints.removeIf(s -> s.getSegmentName().equals(segmentCommitInfoName));
+ replicaMergedSegmentCheckpoints.removeIf(s -> s.getSegmentName().equals(segmentCommitInfoName));
}
}
}
@@ -1836,7 +1845,7 @@ public void finalizeReplication(SegmentInfos infos) throws IOException {
*/
public void cleanupRedundantPendingMergeSegment(ReferencedSegmentsCheckpoint referencedSegmentsCheckpoint) {
List<MergedSegmentCheckpoint> pendingDeleteCheckpoints = new ArrayList<>();
- for (MergedSegmentCheckpoint mergedSegmentCheckpoint : pendingMergedSegmentCheckpoints) {
+ for (MergedSegmentCheckpoint mergedSegmentCheckpoint : replicaMergedSegmentCheckpoints) {
if (false == referencedSegmentsCheckpoint.getSegmentNames().contains(mergedSegmentCheckpoint.getSegmentName())
&& referencedSegmentsCheckpoint.isAheadOf(mergedSegmentCheckpoint)) {
logger.trace(
@@ -1849,17 +1858,51 @@ public void cleanupRedundantPendingMergeSegment(ReferencedSegmentsCheckpoint ref
}
for (MergedSegmentCheckpoint mergedSegmentCheckpoint : pendingDeleteCheckpoints) {
store.deleteQuiet(mergedSegmentCheckpoint.getMetadataMap().keySet().toArray(new String[0]));
- pendingMergedSegmentCheckpoints.remove(mergedSegmentCheckpoint);
+ replicaMergedSegmentCheckpoints.remove(mergedSegmentCheckpoint);
}
}

- public void addPendingMergeSegmentCheckpoint(MergedSegmentCheckpoint mergedSegmentCheckpoint) {
- pendingMergedSegmentCheckpoints.add(mergedSegmentCheckpoint);
+ // Remove expired primary merged segment checkpoints to prevent memory leaks
+ public void removeExpiredPrimaryMergedSegmentCheckpoints() {
+ Set<MergedSegmentCheckpoint> expiredMergedSegmentCheckpoints = primaryMergedSegmentCheckpoints.stream()
+ .filter(
+ m -> Duration.ofNanos(DateUtils.toLong(Instant.now()) - m.getCreatedTimeStamp()).toMillis() >= indexSettings
+ .getMergedSegmentCheckpointRetentionTime()
+ .millis()
+ )
+ .collect(Collectors.toSet());
+ expiredMergedSegmentCheckpoints.forEach(primaryMergedSegmentCheckpoints::remove);
+ logger.trace("primary shard remove expired merged segment checkpoints {}", expiredMergedSegmentCheckpoints);
+ }
+
+ // Remove the already refreshed segments from primary merged segment checkpoints.
+ public void updatePrimaryMergedSegmentCheckpoints(SegmentInfos segmentInfos) {
+ Set<String> segmentNames = segmentInfos.asList().stream().map(s -> s.info.name).collect(Collectors.toSet());
+ Set<MergedSegmentCheckpoint> refreshedMergedSegmentCheckpoints = primaryMergedSegmentCheckpoints.stream()
+ .filter(m -> segmentNames.contains(m.getSegmentName()))
+ .collect(Collectors.toSet());
+ refreshedMergedSegmentCheckpoints.forEach(primaryMergedSegmentCheckpoints::remove);
+ logger.trace("primary shard remove refreshed merged segment checkpoints {}", refreshedMergedSegmentCheckpoints);
+ }
+
+ public void addPrimaryMergedSegmentCheckpoint(MergedSegmentCheckpoint mergedSegmentCheckpoint) {
+ logger.trace("primary shard add merged segment checkpoint {}", mergedSegmentCheckpoint);
+ primaryMergedSegmentCheckpoints.add(mergedSegmentCheckpoint);
}

+ // for tests
+ public Set<MergedSegmentCheckpoint> getPrimaryMergedSegmentCheckpoints() {
+ return primaryMergedSegmentCheckpoints;
+ }
+
+ public void addReplicaMergedSegmentCheckpoint(MergedSegmentCheckpoint mergedSegmentCheckpoint) {
+ logger.trace("replica shard add merged segment checkpoint {}", mergedSegmentCheckpoint);
+ replicaMergedSegmentCheckpoints.add(mergedSegmentCheckpoint);
+ }
+
// for tests
- public Set<MergedSegmentCheckpoint> getPendingMergedSegmentCheckpoints() {
- return pendingMergedSegmentCheckpoints;
+ public Set<MergedSegmentCheckpoint> getReplicaMergedSegmentCheckpoints() {
+ return replicaMergedSegmentCheckpoints;
}

/**
@@ -1955,6 +1998,7 @@ ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) th
public void publishReferencedSegments() throws IOException {
assert referencedSegmentsPublisher != null;
referencedSegmentsPublisher.publish(this, computeReferencedSegmentsCheckpoint());
+ removeExpiredPrimaryMergedSegmentCheckpoints();
}

/**
@@ -1966,16 +2010,11 @@ public void publishReferencedSegments() throws IOException {
*/
public ReferencedSegmentsCheckpoint computeReferencedSegmentsCheckpoint() throws IOException {
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = getSegmentInfosSnapshot()) {
- String[] allFiles = store.directory().listAll();
Set<String> segmentNames = Sets.newHashSet();
- for (String file : allFiles) {
- // filter segment files
- if (false == file.endsWith(".si")) {
- continue;
- }
- String segmentName = IndexFileNames.parseSegmentName(file);
- segmentNames.add(segmentName);
- }
+ segmentNames.addAll(segmentInfosGatedCloseable.get().asList().stream().map(s -> s.info.name).collect(Collectors.toSet()));
+ segmentNames.addAll(
+ primaryMergedSegmentCheckpoints.stream().map(MergedSegmentCheckpoint::getSegmentName).collect(Collectors.toSet())
+ );
return new ReferencedSegmentsCheckpoint(
shardId,
getOperationPrimaryTerm(),
@@ -1990,7 +2029,9 @@ public ReferencedSegmentsCheckpoint computeReferencedSegmentsCheckpoint() throws

public void publishMergedSegment(SegmentCommitInfo segmentCommitInfo) throws IOException {
assert mergedSegmentPublisher != null;
- mergedSegmentPublisher.publish(this, computeMergeSegmentCheckpoint(segmentCommitInfo));
+ MergedSegmentCheckpoint mergedSegmentCheckpoint = computeMergeSegmentCheckpoint(segmentCommitInfo);
+ addPrimaryMergedSegmentCheckpoint(mergedSegmentCheckpoint);
+ mergedSegmentPublisher.publish(this, mergedSegmentCheckpoint);
}

/**
@@ -5125,6 +5166,11 @@ private void updateReplicationCheckpoint() {
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> tuple = getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> ignored = tuple.v1()) {
replicationTracker.setLatestReplicationCheckpoint(tuple.v2());
+ if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING)) {
+ if (isPrimaryMode() && routingEntry().active()) {
+ updatePrimaryMergedSegmentCheckpoints(tuple.v1().get());
+ }
+ }
} catch (IOException e) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
}
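Note: with these changes the primary ages out stale checkpoints on the referenced-segments publish path (publishReferencedSegments calls removeExpiredPrimaryMergedSegmentCheckpoints), drops refreshed ones when the replication checkpoint is updated, and computeReferencedSegmentsCheckpoint now derives segment names from the SegmentInfos snapshot plus the tracked checkpoints instead of listing *.si files on disk. A sketch of the expiry predicate used above, assuming getCreatedTimeStamp() stores the epoch-nanosecond value produced by DateUtils.toLong(Instant.now()) when the checkpoint was published (checkpoint and indexSettings stand in for the surrounding fields):

```java
// Sketch, not PR code: a checkpoint expires once its age exceeds the retention window.
long nowNanos = DateUtils.toLong(Instant.now()); // epoch nanoseconds
long ageMillis = Duration.ofNanos(nowNanos - checkpoint.getCreatedTimeStamp()).toMillis();
boolean expired = ageMillis >= indexSettings.getMergedSegmentCheckpointRetentionTime().millis();
```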