Skip to content

Commit 8fd0c41

Browse files
committed
feat: add streamable http client
1 parent 958e5e5 commit 8fd0c41

File tree

2 files changed

+220
-0
lines changed

2 files changed

+220
-0
lines changed
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package io.modelcontextprotocol.kotlin.sdk.client
2+
3+
import io.github.oshai.kotlinlogging.KotlinLogging
4+
import io.ktor.client.HttpClient
5+
import io.ktor.client.plugins.sse.ClientSSESession
6+
import io.ktor.client.plugins.sse.sseSession
7+
import io.ktor.client.request.HttpRequestBuilder
8+
import io.ktor.client.request.post
9+
import io.ktor.client.request.setBody
10+
import io.ktor.client.statement.bodyAsText
11+
import io.ktor.http.ContentType
12+
import io.ktor.http.HttpHeaders
13+
import io.ktor.http.append
14+
import io.ktor.http.contentType
15+
import io.ktor.http.isSuccess
16+
import io.modelcontextprotocol.kotlin.sdk.JSONRPCMessage
17+
import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport
18+
import io.modelcontextprotocol.kotlin.sdk.shared.McpJson
19+
import kotlinx.coroutines.CoroutineName
20+
import kotlinx.coroutines.CoroutineScope
21+
import kotlinx.coroutines.Job
22+
import kotlinx.coroutines.SupervisorJob
23+
import kotlinx.coroutines.cancel
24+
import kotlinx.coroutines.cancelAndJoin
25+
import kotlinx.coroutines.launch
26+
import kotlin.concurrent.atomics.AtomicBoolean
27+
import kotlin.concurrent.atomics.ExperimentalAtomicApi
28+
29+
private val logger = KotlinLogging.logger {}
30+
31+
/**
32+
* Client transport for Streamable HTTP: this will send messages via HTTP POST requests
33+
* and optionally receive streaming responses via SSE.
34+
*
35+
* This implements the Streamable HTTP transport as specified in MCP 2024-11-05.
36+
*/
37+
@OptIn(ExperimentalAtomicApi::class)
38+
public class StreamableHttpClientTransport(
39+
private val client: HttpClient,
40+
private val url: String,
41+
private val requestBuilder: HttpRequestBuilder.() -> Unit = {},
42+
) : AbstractTransport() {
43+
44+
private val initialized: AtomicBoolean = AtomicBoolean(false)
45+
private var sseSession: ClientSSESession? = null
46+
private val scope by lazy { CoroutineScope(SupervisorJob()) }
47+
private var sseJob: Job? = null
48+
private var sessionId: String? = null
49+
50+
override suspend fun start() {
51+
if (!initialized.compareAndSet(expectedValue = false, newValue = true)) {
52+
error("StreamableHttpClientTransport already started!")
53+
}
54+
logger.debug { "Client transport starting..." }
55+
startSseSession()
56+
}
57+
58+
private suspend fun startSseSession() {
59+
logger.debug { "Client attempting to start SSE session at url: $url" }
60+
try {
61+
sseSession = client.sseSession(
62+
urlString = url,
63+
block = requestBuilder,
64+
)
65+
logger.debug { "Client SSE session started successfully." }
66+
67+
sseJob = scope.launch(CoroutineName("StreamableHttpTransport.collect#${hashCode()}")) {
68+
sseSession?.incoming?.collect { event ->
69+
logger.trace { "Client received SSE event: event=${event.event}, data=${event.data}" }
70+
when (event.event) {
71+
"error" -> {
72+
val e = IllegalStateException("SSE error: ${event.data}")
73+
logger.error(e) { "SSE stream reported an error event." }
74+
_onError(e)
75+
}
76+
77+
else -> {
78+
// All non-error events are treated as JSON-RPC messages
79+
try {
80+
val eventData = event.data
81+
if (!eventData.isNullOrEmpty()) {
82+
val message = McpJson.decodeFromString<JSONRPCMessage>(eventData)
83+
_onMessage(message)
84+
}
85+
} catch (e: Exception) {
86+
logger.error(e) { "Error processing SSE message" }
87+
_onError(e)
88+
}
89+
}
90+
}
91+
}
92+
}
93+
} catch (e: Exception) {
94+
// SSE session is optional, don't fail if it can't be established
95+
// The server might not support GET requests for SSE
96+
logger.warn(e) { "Client failed to start SSE session. This may be expected if the server does not support GET." }
97+
_onError(e)
98+
}
99+
}
100+
101+
override suspend fun send(message: JSONRPCMessage) {
102+
logger.debug { "Client sending message via POST to $url: ${McpJson.encodeToString(message)}" }
103+
try {
104+
val response = client.post(url) {
105+
requestBuilder()
106+
contentType(ContentType.Application.Json)
107+
headers.append(HttpHeaders.Accept, "${ContentType.Application.Json}, ${ContentType.Text.EventStream}")
108+
109+
// Add session ID if we have one
110+
sessionId?.let {
111+
headers.append("Mcp-Session-Id", it)
112+
}
113+
114+
setBody(McpJson.encodeToString(message))
115+
}
116+
logger.debug { "Client received POST response: ${response.status}" }
117+
118+
if (!response.status.isSuccess()) {
119+
val text = response.bodyAsText()
120+
val error = Exception("HTTP ${response.status}: $text")
121+
logger.error(error) { "Client POST request failed." }
122+
_onError(error)
123+
throw error
124+
}
125+
126+
// Extract session ID from response headers if present
127+
response.headers["Mcp-Session-Id"]?.let {
128+
sessionId = it
129+
}
130+
131+
// Handle response based on content type
132+
when (response.contentType()?.contentType) {
133+
ContentType.Application.Json.contentType -> {
134+
// Single JSON response
135+
val responseBody = response.bodyAsText()
136+
logger.trace { "Client processing JSON response: $responseBody" }
137+
if (responseBody.isNotEmpty()) {
138+
try {
139+
val responseMessage = McpJson.decodeFromString<JSONRPCMessage>(responseBody)
140+
_onMessage(responseMessage)
141+
} catch (e: Exception) {
142+
logger.error(e) { "Error processing JSON response" }
143+
_onError(e)
144+
}
145+
}
146+
}
147+
148+
ContentType.Text.EventStream.contentType -> {
149+
logger.trace { "Client received SSE stream in POST response. Messages will be handled by the main SSE session." }
150+
}
151+
152+
else -> {
153+
logger.trace { "Client received response with unexpected or no content type: ${response.contentType()}" }
154+
}
155+
}
156+
} catch (e: Exception) {
157+
logger.error(e) { "Client send failed." }
158+
_onError(e)
159+
throw e
160+
}
161+
}
162+
163+
override suspend fun close() {
164+
if (!initialized.load()) {
165+
return // Already closed or never started
166+
}
167+
logger.debug { "Client transport closing." }
168+
169+
try {
170+
sseSession?.cancel()
171+
sseJob?.cancelAndJoin()
172+
scope.cancel()
173+
} catch (e: Exception) {
174+
// Ignore errors during cleanup
175+
} finally {
176+
_onClose()
177+
}
178+
}
179+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.modelcontextprotocol.kotlin.sdk.client
2+
3+
import io.ktor.client.HttpClient
4+
import io.ktor.client.request.HttpRequestBuilder
5+
import io.modelcontextprotocol.kotlin.sdk.Implementation
6+
import io.modelcontextprotocol.kotlin.sdk.LIB_VERSION
7+
import io.modelcontextprotocol.kotlin.sdk.shared.IMPLEMENTATION_NAME
8+
9+
/**
10+
* Returns a new Streamable HTTP transport for the Model Context Protocol using the provided HttpClient.
11+
*
12+
* @param url URL of the MCP server.
13+
* @param requestBuilder Optional lambda to configure the HTTP request.
14+
* @return A [StreamableHttpClientTransport] configured for MCP communication.
15+
*/
16+
public fun HttpClient.mcpStreamableHttpTransport(
17+
url: String,
18+
requestBuilder: HttpRequestBuilder.() -> Unit = {},
19+
): StreamableHttpClientTransport = StreamableHttpClientTransport(this, url, requestBuilder)
20+
21+
/**
22+
* Creates and connects an MCP client over Streamable HTTP using the provided HttpClient.
23+
*
24+
* @param url URL of the MCP server.
25+
* @param requestBuilder Optional lambda to configure the HTTP request.
26+
* @return A connected [Client] ready for MCP communication.
27+
*/
28+
public suspend fun HttpClient.mcpStreamableHttp(
29+
url: String,
30+
requestBuilder: HttpRequestBuilder.() -> Unit = {},
31+
): Client {
32+
val transport = mcpStreamableHttpTransport(url, requestBuilder)
33+
val client = Client(
34+
Implementation(
35+
name = IMPLEMENTATION_NAME,
36+
version = LIB_VERSION
37+
)
38+
)
39+
client.connect(transport)
40+
return client
41+
}

0 commit comments

Comments
 (0)