|
58 | 58 | import java.util.List;
|
59 | 59 | import java.util.Map;
|
60 | 60 | import java.util.Set;
|
| 61 | +import java.util.concurrent.CountDownLatch; |
61 | 62 | import java.util.concurrent.CyclicBarrier;
|
62 | 63 | import java.util.concurrent.Executor;
|
63 | 64 | import java.util.concurrent.TimeUnit;
|
@@ -377,7 +378,13 @@ protected void newResponseAsync(
|
377 | 378 | public void testConcurrentlyCompletionAndCancellation() throws InterruptedException {
|
378 | 379 | final var action = getTestTransportNodesAction();
|
379 | 380 |
|
380 |
| - final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()); |
| 381 | + final CountDownLatch onCancelledLatch = new CountDownLatch(1); |
| 382 | + final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()) { |
| 383 | + @Override |
| 384 | + protected void onCancelled() { |
| 385 | + onCancelledLatch.countDown(); |
| 386 | + } |
| 387 | + }; |
381 | 388 |
|
382 | 389 | final PlainActionFuture<TestNodesResponse> future = new PlainActionFuture<>();
|
383 | 390 | action.execute(cancellableTask, new TestNodesRequest(), future);
|
@@ -414,6 +421,11 @@ public void testConcurrentlyCompletionAndCancellation() throws InterruptedExcept
|
414 | 421 | assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException);
|
415 | 422 | assertThat(e.getMessage(), containsString("task cancelled [simulated]"));
|
416 | 423 | assertTrue(cancellableTask.isCancelled());
|
| 424 | + // Wait for the latch, the listener for releasing node responses is called before it. |
| 425 | + // We need to wait for the latch because the cancellation may be detected in CancellableFanOut#onCompletion with |
| 426 | + // the responseHandled flag being true. The flag is set by the cancellation listener which is still in process of |
| 427 | + // draining existing responses. |
| 428 | + safeAwait(onCancelledLatch); |
417 | 429 | // All previously captured responses are released due to cancellation
|
418 | 430 | assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false));
|
419 | 431 | // Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or
|
|
0 commit comments