Răsfoiți Sursa

Only start re-assigning persistent tasks if they are not already being reassigned (#76258)

* Only start re-assigning persistent tasks if they are not already being reassigned

* adding tests addressing PR comments

* addressing Pr COmments

* addressing PR comments + style"

* improving test rigor
Benjamin Trent 4 ani în urmă
părinte
comite
e8a3b05fcf

+ 8 - 1
server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

@@ -37,6 +37,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.io.Closeable;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 /**
@@ -55,6 +56,7 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
     private final EnableAssignmentDecider enableDecider;
     private final ThreadPool threadPool;
     private final PeriodicRechecker periodicRechecker;
+    private final AtomicBoolean reassigningTasks = new AtomicBoolean(false);
 
     public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService,
                                          ThreadPool threadPool) {
@@ -353,7 +355,10 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
     /**
      * Submit a cluster state update to reassign any persistent tasks that need reassigning
      */
-    private void reassignPersistentTasks() {
+    void reassignPersistentTasks() {
+        if (this.reassigningTasks.compareAndSet(false, true) == false) {
+            return;
+        }
         clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() {
             @Override
             public ClusterState execute(ClusterState currentState) {
@@ -362,6 +367,7 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
 
             @Override
             public void onFailure(String source, Exception e) {
+                reassigningTasks.set(false);
                 logger.warn("failed to reassign persistent tasks", e);
                 if (e instanceof NotMasterException == false) {
                     // There must be a task that's worth rechecking because there was one
@@ -373,6 +379,7 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos
 
             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                reassigningTasks.set(false);
                 if (isAnyTaskUnassigned(newState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE))) {
                     periodicRechecker.rescheduleIfNecessary();
                 }

+ 58 - 3
server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java

@@ -50,6 +50,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -69,8 +70,13 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.core.Is.is;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 public class PersistentTasksClusterServiceTests extends ESTestCase {
@@ -429,7 +435,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
         nonClusterStateCondition = false;
 
         boolean shouldSimulateFailure = randomBoolean();
-        ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState, shouldSimulateFailure);
+        ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, shouldSimulateFailure);
         PersistentTasksClusterService service = createService(recheckTestClusterService,
             (params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes));
 
@@ -477,7 +483,7 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
         ClusterState clusterState = builder.metadata(metadata).nodes(nodes).build();
         nonClusterStateCondition = false;
 
-        ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState, false);
+        ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, false);
         PersistentTasksClusterService service = createService(recheckTestClusterService,
             (params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes));
 
@@ -639,7 +645,53 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
             Sets.haveEmptyIntersection(shutdownNodes, nodesWithTasks));
     }
 
-    private ClusterService createRecheckTestClusterService(ClusterState initialState, boolean shouldSimulateFailure) {
+    public void testReassignOnlyOnce() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+        ClusterState initialState = initialState();
+        ClusterState.Builder builder = ClusterState.builder(initialState);
+        PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder(
+            initialState.metadata().custom(PersistentTasksCustomMetadata.TYPE)
+        );
+        DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(initialState.nodes());
+        addTestNodes(nodes, randomIntBetween(1, 3));
+        addTask(tasks, "assign_based_on_non_cluster_state_condition", null);
+        Metadata.Builder metadata = Metadata.builder(initialState.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build());
+        ClusterState clusterState = builder.metadata(metadata).nodes(nodes).build();
+
+        boolean shouldSimulateFailure = randomBoolean();
+        ClusterService recheckTestClusterService = createStateUpdateClusterState(clusterState, shouldSimulateFailure, latch);
+        PersistentTasksClusterService service = createService(
+            recheckTestClusterService,
+            (params, candidateNodes, currentState) -> assignBasedOnNonClusterStateCondition(candidateNodes)
+        );
+        verify(recheckTestClusterService, atLeastOnce()).getClusterSettings();
+        verify(recheckTestClusterService, atLeastOnce()).addListener(any());
+        Thread t1 = new Thread(service::reassignPersistentTasks);
+        Thread t2 = new Thread(service::reassignPersistentTasks);
+        try {
+            t1.start();
+            t2.start();
+            // Make sure we have at least one reassign check before we count down the latch
+            assertBusy(
+                () -> verify(recheckTestClusterService, atLeastOnce()).submitStateUpdateTask(eq("reassign persistent tasks"), any())
+            );
+        } finally {
+            latch.countDown();
+            t1.join();
+            t2.join();
+            service.reassignPersistentTasks();
+        }
+        // verify that our reassignment is possible again, here we have once from the previous reassignment in the `try` block
+        // And one from the line above once the other threads have joined
+        assertBusy(() -> verify(recheckTestClusterService, times(2)).submitStateUpdateTask(eq("reassign persistent tasks"), any()));
+        verifyNoMoreInteractions(recheckTestClusterService);
+    }
+
+    private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure) {
+        return createStateUpdateClusterState(initialState, shouldSimulateFailure, null);
+    }
+
+    private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure, CountDownLatch await) {
         AtomicBoolean testFailureNextTime = new AtomicBoolean(shouldSimulateFailure);
         AtomicReference<ClusterState> state = new AtomicReference<>(initialState);
         ClusterService recheckTestClusterService = mock(ClusterService.class);
@@ -651,6 +703,9 @@ public class PersistentTasksClusterServiceTests extends ESTestCase {
             ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1];
             ClusterState before = state.get();
             ClusterState after = task.execute(before);
+            if (await != null) {
+                await.await();
+            }
             if (testFailureNextTime.compareAndSet(true, false)) {
                 task.onFailure("testing failure", new RuntimeException("foo"));
             } else {

+ 11 - 11
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java

@@ -23,11 +23,11 @@ import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -156,7 +156,7 @@ public class JobNodeSelector {
             logger.debug(reason);
             return new PersistentTasksCustomMetadata.Assignment(null, reason);
         }
-        List<String> reasons = new LinkedList<>();
+        Map<String, String> reasons = new TreeMap<>();
         long maxAvailableMemory = Long.MIN_VALUE;
         DiscoveryNode minLoadedNodeByMemory = null;
         long requiredMemoryForJob = estimatedMemoryFootprint;
@@ -166,7 +166,7 @@ public class JobNodeSelector {
             String reason = nodeFilter.apply(node);
             if (reason != null) {
                 logger.trace(reason);
-                reasons.add(reason);
+                reasons.put(node.getName(), reason);
                 continue;
             }
             NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad(
@@ -180,7 +180,7 @@ public class JobNodeSelector {
             if (currentLoad.getError() != null) {
                 reason = createReason(jobId, nodeNameAndMlAttributes(node), currentLoad.getError());
                 logger.trace(reason);
-                reasons.add(reason);
+                reasons.put(node.getName(), reason);
                 continue;
             }
             // Assuming the node is eligible at all, check loading
@@ -194,7 +194,7 @@ public class JobNodeSelector {
                     currentLoad.getNumAllocatingJobs(),
                     maxConcurrentJobAllocations);
                 logger.trace(reason);
-                reasons.add(reason);
+                reasons.put(node.getName(), reason);
                 continue;
             }
 
@@ -206,7 +206,7 @@ public class JobNodeSelector {
                     MAX_OPEN_JOBS_PER_NODE.getKey(),
                     maxNumberOfOpenJobs);
                 logger.trace(reason);
-                reasons.add(reason);
+                reasons.put(node.getName(), reason);
                 continue;
             }
 
@@ -215,7 +215,7 @@ public class JobNodeSelector {
                     nodeNameAndMlAttributes(node),
                     "This node is not providing accurate information to determine is load by memory.");
                 logger.trace(reason);
-                reasons.add(reason);
+                reasons.put(node.getName(),reason);
                 continue;
             }
 
@@ -224,7 +224,7 @@ public class JobNodeSelector {
                     nodeNameAndMlAttributes(node),
                     "This node is indicating that it has no native memory for machine learning.");
                 logger.trace(reason);
-                reasons.add(reason);
+                reasons.put(node.getName(),reason);
                 continue;
             }
 
@@ -247,7 +247,7 @@ public class JobNodeSelector {
                     requiredMemoryForJob,
                     ByteSizeValue.ofBytes(requiredMemoryForJob).toString());
                 logger.trace(reason);
-                reasons.add(reason);
+                reasons.put(node.getName(),reason);
                 continue;
             }
 
