Skip to content

Commit a8fcc1b

Browse files
seanzhougooglecopybara-github
authored andcommitted
fix: Return final task result in task artifact instead of status message
According to a2a protocol task artifact is a different concept from adk artifact. if a task is completed the final result should be in task artifact. PiperOrigin-RevId: 782154265
1 parent a57d629 commit a8fcc1b

File tree

4 files changed

+353
-442
lines changed

4 files changed

+353
-442
lines changed

src/google/adk/a2a/converters/event_converter.py

Lines changed: 5 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,11 @@
2424
import uuid
2525

2626
from a2a.server.events import Event as A2AEvent
27-
from a2a.types import Artifact
2827
from a2a.types import DataPart
2928
from a2a.types import Message
3029
from a2a.types import Part as A2APart
3130
from a2a.types import Role
3231
from a2a.types import Task
33-
from a2a.types import TaskArtifactUpdateEvent
3432
from a2a.types import TaskState
3533
from a2a.types import TaskStatus
3634
from a2a.types import TaskStatusUpdateEvent
@@ -145,81 +143,6 @@ def _create_artifact_id(
145143
return ARTIFACT_ID_SEPARATOR.join(components)
146144

147145

148-
def _convert_artifact_to_a2a_events(
149-
event: Event,
150-
invocation_context: InvocationContext,
151-
filename: str,
152-
version: int,
153-
task_id: Optional[str] = None,
154-
context_id: Optional[str] = None,
155-
) -> TaskArtifactUpdateEvent:
156-
"""Converts a new artifact version to an A2A TaskArtifactUpdateEvent.
157-
158-
Args:
159-
event: The ADK event containing the artifact information.
160-
invocation_context: The invocation context.
161-
filename: The name of the artifact file.
162-
version: The version number of the artifact.
163-
task_id: Optional task ID to use for generated events. If not provided, new UUIDs will be generated.
164-
165-
Returns:
166-
A TaskArtifactUpdateEvent representing the artifact update.
167-
168-
Raises:
169-
ValueError: If required parameters are invalid.
170-
RuntimeError: If artifact loading fails.
171-
"""
172-
if not filename:
173-
raise ValueError("Filename cannot be empty")
174-
if version < 0:
175-
raise ValueError("Version must be non-negative")
176-
177-
try:
178-
artifact_part = invocation_context.artifact_service.load_artifact(
179-
app_name=invocation_context.app_name,
180-
user_id=invocation_context.user_id,
181-
session_id=invocation_context.session.id,
182-
filename=filename,
183-
version=version,
184-
)
185-
186-
converted_part = convert_genai_part_to_a2a_part(part=artifact_part)
187-
if not converted_part:
188-
raise RuntimeError(f"Failed to convert artifact part for {filename}")
189-
190-
artifact_id = _create_artifact_id(
191-
invocation_context.app_name,
192-
invocation_context.user_id,
193-
invocation_context.session.id,
194-
filename,
195-
version,
196-
)
197-
198-
return TaskArtifactUpdateEvent(
199-
taskId=task_id,
200-
append=False,
201-
contextId=context_id,
202-
lastChunk=True,
203-
artifact=Artifact(
204-
artifactId=artifact_id,
205-
name=filename,
206-
metadata={
207-
"filename": filename,
208-
"version": version,
209-
},
210-
parts=[converted_part],
211-
),
212-
)
213-
except Exception as e:
214-
logger.error(
215-
"Failed to convert artifact for %s, version %s: %s",
216-
filename,
217-
version,
218-
e,
219-
)
220-
raise RuntimeError(f"Artifact conversion failed: {e}") from e
221-
222-
223146
def _process_long_running_tool(a2a_part: A2APart, event: Event) -> None:
224147
"""Processes long-running tool metadata for an A2A part.
225148
@@ -268,7 +191,11 @@ def convert_a2a_task_to_event(
268191
try:
269192
# Extract message from task status or history
270193
message = None
271-
if a2a_task.status and a2a_task.status.message:
194+
if a2a_task.artifacts:
195+
message = Message(
196+
messageId="", role=Role.agent, parts=a2a_task.artifacts[-1].parts
197+
)
198+
elif a2a_task.status and a2a_task.status.message:
272199
message = a2a_task.status.message
273200
elif a2a_task.history:
274201
message = a2a_task.history[-1]
@@ -573,13 +500,6 @@ def convert_event_to_a2a_events(
573500
a2a_events = []
574501

575502
try:
576-
# Handle artifact deltas
577-
if event.actions.artifact_delta:
578-
for filename, version in event.actions.artifact_delta.items():
579-
artifact_event = _convert_artifact_to_a2a_events(
580-
event, invocation_context, filename, version, task_id, context_id
581-
)
582-
a2a_events.append(artifact_event)
583503

584504
# Handle error scenarios
585505
if event.error_code:

src/google/adk/a2a/executor/a2a_agent_executor.py

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
from a2a.server.agent_execution import AgentExecutor
2929
from a2a.server.agent_execution.context import RequestContext
3030
from a2a.server.events.event_queue import EventQueue
31+
from a2a.types import Artifact
3132
from a2a.types import Message
3233
from a2a.types import Role
34+
from a2a.types import TaskArtifactUpdateEvent
3335
from a2a.types import TaskState
3436
from a2a.types import TaskStatus
3537
from a2a.types import TaskStatusUpdateEvent
@@ -218,22 +220,49 @@ async def _handle_request(
218220
await event_queue.enqueue_event(a2a_event)
219221

220222
# publish the task result event - this is final
221-
await event_queue.enqueue_event(
222-
TaskStatusUpdateEvent(
223-
taskId=context.task_id,
224-
status=TaskStatus(
225-
state=(
226-
task_result_aggregator.task_state
227-
if task_result_aggregator.task_state != TaskState.working
228-
else TaskState.completed
229-
),
230-
timestamp=datetime.now(timezone.utc).isoformat(),
231-
message=task_result_aggregator.task_status_message,
232-
),
233-
contextId=context.context_id,
234-
final=True,
235-
)
236-
)
223+
if (
224+
task_result_aggregator.task_state == TaskState.working
225+
and task_result_aggregator.task_status_message is not None
226+
and task_result_aggregator.task_status_message.parts
227+
):
228+
# if task is still working properly, publish the artifact update event as
229+
# the final result according to a2a protocol.
230+
await event_queue.enqueue_event(
231+
TaskArtifactUpdateEvent(
232+
taskId=context.task_id,
233+
lastChunk=True,
234+
contextId=context.context_id,
235+
artifact=Artifact(
236+
artifactId=str(uuid.uuid4()),
237+
parts=task_result_aggregator.task_status_message.parts,
238+
),
239+
)
240+
)
241+
# public the final status update event
242+
await event_queue.enqueue_event(
243+
TaskStatusUpdateEvent(
244+
taskId=context.task_id,
245+
status=TaskStatus(
246+
state=TaskState.completed,
247+
timestamp=datetime.now(timezone.utc).isoformat(),
248+
),
249+
contextId=context.context_id,
250+
final=True,
251+
)
252+
)
253+
else:
254+
await event_queue.enqueue_event(
255+
TaskStatusUpdateEvent(
256+
taskId=context.task_id,
257+
status=TaskStatus(
258+
state=task_result_aggregator.task_state,
259+
timestamp=datetime.now(timezone.utc).isoformat(),
260+
message=task_result_aggregator.task_status_message,
261+
),
262+
contextId=context.context_id,
263+
final=True,
264+
)
265+
)
237266

238267
async def _prepare_session(
239268
self, context: RequestContext, run_args: dict[str, Any], runner: Runner

0 commit comments

Comments
 (0)