Explorar el Código

ML: better handle task state race condition (#38040)

Benjamin Trent hace 6 años
padre
commit
be381b4525

+ 26 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java

@@ -5,8 +5,10 @@
  */
 package org.elasticsearch.xpack.ml.action;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -33,11 +35,13 @@ import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
 import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.ExceptionsHelper.rethrowAndSuppress;
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
@@ -119,9 +123,20 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
                     .cluster()
                     .prepareListTasks()
                     .setActions(DATAFEED_TASK_NAME + "[c]", JOB_TASK_NAME + "[c]")
+                    // There is a chance that we failed un-allocating a task due to allocation_id being changed
+                    // This call will timeout in that case and return an error
                     .setWaitForCompletion(true)
                     .setTimeout(request.timeout()).execute(ActionListener.wrap(
-                        r -> wrappedListener.onResponse(new AcknowledgedResponse(true)),
+                        r -> {
+                            try {
+                                // Handle potential node timeouts,
+                                // these should be considered failures as tasks as still potentially executing
+                                rethrowAndSuppress(r.getNodeFailures());
+                                wrappedListener.onResponse(new AcknowledgedResponse(true));
+                            } catch (ElasticsearchException ex) {
+                                wrappedListener.onFailure(ex);
+                            }
+                        },
                         wrappedListener::onFailure));
             },
             wrappedListener::onFailure
@@ -243,10 +258,19 @@ public class TransportSetUpgradeModeAction extends TransportMasterNodeAction<Set
             .stream()
             .filter(persistentTask -> (persistentTask.getTaskName().equals(MlTasks.JOB_TASK_NAME) ||
                 persistentTask.getTaskName().equals(MlTasks.DATAFEED_TASK_NAME)))
+            // We want to always have the same ordering of which tasks we un-allocate first.
+            // However, the order in which the distributed tasks handle the un-allocation event is not guaranteed.
+            .sorted(Comparator.comparing(PersistentTask::getTaskName))
             .collect(Collectors.toList());
 
         TypedChainTaskExecutor<PersistentTask<?>> chainTaskExecutor =
-            new TypedChainTaskExecutor<>(client.threadPool().executor(executor()), r -> true, ex -> true);
+            new TypedChainTaskExecutor<>(client.threadPool().executor(executor()),
+                r -> true,
+                // Another process could modify tasks and thus we cannot find them via the allocation_id and name
+                // If the task was removed from the node, all is well
+                // We handle the case of allocation_id changing later in this transport class by timing out waiting for task completion
+                // Consequently, if the exception is ResourceNotFoundException, continue execution; circuit break otherwise.
+                ex -> ex instanceof ResourceNotFoundException == false);
 
         for (PersistentTask<?> task : datafeedAndJobTasks) {
             chainTaskExecutor.add(