Skip to content

Commit b89e17d

Browse files
committed
test: restore nested tool-event streaming coverage (from 5a0ce3a)
1 parent 31f29da commit b89e17d

File tree

1 file changed

+335
-0
lines changed

1 file changed

+335
-0
lines changed

tests/test_as_streaming_tool.py

Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
import pytest
2+
3+
from agents import Agent, Runner, function_tool, RunConfig, ModelSettings
4+
from agents.models.openai_chatcompletions import OpenAIChatCompletionsModel
5+
6+
7+
@function_tool
8+
async def grab(x: int) -> int:
9+
return x * 2
10+
11+
12+
async def collect_tool_events(run):
13+
events = []
14+
async for ev in run.stream_events():
15+
if hasattr(ev, "name"):
16+
item = getattr(ev, "item", None)
17+
events.append((ev.name, getattr(item, "name", None)))
18+
else:
19+
events.append((ev.type, None))
20+
return events
21+
22+
23+
@pytest.mark.asyncio
24+
async def test_stream_inner_events_single_agent(monkeypatch):
25+
"""Verify we stream inner tool events for a single agent."""
26+
27+
async def fake_stream(self):
28+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab_tool"})})
29+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})})
30+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})})
31+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab_tool"})})
32+
33+
monkeypatch.setattr(
34+
Runner,
35+
"run_streamed",
36+
lambda *args, **kwargs: type("R", (), {"stream_events": fake_stream})(),
37+
)
38+
39+
sub = Agent(name="sub", instructions="", tools=[grab])
40+
tool = sub.as_tool("grab_tool", "test", stream_inner_events=True)
41+
main = Agent(name="main", instructions="", tools=[tool])
42+
run = Runner.run_streamed(main, input="5")
43+
names = await collect_tool_events(run)
44+
assert names == [
45+
("tool_called", "grab_tool"),
46+
("tool_called", "grab"),
47+
("tool_output", "grab"),
48+
("tool_output", "grab_tool"),
49+
]
50+
51+
52+
@pytest.mark.asyncio
53+
async def test_parallel_stream_inner_events(monkeypatch):
54+
"""Verify we stream inner tool events for parallel tools."""
55+
56+
async def fake_stream(self):
57+
for tool_name in ("A", "B"):
58+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": tool_name})})
59+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})})
60+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})})
61+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": tool_name})})
62+
63+
monkeypatch.setattr(
64+
Runner,
65+
"run_streamed",
66+
lambda *args, **kwargs: type("R", (), {"stream_events": fake_stream})(),
67+
)
68+
69+
sub = Agent(name="sub", instructions="", tools=[grab])
70+
a = sub.as_tool("A", "A", stream_inner_events=True)
71+
b = sub.as_tool("B", "B", stream_inner_events=True)
72+
main = Agent(name="main", instructions="", tools=[a, b])
73+
run = Runner.run_streamed(
74+
main,
75+
input="",
76+
run_config=RunConfig(model_settings=ModelSettings(parallel_tool_calls=True)),
77+
)
78+
names = await collect_tool_events(run)
79+
assert names.count(("tool_called", "grab")) == 2
80+
assert names.count(("tool_output", "grab")) == 2
81+
assert ("tool_called", "A") in names and ("tool_called", "B") in names
82+
83+
84+
@pytest.mark.asyncio
85+
async def test_as_tool_streams_nested_tool_calls(monkeypatch):
86+
"""Ensure nested tool events surface when streaming a sub-agent."""
87+
88+
async def fake_stream(self):
89+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "wrapper"})})
90+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})})
91+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})})
92+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "wrapper"})})
93+
94+
monkeypatch.setattr(
95+
Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})()
96+
)
97+
monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None)
98+
99+
sub = Agent(name="sub", instructions="", tools=[grab])
100+
tool = sub.as_tool("wrapper", "desc", stream_inner_events=True)
101+
main = Agent(name="main", instructions="", tools=[tool])
102+
run = Runner.run_streamed(main, input="")
103+
104+
events = await collect_tool_events(run)
105+
assert events == [
106+
("tool_called", "wrapper"),
107+
("tool_called", "grab"),
108+
("tool_output", "grab"),
109+
("tool_output", "wrapper"),
110+
]
111+
112+
113+
@pytest.mark.asyncio
114+
async def test_as_tool_parallel_streams(monkeypatch):
115+
"""Nested tool events appear for each tool when parallelized."""
116+
117+
async def fake_stream(self):
118+
for name in ("A", "B"):
119+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": name})})
120+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})})
121+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})})
122+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": name})})
123+
124+
monkeypatch.setattr(
125+
Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})()
126+
)
127+
monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None)
128+
129+
sub = Agent(name="sub", instructions="", tools=[grab])
130+
t1 = sub.as_tool("A", "A", stream_inner_events=True)
131+
t2 = sub.as_tool("B", "B", stream_inner_events=True)
132+
main = Agent(name="main", instructions="", tools=[t1, t2])
133+
run = Runner.run_streamed(
134+
main,
135+
input="",
136+
run_config=RunConfig(model_settings=ModelSettings(parallel_tool_calls=True)),
137+
)
138+
139+
events = await collect_tool_events(run)
140+
assert events.count(("tool_called", "grab")) == 2
141+
assert events.count(("tool_output", "grab")) == 2
142+
assert ("tool_called", "A") in events and ("tool_called", "B") in events
143+
144+
145+
@pytest.mark.asyncio
146+
async def test_as_tool_error_propagation(monkeypatch):
147+
"""Errors inside a sub-agent surface as outer tool_error events."""
148+
149+
async def fake_stream(self):
150+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "outer"})})
151+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})})
152+
yield type("E", (), {"name": "tool_error", "item": type("I", (), {"name": "grab"})})
153+
yield type("E", (), {"name": "tool_error", "item": type("I", (), {"name": "outer"})})
154+
155+
monkeypatch.setattr(
156+
Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})()
157+
)
158+
monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None)
159+
160+
sub = Agent(name="sub", instructions="", tools=[grab])
161+
tool = sub.as_tool("outer", "desc", stream_inner_events=True)
162+
main = Agent(name="main", instructions="", tools=[tool])
163+
run = Runner.run_streamed(main, input="")
164+
165+
events = await collect_tool_events(run)
166+
assert ("tool_error", "outer") in events
167+
168+
169+
@pytest.mark.asyncio
170+
async def test_as_tool_empty_inner_run(monkeypatch):
171+
"""An inner agent that does nothing still emits wrapper events."""
172+
173+
async def fake_stream(self):
174+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "outer"})})
175+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "outer"})})
176+
177+
monkeypatch.setattr(
178+
Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})()
179+
)
180+
monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None)
181+
182+
sub = Agent(name="sub", instructions="", tools=[grab])
183+
tool = sub.as_tool("outer", "desc", stream_inner_events=True)
184+
main = Agent(name="main", instructions="", tools=[tool])
185+
run = Runner.run_streamed(main, input="")
186+
187+
events = await collect_tool_events(run)
188+
assert events == [
189+
("tool_called", "outer"),
190+
("tool_output", "outer"),
191+
]
192+
193+
194+
@pytest.mark.asyncio
195+
async def test_as_tool_mixed_reasoning_and_tools(monkeypatch):
196+
"""Wrapper forwards reasoning and tool events in order."""
197+
198+
async def fake_stream(self):
199+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "outer"})})
200+
yield type(
201+
"E", (), {"name": "reasoning_item_created", "item": type("I", (), {"name": "r"})}
202+
)
203+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})})
204+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})})
205+
yield type(
206+
"E", (), {"name": "reasoning_item_created", "item": type("I", (), {"name": "r2"})}
207+
)
208+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "outer"})})
209+
210+
monkeypatch.setattr(
211+
Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})()
212+
)
213+
monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None)
214+
215+
sub = Agent(name="sub", instructions="", tools=[grab])
216+
tool = sub.as_tool("outer", "desc", stream_inner_events=True)
217+
main = Agent(name="main", instructions="", tools=[tool])
218+
run = Runner.run_streamed(main, input="")
219+
220+
events = await collect_tool_events(run)
221+
assert events == [
222+
("tool_called", "outer"),
223+
("reasoning_item_created", "r"),
224+
("tool_called", "grab"),
225+
("tool_output", "grab"),
226+
("reasoning_item_created", "r2"),
227+
("tool_output", "outer"),
228+
]
229+
230+
231+
@pytest.mark.asyncio
232+
async def test_as_tool_multiple_inner_tools(monkeypatch):
233+
"""Two inner tools are streamed sequentially."""
234+
235+
async def fake_stream(self):
236+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "outer"})})
237+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})})
238+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})})
239+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab2"})})
240+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab2"})})
241+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "outer"})})
242+
243+
monkeypatch.setattr(
244+
Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})()
245+
)
246+
monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None)
247+
248+
@function_tool
249+
async def grab2(x: int) -> int:
250+
return x + 1
251+
252+
sub = Agent(name="sub", instructions="", tools=[grab, grab2])
253+
tool = sub.as_tool("outer", "desc", stream_inner_events=True)
254+
main = Agent(name="main", instructions="", tools=[tool])
255+
run = Runner.run_streamed(main, input="")
256+
257+
events = await collect_tool_events(run)
258+
assert events == [
259+
("tool_called", "outer"),
260+
("tool_called", "grab"),
261+
("tool_output", "grab"),
262+
("tool_called", "grab2"),
263+
("tool_output", "grab2"),
264+
("tool_output", "outer"),
265+
]
266+
267+
268+
@pytest.mark.asyncio
269+
async def test_as_tool_heavy_concurrency_ordering(monkeypatch):
270+
"""Nested events from many tools appear once and in order."""
271+
272+
async def fake_stream(self):
273+
for name in ("A", "B", "C", "D"):
274+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": name})})
275+
yield type("E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})})
276+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})})
277+
yield type("E", (), {"name": "tool_output", "item": type("I", (), {"name": name})})
278+
279+
monkeypatch.setattr(
280+
Runner, "run_streamed", lambda *a, **k: type("R", (), {"stream_events": fake_stream})()
281+
)
282+
monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None)
283+
284+
sub = Agent(name="sub", instructions="", tools=[grab])
285+
tools = [sub.as_tool(n, n, stream_inner_events=True) for n in ("A", "B", "C", "D")]
286+
main = Agent(name="main", instructions="", tools=tools)
287+
run = Runner.run_streamed(
288+
main,
289+
input="",
290+
run_config=RunConfig(model_settings=ModelSettings(parallel_tool_calls=True)),
291+
)
292+
293+
events = await collect_tool_events(run)
294+
assert events.count(("tool_called", "grab")) == 4
295+
assert events.count(("tool_output", "grab")) == 4
296+
for name in ("A", "B", "C", "D"):
297+
assert ("tool_called", name) in events
298+
299+
300+
@pytest.mark.asyncio
301+
async def test_as_tool_backward_compatibility(monkeypatch):
302+
"""When stream_inner_events is False, inner events are hidden."""
303+
304+
def fake_run_streamed(agent, *args, **kwargs):
305+
async def fake_stream(self):
306+
if agent.name == "sub":
307+
yield type(
308+
"E", (), {"name": "tool_called", "item": type("I", (), {"name": "grab"})}
309+
)
310+
yield type(
311+
"E", (), {"name": "tool_output", "item": type("I", (), {"name": "grab"})}
312+
)
313+
else:
314+
yield type(
315+
"E", (), {"name": "tool_called", "item": type("I", (), {"name": "outer"})}
316+
)
317+
yield type(
318+
"E", (), {"name": "tool_output", "item": type("I", (), {"name": "outer"})}
319+
)
320+
321+
return type("R", (), {"stream_events": fake_stream})()
322+
323+
monkeypatch.setattr(Runner, "run_streamed", fake_run_streamed)
324+
monkeypatch.setattr(OpenAIChatCompletionsModel, "_fetch_response", lambda *a, **k: None)
325+
326+
sub = Agent(name="sub", instructions="", tools=[grab])
327+
tool = sub.as_tool("outer", "desc", stream_inner_events=False)
328+
main = Agent(name="main", instructions="", tools=[tool])
329+
run = Runner.run_streamed(main, input="")
330+
331+
events = await collect_tool_events(run)
332+
assert events == [
333+
("tool_called", "outer"),
334+
("tool_output", "outer"),
335+
]

0 commit comments

Comments
 (0)