Skip to content

Commit 29f7c2a

Browse files
committed
Add option for immediate execution in McpSyncServer
- The McpSyncServer wraps an async server. By default, reactive operations are scheduled on a bounded-elastic scheduler, to offload blocking work and prevent accidental blocking of non-blocking operations. - With the default behavior, there will be thead ops, even in a blocking context, which means thread-locals from the request thread will be lost. This is inconenvient for frameworks that store state in thread-locals. - This commit adds the ability to avoid offloading, when the user is sure they are executing code in a blocking environment. Work happens in the calling thread, and thread-locals are available throughout the execution.
1 parent c711f83 commit 29f7c2a

File tree

4 files changed

+75
-30
lines changed

4 files changed

+75
-30
lines changed

mcp/src/main/java/io/modelcontextprotocol/server/McpServer.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,8 @@ class SyncSpecification {
695695

696696
private Duration requestTimeout = Duration.ofSeconds(10); // Default timeout
697697

698+
private boolean immediate = false;
699+
698700
private SyncSpecification(McpServerTransportProvider transportProvider) {
699701
Assert.notNull(transportProvider, "Transport provider must not be null");
700702
this.transportProvider = transportProvider;
@@ -1116,6 +1118,19 @@ public SyncSpecification objectMapper(ObjectMapper objectMapper) {
11161118
return this;
11171119
}
11181120

1121+
/**
1122+
* Sets a flag to turn on "immediate execution" of the operations when creating
1123+
* the underlying {@link McpAsyncServer}. Defaults to false, which does blocking
1124+
* code offloading to prevent accidental blocking of the non-blocking transport.
1125+
* @param immediateExecution When true, do not offload work asynchronously.
1126+
* @return This builder instance for method chaining.
1127+
*
1128+
*/
1129+
public SyncSpecification immediateExecution(boolean immediateExecution) {
1130+
this.immediate = immediateExecution;
1131+
return this;
1132+
}
1133+
11191134
/**
11201135
* Builds a synchronous MCP server that provides blocking operations.
11211136
* @return A new instance of {@link McpSyncServer} configured with this builder's
@@ -1125,12 +1140,12 @@ public McpSyncServer build() {
11251140
McpServerFeatures.Sync syncFeatures = new McpServerFeatures.Sync(this.serverInfo, this.serverCapabilities,
11261141
this.tools, this.resources, this.resourceTemplates, this.prompts, this.completions,
11271142
this.rootsChangeHandlers, this.instructions);
1128-
McpServerFeatures.Async asyncFeatures = McpServerFeatures.Async.fromSync(syncFeatures);
1143+
McpServerFeatures.Async asyncFeatures = McpServerFeatures.Async.fromSync(syncFeatures, this.immediate);
11291144
var mapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
11301145
var asyncServer = new McpAsyncServer(this.transportProvider, mapper, asyncFeatures, this.requestTimeout,
11311146
this.uriTemplateManagerFactory);
11321147

1133-
return new McpSyncServer(asyncServer);
1148+
return new McpSyncServer(asyncServer, this.immediate);
11341149
}
11351150

11361151
}

mcp/src/main/java/io/modelcontextprotocol/server/McpServerFeatures.java

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -95,28 +95,30 @@ record Async(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities s
9595
* blocking code offloading to prevent accidental blocking of the non-blocking
9696
* transport.
9797
* @param syncSpec a potentially blocking, synchronous specification.
98+
* @param immediateExecution when true, do not offload. Only use if you are sure
99+
* you are in a blocking context.
98100
* @return a specification which is protected from blocking calls specified by the
99101
* user.
100102
*/
101-
static Async fromSync(Sync syncSpec) {
103+
static Async fromSync(Sync syncSpec, boolean immediateExecution) {
102104
List<McpServerFeatures.AsyncToolSpecification> tools = new ArrayList<>();
103105
for (var tool : syncSpec.tools()) {
104-
tools.add(AsyncToolSpecification.fromSync(tool));
106+
tools.add(AsyncToolSpecification.fromSync(tool, immediateExecution));
105107
}
106108

107109
Map<String, AsyncResourceSpecification> resources = new HashMap<>();
108110
syncSpec.resources().forEach((key, resource) -> {
109-
resources.put(key, AsyncResourceSpecification.fromSync(resource));
111+
resources.put(key, AsyncResourceSpecification.fromSync(resource, immediateExecution));
110112
});
111113

112114
Map<String, AsyncPromptSpecification> prompts = new HashMap<>();
113115
syncSpec.prompts().forEach((key, prompt) -> {
114-
prompts.put(key, AsyncPromptSpecification.fromSync(prompt));
116+
prompts.put(key, AsyncPromptSpecification.fromSync(prompt, immediateExecution));
115117
});
116118

117119
Map<McpSchema.CompleteReference, McpServerFeatures.AsyncCompletionSpecification> completions = new HashMap<>();
118120
syncSpec.completions().forEach((key, completion) -> {
119-
completions.put(key, AsyncCompletionSpecification.fromSync(completion));
121+
completions.put(key, AsyncCompletionSpecification.fromSync(completion, immediateExecution));
120122
});
121123

122124
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootChangeConsumers = new ArrayList<>();
@@ -239,15 +241,15 @@ record Sync(McpSchema.Implementation serverInfo, McpSchema.ServerCapabilities se
239241
public record AsyncToolSpecification(McpSchema.Tool tool,
240242
BiFunction<McpAsyncServerExchange, Map<String, Object>, Mono<McpSchema.CallToolResult>> call) {
241243

242-
static AsyncToolSpecification fromSync(SyncToolSpecification tool) {
244+
static AsyncToolSpecification fromSync(SyncToolSpecification tool, boolean immediate) {
243245
// FIXME: This is temporary, proper validation should be implemented
244246
if (tool == null) {
245247
return null;
246248
}
247-
return new AsyncToolSpecification(tool.tool(),
248-
(exchange, map) -> Mono
249-
.fromCallable(() -> tool.call().apply(new McpSyncServerExchange(exchange), map))
250-
.subscribeOn(Schedulers.boundedElastic()));
249+
return new AsyncToolSpecification(tool.tool(), (exchange, map) -> {
250+
var toolResult = Mono.fromCallable(() -> tool.call().apply(new McpSyncServerExchange(exchange), map));
251+
return immediate ? toolResult : toolResult.subscribeOn(Schedulers.boundedElastic());
252+
});
251253
}
252254
}
253255

@@ -281,15 +283,16 @@ static AsyncToolSpecification fromSync(SyncToolSpecification tool) {
281283
public record AsyncResourceSpecification(McpSchema.Resource resource,
282284
BiFunction<McpAsyncServerExchange, McpSchema.ReadResourceRequest, Mono<McpSchema.ReadResourceResult>> readHandler) {
283285

284-
static AsyncResourceSpecification fromSync(SyncResourceSpecification resource) {
286+
static AsyncResourceSpecification fromSync(SyncResourceSpecification resource, boolean immediateExecution) {
285287
// FIXME: This is temporary, proper validation should be implemented
286288
if (resource == null) {
287289
return null;
288290
}
289-
return new AsyncResourceSpecification(resource.resource(),
290-
(exchange, req) -> Mono
291-
.fromCallable(() -> resource.readHandler().apply(new McpSyncServerExchange(exchange), req))
292-
.subscribeOn(Schedulers.boundedElastic()));
291+
return new AsyncResourceSpecification(resource.resource(), (exchange, req) -> {
292+
var resourceResult = Mono
293+
.fromCallable(() -> resource.readHandler().apply(new McpSyncServerExchange(exchange), req));
294+
return immediateExecution ? resourceResult : resourceResult.subscribeOn(Schedulers.boundedElastic());
295+
});
293296
}
294297
}
295298

@@ -327,15 +330,16 @@ static AsyncResourceSpecification fromSync(SyncResourceSpecification resource) {
327330
public record AsyncPromptSpecification(McpSchema.Prompt prompt,
328331
BiFunction<McpAsyncServerExchange, McpSchema.GetPromptRequest, Mono<McpSchema.GetPromptResult>> promptHandler) {
329332

330-
static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt) {
333+
static AsyncPromptSpecification fromSync(SyncPromptSpecification prompt, boolean immediateExecution) {
331334
// FIXME: This is temporary, proper validation should be implemented
332335
if (prompt == null) {
333336
return null;
334337
}
335-
return new AsyncPromptSpecification(prompt.prompt(),
336-
(exchange, req) -> Mono
337-
.fromCallable(() -> prompt.promptHandler().apply(new McpSyncServerExchange(exchange), req))
338-
.subscribeOn(Schedulers.boundedElastic()));
338+
return new AsyncPromptSpecification(prompt.prompt(), (exchange, req) -> {
339+
var promptResult = Mono
340+
.fromCallable(() -> prompt.promptHandler().apply(new McpSyncServerExchange(exchange), req));
341+
return immediateExecution ? promptResult : promptResult.subscribeOn(Schedulers.boundedElastic());
342+
});
339343
}
340344
}
341345

@@ -366,14 +370,17 @@ public record AsyncCompletionSpecification(McpSchema.CompleteReference reference
366370
* @return an asynchronous wrapper of the provided sync specification, or
367371
* {@code null} if input is null
368372
*/
369-
static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion) {
373+
static AsyncCompletionSpecification fromSync(SyncCompletionSpecification completion,
374+
boolean immediateExecution) {
370375
if (completion == null) {
371376
return null;
372377
}
373-
return new AsyncCompletionSpecification(completion.referenceKey(),
374-
(exchange, request) -> Mono.fromCallable(
375-
() -> completion.completionHandler().apply(new McpSyncServerExchange(exchange), request))
376-
.subscribeOn(Schedulers.boundedElastic()));
378+
return new AsyncCompletionSpecification(completion.referenceKey(), (exchange, request) -> {
379+
var completionResult = Mono.fromCallable(
380+
() -> completion.completionHandler().apply(new McpSyncServerExchange(exchange), request));
381+
return immediateExecution ? completionResult
382+
: completionResult.subscribeOn(Schedulers.boundedElastic());
383+
});
377384
}
378385
}
379386

mcp/src/main/java/io/modelcontextprotocol/server/McpSyncServer.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,36 @@ public class McpSyncServer {
5454
*/
5555
private final McpAsyncServer asyncServer;
5656

57+
private final boolean immediateExecution;
58+
5759
/**
5860
* Creates a new synchronous server that wraps the provided async server.
5961
* @param asyncServer The async server to wrap
6062
*/
6163
public McpSyncServer(McpAsyncServer asyncServer) {
64+
this(asyncServer, false);
65+
}
66+
67+
/**
68+
* Creates a new synchronous server that wraps the provided async server.
69+
* @param asyncServer The async server to wrap
70+
* @param immediateExecution Tools, prompts, and resources handlers execute work
71+
* without blocking code offloading.
72+
*/
73+
public McpSyncServer(McpAsyncServer asyncServer, boolean immediateExecution) {
6274
Assert.notNull(asyncServer, "Async server must not be null");
6375
this.asyncServer = asyncServer;
76+
this.immediateExecution = immediateExecution;
6477
}
6578

6679
/**
6780
* Add a new tool handler.
6881
* @param toolHandler The tool handler to add
6982
*/
7083
public void addTool(McpServerFeatures.SyncToolSpecification toolHandler) {
71-
this.asyncServer.addTool(McpServerFeatures.AsyncToolSpecification.fromSync(toolHandler)).block();
84+
this.asyncServer
85+
.addTool(McpServerFeatures.AsyncToolSpecification.fromSync(toolHandler, this.immediateExecution))
86+
.block();
7287
}
7388

7489
/**
@@ -84,7 +99,10 @@ public void removeTool(String toolName) {
8499
* @param resourceHandler The resource handler to add
85100
*/
86101
public void addResource(McpServerFeatures.SyncResourceSpecification resourceHandler) {
87-
this.asyncServer.addResource(McpServerFeatures.AsyncResourceSpecification.fromSync(resourceHandler)).block();
102+
this.asyncServer
103+
.addResource(
104+
McpServerFeatures.AsyncResourceSpecification.fromSync(resourceHandler, this.immediateExecution))
105+
.block();
88106
}
89107

90108
/**
@@ -100,7 +118,10 @@ public void removeResource(String resourceUri) {
100118
* @param promptSpecification The prompt specification to add
101119
*/
102120
public void addPrompt(McpServerFeatures.SyncPromptSpecification promptSpecification) {
103-
this.asyncServer.addPrompt(McpServerFeatures.AsyncPromptSpecification.fromSync(promptSpecification)).block();
121+
this.asyncServer
122+
.addPrompt(
123+
McpServerFeatures.AsyncPromptSpecification.fromSync(promptSpecification, this.immediateExecution))
124+
.block();
104125
}
105126

106127
/**

mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpSyncServerTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ void testRemoveTool() {
155155
assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
156156
}
157157

158+
// TODO: call tool? -> verify thread local still present
159+
158160
@Test
159161
void testRemoveNonexistentTool() {
160162
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())

0 commit comments

Comments
 (0)