diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index a93b6238..a80062ff 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -6,6 +6,7 @@ import static java.util.concurrent.TimeUnit.*; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -56,6 +57,8 @@ import io.a2a.spec.TaskPushNotificationConfig; import io.a2a.spec.TaskQueryParams; import io.a2a.spec.TaskState; +import io.a2a.spec.TaskStatus; +import io.a2a.spec.TextPart; import io.a2a.spec.UnsupportedOperationError; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; @@ -262,7 +265,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // 2. Close the queue to signal consumption can complete // 3. Wait for consumption to finish processing events // 4. Fetch final task state from TaskStore - + boolean failed = false; + String error = null; try { // Step 1: Wait for agent to finish (with configurable timeout) if (agentFuture != null) { @@ -288,22 +292,36 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - String msg = String.format("Error waiting for task %s completion", taskId); - LOGGER.warn(msg, e); - throw new InternalError(msg); + error = String.format("Error waiting for task %s completion", taskId); + LOGGER.warn(error, e); + failed = true; } catch (java.util.concurrent.ExecutionException e) { - String msg = String.format("Error during task %s execution", taskId); - LOGGER.warn(msg, e.getCause()); - throw new InternalError(msg); + error = String.format("Error during task %s execution", taskId); + LOGGER.warn(error, e.getCause()); + failed = true; } catch (java.util.concurrent.TimeoutException e) { - String msg = String.format("Timeout waiting for consumption to complete for task %s", taskId); - LOGGER.warn(msg, taskId); - throw new InternalError(msg); + error = String.format("Timeout waiting for consumption to complete for task %s", taskId); + LOGGER.warn(error, e); + failed = true; } // Step 4: Fetch the final task state from TaskStore (all events have been processed) Task updatedTask = taskStore.get(taskId); if (updatedTask != null) { + if (failed && !updatedTask.getStatus().state().isFinal()) { + // If we had exceptions above, update the Task to reflect that + Message msg = new Message.Builder() + .role(Message.Role.AGENT) + .parts(Collections.singletonList(new TextPart(error))) + .contextId(mss.requestContext.getContextId()) + .taskId(taskId) + .build(); + + updatedTask = new Task.Builder(updatedTask) + .status(new TaskStatus(TaskState.FAILED, msg, null)) + .build(); + taskStore.save(updatedTask); + } kind = updatedTask; if (LOGGER.isDebugEnabled()) { LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",