Skip to content

Commit c0facac

Browse files
tlrxfcofdez
andauthored
Add ElasticsearchIndexDeletionPolicy and EngineConfig policy wrapper (#130442)
This change adds a new abstract class ElasticsearchIndexDeletionPolicy that allows using different policy implementations for InternalEngine. It also adds an index deletion policy wrapper to the EngineConfig to allow wrapping the default CombinedDeletionPolicy by inheritors of InternalEngine. Relates ES-10929 --------- Co-authored-by: Francisco Fernández Castaño <[email protected]>
1 parent 7485750 commit c0facac

File tree

13 files changed

+157
-57
lines changed

13 files changed

+157
-57
lines changed

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
8989
config.isPromotableToPrimary(),
9090
config.getMapperService(),
9191
config.getEngineResetLock(),
92-
config.getMergeMetrics()
92+
config.getMergeMetrics(),
93+
config.getIndexDeletionPolicyWrapper()
9394
);
9495
}
9596

server/src/main/java/org/elasticsearch/common/lucene/FilterIndexCommit.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,11 @@ public Map<String, String> getUserData() throws IOException {
7171
public String toString() {
7272
return "FilterIndexCommit{" + "in=" + in + '}';
7373
}
74+
75+
public static IndexCommit unwrap(IndexCommit in) {
76+
while (in instanceof FilterIndexCommit) {
77+
in = ((FilterIndexCommit) in).getIndexCommit();
78+
}
79+
return in;
80+
}
7481
}

