Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,10 @@ public MetadataCreateIndexService(
// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
createIndexTaskKey = clusterService.registerClusterManagerTask(CREATE_INDEX, true);
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
remoteStoreCustomMetadataResolver = isRemoteDataAttributePresent(settings)
? new RemoteStoreCustomMetadataResolver(remoteStoreSettings, minNodeVersionSupplier, repositoriesServiceSupplier, settings)
: null;
remoteStoreCustomMetadataResolver = RemoteStoreNodeAttribute.isSegmentRepoConfigured(settings)
&& RemoteStoreNodeAttribute.isTranslogRepoConfigured(settings)
? new RemoteStoreCustomMetadataResolver(remoteStoreSettings, minNodeVersionSupplier, repositoriesServiceSupplier, settings)
: null;
}

public IndexScopedSettings getIndexScopedSettings() {
Expand Down Expand Up @@ -1178,10 +1179,19 @@ public static void updateRemoteStoreSettings(
if (remoteNode.isPresent()) {
translogRepo = RemoteStoreNodeAttribute.getTranslogRepoName(remoteNode.get().getAttributes());
segmentRepo = RemoteStoreNodeAttribute.getSegmentRepoName(remoteNode.get().getAttributes());
if (segmentRepo != null && translogRepo != null) {
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true)
.put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, segmentRepo)
.put(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, translogRepo);
if (segmentRepo != null) {
settingsBuilder.put(SETTING_REMOTE_STORE_ENABLED, true).put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, segmentRepo);
if (translogRepo != null) {
settingsBuilder.put(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, translogRepo);
} else if (isMigratingToRemoteStore(clusterSettings)) {
ValidationException validationException = new ValidationException();
validationException.addValidationErrors(
Collections.singletonList(
"Cluster is migrating to remote store but remote translog is not configured, failing index creation"
)
);
throw new IndexCreationException(indexName, validationException);
}
} else {
ValidationException validationException = new ValidationException();
validationException.addValidationErrors(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,14 @@ public boolean isRemoteStoreNode() {
return isClusterStateRepoConfigured(this.getAttributes()) && RemoteStoreNodeAttribute.isSegmentRepoConfigured(this.getAttributes());
}

/**
* Returns whether the node is a remote segment store node.
* @return true if the node contains remote segment store node attributes, false otherwise
*/
public boolean isRemoteSegmentStoreNode() {
return RemoteStoreNodeAttribute.isSegmentRepoConfigured(this.getAttributes());
}

/**
* Returns whether settings required for remote cluster state publication is configured
* @return true if the node contains remote cluster state node attribute and remote routing table node attribute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ public synchronized IndexShard createShard(
Store remoteStore = null;
Directory remoteDirectory = null;
boolean seedRemote = false;
if (targetNode.isRemoteStoreNode()) {
if (targetNode.isRemoteSegmentStoreNode()) {
if (this.indexSettings.isRemoteStoreEnabled()) {
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1412,9 +1412,7 @@ public boolean isAssignedOnRemoteNode() {
* Returns if remote translog store is enabled for this index.
*/
public boolean isRemoteTranslogStoreEnabled() {
// Today enabling remote store automatically enables remote translog as well.
// which is why isRemoteStoreEnabled is used to represent isRemoteTranslogStoreEnabled
return isRemoteStoreEnabled;
return remoteStoreTranslogRepository != null && remoteStoreTranslogRepository.isEmpty() == false;
}

/**
Expand Down
18 changes: 10 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2821,14 +2821,16 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
syncSegmentsFromRemoteSegmentStore(false);
}
if (shardRouting.primary()) {
if (syncFromRemote) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
} else if (isSnapshotV2Restore() == false) {
// we will enter this block when we do not want to recover from remote translog.
// currently only during snapshot restore, we are coming into this block.
// here, as while initiliazing remote translog we cannot skip downloading translog files,
// so before that step, we are deleting the translog files present in remote store.
deleteTranslogFilesFromRemoteTranslog();
if (indexSettings.isRemoteTranslogStoreEnabled()) {
if (syncFromRemote) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
} else if (isSnapshotV2Restore() == false) {
// we will enter this block when we do not want to recover from remote translog.
// currently only during snapshot restore, we are coming into this block.
// here, as while initiliazing remote translog we cannot skip downloading translog files,
// so before that step, we are deleting the translog files present in remote store.
deleteTranslogFilesFromRemoteTranslog();
}
}
} else if (syncFromRemote) {
// For replicas, when we download segments from remote segment store, we need to make sure that local
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@
import static org.opensearch.index.TieredMergePolicyProvider.MIN_DEFAULT_MAX_MERGE_AT_ONCE;
import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.search.SearchService.ALLOW_EXPENSIVE_QUERIES;

/**
Expand Down Expand Up @@ -707,7 +706,7 @@ private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTrans
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
remoteStoreSettings
);
} else if (isRemoteDataAttributePresent(settings) && shardRouting.primary()) {
} else if (RemoteStoreNodeAttribute.isTranslogRepoConfigured(settings) && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class RemoteStoreNodeAttribute {
REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS
);

public static final String REMOTE_STORE_MODE_KEY = "remote_store.mode";

/**
* Creates a new {@link RemoteStoreNodeAttribute}
*/
Expand Down Expand Up @@ -190,17 +192,42 @@ private static Tuple<String, String> getValue(Map<String, String> attributes, Li
return null;
}

private enum RemoteStoreMode {
SEGMENTS_ONLY,
DEFAULT
}

private Map<String, String> getValidatedRepositoryNames(DiscoveryNode node) {
Set<Tuple<String, String>> repositoryNames = new HashSet<>();
if (containsKey(node.getAttributes(), REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS)
|| containsKey(node.getAttributes(), REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS));
} else if (containsKey(node.getAttributes(), REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS));
RemoteStoreMode remoteStoreMode = RemoteStoreMode.DEFAULT;
if (containsKey(node.getAttributes(), List.of(REMOTE_STORE_MODE_KEY))) {
String mode = node.getAttributes().get(REMOTE_STORE_MODE_KEY);
if (mode != null && mode.equalsIgnoreCase(RemoteStoreMode.SEGMENTS_ONLY.name())) {
remoteStoreMode = RemoteStoreMode.SEGMENTS_ONLY;
} else if (mode != null && mode.equalsIgnoreCase(RemoteStoreMode.DEFAULT.name()) == false) {
throw new IllegalStateException("Unknown remote store mode [" + mode + "] for node [" + node + "]");
}
}
if (remoteStoreMode == RemoteStoreMode.SEGMENTS_ONLY) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS));
} else if (containsKey(node.getAttributes(), REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS)
|| containsKey(node.getAttributes(), REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEYS));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEYS));
repositoryNames.add(validateAttributeNonNull(node, REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS));
} else if (containsKey(node.getAttributes(), REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS)) {
repositoryNames.add(validateAttributeNonNull(node, REMOTE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEYS));
}
if (containsKey(node.getAttributes(), REMOTE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEYS)) {
if (remoteStoreMode == RemoteStoreMode.SEGMENTS_ONLY) {
throw new IllegalStateException(
"Cannot set "
+ REMOTE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEYS
+ " attributes when remote store mode is set to segments only for node ["
+ node
+ "]"
);
}
repositoryNames.add(validateAttributeNonNull(node, REMOTE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEYS));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ public void testJoinRemotePubClusterWithRemoteStoreNodes() {
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(e.getMessage().equals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"));
assertEquals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster", e.getMessage());
}

public void testPreventJoinRemotePublicationClusterWithIncompatibleAttributes() {
Expand Down Expand Up @@ -719,7 +719,7 @@ public void testPreventJoinClusterWithRemoteStateNodeJoiningRemoteStoreCluster()
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(e.getMessage().equals("a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"));
assertEquals("a non remote store node [" + joiningNode + "] is trying to join a remote store cluster", e.getMessage());
}

public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster() {
Expand All @@ -739,7 +739,7 @@ public void testPreventJoinClusterWithRemoteStoreNodeJoiningRemoteStateCluster()
IllegalStateException.class,
() -> JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata())
);
assertTrue(e.getMessage().equals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"));
assertEquals("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster", e.getMessage());
}

public void testUpdatesClusterStateWithSingleNodeCluster() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1702,7 +1702,7 @@ public void testNewIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMixedMode()
assertEquals(error.getMessage(), "failed to create index [test-index]");
assertThat(
error.getCause().getMessage(),
containsString("Cluster is migrating to remote store but no remote node found, failing index creation")
containsString("Cluster is migrating to remote store but remote translog is not configured, failing index creation")
);
}

Expand Down Expand Up @@ -1771,6 +1771,8 @@ private IndexMetadata testRemoteCustomData(boolean remoteStoreEnabled, PathType
Settings.Builder settingsBuilder = Settings.builder();
if (remoteStoreEnabled) {
settingsBuilder.put(NODE_ATTRIBUTES.getKey() + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "test");
settingsBuilder.put(NODE_ATTRIBUTES.getKey() + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "test");
settingsBuilder.put(NODE_ATTRIBUTES.getKey() + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "test");
}
settingsBuilder.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), pathType.toString());
Settings settings = settingsBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1570,6 +1570,7 @@ public void testInSyncIdsAreIgnoredIfNotValidatedByClusterManagerWithRemoteTrans
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo")
.build();
final ReplicationTracker tracker = newTracker(primaryId, settings);
tracker.updateFromClusterManager(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1825,6 +1825,7 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException {
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT")
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo")
.build()
);
RemoteSegmentTransferTracker remoteSegmentTransferTracker = shard.getRemoteStoreStatsTrackerFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public void setup() throws IOException {
.put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo")
.build();

indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory());
Expand Down
Loading