Skip to content

Commit 21b5286

Browse files
Aditi Goyalkh3ra
authored and committed
Changes for the Upload Workflow for merged segments
1 parent cca6984 commit 21b5286

File tree

2 files changed

+207
-0
lines changed

2 files changed

+207
-0
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.opensearch.core.action.ActionListener;
12+
13+
import java.util.Collection;
14+
import java.util.Map;
15+
16+
/**
 * Interface to handle the functionality of uploading segment data to the remote store.
 */
public interface RemoteStoreUploader {

    /**
     * Synchronizes the given local segment files to the remote store.
     *
     * @param localSegments        names of the local segment files to upload
     * @param localSegmentsSizeMap map of local segment file name to its size in bytes
     * @param listener             notified once all effective uploads have completed, or with the failure
     */
    void syncAndUploadNewSegments(Collection<String> localSegments, Map<String, Long> localSegmentsSizeMap, ActionListener<Void> listener);
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.shard;
10+
11+
import org.apache.logging.log4j.Logger;
12+
import org.apache.logging.log4j.message.ParameterizedMessage;
13+
import org.apache.lucene.codecs.CodecUtil;
14+
import org.apache.lucene.index.CorruptIndexException;
15+
import org.apache.lucene.store.Directory;
16+
import org.apache.lucene.store.FilterDirectory;
17+
import org.apache.lucene.store.IOContext;
18+
import org.apache.lucene.store.IndexInput;
19+
import org.opensearch.action.support.GroupedActionListener;
20+
import org.opensearch.cluster.routing.RecoverySource;
21+
import org.opensearch.common.logging.Loggers;
22+
import org.opensearch.common.util.UploadListener;
23+
import org.opensearch.core.action.ActionListener;
24+
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
25+
import org.opensearch.index.store.CompositeDirectory;
26+
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
27+
28+
import java.io.IOException;
29+
import java.util.Collection;
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
import java.util.Set;
33+
34+
/**
35+
* The service essentially acts as a bridge between local segment storage and remote storage,
36+
* ensuring efficient and reliable segment synchronization while providing comprehensive monitoring and error handling.
37+
*/
38+
public class RemoteStoreUploaderService implements RemoteStoreUploader {
39+
40+
private final Logger logger;
41+
42+
public static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
43+
44+
private final IndexShard indexShard;
45+
private final Directory storeDirectory;
46+
private final RemoteSegmentStoreDirectory remoteDirectory;
47+
private final Map<String, String> localSegmentChecksumMap;
48+
// todo: check if we need to create a separate segment tracker as it is said to be linked to RL
49+
private final RemoteSegmentTransferTracker segmentTracker;
50+
51+
public RemoteStoreUploaderService(
52+
IndexShard indexShard,
53+
Directory storeDirectory,
54+
RemoteSegmentStoreDirectory remoteDirectory,
55+
RemoteSegmentTransferTracker segmentTracker
56+
) {
57+
this.indexShard = indexShard;
58+
// todo: check the prefix to be add for this class
59+
logger = Loggers.getLogger(getClass(), indexShard.shardId());
60+
this.storeDirectory = storeDirectory;
61+
this.remoteDirectory = remoteDirectory;
62+
this.segmentTracker = segmentTracker;
63+
this.localSegmentChecksumMap = new HashMap<>();
64+
}
65+
66+
@Override
67+
public void syncAndUploadNewSegments(
68+
Collection<String> localSegments,
69+
Map<String, Long> localSegmentsSizeMap,
70+
ActionListener<Void> listener
71+
) {
72+
73+
Collection<String> filteredFiles = localSegments.stream().filter(file -> !skipUpload(file)).toList();
74+
if (filteredFiles.isEmpty()) {
75+
logger.debug("No new segments to upload in uploadNewSegments");
76+
listener.onResponse(null);
77+
return;
78+
}
79+
80+
logger.debug("Effective new segments files to upload {}", filteredFiles);
81+
ActionListener<Collection<Void>> mappedListener = ActionListener.map(listener, resp -> null);
82+
GroupedActionListener<Void> batchUploadListener = new GroupedActionListener<>(mappedListener, filteredFiles.size());
83+
Directory directory = ((FilterDirectory) (((FilterDirectory) storeDirectory).getDelegate())).getDelegate();
84+
85+
for (String filteredFile : filteredFiles) {
86+
// Initializing listener here to ensure that the stats increment operations are thread-safe
87+
UploadListener statsListener = createUploadListener(localSegmentsSizeMap);
88+
ActionListener<Void> aggregatedListener = ActionListener.wrap(resp -> {
89+
statsListener.onSuccess(filteredFile);
90+
batchUploadListener.onResponse(resp);
91+
if (directory instanceof CompositeDirectory) {
92+
((CompositeDirectory) directory).afterSyncToRemote(filteredFile);
93+
}
94+
}, ex -> {
95+
logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", ex), ex);
96+
if (ex instanceof CorruptIndexException) {
97+
indexShard.failShard(ex.getMessage(), ex);
98+
}
99+
statsListener.onFailure(filteredFile);
100+
batchUploadListener.onFailure(ex);
101+
});
102+
statsListener.beforeUpload(filteredFile);
103+
// Place where the actual upload is happening
104+
remoteDirectory.copyFrom(storeDirectory, filteredFile, IOContext.DEFAULT, aggregatedListener, isLowPriorityUpload());
105+
}
106+
}
107+
108+
boolean isLowPriorityUpload() {
109+
return isLocalOrSnapshotRecoveryOrSeeding();
110+
}
111+
112+
boolean isLocalOrSnapshotRecoveryOrSeeding() {
113+
// In this case when the primary mode is false, we need to upload segments to Remote Store
114+
// This is required in case of remote migration seeding/snapshots/shrink/ split/clone where we need to durable persist
115+
// all segments to remote before completing the recovery to ensure durability.
116+
return (indexShard.state() == IndexShardState.RECOVERING && indexShard.shardRouting.primary())
117+
&& indexShard.recoveryState() != null
118+
&& (indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
119+
|| indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT
120+
|| indexShard.shouldSeedRemoteStore());
121+
}
122+
123+
/**
124+
* Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events
125+
*
126+
* @param fileSizeMap updated map of current snapshot of local segments to their sizes
127+
*/
128+
private UploadListener createUploadListener(Map<String, Long> fileSizeMap) {
129+
return new UploadListener() {
130+
private long uploadStartTime = 0;
131+
132+
@Override
133+
public void beforeUpload(String file) {
134+
// Start tracking the upload bytes started
135+
segmentTracker.addUploadBytesStarted(fileSizeMap.get(file));
136+
uploadStartTime = System.currentTimeMillis();
137+
}
138+
139+
@Override
140+
public void onSuccess(String file) {
141+
// Track upload success
142+
segmentTracker.addUploadBytesSucceeded(fileSizeMap.get(file));
143+
segmentTracker.addToLatestUploadedFiles(file);
144+
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
145+
}
146+
147+
@Override
148+
public void onFailure(String file) {
149+
// Track upload failure
150+
segmentTracker.addUploadBytesFailed(fileSizeMap.get(file));
151+
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
152+
}
153+
};
154+
}
155+
156+
/**
157+
* Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded.
158+
*
159+
* @param file that needs to be uploaded.
160+
* @return true if the upload has to be skipped for the file.
161+
*/
162+
private boolean skipUpload(String file) {
163+
try {
164+
// Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded.
165+
// todo: Check if we need the second condition or is it just fail safe
166+
return EXCLUDE_FILES.contains(file) || remoteDirectory.containsFile(file, getChecksumOfLocalFile(file));
167+
} catch (IOException e) {
168+
logger.error(
169+
"Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file",
170+
file
171+
);
172+
}
173+
return false;
174+
}
175+
176+
private String getChecksumOfLocalFile(String file) throws IOException {
177+
if (!localSegmentChecksumMap.containsKey(file)) {
178+
try (IndexInput indexInput = storeDirectory.openInput(file, IOContext.READONCE)) {
179+
String checksum = Long.toString(CodecUtil.retrieveChecksum(indexInput));
180+
localSegmentChecksumMap.put(file, checksum);
181+
}
182+
}
183+
return localSegmentChecksumMap.get(file);
184+
}
185+
}

0 commit comments

Comments
 (0)