diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/DefaultMcpAsyncClient.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/DefaultMcpAsyncClient.java new file mode 100644 index 000000000..ad7c3ebc7 --- /dev/null +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/DefaultMcpAsyncClient.java @@ -0,0 +1,822 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.client; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + +import io.modelcontextprotocol.client.LifecycleInitializer.Initialization; +import io.modelcontextprotocol.json.TypeRef; +import io.modelcontextprotocol.json.schema.JsonSchemaValidator; +import io.modelcontextprotocol.spec.McpClientSession; +import io.modelcontextprotocol.spec.McpClientSession.NotificationHandler; +import io.modelcontextprotocol.spec.McpClientSession.RequestHandler; +import io.modelcontextprotocol.spec.McpClientTransport; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities; +import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest; +import io.modelcontextprotocol.spec.McpSchema.CreateMessageResult; +import io.modelcontextprotocol.spec.McpSchema.ElicitRequest; +import io.modelcontextprotocol.spec.McpSchema.ElicitResult; +import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest; +import io.modelcontextprotocol.spec.McpSchema.GetPromptResult; +import io.modelcontextprotocol.spec.McpSchema.ListPromptsResult; +import io.modelcontextprotocol.spec.McpSchema.LoggingLevel; +import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification; +import io.modelcontextprotocol.spec.McpSchema.PaginatedRequest; +import io.modelcontextprotocol.spec.McpSchema.Root; +import io.modelcontextprotocol.util.Assert; +import io.modelcontextprotocol.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * The Model Context Protocol (MCP) client implementation that provides asynchronous + * communication with MCP servers using Project Reactor's Mono and Flux types. + * + *

+ * This client implements the MCP specification, enabling AI models to interact with + * external tools and resources through a standardized interface. Key features include: + *

+ * + *

+ * The client follows a lifecycle: + *

    + *
  1. Initialization - Establishes connection and negotiates capabilities + *
  2. Normal Operation - Handles requests and notifications + *
  3. Graceful Shutdown - Ensures clean connection termination + *
+ * + *

