|
@@ -58,6 +58,7 @@ import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.Executor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -377,7 +378,13 @@ public class TransportNodesActionTests extends ESTestCase {
|
|
public void testConcurrentlyCompletionAndCancellation() throws InterruptedException {
|
|
public void testConcurrentlyCompletionAndCancellation() throws InterruptedException {
|
|
final var action = getTestTransportNodesAction();
|
|
final var action = getTestTransportNodesAction();
|
|
|
|
|
|
- final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap());
|
|
|
|
|
|
+ final CountDownLatch onCancelledLatch = new CountDownLatch(1);
|
|
|
|
+ final CancellableTask cancellableTask = new CancellableTask(randomLong(), "transport", "action", "", null, emptyMap()) {
|
|
|
|
+ @Override
|
|
|
|
+ protected void onCancelled() {
|
|
|
|
+ onCancelledLatch.countDown();
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
|
|
final PlainActionFuture<TestNodesResponse> future = new PlainActionFuture<>();
|
|
final PlainActionFuture<TestNodesResponse> future = new PlainActionFuture<>();
|
|
action.execute(cancellableTask, new TestNodesRequest(), future);
|
|
action.execute(cancellableTask, new TestNodesRequest(), future);
|
|
@@ -414,6 +421,11 @@ public class TransportNodesActionTests extends ESTestCase {
|
|
assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException);
|
|
assertNotNull("expect task cancellation exception, but got\n" + ExceptionsHelper.stackTrace(e), taskCancelledException);
|
|
assertThat(e.getMessage(), containsString("task cancelled [simulated]"));
|
|
assertThat(e.getMessage(), containsString("task cancelled [simulated]"));
|
|
assertTrue(cancellableTask.isCancelled());
|
|
assertTrue(cancellableTask.isCancelled());
|
|
|
|
+ // Wait for the latch, the listener for releasing node responses is called before it.
|
|
|
|
+ // We need to wait for the latch because the cancellation may be detected in CancellableFanOut#onCompletion with
|
|
|
|
+ // the responseHandled flag being true. The flag is set by the cancellation listener which is still in process of
|
|
|
|
+ // draining existing responses.
|
|
|
|
+ safeAwait(onCancelledLatch);
|
|
// All previously captured responses are released due to cancellation
|
|
// All previously captured responses are released due to cancellation
|
|
assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false));
|
|
assertTrue(nodeResponses.stream().allMatch(r -> r.hasReferences() == false));
|
|
// Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or
|
|
// Wait for the last response to be gathered and assert it is also released by either the concurrent cancellation or
|