Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/131706.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 131706
summary: Add extension points to remediate index metadata in during snapshot restore
area: Snapshot/Restore
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@
import org.elasticsearch.search.SearchUtils;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.shutdown.PluginShutdownService;
import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer;
import org.elasticsearch.snapshots.IndexMetadataRestoreTransformer.NoOpRestoreTransformer;
import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
import org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService;
import org.elasticsearch.snapshots.RestoreService;
Expand Down Expand Up @@ -1149,7 +1151,8 @@ public Map<String, String> queryFields() {
systemIndices,
indicesService,
fileSettingsService,
threadPool
threadPool,
pluginsService.loadSingletonServiceProvider(IndexMetadataRestoreTransformer.class, NoOpRestoreTransformer::new)
);

DiscoveryModule discoveryModule = createDiscoveryModule(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.snapshots;

import org.elasticsearch.cluster.metadata.IndexMetadata;

public interface IndexMetadataRestoreTransformer {
IndexMetadata updateIndexMetadata(IndexMetadata original);

class NoOpRestoreTransformer implements IndexMetadataRestoreTransformer {
@Override
public IndexMetadata updateIndexMetadata(IndexMetadata original) {
return original;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ public final class RestoreService implements ClusterStateApplier {

private final Executor snapshotMetaExecutor;

private final IndexMetadataRestoreTransformer indexMetadataRestoreTransformer;

private volatile boolean refreshRepositoryUuidOnRestore;

public RestoreService(
Expand All @@ -217,7 +219,8 @@ public RestoreService(
SystemIndices systemIndices,
IndicesService indicesService,
FileSettingsService fileSettingsService,
ThreadPool threadPool
ThreadPool threadPool,
IndexMetadataRestoreTransformer indexMetadataRestoreTransformer
) {
this.clusterService = clusterService;
this.repositoriesService = repositoriesService;
Expand All @@ -237,6 +240,7 @@ public RestoreService(
this.refreshRepositoryUuidOnRestore = REFRESH_REPO_UUID_ON_RESTORE_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(REFRESH_REPO_UUID_ON_RESTORE_SETTING, this::setRefreshRepositoryUuidOnRestore);
this.indexMetadataRestoreTransformer = indexMetadataRestoreTransformer;
}

/**
Expand Down Expand Up @@ -492,6 +496,8 @@ private void startRestore(
}
for (IndexId indexId : repositoryData.resolveIndices(requestedIndicesIncludingSystem).values()) {
IndexMetadata snapshotIndexMetaData = repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
// Update the snapshot index metadata before adding it to the metadata
snapshotIndexMetaData = indexMetadataRestoreTransformer.updateIndexMetadata(snapshotIndexMetaData);
if (snapshotIndexMetaData.isSystem()) {
if (requestIndices.contains(indexId.getName())) {
explicitlyRequestedSystemIndices.add(indexId.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.get.TransportGetIndexAction;
import org.elasticsearch.action.admin.indices.mapping.put.TransportAutoPutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
Expand Down Expand Up @@ -126,13 +130,15 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
Expand Down Expand Up @@ -234,6 +240,7 @@
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener;
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
import static org.elasticsearch.index.IndexSettings.INDEX_SEARCH_IDLE_AFTER;
import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.allOf;
Expand Down Expand Up @@ -364,9 +371,15 @@ public void testSuccessfulSnapshotAndRestore() {
)
);

final SubscribableListener<SearchResponse> searchResponseListener = new SubscribableListener<>();
final SubscribableListener<GetIndexResponse> getIndexResponseListener = new SubscribableListener<>();
continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
client().admin().indices().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(index), getIndexResponseListener);
});

final SubscribableListener<SearchResponse> searchResponseListener = new SubscribableListener<>();
continueOrDie(getIndexResponseListener, getIndexResponse -> {
assertEquals(TimeValue.timeValueMinutes(2), INDEX_SEARCH_IDLE_AFTER.get(getIndexResponse.settings().get(index)));
client().search(
new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)),
searchResponseListener
Expand All @@ -382,6 +395,7 @@ public void testSuccessfulSnapshotAndRestore() {
runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L));
assertNotNull(safeResult(createSnapshotResponseListener));
assertNotNull(safeResult(restoreSnapshotResponseListener));
assertNotNull(safeResult(getIndexResponseListener));
assertTrue(documentCountVerified.get());
assertTrue(SnapshotsInProgress.get(masterNode.clusterService.state()).isEmpty());
final Repository repository = masterNode.repositoriesService.repository(repoName);
Expand Down Expand Up @@ -2625,6 +2639,20 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
DefaultProjectResolver.INSTANCE
)
);
actions.put(
GetIndexAction.INSTANCE,
new TransportGetIndexAction(
transportService,
clusterService,
threadPool,
new SettingsFilter(List.of()),
actionFilters,
indexNameExpressionResolver,
indicesService,
indexScopedSettings,
DefaultProjectResolver.INSTANCE
)
);
final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings);
final IndexingPressure indexingMemoryLimits = new IndexingPressure(settings);
mappingUpdatedAction.setClient(client);
Expand Down Expand Up @@ -2702,7 +2730,16 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
EmptySystemIndices.INSTANCE,
indicesService,
mock(FileSettingsService.class),
threadPool
threadPool,
originalIndexMetadata -> IndexMetadata.builder(originalIndexMetadata)
.settings(
// Set a property to something mild, outside its default
Settings.builder()
.put(originalIndexMetadata.getSettings())
.put(INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.timeValueMinutes(2))
.build()
)
.build()
);
actions.put(
TransportPutMappingAction.TYPE,
Expand Down
Loading