Skip to content

Commit 6dd4d67

Browse files
authored
Executing shard recovery in project context (#130525)
See title
1 parent a07e7e9 commit 6dd4d67

File tree

3 files changed

+30
-20
lines changed

3 files changed

+30
-20
lines changed

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
950950

951951
@Override
952952
public void createShard(
953+
final ProjectId projectId,
953954
final ShardRouting shardRouting,
954955
final PeerRecoveryTargetService recoveryTargetService,
955956
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
@@ -968,26 +969,29 @@ public void createShard(
968969
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
969970
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
970971
indexShard.addShardFailureCallback(onShardFailure);
971-
indexShard.startRecovery(
972-
recoveryState,
973-
recoveryTargetService,
974-
postRecoveryMerger.maybeMergeAfterRecovery(indexService.getMetadata(), shardRouting, recoveryListener),
975-
repositoriesService,
976-
(mapping, listener) -> {
977-
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
978-
: "mapping update consumer only required by local shards recovery";
979-
AcknowledgedRequest<PutMappingRequest> putMappingRequestAcknowledgedRequest = new PutMappingRequest()
980-
// concrete index - no name clash, it uses uuid
981-
.setConcreteIndex(shardRouting.index())
982-
.source(mapping.source().string(), XContentType.JSON);
983-
client.execute(
984-
TransportAutoPutMappingAction.TYPE,
985-
putMappingRequestAcknowledgedRequest.ackTimeout(TimeValue.MAX_VALUE).masterNodeTimeout(TimeValue.MAX_VALUE),
986-
new RefCountAwareThreadedActionListener<>(threadPool.generic(), listener.map(ignored -> null))
987-
);
988-
},
989-
this,
990-
clusterStateVersion
972+
projectResolver.executeOnProject(
973+
projectId,
974+
() -> indexShard.startRecovery(
975+
recoveryState,
976+
recoveryTargetService,
977+
postRecoveryMerger.maybeMergeAfterRecovery(indexService.getMetadata(), shardRouting, recoveryListener),
978+
repositoriesService,
979+
(mapping, listener) -> {
980+
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
981+
: "mapping update consumer only required by local shards recovery";
982+
AcknowledgedRequest<PutMappingRequest> putMappingRequestAcknowledgedRequest = new PutMappingRequest()
983+
// concrete index - no name clash, it uses uuid
984+
.setConcreteIndex(shardRouting.index())
985+
.source(mapping.source().string(), XContentType.JSON);
986+
client.execute(
987+
TransportAutoPutMappingAction.TYPE,
988+
putMappingRequestAcknowledgedRequest.ackTimeout(TimeValue.MAX_VALUE).masterNodeTimeout(TimeValue.MAX_VALUE),
989+
new RefCountAwareThreadedActionListener<>(threadPool.generic(), listener.map(ignored -> null))
990+
);
991+
},
992+
this,
993+
clusterStateVersion
994+
)
991995
);
992996
}
993997

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.cluster.ClusterStateApplier;
2727
import org.elasticsearch.cluster.action.shard.ShardStateAction;
2828
import org.elasticsearch.cluster.metadata.IndexMetadata;
29+
import org.elasticsearch.cluster.metadata.ProjectId;
2930
import org.elasticsearch.cluster.metadata.ProjectMetadata;
3031
import org.elasticsearch.cluster.node.DiscoveryNode;
3132
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -781,6 +782,7 @@ private void createShardWhenLockAvailable(
781782
try {
782783
logger.debug("{} creating shard with primary term [{}], iteration [{}]", shardRouting.shardId(), primaryTerm, iteration);
783784
indicesService.createShard(
785+
originalState.metadata().projectFor(shardRouting.index()).id(),
784786
shardRouting,
785787
recoveryTargetService,
786788
new RecoveryListener(shardRouting, primaryTerm),
@@ -1330,6 +1332,7 @@ void removeIndex(
13301332
/**
13311333
* Creates a shard for the specified shard routing and starts recovery.
13321334
*
1335+
* @param projectId the project for the shard
13331336
* @param shardRouting the shard routing
13341337
* @param recoveryTargetService recovery service for the target
13351338
* @param recoveryListener a callback when recovery changes state (finishes or fails)
@@ -1343,6 +1346,7 @@ void removeIndex(
13431346
* @throws IOException if an I/O exception occurs when creating the shard
13441347
*/
13451348
void createShard(
1349+
ProjectId projectId,
13461350
ShardRouting shardRouting,
13471351
PeerRecoveryTargetService recoveryTargetService,
13481352
PeerRecoveryTargetService.RecoveryListener recoveryListener,

test/framework/src/main/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.cluster.ClusterState;
1414
import org.elasticsearch.cluster.metadata.IndexMetadata;
15+
import org.elasticsearch.cluster.metadata.ProjectId;
1516
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@@ -242,6 +243,7 @@ public MockIndexService indexService(Index index) {
242243

243244
@Override
244245
public void createShard(
246+
final ProjectId projectId,
245247
final ShardRouting shardRouting,
246248
final PeerRecoveryTargetService recoveryTargetService,
247249
final PeerRecoveryTargetService.RecoveryListener recoveryListener,

0 commit comments

Comments
 (0)