Skip to content

Add streamable http client transport #147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions api/kotlin-sdk.api
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,8 @@ public final class io/modelcontextprotocol/kotlin/sdk/JSONRPCResponse : io/model
public static final field Companion Lio/modelcontextprotocol/kotlin/sdk/JSONRPCResponse$Companion;
public fun <init> (Lio/modelcontextprotocol/kotlin/sdk/RequestId;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/RequestResult;Lio/modelcontextprotocol/kotlin/sdk/JSONRPCError;)V
public synthetic fun <init> (Lio/modelcontextprotocol/kotlin/sdk/RequestId;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/RequestResult;Lio/modelcontextprotocol/kotlin/sdk/JSONRPCError;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun copy (Lio/modelcontextprotocol/kotlin/sdk/RequestId;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/RequestResult;Lio/modelcontextprotocol/kotlin/sdk/JSONRPCError;)Lio/modelcontextprotocol/kotlin/sdk/JSONRPCResponse;
public static synthetic fun copy$default (Lio/modelcontextprotocol/kotlin/sdk/JSONRPCResponse;Lio/modelcontextprotocol/kotlin/sdk/RequestId;Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/RequestResult;Lio/modelcontextprotocol/kotlin/sdk/JSONRPCError;ILjava/lang/Object;)Lio/modelcontextprotocol/kotlin/sdk/JSONRPCResponse;
public final fun getError ()Lio/modelcontextprotocol/kotlin/sdk/JSONRPCError;
public final fun getId ()Lio/modelcontextprotocol/kotlin/sdk/RequestId;
public final fun getJsonrpc ()Ljava/lang/String;
Expand Down Expand Up @@ -2947,6 +2949,34 @@ public final class io/modelcontextprotocol/kotlin/sdk/client/StdioClientTranspor
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/AbstractTransport {
public synthetic fun <init> (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public synthetic fun <init> (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun close (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun getProtocolVersion ()Ljava/lang/String;
public final fun getSessionId ()Ljava/lang/String;
public final fun send (Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun send (Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun send$default (Lio/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport;Lio/modelcontextprotocol/kotlin/sdk/JSONRPCMessage;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public final fun setProtocolVersion (Ljava/lang/String;)V
public fun start (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun terminateSession (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpError : java/lang/Exception {
public fun <init> ()V
public fun <init> (Ljava/lang/Integer;Ljava/lang/String;)V
public synthetic fun <init> (Ljava/lang/Integer;Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun getCode ()Ljava/lang/Integer;
}

public final class io/modelcontextprotocol/kotlin/sdk/client/StreamableHttpMcpKtorClientExtensionsKt {
public static final fun mcpStreamableHttp-BZiP2OM (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun mcpStreamableHttp-BZiP2OM$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun mcpStreamableHttpTransport-5_5nbZA (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;)Lio/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport;
public static synthetic fun mcpStreamableHttpTransport-5_5nbZA$default (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/time/Duration;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/modelcontextprotocol/kotlin/sdk/client/StreamableHttpClientTransport;
}

public final class io/modelcontextprotocol/kotlin/sdk/client/WebSocketClientTransport : io/modelcontextprotocol/kotlin/sdk/shared/WebSocketMcpTransport {
public fun <init> (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)V
public synthetic fun <init> (Lio/ktor/client/HttpClient;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ kotlin {

jvmTest {
dependencies {
implementation(libs.ktor.client.mock)
implementation(libs.mockk)
implementation(libs.slf4j.simple)
}
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ktor-server-cio = { group = "io.ktor", name = "ktor-server-cio", version.ref = "
kotlin-test = { module = "org.jetbrains.kotlin:kotlin-test", version.ref = "kotlin" }
kotlinx-coroutines-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "coroutines" }
ktor-server-test-host = { group = "io.ktor", name = "ktor-server-test-host", version.ref = "ktor" }
ktor-client-mock = { group = "io.ktor", name = "ktor-client-mock", version.ref = "ktor" }
mockk = { group = "io.mockk", name = "mockk", version.ref = "mockk" }
slf4j-simple = { group = "org.slf4j", name = "slf4j-simple", version.ref = "slf4j" }
kotest-assertions-json = { group = "io.kotest", name = "kotest-assertions-json", version.ref = "kotest" }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,336 @@
package io.modelcontextprotocol.kotlin.sdk.client

import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.client.HttpClient
import io.ktor.client.plugins.ClientRequestException
import io.ktor.client.plugins.sse.ClientSSESession
import io.ktor.client.plugins.sse.sseSession
import io.ktor.client.request.HttpRequestBuilder
import io.ktor.client.request.accept
import io.ktor.client.request.delete
import io.ktor.client.request.headers
import io.ktor.client.request.post
import io.ktor.client.request.setBody
import io.ktor.client.statement.HttpResponse
import io.ktor.client.statement.bodyAsChannel
import io.ktor.client.statement.bodyAsText
import io.ktor.http.ContentType
import io.ktor.http.HttpHeaders
import io.ktor.http.HttpMethod
import io.ktor.http.HttpStatusCode
import io.ktor.http.contentType
import io.ktor.http.isSuccess
import io.ktor.utils.io.readUTF8Line
import io.modelcontextprotocol.kotlin.sdk.JSONRPCMessage
import io.modelcontextprotocol.kotlin.sdk.JSONRPCNotification
import io.modelcontextprotocol.kotlin.sdk.JSONRPCRequest
import io.modelcontextprotocol.kotlin.sdk.JSONRPCResponse
import io.modelcontextprotocol.kotlin.sdk.RequestId
import io.modelcontextprotocol.kotlin.sdk.shared.AbstractTransport
import io.modelcontextprotocol.kotlin.sdk.shared.McpJson
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlin.concurrent.atomics.AtomicBoolean
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.time.Duration

private val logger = KotlinLogging.logger {}

private const val MCP_SESSION_ID_HEADER = "mcp-session-id"
private const val MCP_PROTOCOL_VERSION_HEADER = "mcp-protocol-version"
private const val MCP_RESUMPTION_TOKEN_HEADER = "Last-Event-ID"

/**
* Error class for Streamable HTTP transport errors.
*/
public class StreamableHttpError(
public val code: Int? = null,
message: String? = null
) : Exception("Streamable HTTP error: $message")

/**
* Client transport for Streamable HTTP: this implements the MCP Streamable HTTP transport specification.
* It will connect to a server using HTTP POST for sending messages and HTTP GET with Server-Sent Events
* for receiving messages.
*/
@OptIn(ExperimentalAtomicApi::class)
public class StreamableHttpClientTransport(
private val client: HttpClient,
private val url: String,
private val reconnectionTime: Duration? = null,
private val requestBuilder: HttpRequestBuilder.() -> Unit = {},
) : AbstractTransport() {

public var sessionId: String? = null
private set
public var protocolVersion: String? = null

private val initialized: AtomicBoolean = AtomicBoolean(false)

private var sseSession: ClientSSESession? = null
private var sseJob: Job? = null

private val scope by lazy { CoroutineScope(SupervisorJob() + Dispatchers.Default) }

private var lastEventId: String? = null

override suspend fun start() {
if (!initialized.compareAndSet(expectedValue = false, newValue = true)) {
error("StreamableHttpClientTransport already started!")
}
logger.debug { "Client transport starting..." }
}

/**
* Sends a single message with optional resumption support
*/
override suspend fun send(message: JSONRPCMessage) {
send(message, null)
}

/**
* Sends one or more messages with optional resumption support.
* This is the main send method that matches the TypeScript implementation.
*/
public suspend fun send(
message: JSONRPCMessage,
resumptionToken: String?,
onResumptionToken: ((String) -> Unit)? = null
) {
logger.debug { "Client sending message via POST to $url: ${McpJson.encodeToString(message)}" }

// If we have a resumption token, reconnect the SSE stream with it
resumptionToken?.let { token ->
startSseSession(
resumptionToken = token, onResumptionToken = onResumptionToken,
replayMessageId = if (message is JSONRPCRequest) message.id else null
)
return
}

val jsonBody = McpJson.encodeToString(message)
val response = client.post(url) {
applyCommonHeaders(this)
headers.append(HttpHeaders.Accept, "${ContentType.Application.Json}, ${ContentType.Text.EventStream}")
Copy link
Preview

Copilot AI Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appends the Accept header after applyCommonHeaders, which itself appends an Accept header for JSON. Consider consolidating these into a single header call or using headers.set to avoid duplicate Accept entries.

Suggested change
headers.append(HttpHeaders.Accept, "${ContentType.Application.Json}, ${ContentType.Text.EventStream}")
headers.set(HttpHeaders.Accept, "${ContentType.Application.Json}, ${ContentType.Text.EventStream}")

Copilot uses AI. Check for mistakes.

contentType(ContentType.Application.Json)
setBody(jsonBody)
requestBuilder()
}

response.headers[MCP_SESSION_ID_HEADER]?.let { sessionId = it }

if (response.status == HttpStatusCode.Accepted) {
if (message is JSONRPCNotification && message.method == "notifications/initialized") {
startSseSession(onResumptionToken = onResumptionToken)
}
return
}

if (!response.status.isSuccess()) {
val error = StreamableHttpError(response.status.value, response.bodyAsText())
_onError(error)
throw error
}

when (response.contentType()?.withoutParameters()) {
ContentType.Application.Json -> response.bodyAsText().takeIf { it.isNotEmpty() }?.let { json ->
runCatching { McpJson.decodeFromString<JSONRPCMessage>(json) }
.onSuccess { _onMessage(it) }
.onFailure(_onError)
}

ContentType.Text.EventStream -> handleInlineSse(
response, onResumptionToken = onResumptionToken,
replayMessageId = if (message is JSONRPCRequest) message.id else null
)
else -> {
val body = response.bodyAsText()
if (response.contentType() == null && body.isBlank()) return

val ct = response.contentType()?.toString() ?: "<none>"
val error = StreamableHttpError(-1, "Unexpected content type: $$ct")
Copy link
Preview

Copilot AI Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The string template uses $$ct which escapes to a literal $ct. To include the actual content type, use $ct instead of $$ct.

Suggested change
val error = StreamableHttpError(-1, "Unexpected content type: $$ct")
val error = StreamableHttpError(-1, "Unexpected content type: $ct")

Copilot uses AI. Check for mistakes.

_onError(error)
throw error
}
}
}

override suspend fun close() {
if (!initialized.load()) return // Already closed or never started
logger.debug { "Client transport closing." }

try {
// Try to terminate session if we have one
terminateSession()

sseSession?.cancel()
sseJob?.cancelAndJoin()
scope.cancel()
} catch (_: Exception) {
// Ignore errors during cleanup
} finally {
initialized.store(false)
_onClose()
}
}

/**
* Terminates the current session by sending a DELETE request to the server.
*/
public suspend fun terminateSession() {
if (sessionId == null) return
logger.debug { "Terminating session: $sessionId" }
val response = client.delete(url) {
applyCommonHeaders(this)
requestBuilder()
}

// 405 means server doesn't support explicit session termination
if (!response.status.isSuccess() && response.status != HttpStatusCode.MethodNotAllowed) {
val error = StreamableHttpError(
response.status.value,
"Failed to terminate session: ${response.status.description}"
)
logger.error(error) { "Failed to terminate session" }
_onError(error)
throw error
}

sessionId = null
lastEventId = null
logger.debug { "Session terminated successfully" }
}

private suspend fun startSseSession(
resumptionToken: String? = null,
replayMessageId: RequestId? = null,
onResumptionToken: ((String) -> Unit)? = null
) {
sseSession?.cancel()
sseJob?.cancelAndJoin()

logger.debug { "Client attempting to start SSE session at url: $url" }
try {
sseSession = client.sseSession(
urlString = url,
reconnectionTime = reconnectionTime,
) {
method = HttpMethod.Get
applyCommonHeaders(this)
accept(ContentType.Text.EventStream)
(resumptionToken ?: lastEventId)?.let { headers.append(MCP_RESUMPTION_TOKEN_HEADER, it) }
requestBuilder()
}
logger.debug { "Client SSE session started successfully." }
} catch (e: ClientRequestException) {
if (e.response.status == HttpStatusCode.MethodNotAllowed) {
logger.info { "Server returned 405 for GET/SSE, stream disabled." }
return
}
_onError(e)
throw e
}

sseJob = scope.launch(CoroutineName("StreamableHttpTransport.collect#${hashCode()}")) {
sseSession?.let { collectSse(it, replayMessageId, onResumptionToken) }
}
}

private fun applyCommonHeaders(builder: HttpRequestBuilder) {
builder.headers {
sessionId?.let { append(MCP_SESSION_ID_HEADER, it) }
protocolVersion?.let { append(MCP_PROTOCOL_VERSION_HEADER, it) }
}
}

private suspend fun collectSse(
session: ClientSSESession,
replayMessageId: RequestId?,
onResumptionToken: ((String) -> Unit)?
) {
try {
session.incoming.collect { event ->
event.id?.let {
lastEventId = it
onResumptionToken?.invoke(it)
}
logger.trace { "Client received SSE event: event=${event.event}, data=${event.data}, id=${event.id}" }
when (event.event) {
null, "message" ->
event.data?.takeIf { it.isNotEmpty() }?.let { json ->
runCatching { McpJson.decodeFromString<JSONRPCMessage>(json) }
.onSuccess { msg ->
if (replayMessageId != null && msg is JSONRPCResponse) {
_onMessage(msg.copy(id = replayMessageId))
} else {
_onMessage(msg)
}
}
.onFailure(_onError)
}

"error" -> _onError(StreamableHttpError(null, event.data))
}
}
} catch (_: CancellationException) {
// ignore
} catch (t: Throwable) {
_onError(t)
}
}

private suspend fun handleInlineSse(
response: HttpResponse,
replayMessageId: RequestId?,
onResumptionToken: ((String) -> Unit)?
) {
logger.trace { "Handling inline SSE from POST response" }
val channel = response.bodyAsChannel()

val sb = StringBuilder()
var id: String? = null
var eventName: String? = null

suspend fun dispatch(data: String) {
id?.let {
lastEventId = it
onResumptionToken?.invoke(it)
}
if (eventName == null || eventName == "message") {
runCatching { McpJson.decodeFromString<JSONRPCMessage>(data) }
.onSuccess { msg ->
if (replayMessageId != null && msg is JSONRPCResponse) {
_onMessage(msg.copy(id = replayMessageId))
} else {
_onMessage(msg)
}
}
.onFailure(_onError)
}
// reset
id = null
eventName = null
sb.clear()
}

while (!channel.isClosedForRead) {
val line = channel.readUTF8Line() ?: break
if (line.isEmpty()) {
dispatch(sb.toString())
continue
}
when {
line.startsWith("id:") -> id = line.substringAfter("id:").trim()
line.startsWith("event:") -> eventName = line.substringAfter("event:").trim()
line.startsWith("data:") -> sb.append(line.substringAfter("data:").trim())
}
}
}
}
Loading
Loading