Commit 35e20bb

kh3ra authored and vinaykpud committed
[merged segment warmer] support remote merged segment warmer (opensearch-project#18683)
* Changes to support upload and download of merge segments using the IndexWriter.IndexWarmer.warm() flow
  Signed-off-by: kh3ra <[email protected]>
* Unit tests
  Signed-off-by: kh3ra <[email protected]>
* Fixing build issues
  Signed-off-by: kh3ra <[email protected]>
* Fixing build issues - forbiddenAPIs/spotlessApply
  Signed-off-by: kh3ra <[email protected]>
* Upload merge segments in low priority, minor fixes
  Signed-off-by: kh3ra <[email protected]>
* Addressing review comments + rebase main
  Signed-off-by: kh3ra <[email protected]>
* Test fixes + javadocs
  Signed-off-by: kh3ra <[email protected]>
* 1. Bug fix to RemoteDirectory.DownloadRateLimiterProvider 2. Fixes to tests
  Signed-off-by: kh3ra <[email protected]>
* Bug fix to replica updates to ActiveMergesRegistry
  Signed-off-by: kh3ra <[email protected]>
* Addressing review comments - round 2
  Signed-off-by: kh3ra <[email protected]>
* new tests + test fixes + minor bug fixes
  Signed-off-by: kh3ra <[email protected]>
* Bug fix
  Signed-off-by: kh3ra <[email protected]>
* Fixes to RemoteStorePublishMergedSegmentActionTests
  Signed-off-by: kh3ra <[email protected]>
* Adding integration tests
  Signed-off-by: kh3ra <[email protected]>
* Tracking stats for merged segment warmer
  Signed-off-by: kh3ra <[email protected]>
* Revert "Tracking stats for merged segment warmer"
  This reverts commit 1476e22.
  Signed-off-by: kh3ra <[email protected]>
* Addressing review comments for tests
  Signed-off-by: kh3ra <[email protected]>
* Addressing review comments
  Signed-off-by: kh3ra <[email protected]>
* Rebasing
  Signed-off-by: kh3ra <[email protected]>
* spotlessApply
  Signed-off-by: kh3ra <[email protected]>
* test fix
  Signed-off-by: kh3ra <[email protected]>
* Empty commit
  Signed-off-by: kh3ra <[email protected]>
* Adding tests, enhancing logs
  Signed-off-by: kh3ra <[email protected]>
* Adding MergedSegmentWarmerFactory tests + enhancing existing tests
  Signed-off-by: kh3ra <[email protected]>
* Applying spotless
  Signed-off-by: kh3ra <[email protected]>
* Added test for Timeout case for RemoteStoreReplicationSource
  Signed-off-by: kh3ra <[email protected]>
* Restored RemoteSegmentStoreDirectory PublicAPI + added changelog
  Signed-off-by: kh3ra <[email protected]>

---------

Signed-off-by: kh3ra <[email protected]>
Signed-off-by: kh3ra <[email protected]>
1 parent ff477f2 commit 35e20bb

34 files changed: +1757 -108 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
 - Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
 - APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
+- Added the core process for warming merged segments in remote-store enabled domains ([#18683](https://github.com/opensearch-project/OpenSearch/pull/18683))

 ### Changed
 - Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
Lines changed: 182 additions & 0 deletions
@@ -0,0 +1,182 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication;
+
+import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
+import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
+import org.opensearch.action.support.WriteRequest;
+import org.opensearch.action.support.replication.TransportReplicationAction;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.index.TieredMergePolicyProvider;
+import org.opensearch.indices.replication.checkpoint.RemoteStorePublishMergedSegmentRequest;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.transport.MockTransportService;
+import org.opensearch.test.transport.StubbableTransport;
+import org.opensearch.transport.TransportService;
+import org.junit.Before;
+
+import java.nio.file.Path;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class RemoteStoreMergedSegmentWarmerIT extends SegmentReplicationBaseIT {
+    private Path absolutePath;
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        if (absolutePath == null) {
+            absolutePath = randomRepoPath().toAbsolutePath();
+        }
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            .put(remoteStoreClusterSettings("test-remote-store-repo", absolutePath))
+            .build();
+    }
+
+    @Override
+    protected Settings featureFlagSettings() {
+        Settings.Builder featureSettings = Settings.builder();
+        featureSettings.put(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_FLAG, true);
+        return featureSettings.build();
+    }
+
+    @Before
+    public void setup() {
+        internalCluster().startClusterManagerOnlyNode();
+    }
+
+    public void testMergeSegmentWarmerRemote() throws Exception {
+        final String node1 = internalCluster().startDataOnlyNode();
+        final String node2 = internalCluster().startDataOnlyNode();
+        createIndex(INDEX_NAME);
+        ensureGreen(INDEX_NAME);
+        MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
+            TransportService.class,
+            node1
+        );
+        MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(
+            TransportService.class,
+            node2
+        );
+        final CountDownLatch latch = new CountDownLatch(1);
+        StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
+            if (action.equals("indices:admin/remote_publish_merged_segment[r]")) {
+                assertTrue(
+                    ((TransportReplicationAction.ConcreteReplicaRequest) request)
+                        .getRequest() instanceof RemoteStorePublishMergedSegmentRequest
+                );
+                latch.countDown();
+            }
+            connection.sendRequest(requestId, action, request, options);
+        };
+
+        for (int i = 0; i < 30; i++) {
+            client().prepareIndex(INDEX_NAME)
+                .setId(String.valueOf(i))
+                .setSource("foo" + i, "bar" + i)
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                .get();
+        }
+
+        waitForSearchableDocs(30, node1, node2);
+
+        mockTransportServiceNode1.addSendBehavior(behavior);
+        mockTransportServiceNode2.addSendBehavior(behavior);
+
+        client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));
+        waitForSegmentCount(INDEX_NAME, 2, logger);
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        mockTransportServiceNode1.clearAllRules();
+        mockTransportServiceNode2.clearAllRules();
+    }
+
+    public void testConcurrentMergeSegmentWarmerRemote() throws Exception {
+        String node1 = internalCluster().startDataOnlyNode();
+        String node2 = internalCluster().startDataOnlyNode();
+        createIndex(
+            INDEX_NAME,
+            Settings.builder()
+                .put(indexSettings())
+                .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), 5)
+                .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 5)
+                .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
+                .build()
+        );
+        ensureGreen(INDEX_NAME);
+        MockTransportService mockTransportServiceNode1 = (MockTransportService) internalCluster().getInstance(
+            TransportService.class,
+            node1
+        );
+        MockTransportService mockTransportServiceNode2 = (MockTransportService) internalCluster().getInstance(
+            TransportService.class,
+            node2
+        );
+        CountDownLatch latch = new CountDownLatch(2);
+        AtomicLong numInvocations = new AtomicLong(0);
+        Set<String> executingThreads = ConcurrentHashMap.newKeySet();
+        StubbableTransport.SendRequestBehavior behavior = (connection, requestId, action, request, options) -> {
+            if (action.equals("indices:admin/remote_publish_merged_segment[r]")) {
+                assertTrue(
+                    ((TransportReplicationAction.ConcreteReplicaRequest) request)
+                        .getRequest() instanceof RemoteStorePublishMergedSegmentRequest
+                );
+                latch.countDown();
+                numInvocations.incrementAndGet();
+                executingThreads.add(Thread.currentThread().getName());
+            }
+            connection.sendRequest(requestId, action, request, options);
+        };
+
+        mockTransportServiceNode1.addSendBehavior(behavior);
+        mockTransportServiceNode2.addSendBehavior(behavior);
+
+        for (int i = 0; i < 30; i++) {
+            client().prepareIndex(INDEX_NAME)
+                .setId(String.valueOf(i))
+                .setSource("foo" + i, "bar" + i)
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                .get();
+        }
+
+        client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(2));
+
+        waitForSegmentCount(INDEX_NAME, 2, logger);
+        logger.info("Number of merge invocations: {}", numInvocations.get());
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertTrue(executingThreads.size() > 1);
+        // Verify concurrent execution by checking that multiple unique threads handled merge operations
+        assertTrue(numInvocations.get() > 1);
+        mockTransportServiceNode1.clearAllRules();
+        mockTransportServiceNode2.clearAllRules();
+    }
+
+    public void testMergeSegmentWarmerWithInactiveReplicaRemote() throws Exception {
+        internalCluster().startDataOnlyNode();
+        createIndex(INDEX_NAME);
+        ensureYellowAndNoInitializingShards(INDEX_NAME);
+
+        for (int i = 0; i < 30; i++) {
+            client().prepareIndex(INDEX_NAME)
+                .setId(String.valueOf(i))
+                .setSource("foo" + i, "bar" + i)
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                .get();
+        }
+
+        client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
+        final IndicesSegmentResponse response = client().admin().indices().prepareSegments(INDEX_NAME).get();
+        assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
+    }
+}
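
The interception pattern used by the tests above (watch outgoing sends, count down a latch when the publish action is seen, and record which thread made the call) can be sketched without any OpenSearch dependency. This is only an illustrative model: the class `LatchInterceptSketch`, its `onSend` and `runDemo` methods, and the thread pool are hypothetical stand-ins for `StubbableTransport.SendRequestBehavior` and the merge threads; only the action name is taken from the test. Unlike the test, the sketch records its counters before counting down the latch, so they are guaranteed to be visible once `await` returns.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class LatchInterceptSketch {
    // Action name taken from the test; everything else in this class is hypothetical.
    static final String PUBLISH_ACTION = "indices:admin/remote_publish_merged_segment[r]";

    final CountDownLatch latch;
    final AtomicLong numInvocations = new AtomicLong();
    final Set<String> executingThreads = ConcurrentHashMap.newKeySet();

    LatchInterceptSketch(int expectedPublishes) {
        this.latch = new CountDownLatch(expectedPublishes);
    }

    /** Stand-in for the send behavior: observe the action; forwarding is left to the caller. */
    void onSend(String action) {
        if (PUBLISH_ACTION.equals(action)) {
            numInvocations.incrementAndGet();
            executingThreads.add(Thread.currentThread().getName());
            // Count down last so the updates above happen-before a successful await().
            latch.countDown();
        }
    }

    /** Drives two concurrent "sends" and reports whether both were observed in time. */
    static boolean runDemo(LatchInterceptSketch probe) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch bothRunning = new CountDownLatch(2);
        try {
            for (int i = 0; i < 2; i++) {
                pool.execute(() -> {
                    bothRunning.countDown();
                    try {
                        // Hold each worker until the other has started, so two threads are involved.
                        bothRunning.await(5, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    probe.onSend("some:other/action"); // ignored by the probe
                    probe.onSend(PUBLISH_ACTION);
                });
            }
            return probe.latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        LatchInterceptSketch probe = new LatchInterceptSketch(2);
        System.out.println("observed in time: " + runDemo(probe));
        System.out.println("invocations: " + probe.numInvocations.get());
        System.out.println("distinct threads: " + probe.executingThreads.size());
    }
}
```

Checking the number of distinct thread names, as `testConcurrentMergeSegmentWarmerRemote` does, is a lightweight way to show that more than one thread published merged segments without instrumenting the merge scheduler itself.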

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java

Lines changed: 28 additions & 0 deletions
@@ -8,7 +8,11 @@

 package org.opensearch.indices.replication;

+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.SegmentInfos;
+import org.opensearch.action.admin.indices.segments.IndexShardSegments;
+import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
+import org.opensearch.action.admin.indices.segments.ShardSegments;
 import org.opensearch.action.search.SearchResponse;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.metadata.IndexMetadata;
@@ -22,11 +26,13 @@
 import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.set.Sets;
 import org.opensearch.core.index.Index;
 import org.opensearch.index.IndexModule;
 import org.opensearch.index.IndexService;
 import org.opensearch.index.SegmentReplicationShardStats;
 import org.opensearch.index.engine.Engine;
+import org.opensearch.index.engine.Segment;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.StoreFileMetadata;
@@ -244,4 +250,26 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
             return closeable.get();
         }
     }
+
+    public static void waitForSegmentCount(String indexName, int segmentCount, Logger logger) throws Exception {
+        assertBusy(() -> {
+            Set<String> primarySegments = Sets.newHashSet();
+            Set<String> replicaSegments = Sets.newHashSet();
+            final IndicesSegmentResponse response = client().admin().indices().prepareSegments(indexName).get();
+            for (IndexShardSegments indexShardSegments : response.getIndices().get(indexName).getShards().values()) {
+                for (ShardSegments shardSegment : indexShardSegments.getShards()) {
+                    for (Segment segment : shardSegment.getSegments()) {
+                        if (shardSegment.getShardRouting().primary()) {
+                            primarySegments.add(segment.getName());
+                        } else {
+                            replicaSegments.add(segment.getName());
+                        }
+                    }
+                }
+            }
+            logger.info("primary segments: {}, replica segments: {}", primarySegments, replicaSegments);
+            assertEquals(segmentCount, primarySegments.size());
+            assertEquals(segmentCount, replicaSegments.size());
+        }, 1, TimeUnit.MINUTES);
+    }
 }
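
`waitForSegmentCount` wraps its assertions in `assertBusy`, which keeps re-running them until they pass or the one-minute timeout expires. The retry idea can be sketched on its own as below. This is a simplified, hypothetical helper (`PollUntilSketch.assertEventually`), not the test framework's implementation; the back-off values in particular are assumptions chosen for illustration.

```java
import java.util.concurrent.TimeUnit;

public class PollUntilSketch {

    /**
     * Re-runs the assertion until it stops throwing AssertionError or the timeout elapses.
     * Hypothetical helper; the initial sleep (1 ms) and cap (500 ms) are illustrative values.
     */
    static void assertEventually(Runnable assertion, long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        long sleepMillis = 1;
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // out of time: surface the most recent failure
                }
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", ie);
            }
            // Back off so a slow condition is not polled in a tight loop.
            sleepMillis = Math.min(sleepMillis * 2, 500);
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Passes once at least 20 ms have elapsed since start.
        assertEventually(() -> {
            if (System.currentTimeMillis() - start < 20) {
                throw new AssertionError("not ready yet");
            }
        }, 5, TimeUnit.SECONDS);
        System.out.println("condition reached");
    }
}
```

Polling suits this test because segment counts on the replica converge asynchronously after the force merge, so a single immediate check could fail spuriously.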

server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java

Lines changed: 2 additions & 2 deletions
@@ -1613,8 +1613,8 @@ public String toString() {
      *
      * @opensearch.internal
      */
-    protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {
-
+    public static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {
+        // public for tests
         private final long globalCheckpoint;
         private final long maxSeqNoOfUpdatesOrDeletes;

16201620

server/src/main/java/org/opensearch/index/engine/LocalMergedSegmentWarmer.java renamed to server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java

Lines changed: 25 additions & 3 deletions
@@ -8,29 +8,35 @@

 package org.opensearch.index.engine;

+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentReader;
 import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.logging.Loggers;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.indices.recovery.RecoverySettings;
 import org.opensearch.transport.TransportService;

 import java.io.IOException;

 /**
- * Implementation of a {@link IndexWriter.IndexReaderWarmer} when local on-disk segment replication is enabled.
+ * Implementation of a {@link IndexWriter.IndexReaderWarmer} for merged segment replication in
+ * local on-disk and remote store enabled domains.
  *
  * @opensearch.internal
  */
-public class LocalMergedSegmentWarmer implements IndexWriter.IndexReaderWarmer {
+public class MergedSegmentWarmer implements IndexWriter.IndexReaderWarmer {
     private final TransportService transportService;
     private final RecoverySettings recoverySettings;
     private final ClusterService clusterService;
     private final IndexShard indexShard;

-    public LocalMergedSegmentWarmer(
+    private final Logger logger;
+
+    public MergedSegmentWarmer(
         TransportService transportService,
         RecoverySettings recoverySettings,
         ClusterService clusterService,
@@ -40,14 +46,30 @@ public LocalMergedSegmentWarmer(
         this.recoverySettings = recoverySettings;
         this.clusterService = clusterService;
         this.indexShard = indexShard;
+        this.logger = Loggers.getLogger(getClass(), indexShard.shardId());
     }

     @Override
     public void warm(LeafReader leafReader) throws IOException {
         // IndexWriter.IndexReaderWarmer#warm is called by IndexWriter#mergeMiddle. The type of leafReader should be SegmentReader.
         assert leafReader instanceof SegmentReader;
+        assert indexShard.indexSettings().isSegRepLocalEnabled() || indexShard.indexSettings().isRemoteStoreEnabled();

+        long startTime = System.currentTimeMillis();
         SegmentCommitInfo segmentCommitInfo = ((SegmentReader) leafReader).getSegmentInfo();
+        logger.trace(() -> new ParameterizedMessage("Warming segment: {}", segmentCommitInfo));
         indexShard.publishMergedSegment(segmentCommitInfo);
+        logger.trace(() -> {
+            long segmentSize = -1;
+            try {
+                segmentSize = segmentCommitInfo.sizeInBytes();
+            } catch (IOException ignored) {}
+            return new ParameterizedMessage(
+                "Completed segment warming for {}. Size: {}B, Timing: {}ms",
+                segmentCommitInfo.info.name,
+                segmentSize,
+                (System.currentTimeMillis() - startTime)
+            );
+        });
     }
 }

server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmerFactory.java

Lines changed: 2 additions & 4 deletions
@@ -34,10 +34,8 @@ public MergedSegmentWarmerFactory(TransportService transportService, RecoverySet
     }

     public IndexWriter.IndexReaderWarmer get(IndexShard shard) {
-        if (shard.indexSettings().isAssignedOnRemoteNode()) {
-            return new RemoteStoreMergedSegmentWarmer(transportService, recoverySettings, clusterService);
-        } else if (shard.indexSettings().isSegRepLocalEnabled()) {
-            return new LocalMergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
+        if (shard.indexSettings().isSegRepLocalEnabled() || shard.indexSettings().isRemoteStoreEnabled()) {
+            return new MergedSegmentWarmer(transportService, recoverySettings, clusterService, shard);
         } else if (shard.indexSettings().isDocumentReplication()) {
             // MergedSegmentWarmerFactory#get is called when IndexShard is initialized. In scenario document replication,
             // IndexWriter.IndexReaderWarmer should be null.
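
The selection rule visible in this hunk (one warmer for both local segment replication and remote store, none for document replication) can be modelled in a dependency-free sketch. The names `WarmerSelectionSketch`, `ReplicationMode`, `Warmer`, and `select` are hypothetical and exist only for illustration; the real factory reads the mode from `IndexSettings`, and what it does for any configuration beyond the branches shown above is not visible in the hunk, so the sketch covers only those three cases.

```java
import java.util.Optional;

public class WarmerSelectionSketch {

    /** Hypothetical modes; the real code derives these from IndexSettings. */
    enum ReplicationMode { DOCUMENT, SEGMENT_LOCAL, REMOTE_STORE }

    /** Hypothetical stand-in for IndexWriter.IndexReaderWarmer. */
    interface Warmer {
        String describe();
    }

    /**
     * Mirrors the branches in the hunk: segment replication (local or remote store)
     * shares a single warmer, document replication gets none.
     */
    static Optional<Warmer> select(ReplicationMode mode) {
        if (mode == ReplicationMode.SEGMENT_LOCAL || mode == ReplicationMode.REMOTE_STORE) {
            return Optional.of(() -> "MergedSegmentWarmer");
        }
        // Document replication: the real factory's comment says the warmer should be null.
        return Optional.empty();
    }

    public static void main(String[] args) {
        for (ReplicationMode mode : ReplicationMode.values()) {
            String chosen = select(mode).map(Warmer::describe).orElse("no warmer");
            System.out.println(mode + " -> " + chosen);
        }
    }
}
```

Collapsing the two former classes into one means the transport used to publish the merged segment is decided further down, behind `IndexShard#publishMergedSegment`, rather than by the factory.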

server/src/main/java/org/opensearch/index/engine/RemoteStoreMergedSegmentWarmer.java

Lines changed: 0 additions & 43 deletions
This file was deleted.
