Sfoglia il codice sorgente

Fix TasksIT#testGetTaskWaitForCompletionWithoutStoringResult (#108094)

It seems that the failure (the missed index) has always existed in the test scenario and it's supposed to be handled by TransportGetTaskAction.java. We catch IndexNotFoundException here and convert it to ResourceNotFoundException. Then we catch ResourceNotFoundException here and return a snapshot of a task as a response.

In the stack trace, getFinishedTaskFromIndex was called from getRunningTaskFromNode, not from waitedForCompletion due to a race between creating a get request and unblocking request which are sent asynchronously. I've changed the waitForCompletionTestCase test method to unblock the task only after the request started waiting for the task completion by registering a removal listener. By doing so, we make sure we test the "wait for completion" branch when task is running.

The part about the missed index seems to irrelevant, since waitedForCompletion is able to suppress the error and return a snapshot of running task which is not possible if getFinishedTaskFromIndex gets called directly from getRunningTaskFromNode.

Resolves #107823
Artem Prigoda 1 anno fa
parent
commit
e622101ac9

+ 22 - 13
server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

@@ -41,6 +41,7 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.tasks.RemovedTaskListener;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.tasks.TaskId;
@@ -599,24 +600,32 @@ public class TasksIT extends ESIntegTestCase {
             TEST_TASK_ACTION.name() + "[n]",
             () -> client().execute(TEST_TASK_ACTION, request)
         );
-        ActionFuture<T> waitResponseFuture;
+        var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks();
+        assertThat(tasks, hasSize(1));
+        TaskId taskId = tasks.get(0).taskId();
+        clusterAdmin().prepareGetTask(taskId).get();
+
+        var taskManager = (MockTaskManager) internalCluster().getInstance(
+            TransportService.class,
+            clusterService().state().getNodes().resolveNode(taskId.getNodeId()).getName()
+        ).getTaskManager();
+        var listener = new MockTaskManagerListener() {
+            @Override
+            public void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {
+                // Unblock the request only after it started waiting for task completion
+                client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest());
+            }
+        };
+        taskManager.addListener(listener);
         try {
-            var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks();
-            assertThat(tasks, hasSize(1));
-            var taskId = tasks.get(0).taskId();
-            clusterAdmin().prepareGetTask(taskId).get();
-
             // Spin up a request to wait for the test task to finish
-            waitResponseFuture = wait.apply(taskId);
+            // The task will be unblocked as soon as the request started waiting for task completion
+            T waitResponse = wait.apply(taskId).get();
+            validator.accept(waitResponse);
         } finally {
-            // Unblock the request so the wait for completion request can finish
-            client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest()).get();
+            taskManager.removeListener(listener);
         }
 
-        // Now that the task is unblocked the list response will come back
-        T waitResponse = waitResponseFuture.get();
-        validator.accept(waitResponse);
-
         TestTaskPlugin.NodesResponse response = future.get();
         assertEquals(emptyList(), response.failures());
     }

+ 11 - 3
server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java

@@ -141,9 +141,17 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
         } else {
             if (request.getWaitForCompletion()) {
                 final ListenableActionFuture<Void> future = new ListenableActionFuture<>();
-                RemovedTaskListener removedTaskListener = task -> {
-                    if (task.equals(runningTask)) {
-                        future.onResponse(null);
+                RemovedTaskListener removedTaskListener = new RemovedTaskListener() {
+                    @Override
+                    public void onRemoved(Task task) {
+                        if (task.equals(runningTask)) {
+                            future.onResponse(null);
+                        }
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "Waiting for task completion " + runningTask;
                     }
                 };
                 taskManager.registerRemovedTaskListener(removedTaskListener);

+ 13 - 0
test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java

@@ -13,6 +13,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.tasks.RemovedTaskListener;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskAwareRequest;
 import org.elasticsearch.tasks.TaskManager;
@@ -84,4 +85,16 @@ public class MockTaskManager extends TaskManager {
     public void removeListener(MockTaskManagerListener listener) {
         listeners.remove(listener);
     }
+
+    @Override
+    public void registerRemovedTaskListener(RemovedTaskListener removedTaskListener) {
+        super.registerRemovedTaskListener(removedTaskListener);
+        for (MockTaskManagerListener listener : listeners) {
+            try {
+                listener.onRemovedTaskListenerRegistered(removedTaskListener);
+            } catch (Exception e) {
+                logger.warn("failed to notify task manager listener about a registered removed task listener", e);
+            }
+        }
+    }
 }

+ 3 - 0
test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.test.tasks;
 
+import org.elasticsearch.tasks.RemovedTaskListener;
 import org.elasticsearch.tasks.Task;
 
 /**
@@ -17,4 +18,6 @@ public interface MockTaskManagerListener {
     default void onTaskRegistered(Task task) {};
 
     default void onTaskUnregistered(Task task) {};
+
+    default void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {};
 }