Skip to content

Commit d7f0e5b

Browse files
committed
Adding support for uploading merged segments in IndexWriter.warm
1 parent e4d938c commit d7f0e5b

19 files changed

+370
-118
lines changed

server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public MergedSegmentWarmerFactory(TransportService transportService, RecoverySet
5959

6060
public IndexWriter.IndexReaderWarmer get(IndexShard shard) {
6161
if (shard.indexSettings().isAssignedOnRemoteNode()) {
62-
return new RemoteStoreMergedSegmentWarmer(transportService, recoverySettings, clusterService);
62+
return new RemoteStoreMergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
6363
} else if (shard.indexSettings().isSegRepLocalEnabled()) {
6464
return new LocalMergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
6565
} else if (shard.indexSettings().isDocumentReplication()) {

server/src/main/java/org/opensearch/index/engine/RemoteStoreMergedSegmentWarmer.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,14 @@
3232

3333
package org.opensearch.index.engine;
3434

35+
import org.apache.logging.log4j.LogManager;
36+
import org.apache.logging.log4j.Logger;
3537
import org.apache.lucene.index.IndexWriter;
3638
import org.apache.lucene.index.LeafReader;
39+
import org.apache.lucene.index.SegmentCommitInfo;
40+
import org.apache.lucene.index.SegmentReader;
3741
import org.opensearch.cluster.service.ClusterService;
42+
import org.opensearch.index.shard.IndexShard;
3843
import org.opensearch.indices.recovery.RecoverySettings;
3944
import org.opensearch.transport.TransportService;
4045

@@ -49,19 +54,30 @@ public class RemoteStoreMergedSegmentWarmer implements IndexWriter.IndexReaderWa
4954
private final TransportService transportService;
5055
private final RecoverySettings recoverySettings;
5156
private final ClusterService clusterService;
57+
private final IndexShard indexShard;
58+
59+
private final Logger logger = LogManager.getLogger(RemoteStoreMergedSegmentWarmer.class);
5260

5361
public RemoteStoreMergedSegmentWarmer(
5462
TransportService transportService,
5563
RecoverySettings recoverySettings,
56-
ClusterService clusterService
64+
ClusterService clusterService,
65+
IndexShard indexShard
5766
) {
5867
this.transportService = transportService;
5968
this.recoverySettings = recoverySettings;
6069
this.clusterService = clusterService;
70+
this.indexShard = indexShard;
6171
}
6272

6373
@Override
6474
public void warm(LeafReader leafReader) throws IOException {
65-
// TODO: remote store merged segment warmer
75+
logger.info("Started WARM");
76+
// IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
77+
assert leafReader instanceof SegmentReader;
78+
79+
SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
80+
indexShard.publishMergedSegment(segmentCommitInfo);
81+
logger.info("Finished WARM");
6682
}
6783
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,7 @@
201201
import org.opensearch.indices.recovery.RecoverySettings;
202202
import org.opensearch.indices.recovery.RecoveryState;
203203
import org.opensearch.indices.recovery.RecoveryTarget;
204-
import org.opensearch.indices.replication.checkpoint.MergedSegmentCheckpoint;
205-
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
206-
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
207-
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
204+
import org.opensearch.indices.replication.checkpoint.*;
208205
import org.opensearch.indices.replication.common.ReplicationTimer;
209206
import org.opensearch.repositories.RepositoriesService;
210207
import org.opensearch.repositories.Repository;
@@ -1871,11 +1868,22 @@ public void publishMergedSegment(SegmentCommitInfo segmentCommitInfo) throws IOE
18711868
* @return {@link MergedSegmentCheckpoint} Checkpoint computed from the segmentCommitInfo.
18721869
* @throws IOException When there is an error computing segment metadata from the store.
18731870
*/
1874-
public MergedSegmentCheckpoint computeMergeSegmentCheckpoint(SegmentCommitInfo segmentCommitInfo) throws IOException {
1871+
public ReplicationCheckpoint computeMergeSegmentCheckpoint(SegmentCommitInfo segmentCommitInfo) throws IOException {
18751872
// Only need to get the file metadata information in segmentCommitInfo and reuse Store#getSegmentMetadataMap.
18761873
SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
18771874
segmentInfos.add(segmentCommitInfo);
18781875
Map<String, StoreFileMetadata> segmentMetadataMap = store.getSegmentMetadataMap(segmentInfos);
1876+
if (indexSettings.isRemoteStoreEnabled()) {
1877+
return new RemoteStoreMergedSegmentCheckpoint(
1878+
shardId,
1879+
getOperationPrimaryTerm(),
1880+
segmentMetadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
1881+
getEngine().config().getCodec().getName(),
1882+
segmentMetadataMap,
1883+
segmentCommitInfo.info.name,
1884+
null
1885+
);
1886+
}
18791887
return new MergedSegmentCheckpoint(
18801888
shardId,
18811889
getOperationPrimaryTerm(),

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
3636
import org.opensearch.index.translog.Translog;
3737
import org.opensearch.indices.RemoteStoreSettings;
38+
import org.opensearch.indices.replication.ActiveMergesSegmentRegistry;
3839
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
3940
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
4041
import org.opensearch.threadpool.ThreadPool;
@@ -84,6 +85,7 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh
8485

8586
public static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
8687

88+
private final ActiveMergesSegmentRegistry activeMergesSegmentRegistry = ActiveMergesSegmentRegistry.getInstance();
8789
private final IndexShard indexShard;
8890
private final Directory storeDirectory;
8991
private final RemoteSegmentStoreDirectory remoteDirectory;
@@ -258,6 +260,8 @@ private boolean syncSegments() {
258260
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
259261
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);
260262

263+
remoteDirectory.syncSegmentsUploadedToRemoteStoreWithActiveMergesSegmentRegistry(storeDirectory, localSegmentsPostRefresh);
264+
261265
// Create a map of file name to size and update the refresh segment tracker
262266
Map<String, Long> localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh).entrySet()
263267
.stream()

server/src/main/java/org/opensearch/index/store/RemoteDirectory.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.opensearch.core.action.ActionListener;
3333
import org.opensearch.core.common.unit.ByteSizeUnit;
3434
import org.opensearch.index.store.exception.ChecksumCombinationException;
35+
import org.opensearch.indices.replication.ActiveMergesSegmentRegistry;
3536

3637
import java.io.FileNotFoundException;
3738
import java.io.IOException;
@@ -67,7 +68,7 @@ public class RemoteDirectory extends Directory {
6768

6869
private final UnaryOperator<OffsetRangeInputStream> lowPriorityUploadRateLimiter;
6970

70-
private final UnaryOperator<InputStream> downloadRateLimiter;
71+
private final DownloadRateLimiterProvider downloadRateLimiterProvider;
7172

7273
/**
7374
* Number of bytes in the segment file to store checksum
@@ -79,19 +80,20 @@ public BlobContainer getBlobContainer() {
7980
}
8081

8182
public RemoteDirectory(BlobContainer blobContainer) {
82-
this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity(), UnaryOperator.identity());
83+
this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity(), UnaryOperator.identity(), UnaryOperator.identity());
8384
}
8485

8586
public RemoteDirectory(
8687
BlobContainer blobContainer,
8788
UnaryOperator<OffsetRangeInputStream> uploadRateLimiter,
8889
UnaryOperator<OffsetRangeInputStream> lowPriorityUploadRateLimiter,
89-
UnaryOperator<InputStream> downloadRateLimiter
90+
UnaryOperator<InputStream> downloadRateLimiter,
91+
UnaryOperator<InputStream> lowPriorityDownloadRateLimiter
9092
) {
9193
this.blobContainer = blobContainer;
9294
this.lowPriorityUploadRateLimiter = lowPriorityUploadRateLimiter;
9395
this.uploadRateLimiter = uploadRateLimiter;
94-
this.downloadRateLimiter = downloadRateLimiter;
96+
this.downloadRateLimiterProvider = new DownloadRateLimiterProvider(downloadRateLimiter, lowPriorityDownloadRateLimiter);
9597
}
9698

9799
/**
@@ -236,7 +238,7 @@ public IndexInput openInput(String name, long fileLength, IOContext context) thr
236238
InputStream inputStream = null;
237239
try {
238240
inputStream = blobContainer.readBlob(name);
239-
return new RemoteIndexInput(name, downloadRateLimiter.apply(inputStream), fileLength);
241+
return new RemoteIndexInput(name, downloadRateLimiterProvider.get(name).apply(inputStream), fileLength);
240242
} catch (Exception e) {
241243
// In case the RemoteIndexInput creation fails, close the input stream to avoid file handler leak.
242244
if (inputStream != null) {
@@ -271,7 +273,12 @@ public void close() throws IOException {
271273
public long fileLength(String name) throws IOException {
272274
// ToDo: Instead of calling remote store each time, keep a cache with segment metadata
273275
List<BlobMetadata> metadata = blobContainer.listBlobsByPrefixInSortedOrder(name, 1, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC);
276+
logger.info("{}#fileLength blob metadatas", getClass().getName());
277+
metadata.forEach(x -> {
278+
logger.info(x);
279+
});
274280
if (metadata.size() == 1 && metadata.get(0).name().equals(name)) {
281+
logger.info("{}#fileLength Returning {}", getClass().getName(), metadata.get(0).length());
275282
return metadata.get(0).length();
276283
}
277284
throw new NoSuchFileException(name);
@@ -475,8 +482,27 @@ private IndexInput getBlockInput(String name, long position, long length, long f
475482
byte[] bytes;
476483
try (InputStream inputStream = blobContainer.readBlob(name, position, length)) {
477484
// TODO - Explore how we can buffer small chunks of data instead of having the whole 8MB block in memory
478-
bytes = downloadRateLimiter.apply(inputStream).readAllBytes();
485+
bytes = downloadRateLimiterProvider.get(name).apply(inputStream).readAllBytes();
479486
}
480487
return new ByteArrayIndexInput(name, bytes);
481488
}
489+
490+
private class DownloadRateLimiterProvider {
491+
private final ActiveMergesSegmentRegistry activeMergesSegmentRegistry = ActiveMergesSegmentRegistry.getInstance();
492+
493+
private final UnaryOperator<InputStream> downloadRateLimiter;
494+
private final UnaryOperator<InputStream> lowPriorityDownloadRateLimiter;
495+
496+
DownloadRateLimiterProvider(UnaryOperator<InputStream> downloadRateLimiter, UnaryOperator<InputStream> lowPriorityDownloadRateLimiter) {
497+
this.downloadRateLimiter = downloadRateLimiter;
498+
this.lowPriorityDownloadRateLimiter = lowPriorityDownloadRateLimiter;
499+
}
500+
501+
public UnaryOperator<InputStream> get(final String filename){
502+
if (activeMergesSegmentRegistry.contains(filename)) {
503+
return lowPriorityDownloadRateLimiter;
504+
}
505+
return downloadRateLimiter;
506+
}
507+
}
482508
}

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
4040
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
4141
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandlerFactory;
42+
import org.opensearch.indices.replication.ActiveMergesSegmentRegistry;
4243
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
4344
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
4445
import org.opensearch.threadpool.ThreadPool;
@@ -97,6 +98,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement
9798

9899
private final ThreadPool threadPool;
99100

101+
private final ActiveMergesSegmentRegistry activeMergesSegmentRegistry = ActiveMergesSegmentRegistry.getInstance();
102+
100103
/**
101104
* Keeps track of local segment filename to uploaded filename along with other attributes like checksum.
102105
* This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time.
@@ -354,6 +357,10 @@ public String getOriginalFilename() {
354357
return originalFilename;
355358
}
356359

360+
public String getUploadedFilename() {
361+
return uploadedFilename;
362+
}
363+
357364
public void setWrittenByMajor(int writtenByMajor) {
358365
if (writtenByMajor <= Version.LATEST.major && writtenByMajor >= Version.MIN_SUPPORTED_MAJOR) {
359366
this.writtenByMajor = writtenByMajor;
@@ -369,6 +376,8 @@ public void setWrittenByMajor(int writtenByMajor) {
369376
);
370377
}
371378
}
379+
380+
public static UploadedSegmentMetadata EMPTY = new UploadedSegmentMetadata(null, null, null, -1);
372381
}
373382

374383
/**
@@ -500,12 +509,15 @@ public long fileLength(String name) throws IOException {
500509
if (segmentsUploadedToRemoteStore.containsKey(name)) {
501510
return segmentsUploadedToRemoteStore.get(name).getLength();
502511
}
512+
if (isRemoteStoreFileName(name)){
513+
return remoteDataDirectory.fileLength(name);
514+
}
503515
String remoteFilename = getExistingRemoteFilename(name);
504516
if (remoteFilename != null) {
505517
return remoteDataDirectory.fileLength(remoteFilename);
506-
} else {
507-
throw new NoSuchFileException(name);
508518
}
519+
throw new NoSuchFileException(name);
520+
509521
}
510522

511523
/**
@@ -530,7 +542,7 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
530542
*/
531543
@Override
532544
public IndexInput openInput(String name, IOContext context) throws IOException {
533-
String remoteFilename = getExistingRemoteFilename(name);
545+
String remoteFilename = isRemoteStoreFileName(name) ? name : getExistingRemoteFilename(name);
534546
long fileLength = fileLength(name);
535547
if (remoteFilename != null) {
536548
return remoteDataDirectory.openInput(remoteFilename, fileLength, context);
@@ -669,7 +681,42 @@ String getMetadataFileForCommit(long primaryTerm, long generation) throws IOExce
669681

670682
private void postUpload(Directory from, String src, String remoteFilename, String checksum) throws IOException {
671683
UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum, from.fileLength(src));
684+
if(activeMergesSegmentRegistry.contains(src)){
685+
logger.info("[{}] Calling updateRemoteSegmentFileName for {} {}", Thread.currentThread().getName(), src, remoteFilename);
686+
activeMergesSegmentRegistry.updateMetadata(src, segmentMetadata);
687+
return;
688+
}
672689
segmentsUploadedToRemoteStore.put(src, segmentMetadata);
690+
691+
}
692+
693+
public void syncSegmentsUploadedToRemoteStoreWithActiveMergesSegmentRegistry(Directory directory, Collection<String> segments) {
694+
logger.info("At syncSegmentsUploadedToRemoteStoreWithActiveMergesSegmentRegistry. SEGMENTMETADATAMAP");
695+
logger.info(activeMergesSegmentRegistry.segmentMetadataMap());
696+
segments.forEach(segment -> {
697+
try {
698+
if (activeMergesSegmentRegistry.contains(segment) == true) {
699+
// TODO@kheraadi: Do we need to compare the checksum here?
700+
String localChecksum = getChecksumOfLocalFile(directory, segment);
701+
UploadedSegmentMetadata metadata = activeMergesSegmentRegistry.getMetadata(segment);
702+
String storedChecksum = metadata.getChecksum();
703+
if (localChecksum.equals(storedChecksum)) {
704+
segmentsUploadedToRemoteStore.put(segment, metadata);
705+
} else {
706+
// No-op, the segment file will be uploaded to the remote store again with
707+
// a different name. GC will clean up the older file in time.
708+
}
709+
activeMergesSegmentRegistry.unregister(segment);
710+
}
711+
} catch (IOException e) {
712+
logger.error("Exception while updating segmentsUploadedToRemoteStore for segment {}", segment, e);
713+
}
714+
});
715+
}
716+
717+
private boolean isRemoteStoreFileName(String name) {
718+
// TODO@kheraadi: Do we have a better way to check this?
719+
return name.contains(SEGMENT_NAME_UUID_SEPARATOR);
673720
}
674721

675722
/**
@@ -988,6 +1035,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
9881035
staleSegmentRemoteFilenames.stream()
9891036
.filter(file -> activeSegmentRemoteFilenames.contains(file) == false)
9901037
.filter(file -> deletedSegmentFiles.contains(file) == false)
1038+
.filter(file -> activeMergesSegmentRegistry.canDelete(file) == true)
9911039
.forEach(file -> {
9921040
try {
9931041
remoteDataDirectory.deleteFile(file);

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
8585
blobStoreRepository.blobStore().blobContainer(dataPath),
8686
blobStoreRepository::maybeRateLimitRemoteUploadTransfers,
8787
blobStoreRepository::maybeRateLimitLowPriorityRemoteUploadTransfers,
88-
blobStoreRepository::maybeRateLimitRemoteDownloadTransfers
88+
blobStoreRepository::maybeRateLimitRemoteDownloadTransfers,
89+
blobStoreRepository::maybeRateLimitLowPriorityDownloadTransfers
8990
);
9091

9192
RemoteStorePathStrategy.ShardDataPathInput mdPathInput = RemoteStorePathStrategy.ShardDataPathInput.builder()

0 commit comments

Comments
 (0)