@@ -260,7 +260,7 @@ public class JobNodeSelector {
         return createAssignment(
             estimatedMemoryFootprint,
             minLoadedNodeByMemory,
-            reasons,
+            reasons.values(),
             maxNodeSize > 0L ?
                 NativeMemoryCalculator.allowedBytesForMl(maxNodeSize, maxMachineMemoryPercent, useAutoMemoryPercentage) :
                 Long.MAX_VALUE);
@@ -268,7 +268,7 @@ public class JobNodeSelector {
 
     PersistentTasksCustomMetadata.Assignment createAssignment(long estimatedMemoryUsage,
                                                               DiscoveryNode minLoadedNode,
-                                                              List<String> reasons,
+                                                              Collection<String> reasons,
                                                               long biggestPossibleJob) {
         if (minLoadedNode == null) {
             String explanation = String.join("|", reasons);

+ 187 - 21
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.transport.TransportAddress;
@@ -34,9 +35,12 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
 import org.junit.Before;
 
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -107,7 +111,11 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
 
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, 0, node -> nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode,
             2,
@@ -133,7 +141,10 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         String dataFrameAnalyticsId = "data_frame_analytics_id1000";
 
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), dataFrameAnalyticsId,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            dataFrameAnalyticsId,
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
             node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId)));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode,
