Skip to content

Commit 150ac07

Browse files
authored
Template improvements (#5)
* Messenger: add timeout parameter * Improve executor logic; add echo agent as example * Clarify publishing steps in README
1 parent 49f5d51 commit 150ac07

File tree

5 files changed

+76
-35
lines changed

5 files changed

+76
-35
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,8 @@ ghcr.io/<your-username>/<your-repo-name>:latest
5656
```
5757
ghcr.io/<your-username>/<your-repo-name>:1.0.0
5858
ghcr.io/<your-username>/<your-repo-name>:1
59-
```
59+
```
60+
61+
Once the workflow completes, find your Docker image in the Packages section (right sidebar of your repository). Configure the package visibility in package settings.
62+
63+
> **Note:** Organization repositories may need package write permissions enabled manually (Settings → Actions → General). Version tags must follow [semantic versioning](https://semver.org/) (e.g., `v1.0.0`).

src/agent.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,32 @@
11
from a2a.server.tasks import TaskUpdater
2+
from a2a.types import Message, TaskState, Part, TextPart
3+
from a2a.utils import get_message_text, new_agent_text_message
4+
25
from messenger import Messenger
36

47

58
class Agent:
69
def __init__(self):
710
self.messenger = Messenger()
8-
# initialize other state here
11+
# Initialize other state here
912

10-
async def run(self, input_text: str, updater: TaskUpdater) -> None:
13+
async def run(self, message: Message, updater: TaskUpdater) -> None:
1114
"""Implement your agent logic here.
1215
1316
Args:
14-
input_text: The incoming message text
17+
message: The incoming message
1518
updater: Report progress (update_status) and results (add_artifact)
1619
1720
Use self.messenger.talk_to_agent(message, url) to call other agents.
1821
"""
19-
raise NotImplementedError("Agent not implemented.")
22+
input_text = get_message_text(message)
23+
24+
# Replace this example code with your agent logic
25+
26+
await updater.update_status(
27+
TaskState.working, new_agent_text_message("Thinking...")
28+
)
29+
await updater.add_artifact(
30+
parts=[Part(root=TextPart(text=input_text))],
31+
name="Echo",
32+
)

src/executor.py

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from a2a.server.tasks import TaskUpdater
44
from a2a.types import (
55
Task,
6+
TaskState,
67
UnsupportedOperationError,
78
InvalidRequestError,
89
)
@@ -15,44 +16,47 @@
1516
from agent import Agent
1617

1718

18-
TERMINAL_STATES = ["completed", "canceled", "rejected", "failed"]
19+
TERMINAL_STATES = {
20+
TaskState.completed,
21+
TaskState.canceled,
22+
TaskState.failed,
23+
TaskState.rejected
24+
}
1925

2026

2127
class Executor(AgentExecutor):
2228
def __init__(self):
23-
self.agent_store: dict[str, Agent] = {}
29+
self.agents: dict[str, Agent] = {} # context_id to agent instance
2430

25-
async def execute(
26-
self,
27-
context: RequestContext,
28-
event_queue: EventQueue,
29-
) -> None:
31+
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
3032
msg = context.message
3133
if not msg:
3234
raise ServerError(error=InvalidRequestError(message="Missing message in request"))
3335

3436
task = context.current_task
35-
if not task or task.status.state in TERMINAL_STATES:
37+
if task and task.status.state in TERMINAL_STATES:
38+
raise ServerError(error=InvalidRequestError(message=f"Task {task.id} already processed (state: {task.status.state})"))
39+
40+
if not task:
3641
task = new_task(msg)
42+
await event_queue.enqueue_event(task)
3743

3844
context_id = task.context_id
39-
agent = self.agent_store.get(context_id)
45+
agent = self.agents.get(context_id)
4046
if not agent:
4147
agent = Agent()
42-
self.agent_store[context_id] = agent
48+
self.agents[context_id] = agent
4349

44-
await event_queue.enqueue_event(task)
4550
updater = TaskUpdater(event_queue, task.id, context_id)
46-
await updater.start_work(new_agent_text_message(text="Thinking...", context_id=context_id, task_id=task.id))
4751

52+
await updater.start_work()
4853
try:
49-
await agent.run(context.get_user_input(), updater)
50-
await updater.complete()
54+
await agent.run(msg, updater)
55+
if not updater._terminal_state_reached:
56+
await updater.complete()
5157
except Exception as e:
5258
print(f"Task failed with agent error: {e}")
53-
await updater.failed(new_agent_text_message(f"Agent error: {e}", context_id=context_id))
59+
await updater.failed(new_agent_text_message(f"Agent error: {e}", context_id=context_id, task_id=task.id))
5460

55-
async def cancel(
56-
self, request: RequestContext, event_queue: EventQueue
57-
) -> Task | None:
61+
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
5862
raise ServerError(error=UnsupportedOperationError())

src/messenger.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,18 @@
2020
DEFAULT_TIMEOUT = 300
2121

2222

23-
def create_message(*, role: Role = Role.user, text: str, context_id: str | None = None) -> Message:
23+
def create_message(
24+
*, role: Role = Role.user, text: str, context_id: str | None = None
25+
) -> Message:
2426
return Message(
2527
kind="message",
2628
role=role,
2729
parts=[Part(TextPart(kind="text", text=text))],
2830
message_id=uuid4().hex,
29-
context_id=context_id
31+
context_id=context_id,
3032
)
3133

34+
3235
def merge_parts(parts: list[Part]) -> str:
3336
chunks = []
3437
for part in parts:
@@ -38,9 +41,17 @@ def merge_parts(parts: list[Part]) -> str:
3841
chunks.append(json.dumps(part.root.data, indent=2))
3942
return "\n".join(chunks)
4043

41-
async def send_message(message: str, base_url: str, context_id: str | None = None, streaming=False, consumer: Consumer | None = None):
44+
45+
async def send_message(
46+
message: str,
47+
base_url: str,
48+
context_id: str | None = None,
49+
streaming: bool = False,
50+
timeout: int = DEFAULT_TIMEOUT,
51+
consumer: Consumer | None = None,
52+
):
4253
"""Returns dict with context_id, response and status (if exists)"""
43-
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as httpx_client:
54+
async with httpx.AsyncClient(timeout=timeout) as httpx_client:
4455
resolver = A2ACardResolver(httpx_client=httpx_client, base_url=base_url)
4556
agent_card = await resolver.get_agent_card()
4657
config = ClientConfig(
@@ -54,10 +65,7 @@ async def send_message(message: str, base_url: str, context_id: str | None = Non
5465

5566
outbound_msg = create_message(text=message, context_id=context_id)
5667
last_event = None
57-
outputs = {
58-
"response": "",
59-
"context_id": None
60-
}
68+
outputs = {"response": "", "context_id": None}
6169

6270
# if streaming == False, only one event is generated
6371
async for event in client.send_message(outbound_msg):
@@ -88,19 +96,31 @@ class Messenger:
8896
def __init__(self):
8997
self._context_ids = {}
9098

91-
async def talk_to_agent(self, message: str, url: str, new_conversation: bool = False):
99+
async def talk_to_agent(
100+
self,
101+
message: str,
102+
url: str,
103+
new_conversation: bool = False,
104+
timeout: int = DEFAULT_TIMEOUT,
105+
):
92106
"""
93107
Communicate with another agent by sending a message and receiving their response.
94108
95109
Args:
96110
message: The message to send to the agent
97111
url: The agent's URL endpoint
98112
new_conversation: If True, start fresh conversation; if False, continue existing conversation
113+
timeout: Timeout in seconds for the request (default: 300)
99114
100115
Returns:
101116
str: The agent's response message
102117
"""
103-
outputs = await send_message(message=message, base_url=url, context_id=None if new_conversation else self._context_ids.get(url, None))
118+
outputs = await send_message(
119+
message=message,
120+
base_url=url,
121+
context_id=None if new_conversation else self._context_ids.get(url, None),
122+
timeout=timeout,
123+
)
104124
if outputs.get("status", "completed") != "completed":
105125
raise RuntimeError(f"{url} responded with: {outputs}")
106126
self._context_ids[url] = outputs.get("context_id", None)

src/server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ def main():
2727
id="",
2828
name="",
2929
description="",
30-
tags=[""],
31-
examples=[""]
30+
tags=[],
31+
examples=[]
3232
)
3333

3434
agent_card = AgentCard(

0 commit comments

Comments
 (0)