diff --git a/environments/mcp_env/README.md b/environments/mcp_env/README.md index 700fee9b..b8505004 100644 --- a/environments/mcp_env/README.md +++ b/environments/mcp_env/README.md @@ -26,10 +26,44 @@ Run an evaluation with default settings: uv run vf-eval mcp-env ``` +The default configuration launches the Exa and Fetch MCP servers over stdio. +These are real MCP servers maintained by the community and require no manual +setup beyond providing an `EXA_API_KEY` when you want Exa search access. + +To run servers with StreamableHTTP, build configurations with the +`build_streamable_http_server_config` helper in `mcp_env.py`. The helper +allocates a port, formats placeholders in your launch arguments, and points the +client at the resulting `http://host:port/path` endpoint—matching the +deployment flow described in the official MCP SDK documentation for +StreamableHTTP transports. + +```python +from environments.mcp_env.mcp_env import build_streamable_http_server_config + +streamable_local = build_streamable_http_server_config( + name="local-http", + command="uv", + args=[ + "run", + "examples/snippets/servers/streamable_config.py", + "--", + "--port", + "{port}", + ], + url_path="/mcp", +) +``` + +For remote StreamableHTTP providers, skip the command entirely and supply the +endpoint and headers directly—for example, Telnyx exposes an MCP endpoint at +`https://api.telnyx.com/v2/mcp` that can be accessed with a bearer token. + Configure model and sampling: ```bash -uv run vf-eval mcp-env -m gpt-4.1-mini -n 1 -r 1 +uv run vf-eval mcp-env \ + -m gpt-4.1-mini \ + -n 1 -r 1 ``` Notes: diff --git a/environments/mcp_env/mcp_env.py b/environments/mcp_env/mcp_env.py index a7d3f624..3c9d0cdf 100644 --- a/environments/mcp_env/mcp_env.py +++ b/environments/mcp_env/mcp_env.py @@ -1,21 +1,83 @@ import asyncio import atexit import os +import socket import threading -from typing import Callable, Dict, List +from typing import Callable, Dict, List, 
Sequence from datasets import Dataset from dotenv import load_dotenv + +import verifiers as vf from src.mcp_server_connection import MCPServerConnection from src.mcp_tool_wrapper import MCPToolWrapper from src.models import MCPServerConfig - -import verifiers as vf from verifiers.envs.tool_env import ToolEnv from verifiers.types import Message load_dotenv() +def _allocate_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + return sock.getsockname()[1] + + +def build_streamable_http_server_config( + *, + name: str, + command: str | None = None, + args: Sequence[str] | None = None, + env: Dict[str, str | None] | None = None, + headers: Dict[str, str] | None = None, + url: str | None = None, + host: str = "127.0.0.1", + port: int | None = None, + url_path: str = "/mcp", + description: str | None = None, +) -> Dict[str, object]: + """Build a StreamableHTTP MCP server configuration. + + The helper allocates a port when launching a local process and supports + placeholder substitution in command arguments. Placeholders `{port}`, + `{host}`, and `{path}` are expanded automatically so launch commands can be + written declaratively. 
+ """ + + normalized_path = url_path if url_path.startswith("/") else f"/{url_path}" + + if url: + target_url = url + else: + port = port or _allocate_port() + target_url = f"http://{host}:{port}{normalized_path}" + + format_kwargs = { + "port": port, + "host": host, + "path": normalized_path, + "url": target_url, + } + + formatted_args: List[str] | None = None + if args is not None: + formatted_args = [arg.format(**format_kwargs) for arg in args] + + sanitized_env = None + if env: + sanitized_env = {k: v for k, v in env.items() if v is not None} + + return { + "name": name, + "transport": "streamablehttp", + "command": command, + "args": formatted_args, + "env": sanitized_env, + "headers": headers, + "url": target_url, + "description": description or f"StreamableHTTP server '{name}'", + } + EXA_FETCH_TOOLS = [ { "name": "exa", @@ -55,6 +117,9 @@ def __init__( self.mcp_servers.append(MCPServerConfig(**server)) else: self.mcp_servers.append(server) + else: + for server in EXA_FETCH_TOOLS: + self.mcp_servers.append(MCPServerConfig(**server)) self.server_connections: Dict[str, MCPServerConnection] = {} self.mcp_tools: Dict[str, MCPToolWrapper] = {} @@ -150,7 +215,7 @@ def _shutdown_loop(self): def load_environment( - mcp_servers: list = EXA_FETCH_TOOLS, dataset=None, **kwargs + mcp_servers: list | None = None, dataset=None, **kwargs ) -> vf.Environment: """Load an MCPEnv environment with fetch server for testing.""" dataset = dataset or Dataset.from_dict( @@ -169,8 +234,9 @@ async def judge_reward(judge, prompt, completion, answer, state): return 1.0 if "yes" in judge_response.lower() else 0.0 rubric.add_reward_func(judge_reward, weight=1.0) + server_configs = mcp_servers or EXA_FETCH_TOOLS vf_env = MCPEnv( - mcp_servers=mcp_servers, + mcp_servers=server_configs, dataset=dataset, rubric=rubric, **kwargs, diff --git a/environments/mcp_env/src/mcp_server_connection.py b/environments/mcp_env/src/mcp_server_connection.py index 67594765..228fe177 100644 --- 
a/environments/mcp_env/src/mcp_server_connection.py +++ b/environments/mcp_env/src/mcp_server_connection.py @@ -1,9 +1,13 @@ import asyncio import logging +import os +from urllib.parse import urlparse +from contextlib import AsyncExitStack from typing import Dict, Optional from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client +from mcp.client.streamable_http import streamablehttp_client from mcp.types import TextContent, Tool from .models import MCPServerConfig @@ -20,6 +24,7 @@ def __init__(self, config: MCPServerConfig, logger: logging.Logger): self._ready = asyncio.Event() self._error: Optional[Exception] = None self.loop: Optional[asyncio.AbstractEventLoop] = None + self._process: Optional[asyncio.subprocess.Process] = None async def connect(self): # Record the loop this connection is bound to @@ -33,30 +38,80 @@ async def connect(self): return self.tools - async def _get_connection(self): - try: - server_params = StdioServerParameters( - command=self.config.command, - args=self.config.args or [], - env=self.config.env, + async def _run_stdio(self): + server_params = StdioServerParameters( + command=self.config.command, + args=self.config.args or [], + env=self.config.env, + ) + + async with stdio_client(server_params) as (read, write): + async with ClientSession(read, write) as session: + self.session = session + + await session.initialize() + + tools_response = await session.list_tools() + + for tool in tools_response.tools: + self.tools[tool.name] = tool + + self._ready.set() + + while True: + await asyncio.sleep(1) + + async def _run_streamable_http(self): + if not self.config.url: + raise ValueError( + f"StreamableHTTP server '{self.config.name}' requires a url" ) - async with stdio_client(server_params) as (read, write): - async with ClientSession(read, write) as session: - self.session = session + if self.config.command: + env = os.environ.copy() + if self.config.env: + env.update({k: v for k, v in 
self.config.env.items() if v is not None}) - await session.initialize() + self._process = await asyncio.create_subprocess_exec( + self.config.command, + *(self.config.args or []), + env=env, + ) + + await self._wait_for_http_server() + + async with AsyncExitStack() as stack: + read, write, _ = await stack.enter_async_context( + streamablehttp_client( + self.config.url, + headers=self.config.headers, + ) + ) + session = await stack.enter_async_context(ClientSession(read, write)) + self.session = session + + await session.initialize() - tools_response = await session.list_tools() + tools_response = await session.list_tools() - for tool in tools_response.tools: - self.tools[tool.name] = tool + for tool in tools_response.tools: + self.tools[tool.name] = tool - self._ready.set() + self._ready.set() - while True: - await asyncio.sleep(1) + while True: + await asyncio.sleep(1) + async def _get_connection(self): + try: + if self.config.transport == "stdio": + await self._run_stdio() + elif self.config.transport == "streamablehttp": + await self._run_streamable_http() + else: + raise ValueError( + f"Unsupported MCP transport '{self.config.transport}'" + ) except asyncio.CancelledError: raise except Exception as e: @@ -66,6 +121,15 @@ async def _get_connection(self): self.session = None self.tools = {} + if self._process is not None: + self._process.terminate() + try: + await asyncio.wait_for(self._process.wait(), timeout=5) + except asyncio.TimeoutError: + self._process.kill() + await self._process.wait() + self._process = None + async def call_tool(self, tool_name: str, arguments: dict) -> str: assert self.session is not None, f"Server '{self.config.name}' not connected" assert self.loop is not None, "Connection loop not initialized" @@ -97,3 +161,36 @@ async def disconnect(self): except asyncio.CancelledError: pass self.logger.info(f"MCP server '{self.config.name}' terminated") + + async def _wait_for_http_server(self, timeout: float = 30.0) -> None: + if not 
self.config.url: + await asyncio.sleep(2.0) + return + + parsed = urlparse(self.config.url) + host = parsed.hostname + port = parsed.port + + if host is None: + await asyncio.sleep(2.0) + return + + if port is None: + port = 443 if parsed.scheme == "https" else 80 + + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + + while True: + try: + reader, writer = await asyncio.open_connection(host, port) + except Exception: + if loop.time() >= deadline: + raise RuntimeError( + f"Timed out waiting for StreamableHTTP server '{self.config.name}'" + ) + await asyncio.sleep(0.5) + else: + writer.close() + await writer.wait_closed() + return diff --git a/environments/mcp_env/src/models.py b/environments/mcp_env/src/models.py index 7a20dd38..e5877a66 100644 --- a/environments/mcp_env/src/models.py +++ b/environments/mcp_env/src/models.py @@ -1,11 +1,17 @@ from dataclasses import dataclass -from typing import Dict, List +from typing import Dict, List, Literal, Optional + + +Transport = Literal["stdio", "streamablehttp"] @dataclass class MCPServerConfig: name: str - command: str + transport: Transport = "stdio" + command: Optional[str] = None args: List[str] | None = None env: Dict[str, str] | None = None + headers: Dict[str, str] | None = None + url: str | None = None description: str = ""