[merged segment warmer] support cleanup redundant pending merged segments #18720

Merged
@@ -89,6 +89,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
import org.opensearch.indices.replication.checkpoint.ReferencedSegmentsPublisher;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -731,7 +732,8 @@ public static final IndexShard newIndexShard(
() -> indexService.getIndexSettings().getRefreshInterval(),
indexService.getRefreshMutex(),
clusterService.getClusterApplierService(),
MergedSegmentPublisher.EMPTY
MergedSegmentPublisher.EMPTY,
ReferencedSegmentsPublisher.EMPTY
);
}
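
(Note: ReferencedSegmentsPublisher.EMPTY above presumably mirrors MergedSegmentPublisher.EMPTY on the preceding line, i.e. a no-op publisher for test construction paths where no replication transport is wired up.)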

@@ -9,28 +9,35 @@
package org.opensearch.indices.replication;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.TieredMergePolicyProvider;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.TransportService;

import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* This class runs the Segment Replication integration test suite with the merged segment warmer enabled.
*/
@@ -175,6 +182,87 @@ public void testMergeSegmentWarmerWithInactiveReplica() throws Exception {
assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
}

// Construct a case where the replica shard accumulates redundant pending merged segments, then verify those files are eventually cleaned up
public void testCleanupRedundantPendingMergeFile() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), 5)
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 5)
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
.build()
);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

AtomicBoolean forceMergeComplete = new AtomicBoolean(false);
MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));

primaryTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
if (forceMergeComplete.get() == false) {
logger.trace("mock connection exception");
throw new ConnectTransportException(connection.getNode(), "mock connection exception");
}
}
connection.sendRequest(requestId, action, request, options);
}
);

for (int i = 0; i < 30; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo" + i, "bar" + i)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
}

IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
assertBusy(() -> assertFalse(replicaShard.getPendingMergedSegmentCheckpoints().isEmpty()));

client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
forceMergeComplete.set(true);

// Verify replica shard has pending merged segments
assertBusy(() -> { assertFalse(replicaShard.getPendingMergedSegmentCheckpoints().isEmpty()); }, 1, TimeUnit.MINUTES);

waitForSegmentCount(INDEX_NAME, 1, logger);
primaryTransportService.clearAllRules();

assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(
Settings.builder()
.put(IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
)
);

assertBusy(() -> {
IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
Directory primaryDirectory = primaryShard.store().directory();
Set<String> primaryFiles = Sets.newHashSet(primaryDirectory.listAll());
primaryFiles.removeIf(f -> f.startsWith("segment"));
Directory replicaDirectory = replicaShard.store().directory();
Set<String> replicaFiles = Sets.newHashSet(replicaDirectory.listAll());
replicaFiles.removeIf(f -> f.startsWith("segment"));
// Verify replica shard does not have pending merged segments
assertEquals(0, replicaShard.getPendingMergedSegmentCheckpoints().size());
// Verify that primary shard and replica shard have the same file list
assertEquals(primaryFiles, replicaFiles);
}, 1, TimeUnit.MINUTES);
}
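
The test above relies on the replica dropping pending merged-segment checkpoints once the primary publishes the segments it still references. A rough standalone sketch of that cleanup rule (plain Java; all names are illustrative, not the PR's actual classes):

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative sketch only: a replica-side table of pending merged segments,
    // pruned against the primary's published referenced-segment set.
    final class PendingMergedSegmentsSketch {
        private final Set<String> pendingMergedSegmentNames = new HashSet<>();

        void track(String segmentName) {
            pendingMergedSegmentNames.add(segmentName);
        }

        // Called when a referenced-segments checkpoint arrives from the primary:
        // any pending merged segment the primary no longer references is redundant
        // and can be dropped together with its files.
        void onPrimaryReferencedSegments(Set<String> referencedByPrimary) {
            pendingMergedSegmentNames.removeIf(name -> referencedByPrimary.contains(name) == false);
        }

        int pendingCount() {
            return pendingMergedSegmentNames.size();
        }
    }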

public static void waitForSegmentCount(String indexName, int segmentCount, Logger logger) throws Exception {
assertBusy(() -> {
Set<String> primarySegments = Sets.newHashSet();
@@ -159,6 +159,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.MAX_TERMS_COUNT_SETTING,
IndexSettings.MAX_NESTED_QUERY_DEPTH_SETTING,
IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING,
IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING,
IndexSettings.DEFAULT_FIELD_SETTING,
IndexSettings.QUERY_STRING_LENIENT_SETTING,
IndexSettings.ALLOW_UNMAPPED,
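
(Registering the setting in IndexScopedSettings is what allows it to pass index-setting validation; an index setting not registered here is rejected both at index creation and on dynamic updates.)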
77 changes: 75 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
@@ -106,6 +106,7 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
import org.opensearch.indices.replication.checkpoint.ReferencedSegmentsPublisher;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.plugins.IndexStorePlugin;
@@ -178,6 +179,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
private volatile AsyncReplicationTask asyncReplicationTask;
private volatile AsyncPublishReferencedSegmentsTask asyncPublishReferencedSegmentsTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@@ -330,6 +332,9 @@ public IndexService(
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
}
this.asyncReplicationTask = new AsyncReplicationTask(this);
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING)) {
this.asyncPublishReferencedSegmentsTask = new AsyncPublishReferencedSegmentsTask(this);
}
this.translogFactorySupplier = translogFactorySupplier;
this.recoverySettings = recoverySettings;
this.remoteStoreSettings = remoteStoreSettings;
@@ -443,6 +448,11 @@ AsyncReplicationTask getReplicationTask() {
return asyncReplicationTask;
}

// visible for tests
AsyncPublishReferencedSegmentsTask getPublishReferencedSegmentsTask() {
return asyncPublishReferencedSegmentsTask;
}

/**
* Context for index creation
*
@@ -614,6 +624,7 @@ public synchronized IndexShard createShard(
sourceNode,
discoveryNodes,
mergedSegmentWarmerFactory,
null,
null
);
}
@@ -629,7 +640,8 @@
@Nullable DiscoveryNode sourceNode,
DiscoveryNodes discoveryNodes,
MergedSegmentWarmerFactory mergedSegmentWarmerFactory,
MergedSegmentPublisher mergedSegmentPublisher
MergedSegmentPublisher mergedSegmentPublisher,
ReferencedSegmentsPublisher referencedSegmentsPublisher
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
@@ -762,7 +774,8 @@ protected void closeInternal() {
this::getRefreshInterval,
refreshMutex,
clusterService.getClusterApplierService(),
this.indexSettings.isSegRepEnabledOrRemoteNode() ? mergedSegmentPublisher : null
this.indexSettings.isSegRepEnabledOrRemoteNode() ? mergedSegmentPublisher : null,
this.indexSettings.isSegRepEnabledOrRemoteNode() ? referencedSegmentsPublisher : null
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
@@ -1169,6 +1182,9 @@ public synchronized void updateMetadata(final IndexMetadata currentIndexMetadata
onRefreshIntervalChange();
updateFsyncTaskIfNecessary();
updateReplicationTask();
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING)) {
updatePublishReferencedSegmentsTask();
}
}

metadataListeners.forEach(c -> c.accept(newIndexMetadata));
@@ -1182,6 +1198,17 @@ private void updateReplicationTask() {
}
}

private void updatePublishReferencedSegmentsTask() {
if (getIndexSettings().getPublishReferencedSegmentsInterval().equals(asyncPublishReferencedSegmentsTask.getInterval())) {
return;
}
try {
asyncPublishReferencedSegmentsTask.close();
} finally {
asyncPublishReferencedSegmentsTask = new AsyncPublishReferencedSegmentsTask(this);
}
}

/**
* Called whenever the cluster level {@code cluster.default.index.max_merge_at_once} changes.
*/
@@ -1509,6 +1536,52 @@ private void maybeSyncSegments(boolean force) {
}
}

/**
* Publishes the primary shard's referenced segments at a defined interval.
*
* @opensearch.internal
*/
final class AsyncPublishReferencedSegmentsTask extends BaseAsyncTask {

AsyncPublishReferencedSegmentsTask(IndexService indexService) {
super(indexService, indexService.getIndexSettings().getPublishReferencedSegmentsInterval());
}

@Override
protected void runInternal() {
indexService.maybePublishReferencedSegments();
}

@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}

@Override
public String toString() {
return "publish_primary_referenced_segments";
}

@Override
protected boolean mustReschedule() {
return indexSettings.isSegRepEnabledOrRemoteNode() && super.mustReschedule();
}
}

private void maybePublishReferencedSegments() {
for (IndexShard shard : this.shards.values()) {
try {
// Only the primary shard publishes referenced segments.
// Replicas clean up redundant pending merged segments based on the primary shard's published set.
if (shard.isPrimaryMode() && shard.routingEntry().active()) {
shard.publishReferencedSegments();
}
} catch (IOException ex) {
logger.warn(() -> new ParameterizedMessage("failed to publish primary referenced segments"), ex);
}
}
}

final class AsyncTrimTranslogTask extends BaseAsyncTask {

AsyncTrimTranslogTask(IndexService indexService) {
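The new task reuses OpenSearch's BaseAsyncTask scheduling. As a rough standalone illustration of the same publish-on-interval behavior (plain JDK scheduling; names are illustrative, not the PR's implementation):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Illustration only: run a "publish referenced segments" action on a fixed
    // interval, as AsyncPublishReferencedSegmentsTask does via BaseAsyncTask.
    final class PeriodicPublisherSketch implements AutoCloseable {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        PeriodicPublisherSketch(Runnable publishReferencedSegments, long intervalSeconds) {
            // The real task also checks mustReschedule() so it only keeps running
            // while segment replication is enabled; this sketch runs unconditionally.
            scheduler.scheduleWithFixedDelay(publishReferencedSegments, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
        }

        @Override
        public void close() {
            scheduler.shutdownNow();
        }
    }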
23 changes: 23 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -173,6 +173,15 @@ public static IndexMergePolicy fromString(String text) {
Property.Dynamic,
Property.IndexScope
);

public static final Setting<TimeValue> INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING = Setting.timeSetting(
"index.segment_replication.publish_referenced_segments_interval",
TimeValue.timeValueMinutes(10),
TimeValue.timeValueSeconds(1),
Property.Dynamic,
Property.IndexScope
);

public static final Setting<TimeValue> INDEX_SEARCH_IDLE_AFTER = Setting.timeSetting(
"index.search.idle.after",
TimeValue.timeValueSeconds(30),
@@ -817,6 +826,7 @@ public static IndexMergePolicy fromString(String text) {
private final boolean defaultAllowUnmappedFields;
private volatile Translog.Durability durability;
private volatile TimeValue syncInterval;
private volatile TimeValue publishReferencedSegmentsInterval;
private volatile TimeValue refreshInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile TimeValue translogRetentionAge;
@@ -1036,6 +1046,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING);
defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING);
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
publishReferencedSegmentsInterval = INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.get(settings);
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
@@ -1157,6 +1168,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval);
scopedSettings.addSettingsUpdateConsumer(
INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING,
this::setPublishReferencedSegmentsInterval
);
scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow);
scopedSettings.addSettingsUpdateConsumer(MAX_INNER_RESULT_WINDOW_SETTING, this::setMaxInnerResultWindow);
scopedSettings.addSettingsUpdateConsumer(MAX_ADJACENCY_MATRIX_FILTERS_SETTING, this::setMaxAdjacencyMatrixFilters);
@@ -1537,6 +1552,14 @@ public void setTranslogSyncInterval(TimeValue translogSyncInterval) {
this.syncInterval = translogSyncInterval;
}

public TimeValue getPublishReferencedSegmentsInterval() {
return publishReferencedSegmentsInterval;
}

public void setPublishReferencedSegmentsInterval(TimeValue publishReferencedSegmentsInterval) {
this.publishReferencedSegmentsInterval = publishReferencedSegmentsInterval;
}

/**
* Returns the translog sync/upload buffer interval when remote translog store is enabled and index setting
* {@code index.translog.durability} is set as {@code request}.
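
For reference, the new index.segment_replication.publish_referenced_segments_interval setting is dynamic and index-scoped, so it can be tuned on a live index. A minimal sketch mirroring the update-settings call in the integration test above (the index name "my-index" is illustrative):

    // Shorten the publish interval on a live index so redundant pending merged
    // segments on replicas are cleaned up sooner.
    client().admin()
        .indices()
        .prepareUpdateSettings("my-index")
        .setSettings(
            Settings.builder()
                .put(IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(5))
        )
        .get();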