Skip to content

Commit 0b4b8a4

Browse files
committed
Added test for Timeout case for RemoteStoreReplicationSource
Signed-off-by: kh3ra <[email protected]>
1 parent 9e49316 commit 0b4b8a4

File tree

2 files changed

+73
-5
lines changed

2 files changed

+73
-5
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
public class MergedSegmentWarmerIT extends SegmentReplicationIT {
4646
@Override
4747
protected Settings nodeSettings(int nodeOrdinal) {
48-
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("logger.org.opensearch.indices.replication", "TRACE").build();
48+
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
4949
}
5050

5151
@Override

server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@
1414
import org.opensearch.cluster.metadata.IndexMetadata;
1515
import org.opensearch.cluster.routing.ShardRouting;
1616
import org.opensearch.common.settings.Settings;
17+
import org.opensearch.common.unit.TimeValue;
1718
import org.opensearch.core.action.ActionListener;
1819
import org.opensearch.index.engine.InternalEngineFactory;
1920
import org.opensearch.index.engine.NRTReplicationEngineFactory;
2021
import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
2122
import org.opensearch.index.shard.IndexShard;
2223
import org.opensearch.index.shard.IndexShardState;
23-
import org.opensearch.index.shard.RemoteStoreRefreshListenerTests;
24+
import org.opensearch.index.shard.RemoteStoreRefreshListenerTests.TestFilterDirectory;
2425
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
26+
import org.opensearch.index.store.RemoteSegmentStoreDirectory.UploadedSegmentMetadata;
2527
import org.opensearch.index.store.Store;
2628
import org.opensearch.index.store.StoreFileMetadata;
29+
import org.opensearch.indices.recovery.RecoverySettings;
2730
import org.opensearch.indices.replication.checkpoint.MergedSegmentCheckpoint;
2831
import org.opensearch.indices.replication.checkpoint.RemoteStoreMergedSegmentCheckpoint;
2932
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
@@ -38,10 +41,12 @@
3841
import java.util.Map;
3942
import java.util.concurrent.CountDownLatch;
4043
import java.util.concurrent.ExecutionException;
44+
import java.util.concurrent.TimeoutException;
4145
import java.util.concurrent.atomic.AtomicReference;
4246
import java.util.stream.Collectors;
4347

4448
import static org.mockito.Mockito.mock;
49+
import static org.mockito.Mockito.spy;
4550
import static org.mockito.Mockito.when;
4651

4752
public class RemoteStoreReplicationSourceTests extends OpenSearchIndexLevelReplicationTestCase {
@@ -190,8 +195,7 @@ public void testGetMergedSegmentFiles() throws IOException, ExecutionException,
190195
final ReplicationCheckpoint checkpoint = primaryShard.getLatestReplicationCheckpoint();
191196
Map<String, String> localToRemoteFilenameMap = new HashMap<>() {
192197
{
193-
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> segmentsUploadedToRemoteStore = primaryShard
194-
.getRemoteDirectory()
198+
Map<String, UploadedSegmentMetadata> segmentsUploadedToRemoteStore = primaryShard.getRemoteDirectory()
195199
.getSegmentsUploadedToRemoteStore();
196200
segmentsUploadedToRemoteStore.forEach((segment, metadata) -> {
197201
if (segment.startsWith("segments_") == false) put(segment, metadata.getUploadedFilename());
@@ -228,6 +232,70 @@ public void testGetMergedSegmentFiles() throws IOException, ExecutionException,
228232
closeShards(replicaShard);
229233
}
230234

235+
public void testGetMergedSegmentFilesDownloadTimeout() throws IOException, ExecutionException, InterruptedException {
236+
final ReplicationCheckpoint checkpoint = primaryShard.getLatestReplicationCheckpoint();
237+
Map<String, String> localToRemoteFilenameMap = new HashMap<>() {
238+
{
239+
Map<String, UploadedSegmentMetadata> segmentsUploadedToRemoteStore = primaryShard.getRemoteDirectory()
240+
.getSegmentsUploadedToRemoteStore();
241+
segmentsUploadedToRemoteStore.forEach((segment, metadata) -> {
242+
if (segment.startsWith("segments_") == false) put(segment, metadata.getUploadedFilename());
243+
});
244+
}
245+
};
246+
replicaShard.getRemoteDirectory().markPendingMergedSegmentsDownload(localToRemoteFilenameMap);
247+
RemoteStoreMergedSegmentCheckpoint mergedSegmentCheckpoint = new RemoteStoreMergedSegmentCheckpoint(
248+
new MergedSegmentCheckpoint(
249+
replicaShard.shardId(),
250+
primaryTerm,
251+
checkpoint.getSegmentInfosVersion(),
252+
checkpoint.getLength(),
253+
checkpoint.getCodec(),
254+
checkpoint.getMetadataMap(),
255+
"_0"
256+
),
257+
localToRemoteFilenameMap
258+
);
259+
List<StoreFileMetadata> filesToFetch = new ArrayList<>(primaryShard.getSegmentMetadataMap().values());
260+
replicationSource = new RemoteStoreReplicationSource(primaryShard);
261+
IndexShard replica = spy(replicaShard);
262+
RecoverySettings mockRecoverySettings = mock(RecoverySettings.class);
263+
when(mockRecoverySettings.getMergedSegmentReplicationTimeout()).thenReturn(TimeValue.ZERO);
264+
when(replica.getRecoverySettings()).thenReturn(mockRecoverySettings);
265+
AtomicReference<Exception> failureRef = new AtomicReference<>();
266+
CountDownLatch latch = new CountDownLatch(1);
267+
268+
ActionListener<GetSegmentFilesResponse> listener = new ActionListener<>() {
269+
@Override
270+
public void onResponse(GetSegmentFilesResponse response) {
271+
fail("Expected onFailure to be called");
272+
}
273+
274+
@Override
275+
public void onFailure(Exception e) {
276+
failureRef.set(e);
277+
latch.countDown();
278+
}
279+
};
280+
replicationSource.getMergedSegmentFiles(
281+
REPLICATION_ID,
282+
mergedSegmentCheckpoint,
283+
filesToFetch,
284+
replica,
285+
(fileName, bytesRecovered) -> {},
286+
listener
287+
);
288+
latch.await();
289+
assertNotNull("onFailure should have been called", failureRef.get());
290+
Exception observedException = failureRef.get();
291+
assertTrue(observedException instanceof TimeoutException);
292+
assertTrue(
293+
observedException.getMessage() != null
294+
&& observedException.getMessage().equals("Timed out waiting for merged segments download from remote store")
295+
);
296+
closeShards(replicaShard);
297+
}
298+
231299
public void testGetMergedSegmentFilesFailure() throws IOException, ExecutionException, InterruptedException {
232300
// Testing failure scenario where segments are not a part of RemoteSegmentStoreDirectory.pendingMergedSegmentsDownloads
233301
final ReplicationCheckpoint checkpoint = primaryShard.getLatestReplicationCheckpoint();
@@ -285,7 +353,7 @@ private void buildIndexShardBehavior(IndexShard mockShard, IndexShard indexShard
285353
Store remoteStore = mock(Store.class);
286354
when(mockShard.remoteStore()).thenReturn(remoteStore);
287355
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()).getDelegate();
288-
FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory(new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory));
356+
FilterDirectory remoteStoreFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(remoteSegmentStoreDirectory));
289357
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);
290358
}
291359
}

0 commit comments

Comments
 (0)