Skip to content

Commit ee47698

Browse files
committed
fix: Make onSendMessage() handling less aggressive
Record the failure in the Task, unless it is in a final state, and return the Task to the user like before the last change.
1 parent 8197ec8 commit ee47698

File tree

1 file changed

+30
-10
lines changed

1 file changed

+30
-10
lines changed

server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static java.util.concurrent.TimeUnit.*;
77

88
import java.util.ArrayList;
9+
import java.util.Collections;
910
import java.util.List;
1011
import java.util.Objects;
1112
import java.util.Optional;
@@ -56,6 +57,8 @@
5657
import io.a2a.spec.TaskPushNotificationConfig;
5758
import io.a2a.spec.TaskQueryParams;
5859
import io.a2a.spec.TaskState;
60+
import io.a2a.spec.TaskStatus;
61+
import io.a2a.spec.TextPart;
5962
import io.a2a.spec.UnsupportedOperationError;
6063
import org.eclipse.microprofile.config.inject.ConfigProperty;
6164
import org.slf4j.Logger;
@@ -262,7 +265,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
262265
// 2. Close the queue to signal consumption can complete
263266
// 3. Wait for consumption to finish processing events
264267
// 4. Fetch final task state from TaskStore
265-
268+
boolean failed = false;
269+
String error = null;
266270
try {
267271
// Step 1: Wait for agent to finish (with configurable timeout)
268272
if (agentFuture != null) {
@@ -288,22 +292,38 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
288292
}
289293
} catch (InterruptedException e) {
290294
Thread.currentThread().interrupt();
291-
String msg = String.format("Error waiting for task %s completion", taskId);
292-
LOGGER.warn(msg, e);
293-
throw new InternalError(msg);
295+
error = String.format("Error waiting for task %s completion", taskId);
296+
LOGGER.warn(error, e);
297+
failed = true;
294298
} catch (java.util.concurrent.ExecutionException e) {
295-
String msg = String.format("Error during task %s execution", taskId);
296-
LOGGER.warn(msg, e.getCause());
297-
throw new InternalError(msg);
299+
error = String.format("Error during task %s execution", taskId);
300+
LOGGER.warn(error, e.getCause());
301+
failed = true;
298302
} catch (java.util.concurrent.TimeoutException e) {
299-
String msg = String.format("Timeout waiting for consumption to complete for task %s", taskId);
300-
LOGGER.warn(msg, taskId);
301-
throw new InternalError(msg);
303+
error = String.format("Timeout waiting for consumption to complete for task %s", taskId);
304+
LOGGER.warn(error, e);
305+
failed = true;
302306
}
303307

304308
// Step 4: Fetch the final task state from TaskStore (all events have been processed)
305309
Task updatedTask = taskStore.get(taskId);
306310
if (updatedTask != null) {
311+
if (failed && !updatedTask.getStatus().state().isFinal()) {
312+
// If we had exceptions above, update the Task to reflect that
313+
Message msg = new Message.Builder()
314+
.role(Message.Role.AGENT)
315+
.parts(Collections.singletonList(new TextPart(error)))
316+
.contextId(mss.requestContext.getContextId())
317+
.messageId(mss.requestContext.getMessage().getMessageId())
318+
.taskId(taskId)
319+
.build();
320+
321+
//Message message = A2A
322+
updatedTask = new Task.Builder(updatedTask)
323+
.status(new TaskStatus(TaskState.FAILED, msg, null))
324+
.build();
325+
taskStore.save(updatedTask);
326+
}
307327
kind = updatedTask;
308328
if (LOGGER.isDebugEnabled()) {
309329
LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",

0 commit comments

Comments
 (0)