Skip to content

Commit 628d4c8

Browse files
committed
Unit tests
1 parent aa74e47 commit 628d4c8

File tree

9 files changed

+642
-19
lines changed

9 files changed

+642
-19
lines changed
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package org.opensearch.indices.replication;
2+
3+
import org.junit.After;
4+
import org.junit.Before;
5+
import org.junit.BeforeClass;
6+
import org.junit.Test;
7+
import org.junit.runner.OrderWith;
8+
import org.opensearch.index.store.RemoteSegmentStoreDirectory.UploadedSegmentMetadata;
9+
10+
import java.util.Map;
11+
import java.util.concurrent.CountDownLatch;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.TimeUnit;
15+
16+
import static org.junit.Assert.*;
17+
import static org.mockito.Mockito.*;
18+
19+
public class ActiveMergesSegmentRegistryTests {
20+
21+
private ActiveMergesSegmentRegistry registry;
22+
private UploadedSegmentMetadata mockMetadata;
23+
24+
@Before
25+
public void setUp() {
26+
registry = ActiveMergesSegmentRegistry.getInstance();
27+
// Clear registry state before each test
28+
clearRegistry();
29+
30+
mockMetadata = mock(UploadedSegmentMetadata.class);
31+
when(mockMetadata.getUploadedFilename()).thenReturn("remote_segment_1.si");
32+
}
33+
34+
private void clearRegistry() {
35+
// Clear all registered segments
36+
Map<String, UploadedSegmentMetadata> metadataMap = registry.segmentMetadataMap();
37+
metadataMap.keySet().forEach(registry::unregister);
38+
registry.filenameRegistry.clear();
39+
}
40+
41+
@Test
42+
public void testSingletonInstance() {
43+
ActiveMergesSegmentRegistry instance1 = ActiveMergesSegmentRegistry.getInstance();
44+
ActiveMergesSegmentRegistry instance2 = ActiveMergesSegmentRegistry.getInstance();
45+
assertSame(instance1, instance2);
46+
}
47+
48+
@Test
49+
public void testRegisterSegment() {
50+
String filename = "segment_1.si";
51+
registry.register(filename);
52+
assertTrue(registry.contains(filename));
53+
}
54+
55+
@Test(expected = IllegalArgumentException.class)
56+
public void testRegisterDuplicateSegment() {
57+
String filename = "segment_1.si";
58+
registry.register(filename);
59+
registry.register(filename); // Should throw exception
60+
}
61+
62+
@Test
63+
public void testUpdateMetadata() {
64+
String filename = "segment_1.si";
65+
registry.register(filename);
66+
registry.updateMetadata(filename, mockMetadata);
67+
68+
assertEquals(mockMetadata, registry.getMetadata(filename));
69+
assertTrue(registry.contains("remote_segment_1.si"));
70+
}
71+
72+
@Test(expected = IllegalArgumentException.class)
73+
public void testUpdateMetadataUnregisteredSegment() {
74+
registry.updateMetadata("unregistered_segment.si", mockMetadata);
75+
}
76+
77+
@Test
78+
public void testUnregisterSegment() {
79+
String filename = "segment_1.si";
80+
registry.register(filename);
81+
registry.updateMetadata(filename, mockMetadata);
82+
83+
registry.unregister(filename);
84+
85+
assertFalse(registry.contains(filename));
86+
assertFalse(registry.contains("remote_segment_1.si"));
87+
assertNull(registry.getMetadata(filename));
88+
}
89+
90+
@Test
91+
public void testUnregisterNonExistentSegment() {
92+
// Should not throw exception
93+
registry.unregister("non_existent.si");
94+
}
95+
96+
@Test
97+
public void testGetExistingRemoteSegmentFilename() {
98+
String filename = "segment_1.si";
99+
registry.register(filename);
100+
registry.updateMetadata(filename, mockMetadata);
101+
102+
assertEquals("remote_segment_1.si", registry.getExistingRemoteSegmentFilename(filename));
103+
}
104+
105+
@Test(expected = IllegalArgumentException.class)
106+
public void testGetExistingRemoteSegmentFilenameNoMetadata() {
107+
String filename = "segment_1.si";
108+
registry.register(filename);
109+
registry.getExistingRemoteSegmentFilename(filename); // Metadata not available
110+
}
111+
112+
@Test
113+
public void testCanDelete() {
114+
String filename = "segment_1.si";
115+
assertTrue(registry.canDelete(filename)); // Not registered
116+
117+
registry.register(filename);
118+
assertFalse(registry.canDelete(filename)); // Registered
119+
120+
registry.unregister(filename);
121+
assertTrue(registry.canDelete(filename)); // Unregistered
122+
}
123+
124+
@Test
125+
public void testConcurrentAccess() throws InterruptedException {
126+
int threadCount = 10;
127+
int operationsPerThread = 100;
128+
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
129+
CountDownLatch latch = new CountDownLatch(threadCount);
130+
131+
for (int i = 0; i < threadCount; i++) {
132+
final int threadId = i;
133+
executor.submit(() -> {
134+
try {
135+
for (int j = 0; j < operationsPerThread; j++) {
136+
String filename = "segment_" + threadId + "_" + j + ".si";
137+
String remoteFilename = "remote_" + filename;
138+
UploadedSegmentMetadata metadata = mock(UploadedSegmentMetadata.class);
139+
when(metadata.getUploadedFilename()).thenReturn(remoteFilename);
140+
registry.register(filename);
141+
assertTrue(registry.contains(filename));
142+
registry.updateMetadata(filename, metadata);
143+
assertEquals(registry.getExistingRemoteSegmentFilename(filename), remoteFilename);
144+
registry.unregister(filename);
145+
assertFalse(registry.contains(filename));
146+
}
147+
} finally {
148+
latch.countDown();
149+
}
150+
});
151+
}
152+
153+
assertTrue(latch.await(30, TimeUnit.SECONDS));
154+
executor.shutdown();
155+
}
156+
157+
@Test
158+
public void testMultipleSegmentsLifecycle() {
159+
String[] filenames = {"seg1.si", "seg2.si", "seg3.si"};
160+
UploadedSegmentMetadata[] metadatas = new UploadedSegmentMetadata[3];
161+
162+
// Setup mocks
163+
for (int i = 0; i < 3; i++) {
164+
metadatas[i] = mock(UploadedSegmentMetadata.class);
165+
when(metadatas[i].getUploadedFilename()).thenReturn("remote_" + filenames[i]);
166+
}
167+
168+
// Register all
169+
for (String filename : filenames) {
170+
registry.register(filename);
171+
assertTrue(registry.contains(filename));
172+
}
173+
174+
// Update metadata
175+
for (int i = 0; i < 3; i++) {
176+
registry.updateMetadata(filenames[i], metadatas[i]);
177+
assertEquals(metadatas[i], registry.getMetadata(filenames[i]));
178+
}
179+
180+
// Verify all are tracked
181+
assertEquals(3, registry.segmentMetadataMap().size());
182+
183+
// Unregister one
184+
registry.unregister(filenames[1]);
185+
assertFalse(registry.contains(filenames[1]));
186+
assertEquals(2, registry.segmentMetadataMap().size());
187+
188+
// Others still exist
189+
assertTrue(registry.contains(filenames[0]));
190+
assertTrue(registry.contains(filenames[2]));
191+
}
192+
}

