1010
1111import org .apache .logging .log4j .Logger ;
1212import org .apache .logging .log4j .message .ParameterizedMessage ;
13- import org .apache .lucene .codecs .CodecUtil ;
1413import org .apache .lucene .index .CorruptIndexException ;
1514import org .apache .lucene .store .Directory ;
1615import org .apache .lucene .store .FilterDirectory ;
1716import org .apache .lucene .store .IOContext ;
18- import org .apache .lucene .store .IndexInput ;
1917import org .opensearch .action .support .GroupedActionListener ;
2018import org .opensearch .cluster .routing .RecoverySource ;
2119import org .opensearch .common .logging .Loggers ;
2220import org .opensearch .common .util .UploadListener ;
2321import org .opensearch .core .action .ActionListener ;
24- import org .opensearch .index .remote .RemoteSegmentTransferTracker ;
2522import org .opensearch .index .store .CompositeDirectory ;
2623import org .opensearch .index .store .RemoteSegmentStoreDirectory ;
2724
28- import java .io .IOException ;
2925import java .util .Collection ;
30- import java .util .HashMap ;
3126import java .util .Map ;
32- import java .util .Set ;
27+ import java .util .function . Function ;
3328
3429/**
3530 * The service essentially acts as a bridge between local segment storage and remote storage,
@@ -39,69 +34,56 @@ public class RemoteStoreUploaderService implements RemoteStoreUploader {
3934
4035 private final Logger logger ;
4136
42- public static final Set <String > EXCLUDE_FILES = Set .of ("write.lock" );
43-
4437 private final IndexShard indexShard ;
4538 private final Directory storeDirectory ;
4639 private final RemoteSegmentStoreDirectory remoteDirectory ;
47- private final Map <String , String > localSegmentChecksumMap ;
48- // todo: check if we need to create a separate segment tracker as it is said to be linked to RL
49- private final RemoteSegmentTransferTracker segmentTracker ;
5040
51- public RemoteStoreUploaderService (
52- IndexShard indexShard ,
53- Directory storeDirectory ,
54- RemoteSegmentStoreDirectory remoteDirectory ,
55- RemoteSegmentTransferTracker segmentTracker
56- ) {
57- this .indexShard = indexShard ;
58- // todo: check the prefix to be add for this class
41+ public RemoteStoreUploaderService (IndexShard indexShard , Directory storeDirectory , RemoteSegmentStoreDirectory remoteDirectory ) {
5942 logger = Loggers .getLogger (getClass (), indexShard .shardId ());
43+ this .indexShard = indexShard ;
6044 this .storeDirectory = storeDirectory ;
6145 this .remoteDirectory = remoteDirectory ;
62- this .segmentTracker = segmentTracker ;
63- this .localSegmentChecksumMap = new HashMap <>();
6446 }
6547
6648 @ Override
67- public void syncAndUploadNewSegments (
49+ public void uploadSegments (
6850 Collection <String > localSegments ,
6951 Map <String , Long > localSegmentsSizeMap ,
70- ActionListener <Void > listener
52+ ActionListener <Void > listener ,
53+ Function <Map <String , Long >, UploadListener > uploadListenerFunction
7154 ) {
72-
73- Collection <String > filteredFiles = localSegments .stream ().filter (file -> !skipUpload (file )).toList ();
74- if (filteredFiles .isEmpty ()) {
55+ if (localSegments .isEmpty ()) {
7556 logger .debug ("No new segments to upload in uploadNewSegments" );
7657 listener .onResponse (null );
7758 return ;
7859 }
7960
80- logger .debug ("Effective new segments files to upload {}" , filteredFiles );
61+ logger .debug ("Effective new segments files to upload {}" , localSegments );
8162 ActionListener <Collection <Void >> mappedListener = ActionListener .map (listener , resp -> null );
82- GroupedActionListener <Void > batchUploadListener = new GroupedActionListener <>(mappedListener , filteredFiles .size ());
63+ GroupedActionListener <Void > batchUploadListener = new GroupedActionListener <>(mappedListener , localSegments .size ());
8364 Directory directory = ((FilterDirectory ) (((FilterDirectory ) storeDirectory ).getDelegate ())).getDelegate ();
8465
85- for (String filteredFile : filteredFiles ) {
66+ for (String localSegment : localSegments ) {
8667 // Initializing listener here to ensure that the stats increment operations are thread-safe
87- UploadListener statsListener = createUploadListener (localSegmentsSizeMap );
68+ UploadListener statsListener = uploadListenerFunction . apply (localSegmentsSizeMap );
8869 ActionListener <Void > aggregatedListener = ActionListener .wrap (resp -> {
89- statsListener .onSuccess (filteredFile );
70+ statsListener .onSuccess (localSegment );
9071 batchUploadListener .onResponse (resp );
72+ // Once uploaded to Remote, local files become eligible for eviction from FileCache
9173 if (directory instanceof CompositeDirectory ) {
92- ((CompositeDirectory ) directory ).afterSyncToRemote (filteredFile );
74+ ((CompositeDirectory ) directory ).afterSyncToRemote (localSegment );
9375 }
9476 }, ex -> {
9577 logger .warn (() -> new ParameterizedMessage ("Exception: [{}] while uploading segment files" , ex ), ex );
9678 if (ex instanceof CorruptIndexException ) {
9779 indexShard .failShard (ex .getMessage (), ex );
9880 }
99- statsListener .onFailure (filteredFile );
81+ statsListener .onFailure (localSegment );
10082 batchUploadListener .onFailure (ex );
10183 });
102- statsListener .beforeUpload (filteredFile );
84+ statsListener .beforeUpload (localSegment );
10385 // Place where the actual upload is happening
104- remoteDirectory .copyFrom (storeDirectory , filteredFile , IOContext .DEFAULT , aggregatedListener , isLowPriorityUpload ());
86+ remoteDirectory .copyFrom (storeDirectory , localSegment , IOContext .DEFAULT , aggregatedListener , isLowPriorityUpload ());
10587 }
10688 }
10789
@@ -119,67 +101,4 @@ boolean isLocalOrSnapshotRecoveryOrSeeding() {
119101 || indexShard .recoveryState ().getRecoverySource ().getType () == RecoverySource .Type .SNAPSHOT
120102 || indexShard .shouldSeedRemoteStore ());
121103 }
122-
123- /**
124- * Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events
125- *
126- * @param fileSizeMap updated map of current snapshot of local segments to their sizes
127- */
128- private UploadListener createUploadListener (Map <String , Long > fileSizeMap ) {
129- return new UploadListener () {
130- private long uploadStartTime = 0 ;
131-
132- @ Override
133- public void beforeUpload (String file ) {
134- // Start tracking the upload bytes started
135- segmentTracker .addUploadBytesStarted (fileSizeMap .get (file ));
136- uploadStartTime = System .currentTimeMillis ();
137- }
138-
139- @ Override
140- public void onSuccess (String file ) {
141- // Track upload success
142- segmentTracker .addUploadBytesSucceeded (fileSizeMap .get (file ));
143- segmentTracker .addToLatestUploadedFiles (file );
144- segmentTracker .addUploadTimeInMillis (Math .max (1 , System .currentTimeMillis () - uploadStartTime ));
145- }
146-
147- @ Override
148- public void onFailure (String file ) {
149- // Track upload failure
150- segmentTracker .addUploadBytesFailed (fileSizeMap .get (file ));
151- segmentTracker .addUploadTimeInMillis (Math .max (1 , System .currentTimeMillis () - uploadStartTime ));
152- }
153- };
154- }
155-
156- /**
157- * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded.
158- *
159- * @param file that needs to be uploaded.
160- * @return true if the upload has to be skipped for the file.
161- */
162- private boolean skipUpload (String file ) {
163- try {
164- // Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded.
165- // todo: Check if we need the second condition or is it just fail safe
166- return EXCLUDE_FILES .contains (file ) || remoteDirectory .containsFile (file , getChecksumOfLocalFile (file ));
167- } catch (IOException e ) {
168- logger .error (
169- "Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file" ,
170- file
171- );
172- }
173- return false ;
174- }
175-
176- private String getChecksumOfLocalFile (String file ) throws IOException {
177- if (!localSegmentChecksumMap .containsKey (file )) {
178- try (IndexInput indexInput = storeDirectory .openInput (file , IOContext .READONCE )) {
179- String checksum = Long .toString (CodecUtil .retrieveChecksum (indexInput ));
180- localSegmentChecksumMap .put (file , checksum );
181- }
182- }
183- return localSegmentChecksumMap .get (file );
184- }
185104}
0 commit comments