+ * This implementation uses Project Reactor for non-blocking operations, making it + * suitable for high-throughput scenarios and reactive applications. All operations return + * Mono or Flux types that can be composed into reactive pipelines. + * + * @author Dariusz Jędrzejczyk + * @author Christian Tzolov + * @author Jihoon Kim + * @author Anurag Pant + * @see McpClient + * @see McpSchema + * @see McpClientSession + * @see McpClientTransport + */ +public class DefaultMcpAsyncClient implements McpAsyncClient { + + private static final Logger logger = LoggerFactory.getLogger(DefaultMcpAsyncClient.class); + + private static final TypeRef VOID_TYPE_REFERENCE = new TypeRef<>() { + }; + + public static final TypeRef OBJECT_TYPE_REF = new TypeRef<>() { + }; + + public static final TypeRef PAGINATED_REQUEST_TYPE_REF = new TypeRef<>() { + }; + + public static final TypeRef INITIALIZE_RESULT_TYPE_REF = new TypeRef<>() { + }; + + public static final TypeRef CREATE_MESSAGE_REQUEST_TYPE_REF = new TypeRef<>() { + }; + + public static final TypeRef LOGGING_MESSAGE_NOTIFICATION_TYPE_REF = new TypeRef<>() { + }; + + public static final TypeRef PROGRESS_NOTIFICATION_TYPE_REF = new TypeRef<>() { + }; + + /** + * Client capabilities. + */ + private final McpSchema.ClientCapabilities clientCapabilities; + + /** + * Client implementation information. + */ + private final McpSchema.Implementation clientInfo; + + /** + * Roots define the boundaries of where servers can operate within the filesystem, + * allowing them to understand which directories and files they have access to. + * Servers can request the list of roots from supporting clients and receive + * notifications when that list changes. + */ + private final ConcurrentHashMap roots; + + /** + * MCP provides a standardized way for servers to request LLM sampling ("completions" + * or "generations") from language models via clients. This flow allows clients to + * maintain control over model access, selection, and permissions while enabling + * servers to leverage AI capabilities—with no server API keys necessary. Servers can + * request text or image-based interactions and optionally include context from MCP + * servers in their prompts. + */ + private Function> samplingHandler; + + /** + * MCP provides a standardized way for servers to request additional information from + * users through the client during interactions. This flow allows clients to maintain + * control over user interactions and data sharing while enabling servers to gather + * necessary information dynamically. Servers can request structured data from users + * with optional JSON schemas to validate responses. + */ + private Function> elicitationHandler; + + /** + * Client transport implementation. + */ + private final McpClientTransport transport; + + /** + * The lifecycle initializer that manages the client-server connection initialization. + */ + private final LifecycleInitializer initializer; + + /** + * JSON schema validator to use for validating tool responses against output schemas. + */ + private final JsonSchemaValidator jsonSchemaValidator; + + /** + * Cached tool output schemas. + */ + private final ConcurrentHashMap> toolsOutputSchemaCache; + + /** + * Whether to enable automatic schema caching during callTool operations. + */ + private final boolean enableCallToolSchemaCaching; + + /** + * Create a new McpAsyncClient with the given transport and session request-response + * timeout. + * @param transport the transport to use. + * @param requestTimeout the session request-response timeout. + * @param initializationTimeout the max timeout to await for the client-server + * @param jsonSchemaValidator the JSON schema validator to use for validating tool + * @param features the MCP Client supported features. responses against output + * schemas. + */ + DefaultMcpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout, + JsonSchemaValidator jsonSchemaValidator, McpClientFeatures.Async features) { + + Assert.notNull(transport, "Transport must not be null"); + Assert.notNull(requestTimeout, "Request timeout must not be null"); + Assert.notNull(initializationTimeout, "Initialization timeout must not be null"); + + this.clientInfo = features.clientInfo(); + this.clientCapabilities = features.clientCapabilities(); + this.transport = transport; + this.roots = new ConcurrentHashMap<>(features.roots()); + this.jsonSchemaValidator = jsonSchemaValidator; + this.toolsOutputSchemaCache = new ConcurrentHashMap<>(); + this.enableCallToolSchemaCaching = features.enableCallToolSchemaCaching(); + + // Request Handlers + Map> requestHandlers = new HashMap<>(); + + // Ping MUST respond with an empty data, but not NULL response. + requestHandlers.put(McpSchema.METHOD_PING, params -> { + logger.debug("Received ping: {}", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); + return Mono.just(Map.of()); + }); + + // Roots List Request Handler + if (this.clientCapabilities.roots() != null) { + requestHandlers.put(McpSchema.METHOD_ROOTS_LIST, rootsListRequestHandler()); + } + + // Sampling Handler + if (this.clientCapabilities.sampling() != null) { + if (features.samplingHandler() == null) { + throw new IllegalArgumentException( + "Sampling handler must not be null when client capabilities include sampling"); + } + this.samplingHandler = features.samplingHandler(); + requestHandlers.put(McpSchema.METHOD_SAMPLING_CREATE_MESSAGE, samplingCreateMessageHandler()); + } + + // Elicitation Handler + if (this.clientCapabilities.elicitation() != null) { + if (features.elicitationHandler() == null) { + throw new IllegalArgumentException( + "Elicitation handler must not be null when client capabilities include elicitation"); + } + this.elicitationHandler = features.elicitationHandler(); + requestHandlers.put(McpSchema.METHOD_ELICITATION_CREATE, elicitationCreateHandler()); + } + + // Notification Handlers + Map notificationHandlers = new HashMap<>(); + + // Tools Change Notification + List, Mono>> toolsChangeConsumersFinal = new ArrayList<>(); + toolsChangeConsumersFinal + .add((notification) -> Mono.fromRunnable(() -> logger.debug("Tools changed: {}", notification))); + + if (!Utils.isEmpty(features.toolsChangeConsumers())) { + toolsChangeConsumersFinal.addAll(features.toolsChangeConsumers()); + } + notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_TOOLS_LIST_CHANGED, + asyncToolsChangeNotificationHandler(toolsChangeConsumersFinal)); + + // Resources Change Notification + List, Mono>> resourcesChangeConsumersFinal = new ArrayList<>(); + resourcesChangeConsumersFinal + .add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources changed: {}", notification))); + + if (!Utils.isEmpty(features.resourcesChangeConsumers())) { + resourcesChangeConsumersFinal.addAll(features.resourcesChangeConsumers()); + } + + notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED, + asyncResourcesChangeNotificationHandler(resourcesChangeConsumersFinal)); + + // Resources Update Notification + List, Mono>> resourcesUpdateConsumersFinal = new ArrayList<>(); + resourcesUpdateConsumersFinal + .add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources updated: {}", notification))); + + if (!Utils.isEmpty(features.resourcesUpdateConsumers())) { + resourcesUpdateConsumersFinal.addAll(features.resourcesUpdateConsumers()); + } + + notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, + asyncResourcesUpdatedNotificationHandler(resourcesUpdateConsumersFinal)); + + // Prompts Change Notification + List, Mono>> promptsChangeConsumersFinal = new ArrayList<>(); + promptsChangeConsumersFinal + .add((notification) -> Mono.fromRunnable(() -> logger.debug("Prompts changed: {}", notification))); + if (!Utils.isEmpty(features.promptsChangeConsumers())) { + promptsChangeConsumersFinal.addAll(features.promptsChangeConsumers()); + } + notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROMPTS_LIST_CHANGED, + asyncPromptsChangeNotificationHandler(promptsChangeConsumersFinal)); + + // Utility Logging Notification + List>> loggingConsumersFinal = new ArrayList<>(); + loggingConsumersFinal.add((notification) -> Mono.fromRunnable(() -> logger.debug("Logging: {}", notification))); + if (!Utils.isEmpty(features.loggingConsumers())) { + loggingConsumersFinal.addAll(features.loggingConsumers()); + } + notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_MESSAGE, + asyncLoggingNotificationHandler(loggingConsumersFinal)); + + // Utility Progress Notification + List>> progressConsumersFinal = new ArrayList<>(); + progressConsumersFinal + .add((notification) -> Mono.fromRunnable(() -> logger.debug("Progress: {}", notification))); + if (!Utils.isEmpty(features.progressConsumers())) { + progressConsumersFinal.addAll(features.progressConsumers()); + } + notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROGRESS, + asyncProgressNotificationHandler(progressConsumersFinal)); + + Function> postInitializationHook = init -> { + + if (init.initializeResult().capabilities().tools() == null || !enableCallToolSchemaCaching) { + return Mono.empty(); + } + + return this.listToolsInternal(init, McpSchema.FIRST_PAGE).doOnNext(listToolsResult -> { + listToolsResult.tools() + .forEach(tool -> logger.debug("Tool {} schema: {}", tool.name(), tool.outputSchema())); + if (enableCallToolSchemaCaching && listToolsResult.tools() != null) { + // Cache tools output schema + listToolsResult.tools() + .stream() + .filter(tool -> tool.outputSchema() != null) + .forEach(tool -> this.toolsOutputSchemaCache.put(tool.name(), tool.outputSchema())); + } + }).then(); + }; + + this.initializer = new LifecycleInitializer(clientCapabilities, clientInfo, transport.protocolVersions(), + initializationTimeout, ctx -> new McpClientSession(requestTimeout, transport, requestHandlers, + notificationHandlers, con -> con.contextWrite(ctx)), + postInitializationHook); + + this.transport.setExceptionHandler(this.initializer::handleException); + } + + @Override + public McpSchema.InitializeResult getCurrentInitializationResult() { + return this.initializer.currentInitializationResult(); + } + + @Override + public McpSchema.ServerCapabilities getServerCapabilities() { + McpSchema.InitializeResult initializeResult = this.initializer.currentInitializationResult(); + return initializeResult != null ? initializeResult.capabilities() : null; + } + + @Override + public String getServerInstructions() { + McpSchema.InitializeResult initializeResult = this.initializer.currentInitializationResult(); + return initializeResult != null ? initializeResult.instructions() : null; + } + + @Override + public McpSchema.Implementation getServerInfo() { + McpSchema.InitializeResult initializeResult = this.initializer.currentInitializationResult(); + return initializeResult != null ? initializeResult.serverInfo() : null; + } + + @Override + public boolean isInitialized() { + return this.initializer.isInitialized(); + } + + @Override + public ClientCapabilities getClientCapabilities() { + return this.clientCapabilities; + } + + @Override + public McpSchema.Implementation getClientInfo() { + return this.clientInfo; + } + + @Override + public void close() { + this.initializer.close(); + this.transport.close(); + } + + @Override + public Mono closeGracefully() { + return Mono.defer(() -> { + return this.initializer.closeGracefully().then(transport.closeGracefully()); + }); + } + + // -------------------------- + // Initialization + // -------------------------- + + @Override + public Mono initialize() { + return this.initializer.withInitialization("by explicit API call", init -> Mono.just(init.initializeResult())); + } + + // -------------------------- + // Basic Utilities + // -------------------------- + + @Override + public Mono ping() { + return this.initializer.withInitialization("pinging the server", + init -> init.mcpSession().sendRequest(McpSchema.METHOD_PING, null, OBJECT_TYPE_REF)); + } + + // -------------------------- + // Roots + // -------------------------- + + @Override + public Mono addRoot(Root root) { + + if (root == null) { + return Mono.error(new IllegalArgumentException("Root must not be null")); + } + + if (this.clientCapabilities.roots() == null) { + return Mono.error(new IllegalStateException("Client must be configured with roots capabilities")); + } + + if (this.roots.containsKey(root.uri())) { + return Mono.error(new IllegalStateException("Root with uri '" + root.uri() + "' already exists")); + } + + this.roots.put(root.uri(), root); + + logger.debug("Added root: {}", root); + + if (this.clientCapabilities.roots().listChanged()) { + if (this.isInitialized()) { + return this.rootsListChangedNotification(); + } + else { + logger.warn("Client is not initialized, ignore sending a roots list changed notification"); + } + } + return Mono.empty(); + } + + @Override + public Mono removeRoot(String rootUri) { + + if (rootUri == null) { + return Mono.error(new IllegalArgumentException("Root uri must not be null")); + } + + if (this.clientCapabilities.roots() == null) { + return Mono.error(new IllegalStateException("Client must be configured with roots capabilities")); + } + + Root removed = this.roots.remove(rootUri); + + if (removed != null) { + logger.debug("Removed Root: {}", rootUri); + if (this.clientCapabilities.roots().listChanged()) { + if (this.isInitialized()) { + return this.rootsListChangedNotification(); + } + else { + logger.warn("Client is not initialized, ignore sending a roots list changed notification"); + } + + } + return Mono.empty(); + } + return Mono.error(new IllegalStateException("Root with uri '" + rootUri + "' not found")); + } + + @Override + public Mono rootsListChangedNotification() { + return this.initializer.withInitialization("sending roots list changed notification", + init -> init.mcpSession().sendNotification(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED)); + } + + private RequestHandler rootsListRequestHandler() { + return params -> { + @SuppressWarnings("unused") + McpSchema.PaginatedRequest request = transport.unmarshalFrom(params, PAGINATED_REQUEST_TYPE_REF); + + List roots = this.roots.values().stream().toList(); + + return Mono.just(new McpSchema.ListRootsResult(roots)); + }; + } + + // -------------------------- + // Sampling + // -------------------------- + private RequestHandler samplingCreateMessageHandler() { + return params -> { + McpSchema.CreateMessageRequest request = transport.unmarshalFrom(params, CREATE_MESSAGE_REQUEST_TYPE_REF); + + return this.samplingHandler.apply(request); + }; + } + + // -------------------------- + // Elicitation + // -------------------------- + private RequestHandler elicitationCreateHandler() { + return params -> { + ElicitRequest request = transport.unmarshalFrom(params, new TypeRef<>() { + }); + + return this.elicitationHandler.apply(request); + }; + } + + // -------------------------- + // Tools + // -------------------------- + private static final TypeRef CALL_TOOL_RESULT_TYPE_REF = new TypeRef<>() { + }; + + private static final TypeRef LIST_TOOLS_RESULT_TYPE_REF = new TypeRef<>() { + }; + + @Override + public Mono callTool(McpSchema.CallToolRequest callToolRequest) { + return this.initializer.withInitialization("calling tool", init -> { + if (init.initializeResult().capabilities().tools() == null) { + return Mono.error(new IllegalStateException("Server does not provide tools capability")); + } + + return init.mcpSession() + .sendRequest(McpSchema.METHOD_TOOLS_CALL, callToolRequest, CALL_TOOL_RESULT_TYPE_REF) + .flatMap(result -> Mono.just(validateToolResult(callToolRequest.name(), result))); + }); + } + + private McpSchema.CallToolResult validateToolResult(String toolName, McpSchema.CallToolResult result) { + + if (!this.enableCallToolSchemaCaching || result == null || result.isError() == Boolean.TRUE) { + // if tool schema caching is disabled or tool call resulted in an error - skip + // validation and return the result as it is + return result; + } + + Map optOutputSchema = this.toolsOutputSchemaCache.get(toolName); + + if (optOutputSchema == null) { + logger.warn( + "Calling a tool with no outputSchema is not expected to return result with structured content, but got: {}", + result.structuredContent()); + return result; + } + + // Validate the tool output against the cached output schema + var validation = this.jsonSchemaValidator.validate(optOutputSchema, result.structuredContent()); + + if (!validation.valid()) { + logger.warn("Tool call result validation failed: {}", validation.errorMessage()); + throw new IllegalArgumentException("Tool call result validation failed: " + validation.errorMessage()); + } + + return result; + } + + @Override + public Mono listTools() { + return this.listTools(McpSchema.FIRST_PAGE).expand(result -> { + String next = result.nextCursor(); + return (next != null && !next.isEmpty()) ? this.listTools(next) : Mono.empty(); + }).reduce(new McpSchema.ListToolsResult(new ArrayList<>(), null), (allToolsResult, result) -> { + allToolsResult.tools().addAll(result.tools()); + return allToolsResult; + }).map(result -> new McpSchema.ListToolsResult(Collections.unmodifiableList(result.tools()), null)); + } + + @Override + public Mono listTools(String cursor) { + return this.initializer.withInitialization("listing tools", init -> this.listToolsInternal(init, cursor)); + } + + private Mono listToolsInternal(Initialization init, String cursor) { + + if (init.initializeResult().capabilities().tools() == null) { + return Mono.error(new IllegalStateException("Server does not provide tools capability")); + } + return init.mcpSession() + .sendRequest(McpSchema.METHOD_TOOLS_LIST, new McpSchema.PaginatedRequest(cursor), + LIST_TOOLS_RESULT_TYPE_REF) + .doOnNext(result -> { + if (this.enableCallToolSchemaCaching && result.tools() != null) { + // Cache tools output schema + result.tools() + .stream() + .filter(tool -> tool.outputSchema() != null) + .forEach(tool -> this.toolsOutputSchemaCache.put(tool.name(), tool.outputSchema())); + } + }); + } + + private NotificationHandler asyncToolsChangeNotificationHandler( + List, Mono>> toolsChangeConsumers) { + // TODO: params are not used yet + return params -> this.listTools() + .flatMap(listToolsResult -> Flux.fromIterable(toolsChangeConsumers) + .flatMap(consumer -> consumer.apply(listToolsResult.tools())) + .onErrorResume(error -> { + logger.error("Error handling tools list change notification", error); + return Mono.empty(); + }) + .then()); + } + + // -------------------------- + // Resources + // -------------------------- + + private static final TypeRef LIST_RESOURCES_RESULT_TYPE_REF = new TypeRef<>() { + }; + + private static final TypeRef READ_RESOURCE_RESULT_TYPE_REF = new TypeRef<>() { + }; + + private static final TypeRef LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF = new TypeRef<>() { + }; + + @Override + public Mono listResources() { + return this.listResources(McpSchema.FIRST_PAGE) + .expand(result -> (result.nextCursor() != null) ? this.listResources(result.nextCursor()) : Mono.empty()) + .reduce(new McpSchema.ListResourcesResult(new ArrayList<>(), null), (allResourcesResult, result) -> { + allResourcesResult.resources().addAll(result.resources()); + return allResourcesResult; + }) + .map(result -> new McpSchema.ListResourcesResult(Collections.unmodifiableList(result.resources()), null)); + } + + @Override + public Mono listResources(String cursor) { + return this.initializer.withInitialization("listing resources", init -> { + if (init.initializeResult().capabilities().resources() == null) { + return Mono.error(new IllegalStateException("Server does not provide the resources capability")); + } + return init.mcpSession() + .sendRequest(McpSchema.METHOD_RESOURCES_LIST, new McpSchema.PaginatedRequest(cursor), + LIST_RESOURCES_RESULT_TYPE_REF); + }); + } + + @Override + public Mono readResource(McpSchema.Resource resource) { + return this.readResource(new McpSchema.ReadResourceRequest(resource.uri())); + } + + @Override + public Mono readResource(McpSchema.ReadResourceRequest readResourceRequest) { + return this.initializer.withInitialization("reading resources", init -> { + if (init.initializeResult().capabilities().resources() == null) { + return Mono.error(new IllegalStateException("Server does not provide the resources capability")); + } + return init.mcpSession() + .sendRequest(McpSchema.METHOD_RESOURCES_READ, readResourceRequest, READ_RESOURCE_RESULT_TYPE_REF); + }); + } + + @Override + public Mono listResourceTemplates() { + return this.listResourceTemplates(McpSchema.FIRST_PAGE) + .expand(result -> (result.nextCursor() != null) ? this.listResourceTemplates(result.nextCursor()) + : Mono.empty()) + .reduce(new McpSchema.ListResourceTemplatesResult(new ArrayList<>(), null), + (allResourceTemplatesResult, result) -> { + allResourceTemplatesResult.resourceTemplates().addAll(result.resourceTemplates()); + return allResourceTemplatesResult; + }) + .map(result -> new McpSchema.ListResourceTemplatesResult( + Collections.unmodifiableList(result.resourceTemplates()), null)); + } + + @Override + public Mono listResourceTemplates(String cursor) { + return this.initializer.withInitialization("listing resource templates", init -> { + if (init.initializeResult().capabilities().resources() == null) { + return Mono.error(new IllegalStateException("Server does not provide the resources capability")); + } + return init.mcpSession() + .sendRequest(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, new McpSchema.PaginatedRequest(cursor), + LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF); + }); + } + + @Override + public Mono subscribeResource(McpSchema.SubscribeRequest subscribeRequest) { + return this.initializer.withInitialization("subscribing to resources", init -> init.mcpSession() + .sendRequest(McpSchema.METHOD_RESOURCES_SUBSCRIBE, subscribeRequest, VOID_TYPE_REFERENCE)); + } + + @Override + public Mono unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) { + return this.initializer.withInitialization("unsubscribing from resources", init -> init.mcpSession() + .sendRequest(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, unsubscribeRequest, VOID_TYPE_REFERENCE)); + } + + private NotificationHandler asyncResourcesChangeNotificationHandler( + List, Mono>> resourcesChangeConsumers) { + return params -> listResources().flatMap(listResourcesResult -> Flux.fromIterable(resourcesChangeConsumers) + .flatMap(consumer -> consumer.apply(listResourcesResult.resources())) + .onErrorResume(error -> { + logger.error("Error handling resources list change notification", error); + return Mono.empty(); + }) + .then()); + } + + private NotificationHandler asyncResourcesUpdatedNotificationHandler( + List, Mono>> resourcesUpdateConsumers) { + return params -> { + McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification = transport.unmarshalFrom(params, + new TypeRef<>() { + }); + + return readResource(new McpSchema.ReadResourceRequest(resourcesUpdatedNotification.uri())) + .flatMap(readResourceResult -> Flux.fromIterable(resourcesUpdateConsumers) + .flatMap(consumer -> consumer.apply(readResourceResult.contents())) + .onErrorResume(error -> { + logger.error("Error handling resource update notification", error); + return Mono.empty(); + }) + .then()); + }; + } + + // -------------------------- + // Prompts + // -------------------------- + private static final TypeRef LIST_PROMPTS_RESULT_TYPE_REF = new TypeRef<>() { + }; + + private static final TypeRef GET_PROMPT_RESULT_TYPE_REF = new TypeRef<>() { + }; + + @Override + public Mono listPrompts() { + return this.listPrompts(McpSchema.FIRST_PAGE) + .expand(result -> (result.nextCursor() != null) ? this.listPrompts(result.nextCursor()) : Mono.empty()) + .reduce(new ListPromptsResult(new ArrayList<>(), null), (allPromptsResult, result) -> { + allPromptsResult.prompts().addAll(result.prompts()); + return allPromptsResult; + }) + .map(result -> new McpSchema.ListPromptsResult(Collections.unmodifiableList(result.prompts()), null)); + } + + @Override + public Mono listPrompts(String cursor) { + return this.initializer.withInitialization("listing prompts", init -> init.mcpSession() + .sendRequest(McpSchema.METHOD_PROMPT_LIST, new PaginatedRequest(cursor), LIST_PROMPTS_RESULT_TYPE_REF)); + } + + @Override + public Mono getPrompt(GetPromptRequest getPromptRequest) { + return this.initializer.withInitialization("getting prompts", init -> init.mcpSession() + .sendRequest(McpSchema.METHOD_PROMPT_GET, getPromptRequest, GET_PROMPT_RESULT_TYPE_REF)); + } + + private NotificationHandler asyncPromptsChangeNotificationHandler( + List, Mono>> promptsChangeConsumers) { + return params -> listPrompts().flatMap(listPromptsResult -> Flux.fromIterable(promptsChangeConsumers) + .flatMap(consumer -> consumer.apply(listPromptsResult.prompts())) + .onErrorResume(error -> { + logger.error("Error handling prompts list change notification", error); + return Mono.empty(); + }) + .then()); + } + + // -------------------------- + // Logging + // -------------------------- + private NotificationHandler asyncLoggingNotificationHandler( + List>> loggingConsumers) { + + return params -> { + McpSchema.LoggingMessageNotification loggingMessageNotification = transport.unmarshalFrom(params, + LOGGING_MESSAGE_NOTIFICATION_TYPE_REF); + + return Flux.fromIterable(loggingConsumers) + .flatMap(consumer -> consumer.apply(loggingMessageNotification)) + .then(); + }; + } + + @Override + public Mono setLoggingLevel(LoggingLevel loggingLevel) { + if (loggingLevel == null) { + return Mono.error(new IllegalArgumentException("Logging level must not be null")); + } + + return this.initializer.withInitialization("setting logging level", init -> { + if (init.initializeResult().capabilities().logging() == null) { + return Mono.error(new IllegalStateException("Server's Logging capabilities are not enabled!")); + } + var params = new McpSchema.SetLevelRequest(loggingLevel); + return init.mcpSession().sendRequest(McpSchema.METHOD_LOGGING_SET_LEVEL, params, OBJECT_TYPE_REF).then(); + }); + } + + private NotificationHandler asyncProgressNotificationHandler( + List>> progressConsumers) { + + return params -> { + McpSchema.ProgressNotification progressNotification = transport.unmarshalFrom(params, + PROGRESS_NOTIFICATION_TYPE_REF); + + return Flux.fromIterable(progressConsumers) + .flatMap(consumer -> consumer.apply(progressNotification)) + .then(); + }; + } + + /** + * This method is package-private and used for test only. Should not be called by user + * code. + * @param protocolVersions the Client supported protocol versions. + */ + void setProtocolVersions(List protocolVersions) { + this.initializer.setProtocolVersions(protocolVersions); + } + + // -------------------------- + // Completions + // -------------------------- + private static final TypeRef COMPLETION_COMPLETE_RESULT_TYPE_REF = new TypeRef<>() { + }; + + @Override + public Mono completeCompletion(McpSchema.CompleteRequest completeRequest) { + return this.initializer.withInitialization("complete completions", init -> init.mcpSession() + .sendRequest(McpSchema.METHOD_COMPLETION_COMPLETE, completeRequest, COMPLETION_COMPLETE_RESULT_TYPE_REF)); + } + +} diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/DefaultMcpSyncClient.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/DefaultMcpSyncClient.java new file mode 100644 index 000000000..2d8e01803 --- /dev/null +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/DefaultMcpSyncClient.java @@ -0,0 +1,278 @@ +/* + * Copyright 2024-2024 the original author or authors. + */ + +package io.modelcontextprotocol.client; + +import java.time.Duration; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.modelcontextprotocol.common.McpTransportContext; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities; +import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest; +import io.modelcontextprotocol.spec.McpSchema.GetPromptResult; +import io.modelcontextprotocol.spec.McpSchema.ListPromptsResult; +import io.modelcontextprotocol.util.Assert; +import reactor.core.publisher.Mono; + +/** + * A synchronous client implementation for the Model Context Protocol (MCP) that wraps an + * {@link McpAsyncClient} to provide blocking operations. + * + *

+ * This client implements the MCP specification by delegating to an asynchronous client + * and blocking on the results. Key features include: + *

    + *
  • Synchronous, blocking API for simpler integration in non-reactive applications + *
  • Tool discovery and invocation for server-provided functionality + *
  • Resource access and management with URI-based addressing + *
  • Prompt template handling for standardized AI interactions + *
  • Real-time notifications for tools, resources, and prompts changes + *
  • Structured logging with configurable severity levels + *
+ * + *

+ * The client follows the same lifecycle as its async counterpart: + *

    + *
  1. Initialization - Establishes connection and negotiates capabilities + *
  2. Normal Operation - Handles requests and notifications + *
  3. Graceful Shutdown - Ensures clean connection termination + *
+ * + *

+ * This implementation implements {@link AutoCloseable} for resource cleanup and provides + * both immediate and graceful shutdown options. All operations block until completion or + * timeout, making it suitable for traditional synchronous programming models. + * + * @author Dariusz Jędrzejczyk + * @author Christian Tzolov + * @author Jihoon Kim + * @see McpClient + * @see McpAsyncClient + * @see McpSchema + */ +public class DefaultMcpSyncClient implements McpSyncClient { + + private static final Logger logger = LoggerFactory.getLogger(DefaultMcpSyncClient.class); + + // TODO: Consider providing a client config to set this properly + // this is currently a concern only because AutoCloseable is used - perhaps it + // is not a requirement? + private static final long DEFAULT_CLOSE_TIMEOUT_MS = 10_000L; + + private final McpAsyncClient delegate; + + private final Supplier contextProvider; + + /** + * Create a new McpSyncClient with the given delegate. + * @param delegate the asynchronous kernel on top of which this synchronous client + * provides a blocking API. + * @param contextProvider the supplier of context before calling any non-blocking + * operation on underlying delegate + */ + DefaultMcpSyncClient(McpAsyncClient delegate, Supplier contextProvider) { + Assert.notNull(delegate, "The delegate can not be null"); + Assert.notNull(contextProvider, "The contextProvider can not be null"); + this.delegate = delegate; + this.contextProvider = contextProvider; + } + + @Override + public McpSchema.InitializeResult getCurrentInitializationResult() { + return this.delegate.getCurrentInitializationResult(); + } + + @Override + public McpSchema.ServerCapabilities getServerCapabilities() { + return this.delegate.getServerCapabilities(); + } + + @Override + public String getServerInstructions() { + return this.delegate.getServerInstructions(); + } + + @Override + public McpSchema.Implementation getServerInfo() { + return this.delegate.getServerInfo(); + } + + @Override + public boolean isInitialized() { + return this.delegate.isInitialized(); + } + + @Override + public ClientCapabilities getClientCapabilities() { + return this.delegate.getClientCapabilities(); + } + + @Override + public McpSchema.Implementation getClientInfo() { + return this.delegate.getClientInfo(); + } + + @Override + public void close() { + this.delegate.close(); + } + + @Override + public boolean closeGracefully() { + try { + this.delegate.closeGracefully().block(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)); + } + catch (RuntimeException e) { + logger.warn("Client didn't close within timeout of {} ms.", DEFAULT_CLOSE_TIMEOUT_MS, e); + return false; + } + return true; + } + + @Override + public McpSchema.InitializeResult initialize() { + // TODO: block takes no argument here as we assume the async client is + // configured with a requestTimeout at all times + return withProvidedContext(this.delegate.initialize()).block(); + } + + @Override + public void rootsListChangedNotification() { + withProvidedContext(this.delegate.rootsListChangedNotification()).block(); + } + + @Override + public void addRoot(McpSchema.Root root) { + this.delegate.addRoot(root).block(); + } + + @Override + public void removeRoot(String rootUri) { + this.delegate.removeRoot(rootUri).block(); + } + + @Override + public Object ping() { + return withProvidedContext(this.delegate.ping()).block(); + } + + // -------------------------- + // Tools + // -------------------------- + @Override + public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) { + return withProvidedContext(this.delegate.callTool(callToolRequest)).block(); + + } + + @Override + public McpSchema.ListToolsResult listTools() { + return withProvidedContext(this.delegate.listTools()).block(); + } + + @Override + public McpSchema.ListToolsResult listTools(String cursor) { + return withProvidedContext(this.delegate.listTools(cursor)).block(); + + } + + // -------------------------- + // Resources + // -------------------------- + + @Override + public McpSchema.ListResourcesResult listResources() { + return withProvidedContext(this.delegate.listResources()).block(); + + } + + @Override + public McpSchema.ListResourcesResult listResources(String cursor) { + return withProvidedContext(this.delegate.listResources(cursor)).block(); + + } + + @Override + public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) { + return withProvidedContext(this.delegate.readResource(resource)).block(); + + } + + @Override + public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) { + return withProvidedContext(this.delegate.readResource(readResourceRequest)).block(); + + } + + @Override + public McpSchema.ListResourceTemplatesResult listResourceTemplates() { + return withProvidedContext(this.delegate.listResourceTemplates()).block(); + + } + + @Override + public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) { + return withProvidedContext(this.delegate.listResourceTemplates(cursor)).block(); + + } + + @Override + public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) { + withProvidedContext(this.delegate.subscribeResource(subscribeRequest)).block(); + + } + + @Override + public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) { + withProvidedContext(this.delegate.unsubscribeResource(unsubscribeRequest)).block(); + + } + + // -------------------------- + // Prompts + // -------------------------- + + @Override + public ListPromptsResult listPrompts() { + return withProvidedContext(this.delegate.listPrompts()).block(); + } + + @Override + public ListPromptsResult listPrompts(String cursor) { + return withProvidedContext(this.delegate.listPrompts(cursor)).block(); + + } + + @Override + public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) { + return withProvidedContext(this.delegate.getPrompt(getPromptRequest)).block(); + } + + @Override + public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) { + withProvidedContext(this.delegate.setLoggingLevel(loggingLevel)).block(); + + } + + @Override + public McpSchema.CompleteResult completeCompletion(McpSchema.CompleteRequest completeRequest) { + return withProvidedContext(this.delegate.completeCompletion(completeRequest)).block(); + + } + + /** + * For a given action, on assembly, capture the "context" via the + * {@link #contextProvider} and store it in the Reactor context. + * @param action the action to perform + * @return the result of the action + */ + private Mono withProvidedContext(Mono action) { + return action.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, this.contextProvider.get())); + } + +} diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java index f56c79a6d..85b3fd743 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java @@ -304,7 +304,7 @@ private Mono doInitialize(DefaultInitialization init this.clientCapabilities, this.clientInfo); Mono result = mcpClientSession.sendRequest(McpSchema.METHOD_INITIALIZE, - initializeRequest, McpAsyncClient.INITIALIZE_RESULT_TYPE_REF); + initializeRequest, DefaultMcpAsyncClient.INITIALIZE_RESULT_TYPE_REF); return result.flatMap(initializeResult -> { logger.info("Server response with Protocol: {}, Capabilities: {}, Info: {} and Instructions {}", diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index 2d1f4b43c..3ef591aa7 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -1,49 +1,12 @@ -/* - * Copyright 2024-2024 the original author or authors. - */ - package io.modelcontextprotocol.client; -import java.time.Duration; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; - -import io.modelcontextprotocol.client.LifecycleInitializer.Initialization; -import io.modelcontextprotocol.json.TypeRef; -import io.modelcontextprotocol.json.schema.JsonSchemaValidator; import io.modelcontextprotocol.spec.McpClientSession; -import io.modelcontextprotocol.spec.McpClientSession.NotificationHandler; -import io.modelcontextprotocol.spec.McpClientSession.RequestHandler; import io.modelcontextprotocol.spec.McpClientTransport; import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities; -import io.modelcontextprotocol.spec.McpSchema.CreateMessageRequest; -import io.modelcontextprotocol.spec.McpSchema.CreateMessageResult; -import io.modelcontextprotocol.spec.McpSchema.ElicitRequest; -import io.modelcontextprotocol.spec.McpSchema.ElicitResult; -import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest; -import io.modelcontextprotocol.spec.McpSchema.GetPromptResult; -import io.modelcontextprotocol.spec.McpSchema.ListPromptsResult; -import io.modelcontextprotocol.spec.McpSchema.LoggingLevel; -import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification; -import io.modelcontextprotocol.spec.McpSchema.PaginatedRequest; -import io.modelcontextprotocol.spec.McpSchema.Root; -import io.modelcontextprotocol.util.Assert; -import io.modelcontextprotocol.util.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * The Model Context Protocol (MCP) client implementation that provides asynchronous + * The Model Context Protocol (MCP) client interface that provides asynchronous * communication with MCP servers using Project Reactor's Mono and Flux types. * *

@@ -68,336 +31,75 @@ * * *

- * This implementation uses Project Reactor for non-blocking operations, making it - * suitable for high-throughput scenarios and reactive applications. All operations return - * Mono or Flux types that can be composed into reactive pipelines. + * This interface uses Project Reactor for non-blocking operations, making it suitable for + * high-throughput scenarios and reactive applications. All operations return Mono or Flux + * types that can be composed into reactive pipelines. * * @author Dariusz Jędrzejczyk * @author Christian Tzolov * @author Jihoon Kim * @author Anurag Pant + * @author Pin He * @see McpClient * @see McpSchema * @see McpClientSession * @see McpClientTransport */ -public class McpAsyncClient { - - private static final Logger logger = LoggerFactory.getLogger(McpAsyncClient.class); - - private static final TypeRef VOID_TYPE_REFERENCE = new TypeRef<>() { - }; - - public static final TypeRef OBJECT_TYPE_REF = new TypeRef<>() { - }; - - public static final TypeRef PAGINATED_REQUEST_TYPE_REF = new TypeRef<>() { - }; - - public static final TypeRef INITIALIZE_RESULT_TYPE_REF = new TypeRef<>() { - }; - - public static final TypeRef CREATE_MESSAGE_REQUEST_TYPE_REF = new TypeRef<>() { - }; - - public static final TypeRef LOGGING_MESSAGE_NOTIFICATION_TYPE_REF = new TypeRef<>() { - }; - - public static final TypeRef PROGRESS_NOTIFICATION_TYPE_REF = new TypeRef<>() { - }; - - /** - * Client capabilities. - */ - private final McpSchema.ClientCapabilities clientCapabilities; - - /** - * Client implementation information. - */ - private final McpSchema.Implementation clientInfo; - - /** - * Roots define the boundaries of where servers can operate within the filesystem, - * allowing them to understand which directories and files they have access to. - * Servers can request the list of roots from supporting clients and receive - * notifications when that list changes. - */ - private final ConcurrentHashMap roots; - - /** - * MCP provides a standardized way for servers to request LLM sampling ("completions" - * or "generations") from language models via clients. This flow allows clients to - * maintain control over model access, selection, and permissions while enabling - * servers to leverage AI capabilities—with no server API keys necessary. Servers can - * request text or image-based interactions and optionally include context from MCP - * servers in their prompts. - */ - private Function> samplingHandler; - - /** - * MCP provides a standardized way for servers to request additional information from - * users through the client during interactions. This flow allows clients to maintain - * control over user interactions and data sharing while enabling servers to gather - * necessary information dynamically. Servers can request structured data from users - * with optional JSON schemas to validate responses. - */ - private Function> elicitationHandler; - - /** - * Client transport implementation. - */ - private final McpClientTransport transport; - - /** - * The lifecycle initializer that manages the client-server connection initialization. - */ - private final LifecycleInitializer initializer; - - /** - * JSON schema validator to use for validating tool responses against output schemas. - */ - private final JsonSchemaValidator jsonSchemaValidator; - - /** - * Cached tool output schemas. - */ - private final ConcurrentHashMap> toolsOutputSchemaCache; - - /** - * Whether to enable automatic schema caching during callTool operations. - */ - private final boolean enableCallToolSchemaCaching; - - /** - * Create a new McpAsyncClient with the given transport and session request-response - * timeout. - * @param transport the transport to use. - * @param requestTimeout the session request-response timeout. - * @param initializationTimeout the max timeout to await for the client-server - * @param jsonSchemaValidator the JSON schema validator to use for validating tool - * @param features the MCP Client supported features. responses against output - * schemas. - */ - McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout, - JsonSchemaValidator jsonSchemaValidator, McpClientFeatures.Async features) { - - Assert.notNull(transport, "Transport must not be null"); - Assert.notNull(requestTimeout, "Request timeout must not be null"); - Assert.notNull(initializationTimeout, "Initialization timeout must not be null"); - - this.clientInfo = features.clientInfo(); - this.clientCapabilities = features.clientCapabilities(); - this.transport = transport; - this.roots = new ConcurrentHashMap<>(features.roots()); - this.jsonSchemaValidator = jsonSchemaValidator; - this.toolsOutputSchemaCache = new ConcurrentHashMap<>(); - this.enableCallToolSchemaCaching = features.enableCallToolSchemaCaching(); - - // Request Handlers - Map> requestHandlers = new HashMap<>(); - - // Ping MUST respond with an empty data, but not NULL response. - requestHandlers.put(McpSchema.METHOD_PING, params -> { - logger.debug("Received ping: {}", LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); - return Mono.just(Map.of()); - }); - - // Roots List Request Handler - if (this.clientCapabilities.roots() != null) { - requestHandlers.put(McpSchema.METHOD_ROOTS_LIST, rootsListRequestHandler()); - } - - // Sampling Handler - if (this.clientCapabilities.sampling() != null) { - if (features.samplingHandler() == null) { - throw new IllegalArgumentException( - "Sampling handler must not be null when client capabilities include sampling"); - } - this.samplingHandler = features.samplingHandler(); - requestHandlers.put(McpSchema.METHOD_SAMPLING_CREATE_MESSAGE, samplingCreateMessageHandler()); - } - - // Elicitation Handler - if (this.clientCapabilities.elicitation() != null) { - if (features.elicitationHandler() == null) { - throw new IllegalArgumentException( - "Elicitation handler must not be null when client capabilities include elicitation"); - } - this.elicitationHandler = features.elicitationHandler(); - requestHandlers.put(McpSchema.METHOD_ELICITATION_CREATE, elicitationCreateHandler()); - } - - // Notification Handlers - Map notificationHandlers = new HashMap<>(); - - // Tools Change Notification - List, Mono>> toolsChangeConsumersFinal = new ArrayList<>(); - toolsChangeConsumersFinal - .add((notification) -> Mono.fromRunnable(() -> logger.debug("Tools changed: {}", notification))); - - if (!Utils.isEmpty(features.toolsChangeConsumers())) { - toolsChangeConsumersFinal.addAll(features.toolsChangeConsumers()); - } - notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_TOOLS_LIST_CHANGED, - asyncToolsChangeNotificationHandler(toolsChangeConsumersFinal)); - - // Resources Change Notification - List, Mono>> resourcesChangeConsumersFinal = new ArrayList<>(); - resourcesChangeConsumersFinal - .add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources changed: {}", notification))); - - if (!Utils.isEmpty(features.resourcesChangeConsumers())) { - resourcesChangeConsumersFinal.addAll(features.resourcesChangeConsumers()); - } - - notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED, - asyncResourcesChangeNotificationHandler(resourcesChangeConsumersFinal)); - - // Resources Update Notification - List, Mono>> resourcesUpdateConsumersFinal = new ArrayList<>(); - resourcesUpdateConsumersFinal - .add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources updated: {}", notification))); - - if (!Utils.isEmpty(features.resourcesUpdateConsumers())) { - resourcesUpdateConsumersFinal.addAll(features.resourcesUpdateConsumers()); - } - - notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, - asyncResourcesUpdatedNotificationHandler(resourcesUpdateConsumersFinal)); - - // Prompts Change Notification - List, Mono>> promptsChangeConsumersFinal = new ArrayList<>(); - promptsChangeConsumersFinal - .add((notification) -> Mono.fromRunnable(() -> logger.debug("Prompts changed: {}", notification))); - if (!Utils.isEmpty(features.promptsChangeConsumers())) { - promptsChangeConsumersFinal.addAll(features.promptsChangeConsumers()); - } - notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROMPTS_LIST_CHANGED, - asyncPromptsChangeNotificationHandler(promptsChangeConsumersFinal)); - - // Utility Logging Notification - List>> loggingConsumersFinal = new ArrayList<>(); - loggingConsumersFinal.add((notification) -> Mono.fromRunnable(() -> logger.debug("Logging: {}", notification))); - if (!Utils.isEmpty(features.loggingConsumers())) { - loggingConsumersFinal.addAll(features.loggingConsumers()); - } - notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_MESSAGE, - asyncLoggingNotificationHandler(loggingConsumersFinal)); - - // Utility Progress Notification - List>> progressConsumersFinal = new ArrayList<>(); - progressConsumersFinal - .add((notification) -> Mono.fromRunnable(() -> logger.debug("Progress: {}", notification))); - if (!Utils.isEmpty(features.progressConsumers())) { - progressConsumersFinal.addAll(features.progressConsumers()); - } - notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROGRESS, - asyncProgressNotificationHandler(progressConsumersFinal)); - - Function> postInitializationHook = init -> { - - if (init.initializeResult().capabilities().tools() == null || !enableCallToolSchemaCaching) { - return Mono.empty(); - } - - return this.listToolsInternal(init, McpSchema.FIRST_PAGE).doOnNext(listToolsResult -> { - listToolsResult.tools() - .forEach(tool -> logger.debug("Tool {} schema: {}", tool.name(), tool.outputSchema())); - if (enableCallToolSchemaCaching && listToolsResult.tools() != null) { - // Cache tools output schema - listToolsResult.tools() - .stream() - .filter(tool -> tool.outputSchema() != null) - .forEach(tool -> this.toolsOutputSchemaCache.put(tool.name(), tool.outputSchema())); - } - }).then(); - }; - - this.initializer = new LifecycleInitializer(clientCapabilities, clientInfo, transport.protocolVersions(), - initializationTimeout, ctx -> new McpClientSession(requestTimeout, transport, requestHandlers, - notificationHandlers, con -> con.contextWrite(ctx)), - postInitializationHook); - - this.transport.setExceptionHandler(this.initializer::handleException); - } +public interface McpAsyncClient { /** * Get the current initialization result. * @return the initialization result. */ - public McpSchema.InitializeResult getCurrentInitializationResult() { - return this.initializer.currentInitializationResult(); - } + McpSchema.InitializeResult getCurrentInitializationResult(); /** * Get the server capabilities that define the supported features and functionality. * @return The server capabilities */ - public McpSchema.ServerCapabilities getServerCapabilities() { - McpSchema.InitializeResult initializeResult = this.initializer.currentInitializationResult(); - return initializeResult != null ? initializeResult.capabilities() : null; - } + McpSchema.ServerCapabilities getServerCapabilities(); /** * Get the server instructions that provide guidance to the client on how to interact * with this server. * @return The server instructions */ - public String getServerInstructions() { - McpSchema.InitializeResult initializeResult = this.initializer.currentInitializationResult(); - return initializeResult != null ? initializeResult.instructions() : null; - } + String getServerInstructions(); /** * Get the server implementation information. * @return The server implementation details */ - public McpSchema.Implementation getServerInfo() { - McpSchema.InitializeResult initializeResult = this.initializer.currentInitializationResult(); - return initializeResult != null ? initializeResult.serverInfo() : null; - } + McpSchema.Implementation getServerInfo(); /** * Check if the client-server connection is initialized. * @return true if the client-server connection is initialized */ - public boolean isInitialized() { - return this.initializer.isInitialized(); - } + boolean isInitialized(); /** * Get the client capabilities that define the supported features and functionality. * @return The client capabilities */ - public ClientCapabilities getClientCapabilities() { - return this.clientCapabilities; - } + McpSchema.ClientCapabilities getClientCapabilities(); /** * Get the client implementation information. * @return The client implementation details */ - public McpSchema.Implementation getClientInfo() { - return this.clientInfo; - } + McpSchema.Implementation getClientInfo(); /** * Closes the client connection immediately. */ - public void close() { - this.initializer.close(); - this.transport.close(); - } + void close(); /** * Gracefully closes the client connection. * @return A Mono that completes when the connection is closed */ - public Mono closeGracefully() { - return Mono.defer(() -> { - return this.initializer.closeGracefully().then(transport.closeGracefully()); - }); - } + Mono closeGracefully(); // -------------------------- // Initialization @@ -429,9 +131,7 @@ public Mono closeGracefully() { * Initialization Spec *

*/ - public Mono initialize() { - return this.initializer.withInitialization("by explicit API call", init -> Mono.just(init.initializeResult())); - } + Mono initialize(); // -------------------------- // Basic Utilities @@ -441,10 +141,7 @@ public Mono initialize() { * Sends a ping request to the server. * @return A Mono that completes with the server's ping response */ - public Mono ping() { - return this.initializer.withInitialization("pinging the server", - init -> init.mcpSession().sendRequest(McpSchema.METHOD_PING, null, OBJECT_TYPE_REF)); - } + Mono ping(); // -------------------------- // Roots @@ -455,67 +152,14 @@ public Mono ping() { * @param root The root to add. * @return A Mono that completes when the root is added and notifications are sent. */ - public Mono addRoot(Root root) { - - if (root == null) { - return Mono.error(new IllegalArgumentException("Root must not be null")); - } - - if (this.clientCapabilities.roots() == null) { - return Mono.error(new IllegalStateException("Client must be configured with roots capabilities")); - } - - if (this.roots.containsKey(root.uri())) { - return Mono.error(new IllegalStateException("Root with uri '" + root.uri() + "' already exists")); - } - - this.roots.put(root.uri(), root); - - logger.debug("Added root: {}", root); - - if (this.clientCapabilities.roots().listChanged()) { - if (this.isInitialized()) { - return this.rootsListChangedNotification(); - } - else { - logger.warn("Client is not initialized, ignore sending a roots list changed notification"); - } - } - return Mono.empty(); - } + Mono addRoot(McpSchema.Root root); /** * Removes a root from the client's root list. * @param rootUri The URI of the root to remove. * @return A Mono that completes when the root is removed and notifications are sent. */ - public Mono removeRoot(String rootUri) { - - if (rootUri == null) { - return Mono.error(new IllegalArgumentException("Root uri must not be null")); - } - - if (this.clientCapabilities.roots() == null) { - return Mono.error(new IllegalStateException("Client must be configured with roots capabilities")); - } - - Root removed = this.roots.remove(rootUri); - - if (removed != null) { - logger.debug("Removed Root: {}", rootUri); - if (this.clientCapabilities.roots().listChanged()) { - if (this.isInitialized()) { - return this.rootsListChangedNotification(); - } - else { - logger.warn("Client is not initialized, ignore sending a roots list changed notification"); - } - - } - return Mono.empty(); - } - return Mono.error(new IllegalStateException("Root with uri '" + rootUri + "' not found")); - } + Mono removeRoot(String rootUri); /** * Manually sends a roots/list_changed notification. The addRoot and removeRoot @@ -523,53 +167,11 @@ public Mono removeRoot(String rootUri) { * an initialized state. * @return A Mono that completes when the notification is sent. */ - public Mono rootsListChangedNotification() { - return this.initializer.withInitialization("sending roots list changed notification", - init -> init.mcpSession().sendNotification(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED)); - } - - private RequestHandler rootsListRequestHandler() { - return params -> { - @SuppressWarnings("unused") - McpSchema.PaginatedRequest request = transport.unmarshalFrom(params, PAGINATED_REQUEST_TYPE_REF); - - List roots = this.roots.values().stream().toList(); - - return Mono.just(new McpSchema.ListRootsResult(roots)); - }; - } - - // -------------------------- - // Sampling - // -------------------------- - private RequestHandler samplingCreateMessageHandler() { - return params -> { - McpSchema.CreateMessageRequest request = transport.unmarshalFrom(params, CREATE_MESSAGE_REQUEST_TYPE_REF); - - return this.samplingHandler.apply(request); - }; - } - - // -------------------------- - // Elicitation - // -------------------------- - private RequestHandler elicitationCreateHandler() { - return params -> { - ElicitRequest request = transport.unmarshalFrom(params, new TypeRef<>() { - }); - - return this.elicitationHandler.apply(request); - }; - } + Mono rootsListChangedNotification(); // -------------------------- // Tools // -------------------------- - private static final TypeRef CALL_TOOL_RESULT_TYPE_REF = new TypeRef<>() { - }; - - private static final TypeRef LIST_TOOLS_RESULT_TYPE_REF = new TypeRef<>() { - }; /** * Calls a tool provided by the server. Tools enable servers to expose executable @@ -582,114 +184,25 @@ private RequestHandler elicitationCreateHandler() { * @see McpSchema.CallToolResult * @see #listTools() */ - public Mono callTool(McpSchema.CallToolRequest callToolRequest) { - return this.initializer.withInitialization("calling tool", init -> { - if (init.initializeResult().capabilities().tools() == null) { - return Mono.error(new IllegalStateException("Server does not provide tools capability")); - } - - return init.mcpSession() - .sendRequest(McpSchema.METHOD_TOOLS_CALL, callToolRequest, CALL_TOOL_RESULT_TYPE_REF) - .flatMap(result -> Mono.just(validateToolResult(callToolRequest.name(), result))); - }); - } - - private McpSchema.CallToolResult validateToolResult(String toolName, McpSchema.CallToolResult result) { - - if (!this.enableCallToolSchemaCaching || result == null || result.isError() == Boolean.TRUE) { - // if tool schema caching is disabled or tool call resulted in an error - skip - // validation and return the result as it is - return result; - } - - Map optOutputSchema = this.toolsOutputSchemaCache.get(toolName); - - if (optOutputSchema == null) { - logger.warn( - "Calling a tool with no outputSchema is not expected to return result with structured content, but got: {}", - result.structuredContent()); - return result; - } - - // Validate the tool output against the cached output schema - var validation = this.jsonSchemaValidator.validate(optOutputSchema, result.structuredContent()); - - if (!validation.valid()) { - logger.warn("Tool call result validation failed: {}", validation.errorMessage()); - throw new IllegalArgumentException("Tool call result validation failed: " + validation.errorMessage()); - } - - return result; - } + Mono callTool(McpSchema.CallToolRequest callToolRequest); /** * Retrieves the list of all tools provided by the server. * @return A Mono that emits the list of all tools result */ - public Mono listTools() { - return this.listTools(McpSchema.FIRST_PAGE).expand(result -> { - String next = result.nextCursor(); - return (next != null && !next.isEmpty()) ? this.listTools(next) : Mono.empty(); - }).reduce(new McpSchema.ListToolsResult(new ArrayList<>(), null), (allToolsResult, result) -> { - allToolsResult.tools().addAll(result.tools()); - return allToolsResult; - }).map(result -> new McpSchema.ListToolsResult(Collections.unmodifiableList(result.tools()), null)); - } + Mono listTools(); /** * Retrieves a paginated list of tools provided by the server. * @param cursor Optional pagination cursor from a previous list request * @return A Mono that emits the list of tools result */ - public Mono listTools(String cursor) { - return this.initializer.withInitialization("listing tools", init -> this.listToolsInternal(init, cursor)); - } - - private Mono listToolsInternal(Initialization init, String cursor) { - - if (init.initializeResult().capabilities().tools() == null) { - return Mono.error(new IllegalStateException("Server does not provide tools capability")); - } - return init.mcpSession() - .sendRequest(McpSchema.METHOD_TOOLS_LIST, new McpSchema.PaginatedRequest(cursor), - LIST_TOOLS_RESULT_TYPE_REF) - .doOnNext(result -> { - if (this.enableCallToolSchemaCaching && result.tools() != null) { - // Cache tools output schema - result.tools() - .stream() - .filter(tool -> tool.outputSchema() != null) - .forEach(tool -> this.toolsOutputSchemaCache.put(tool.name(), tool.outputSchema())); - } - }); - } - - private NotificationHandler asyncToolsChangeNotificationHandler( - List, Mono>> toolsChangeConsumers) { - // TODO: params are not used yet - return params -> this.listTools() - .flatMap(listToolsResult -> Flux.fromIterable(toolsChangeConsumers) - .flatMap(consumer -> consumer.apply(listToolsResult.tools())) - .onErrorResume(error -> { - logger.error("Error handling tools list change notification", error); - return Mono.empty(); - }) - .then()); - } + Mono listTools(String cursor); // -------------------------- // Resources // -------------------------- - private static final TypeRef LIST_RESOURCES_RESULT_TYPE_REF = new TypeRef<>() { - }; - - private static final TypeRef READ_RESOURCE_RESULT_TYPE_REF = new TypeRef<>() { - }; - - private static final TypeRef LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF = new TypeRef<>() { - }; - /** * Retrieves the list of all resources provided by the server. Resources represent any * kind of UTF-8 encoded data that an MCP server makes available to clients, such as @@ -698,15 +211,7 @@ private NotificationHandler asyncToolsChangeNotificationHandler( * @see McpSchema.ListResourcesResult * @see #readResource(McpSchema.Resource) */ - public Mono listResources() { - return this.listResources(McpSchema.FIRST_PAGE) - .expand(result -> (result.nextCursor() != null) ? this.listResources(result.nextCursor()) : Mono.empty()) - .reduce(new McpSchema.ListResourcesResult(new ArrayList<>(), null), (allResourcesResult, result) -> { - allResourcesResult.resources().addAll(result.resources()); - return allResourcesResult; - }) - .map(result -> new McpSchema.ListResourcesResult(Collections.unmodifiableList(result.resources()), null)); - } + Mono listResources(); /** * Retrieves a paginated list of resources provided by the server. Resources represent @@ -717,16 +222,7 @@ public Mono listResources() { * @see McpSchema.ListResourcesResult * @see #readResource(McpSchema.Resource) */ - public Mono listResources(String cursor) { - return this.initializer.withInitialization("listing resources", init -> { - if (init.initializeResult().capabilities().resources() == null) { - return Mono.error(new IllegalStateException("Server does not provide the resources capability")); - } - return init.mcpSession() - .sendRequest(McpSchema.METHOD_RESOURCES_LIST, new McpSchema.PaginatedRequest(cursor), - LIST_RESOURCES_RESULT_TYPE_REF); - }); - } + Mono listResources(String cursor); /** * Reads the content of a specific resource identified by the provided Resource @@ -737,9 +233,7 @@ public Mono listResources(String cursor) { * @see McpSchema.Resource * @see McpSchema.ReadResourceResult */ - public Mono readResource(McpSchema.Resource resource) { - return this.readResource(new McpSchema.ReadResourceRequest(resource.uri())); - } + Mono readResource(McpSchema.Resource resource); /** * Reads the content of a specific resource identified by the provided request. This @@ -749,15 +243,7 @@ public Mono readResource(McpSchema.Resource resour * @see McpSchema.ReadResourceRequest * @see McpSchema.ReadResourceResult */ - public Mono readResource(McpSchema.ReadResourceRequest readResourceRequest) { - return this.initializer.withInitialization("reading resources", init -> { - if (init.initializeResult().capabilities().resources() == null) { - return Mono.error(new IllegalStateException("Server does not provide the resources capability")); - } - return init.mcpSession() - .sendRequest(McpSchema.METHOD_RESOURCES_READ, readResourceRequest, READ_RESOURCE_RESULT_TYPE_REF); - }); - } + Mono readResource(McpSchema.ReadResourceRequest readResourceRequest); /** * Retrieves the list of all resource templates provided by the server. Resource @@ -766,18 +252,7 @@ public Mono readResource(McpSchema.ReadResourceReq * @return A Mono that completes with the list of all resource templates result * @see McpSchema.ListResourceTemplatesResult */ - public Mono listResourceTemplates() { - return this.listResourceTemplates(McpSchema.FIRST_PAGE) - .expand(result -> (result.nextCursor() != null) ? this.listResourceTemplates(result.nextCursor()) - : Mono.empty()) - .reduce(new McpSchema.ListResourceTemplatesResult(new ArrayList<>(), null), - (allResourceTemplatesResult, result) -> { - allResourceTemplatesResult.resourceTemplates().addAll(result.resourceTemplates()); - return allResourceTemplatesResult; - }) - .map(result -> new McpSchema.ListResourceTemplatesResult( - Collections.unmodifiableList(result.resourceTemplates()), null)); - } + Mono listResourceTemplates(); /** * Retrieves a paginated list of resource templates provided by the server. Resource @@ -787,16 +262,7 @@ public Mono listResourceTemplates() { * @return A Mono that completes with the list of resource templates result. * @see McpSchema.ListResourceTemplatesResult */ - public Mono listResourceTemplates(String cursor) { - return this.initializer.withInitialization("listing resource templates", init -> { - if (init.initializeResult().capabilities().resources() == null) { - return Mono.error(new IllegalStateException("Server does not provide the resources capability")); - } - return init.mcpSession() - .sendRequest(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, new McpSchema.PaginatedRequest(cursor), - LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF); - }); - } + Mono listResourceTemplates(String cursor); /** * Subscribes to changes in a specific resource. When the resource changes on the @@ -807,10 +273,7 @@ public Mono listResourceTemplates(String * @see McpSchema.SubscribeRequest * @see #unsubscribeResource(McpSchema.UnsubscribeRequest) */ - public Mono subscribeResource(McpSchema.SubscribeRequest subscribeRequest) { - return this.initializer.withInitialization("subscribing to resources", init -> init.mcpSession() - .sendRequest(McpSchema.METHOD_RESOURCES_SUBSCRIBE, subscribeRequest, VOID_TYPE_REFERENCE)); - } + Mono subscribeResource(McpSchema.SubscribeRequest subscribeRequest); /** * Cancels an existing subscription to a resource. After unsubscribing, the client @@ -821,76 +284,28 @@ public Mono subscribeResource(McpSchema.SubscribeRequest subscribeRequest) * @see McpSchema.UnsubscribeRequest * @see #subscribeResource(McpSchema.SubscribeRequest) */ - public Mono unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) { - return this.initializer.withInitialization("unsubscribing from resources", init -> init.mcpSession() - .sendRequest(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, unsubscribeRequest, VOID_TYPE_REFERENCE)); - } - - private NotificationHandler asyncResourcesChangeNotificationHandler( - List, Mono>> resourcesChangeConsumers) { - return params -> listResources().flatMap(listResourcesResult -> Flux.fromIterable(resourcesChangeConsumers) - .flatMap(consumer -> consumer.apply(listResourcesResult.resources())) - .onErrorResume(error -> { - logger.error("Error handling resources list change notification", error); - return Mono.empty(); - }) - .then()); - } - - private NotificationHandler asyncResourcesUpdatedNotificationHandler( - List, Mono>> resourcesUpdateConsumers) { - return params -> { - McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification = transport.unmarshalFrom(params, - new TypeRef<>() { - }); - - return readResource(new McpSchema.ReadResourceRequest(resourcesUpdatedNotification.uri())) - .flatMap(readResourceResult -> Flux.fromIterable(resourcesUpdateConsumers) - .flatMap(consumer -> consumer.apply(readResourceResult.contents())) - .onErrorResume(error -> { - logger.error("Error handling resource update notification", error); - return Mono.empty(); - }) - .then()); - }; - } + Mono unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest); // -------------------------- // Prompts // -------------------------- - private static final TypeRef LIST_PROMPTS_RESULT_TYPE_REF = new TypeRef<>() { - }; - - private static final TypeRef GET_PROMPT_RESULT_TYPE_REF = new TypeRef<>() { - }; /** * Retrieves the list of all prompts provided by the server. * @return A Mono that completes with the list of all prompts result. * @see McpSchema.ListPromptsResult - * @see #getPrompt(GetPromptRequest) + * @see #getPrompt(McpSchema.GetPromptRequest) */ - public Mono listPrompts() { - return this.listPrompts(McpSchema.FIRST_PAGE) - .expand(result -> (result.nextCursor() != null) ? this.listPrompts(result.nextCursor()) : Mono.empty()) - .reduce(new ListPromptsResult(new ArrayList<>(), null), (allPromptsResult, result) -> { - allPromptsResult.prompts().addAll(result.prompts()); - return allPromptsResult; - }) - .map(result -> new McpSchema.ListPromptsResult(Collections.unmodifiableList(result.prompts()), null)); - } + Mono listPrompts(); /** * Retrieves a paginated list of prompts provided by the server. * @param cursor Optional pagination cursor from a previous list request * @return A Mono that completes with the list of prompts result. * @see McpSchema.ListPromptsResult - * @see #getPrompt(GetPromptRequest) + * @see #getPrompt(McpSchema.GetPromptRequest) */ - public Mono listPrompts(String cursor) { - return this.initializer.withInitialization("listing prompts", init -> init.mcpSession() - .sendRequest(McpSchema.METHOD_PROMPT_LIST, new PaginatedRequest(cursor), LIST_PROMPTS_RESULT_TYPE_REF)); - } + Mono listPrompts(String cursor); /** * Retrieves a specific prompt by its ID. This provides the complete prompt template @@ -901,37 +316,11 @@ public Mono listPrompts(String cursor) { * @see McpSchema.GetPromptResult * @see #listPrompts() */ - public Mono getPrompt(GetPromptRequest getPromptRequest) { - return this.initializer.withInitialization("getting prompts", init -> init.mcpSession() - .sendRequest(McpSchema.METHOD_PROMPT_GET, getPromptRequest, GET_PROMPT_RESULT_TYPE_REF)); - } - - private NotificationHandler asyncPromptsChangeNotificationHandler( - List, Mono>> promptsChangeConsumers) { - return params -> listPrompts().flatMap(listPromptsResult -> Flux.fromIterable(promptsChangeConsumers) - .flatMap(consumer -> consumer.apply(listPromptsResult.prompts())) - .onErrorResume(error -> { - logger.error("Error handling prompts list change notification", error); - return Mono.empty(); - }) - .then()); - } + Mono getPrompt(McpSchema.GetPromptRequest getPromptRequest); // -------------------------- // Logging // -------------------------- - private NotificationHandler asyncLoggingNotificationHandler( - List>> loggingConsumers) { - - return params -> { - McpSchema.LoggingMessageNotification loggingMessageNotification = transport.unmarshalFrom(params, - LOGGING_MESSAGE_NOTIFICATION_TYPE_REF); - - return Flux.fromIterable(loggingConsumers) - .flatMap(consumer -> consumer.apply(loggingMessageNotification)) - .then(); - }; - } /** * Sets the minimum logging level for messages received from the server. The client @@ -940,48 +329,11 @@ private NotificationHandler asyncLoggingNotificationHandler( * @return A Mono that completes when the logging level is set. * @see McpSchema.LoggingLevel */ - public Mono setLoggingLevel(LoggingLevel loggingLevel) { - if (loggingLevel == null) { - return Mono.error(new IllegalArgumentException("Logging level must not be null")); - } - - return this.initializer.withInitialization("setting logging level", init -> { - if (init.initializeResult().capabilities().logging() == null) { - return Mono.error(new IllegalStateException("Server's Logging capabilities are not enabled!")); - } - var params = new McpSchema.SetLevelRequest(loggingLevel); - return init.mcpSession().sendRequest(McpSchema.METHOD_LOGGING_SET_LEVEL, params, OBJECT_TYPE_REF).then(); - }); - } - - private NotificationHandler asyncProgressNotificationHandler( - List>> progressConsumers) { - - return params -> { - McpSchema.ProgressNotification progressNotification = transport.unmarshalFrom(params, - PROGRESS_NOTIFICATION_TYPE_REF); - - return Flux.fromIterable(progressConsumers) - .flatMap(consumer -> consumer.apply(progressNotification)) - .then(); - }; - } - - /** - * This method is package-private and used for test only. Should not be called by user - * code. - * @param protocolVersions the Client supported protocol versions. - */ - void setProtocolVersions(List protocolVersions) { - this.initializer.setProtocolVersions(protocolVersions); - } + Mono setLoggingLevel(McpSchema.LoggingLevel loggingLevel); // -------------------------- // Completions // -------------------------- - private static final TypeRef COMPLETION_COMPLETE_RESULT_TYPE_REF = new TypeRef<>() { - }; - /** * Sends a completion/complete request to generate value suggestions based on a given * reference and argument. This is typically used to provide auto-completion options @@ -992,9 +344,6 @@ void setProtocolVersions(List protocolVersions) { * @see McpSchema.CompleteRequest * @see McpSchema.CompleteResult */ - public Mono completeCompletion(McpSchema.CompleteRequest completeRequest) { - return this.initializer.withInitialization("complete completions", init -> init.mcpSession() - .sendRequest(McpSchema.METHOD_COMPLETION_COMPLETE, completeRequest, COMPLETION_COMPLETE_RESULT_TYPE_REF)); - } + Mono completeCompletion(McpSchema.CompleteRequest completeRequest); } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClient.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClient.java index 421f2fc7f..f162bf2b2 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClient.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpClient.java @@ -474,9 +474,11 @@ public McpSyncClient build() { McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures); - return new McpSyncClient(new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, - jsonSchemaValidator != null ? jsonSchemaValidator : JsonSchemaValidator.getDefault(), - asyncFeatures), this.contextProvider); + return new DefaultMcpSyncClient( + new DefaultMcpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, + jsonSchemaValidator != null ? jsonSchemaValidator : JsonSchemaValidator.getDefault(), + asyncFeatures), + this.contextProvider); } } @@ -810,7 +812,7 @@ public AsyncSpec enableCallToolSchemaCaching(boolean enableCallToolSchemaCaching public McpAsyncClient build() { var jsonSchemaValidator = (this.jsonSchemaValidator != null) ? this.jsonSchemaValidator : JsonSchemaValidator.getDefault(); - return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout, + return new DefaultMcpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout, jsonSchemaValidator, new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers, diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java index 7fdaa8941..4a89d28c1 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java @@ -1,31 +1,14 @@ -/* - * Copyright 2024-2024 the original author or authors. - */ - package io.modelcontextprotocol.client; -import java.time.Duration; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.modelcontextprotocol.common.McpTransportContext; import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities; -import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest; -import io.modelcontextprotocol.spec.McpSchema.GetPromptResult; -import io.modelcontextprotocol.spec.McpSchema.ListPromptsResult; -import io.modelcontextprotocol.util.Assert; -import reactor.core.publisher.Mono; /** - * A synchronous client implementation for the Model Context Protocol (MCP) that wraps an + * A synchronous client interface for the Model Context Protocol (MCP) that wraps an * {@link McpAsyncClient} to provide blocking operations. * *

- * This client implements the MCP specification by delegating to an asynchronous client - * and blocking on the results. Key features include: + * This client interface the MCP specification by delegating to an asynchronous client and + * blocking on the results. Key features include: *

    *
  • Synchronous, blocking API for simpler integration in non-reactive applications *
  • Tool discovery and invocation for server-provided functionality @@ -44,116 +27,71 @@ * * *

    - * This implementation implements {@link AutoCloseable} for resource cleanup and provides + * This implementation interface {@link AutoCloseable} for resource cleanup and provides * both immediate and graceful shutdown options. All operations block until completion or * timeout, making it suitable for traditional synchronous programming models. * * @author Dariusz Jędrzejczyk * @author Christian Tzolov * @author Jihoon Kim + * @author Pin He * @see McpClient * @see McpAsyncClient * @see McpSchema */ -public class McpSyncClient implements AutoCloseable { - - private static final Logger logger = LoggerFactory.getLogger(McpSyncClient.class); - - // TODO: Consider providing a client config to set this properly - // this is currently a concern only because AutoCloseable is used - perhaps it - // is not a requirement? - private static final long DEFAULT_CLOSE_TIMEOUT_MS = 10_000L; - - private final McpAsyncClient delegate; - - private final Supplier contextProvider; - - /** - * Create a new McpSyncClient with the given delegate. - * @param delegate the asynchronous kernel on top of which this synchronous client - * provides a blocking API. - * @param contextProvider the supplier of context before calling any non-blocking - * operation on underlying delegate - */ - McpSyncClient(McpAsyncClient delegate, Supplier contextProvider) { - Assert.notNull(delegate, "The delegate can not be null"); - Assert.notNull(contextProvider, "The contextProvider can not be null"); - this.delegate = delegate; - this.contextProvider = contextProvider; - } +public interface McpSyncClient extends AutoCloseable { /** * Get the current initialization result. * @return the initialization result. */ - public McpSchema.InitializeResult getCurrentInitializationResult() { - return this.delegate.getCurrentInitializationResult(); - } + McpSchema.InitializeResult getCurrentInitializationResult(); /** * Get the server capabilities that define the supported features and functionality. * @return The server capabilities */ - public McpSchema.ServerCapabilities getServerCapabilities() { - return this.delegate.getServerCapabilities(); - } + McpSchema.ServerCapabilities getServerCapabilities(); /** * Get the server instructions that provide guidance to the client on how to interact * with this server. * @return The instructions */ - public String getServerInstructions() { - return this.delegate.getServerInstructions(); - } + String getServerInstructions(); /** * Get the server implementation information. * @return The server implementation details */ - public McpSchema.Implementation getServerInfo() { - return this.delegate.getServerInfo(); - } + McpSchema.Implementation getServerInfo(); /** * Check if the client-server connection is initialized. * @return true if the client-server connection is initialized */ - public boolean isInitialized() { - return this.delegate.isInitialized(); - } + boolean isInitialized(); /** * Get the client capabilities that define the supported features and functionality. * @return The client capabilities */ - public ClientCapabilities getClientCapabilities() { - return this.delegate.getClientCapabilities(); - } + McpSchema.ClientCapabilities getClientCapabilities(); /** * Get the client implementation information. * @return The client implementation details */ - public McpSchema.Implementation getClientInfo() { - return this.delegate.getClientInfo(); - } + McpSchema.Implementation getClientInfo(); @Override - public void close() { - this.delegate.close(); - } - - public boolean closeGracefully() { - try { - this.delegate.closeGracefully().block(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)); - } - catch (RuntimeException e) { - logger.warn("Client didn't close within timeout of {} ms.", DEFAULT_CLOSE_TIMEOUT_MS, e); - return false; - } - return true; - } + void close(); + + /** + * Gracefully closes the client connection. + * @return true if closed gracefully, false otherwise. + */ + boolean closeGracefully(); /** * The initialization phase MUST be the first interaction between client and server. @@ -183,40 +121,28 @@ public boolean closeGracefully() { * Spec * @return the initialize result. */ - public McpSchema.InitializeResult initialize() { - // TODO: block takes no argument here as we assume the async client is - // configured with a requestTimeout at all times - return withProvidedContext(this.delegate.initialize()).block(); - } + McpSchema.InitializeResult initialize(); /** * Send a roots/list_changed notification. */ - public void rootsListChangedNotification() { - withProvidedContext(this.delegate.rootsListChangedNotification()).block(); - } + void rootsListChangedNotification(); /** * Add a roots dynamically. */ - public void addRoot(McpSchema.Root root) { - this.delegate.addRoot(root).block(); - } + void addRoot(McpSchema.Root root); /** * Remove a root dynamically. */ - public void removeRoot(String rootUri) { - this.delegate.removeRoot(rootUri).block(); - } + void removeRoot(String rootUri); /** * Send a synchronous ping request. * @return */ - public Object ping() { - return withProvidedContext(this.delegate.ping()).block(); - } + Object ping(); // -------------------------- // Tools @@ -232,10 +158,7 @@ public Object ping() { * (text, images, or embedded resources) representing the tool's output - isError: * Boolean indicating if the execution failed (true) or succeeded (false/absent) */ - public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) { - return withProvidedContext(this.delegate.callTool(callToolRequest)).block(); - - } + McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest); /** * Retrieves the list of all tools provided by the server. @@ -243,9 +166,7 @@ public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolReque * each with a name, description, and input schema - nextCursor: Optional cursor for * pagination if more tools are available */ - public McpSchema.ListToolsResult listTools() { - return withProvidedContext(this.delegate.listTools()).block(); - } + McpSchema.ListToolsResult listTools(); /** * Retrieves a paginated list of tools provided by the server. @@ -254,10 +175,7 @@ public McpSchema.ListToolsResult listTools() { * with a name, description, and input schema - nextCursor: Optional cursor for * pagination if more tools are available */ - public McpSchema.ListToolsResult listTools(String cursor) { - return withProvidedContext(this.delegate.listTools(cursor)).block(); - - } + McpSchema.ListToolsResult listTools(String cursor); // -------------------------- // Resources @@ -267,49 +185,34 @@ public McpSchema.ListToolsResult listTools(String cursor) { * Retrieves the list of all resources provided by the server. * @return The list of all resources result */ - public McpSchema.ListResourcesResult listResources() { - return withProvidedContext(this.delegate.listResources()).block(); - - } + McpSchema.ListResourcesResult listResources(); /** * Retrieves a paginated list of resources provided by the server. * @param cursor Optional pagination cursor from a previous list request * @return The list of resources result */ - public McpSchema.ListResourcesResult listResources(String cursor) { - return withProvidedContext(this.delegate.listResources(cursor)).block(); - - } + McpSchema.ListResourcesResult listResources(String cursor); /** * Send a resources/read request. * @param resource the resource to read * @return the resource content. */ - public McpSchema.ReadResourceResult readResource(McpSchema.Resource resource) { - return withProvidedContext(this.delegate.readResource(resource)).block(); - - } + McpSchema.ReadResourceResult readResource(McpSchema.Resource resource); /** * Send a resources/read request. * @param readResourceRequest the read resource request. * @return the resource content. */ - public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest) { - return withProvidedContext(this.delegate.readResource(readResourceRequest)).block(); - - } + McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest readResourceRequest); /** * Retrieves the list of all resource templates provided by the server. * @return The list of all resource templates result. */ - public McpSchema.ListResourceTemplatesResult listResourceTemplates() { - return withProvidedContext(this.delegate.listResourceTemplates()).block(); - - } + McpSchema.ListResourceTemplatesResult listResourceTemplates(); /** * Resource templates allow servers to expose parameterized resources using URI @@ -319,10 +222,7 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() { * @param cursor Optional pagination cursor from a previous list request * @return The list of resource templates result. */ - public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor) { - return withProvidedContext(this.delegate.listResourceTemplates(cursor)).block(); - - } + McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor); /** * Subscriptions. The protocol supports optional subscriptions to resource changes. @@ -333,20 +233,14 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates(String cursor * @param subscribeRequest the subscribe request contains the uri of the resource to * subscribe to. */ - public void subscribeResource(McpSchema.SubscribeRequest subscribeRequest) { - withProvidedContext(this.delegate.subscribeResource(subscribeRequest)).block(); - - } + void subscribeResource(McpSchema.SubscribeRequest subscribeRequest); /** * Send a resources/unsubscribe request. * @param unsubscribeRequest the unsubscribe request contains the uri of the resource * to unsubscribe from. */ - public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) { - withProvidedContext(this.delegate.unsubscribeResource(unsubscribeRequest)).block(); - - } + void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest); // -------------------------- // Prompts @@ -356,32 +250,22 @@ public void unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) * Retrieves the list of all prompts provided by the server. * @return The list of all prompts result. */ - public ListPromptsResult listPrompts() { - return withProvidedContext(this.delegate.listPrompts()).block(); - } + McpSchema.ListPromptsResult listPrompts(); /** * Retrieves a paginated list of prompts provided by the server. * @param cursor Optional pagination cursor from a previous list request * @return The list of prompts result. */ - public ListPromptsResult listPrompts(String cursor) { - return withProvidedContext(this.delegate.listPrompts(cursor)).block(); - - } + McpSchema.ListPromptsResult listPrompts(String cursor); - public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) { - return withProvidedContext(this.delegate.getPrompt(getPromptRequest)).block(); - } + McpSchema.GetPromptResult getPrompt(McpSchema.GetPromptRequest getPromptRequest); /** * Client can set the minimum logging level it wants to receive from the server. * @param loggingLevel the min logging level */ - public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) { - withProvidedContext(this.delegate.setLoggingLevel(loggingLevel)).block(); - - } + void setLoggingLevel(McpSchema.LoggingLevel loggingLevel); /** * Send a completion/complete request. @@ -389,19 +273,6 @@ public void setLoggingLevel(McpSchema.LoggingLevel loggingLevel) { * reference and arguments for generating suggestions. * @return the completion result containing suggested values. */ - public McpSchema.CompleteResult completeCompletion(McpSchema.CompleteRequest completeRequest) { - return withProvidedContext(this.delegate.completeCompletion(completeRequest)).block(); - - } - - /** - * For a given action, on assembly, capture the "context" via the - * {@link #contextProvider} and store it in the Reactor context. - * @param action the action to perform - * @return the result of the action - */ - private Mono withProvidedContext(Mono action) { - return action.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, this.contextProvider.get())); - } + McpSchema.CompleteResult completeCompletion(McpSchema.CompleteRequest completeRequest); } diff --git a/mcp-core/src/test/java/io/modelcontextprotocol/client/McpClientProtocolVersionTests.java b/mcp-core/src/test/java/io/modelcontextprotocol/client/McpClientProtocolVersionTests.java index a94b9b6a7..c5011bb0d 100644 --- a/mcp-core/src/test/java/io/modelcontextprotocol/client/McpClientProtocolVersionTests.java +++ b/mcp-core/src/test/java/io/modelcontextprotocol/client/McpClientProtocolVersionTests.java @@ -68,7 +68,7 @@ void shouldNegotiateSpecificVersion() { .requestTimeout(REQUEST_TIMEOUT) .build(); - client.setProtocolVersions(List.of(oldVersion, McpSchema.LATEST_PROTOCOL_VERSION)); + ((DefaultMcpAsyncClient) client).setProtocolVersions(List.of(oldVersion, McpSchema.LATEST_PROTOCOL_VERSION)); try { Mono initializeResultMono = client.initialize(); @@ -131,7 +131,7 @@ void shouldUseHighestVersionWhenMultipleSupported() { .requestTimeout(REQUEST_TIMEOUT) .build(); - client.setProtocolVersions(List.of(oldVersion, middleVersion, latestVersion)); + ((DefaultMcpAsyncClient) client).setProtocolVersions(List.of(oldVersion, middleVersion, latestVersion)); try { Mono initializeResultMono = client.initialize();