|
@@ -46,11 +46,11 @@ import org.elasticsearch.common.settings.Settings;
|
|
|
import org.elasticsearch.index.query.QueryBuilders;
|
|
|
import org.elasticsearch.plugins.Plugin;
|
|
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
-import org.elasticsearch.tasks.PersistedTaskInfo;
|
|
|
+import org.elasticsearch.tasks.TaskResult;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.tasks.TaskId;
|
|
|
import org.elasticsearch.tasks.TaskInfo;
|
|
|
-import org.elasticsearch.tasks.TaskPersistenceService;
|
|
|
+import org.elasticsearch.tasks.TaskResultsService;
|
|
|
import org.elasticsearch.test.ESIntegTestCase;
|
|
|
import org.elasticsearch.test.tasks.MockTaskManager;
|
|
|
import org.elasticsearch.test.tasks.MockTaskManagerListener;
|
|
@@ -452,39 +452,39 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- public void testGetTaskWaitForCompletionNoPersist() throws Exception {
|
|
|
+ public void testGetTaskWaitForCompletionWithoutStoringResult() throws Exception {
|
|
|
waitForCompletionTestCase(false, id -> {
|
|
|
return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
|
|
|
}, response -> {
|
|
|
assertNotNull(response.getTask().getTask());
|
|
|
assertTrue(response.getTask().isCompleted());
|
|
|
- // We didn't persist the result so it won't come back when we wait
|
|
|
+ // We didn't store the result so it won't come back when we wait
|
|
|
assertNull(response.getTask().getResponse());
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- public void testGetTaskWaitForCompletionWithPersist() throws Exception {
|
|
|
+ public void testGetTaskWaitForCompletionWithStoringResult() throws Exception {
|
|
|
waitForCompletionTestCase(true, id -> {
|
|
|
return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
|
|
|
}, response -> {
|
|
|
assertNotNull(response.getTask().getTask());
|
|
|
assertTrue(response.getTask().isCompleted());
|
|
|
- // We persisted the task so we should get its results
|
|
|
+ // We stored the task so we should get its results
|
|
|
assertEquals(0, response.getTask().getResponseAsMap().get("failure_count"));
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Test wait for completion.
|
|
|
- * @param persist should the task persist its results
|
|
|
+ * @param storeResult should the task store its results
|
|
|
* @param wait start waiting for a task. Accepts that id of the task to wait for and returns a future waiting for it.
|
|
|
* @param validator validate the response and return the task ids that were found
|
|
|
*/
|
|
|
- private <T> void waitForCompletionTestCase(boolean persist, Function<TaskId, ListenableActionFuture<T>> wait, Consumer<T> validator)
|
|
|
+ private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId, ListenableActionFuture<T>> wait, Consumer<T> validator)
|
|
|
throws Exception {
|
|
|
// Start blocking test task
|
|
|
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
|
|
|
- .setShouldPersistResult(persist).execute();
|
|
|
+ .setShouldStoreResult(storeResult).execute();
|
|
|
|
|
|
ListenableActionFuture<T> waitResponseFuture;
|
|
|
TaskId taskId;
|
|
@@ -622,17 +622,17 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
assertThat(response.getTasks().size(), greaterThanOrEqualTo(1));
|
|
|
}
|
|
|
|
|
|
- public void testTaskResultPersistence() throws Exception {
|
|
|
+ public void testTaskStoringSuccesfulResult() throws Exception {
|
|
|
// Randomly create an empty index to make sure the type is created automatically
|
|
|
if (randomBoolean()) {
|
|
|
logger.info("creating an empty results index with custom settings");
|
|
|
- assertAcked(client().admin().indices().prepareCreate(TaskPersistenceService.TASK_INDEX));
|
|
|
+ assertAcked(client().admin().indices().prepareCreate(TaskResultsService.TASK_INDEX));
|
|
|
}
|
|
|
|
|
|
registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
|
|
|
|
|
|
// Start non-blocking test task
|
|
|
- TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).setShouldPersistResult(true).setShouldBlock(false).get();
|
|
|
+ TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).setShouldStoreResult(true).setShouldBlock(false).get();
|
|
|
|
|
|
List<TaskInfo> events = findEvents(TestTaskPlugin.TestTaskAction.NAME, Tuple::v1);
|
|
|
|
|
@@ -641,7 +641,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
TaskId taskId = taskInfo.getTaskId();
|
|
|
|
|
|
GetResponse resultDoc = client()
|
|
|
- .prepareGet(TaskPersistenceService.TASK_INDEX, TaskPersistenceService.TASK_TYPE, taskId.toString()).get();
|
|
|
+ .prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, taskId.toString()).get();
|
|
|
assertTrue(resultDoc.isExists());
|
|
|
|
|
|
Map<String, Object> source = resultDoc.getSource();
|
|
@@ -657,16 +657,16 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
|
|
|
assertNull(source.get("failure"));
|
|
|
|
|
|
- assertNoFailures(client().admin().indices().prepareRefresh(TaskPersistenceService.TASK_INDEX).get());
|
|
|
+ assertNoFailures(client().admin().indices().prepareRefresh(TaskResultsService.TASK_INDEX).get());
|
|
|
|
|
|
- SearchResponse searchResponse = client().prepareSearch(TaskPersistenceService.TASK_INDEX)
|
|
|
- .setTypes(TaskPersistenceService.TASK_TYPE)
|
|
|
+ SearchResponse searchResponse = client().prepareSearch(TaskResultsService.TASK_INDEX)
|
|
|
+ .setTypes(TaskResultsService.TASK_TYPE)
|
|
|
.setSource(SearchSourceBuilder.searchSource().query(QueryBuilders.termQuery("task.action", taskInfo.getAction())))
|
|
|
.get();
|
|
|
|
|
|
assertEquals(1L, searchResponse.getHits().totalHits());
|
|
|
|
|
|
- searchResponse = client().prepareSearch(TaskPersistenceService.TASK_INDEX).setTypes(TaskPersistenceService.TASK_TYPE)
|
|
|
+ searchResponse = client().prepareSearch(TaskResultsService.TASK_INDEX).setTypes(TaskResultsService.TASK_TYPE)
|
|
|
.setSource(SearchSourceBuilder.searchSource().query(QueryBuilders.termQuery("task.node", taskInfo.getTaskId().getNodeId())))
|
|
|
.get();
|
|
|
|
|
@@ -677,14 +677,14 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
assertNull(getResponse.getTask().getError());
|
|
|
}
|
|
|
|
|
|
- public void testTaskFailurePersistence() throws Exception {
|
|
|
+ public void testTaskStoringFailureResult() throws Exception {
|
|
|
registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process
|
|
|
|
|
|
// Start non-blocking test task that should fail
|
|
|
assertThrows(
|
|
|
TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
|
|
|
.setShouldFail(true)
|
|
|
- .setShouldPersistResult(true)
|
|
|
+ .setShouldStoreResult(true)
|
|
|
.setShouldBlock(false),
|
|
|
IllegalStateException.class
|
|
|
);
|
|
@@ -695,7 +695,7 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
TaskId failedTaskId = failedTaskInfo.getTaskId();
|
|
|
|
|
|
GetResponse failedResultDoc = client()
|
|
|
- .prepareGet(TaskPersistenceService.TASK_INDEX, TaskPersistenceService.TASK_TYPE, failedTaskId.toString())
|
|
|
+ .prepareGet(TaskResultsService.TASK_INDEX, TaskResultsService.TASK_TYPE, failedTaskId.toString())
|
|
|
.get();
|
|
|
assertTrue(failedResultDoc.isExists());
|
|
|
|
|
@@ -729,9 +729,9 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
public void testNodeNotFoundButTaskFound() throws Exception {
|
|
|
// Save a fake task that looks like it is from a node that isn't part of the cluster
|
|
|
CyclicBarrier b = new CyclicBarrier(2);
|
|
|
- TaskPersistenceService resultsService = internalCluster().getInstance(TaskPersistenceService.class);
|
|
|
- resultsService.persist(
|
|
|
- new PersistedTaskInfo(new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID),
|
|
|
+ TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
|
|
|
+ resultsService.storeResult(
|
|
|
+ new TaskResult(new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID),
|
|
|
new RuntimeException("test")),
|
|
|
new ActionListener<Void>() {
|
|
|
@Override
|