Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ async def run_stateless_server(*, task_status: TaskStatus[None] = anyio.TASK_STA
# Handle the HTTP request and return the response
await http_transport.handle_request(scope, receive, send)

# Terminate the session after the request is handled
await http_transport._terminate_session() # type: ignore[reportPrivateUsage]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create a public cleanup() method in StreamableHTTPSeverTransport and use that internally inside _terminate_session() instead of using a private method?

There's no "Session" in a stateless request, so I think we shouldn't be terminating a non-existent session and extract the cleanup logic instead.

Something like this:

# streamable_http.py

    async def cleanup(self) -> None:
        """Clean up all transport resources (streams, connections).

        This method closes all streams and cleans up resources but does NOT
        mark the session as terminated. Use this for stateless cleanup.
        """
        # We need a copy of the keys to avoid modification during iteration
        request_stream_keys = list(self._request_streams.keys())

        # Close all request streams asynchronously
        for key in request_stream_keys:
            await self._clean_up_memory_streams(key)

        # Clear the request streams dictionary immediately
        self._request_streams.clear()
        try:
            if self._read_stream_writer is not None:
                await self._read_stream_writer.aclose()
            if self._read_stream is not None:
                await self._read_stream.aclose()
            if self._write_stream_reader is not None:
                await self._write_stream_reader.aclose()
            if self._write_stream is not None:
                await self._write_stream.aclose()
        except Exception as e:
            # During cleanup, we catch all exceptions since streams might be in various states
            logger.debug(f"Error closing streams: {e}")

    async def _terminate_session(self) -> None:
        """Terminate the current session, closing all streams.

        Once terminated, all requests with this session ID will receive 404 Not Found.
        """
        self._terminated = True
        logger.info(f"Terminating session: {self.mcp_session_id}")
        await self.cleanup()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your review, I strongly agree with avoiding calls to internal methods.

There's no "Session" in a stateless request

I agree with your point, but I think we can also view it as a temporary session that only exists during this stateless request

Can we create a public cleanup() method in StreamableHTTPSeverTransport and use that internally inside _terminate_session() instead of using a private method?

I prefer to modify the _terminal_session method directly to be public rather than extracting another method from it, because the transport itself should also be terminated after handling the stateless request, which means the _terminated should also be set to True. I think terminate might be a good name, because we actually directly terminate the transport itself, and the session is automatically terminated due to the stream being closed.

And this terminated method needs to be called not only in stateless requests but also in stateful requests. If the server-side manager cannot actively clean up these transports or sessions, it means that as long as the client does not send delete requests, they will exist in memory FOREVER, and the same problem exists even in stateful requests.

I think a "session" can exist forever, but a session instance should not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, works for me! Do you want to make that change to this PR? In that case rename _terminate_session to just terminate()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I have submitted this change.

I am handling the termination of a stateful server session. Do you think I should continue in this PR or open a new one?


async def _handle_stateful_request(
self,
scope: Scope,
Expand Down
Loading