Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion environments/mcp_env/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,44 @@ Run an evaluation with default settings:
uv run vf-eval mcp-env
```

The default configuration launches the Exa and Fetch MCP servers over stdio.
These are real MCP servers maintained by the community and require no manual
setup beyond providing an `EXA_API_KEY` when you want Exa search access.

To run servers with StreamableHTTP, build configurations with the
`build_streamable_http_server_config` helper in `mcp_env.py`. The helper
allocates a port, formats placeholders in your launch arguments, and points the
client at the resulting `http://host:port/path` endpoint, matching the
deployment flow described in the official MCP SDK documentation for
StreamableHTTP transports.

```python
from environments.mcp_env.mcp_env import build_streamable_http_server_config

# Describe a locally launched StreamableHTTP server. The "{port}" placeholder
# in `args` is filled in by the helper with the port it selects.
streamable_local = build_streamable_http_server_config(
    name="local-http",
    command="uv",
    args=[
        "run",
        "examples/snippets/servers/streamable_config.py",
        "--",
        "--port",
        "{port}",
    ],
    url_path="/mcp",
)
```

For remote StreamableHTTP providers, skip the command entirely and supply the
endpoint and headers directly. For example, Telnyx exposes an MCP endpoint at
`https://api.telnyx.com/v2/mcp` that can be accessed with a bearer token.

Configure model and sampling:

```bash
uv run vf-eval mcp-env -m gpt-4.1-mini -n 1 -r 1
uv run vf-eval mcp-env \
-m gpt-4.1-mini \
-n 1 -r 1
```

Notes:
Expand Down
76 changes: 71 additions & 5 deletions environments/mcp_env/mcp_env.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,83 @@
import asyncio
import atexit
import os
import socket
import threading
from typing import Callable, Dict, List
from typing import Callable, Dict, List, Sequence

from datasets import Dataset
from dotenv import load_dotenv

import verifiers as vf
from src.mcp_server_connection import MCPServerConnection
from src.mcp_tool_wrapper import MCPToolWrapper
from src.models import MCPServerConfig

import verifiers as vf
from verifiers.envs.tool_env import ToolEnv
from verifiers.types import Message

load_dotenv()

def _allocate_port() -> int:
    """Return a TCP port number that was free on 127.0.0.1 when probed.

    Binding to port 0 lets the operating system choose an unused port. The
    probe socket is closed before returning, so the port is only known to have
    been available at the moment of the call.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("127.0.0.1", 0))
        _, chosen_port = probe.getsockname()
        return chosen_port
    finally:
        probe.close()


def build_streamable_http_server_config(
*,
name: str,
command: str | None = None,
args: Sequence[str] | None = None,
env: Dict[str, str | None] | None = None,
headers: Dict[str, str] | None = None,
url: str | None = None,
host: str = "127.0.0.1",
port: int | None = None,
url_path: str = "/mcp",
description: str | None = None,
) -> Dict[str, object]:
"""Build a StreamableHTTP MCP server configuration.

