Skip to content

Commit 21d8c13

Browse files
committed
Restored RemoteSegmentStoreDirectory PublicAPI + added changelog
Signed-off-by: kh3ra <[email protected]>
1 parent 0b4b8a4 commit 21d8c13

File tree

7 files changed

+43
-8
lines changed

7 files changed

+43
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4545
- Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511))
4646
- Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
4747
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
48+
- Added the core process for warming merged segments in remote-store enabled domains ([#18683](https://github.com/opensearch-project/OpenSearch/pull/18683))
4849

4950
### Changed
5051
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))

server/src/internalClusterTest/java/org/opensearch/indices/replication/RemoteStoreMergedSegmentWarmerIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
156156
logger.info("Number of merge invocations: {}", numInvocations.get());
157157
assertTrue(latch.await(10, TimeUnit.SECONDS));
158158
assertTrue(executingThreads.size() > 1);
159+
// Verify concurrent execution by checking that multiple unique threads handled merge operations
159160
assertTrue(numInvocations.get() > 1);
160161
mockTransportServiceNode1.clearAllRules();
161162
mockTransportServiceNode2.clearAllRules();

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import org.apache.lucene.store.IndexInput;
2424
import org.apache.lucene.store.IndexOutput;
2525
import org.apache.lucene.util.Version;
26+
import org.opensearch.common.Nullable;
2627
import org.opensearch.common.UUIDs;
28+
import org.opensearch.common.annotation.InternalApi;
2729
import org.opensearch.common.annotation.PublicApi;
2830
import org.opensearch.common.collect.Tuple;
2931
import org.opensearch.common.io.VersionedCodecStreamWrapper;
@@ -131,13 +133,24 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
131133

132134
public static final int METADATA_FILES_TO_FETCH = 10;
133135

