From 0a5eb5de9047274a306002eaa14b52bd3d76247b Mon Sep 17 00:00:00 2001 From: tlrx Date: Wed, 2 Jul 2025 12:00:02 +0200 Subject: [PATCH 1/4] Relates ES-10929 --- .../indices/IndexingMemoryControllerIT.java | 3 +- .../common/lucene/FilterIndexCommit.java | 7 +++ .../index/engine/CombinedDeletionPolicy.java | 26 +++++----- .../ElasticsearchIndexDeletionPolicy.java | 49 +++++++++++++++++++ .../index/engine/EngineConfig.java | 17 ++++++- .../index/engine/InternalEngine.java | 43 ++++++++++------ .../elasticsearch/index/shard/IndexShard.java | 3 +- .../engine/CombinedDeletionPolicyTests.java | 20 ++++---- .../index/engine/InternalEngineTests.java | 6 ++- .../index/shard/IndexShardTests.java | 6 ++- .../index/shard/RefreshListenersTests.java | 4 +- .../index/engine/EngineTestCase.java | 15 ++++-- .../index/engine/FollowingEngineTests.java | 4 +- 13 files changed, 150 insertions(+), 53 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/ElasticsearchIndexDeletionPolicy.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 520df8a8ebeca..c41b6926b3cae 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -89,7 +89,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/FilterIndexCommit.java b/server/src/main/java/org/elasticsearch/common/lucene/FilterIndexCommit.java index 684dab5a8243a..3a8bd32559c61 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/FilterIndexCommit.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/FilterIndexCommit.java @@ -71,4 +71,11 @@ public Map getUserData() throws IOException { public String toString() { return "FilterIndexCommit{" + "in=" + in + '}'; } + + public static IndexCommit unwrap(IndexCommit in) { + while (in instanceof FilterIndexCommit) { + in = ((FilterIndexCommit) in).getIndexCommit(); + } + return in; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 38caddd57f67a..088c07930e109 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -38,7 +38,7 @@ * In particular, this policy will delete index commits whose max sequence number is at most * the current global checkpoint except the index commit which has the highest max sequence number among those. */ -public class CombinedDeletionPolicy extends IndexDeletionPolicy { +public class CombinedDeletionPolicy extends ElasticsearchIndexDeletionPolicy { private final Logger logger; private final TranslogDeletionPolicy translogDeletionPolicy; private final SoftDeletesPolicy softDeletesPolicy; @@ -48,13 +48,6 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy { // when checking for externally acquired index commits that haven't been released private final Set internallyAcquiredIndexCommits; - interface CommitsListener { - - void onNewAcquiredCommit(IndexCommit commit, Set additionalFiles); - - void onDeletedCommit(IndexCommit commit); - } - @Nullable private final CommitsListener commitsListener; @@ -187,7 +180,7 @@ private void deleteCommit(IndexCommit commit) throws IOException { assert commit.isDeleted() == false : "Index commit [" + commitDescription(commit) + "] is deleted twice"; logger.debug("Delete index commit [{}]", commitDescription(commit)); commit.delete(); - assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed"; + // assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed"; } private void updateRetentionPolicy() throws IOException { @@ -204,7 +197,8 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException { return SegmentInfos.readCommit(indexCommit.getDirectory(), indexCommit.getSegmentsFileName()).totalMaxDoc(); } - SafeCommitInfo getSafeCommitInfo() { + @Override + public SafeCommitInfo getSafeCommitInfo() { return safeCommitInfo; } @@ -214,7 +208,8 @@ SafeCommitInfo getSafeCommitInfo() { * * @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point. */ - synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { + @Override + public synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { return acquireIndexCommit(acquiringSafeCommit, false); } @@ -241,7 +236,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredIntern * * @return true if the acquired commit can be clean up. */ - synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { + @Override + public synchronized boolean releaseIndexCommit(final IndexCommit acquiredCommit) { final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit; final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit(); assert acquiredIndexCommits.containsKey(releasingCommit) @@ -316,7 +312,8 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit /** * Checks whether the deletion policy is holding on to externally acquired index commits */ - synchronized boolean hasAcquiredIndexCommitsForTesting() { + @Override + public synchronized boolean hasAcquiredIndexCommitsForTesting() { // We explicitly check only external commits and disregard internal commits acquired by the commits listener for (var e : acquiredIndexCommits.entrySet()) { if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) { @@ -329,7 +326,8 @@ synchronized boolean hasAcquiredIndexCommitsForTesting() { /** * Checks if the deletion policy can delete some index commits with the latest global checkpoint. */ - boolean hasUnreferencedCommits() { + @Override + public boolean hasUnreferencedCommits() { return maxSeqNoOfNextSafeCommit <= globalCheckpointSupplier.getAsLong(); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchIndexDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchIndexDeletionPolicy.java new file mode 100644 index 0000000000000..2fbfb2aa2128e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchIndexDeletionPolicy.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexDeletionPolicy; + +import java.util.Set; + +public abstract class ElasticsearchIndexDeletionPolicy extends IndexDeletionPolicy { + + /** + * Captures the most recent commit point or the most recent safe commit point. + * Index files of the capturing commit point won't be released until the commit reference is closed. + * + * @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point. + */ + public abstract IndexCommit acquireIndexCommit(boolean acquiringSafeCommit); + + /** + * Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}. + * + * @return true if the acquired commit can be clean up. + */ + public abstract boolean releaseIndexCommit(IndexCommit acquiredIndexCommit); + + /** + * @return information about the safe commit + */ + public abstract SafeCommitInfo getSafeCommitInfo(); + + public abstract boolean hasAcquiredIndexCommitsForTesting(); + + public abstract boolean hasUnreferencedCommits(); + + public interface CommitsListener { + + void onNewAcquiredCommit(IndexCommit commit, Set additionalFiles); + + void onDeletedCommit(IndexCommit commit); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index cdb8a39d4713b..f91deeccaea0d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -41,6 +41,7 @@ import java.util.Comparator; import java.util.List; import java.util.Objects; +import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -151,6 +152,11 @@ public Supplier retentionLeasesSupplier() { private final MergeMetrics mergeMetrics; + /** + * Allows to pass an {@link ElasticsearchIndexDeletionPolicy} wrapper to egine implementations. + */ + private final Function indexDeletionPolicyWrapper; + /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ @@ -184,7 +190,8 @@ public EngineConfig( boolean promotableToPrimary, MapperService mapperService, EngineResetLock engineResetLock, - MergeMetrics mergeMetrics + MergeMetrics mergeMetrics, + Function indexDeletionPolicyWrapper ) { this.shardId = shardId; this.indexSettings = indexSettings; @@ -233,6 +240,7 @@ public EngineConfig( this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true); this.engineResetLock = engineResetLock; this.mergeMetrics = mergeMetrics; + this.indexDeletionPolicyWrapper = indexDeletionPolicyWrapper; } /** @@ -485,4 +493,11 @@ public EngineResetLock getEngineResetLock() { public MergeMetrics getMergeMetrics() { return mergeMetrics; } + + /** + * @return an {@link ElasticsearchIndexDeletionPolicy} wrapper, to be use by engine implementations. + */ + public Function getIndexDeletionPolicyWrapper() { + return indexDeletionPolicyWrapper; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 170ccc2cc7bbb..36488b99ade73 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -168,7 +168,7 @@ public class InternalEngine extends Engine { private final LocalCheckpointTracker localCheckpointTracker; - private final CombinedDeletionPolicy combinedDeletionPolicy; + private final ElasticsearchIndexDeletionPolicy indexDeletionPolicy; // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttle @@ -275,13 +275,7 @@ public InternalEngine(EngineConfig engineConfig) { this.totalDiskSpace = ByteSizeValue.of(Environment.getFileStore(translog.location()).getTotalSpace(), ByteSizeUnit.BYTES); this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); this.softDeletesPolicy = newSoftDeletesPolicy(); - this.combinedDeletionPolicy = new CombinedDeletionPolicy( - logger, - translogDeletionPolicy, - softDeletesPolicy, - translog::getLastSyncedGlobalCheckpoint, - newCommitsListener() - ); + this.indexDeletionPolicy = newIndexDeletionPolicy(engineConfig, logger, translog, softDeletesPolicy); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); writer = createWriter(); bootstrapAppendOnlyInfoFromWriter(writer); @@ -389,6 +383,25 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { ); } + protected ElasticsearchIndexDeletionPolicy newIndexDeletionPolicy( + EngineConfig engineConfig, + Logger logger, + Translog translog, + SoftDeletesPolicy softDeletesPolicy + ) { + var wrapper = engineConfig.getIndexDeletionPolicyWrapper(); + assert wrapper != null : "no index deletion policy wrapper for " + engineConfig.getShardId(); + return wrapper.apply( + new CombinedDeletionPolicy( + logger, + translog.getDeletionPolicy(), + softDeletesPolicy, + translog::getLastSyncedGlobalCheckpoint, + newCommitsListener() + ) + ); + } + @Nullable private CombinedDeletionPolicy.CommitsListener newCommitsListener() { IndexCommitListener listener = engineConfig.getIndexCommitListener(); @@ -680,7 +693,7 @@ Translog getTranslog() { // Package private for testing purposes only boolean hasAcquiredIndexCommitsForTesting() { - return combinedDeletionPolicy.hasAcquiredIndexCommitsForTesting(); + return indexDeletionPolicy.hasAcquiredIndexCommitsForTesting(); } @Override @@ -746,7 +759,7 @@ public Translog.Location getTranslogLastWriteLocation() { } private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { - if (combinedDeletionPolicy.hasUnreferencedCommits()) { + if (indexDeletionPolicy.hasUnreferencedCommits()) { indexWriter.deleteUnusedFiles(); } translog.trimUnreferencedReaders(); @@ -2545,17 +2558,17 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En future.actionGet(); logger.trace("finish flush for snapshot"); } - return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(false)); + return acquireIndexCommitRef(() -> indexDeletionPolicy.acquireIndexCommit(false)); } @Override public IndexCommitRef acquireSafeIndexCommit() throws EngineException { - return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(true)); + return acquireIndexCommitRef(() -> indexDeletionPolicy.acquireIndexCommit(true)); } private void releaseIndexCommit(IndexCommit snapshot) throws IOException { // Revisit the deletion policy if we can clean up the snapshotting commit. - if (combinedDeletionPolicy.releaseCommit(snapshot)) { + if (indexDeletionPolicy.releaseIndexCommit(snapshot)) { try { // Here we don't have to trim translog because snapshotting an index commit // does not lock translog or prevents unreferenced files from trimming. @@ -2568,7 +2581,7 @@ private void releaseIndexCommit(IndexCommit snapshot) throws IOException { @Override public SafeCommitInfo getSafeCommitInfo() { - return combinedDeletionPolicy.getSafeCommitInfo(); + return indexDeletionPolicy.getSafeCommitInfo(); } private boolean failOnTragicEvent(AlreadyClosedException ex) { @@ -2746,7 +2759,7 @@ private IndexWriterConfig getIndexWriterConfig() { final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); iwc.setCommitOnClose(false); // we by default don't commit on close iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); - iwc.setIndexDeletionPolicy(combinedDeletionPolicy); + iwc.setIndexDeletionPolicy(indexDeletionPolicy); iwc.setInfoStream(TESTS_VERBOSE ? InfoStream.getDefault() : new LoggerInfoStream(logger)); iwc.setMergeScheduler(mergeScheduler.getMergeScheduler()); // Give us the opportunity to upgrade old segments while performing diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 76ecd8141f79d..93c8728a106b3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3764,7 +3764,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { routingEntry().isPromotableToPrimary(), mapperService(), engineResetLock, - mergeMetrics + mergeMetrics, + Function.identity() ); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 699a34b312592..535a2dc100955 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -193,7 +193,7 @@ public void testAcquireIndexCommit() throws Exception { final IndexCommit lastCommit = commitList.get(commitList.size() - 1); safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); assertThat( - indexPolicy.releaseCommit(snapshot), + indexPolicy.releaseIndexCommit(snapshot), equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false && snapshot.equals(safeCommit) == false) ); } @@ -211,7 +211,7 @@ public void testAcquireIndexCommit() throws Exception { ) ); } - snapshottingCommits.forEach(indexPolicy::releaseCommit); + snapshottingCommits.forEach(indexPolicy::releaseIndexCommit); globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE)); commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); @@ -350,8 +350,8 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) { } @Override - synchronized boolean releaseCommit(IndexCommit acquiredCommit) { - return super.releaseCommit(wrapCommit(acquiredCommit)); + public synchronized boolean releaseIndexCommit(IndexCommit acquiredCommit) { + return super.releaseIndexCommit(wrapCommit(acquiredCommit)); } }; @@ -383,7 +383,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) { assertThat(deletedCommits, hasSize(0)); assertThat(newCommitFiles, containsInAnyOrder(equalTo("_1.cfe"), equalTo("_1.si"), equalTo("_1.cfs"), equalTo("segments_2"))); - boolean maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit0); + boolean maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit0); assertThat(maybeCleanUpCommits, equalTo(true)); globalCheckpoint.set(20L); @@ -454,7 +454,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) { ) ); - maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit2); + maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit2); assertThat("No commits to clean up (commit #2 is the safe commit)", maybeCleanUpCommits, equalTo(false)); globalCheckpoint.set(30L); @@ -499,13 +499,13 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) { assertThat(deletedCommits, contains(commit0, commit2)); assertThat(newCommitFiles, containsInAnyOrder(equalTo("_4.cfe"), equalTo("_4.si"), equalTo("_4.cfs"), equalTo("segments_4"))); - maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit3); + maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit3); assertThat("No commits to clean up (commit #3 is the safe commit)", maybeCleanUpCommits, equalTo(false)); - maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit4); + maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit4); assertThat("No commits to clean up (commit #4 is the last commit)", maybeCleanUpCommits, equalTo(false)); - maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit1); + maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit1); assertThat(maybeCleanUpCommits, equalTo(true)); final boolean globalCheckpointCatchUp = randomBoolean(); @@ -560,7 +560,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) { } assertThat(newCommitFiles, containsInAnyOrder(equalTo("_5.cfe"), equalTo("_5.si"), equalTo("_5.cfs"), equalTo("segments_5"))); - maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit5); + maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit5); assertThat("No commits to clean up (commit #5 is the last commit)", maybeCleanUpCommits, equalTo(false)); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 680f6fca9652e..b012668064fec 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3636,7 +3636,8 @@ public void testRecoverFromForeignTranslog() throws IOException { true, config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + Function.identity() ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -7245,7 +7246,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index cc682901876b6..2f3765473cbb7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -5065,7 +5065,8 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + Function.identity() ); return new InternalEngine(configWithWarmer); }); @@ -5348,7 +5349,8 @@ public void afterRefresh(boolean didRefresh) throws IOException {} config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + Function.identity() ); lazyEngineConfig.set(engineConfigWithBlockingRefreshListener); return new InternalEngine(engineConfigWithBlockingRefreshListener) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 8f0604956a98b..bf6a4f4ec2d56 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -80,6 +80,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BooleanSupplier; import java.util.function.Consumer; +import java.util.function.Function; import static org.elasticsearch.core.TimeValue.timeValueMillis; import static org.hamcrest.Matchers.arrayContaining; @@ -177,7 +178,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { true, EngineTestCase.createMapperService(), new EngineResetLock(), - MergeMetrics.NOOP + MergeMetrics.NOOP, + Function.identity() ); engine = new InternalEngine(config); EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 2327ac06b9e81..b52a31689f838 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -316,7 +316,8 @@ public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpoi config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); } @@ -351,7 +352,8 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); } @@ -386,7 +388,8 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); } @@ -893,7 +896,8 @@ public EngineConfig config( true, mapperService, new EngineResetLock(), - mergeMetrics + mergeMetrics, + Function.identity() ); } @@ -936,7 +940,8 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 54bdbd0c0e91c..93b3a00c1019a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -75,6 +75,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.index.engine.EngineTestCase.getDocIds; @@ -286,7 +287,8 @@ public void onFailedEngine(String reason, Exception e) { true, mapperService, new EngineResetLock(), - MergeMetrics.NOOP + MergeMetrics.NOOP, + Function.identity() ); } From 5c4ba33ebb24b653cdc2631504af4d729b578e3b Mon Sep 17 00:00:00 2001 From: tlrx Date: Wed, 2 Jul 2025 17:39:48 +0200 Subject: [PATCH 2/4] fix tests --- .../index/engine/InternalEngineTests.java | 3 ++- .../elasticsearch/index/engine/EngineTestCase.java | 11 +++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b012668064fec..4e6b8ee15d58b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -7527,7 +7527,8 @@ public void onIndexCommitDelete(ShardId shardId, IndexCommit deletedCommit) { globalCheckpoint::get, () -> RetentionLeases.EMPTY, new NoneCircuitBreakerService(), - indexCommitListener + indexCommitListener, + Function.identity() ) ) ) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index b52a31689f838..5ac2df1c7a998 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -791,7 +791,8 @@ public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, new NoneCircuitBreakerService(), - null + null, + Function.identity() ); } @@ -817,7 +818,8 @@ public EngineConfig config( maybeGlobalCheckpointSupplier, maybeGlobalCheckpointSupplier == null ? null : () -> RetentionLeases.EMPTY, breakerService, - null + null, + Function.identity() ); } @@ -832,7 +834,8 @@ public EngineConfig config( final @Nullable LongSupplier maybeGlobalCheckpointSupplier, final @Nullable Supplier maybeRetentionLeasesSupplier, final CircuitBreakerService breakerService, - final @Nullable Engine.IndexCommitListener indexCommitListener + final @Nullable Engine.IndexCommitListener indexCommitListener, + final @Nullable Function indexDeletionPolicyWrapper ) { final IndexWriterConfig iwc = newIndexWriterConfig(); final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); @@ -897,7 +900,7 @@ public EngineConfig config( mapperService, new EngineResetLock(), mergeMetrics, - Function.identity() + indexDeletionPolicyWrapper == null ? Function.identity() : indexDeletionPolicyWrapper ); } From 9fbb72c690d80e8c39142fdbf068533dce79d0ea Mon Sep 17 00:00:00 2001 From: tlrx Date: Fri, 4 Jul 2025 13:15:48 +0200 Subject: [PATCH 3/4] feedback --- .../index/engine/ElasticsearchIndexDeletionPolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchIndexDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchIndexDeletionPolicy.java index 2fbfb2aa2128e..0294dbc519781 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchIndexDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchIndexDeletionPolicy.java @@ -25,7 +25,7 @@ public abstract class ElasticsearchIndexDeletionPolicy extends IndexDeletionPoli public abstract IndexCommit acquireIndexCommit(boolean acquiringSafeCommit); /** - * Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}. + * Releases an index commit that was acquired by {@link #acquireIndexCommit(boolean)}. * * @return true if the acquired commit can be clean up. */ From 0599d39e6fbf16e8554d72757189b7556374945f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Mon, 21 Jul 2025 15:46:07 +0200 Subject: [PATCH 4/4] Remove unused assertion --- .../org/elasticsearch/index/engine/CombinedDeletionPolicy.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 088c07930e109..d6ea943cc050b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -180,7 +180,6 @@ private void deleteCommit(IndexCommit commit) throws IOException { assert commit.isDeleted() == false : "Index commit [" + commitDescription(commit) + "] is deleted twice"; logger.debug("Delete index commit [{}]", commitDescription(commit)); commit.delete(); - // assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed"; } private void updateRetentionPolicy() throws IOException {