Skip to content

Commit 69712e3

Browse files
feat(transport): add builder & expose child stderr (#305)
1 parent 31155a4 commit 69712e3

File tree

3 files changed

+122
-32
lines changed

3 files changed

+122
-32
lines changed

crates/rmcp/src/transport/child_process.rs

Lines changed: 82 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,37 @@
1+
use std::process::Stdio;
2+
13
use process_wrap::tokio::{TokioChildWrapper, TokioCommandWrap};
24
use tokio::{
35
io::AsyncRead,
4-
process::{ChildStdin, ChildStdout},
6+
process::{ChildStderr, ChildStdin, ChildStdout},
57
};
68

79
use super::{IntoTransport, Transport};
810
use crate::service::ServiceRole;
911

10-
pub(crate) fn child_process(
11-
mut child: Box<dyn TokioChildWrapper>,
12-
) -> std::io::Result<(Box<dyn TokioChildWrapper>, (ChildStdout, ChildStdin))> {
12+
/// The parts of a child process.
13+
type ChildProcessParts = (
14+
Box<dyn TokioChildWrapper>,
15+
ChildStdout,
16+
ChildStdin,
17+
Option<ChildStderr>,
18+
);
19+
20+
/// Extract the stdio handles from a spawned child.
21+
/// Returns `(child, stdout, stdin, stderr)` where `stderr` is `Some` only
22+
/// if the process was spawned with `Stdio::piped()`.
23+
#[inline]
24+
fn child_process(mut child: Box<dyn TokioChildWrapper>) -> std::io::Result<ChildProcessParts> {
1325
let child_stdin = match child.inner_mut().stdin().take() {
1426
Some(stdin) => stdin,
15-
None => return Err(std::io::Error::other("std in was taken")),
27+
None => return Err(std::io::Error::other("stdin was already taken")),
1628
};
1729
let child_stdout = match child.inner_mut().stdout().take() {
1830
Some(stdout) => stdout,
19-
None => return Err(std::io::Error::other("std out was taken")),
31+
None => return Err(std::io::Error::other("stdout was already taken")),
2032
};
21-
Ok((child, (child_stdout, child_stdin)))
33+
let child_stderr = child.inner_mut().stderr().take();
34+
Ok((child, child_stdout, child_stdin, child_stderr))
2235
}
2336

2437
pub struct TokioChildProcess {
@@ -66,38 +79,23 @@ impl AsyncRead for TokioChildProcessOut {
6679
}
6780

6881
impl TokioChildProcess {
69-
/// Create a new Tokio child process with the given command.
70-
///
71-
/// # Manage the child process
72-
/// You can also check these issue and pr for more information on how to manage the child process:
73-
/// - [#156](https://github.com/modelcontextprotocol/rust-sdk/pull/156)
74-
/// - [#253](https://github.com/modelcontextprotocol/rust-sdk/issues/253)
75-
/// ```rust,ignore
76-
/// #[cfg(unix)]
77-
/// command_wrap.wrap(process_wrap::tokio::ProcessGroup::leader());
78-
/// #[cfg(windows)]
79-
/// command_wrap.wrap(process_wrap::tokio::JobObject);
80-
/// ```
81-
///
82+
/// Convenience: spawn with default `piped` stdio
8283
pub fn new(command: impl Into<TokioCommandWrap>) -> std::io::Result<Self> {
83-
let mut command_wrap = command.into();
84-
command_wrap
85-
.command_mut()
86-
.stdin(std::process::Stdio::piped())
87-
.stdout(std::process::Stdio::piped());
88-
let (child, (child_stdout, child_stdin)) = child_process(command_wrap.spawn()?)?;
89-
Ok(Self {
90-
child: ChildWithCleanup { inner: child },
91-
child_stdin,
92-
child_stdout,
93-
})
84+
let (proc, _ignored) = TokioChildProcessBuilder::new(command).spawn()?;
85+
Ok(proc)
86+
}
87+
88+
/// Builder entry-point allowing fine-grained stdio control.
89+
pub fn builder(command: impl Into<TokioCommandWrap>) -> TokioChildProcessBuilder {
90+
TokioChildProcessBuilder::new(command)
9491
}
9592

9693
/// Get the process ID of the child process.
9794
pub fn id(&self) -> Option<u32> {
9895
self.child.inner.id()
9996
}
10097

98+
/// Split this helper into a reader (stdout) and writer (stdin).
10199
pub fn split(self) -> (TokioChildProcessOut, ChildStdin) {
102100
let TokioChildProcess {
103101
child,
@@ -114,6 +112,59 @@ impl TokioChildProcess {
114112
}
115113
}
116114

115+
/// Builder for `TokioChildProcess` allowing custom `Stdio` configuration.
116+
pub struct TokioChildProcessBuilder {
117+
cmd: TokioCommandWrap,
118+
stdin: Stdio,
119+
stdout: Stdio,
120+
stderr: Stdio,
121+
}
122+
123+
impl TokioChildProcessBuilder {
124+
fn new(cmd: impl Into<TokioCommandWrap>) -> Self {
125+
Self {
126+
cmd: cmd.into(),
127+
stdin: Stdio::piped(),
128+
stdout: Stdio::piped(),
129+
stderr: Stdio::inherit(),
130+
}
131+
}
132+
133+
/// Override the child stdin configuration.
134+
pub fn stdin(mut self, io: impl Into<Stdio>) -> Self {
135+
self.stdin = io.into();
136+
self
137+
}
138+
/// Override the child stdout configuration.
139+
pub fn stdout(mut self, io: impl Into<Stdio>) -> Self {
140+
self.stdout = io.into();
141+
self
142+
}
143+
/// Override the child stderr configuration.
144+
pub fn stderr(mut self, io: impl Into<Stdio>) -> Self {
145+
self.stderr = io.into();
146+
self
147+
}
148+
149+
/// Spawn the child process. Returns the transport plus an optional captured stderr handle.
150+
pub fn spawn(mut self) -> std::io::Result<(TokioChildProcess, Option<ChildStderr>)> {
151+
self.cmd
152+
.command_mut()
153+
.stdin(self.stdin)
154+
.stdout(self.stdout)
155+
.stderr(self.stderr);
156+
157+
let (child, stdout, stdin, stderr_opt) = child_process(self.cmd.spawn()?)?;
158+
159+
let proc = TokioChildProcess {
160+
child: ChildWithCleanup { inner: child },
161+
child_stdin: stdin,
162+
child_stdout: stdout,
163+
};
164+
Ok((proc, stderr_opt))
165+
}
166+
}
167+
117168
impl<R: ServiceRole> IntoTransport<R, std::io::Error, ()> for TokioChildProcess {
118169
fn into_transport(self) -> impl Transport<R, Error = std::io::Error> + 'static {
119170
IntoTransport::<R, std::io::Error, super::async_rw::TransportAdapterAsyncRW>::into_transport(

crates/rmcp/tests/test_with_python.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
use std::process::Stdio;
2+
13
use axum::Router;
24
use rmcp::{
35
ServiceExt,
46
transport::{ConfigureCommandExt, SseServer, TokioChildProcess, sse_server::SseServerConfig},
57
};
6-
use tokio::time::timeout;
8+
use tokio::{io::AsyncReadExt, time::timeout};
79
use tokio_util::sync::CancellationToken;
810
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
911
mod common;
@@ -119,3 +121,35 @@ async fn test_with_python_server() -> anyhow::Result<()> {
119121
client.cancel().await?;
120122
Ok(())
121123
}
124+
125+
#[tokio::test]
126+
async fn test_with_python_server_stderr() -> anyhow::Result<()> {
127+
init().await?;
128+
129+
let (transport, stderr) =
130+
TokioChildProcess::builder(tokio::process::Command::new("uv").configure(|cmd| {
131+
cmd.arg("run")
132+
.arg("server.py")
133+
.current_dir("tests/test_with_python");
134+
}))
135+
.stderr(Stdio::piped())
136+
.spawn()?;
137+
138+
let mut stderr = stderr.expect("stderr must be piped");
139+
140+
let stderr_task = tokio::spawn(async move {
141+
let mut buffer = String::new();
142+
stderr.read_to_string(&mut buffer).await?;
143+
Ok::<_, std::io::Error>(buffer)
144+
});
145+
146+
let client = ().serve(transport).await?;
147+
let _ = client.list_all_resources().await?;
148+
let _ = client.list_all_tools().await?;
149+
client.cancel().await?;
150+
151+
let stderr_output = stderr_task.await??;
152+
assert!(stderr_output.contains("server starting up..."));
153+
154+
Ok(())
155+
}

crates/rmcp/tests/test_with_python/server.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
from fastmcp import FastMCP
22

3+
import sys
4+
35
mcp = FastMCP("Demo")
46

7+
print("server starting up...", file=sys.stderr)
8+
9+
510
@mcp.tool()
611
def add(a: int, b: int) -> int:
712
"""Add two numbers"""

0 commit comments

Comments
 (0)