|
@@ -76,6 +76,8 @@ import java.util.stream.Collectors;
|
|
|
|
|
|
import static java.util.Collections.emptyList;
|
|
|
import static java.util.Collections.singleton;
|
|
|
+import static org.elasticsearch.action.admin.cluster.node.tasks.TestTaskPlugin.TEST_TASK_ACTION;
|
|
|
+import static org.elasticsearch.action.admin.cluster.node.tasks.TestTaskPlugin.UNBLOCK_TASK_ACTION;
|
|
|
import static org.elasticsearch.core.TimeValue.timeValueMillis;
|
|
|
import static org.elasticsearch.core.TimeValue.timeValueSeconds;
|
|
|
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
|
|
@@ -498,7 +500,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
// Start blocking test task
|
|
|
// Get real client (the plugin is not registered on transport nodes)
|
|
|
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
|
|
|
- ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);
|
|
|
+ ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TEST_TASK_ACTION, request);
|
|
|
|
|
|
logger.info("--> started test tasks");
|
|
|
|
|
@@ -506,54 +508,51 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
assertBusy(
|
|
|
() -> assertEquals(
|
|
|
internalCluster().size(),
|
|
|
- clusterAdmin().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()
|
|
|
+ clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name() + "[n]").get().getTasks().size()
|
|
|
)
|
|
|
);
|
|
|
|
|
|
logger.info("--> cancelling the main test task");
|
|
|
- CancelTasksResponse cancelTasksResponse = clusterAdmin().prepareCancelTasks().setActions(TestTaskPlugin.TestTaskAction.NAME).get();
|
|
|
+ CancelTasksResponse cancelTasksResponse = clusterAdmin().prepareCancelTasks().setActions(TEST_TASK_ACTION.name()).get();
|
|
|
assertEquals(1, cancelTasksResponse.getTasks().size());
|
|
|
|
|
|
expectThrows(TaskCancelledException.class, future::actionGet);
|
|
|
|
|
|
logger.info("--> checking that test tasks are not running");
|
|
|
- assertEquals(0, clusterAdmin().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "*").get().getTasks().size());
|
|
|
+ assertEquals(0, clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name() + "*").get().getTasks().size());
|
|
|
}
|
|
|
|
|
|
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/95325")
|
|
|
public void testTasksUnblocking() throws Exception {
|
|
|
// Start blocking test task
|
|
|
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
|
|
|
- ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);
|
|
|
+ ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TEST_TASK_ACTION, request);
|
|
|
// Wait for the task to start on all nodes
|
|
|
assertBusy(
|
|
|
() -> assertEquals(
|
|
|
internalCluster().size(),
|
|
|
- clusterAdmin().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()
|
|
|
+ clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name() + "[n]").get().getTasks().size()
|
|
|
)
|
|
|
);
|
|
|
|
|
|
- new TestTaskPlugin.UnblockTestTasksRequestBuilder(client(), TestTaskPlugin.UnblockTestTasksAction.INSTANCE).get();
|
|
|
+ new TestTaskPlugin.UnblockTestTasksRequestBuilder(client(), UNBLOCK_TASK_ACTION).get();
|
|
|
|
|
|
future.get();
|
|
|
assertBusy(
|
|
|
- () -> assertEquals(
|
|
|
- 0,
|
|
|
- clusterAdmin().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()
|
|
|
- )
|
|
|
+ () -> assertEquals(0, clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name() + "[n]").get().getTasks().size())
|
|
|
);
|
|
|
}
|
|
|
|
|
|
public void testListTasksWaitForCompletion() throws Exception {
|
|
|
waitForCompletionTestCase(
|
|
|
randomBoolean(),
|
|
|
- id -> clusterAdmin().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME).setWaitForCompletion(true).execute(),
|
|
|
+ id -> clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).setWaitForCompletion(true).execute(),
|
|
|
response -> {
|
|
|
assertThat(response.getNodeFailures(), empty());
|
|
|
assertThat(response.getTaskFailures(), empty());
|
|
|
assertThat(response.getTasks(), hasSize(1));
|
|
|
TaskInfo task = response.getTasks().get(0);
|
|
|
- assertEquals(TestTaskPlugin.TestTaskAction.NAME, task.action());
|
|
|
+ assertEquals(TEST_TASK_ACTION.name(), task.action());
|
|
|
}
|
|
|
);
|
|
|
}
|
|
@@ -565,7 +564,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
assertNull(response.getTask().getResponse());
|
|
|
// But the task's details should still be there because we grabbed a reference to the task before waiting for it to complete
|
|
|
assertNotNull(response.getTask().getTask());
|
|
|
- assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().action());
|
|
|
+ assertEquals(TEST_TASK_ACTION.name(), response.getTask().getTask().action());
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -576,7 +575,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
assertEquals(0, response.getTask().getResponseAsMap().get("failure_count"));
|
|
|
// The task's details should also be there
|
|
|
assertNotNull(response.getTask().getTask());
|
|
|
- assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().action());
|
|
|
+ assertEquals(TEST_TASK_ACTION.name(), response.getTask().getTask().action());
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -591,7 +590,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
// Start blocking test task
|
|
|
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
|
|
|
request.setShouldStoreResult(storeResult);
|
|
|
- ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);
|
|
|
+ ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TEST_TASK_ACTION, request);
|
|
|
|
|
|
ActionFuture<T> waitResponseFuture;
|
|
|
TaskId taskId;
|
|
@@ -627,7 +626,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
waitForWaitingToStart.await();
|
|
|
} finally {
|
|
|
// Unblock the request so the wait for completion request can finish
|
|
|
- new TestTaskPlugin.UnblockTestTasksRequestBuilder(client(), TestTaskPlugin.UnblockTestTasksAction.INSTANCE).get();
|
|
|
+ new TestTaskPlugin.UnblockTestTasksRequestBuilder(client(), UNBLOCK_TASK_ACTION).get();
|
|
|
}
|
|
|
|
|
|
// Now that the task is unblocked the list response will come back
|
|
@@ -641,7 +640,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
public void testListTasksWaitForTimeout() throws Exception {
|
|
|
waitForTimeoutTestCase(id -> {
|
|
|
ListTasksResponse response = clusterAdmin().prepareListTasks()
|
|
|
- .setActions(TestTaskPlugin.TestTaskAction.NAME)
|
|
|
+ .setActions(TEST_TASK_ACTION.name())
|
|
|
.setWaitForCompletion(true)
|
|
|
.setTimeout(timeValueMillis(100))
|
|
|
.get();
|
|
@@ -667,7 +666,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
private void waitForTimeoutTestCase(Function<TaskId, ? extends Iterable<? extends Throwable>> wait) throws Exception {
|
|
|
// Start blocking test task
|
|
|
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
|
|
|
- ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request);
|
|
|
+ ActionFuture<TestTaskPlugin.NodesResponse> future = client().execute(TEST_TASK_ACTION, request);
|
|
|
try {
|
|
|
TaskId taskId = waitForTestTaskStartOnAllNodes();
|
|
|
|
|
@@ -685,7 +684,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
}
|
|
|
} finally {
|
|
|
// Now we can unblock those requests
|
|
|
- new TestTaskPlugin.UnblockTestTasksRequestBuilder(client(), TestTaskPlugin.UnblockTestTasksAction.INSTANCE).get();
|
|
|
+ new TestTaskPlugin.UnblockTestTasksRequestBuilder(client(), UNBLOCK_TASK_ACTION).get();
|
|
|
}
|
|
|
future.get();
|
|
|
}
|
|
@@ -695,13 +694,10 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
*/
|
|
|
private TaskId waitForTestTaskStartOnAllNodes() throws Exception {
|
|
|
assertBusy(() -> {
|
|
|
- List<TaskInfo> tasks = clusterAdmin().prepareListTasks()
|
|
|
- .setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]")
|
|
|
- .get()
|
|
|
- .getTasks();
|
|
|
+ List<TaskInfo> tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name() + "[n]").get().getTasks();
|
|
|
assertEquals(internalCluster().size(), tasks.size());
|
|
|
});
|
|
|
- List<TaskInfo> task = clusterAdmin().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME).get().getTasks();
|
|
|
+ List<TaskInfo> task = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks();
|
|
|
assertThat(task, hasSize(1));
|
|
|
return task.get(0).taskId();
|
|
|
}
|
|
@@ -709,7 +705,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
public void testTasksListWaitForNoTask() throws Exception {
|
|
|
// Spin up a request to wait for no matching tasks
|
|
|
ActionFuture<ListTasksResponse> waitResponseFuture = clusterAdmin().prepareListTasks()
|
|
|
- .setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]")
|
|
|
+ .setActions(TEST_TASK_ACTION.name() + "[n]")
|
|
|
.setWaitForCompletion(true)
|
|
|
.setTimeout(timeValueMillis(10))
|
|
|
.execute();
|
|
@@ -753,7 +749,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
}
|
|
|
|
|
|
public void testTaskStoringSuccessfulResult() throws Exception {
|
|
|
- registerTaskManagerListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
|
|
|
+ registerTaskManagerListeners(TEST_TASK_ACTION.name()); // we need this to get task id of the process
|
|
|
|
|
|
// Start non-blocking test task
|
|
|
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
|
|
@@ -762,9 +758,9 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
TaskId parentTaskId = new TaskId("parent_node", randomLong());
|
|
|
request.setParentTask(parentTaskId);
|
|
|
|
|
|
- client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request).get();
|
|
|
+ client().execute(TEST_TASK_ACTION, request).get();
|
|
|
|
|
|
- List<TaskInfo> events = findEvents(TestTaskPlugin.TestTaskAction.NAME, Tuple::v1);
|
|
|
+ List<TaskInfo> events = findEvents(TEST_TASK_ACTION.name(), Tuple::v1);
|
|
|
|
|
|
assertEquals(1, events.size());
|
|
|
TaskInfo taskInfo = events.get(0);
|
|
@@ -803,15 +799,15 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
assertNull(getResponse.getTask().getError());
|
|
|
|
|
|
// run it again to check that the tasks index has been successfully created and can be re-used
|
|
|
- client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request).get();
|
|
|
+ client().execute(TEST_TASK_ACTION, request).get();
|
|
|
|
|
|
- events = findEvents(TestTaskPlugin.TestTaskAction.NAME, Tuple::v1);
|
|
|
+ events = findEvents(TEST_TASK_ACTION.name(), Tuple::v1);
|
|
|
|
|
|
assertEquals(2, events.size());
|
|
|
}
|
|
|
|
|
|
public void testTaskStoringFailureResult() throws Exception {
|
|
|
- registerTaskManagerListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
|
|
|
+ registerTaskManagerListeners(TEST_TASK_ACTION.name()); // we need this to get task id of the process
|
|
|
|
|
|
TestTaskPlugin.NodesRequest request = new TestTaskPlugin.NodesRequest("test");
|
|
|
request.setShouldFail(true);
|
|
@@ -819,9 +815,9 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
request.setShouldBlock(false);
|
|
|
|
|
|
// Start non-blocking test task that should fail
|
|
|
- assertFutureThrows(client().execute(TestTaskPlugin.TestTaskAction.INSTANCE, request), IllegalStateException.class);
|
|
|
+ assertFutureThrows(client().execute(TEST_TASK_ACTION, request), IllegalStateException.class);
|
|
|
|
|
|
- List<TaskInfo> events = findEvents(TestTaskPlugin.TestTaskAction.NAME, Tuple::v1);
|
|
|
+ List<TaskInfo> events = findEvents(TEST_TASK_ACTION.name(), Tuple::v1);
|
|
|
assertEquals(1, events.size());
|
|
|
TaskInfo failedTaskInfo = events.get(0);
|
|
|
TaskId failedTaskId = failedTaskInfo.taskId();
|