58
58
import java .util .List ;
59
59
import java .util .Map ;
60
60
import java .util .Set ;
61
- import java .util .concurrent .CountDownLatch ;
62
61
import java .util .concurrent .CyclicBarrier ;
63
62
import java .util .concurrent .Executor ;
64
63
import java .util .concurrent .TimeUnit ;
@@ -378,28 +377,24 @@ protected void newResponseAsync(
378
377
public void testConcurrentlyCompletionAndCancellation () throws InterruptedException {
379
378
final var action = getTestTransportNodesAction ();
380
379
381
- final CountDownLatch onCancelledLatch = new CountDownLatch (1 );
382
- final CancellableTask cancellableTask = new CancellableTask (randomLong (), "transport" , "action" , "" , null , emptyMap ()) {
383
- @ Override
384
- protected void onCancelled () {
385
- onCancelledLatch .countDown ();
386
- }
387
- };
380
+ final CancellableTask cancellableTask = new CancellableTask (randomLong (), "transport" , "action" , "" , null , emptyMap ());
388
381
389
382
final PlainActionFuture <TestNodesResponse > future = new PlainActionFuture <>();
390
383
action .execute (cancellableTask , new TestNodesRequest (), future );
391
384
392
385
final List <TestNodeResponse > nodeResponses = new ArrayList <>();
393
386
final CapturingTransport .CapturedRequest [] capturedRequests = transport .getCapturedRequestsAndClear ();
387
+ // Complete all but the last request for racing completion with cancellation
394
388
for (int i = 0 ; i < capturedRequests .length - 1 ; i ++) {
395
389
final var capturedRequest = capturedRequests [i ];
396
390
nodeResponses .add (completeOneRequest (capturedRequest ));
397
391
}
398
392
399
393
final var raceBarrier = new CyclicBarrier (3 );
394
+ final var lastResponseFuture = new PlainActionFuture <TestNodeResponse >();
400
395
final Thread completeThread = new Thread (() -> {
401
396
safeAwait (raceBarrier );
402
- nodeResponses . add (completeOneRequest (capturedRequests [capturedRequests .length - 1 ]));
397
+ lastResponseFuture . onResponse (completeOneRequest (capturedRequests [capturedRequests .length - 1 ]));
403
398
});
404
399
final Thread cancelThread = new Thread (() -> {
405
400
safeAwait (raceBarrier );
@@ -419,8 +414,11 @@ protected void onCancelled() {
419
414
assertNotNull ("expect task cancellation exception, but got\n " + ExceptionsHelper .stackTrace (e ), taskCancelledException );
420
415
assertThat (e .getMessage (), containsString ("task cancelled [simulated]" ));
421
416
assertTrue (cancellableTask .isCancelled ());
422
- safeAwait ( onCancelledLatch ); // wait for the latch, the listener for releasing node responses is called before it
417
+ // All previously captured responses are released due to cancellation
423
418
assertTrue (nodeResponses .stream ().allMatch (r -> r .hasReferences () == false ));
419
+ // Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or
420
+ // not tracked in onItemResponse at all due to already cancelled
421
+ assertFalse (safeGet (lastResponseFuture ).hasReferences ());
424
422
}
425
423
426
424
completeThread .join (10_000 );
0 commit comments