136+
public RemoteSegmentStoreDirectory(
137+
RemoteDirectory remoteDataDirectory,
138+
RemoteDirectory remoteMetadataDirectory,
139+
RemoteStoreLockManager mdLockManager,
140+
ThreadPool threadPool,
141+
ShardId shardId
142+
) throws IOException {
143+
this(remoteDataDirectory, remoteMetadataDirectory, mdLockManager, threadPool, shardId, null);
144+
}
145+
146+
@InternalApi
134147
public RemoteSegmentStoreDirectory(
135148
RemoteDirectory remoteDataDirectory,
136149
RemoteDirectory remoteMetadataDirectory,
137150
RemoteStoreLockManager mdLockManager,
138151
ThreadPool threadPool,
139152
ShardId shardId,
140-
Map<String, String> pendingDownloadMergedSegments
153+
@Nullable Map<String, String> pendingDownloadMergedSegments
141154
) throws IOException {
142155
super(remoteDataDirectory);
143156
this.remoteDataDirectory = remoteDataDirectory;
@@ -1126,7 +1139,7 @@ public void close() throws IOException {
11261139
*
11271140
* @param localToRemoteFilenames Map of local filenames to their corresponding remote filenames
11281141
*/
1129-
public void markPendingMergedSegmentsDownload(Map<String, String> localToRemoteFilenames) {
1142+
public void markMergedSegmentsPendingDownload(Map<String, String> localToRemoteFilenames) {
11301143
pendingDownloadMergedSegments.putAll(localToRemoteFilenames);
11311144
}
11321145

@@ -1135,7 +1148,7 @@ public void markPendingMergedSegmentsDownload(Map<String, String> localToRemoteF
11351148
*
11361149
* @param localFilenames Set of local filenames to remove from pending downloads
11371150
*/
1138-
public void unmarkPendingDownloadMergedSegments(Set<String> localFilenames) {
1151+
public void unmarkMergedSegmentsPendingDownload(Set<String> localFilenames) {
11391152
localFilenames.forEach(pendingDownloadMergedSegments::remove);
11401153
}
11411154

@@ -1146,6 +1159,6 @@ public void unmarkPendingDownloadMergedSegments(Set<String> localFilenames) {
11461159
* @return true if segment is pending download, false otherwise
11471160
*/
11481161
public boolean isMergedSegmentPendingDownload(String localFilename) {
1149-
return pendingDownloadMergedSegments.containsKey(localFilename);
1162+
return pendingDownloadMergedSegments != null && pendingDownloadMergedSegments.containsKey(localFilename);
11501163
}
11511164
}

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,7 @@ public void onReplicationFailure(
737737

738738
private void unmarkPendingDownloadMergedSegments(IndexShard shard, ReplicationCheckpoint receivedCheckpoint) {
739739
if (isRemoteStoreMergedSegmentCheckpoint(receivedCheckpoint)) {
740-
shard.getRemoteDirectory().unmarkPendingDownloadMergedSegments(receivedCheckpoint.getMetadataMap().keySet());
740+
shard.getRemoteDirectory().unmarkMergedSegmentsPendingDownload(receivedCheckpoint.getMetadataMap().keySet());
741741
}
742742
}
743743

server/src/main/java/org/opensearch/indices/replication/checkpoint/RemoteStorePublishMergedSegmentAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public RemoteStorePublishMergedSegmentAction(
8383
protected void doReplicaOperation(RemoteStorePublishMergedSegmentRequest shardRequest, IndexShard replica) {
8484
RemoteStoreMergedSegmentCheckpoint checkpoint = shardRequest.getMergedSegment();
8585
if (checkpoint.getShardId().equals(replica.shardId())) {
86-
replica.getRemoteDirectory().markPendingMergedSegmentsDownload(checkpoint.getLocalToRemoteSegmentFilenameMap());
86+
replica.getRemoteDirectory().markMergedSegmentsPendingDownload(checkpoint.getLocalToRemoteSegmentFilenameMap());
8787
replicationService.onNewMergedSegmentCheckpoint(checkpoint, replica);
8888
} else {
8989
logger.warn(

server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,6 +1201,26 @@ public void testInitializeToSpecificTimestampMatchingMdFile() throws IOException
12011201
assertTrue(uploadedSegments.containsKey("_0.cfs"));
12021202
}
12031203

1204+
public void testMarkMergedSegmentPendingDownload() {
1205+
String localSegmentName1 = "_1.si";
1206+
String remoteSegmentName1 = "_1.si__uuid";
1207+
String localSegmentName2 = "_1.dvd";
1208+
String remoteSegmentName2 = "_1.dvd__uuid";
1209+
remoteSegmentStoreDirectory.markMergedSegmentsPendingDownload(
1210+
Map.of(localSegmentName1, remoteSegmentName1, localSegmentName2, remoteSegmentName2)
1211+
);
1212+
1213+
assertTrue(remoteSegmentStoreDirectory.isMergedSegmentPendingDownload(localSegmentName1));
1214+
assertTrue(remoteSegmentStoreDirectory.isMergedSegmentPendingDownload(localSegmentName2));
1215+
assertFalse(remoteSegmentStoreDirectory.isMergedSegmentPendingDownload("_3.si"));
1216+
assertEquals(remoteSegmentName1, remoteSegmentStoreDirectory.getExistingRemoteFilename(localSegmentName1));
1217+
assertEquals(remoteSegmentName2, remoteSegmentStoreDirectory.getExistingRemoteFilename(localSegmentName2));
1218+
1219+
remoteSegmentStoreDirectory.unmarkMergedSegmentsPendingDownload(Set.of(localSegmentName1));
1220+
assertFalse(remoteSegmentStoreDirectory.isMergedSegmentPendingDownload(localSegmentName1));
1221+
assertNull(remoteSegmentStoreDirectory.getExistingRemoteFilename(localSegmentName1));
1222+
}
1223+
12041224
private static class WrapperIndexOutput extends IndexOutput {
12051225
public IndexOutput indexOutput;
12061226

server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public void testGetMergedSegmentFiles() throws IOException, ExecutionException,
202202
});
203203
}
204204
};
205-
replicaShard.getRemoteDirectory().markPendingMergedSegmentsDownload(localToRemoteFilenameMap);
205+
replicaShard.getRemoteDirectory().markMergedSegmentsPendingDownload(localToRemoteFilenameMap);
206206
RemoteStoreMergedSegmentCheckpoint mergedSegmentCheckpoint = new RemoteStoreMergedSegmentCheckpoint(
207207
new MergedSegmentCheckpoint(
208208
replicaShard.shardId(),
@@ -243,7 +243,7 @@ public void testGetMergedSegmentFilesDownloadTimeout() throws IOException, Execu
243243
});
244244
}
245245
};
246-
replicaShard.getRemoteDirectory().markPendingMergedSegmentsDownload(localToRemoteFilenameMap);
246+
replicaShard.getRemoteDirectory().markMergedSegmentsPendingDownload(localToRemoteFilenameMap);
247247
RemoteStoreMergedSegmentCheckpoint mergedSegmentCheckpoint = new RemoteStoreMergedSegmentCheckpoint(
248248
new MergedSegmentCheckpoint(
249249
replicaShard.shardId(),

0 commit comments

Comments
 (0)