@@ -166,7 +177,11 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
 
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, 0, node -> nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode,
             2,
@@ -195,7 +210,10 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         String dataFrameAnalyticsId = "data_frame_analytics_id_new";
 
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), dataFrameAnalyticsId,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            dataFrameAnalyticsId,
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
             node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId)));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode,
@@ -221,7 +239,11 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
 
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, 0, node -> nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode,
             2,
@@ -255,7 +277,10 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         String dataFrameAnalyticsId = "data_frame_analytics_id1000";
 
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), dataFrameAnalyticsId,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            dataFrameAnalyticsId,
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
             node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId)));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(
@@ -287,7 +312,10 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         String dataFrameAnalyticsId = "data_frame_analytics_id1000";
 
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), dataFrameAnalyticsId,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            dataFrameAnalyticsId,
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
             node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId)));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(
@@ -323,7 +351,11 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", JOB_MEMORY_REQUIREMENT).build(new Date());
 
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, 0, node -> nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(
             20,
@@ -365,7 +397,11 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job6 = BaseMlIntegTestCase.createFareQuoteJob("job_id6", JOB_MEMORY_REQUIREMENT).build(new Date());
 
         ClusterState cs = csBuilder.build();
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job6.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs,
+            shuffled(cs.nodes().getAllNodes()),
+            job6.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, 0, node -> nodeFilter(node, job6));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(
             10,
@@ -384,7 +420,13 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs = csBuilder.build();
 
         Job job7 = BaseMlIntegTestCase.createFareQuoteJob("job_id7", JOB_MEMORY_REQUIREMENT).build(new Date());
-        jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
+        jobNodeSelector = new JobNodeSelector(
+            cs,
+            shuffled(cs.nodes().getAllNodes()),
+            job7.getId(),
+            MlTasks.JOB_TASK_NAME,
+            memoryTracker,
+            0,
             node -> nodeFilter(node, job7));
         result = jobNodeSelector.selectNode(10,
             2,
@@ -402,7 +444,13 @@ public class JobNodeSelectorTests extends ESTestCase {
         csBuilder = ClusterState.builder(cs);
         csBuilder.metadata(Metadata.builder(cs.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks));
         cs = csBuilder.build();
-        jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
+        jobNodeSelector = new JobNodeSelector(
+            cs,
+            shuffled(cs.nodes().getAllNodes()),
+            job7.getId(),
+            MlTasks.JOB_TASK_NAME,
+            memoryTracker,
+            0,
             node -> nodeFilter(node, job7));
         result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, false);
         assertNull("no node selected, because stale task", result.getExecutorNode());
@@ -415,7 +463,13 @@ public class JobNodeSelectorTests extends ESTestCase {
         csBuilder = ClusterState.builder(cs);
         csBuilder.metadata(Metadata.builder(cs.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks));
         cs = csBuilder.build();
-        jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
+        jobNodeSelector = new JobNodeSelector(
+            cs,
+            shuffled(cs.nodes().getAllNodes()),
+            job7.getId(),
+            MlTasks.JOB_TASK_NAME,
+            memoryTracker,
+            0,
             node -> nodeFilter(node, job7));
         result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, false);
         assertNull("no node selected, because null state", result.getExecutorNode());
@@ -457,7 +511,11 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job7 = BaseMlIntegTestCase.createFareQuoteJob("job_id7", JOB_MEMORY_REQUIREMENT).build(new Date());
 
         // Allocation won't be possible if the stale failed job is treated as opening
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job7.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs,
+            shuffled(cs.nodes().getAllNodes()),
+            job7.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, 0, node -> nodeFilter(node, job7));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10,
             2,
@@ -474,7 +532,13 @@ public class JobNodeSelectorTests extends ESTestCase {
         csBuilder.metadata(Metadata.builder(cs.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks));
         cs = csBuilder.build();
         Job job8 = BaseMlIntegTestCase.createFareQuoteJob("job_id8", JOB_MEMORY_REQUIREMENT).build(new Date());
-        jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job8.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
+        jobNodeSelector = new JobNodeSelector(
+            cs,
+            shuffled(cs.nodes().getAllNodes()),
+            job8.getId(),
+            MlTasks.JOB_TASK_NAME,
+            memoryTracker,
+            0,
             node -> nodeFilter(node, job8));
         result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, false);
         assertNull("no node selected, because OPENING state", result.getExecutorNode());
@@ -508,7 +572,11 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs.nodes(nodes);
         metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks);
         cs.metadata(metadata);
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, 0, node -> nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10,
             2,
@@ -519,6 +587,75 @@ public class JobNodeSelectorTests extends ESTestCase {
         assertNull(result.getExecutorNode());
     }
 
+    public void testSelectLeastLoadedMlNode_reasonsAreInDeterministicOrder() {
+        Map<String, String> nodeAttr = new HashMap<>();
+        nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
+        nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "1000000000");
+        DiscoveryNodes nodes = DiscoveryNodes.builder()
+            .add(
+                new DiscoveryNode(
+                    "_node_name1",
+                    "_node_id1",
+                    new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
+                    nodeAttr,
+                    Collections.emptySet(),
+                    Version.CURRENT
+                )
+            )
+            .add(
+                new DiscoveryNode(
+                    "_node_name2",
+                    "_node_id2",
+                    new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
+                    nodeAttr,
+                    Collections.emptySet(),
+                    Version.CURRENT
+                )
+            )
+            .build();
+
+        PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
+        OpenJobPersistentTasksExecutorTests.addJobTask("incompatible_type_job", "_node_id1", null, tasksBuilder);
+        PersistentTasksCustomMetadata tasks = tasksBuilder.build();
+
+        ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
+        Metadata.Builder metadata = Metadata.builder();
+
+        Job job = mock(Job.class);
+        when(job.getId()).thenReturn("incompatible_type_job");
+        when(job.getJobVersion()).thenReturn(Version.CURRENT);
+        when(job.getJobType()).thenReturn("incompatible_type");
+        when(job.getInitialResultsIndexName()).thenReturn("shared");
+
+        cs.nodes(nodes);
+        metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks);
+        cs.metadata(metadata);
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
+            memoryTracker,
+            0,
+            node -> nodeFilter(node, job)
+        );
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, false);
+        assertThat(
+            result.getExplanation(),
+            equalTo(
+                "Not opening job [incompatible_type_job] on node [{_node_name1}{version="
+                    + Version.CURRENT
+                    + "}], "
+                    + "because this node does not support jobs of type [incompatible_type]|"
+                    + "Not opening job [incompatible_type_job] on node [{_node_name2}{version="
+                    + Version.CURRENT
+                    + "}], "
+                    + "because this node does not support jobs of type [incompatible_type]"
+            )
+        );
+        assertNull(result.getExecutorNode());
+    }
+
     public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() {
         Map<String, String> nodeAttr = new HashMap<>();
         nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
@@ -544,7 +681,10 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs.nodes(nodes);
         metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks);
         cs.metadata(metadata);
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(),
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
             MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10,
             2,
