Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

/**
* The Model Context Protocol (MCP) client implementation that provides asynchronous
Expand Down Expand Up @@ -314,13 +315,21 @@ public class McpAsyncClient {
};

this.initializer = new LifecycleInitializer(clientCapabilities, clientInfo, transport.protocolVersions(),
initializationTimeout, ctx -> new McpClientSession(requestTimeout, transport, requestHandlers,
notificationHandlers, con -> con.contextWrite(ctx)),
initializationTimeout, ctx -> createSession(ctx, requestTimeout, requestHandlers, notificationHandlers),
postInitializationHook);

this.transport.setExceptionHandler(this.initializer::handleException);
}

/**
* An extension point to create a custom McpClientSession with additional context.
*/
protected McpClientSession createSession(ContextView ctx, Duration requestTimeout,
Map<String, RequestHandler<?>> requestHandlers, Map<String, NotificationHandler> notificationHandlers) {
return new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers,
con -> con.contextWrite(ctx));
}

/**
* Get the current initialization result.
* @return the initialization result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ private void dismissPendingResponses() {
this.pendingResponses.clear();
}

private void handle(McpSchema.JSONRPCMessage message) {
/**
* An extension point for handling incoming JSON-RPC messages.
* @param message The incoming JSON-RPC message
*/
protected void handle(McpSchema.JSONRPCMessage message) {
if (message instanceof McpSchema.JSONRPCResponse response) {
logger.debug("Received response: {}", response);
if (response.id() != null) {
Expand Down Expand Up @@ -198,7 +202,7 @@ else if (message instanceof McpSchema.JSONRPCNotification notification) {
* @param request The incoming JSON-RPC request
* @return A Mono containing the JSON-RPC response
*/
private Mono<McpSchema.JSONRPCResponse> handleIncomingRequest(McpSchema.JSONRPCRequest request) {
protected Mono<McpSchema.JSONRPCResponse> handleIncomingRequest(McpSchema.JSONRPCRequest request) {
return Mono.defer(() -> {
var handler = this.requestHandlers.get(request.method());
if (handler == null) {
Expand Down Expand Up @@ -231,7 +235,7 @@ private MethodNotFoundError getMethodNotFoundError(String method) {
* @param notification The incoming JSON-RPC notification
* @return A Mono that completes when the notification is processed
*/
private Mono<Void> handleIncomingNotification(McpSchema.JSONRPCNotification notification) {
protected Mono<Void> handleIncomingNotification(McpSchema.JSONRPCNotification notification) {
return Mono.defer(() -> {
var handler = notificationHandlers.get(notification.method());
if (handler == null) {
Expand Down