Skip to content

Adding StreamableHttpServerTransportProvider class along with unit & integration tests #290

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
22 changes: 22 additions & 0 deletions mcp-spring/mcp-spring-webflux/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,28 @@
<scope>test</scope>
</dependency>

<!-- Tomcat dependencies for testing -->
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>${tomcat.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-websocket</artifactId>
<version>${tomcat.version}</version>
<scope>test</scope>
</dependency>

<!-- Used by the StreamableHttpServerTransportProvider -->
<dependency>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
<version>${jakarta.servlet.version}</version>
<scope>test</scope>
</dependency>

</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.modelcontextprotocol.spec.McpTransportSession;
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
import io.modelcontextprotocol.spec.McpTransportStream;
import io.modelcontextprotocol.spec.McpSchema.JSONRPCNotification;
import io.modelcontextprotocol.util.Assert;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -117,10 +118,6 @@ public static Builder builder(WebClient.Builder webClientBuilder) {
public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
return Mono.deferContextual(ctx -> {
this.handler.set(handler);
if (openConnectionOnStartup) {
logger.debug("Eagerly opening connection on startup");
return this.reconnect(null).then();
}
return Mono.empty();
});
}
Expand Down Expand Up @@ -250,11 +247,13 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
})
.bodyValue(message)
.exchangeToFlux(response -> {
if (transportSession
.markInitialized(response.headers().asHttpHeaders().getFirst("mcp-session-id"))) {
// Once we have a session, we try to open an async stream for
// the server to send notifications and requests out-of-band.
reconnect(null).contextWrite(sink.contextView()).subscribe();
transportSession.markInitialized(response.headers().asHttpHeaders().getFirst("mcp-session-id"));
if (response.statusCode().is2xxSuccessful()
&& message instanceof JSONRPCNotification notification) {
if (notification.method().equals("notifications/initialized")) {
// Establish SSE stream after session is initialized
reconnect(null).contextWrite(sink.contextView()).subscribe();
}
}

String sessionRepresentation = sessionIdOrPlaceholder(transportSession);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest;
import io.modelcontextprotocol.spec.McpSchema.McpId;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -161,7 +163,7 @@ void testBuilderPattern() {
@Test
void testMessageProcessing() {
// Create a test message
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id",
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", McpId.of("test-id"),
Map.of("key", "value"));

// Simulate receiving the message
Expand Down Expand Up @@ -192,7 +194,7 @@ void testResponseMessageProcessing() {
""");

// Create and send a request message
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id",
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", McpId.of("test-id"),
Map.of("key", "value"));

// Verify message handling
Expand All @@ -216,7 +218,7 @@ void testErrorMessageProcessing() {
""");

// Create and send a request message
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id",
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", McpId.of("test-id"),
Map.of("key", "value"));

// Verify message handling
Expand Down Expand Up @@ -246,7 +248,7 @@ void testGracefulShutdown() {
StepVerifier.create(transport.closeGracefully()).verifyComplete();

// Create a test message
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", "test-id",
JSONRPCRequest testMessage = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "test-method", McpId.of("test-id"),
Map.of("key", "value"));

// Verify message is not processed after shutdown
Expand Down Expand Up @@ -292,10 +294,10 @@ void testMultipleMessageProcessing() {
""");

// Create and send corresponding messages
JSONRPCRequest message1 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method1", "id1",
JSONRPCRequest message1 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method1", McpId.of("id1"),
Map.of("key", "value1"));

JSONRPCRequest message2 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method2", "id2",
JSONRPCRequest message2 = new JSONRPCRequest(McpSchema.JSONRPC_VERSION, "method2", McpId.of("id2"),
Map.of("key", "value2"));

// Verify both messages are processed
Expand Down
Loading