@@ -578,7 +718,11 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs.metadata(metadata);
 
         Job job = jobWithRules("job_with_rules");
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, 0, node -> nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10,
             2,
@@ -637,7 +781,11 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs.nodes(nodes);
 
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, 0, node -> nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result =
             jobNodeSelector.considerLazyAssignment(new PersistentTasksCustomMetadata.Assignment(null, "foo"));
@@ -657,7 +805,11 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs.nodes(nodes);
 
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, randomIntBetween(1, 3), node -> nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result =
             jobNodeSelector.considerLazyAssignment(new PersistentTasksCustomMetadata.Assignment(null, "foo"));
@@ -680,7 +832,11 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", ByteSizeValue.ofMb(10)).build(new Date());
         when(memoryTracker.getJobMemoryRequirement(anyString(), eq("job_id1000"))).thenReturn(1000L);
 
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, randomIntBetween(1, 3), node -> nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(maxRunningJobsPerNode,
             2,
@@ -741,7 +897,11 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", JOB_MEMORY_REQUIREMENT).build(new Date());
 
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME,
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(
+            cs.build(),
+            shuffled(cs.nodes().getAllNodes()),
+            job.getId(),
+            MlTasks.JOB_TASK_NAME,
             memoryTracker, 0, node -> nodeFilter(node, job));
         Tuple<NativeMemoryCapacity, Long> capacityAndFreeMemory = jobNodeSelector.perceivedCapacityAndMaxFreeMemory(
             10,
@@ -790,6 +950,12 @@ public class JobNodeSelectorTests extends ESTestCase {
         return cs;
     }
 
+    static Collection<DiscoveryNode> shuffled(Collection<DiscoveryNode> nodes) {
+        List<DiscoveryNode> toShuffle = new ArrayList<>(nodes);
+        Randomness.shuffle(toShuffle);
+        return toShuffle;
+    }
+
     static void addDataFrameAnalyticsJobTask(String id, String nodeId, DataFrameAnalyticsState state,
                                              PersistentTasksCustomMetadata.Builder builder) {
         addDataFrameAnalyticsJobTask(id, nodeId, state, builder, false, false);