Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.isPromotableToPrimary(),
config.getMapperService(),
config.getEngineResetLock(),
config.getMergeMetrics()
config.getMergeMetrics(),
config.getIndexDeletionPolicyWrapper()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,11 @@ public Map<String, String> getUserData() throws IOException {
public String toString() {
return "FilterIndexCommit{" + "in=" + in + '}';
}

public static IndexCommit unwrap(IndexCommit in) {
while (in instanceof FilterIndexCommit) {
in = ((FilterIndexCommit) in).getIndexCommit();
}
return in;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* In particular, this policy will delete index commits whose max sequence number is at most
* the current global checkpoint except the index commit which has the highest max sequence number among those.
*/
public class CombinedDeletionPolicy extends IndexDeletionPolicy {
public class CombinedDeletionPolicy extends ElasticsearchIndexDeletionPolicy {
private final Logger logger;
private final TranslogDeletionPolicy translogDeletionPolicy;
private final SoftDeletesPolicy softDeletesPolicy;
Expand All @@ -48,13 +48,6 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
// when checking for externally acquired index commits that haven't been released
private final Set<IndexCommit> internallyAcquiredIndexCommits;

interface CommitsListener {

void onNewAcquiredCommit(IndexCommit commit, Set<String> additionalFiles);

void onDeletedCommit(IndexCommit commit);
}

@Nullable
private final CommitsListener commitsListener;

Expand Down Expand Up @@ -187,7 +180,6 @@ private void deleteCommit(IndexCommit commit) throws IOException {
assert commit.isDeleted() == false : "Index commit [" + commitDescription(commit) + "] is deleted twice";
logger.debug("Delete index commit [{}]", commitDescription(commit));
commit.delete();
assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed";
}

private void updateRetentionPolicy() throws IOException {
Expand All @@ -204,7 +196,8 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
return SegmentInfos.readCommit(indexCommit.getDirectory(), indexCommit.getSegmentsFileName()).totalMaxDoc();
}

SafeCommitInfo getSafeCommitInfo() {
@Override
public SafeCommitInfo getSafeCommitInfo() {
return safeCommitInfo;
}

Expand All @@ -214,7 +207,8 @@ SafeCommitInfo getSafeCommitInfo() {
*
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
*/
synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
@Override
public synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
return acquireIndexCommit(acquiringSafeCommit, false);
}

Expand All @@ -241,7 +235,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredIntern
*
* @return true if the acquired commit can be clean up.
*/
synchronized boolean releaseCommit(final IndexCommit acquiredCommit) {
@Override
public synchronized boolean releaseIndexCommit(final IndexCommit acquiredCommit) {
final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit;
final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit();
assert acquiredIndexCommits.containsKey(releasingCommit)
Expand Down Expand Up @@ -316,7 +311,8 @@ private static Set<String> listOfNewFileNames(IndexCommit previous, IndexCommit
/**
* Checks whether the deletion policy is holding on to externally acquired index commits
*/
synchronized boolean hasAcquiredIndexCommitsForTesting() {
@Override
public synchronized boolean hasAcquiredIndexCommitsForTesting() {
// We explicitly check only external commits and disregard internal commits acquired by the commits listener
for (var e : acquiredIndexCommits.entrySet()) {
if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) {
Expand All @@ -329,7 +325,8 @@ synchronized boolean hasAcquiredIndexCommitsForTesting() {
/**
* Checks if the deletion policy can delete some index commits with the latest global checkpoint.
*/
boolean hasUnreferencedCommits() {
@Override
public boolean hasUnreferencedCommits() {
return maxSeqNoOfNextSafeCommit <= globalCheckpointSupplier.getAsLong();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;

import java.util.Set;

public abstract class ElasticsearchIndexDeletionPolicy extends IndexDeletionPolicy {

/**
* Captures the most recent commit point or the most recent safe commit point.
* Index files of the capturing commit point won't be released until the commit reference is closed.
*
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
*/
public abstract IndexCommit acquireIndexCommit(boolean acquiringSafeCommit);

/**
* Releases an index commit that was acquired by {@link #acquireIndexCommit(boolean)}.
*
* @return true if the acquired commit can be clean up.
*/
public abstract boolean releaseIndexCommit(IndexCommit acquiredIndexCommit);

/**
* @return information about the safe commit
*/
public abstract SafeCommitInfo getSafeCommitInfo();

public abstract boolean hasAcquiredIndexCommitsForTesting();

public abstract boolean hasUnreferencedCommits();

public interface CommitsListener {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about getting rid of this interface since we don't use it anymore, wdyt @kingherc? we can do in a follow-up once all the pieces are in place.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fcofdez . Yes I believe it can be removed.

Separately, we were discussing whether IndexModule#setIndexCommitListener() could be removed in this thread with @tlrx , feel free to read to see as well. I had a question whether someone, e.g., externally, like a plugin writer user, might have been using it, but we think no. So that may also be considered for removal. However, maybe an optional follow-up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that was my impression too. I'll remove it in a follow-up so we keep this PRs manageable.


void onNewAcquiredCommit(IndexCommit commit, Set<String> additionalFiles);

void onDeletedCommit(IndexCommit commit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -151,6 +152,11 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {

private final MergeMetrics mergeMetrics;

/**
* Allows to pass an {@link ElasticsearchIndexDeletionPolicy} wrapper to egine implementations.
*/
private final Function<ElasticsearchIndexDeletionPolicy, ElasticsearchIndexDeletionPolicy> indexDeletionPolicyWrapper;

/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
Expand Down Expand Up @@ -184,7 +190,8 @@ public EngineConfig(
boolean promotableToPrimary,
MapperService mapperService,
EngineResetLock engineResetLock,
MergeMetrics mergeMetrics
MergeMetrics mergeMetrics,
Function<ElasticsearchIndexDeletionPolicy, ElasticsearchIndexDeletionPolicy> indexDeletionPolicyWrapper
) {
this.shardId = shardId;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -233,6 +240,7 @@ public EngineConfig(
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
this.engineResetLock = engineResetLock;
this.mergeMetrics = mergeMetrics;
this.indexDeletionPolicyWrapper = indexDeletionPolicyWrapper;
}

/**
Expand Down Expand Up @@ -485,4 +493,11 @@ public EngineResetLock getEngineResetLock() {
public MergeMetrics getMergeMetrics() {
return mergeMetrics;
}

/**
* @return an {@link ElasticsearchIndexDeletionPolicy} wrapper, to be use by engine implementations.
*/
public Function<ElasticsearchIndexDeletionPolicy, ElasticsearchIndexDeletionPolicy> getIndexDeletionPolicyWrapper() {
return indexDeletionPolicyWrapper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public class InternalEngine extends Engine {

private final LocalCheckpointTracker localCheckpointTracker;

private final CombinedDeletionPolicy combinedDeletionPolicy;
private final ElasticsearchIndexDeletionPolicy indexDeletionPolicy;

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttle
Expand Down Expand Up @@ -277,13 +277,7 @@ public InternalEngine(EngineConfig engineConfig) {
this.totalDiskSpace = ByteSizeValue.of(Environment.getFileStore(translog.location()).getTotalSpace(), ByteSizeUnit.BYTES);
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy = new CombinedDeletionPolicy(
logger,
translogDeletionPolicy,
softDeletesPolicy,
translog::getLastSyncedGlobalCheckpoint,
newCommitsListener()
);
this.indexDeletionPolicy = newIndexDeletionPolicy(engineConfig, logger, translog, softDeletesPolicy);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
Expand Down Expand Up @@ -391,6 +385,25 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
);
}

protected ElasticsearchIndexDeletionPolicy newIndexDeletionPolicy(
EngineConfig engineConfig,
Logger logger,
Translog translog,
SoftDeletesPolicy softDeletesPolicy
) {
var wrapper = engineConfig.getIndexDeletionPolicyWrapper();
assert wrapper != null : "no index deletion policy wrapper for " + engineConfig.getShardId();
return wrapper.apply(
new CombinedDeletionPolicy(
logger,
translog.getDeletionPolicy(),
softDeletesPolicy,
translog::getLastSyncedGlobalCheckpoint,
newCommitsListener()
)
);
}

@Nullable
private CombinedDeletionPolicy.CommitsListener newCommitsListener() {
IndexCommitListener listener = engineConfig.getIndexCommitListener();
Expand Down Expand Up @@ -682,7 +695,7 @@ Translog getTranslog() {

// Package private for testing purposes only
boolean hasAcquiredIndexCommitsForTesting() {
return combinedDeletionPolicy.hasAcquiredIndexCommitsForTesting();
return indexDeletionPolicy.hasAcquiredIndexCommitsForTesting();
}

@Override
Expand Down Expand Up @@ -748,7 +761,7 @@ public Translog.Location getTranslogLastWriteLocation() {
}

private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
if (indexDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
}
translog.trimUnreferencedReaders();
Expand Down Expand Up @@ -2555,17 +2568,17 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En
future.actionGet();
logger.trace("finish flush for snapshot");
}
return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(false));
return acquireIndexCommitRef(() -> indexDeletionPolicy.acquireIndexCommit(false));
}

@Override
public IndexCommitRef acquireSafeIndexCommit() throws EngineException {
return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(true));
return acquireIndexCommitRef(() -> indexDeletionPolicy.acquireIndexCommit(true));
}

private void releaseIndexCommit(IndexCommit snapshot) throws IOException {
// Revisit the deletion policy if we can clean up the snapshotting commit.
if (combinedDeletionPolicy.releaseCommit(snapshot)) {
if (indexDeletionPolicy.releaseIndexCommit(snapshot)) {
try {
// Here we don't have to trim translog because snapshotting an index commit
// does not lock translog or prevents unreferenced files from trimming.
Expand All @@ -2578,7 +2591,7 @@ private void releaseIndexCommit(IndexCommit snapshot) throws IOException {

@Override
public SafeCommitInfo getSafeCommitInfo() {
return combinedDeletionPolicy.getSafeCommitInfo();
return indexDeletionPolicy.getSafeCommitInfo();
}

private boolean failOnTragicEvent(AlreadyClosedException ex) {
Expand Down Expand Up @@ -2756,7 +2769,7 @@ private IndexWriterConfig getIndexWriterConfig() {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
iwc.setIndexDeletionPolicy(indexDeletionPolicy);
iwc.setInfoStream(TESTS_VERBOSE ? InfoStream.getDefault() : new LoggerInfoStream(logger));
iwc.setMergeScheduler(mergeScheduler.getMergeScheduler());
// Give us the opportunity to upgrade old segments while performing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3765,7 +3765,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
routingEntry().isPromotableToPrimary(),
mapperService(),
engineResetLock,
mergeMetrics
mergeMetrics,
Function.identity()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void testAcquireIndexCommit() throws Exception {
final IndexCommit lastCommit = commitList.get(commitList.size() - 1);
safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get());
assertThat(
indexPolicy.releaseCommit(snapshot),
indexPolicy.releaseIndexCommit(snapshot),
equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false && snapshot.equals(safeCommit) == false)
);
}
Expand All @@ -211,7 +211,7 @@ public void testAcquireIndexCommit() throws Exception {
)
);
}
snapshottingCommits.forEach(indexPolicy::releaseCommit);
snapshottingCommits.forEach(indexPolicy::releaseIndexCommit);
globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
commitList.forEach(this::resetDeletion);
indexPolicy.onCommit(commitList);
Expand Down Expand Up @@ -350,8 +350,8 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) {
}

@Override
synchronized boolean releaseCommit(IndexCommit acquiredCommit) {
return super.releaseCommit(wrapCommit(acquiredCommit));
public synchronized boolean releaseIndexCommit(IndexCommit acquiredCommit) {
return super.releaseIndexCommit(wrapCommit(acquiredCommit));
}
};

Expand Down Expand Up @@ -383,7 +383,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) {
assertThat(deletedCommits, hasSize(0));
assertThat(newCommitFiles, containsInAnyOrder(equalTo("_1.cfe"), equalTo("_1.si"), equalTo("_1.cfs"), equalTo("segments_2")));

boolean maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit0);
boolean maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit0);
assertThat(maybeCleanUpCommits, equalTo(true));

globalCheckpoint.set(20L);
Expand Down Expand Up @@ -454,7 +454,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) {
)
);

maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit2);
maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit2);
assertThat("No commits to clean up (commit #2 is the safe commit)", maybeCleanUpCommits, equalTo(false));

globalCheckpoint.set(30L);
Expand Down Expand Up @@ -499,13 +499,13 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) {
assertThat(deletedCommits, contains(commit0, commit2));
assertThat(newCommitFiles, containsInAnyOrder(equalTo("_4.cfe"), equalTo("_4.si"), equalTo("_4.cfs"), equalTo("segments_4")));

maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit3);
maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit3);
assertThat("No commits to clean up (commit #3 is the safe commit)", maybeCleanUpCommits, equalTo(false));

maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit4);
maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit4);
assertThat("No commits to clean up (commit #4 is the last commit)", maybeCleanUpCommits, equalTo(false));

maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit1);
maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit1);
assertThat(maybeCleanUpCommits, equalTo(true));

final boolean globalCheckpointCatchUp = randomBoolean();
Expand Down Expand Up @@ -560,7 +560,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) {
}
assertThat(newCommitFiles, containsInAnyOrder(equalTo("_5.cfe"), equalTo("_5.si"), equalTo("_5.cfs"), equalTo("segments_5")));

maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit5);
maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit5);
assertThat("No commits to clean up (commit #5 is the last commit)", maybeCleanUpCommits, equalTo(false));
}

Expand Down
Loading