Skip to content

Commit 934f8ba

Browse files
committed
add replication threadpool
Signed-off-by: guojialiang <[email protected]>
1 parent a953860 commit 934f8ba

12 files changed

+33
-20
lines changed

server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ public void onFailure(Exception e) {
304304

305305
@Override
306306
protected String getThreadPool() {
307-
return ThreadPool.Names.GENERIC;
307+
return ThreadPool.Names.REPLICATION;
308308
}
309309

310310
@Override

server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void getCheckpointMetadata(
6969
GET_CHECKPOINT_INFO,
7070
request,
7171
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionRetryTimeout()).build(),
72-
new ActionListenerResponseHandler<>(listener, CheckpointInfoResponse::new, ThreadPool.Names.GENERIC)
72+
new ActionListenerResponseHandler<>(listener, CheckpointInfoResponse::new, ThreadPool.Names.REPLICATION)
7373
);
7474
}
7575

@@ -98,7 +98,7 @@ public void getSegmentFiles(
9898
GET_SEGMENT_FILES,
9999
request,
100100
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
101-
new ActionListenerResponseHandler<>(listener, GetSegmentFilesResponse::new, ThreadPool.Names.GENERIC)
101+
new ActionListenerResponseHandler<>(listener, GetSegmentFilesResponse::new, ThreadPool.Names.REPLICATION)
102102
);
103103
}
104104

@@ -131,7 +131,7 @@ public void getMergedSegmentFiles(
131131
GET_MERGED_SEGMENT_FILES,
132132
request,
133133
TransportRequestOptions.builder().withTimeout(recoverySettings.getMergedSegmentReplicationTimeout()).build(),
134-
new ActionListenerResponseHandler<>(listener, GetSegmentFilesResponse::new, ThreadPool.Names.GENERIC)
134+
new ActionListenerResponseHandler<>(listener, GetSegmentFilesResponse::new, ThreadPool.Names.REPLICATION)
135135
);
136136
}
137137

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,25 +91,25 @@ protected SegmentReplicationSourceService(
9191
this.ongoingSegmentReplications = ongoingSegmentReplications;
9292
transportService.registerRequestHandler(
9393
Actions.GET_CHECKPOINT_INFO,
94-
ThreadPool.Names.GENERIC,
94+
ThreadPool.Names.REPLICATION,
9595
CheckpointInfoRequest::new,
9696
new CheckpointInfoRequestHandler()
9797
);
9898
transportService.registerRequestHandler(
9999
Actions.GET_SEGMENT_FILES,
100-
ThreadPool.Names.GENERIC,
100+
ThreadPool.Names.REPLICATION,
101101
GetSegmentFilesRequest::new,
102102
new GetSegmentFilesRequestHandler()
103103
);
104104
transportService.registerRequestHandler(
105105
Actions.UPDATE_VISIBLE_CHECKPOINT,
106-
ThreadPool.Names.GENERIC,
106+
ThreadPool.Names.REPLICATION,
107107
UpdateVisibleCheckpointRequest::new,
108108
new UpdateVisibleCheckpointRequestHandler()
109109
);
110110
transportService.registerRequestHandler(
111111
Actions.GET_MERGED_SEGMENT_FILES,
112-
ThreadPool.Names.GENERIC,
112+
ThreadPool.Names.REPLICATION,
113113
GetSegmentFilesRequest::new,
114114
new GetMergedSegmentFilesRequestHandler()
115115
);

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,19 +146,19 @@ public SegmentReplicationTargetService(
146146

147147
transportService.registerRequestHandler(
148148
Actions.FILE_CHUNK,
149-
ThreadPool.Names.GENERIC,
149+
ThreadPool.Names.REPLICATION,
150150
FileChunkRequest::new,
151151
new FileChunkTransportRequestHandler()
152152
);
153153
transportService.registerRequestHandler(
154154
Actions.FORCE_SYNC,
155-
ThreadPool.Names.GENERIC,
155+
ThreadPool.Names.REPLICATION,
156156
ForceSyncRequest::new,
157157
new ForceSyncTransportRequestHandler()
158158
);
159159
transportService.registerRequestHandler(
160160
Actions.MERGED_SEGMENT_FILE_CHUNK,
161-
ThreadPool.Names.GENERIC,
161+
ThreadPool.Names.REPLICATION,
162162
FileChunkRequest::new,
163163
new MergedSegmentFileChunkTransportRequestHandler()
164164
);

server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public PublishCheckpointAction(
7373
actionFilters,
7474
PublishCheckpointRequest::new,
7575
PublishCheckpointRequest::new,
76-
ThreadPool.Names.REFRESH,
76+
ThreadPool.Names.REPLICATION,
7777
logger
7878
);
7979
this.replicationService = targetService;

server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishMergedSegmentAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public PublishMergedSegmentAction(
6060
actionFilters,
6161
PublishMergedSegmentRequest::new,
6262
PublishMergedSegmentRequest::new,
63-
ThreadPool.Names.GENERIC,
63+
ThreadPool.Names.REPLICATION,
6464
logger
6565
);
6666
this.replicationService = targetService;

server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ private void startInternal(T target, TimeValue activityTimeout) {
107107
threadPool.schedule(
108108
new ReplicationMonitor(target.getId(), target.lastAccessTime(), activityTimeout),
109109
activityTimeout,
110-
ThreadPool.Names.GENERIC
110+
ThreadPool.Names.REPLICATION
111111
);
112112
}
113113

@@ -350,7 +350,7 @@ protected void doRun() throws Exception {
350350
}
351351
lastSeenAccessTime = accessTime;
352352
logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", id, lastSeenAccessTime);
353-
threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC);
353+
threadPool.schedule(this, checkInterval, ThreadPool.Names.REPLICATION);
354354
}
355355
}
356356