server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
* In particular, this policy will delete index commits whose max sequence number is at most
3939
* the current global checkpoint except the index commit which has the highest max sequence number among those.
4040
*/
41-
public class CombinedDeletionPolicy extends IndexDeletionPolicy {
41+
public class CombinedDeletionPolicy extends ElasticsearchIndexDeletionPolicy {
4242
private final Logger logger;
4343
private final TranslogDeletionPolicy translogDeletionPolicy;
4444
private final SoftDeletesPolicy softDeletesPolicy;
@@ -48,13 +48,6 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
4848
// when checking for externally acquired index commits that haven't been released
4949
private final Set<IndexCommit> internallyAcquiredIndexCommits;
5050

51-
interface CommitsListener {
52-
53-
void onNewAcquiredCommit(IndexCommit commit, Set<String> additionalFiles);
54-
55-
void onDeletedCommit(IndexCommit commit);
56-
}
57-
5851
@Nullable
5952
private final CommitsListener commitsListener;
6053

@@ -187,7 +180,6 @@ private void deleteCommit(IndexCommit commit) throws IOException {
187180
assert commit.isDeleted() == false : "Index commit [" + commitDescription(commit) + "] is deleted twice";
188181
logger.debug("Delete index commit [{}]", commitDescription(commit));
189182
commit.delete();
190-
assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed";
191183
}
192184

193185
private void updateRetentionPolicy() throws IOException {
@@ -204,7 +196,8 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
204196
return SegmentInfos.readCommit(indexCommit.getDirectory(), indexCommit.getSegmentsFileName()).totalMaxDoc();
205197
}
206198

207-
SafeCommitInfo getSafeCommitInfo() {
199+
@Override
200+
public SafeCommitInfo getSafeCommitInfo() {
208201
return safeCommitInfo;
209202
}
210203

@@ -214,7 +207,8 @@ SafeCommitInfo getSafeCommitInfo() {
214207
*
215208
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
216209
*/
217-
synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
210+
@Override
211+
public synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
218212
return acquireIndexCommit(acquiringSafeCommit, false);
219213
}
220214

@@ -241,7 +235,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredIntern
241235
*
242236
* @return true if the acquired commit can be clean up.
243237
*/
244-
synchronized boolean releaseCommit(final IndexCommit acquiredCommit) {
238+
@Override
239+
public synchronized boolean releaseIndexCommit(final IndexCommit acquiredCommit) {
245240
final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit;
246241
final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit();
247242
assert acquiredIndexCommits.containsKey(releasingCommit)
@@ -316,7 +311,8 @@ private static Set<String> listOfNewFileNames(IndexCommit previous, IndexCommit
316311
/**
317312
* Checks whether the deletion policy is holding on to externally acquired index commits
318313
*/
319-
synchronized boolean hasAcquiredIndexCommitsForTesting() {
314+
@Override
315+
public synchronized boolean hasAcquiredIndexCommitsForTesting() {
320316
// We explicitly check only external commits and disregard internal commits acquired by the commits listener
321317
for (var e : acquiredIndexCommits.entrySet()) {
322318
if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) {
@@ -329,7 +325,8 @@ synchronized boolean hasAcquiredIndexCommitsForTesting() {
329325
/**
330326
* Checks if the deletion policy can delete some index commits with the latest global checkpoint.
331327
*/
332-
boolean hasUnreferencedCommits() {
328+
@Override
329+
public boolean hasUnreferencedCommits() {
333330
return maxSeqNoOfNextSafeCommit <= globalCheckpointSupplier.getAsLong();
334331
}
335332

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.index.IndexCommit;
13+
import org.apache.lucene.index.IndexDeletionPolicy;
14+
15+
import java.util.Set;
16+
17+
public abstract class ElasticsearchIndexDeletionPolicy extends IndexDeletionPolicy {
18+
19+
/**
20+
* Captures the most recent commit point or the most recent safe commit point.
21+
* Index files of the capturing commit point won't be released until the commit reference is closed.
22+
*
23+
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
24+
*/
25+
public abstract IndexCommit acquireIndexCommit(boolean acquiringSafeCommit);
26+
27+
/**
28+
* Releases an index commit that was acquired by {@link #acquireIndexCommit(boolean)}.
29+
*
30+
* @return true if the acquired commit can be clean up.
31+
*/
32+
public abstract boolean releaseIndexCommit(IndexCommit acquiredIndexCommit);
33+
34+
/**
35+
* @return information about the safe commit
36+
*/
37+
public abstract SafeCommitInfo getSafeCommitInfo();
38+
39+
public abstract boolean hasAcquiredIndexCommitsForTesting();
40+
41+
public abstract boolean hasUnreferencedCommits();
42+
43+
public interface CommitsListener {
44+
45+
void onNewAcquiredCommit(IndexCommit commit, Set<String> additionalFiles);
46+
47+
void onDeletedCommit(IndexCommit commit);
48+
}
49+
}

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Comparator;
4242
import java.util.List;
4343
import java.util.Objects;
44+
import java.util.function.Function;
4445
import java.util.function.LongSupplier;
4546
import java.util.function.Supplier;
4647

@@ -151,6 +152,11 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
151152

152153
private final MergeMetrics mergeMetrics;
153154

155+
/**
156+
* Allows to pass an {@link ElasticsearchIndexDeletionPolicy} wrapper to egine implementations.
157+
*/
158+
private final Function<ElasticsearchIndexDeletionPolicy, ElasticsearchIndexDeletionPolicy> indexDeletionPolicyWrapper;
159+
154160
/**
155161
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
156162
*/
@@ -184,7 +190,8 @@ public EngineConfig(
184190
boolean promotableToPrimary,
185191
MapperService mapperService,
186192
EngineResetLock engineResetLock,
187-
MergeMetrics mergeMetrics
193+
MergeMetrics mergeMetrics,
194+
Function<ElasticsearchIndexDeletionPolicy, ElasticsearchIndexDeletionPolicy> indexDeletionPolicyWrapper
188195
) {
189196
this.shardId = shardId;
190197
this.indexSettings = indexSettings;
@@ -233,6 +240,7 @@ public EngineConfig(
233240
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
234241
this.engineResetLock = engineResetLock;
235242
this.mergeMetrics = mergeMetrics;
243+
this.indexDeletionPolicyWrapper = indexDeletionPolicyWrapper;
236244
}
237245

238246
/**
@@ -485,4 +493,11 @@ public EngineResetLock getEngineResetLock() {
485493
public MergeMetrics getMergeMetrics() {
486494
return mergeMetrics;
487495
}
496+
497+
/**
498+
* @return an {@link ElasticsearchIndexDeletionPolicy} wrapper, to be use by engine implementations.
499+
*/
500+
public Function<ElasticsearchIndexDeletionPolicy, ElasticsearchIndexDeletionPolicy> getIndexDeletionPolicyWrapper() {
501+
return indexDeletionPolicyWrapper;
502+
}
488503
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public class InternalEngine extends Engine {
170170

171171
private final LocalCheckpointTracker localCheckpointTracker;
172172

173-
private final CombinedDeletionPolicy combinedDeletionPolicy;
173+
private final ElasticsearchIndexDeletionPolicy indexDeletionPolicy;
174174

175175
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
176176
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttle
@@ -277,13 +277,7 @@ public InternalEngine(EngineConfig engineConfig) {
277277
this.totalDiskSpace = ByteSizeValue.of(Environment.getFileStore(translog.location()).getTotalSpace(), ByteSizeUnit.BYTES);
278278
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
279279
this.softDeletesPolicy = newSoftDeletesPolicy();
280-
this.combinedDeletionPolicy = new CombinedDeletionPolicy(
281-
logger,
282-
translogDeletionPolicy,
283-
softDeletesPolicy,
284-
translog::getLastSyncedGlobalCheckpoint,
285-
newCommitsListener()
286-
);
280+
this.indexDeletionPolicy = newIndexDeletionPolicy(engineConfig, logger, translog, softDeletesPolicy);
287281
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
288282
writer = createWriter();
289283
bootstrapAppendOnlyInfoFromWriter(writer);
@@ -391,6 +385,25 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
391385
);
392386
}
393387

388+
protected ElasticsearchIndexDeletionPolicy newIndexDeletionPolicy(
389+
EngineConfig engineConfig,
390+
Logger logger,
391+
Translog translog,
392+
SoftDeletesPolicy softDeletesPolicy
393+
) {
394+
var wrapper = engineConfig.getIndexDeletionPolicyWrapper();
395+
assert wrapper != null : "no index deletion policy wrapper for " + engineConfig.getShardId();
396+
return wrapper.apply(
397+
new CombinedDeletionPolicy(
398+
logger,
399+
translog.getDeletionPolicy(),
400+
softDeletesPolicy,
401+
translog::getLastSyncedGlobalCheckpoint,
402+
newCommitsListener()
403+
)
404+
);
405+
}
406+
394407
@Nullable
395408
private CombinedDeletionPolicy.CommitsListener newCommitsListener() {
396409
IndexCommitListener listener = engineConfig.getIndexCommitListener();
@@ -682,7 +695,7 @@ Translog getTranslog() {
682695

683696
// Package private for testing purposes only
684697
boolean hasAcquiredIndexCommitsForTesting() {
685-
return combinedDeletionPolicy.hasAcquiredIndexCommitsForTesting();
698+
return indexDeletionPolicy.hasAcquiredIndexCommitsForTesting();
686699
}
687700

688701
@Override
@@ -748,7 +761,7 @@ public Translog.Location getTranslogLastWriteLocation() {
748761
}
749762

750763
private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
751-
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
764+
if (indexDeletionPolicy.hasUnreferencedCommits()) {
752765
indexWriter.deleteUnusedFiles();
753766
}
754767
translog.trimUnreferencedReaders();
@@ -2555,17 +2568,17 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En
25552568
future.actionGet();
25562569
logger.trace("finish flush for snapshot");
25572570
}
2558-
return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(false));
2571+
return acquireIndexCommitRef(() -> indexDeletionPolicy.acquireIndexCommit(false));
25592572
}
25602573

25612574
@Override
25622575
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
2563-
return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(true));
2576+
return acquireIndexCommitRef(() -> indexDeletionPolicy.acquireIndexCommit(true));
25642577
}
25652578

25662579
private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
25672580
// Revisit the deletion policy if we can clean up the snapshotting commit.
2568-
if (combinedDeletionPolicy.releaseCommit(snapshot)) {
2581+
if (indexDeletionPolicy.releaseIndexCommit(snapshot)) {
25692582
try {
25702583
// Here we don't have to trim translog because snapshotting an index commit
25712584
// does not lock translog or prevents unreferenced files from trimming.
@@ -2578,7 +2591,7 @@ private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
25782591

25792592
@Override
25802593
public SafeCommitInfo getSafeCommitInfo() {
2581-
return combinedDeletionPolicy.getSafeCommitInfo();
2594+
return indexDeletionPolicy.getSafeCommitInfo();
25822595
}
25832596

25842597
private boolean failOnTragicEvent(AlreadyClosedException ex) {
@@ -2756,7 +2769,7 @@ private IndexWriterConfig getIndexWriterConfig() {
27562769
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
27572770
iwc.setCommitOnClose(false); // we by default don't commit on close
27582771
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
2759-
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
2772+
iwc.setIndexDeletionPolicy(indexDeletionPolicy);
27602773
iwc.setInfoStream(TESTS_VERBOSE ? InfoStream.getDefault() : new LoggerInfoStream(logger));
27612774
iwc.setMergeScheduler(mergeScheduler.getMergeScheduler());
27622775
// Give us the opportunity to upgrade old segments while performing

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3765,7 +3765,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
37653765
routingEntry().isPromotableToPrimary(),
37663766
mapperService(),
37673767
engineResetLock,
3768-
mergeMetrics
3768+
mergeMetrics,
3769+
Function.identity()
37693770
);
37703771
}
37713772

server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public void testAcquireIndexCommit() throws Exception {
193193
final IndexCommit lastCommit = commitList.get(commitList.size() - 1);
194194
safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get());
195195
assertThat(
196-
indexPolicy.releaseCommit(snapshot),
196+
indexPolicy.releaseIndexCommit(snapshot),
197197
equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false && snapshot.equals(safeCommit) == false)
198198
);
199199
}
@@ -211,7 +211,7 @@ public void testAcquireIndexCommit() throws Exception {
211211
)
212212
);
213213
}
214-
snapshottingCommits.forEach(indexPolicy::releaseCommit);
214+
snapshottingCommits.forEach(indexPolicy::releaseIndexCommit);
215215
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
216216
commitList.forEach(this::resetDeletion);
217217
indexPolicy.onCommit(commitList);
@@ -350,8 +350,8 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) {
350350
}
351351

352352
@Override
353-
synchronized boolean releaseCommit(IndexCommit acquiredCommit) {
354-
return super.releaseCommit(wrapCommit(acquiredCommit));
353+
public synchronized boolean releaseIndexCommit(IndexCommit acquiredCommit) {
354+
return super.releaseIndexCommit(wrapCommit(acquiredCommit));
355355
}
356356
};
357357

@@ -383,7 +383,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) {
383383
assertThat(deletedCommits, hasSize(0));
384384
assertThat(newCommitFiles, containsInAnyOrder(equalTo("_1.cfe"), equalTo("_1.si"), equalTo("_1.cfs"), equalTo("segments_2")));
385385

386-
boolean maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit0);
386+
boolean maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit0);
387387
assertThat(maybeCleanUpCommits, equalTo(true));
388388

389389
globalCheckpoint.set(20L);
@@ -454,7 +454,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) {
454454
)
455455
);
456456

457-
maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit2);
457+
maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit2);
458458
assertThat("No commits to clean up (commit #2 is the safe commit)", maybeCleanUpCommits, equalTo(false));
459459

460460
globalCheckpoint.set(30L);
@@ -499,13 +499,13 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) {
499499
assertThat(deletedCommits, contains(commit0, commit2));
500500
assertThat(newCommitFiles, containsInAnyOrder(equalTo("_4.cfe"), equalTo("_4.si"), equalTo("_4.cfs"), equalTo("segments_4")));
501501

502-
maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit3);
502+
maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit3);
503503
assertThat("No commits to clean up (commit #3 is the safe commit)", maybeCleanUpCommits, equalTo(false));
504504

505-
maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit4);
505+
maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit4);
506506
assertThat("No commits to clean up (commit #4 is the last commit)", maybeCleanUpCommits, equalTo(false));
507507

508-
maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit1);
508+
maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit1);
509509
assertThat(maybeCleanUpCommits, equalTo(true));
510510

511511
final boolean globalCheckpointCatchUp = randomBoolean();
@@ -560,7 +560,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) {
560560
}
561561
assertThat(newCommitFiles, containsInAnyOrder(equalTo("_5.cfe"), equalTo("_5.si"), equalTo("_5.cfs"), equalTo("segments_5")));
562562

563-
maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit5);
563+
maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit5);
564564
assertThat("No commits to clean up (commit #5 is the last commit)", maybeCleanUpCommits, equalTo(false));
565565
}
566566

0 commit comments

Comments
 (0)