server/src/test/java/org/opensearch/indices/replication/MergedSegmentReplicationTargetTests.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.store.ByteBuffersDataOutput;
1414
import org.apache.lucene.store.ByteBuffersIndexOutput;
1515
import org.apache.lucene.util.Version;
16+
import org.junit.Test;
1617
import org.opensearch.OpenSearchCorruptionException;
1718
import org.opensearch.cluster.metadata.IndexMetadata;
1819
import org.opensearch.common.settings.Settings;
@@ -22,7 +23,8 @@
2223
import org.opensearch.index.shard.IndexShard;
2324
import org.opensearch.index.shard.IndexShardTestCase;
2425
import org.opensearch.index.store.StoreFileMetadata;
25-
import org.opensearch.indices.replication.checkpoint.MergeSegmentCheckpoint;
26+
import org.opensearch.indices.replication.checkpoint.MergedSegmentCheckpoint;
27+
import org.opensearch.indices.replication.checkpoint.RemoteStoreMergedSegmentCheckpoint;
2628
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
2729
import org.opensearch.indices.replication.common.ReplicationFailedException;
2830
import org.opensearch.indices.replication.common.ReplicationType;
@@ -41,7 +43,8 @@ public class MergedSegmentReplicationTargetTests extends IndexShardTestCase {
4143

4244
private MergedSegmentReplicationTarget mergedSegmentReplicationTarget;
4345
private IndexShard indexShard, spyIndexShard;
44-
private MergeSegmentCheckpoint mergedSegment;
46+
private MergedSegmentCheckpoint mergedSegmentCheckpoint;
47+
private RemoteStoreMergedSegmentCheckpoint remoteStoreMergedSegmentCheckpoint;
4548
private ByteBuffersDataOutput buffer;
4649

4750
private static final String SEGMENT_NAME = "_0.si";
@@ -71,17 +74,26 @@ public void setUp() throws Exception {
7174
try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) {
7275
testSegmentInfos.write(indexOutput);
7376
}
74-
mergedSegment = new MergeSegmentCheckpoint(
77+
mergedSegmentCheckpoint = new MergedSegmentCheckpoint(
7578
spyIndexShard.shardId(),
7679
spyIndexShard.getPendingPrimaryTerm(),
7780
1,
7881
indexShard.getLatestReplicationCheckpoint().getCodec(),
7982
SI_SNAPSHOT,
8083
IndexFileNames.parseSegmentName(SEGMENT_NAME)
8184
);
85+
remoteStoreMergedSegmentCheckpoint = new RemoteStoreMergedSegmentCheckpoint(
86+
spyIndexShard.shardId(),
87+
spyIndexShard.getPendingPrimaryTerm(),
88+
1,
89+
indexShard.getLatestReplicationCheckpoint().getCodec(),
90+
SI_SNAPSHOT,
91+
IndexFileNames.parseSegmentName(SEGMENT_NAME),
92+
null
93+
);
8294
}
8395

84-
public void testSuccessfulResponse_startReplication() {
96+
private void testSuccessfulResponse_startReplication(MergedSegmentCheckpoint checkpointMergedSegment) {
8597

8698
SegmentReplicationSource segrepSource = new TestReplicationSource() {
8799
@Override
@@ -119,7 +131,7 @@ public void getMergedSegmentFiles(
119131
SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
120132
SegmentReplicationTargetService.SegmentReplicationListener.class
121133
);
122-
mergedSegmentReplicationTarget = new MergedSegmentReplicationTarget(spyIndexShard, mergedSegment, segrepSource, segRepListener);
134+
mergedSegmentReplicationTarget = new MergedSegmentReplicationTarget(spyIndexShard, checkpointMergedSegment, segrepSource, segRepListener);
123135

124136
mergedSegmentReplicationTarget.startReplication(new ActionListener<Void>() {
125137
@Override
@@ -133,12 +145,12 @@ public void onFailure(Exception e) {
133145
Assert.fail();
134146
}
135147
}, (ReplicationCheckpoint checkpoint, IndexShard indexShard) -> {
136-
assertEquals(mergedSegment, checkpoint);
148+
assertEquals(mergedSegmentCheckpoint, checkpoint);
137149
assertEquals(indexShard, spyIndexShard);
138150
});
139151
}
140152

141-
public void testFailureResponse_getMergedSegmentFiles() {
153+
private void testFailureResponse_getMergedSegmentFiles(MergedSegmentCheckpoint checkpointMergedSegment) {
142154

143155
Exception exception = new Exception("dummy failure");
144156
SegmentReplicationSource segrepSource = new TestReplicationSource() {
@@ -174,7 +186,7 @@ public void getMergedSegmentFiles(
174186
SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
175187
SegmentReplicationTargetService.SegmentReplicationListener.class
176188
);
177-
mergedSegmentReplicationTarget = new MergedSegmentReplicationTarget(spyIndexShard, mergedSegment, segrepSource, segRepListener);
189+
mergedSegmentReplicationTarget = new MergedSegmentReplicationTarget(spyIndexShard, checkpointMergedSegment, segrepSource, segRepListener);
178190

179191
mergedSegmentReplicationTarget.startReplication(new ActionListener<Void>() {
180192
@Override
@@ -190,7 +202,7 @@ public void onFailure(Exception e) {
190202
}, mock(BiConsumer.class));
191203
}
192204

193-
public void testFailure_differentSegmentFiles() throws IOException {
205+
private void testFailure_differentSegmentFiles(MergedSegmentCheckpoint checkpointMergedSegment) throws IOException {
194206

195207
SegmentReplicationSource segrepSource = new TestReplicationSource() {
196208
@Override
@@ -225,7 +237,7 @@ public void getMergedSegmentFiles(
225237
SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock(
226238
SegmentReplicationTargetService.SegmentReplicationListener.class
227239
);
228-
mergedSegmentReplicationTarget = new MergedSegmentReplicationTarget(spyIndexShard, mergedSegment, segrepSource, segRepListener);
240+
mergedSegmentReplicationTarget = new MergedSegmentReplicationTarget(spyIndexShard, checkpointMergedSegment, segrepSource, segRepListener);
229241
when(spyIndexShard.getSegmentMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT);
230242
mergedSegmentReplicationTarget.startReplication(new ActionListener<Void>() {
231243
@Override
@@ -242,6 +254,30 @@ public void onFailure(Exception e) {
242254
}, mock(BiConsumer.class));
243255
}
244256

257+
public void testFailure_differentSegmentFiles_remoteStoreEnabled() throws IOException {
258+
testFailure_differentSegmentFiles(remoteStoreMergedSegmentCheckpoint);
259+
}
260+
261+
public void testFailure_differentSegmentFiles() throws IOException {
262+
testFailure_differentSegmentFiles(mergedSegmentCheckpoint);
263+
}
264+
265+
public void testFailureResponse_getMergedSegmentFiles_remoteStoreEnabled() {
266+
testFailureResponse_getMergedSegmentFiles(remoteStoreMergedSegmentCheckpoint);
267+
}
268+
269+
public void testFailureResponse_getMergedSegmentFiles() {
270+
testFailureResponse_getMergedSegmentFiles(mergedSegmentCheckpoint);
271+
}
272+
273+
public void testSuccessfulResponse_startReplication_startReplication(){
274+
testSuccessfulResponse_startReplication(remoteStoreMergedSegmentCheckpoint);
275+
}
276+
277+
public void testSuccessfulResponse_startReplication() {
278+
testSuccessfulResponse_startReplication(mergedSegmentCheckpoint);
279+
}
280+
245281
@Override
246282
public void tearDown() throws Exception {
247283
super.tearDown();

server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.opensearch.index.shard.IndexShardTestCase;
2222
import org.opensearch.index.store.StoreFileMetadata;
2323
import org.opensearch.indices.recovery.RecoverySettings;
24-
import org.opensearch.indices.replication.checkpoint.MergeSegmentCheckpoint;
24+
import org.opensearch.indices.replication.checkpoint.MergedSegmentCheckpoint;
2525
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
2626
import org.opensearch.telemetry.tracing.noop.NoopTracer;
2727
import org.opensearch.test.ClusterServiceUtils;
@@ -136,7 +136,7 @@ public void testGetSegmentFiles() {
136136

137137
public void testGetMergedSegmentFiles() {
138138
StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST);
139-
final ReplicationCheckpoint checkpoint = new MergeSegmentCheckpoint(
139+
final ReplicationCheckpoint checkpoint = new MergedSegmentCheckpoint(
140140
indexShard.shardId(),
141141
PRIMARY_TERM,
142142
1,
@@ -195,7 +195,7 @@ public void testTransportTimeoutForGetSegmentFilesAction() {
195195
public void testTransportTimeoutForGetMergedSegmentFilesAction() {
196196
long fileSize = (long) (Math.pow(10, 9));
197197
StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", fileSize, "checksum", Version.LATEST);
198-
final ReplicationCheckpoint checkpoint = new MergeSegmentCheckpoint(
198+
final ReplicationCheckpoint checkpoint = new MergedSegmentCheckpoint(
199199
indexShard.shardId(),
200200
PRIMARY_TERM,
201201
1,

server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishMergedSegmentActionTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void testPublishMergedSegment() {
129129
mockTargetService
130130
);
131131

132-
final MergeSegmentCheckpoint checkpoint = new MergeSegmentCheckpoint(
132+
final MergedSegmentCheckpoint checkpoint = new MergedSegmentCheckpoint(
133133
indexShard.shardId(),
134134
1,
135135
1111,
@@ -168,7 +168,7 @@ public void testPublishMergedSegmentActionOnPrimary() {
168168
mockTargetService
169169
);
170170

171-
final MergeSegmentCheckpoint checkpoint = new MergeSegmentCheckpoint(
171+
final MergedSegmentCheckpoint checkpoint = new MergedSegmentCheckpoint(
172172
indexShard.shardId(),
173173
1,
174174
1111,
@@ -212,7 +212,7 @@ public void testPublishMergedSegmentActionOnReplica() {
212212
mockTargetService
213213
);
214214

215-
final MergeSegmentCheckpoint checkpoint = new MergeSegmentCheckpoint(
215+
final MergedSegmentCheckpoint checkpoint = new MergedSegmentCheckpoint(
216216
indexShard.shardId(),
217217
1,
218218
1111,
@@ -263,7 +263,7 @@ public void testPublishMergedSegmentActionOnDocrepReplicaDuringMigration() {
263263
mockTargetService
264264
);
265265

266-
final MergeSegmentCheckpoint checkpoint = new MergeSegmentCheckpoint(
266+
final MergedSegmentCheckpoint checkpoint = new MergedSegmentCheckpoint(
267267
indexShard.shardId(),
268268
1,
269269
1111,

0 commit comments

Comments
 (0)