server/src/main/java/org/opensearch/threadpool/ThreadPool.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ public static class Names {
124124
public static final String REMOTE_STATE_READ = "remote_state_read";
125125
public static final String INDEX_SEARCHER = "index_searcher";
126126
public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum";
127+
public static final String REPLICATION = "replication";
127128
}
128129

129130
static Set<String> scalingThreadPoolKeys = new HashSet<>(Arrays.asList("max", "core"));
@@ -201,6 +202,7 @@ public static ThreadPoolType fromType(String type) {
201202
map.put(Names.REMOTE_STATE_READ, ThreadPoolType.FIXED);
202203
map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE);
203204
map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED);
205+
map.put(Names.REPLICATION, ThreadPoolType.SCALING);
204206
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
205207
}
206208

@@ -252,8 +254,13 @@ public ThreadPool(
252254
final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
253255
final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
254256
final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
257+
final int replicationThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
255258
final int snapshotDeletionPoolMax = boundedBy(4 * allocatedProcessors, 64, 256);
256259
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
260+
builders.put(
261+
Names.REPLICATION,
262+
new ScalingExecutorBuilder(Names.REPLICATION, 4, replicationThreadPoolMax, TimeValue.timeValueSeconds(30))
263+
);
257264
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000));
258265
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000));
259266
builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));

server/src/test/java/org/opensearch/threadpool/OpenSearchThreadPoolTestCase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,7 @@ static String randomThreadPool(final ThreadPool.ThreadPoolType type) {
7474
);
7575
}
7676

77+
public static boolean isGenericOrReplicationThreadPool(String threadPoolName) {
78+
return ThreadPool.Names.GENERIC.equals(threadPoolName) || ThreadPool.Names.REPLICATION.equals(threadPoolName);
79+
}
7780
}

server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void testScalingThreadPoolConfiguration() throws InterruptedException {
7676
core = randomIntBetween(0, 8);
7777
builder.put("thread_pool." + threadPoolName + ".core", core);
7878
} else {
79-
core = "generic".equals(threadPoolName) ? 4 : 1; // the defaults
79+
core = isGenericOrReplicationThreadPool(threadPoolName) ? 4 : 1; // the defaults
8080
}
8181

8282
final int availableProcessors = Runtime.getRuntime().availableProcessors();
@@ -105,7 +105,7 @@ public void testScalingThreadPoolConfiguration() throws InterruptedException {
105105
keepAlive = randomIntBetween(1, 300);
106106
builder.put("thread_pool." + threadPoolName + ".keep_alive", keepAlive + "s");
107107
} else {
108-
keepAlive = "generic".equals(threadPoolName) ? 30 : 300; // the defaults
108+
keepAlive = isGenericOrReplicationThreadPool(threadPoolName) ? 30 : 300; // the defaults
109109
}
110110

111111
runScalingThreadPoolTest(builder.build(), (clusterSettings, threadPool) -> {
@@ -143,6 +143,7 @@ public void testScalingThreadPoolConfiguration() throws InterruptedException {
143143
private int expectedSize(final String threadPoolName, final int numberOfProcessors) {
144144
final Map<String, Function<Integer, Integer>> sizes = new HashMap<>();
145145
sizes.put(ThreadPool.Names.GENERIC, n -> ThreadPool.boundedBy(4 * n, 128, 512));
146+
sizes.put(ThreadPool.Names.REPLICATION, n -> ThreadPool.boundedBy(4 * n, 128, 512));
146147
sizes.put(ThreadPool.Names.MANAGEMENT, n -> 5);
147148
sizes.put(ThreadPool.Names.FLUSH, ThreadPool::halfAllocatedProcessorsMaxFive);
148149
sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen);
@@ -189,7 +190,7 @@ public void testScalingThreadPoolIsBounded() throws InterruptedException {
189190
}
190191

191192
public void testScalingThreadPoolThreadsAreTerminatedAfterKeepAlive() throws InterruptedException {
192-
final int min = "generic".equals(threadPoolName) ? 4 : 1;
193+
final int min = isGenericOrReplicationThreadPool(threadPoolName) ? 4 : 1;
193194
final Settings settings = Settings.builder()
194195
.put("thread_pool." + threadPoolName + ".max", 128)
195196
.put("thread_pool." + threadPoolName + ".keep_alive", "1ms")

0 commit comments

Comments
 (0)