|
@@ -338,55 +338,60 @@ public class TasksIT extends ESIntegTestCase {
|
|
|
*/
|
|
|
ReentrantLock taskFinishLock = new ReentrantLock();
|
|
|
taskFinishLock.lock();
|
|
|
- CountDownLatch taskRegistered = new CountDownLatch(1);
|
|
|
- for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
|
|
|
- ((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() {
|
|
|
- @Override
|
|
|
- public void onTaskRegistered(Task task) {
|
|
|
- if (task.getAction().startsWith(IndexAction.NAME)) {
|
|
|
- taskRegistered.countDown();
|
|
|
+ ListenableActionFuture<?> indexFuture = null;
|
|
|
+ try {
|
|
|
+ CountDownLatch taskRegistered = new CountDownLatch(1);
|
|
|
+ for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
|
|
|
+ ((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() {
|
|
|
+ @Override
|
|
|
+ public void onTaskRegistered(Task task) {
|
|
|
+ if (task.getAction().startsWith(IndexAction.NAME)) {
|
|
|
+ taskRegistered.countDown();
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- @Override
|
|
|
- public void onTaskUnregistered(Task task) {
|
|
|
- /*
|
|
|
- * We can't block all tasks here or the task listing task
|
|
|
- * would never return.
|
|
|
- */
|
|
|
- if (false == task.getAction().startsWith(IndexAction.NAME)) {
|
|
|
- return;
|
|
|
+ @Override
|
|
|
+ public void onTaskUnregistered(Task task) {
|
|
|
+ /*
|
|
|
+ * We can't block all tasks here or the task listing task
|
|
|
+ * would never return.
|
|
|
+ */
|
|
|
+ if (false == task.getAction().startsWith(IndexAction.NAME)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ logger.debug("Blocking {} from being unregistered", task);
|
|
|
+ taskFinishLock.lock();
|
|
|
+ taskFinishLock.unlock();
|
|
|
}
|
|
|
- logger.debug("Blocking {} from being unregistered", task);
|
|
|
- taskFinishLock.lock();
|
|
|
- taskFinishLock.unlock();
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
- ListenableActionFuture<?> indexFuture = client().prepareIndex("test", "test").setSource("test", "test").execute();
|
|
|
- taskRegistered.await(10, TimeUnit.SECONDS); // waiting for at least one task to be registered
|
|
|
-
|
|
|
- ListTasksResponse listResponse = client().admin().cluster().prepareListTasks().setActions("indices:data/write/index*")
|
|
|
- .setDetailed(true).get();
|
|
|
- assertThat(listResponse.getTasks(), not(empty()));
|
|
|
- for (TaskInfo task : listResponse.getTasks()) {
|
|
|
- assertNotNull(task.getStatus());
|
|
|
- GetTaskResponse getResponse = client().admin().cluster().prepareGetTask(task.getTaskId()).get();
|
|
|
- assertFalse("task should still be running", getResponse.getTask().isCompleted());
|
|
|
- TaskInfo fetchedWithGet = getResponse.getTask().getTask();
|
|
|
- assertEquals(task.getId(), fetchedWithGet.getId());
|
|
|
- assertEquals(task.getType(), fetchedWithGet.getType());
|
|
|
- assertEquals(task.getAction(), fetchedWithGet.getAction());
|
|
|
- assertEquals(task.getDescription(), fetchedWithGet.getDescription());
|
|
|
- assertEquals(task.getStatus(), fetchedWithGet.getStatus());
|
|
|
- assertEquals(task.getStartTime(), fetchedWithGet.getStartTime());
|
|
|
- assertThat(fetchedWithGet.getRunningTimeNanos(), greaterThanOrEqualTo(task.getRunningTimeNanos()));
|
|
|
- assertEquals(task.isCancellable(), fetchedWithGet.isCancellable());
|
|
|
- assertEquals(task.getParentTaskId(), fetchedWithGet.getParentTaskId());
|
|
|
+ });
|
|
|
+ }
|
|
|
+ indexFuture = client().prepareIndex("test", "test").setSource("test", "test").execute();
|
|
|
+ taskRegistered.await(10, TimeUnit.SECONDS); // waiting for at least one task to be registered
|
|
|
+
|
|
|
+ ListTasksResponse listResponse = client().admin().cluster().prepareListTasks().setActions("indices:data/write/index*")
|
|
|
+ .setDetailed(true).get();
|
|
|
+ assertThat(listResponse.getTasks(), not(empty()));
|
|
|
+ for (TaskInfo task : listResponse.getTasks()) {
|
|
|
+ assertNotNull(task.getStatus());
|
|
|
+ GetTaskResponse getResponse = client().admin().cluster().prepareGetTask(task.getTaskId()).get();
|
|
|
+ assertFalse("task should still be running", getResponse.getTask().isCompleted());
|
|
|
+ TaskInfo fetchedWithGet = getResponse.getTask().getTask();
|
|
|
+ assertEquals(task.getId(), fetchedWithGet.getId());
|
|
|
+ assertEquals(task.getType(), fetchedWithGet.getType());
|
|
|
+ assertEquals(task.getAction(), fetchedWithGet.getAction());
|
|
|
+ assertEquals(task.getDescription(), fetchedWithGet.getDescription());
|
|
|
+ assertEquals(task.getStatus(), fetchedWithGet.getStatus());
|
|
|
+ assertEquals(task.getStartTime(), fetchedWithGet.getStartTime());
|
|
|
+ assertThat(fetchedWithGet.getRunningTimeNanos(), greaterThanOrEqualTo(task.getRunningTimeNanos()));
|
|
|
+ assertEquals(task.isCancellable(), fetchedWithGet.isCancellable());
|
|
|
+ assertEquals(task.getParentTaskId(), fetchedWithGet.getParentTaskId());
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ taskFinishLock.unlock();
|
|
|
+ if (indexFuture != null) {
|
|
|
+ indexFuture.get();
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- taskFinishLock.unlock();
|
|
|
- indexFuture.get();
|
|
|
}
|
|
|
|
|
|
public void testTasksCancellation() throws Exception {
|