|
|
@@ -21,6 +21,7 @@ import org.elasticsearch.action.search.TransportSearchAction;
|
|
|
import org.elasticsearch.action.support.PlainActionFuture;
|
|
|
import org.elasticsearch.client.internal.node.NodeClient;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
+import org.elasticsearch.common.util.set.Sets;
|
|
|
import org.elasticsearch.http.HttpChannel;
|
|
|
import org.elasticsearch.http.HttpResponse;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
@@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
+import java.util.function.LongSupplier;
|
|
|
|
|
|
public class RestCancellableNodeClientTests extends ESTestCase {
|
|
|
|
|
|
@@ -148,8 +150,42 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
|
|
assertEquals(totalSearches, testClient.cancelledTasks.size());
|
|
|
}
|
|
|
|
|
|
+ public void testConcurrentExecuteAndClose() throws Exception {
|
|
|
+ final var testClient = new TestClient(Settings.EMPTY, threadPool, true);
|
|
|
+ int initialHttpChannels = RestCancellableNodeClient.getNumChannels();
|
|
|
+ int numTasks = randomIntBetween(1, 30);
|
|
|
+ TestHttpChannel channel = new TestHttpChannel();
|
|
|
+ final var startLatch = new CountDownLatch(1);
|
|
|
+ final var doneLatch = new CountDownLatch(numTasks + 1);
|
|
|
+ final var expectedTasks = Sets.<TaskId>newHashSetWithExpectedSize(numTasks);
|
|
|
+ for (int j = 0; j < numTasks; j++) {
|
|
|
+ RestCancellableNodeClient client = new RestCancellableNodeClient(testClient, channel);
|
|
|
+ threadPool.generic().execute(() -> {
|
|
|
+ client.execute(TransportSearchAction.TYPE, new SearchRequest(), ActionListener.running(ESTestCase::fail));
|
|
|
+ startLatch.countDown();
|
|
|
+ doneLatch.countDown();
|
|
|
+ });
|
|
|
+ expectedTasks.add(new TaskId(testClient.getLocalNodeId(), j));
|
|
|
+ }
|
|
|
+ threadPool.generic().execute(() -> {
|
|
|
+ try {
|
|
|
+ safeAwait(startLatch);
|
|
|
+ channel.awaitClose();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new AssertionError(e);
|
|
|
+ } finally {
|
|
|
+ doneLatch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ safeAwait(doneLatch);
|
|
|
+ assertEquals(initialHttpChannels, RestCancellableNodeClient.getNumChannels());
|
|
|
+ assertEquals(expectedTasks, testClient.cancelledTasks);
|
|
|
+ }
|
|
|
+
|
|
|
private static class TestClient extends NodeClient {
|
|
|
- private final AtomicLong counter = new AtomicLong(0);
|
|
|
+ private final LongSupplier searchTaskIdGenerator = new AtomicLong(0)::getAndIncrement;
|
|
|
+ private final LongSupplier cancelTaskIdGenerator = new AtomicLong(1000)::getAndIncrement;
|
|
|
private final Set<TaskId> cancelledTasks = new CopyOnWriteArraySet<>();
|
|
|
private final AtomicInteger searchRequests = new AtomicInteger(0);
|
|
|
private final boolean timeout;
|
|
|
@@ -167,9 +203,17 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
|
|
) {
|
|
|
switch (action.name()) {
|
|
|
case TransportCancelTasksAction.NAME -> {
|
|
|
- CancelTasksRequest cancelTasksRequest = (CancelTasksRequest) request;
|
|
|
- assertTrue("tried to cancel the same task more than once", cancelledTasks.add(cancelTasksRequest.getTargetTaskId()));
|
|
|
- Task task = request.createTask(counter.getAndIncrement(), "cancel_task", action.name(), null, Collections.emptyMap());
|
|
|
+ assertTrue(
|
|
|
+ "tried to cancel the same task more than once",
|
|
|
+ cancelledTasks.add(asInstanceOf(CancelTasksRequest.class, request).getTargetTaskId())
|
|
|
+ );
|
|
|
+ Task task = request.createTask(
|
|
|
+ cancelTaskIdGenerator.getAsLong(),
|
|
|
+ "cancel_task",
|
|
|
+ action.name(),
|
|
|
+ null,
|
|
|
+ Collections.emptyMap()
|
|
|
+ );
|
|
|
if (randomBoolean()) {
|
|
|
listener.onResponse(null);
|
|
|
} else {
|
|
|
@@ -180,7 +224,13 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
|
|
}
|
|
|
case TransportSearchAction.NAME -> {
|
|
|
searchRequests.incrementAndGet();
|
|
|
- Task searchTask = request.createTask(counter.getAndIncrement(), "search", action.name(), null, Collections.emptyMap());
|
|
|
+ Task searchTask = request.createTask(
|
|
|
+ searchTaskIdGenerator.getAsLong(),
|
|
|
+ "search",
|
|
|
+ action.name(),
|
|
|
+ null,
|
|
|
+ Collections.emptyMap()
|
|
|
+ );
|
|
|
if (timeout == false) {
|
|
|
if (rarely()) {
|
|
|
// make sure that search is sometimes also called from the same thread before the task is returned
|
|
|
@@ -191,7 +241,7 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
|
|
}
|
|
|
return searchTask;
|
|
|
}
|
|
|
- default -> throw new UnsupportedOperationException();
|
|
|
+ default -> throw new AssertionError("unexpected action " + action.name());
|
|
|
}
|
|
|
|
|
|
}
|
|
|
@@ -222,10 +272,7 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
|
|
|
|
|
@Override
|
|
|
public void close() {
|
|
|
- if (open.compareAndSet(true, false) == false) {
|
|
|
- assert false : "HttpChannel is already closed";
|
|
|
- return; // nothing to do
|
|
|
- }
|
|
|
+ assertTrue("HttpChannel is already closed", open.compareAndSet(true, false));
|
|
|
ActionListener<Void> listener = closeListener.get();
|
|
|
if (listener != null) {
|
|
|
boolean failure = randomBoolean();
|
|
|
@@ -241,6 +288,7 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
|
|
}
|
|
|
|
|
|
private void awaitClose() throws InterruptedException {
|
|
|
+ assertNotNull("must set closeListener before calling awaitClose", closeListener.get());
|
|
|
close();
|
|
|
closeLatch.await();
|
|
|
}
|
|
|
@@ -257,7 +305,7 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
|
|
listener.onResponse(null);
|
|
|
} else {
|
|
|
if (closeListener.compareAndSet(null, listener) == false) {
|
|
|
- throw new IllegalStateException("close listener already set, only one is allowed!");
|
|
|
+ throw new AssertionError("close listener already set, only one is allowed!");
|
|
|
}
|
|
|
}
|
|
|
}
|