The helper allocates a port when launching a local process and supports
placeholder substitution in command arguments. Placeholders `{port}`,
`{host}`, and `{path}` are expanded automatically so launch commands can be
written declaratively.
"""

normalized_path = url_path if url_path.startswith("/") else f"/{url_path}"

if url:
target_url = url
else:
port = port or _allocate_port()
target_url = f"http://{host}:{port}{normalized_path}"

format_kwargs = {
"port": port,
"host": host,
"path": normalized_path,
"url": target_url,
}

formatted_args: List[str] | None = None
if args is not None:
formatted_args = [arg.format(**format_kwargs) for arg in args]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Port Placeholder Causes Malformed Command Args

When url is provided to build_streamable_http_server_config without an explicit port, the port variable remains None because port allocation is skipped. This causes {port} placeholders in command args to format as the string "None", resulting in malformed arguments for launched processes.

Fix in Cursor Fix in Web


sanitized_env = None
if env:
sanitized_env = {k: v for k, v in env.items() if v is not None}

return {
"name": name,
"transport": "streamablehttp",
"command": command,
"args": formatted_args,
"env": sanitized_env,
"headers": headers,
"url": target_url,
"description": description or f"StreamableHTTP server '{name}'",
}

EXA_FETCH_TOOLS = [
{
"name": "exa",
Expand Down Expand Up @@ -55,6 +117,9 @@ def __init__(
self.mcp_servers.append(MCPServerConfig(**server))
else:
self.mcp_servers.append(server)
else:
for server in EXA_FETCH_TOOLS:
self.mcp_servers.append(MCPServerConfig(**server))

self.server_connections: Dict[str, MCPServerConnection] = {}
self.mcp_tools: Dict[str, MCPToolWrapper] = {}
Expand Down Expand Up @@ -150,7 +215,7 @@ def _shutdown_loop(self):


def load_environment(
mcp_servers: list = EXA_FETCH_TOOLS, dataset=None, **kwargs
mcp_servers: list | None = None, dataset=None, **kwargs
) -> vf.Environment:
"""Load an MCPEnv environment with fetch server for testing."""
dataset = dataset or Dataset.from_dict(
Expand All @@ -169,8 +234,9 @@ async def judge_reward(judge, prompt, completion, answer, state):
return 1.0 if "yes" in judge_response.lower() else 0.0

rubric.add_reward_func(judge_reward, weight=1.0)
server_configs = mcp_servers or EXA_FETCH_TOOLS
vf_env = MCPEnv(
mcp_servers=mcp_servers,
mcp_servers=server_configs,
dataset=dataset,
rubric=rubric,
**kwargs,
Expand Down
129 changes: 113 additions & 16 deletions environments/mcp_env/src/mcp_server_connection.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import asyncio
import logging
import os
from urllib.parse import urlparse
from contextlib import AsyncExitStack
from typing import Dict, Optional

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import TextContent, Tool

from .models import MCPServerConfig
Expand All @@ -20,6 +24,7 @@ def __init__(self, config: MCPServerConfig, logger: logging.Logger):
self._ready = asyncio.Event()
self._error: Optional[Exception] = None
self.loop: Optional[asyncio.AbstractEventLoop] = None
self._process: Optional[asyncio.subprocess.Process] = None

async def connect(self):
# Record the loop this connection is bound to
Expand All @@ -33,30 +38,80 @@ async def connect(self):

return self.tools

async def _get_connection(self):
try:
server_params = StdioServerParameters(
command=self.config.command,
args=self.config.args or [],
env=self.config.env,
async def _run_stdio(self):
    """Run an MCP session over stdio and keep it open.

    Starts the configured command through the MCP stdio client, initializes a
    client session, stores every tool the server lists in ``self.tools`` keyed
    by tool name, sets the readiness event, and then idles. The method only
    returns by way of an exception or cancellation.
    """
    params = StdioServerParameters(
        command=self.config.command,
        args=self.config.args or [],
        env=self.config.env,
    )

    async with stdio_client(params) as (rx, tx), ClientSession(rx, tx) as session:
        self.session = session

        await session.initialize()

        listing = await session.list_tools()
        for tool in listing.tools:
            self.tools[tool.name] = tool

        self._ready.set()

        # Idle inside the context managers so the session stays open.
        while True:
            await asyncio.sleep(1)

async def _run_streamable_http(self):
if not self.config.url:
raise ValueError(
f"StreamableHTTP server '{self.config.name}' requires a url"
)

async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
self.session = session
if self.config.command:
env = os.environ.copy()
if self.config.env:
env.update({k: v for k, v in self.config.env.items() if v is not None})

await session.initialize()
self._process = await asyncio.create_subprocess_exec(
self.config.command,
*(self.config.args or []),
env=env,
)

await self._wait_for_http_server()

async with AsyncExitStack() as stack:
read, write, _ = await stack.enter_async_context(
streamablehttp_client(
self.config.url,
headers=self.config.headers,
)
)
session = await stack.enter_async_context(ClientSession(read, write))
self.session = session

await session.initialize()

tools_response = await session.list_tools()
tools_response = await session.list_tools()

for tool in tools_response.tools:
self.tools[tool.name] = tool
for tool in tools_response.tools:
self.tools[tool.name] = tool

self._ready.set()
self._ready.set()

while True:
await asyncio.sleep(1)
while True:
await asyncio.sleep(1)

async def _get_connection(self):
try:
if self.config.transport == "stdio":
await self._run_stdio()
elif self.config.transport == "streamablehttp":
await self._run_streamable_http()
else:
raise ValueError(
f"Unsupported MCP transport '{self.config.transport}'"
)
except asyncio.CancelledError:
raise
except Exception as e:
Expand All @@ -66,6 +121,15 @@ async def _get_connection(self):
self.session = None
self.tools = {}

if self._process is not None:
self._process.terminate()
try:
await asyncio.wait_for(self._process.wait(), timeout=5)
except asyncio.TimeoutError:
self._process.kill()
await self._process.wait()
self._process = None

async def call_tool(self, tool_name: str, arguments: dict) -> str:
assert self.session is not None, f"Server '{self.config.name}' not connected"
assert self.loop is not None, "Connection loop not initialized"
Expand Down Expand Up @@ -97,3 +161,36 @@ async def disconnect(self):
except asyncio.CancelledError:
pass
self.logger.info(f"MCP server '{self.config.name}' terminated")

async def _wait_for_http_server(self, timeout: float = 30.0) -> None:
    """Wait until the configured StreamableHTTP endpoint accepts TCP connections.

    The host and port are taken from ``self.config.url``; a missing port
    defaults to 443 for ``https`` and 80 otherwise. When there is no URL or no
    host to probe, the method simply sleeps for two seconds and returns.

    Args:
        timeout: Overall budget in seconds. Each connection attempt is bounded
            by the time remaining, so a connect that hangs cannot exceed it.

    Raises:
        RuntimeError: If no connection could be established before the
            deadline. The last connection error is attached as the cause.
    """
    url = self.config.url
    parsed = urlparse(url) if url else None
    host = parsed.hostname if parsed is not None else None

    if host is None:
        # Nothing to probe: fall back to a fixed grace period.
        await asyncio.sleep(2.0)
        return

    port = parsed.port
    if port is None:
        port = 443 if parsed.scheme == "https" else 80

    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout

    while True:
        remaining = max(deadline - loop.time(), 0.0)
        try:
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port),
                timeout=remaining,
            )
        except Exception as exc:
            # Any failure is treated as "not up yet" until the deadline passes.
            if loop.time() >= deadline:
                raise RuntimeError(
                    f"Timed out waiting for StreamableHTTP server '{self.config.name}'"
                ) from exc
            await asyncio.sleep(0.5)
            continue

        # The probe connection is only a liveness check; close it right away.
        writer.close()
        await writer.wait_closed()
        return
10 changes: 8 additions & 2 deletions environments/mcp_env/src/models.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from dataclasses import dataclass
from typing import Dict, List
from typing import Dict, List, Literal, Optional


# Transport names accepted by the MCP connection layer.
Transport = Literal["stdio", "streamablehttp"]


@dataclass
class MCPServerConfig:
    """Settings describing how to launch and/or reach one MCP server.

    ``command`` is declared exactly once and stays the second field, so
    positional construction as ``MCPServerConfig(name, command)`` keeps
    working.
    """

    # Identifier of the server, used in log and error messages.
    name: str
    # Executable to launch; may be omitted when only a URL is needed.
    command: Optional[str] = None
    # Which client transport to use for this server.
    transport: Transport = "stdio"
    # Arguments passed to ``command``.
    args: Optional[List[str]] = None
    # Environment variables for the launched command.
    env: Optional[Dict[str, str]] = None
    # HTTP headers for the StreamableHTTP client.
    headers: Optional[Dict[str, str]] = None
    # Endpoint URL; required by the StreamableHTTP transport.
    url: Optional[str] = None
    # Free-form, human-readable description.
    description: str = ""
Loading