|
@@ -60,7 +60,6 @@ import org.elasticsearch.transport.TransportException;
|
|
|
import org.elasticsearch.transport.TransportResponseHandler;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
import org.junit.After;
|
|
|
-import org.junit.Before;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
@@ -83,6 +82,7 @@ import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.hasSize;
|
|
|
import static org.hamcrest.Matchers.instanceOf;
|
|
|
|
|
|
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
|
|
|
public class CancellableTasksIT extends ESIntegTestCase {
|
|
|
|
|
|
static int idGenerator = 0;
|
|
@@ -91,15 +91,6 @@ public class CancellableTasksIT extends ESIntegTestCase {
|
|
|
static final Map<TestRequest, CountDownLatch> beforeExecuteLatches = ConcurrentCollections.newConcurrentMap();
|
|
|
static final Map<TestRequest, CountDownLatch> completedLatches = ConcurrentCollections.newConcurrentMap();
|
|
|
|
|
|
- @Before
|
|
|
- public void resetTestStates() {
|
|
|
- idGenerator = 0;
|
|
|
- beforeSendLatches.clear();
|
|
|
- arrivedLatches.clear();
|
|
|
- beforeExecuteLatches.clear();
|
|
|
- completedLatches.clear();
|
|
|
- }
|
|
|
-
|
|
|
@After
|
|
|
public void ensureAllBansRemoved() throws Exception {
|
|
|
assertBusy(() -> {
|
|
@@ -150,7 +141,7 @@ public class CancellableTasksIT extends ESIntegTestCase {
|
|
|
beforeSendLatches.get(req).countDown();
|
|
|
}
|
|
|
for (TestRequest req : sentRequests) {
|
|
|
- arrivedLatches.get(req).await();
|
|
|
+ assertTrue(arrivedLatches.get(req).await(60, TimeUnit.SECONDS));
|
|
|
}
|
|
|
Set<TestRequest> completedRequests = new HashSet<>();
|
|
|
for (TestRequest req : randomSubsetOf(sentRequests)) {
|
|
@@ -163,7 +154,7 @@ public class CancellableTasksIT extends ESIntegTestCase {
|
|
|
beforeExecuteLatches.get(req).countDown();
|
|
|
}
|
|
|
for (TestRequest req : completedRequests) {
|
|
|
- completedLatches.get(req).await();
|
|
|
+ assertTrue(completedLatches.get(req).await(60, TimeUnit.SECONDS));
|
|
|
}
|
|
|
return Sets.difference(sentRequests, completedRequests);
|
|
|
}
|
|
@@ -500,7 +491,7 @@ public class CancellableTasksIT extends ESIntegTestCase {
|
|
|
GroupedActionListener<TestResponse> groupedListener =
|
|
|
new GroupedActionListener<>(listener.map(r -> new TestResponse()), subRequests.size() + 1);
|
|
|
transportService.getThreadPool().generic().execute(ActionRunnable.supply(groupedListener, () -> {
|
|
|
- beforeExecuteLatches.get(request).await();
|
|
|
+ assertTrue(beforeExecuteLatches.get(request).await(60, TimeUnit.SECONDS));
|
|
|
if (((CancellableTask) task).isCancelled()) {
|
|
|
throw new TaskCancelledException("Task was cancelled while executing");
|
|
|
}
|
|
@@ -524,7 +515,7 @@ public class CancellableTasksIT extends ESIntegTestCase {
|
|
|
|
|
|
@Override
|
|
|
protected void doRun() throws Exception {
|
|
|
- beforeSendLatches.get(subRequest).await();
|
|
|
+ assertTrue(beforeSendLatches.get(subRequest).await(60, TimeUnit.SECONDS));
|
|
|
if (client.getLocalNodeId().equals(subRequest.node.getId()) && randomBoolean()) {
|
|
|
try {
|
|
|
client.executeLocally(TransportTestAction.ACTION, subRequest, latchedListener);
|