-
Notifications
You must be signed in to change notification settings - Fork 2k
Close server session after handle stateless request #1116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Close server session after handle stateless request #1116
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this contribution!
Do you have a way of reproducing this issue? An regression test as part of this PR would be great.
@@ -190,6 +190,9 @@ async def run_stateless_server(*, task_status: TaskStatus[None] = anyio.TASK_STA | |||
# Handle the HTTP request and return the response | |||
await http_transport.handle_request(scope, receive, send) | |||
|
|||
# Terminate the session after the request is handled | |||
await http_transport._terminate_session() # type: ignore[reportPrivateUsage] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we create a public cleanup()
method in StreamableHTTPSeverTransport
and use that internally inside _terminate_session()
instead of using a private method?
There's no "Session" in a stateless request, so I think we shouldn't be terminating a non-existent session and extract the cleanup logic instead.
Something like this:
# streamable_http.py
async def cleanup(self) -> None:
"""Clean up all transport resources (streams, connections).
This method closes all streams and cleans up resources but does NOT
mark the session as terminated. Use this for stateless cleanup.
"""
# We need a copy of the keys to avoid modification during iteration
request_stream_keys = list(self._request_streams.keys())
# Close all request streams asynchronously
for key in request_stream_keys:
await self._clean_up_memory_streams(key)
# Clear the request streams dictionary immediately
self._request_streams.clear()
try:
if self._read_stream_writer is not None:
await self._read_stream_writer.aclose()
if self._read_stream is not None:
await self._read_stream.aclose()
if self._write_stream_reader is not None:
await self._write_stream_reader.aclose()
if self._write_stream is not None:
await self._write_stream.aclose()
except Exception as e:
# During cleanup, we catch all exceptions since streams might be in various states
logger.debug(f"Error closing streams: {e}")
async def _terminate_session(self) -> None:
"""Terminate the current session, closing all streams.
Once terminated, all requests with this session ID will receive 404 Not Found.
"""
self._terminated = True
logger.info(f"Terminating session: {self.mcp_session_id}")
await self.cleanup()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your review, I strongly agree with avoiding calls to internal methods.
There's no "Session" in a stateless request
I agree with your point, but I think we can also view it as a temporary session that only exists during this stateless request
Can we create a public
cleanup()
method inStreamableHTTPSeverTransport
and use that internally inside_terminate_session()
instead of using a private method?
I prefer to modify the _terminal_session
method directly to be public rather than extracting another method from it, because the transport itself should also be terminated after handling the stateless request, which means the _terminated
should also be set to True
. I think terminate
might be a good name, because we actually directly terminate the transport itself, and the session is automatically terminated due to the stream being closed.
And this terminated method needs to be called not only in stateless requests but also in stateful requests. If the server-side manager cannot actively clean up these transports or sessions, it means that as long as the client does not send delete requests, they will exist in memory FOREVER, and the same problem exists even in stateful requests.
I think a "session" can exist forever, but a session instance should not.
You can use the Example Code in #1076, just change the FastMCP server as stateless
Or for stateful servers, just add
I tried many methods, but there was no way to implement a reasonable test, because in a |
Motivation and Context
The
ServerSession
created by stateless Streamable HTTP requests is not being properly cleaned up, leading to memory leaks, which may be related to #1076.This PR allows the transport to immediately close the session after handling the request, which I believe is reasonable, as the session will no longer be needed once the request has been processed.
How Has This Been Tested?
Breaking Changes
Types of changes
Checklist
Additional context
At the same time, for stateful requests, when the client does not send a DELETE request due to abnormal termination or network issues, the service session will not be cleaned up either, which similarly leads to memory leaks.
I think a heartbeat mechanism can be used to determine whether the client is alive. allowing the server to timely clean up expired sessions. I would be happy to open a new PR for this or continue addressing this issue in the current PR.