
[ML] increase the default value of xpack.ml.max_open_jobs from 20 to 512 for autoscaling improvements (#72487)

This commit increases the default value of xpack.ml.max_open_jobs from 20 to 512. Additionally, nodes that cannot provide an accurate view into their native memory are now ignored for job assignment.

If a node does not have a view into its native memory, we ignore it for assignment.

This effectively fixes a bug with autoscaling. Autoscaling relies on accurate per-node memory information to assign jobs to nodes; if assignment is instead constrained by xpack.ml.max_open_jobs, scaling decisions are hampered.
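To illustrate the interaction described above, here is a minimal, hypothetical sketch (the numbers and method are illustrative, not taken from the commit): with a per-node cap of 20 open jobs, the count limit can bind long before memory does, so a memory-based autoscaler sees free capacity that the cluster refuses to use.

```java
// Hypothetical sketch: how many jobs a node can actually accept is the minimum
// of the memory-based limit and the max_open_jobs cap. With the old default of
// 20, the cap usually binds first; with 512, memory becomes the limiting factor.
class OpenJobsCapSketch {
    static int assignableJobs(long nodeMlMemoryBytes, long perJobBytes, int maxOpenJobs) {
        long byMemory = nodeMlMemoryBytes / perJobBytes; // jobs that fit by memory
        return (int) Math.min(byMemory, maxOpenJobs);    // cap by job count
    }

    public static void main(String[] args) {
        long nodeMem = 16L * 1024 * 1024 * 1024; // 16 GiB of ML memory (illustrative)
        long jobMem = 100L * 1024 * 1024;        // 100 MiB per job (illustrative)
        System.out.println(assignableJobs(nodeMem, jobMem, 20));  // prints 20 (count-bound)
        System.out.println(assignableJobs(nodeMem, jobMem, 512)); // prints 163 (memory-bound)
    }
}
```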
Benjamin Trent, 4 years ago · parent commit 2ce4d175f0
20 changed files with 108 additions and 190 deletions

  1. docs/reference/ml/anomaly-detection/apis/get-datafeed-stats.asciidoc (+3 -3)
  2. docs/reference/ml/anomaly-detection/apis/get-job-stats.asciidoc (+12 -12)
  3. docs/reference/settings/ml-settings.asciidoc (+1 -1)
  4. x-pack/plugin/build.gradle (+1 -0)
  5. x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java (+3 -3)
  6. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java (+9 -3)
  7. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java (+1 -1)
  8. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java (+0 -2)
  9. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java (+53 -71)
  10. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java (+1 -2)
  11. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java (+3 -2)
  12. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java (+1 -1)
  13. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java (+1 -2)
  14. x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java (+1 -1)
  15. x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java (+2 -1)
  16. x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java (+4 -12)
  17. x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java (+6 -6)
  18. x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java (+5 -65)
  19. x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/get_datafeed_stats.yml (+0 -1)
  20. x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/jobs_get_stats.yml (+1 -1)

+ 3 - 3
docs/reference/ml/anomaly-detection/apis/get-datafeed-stats.asciidoc

@@ -19,7 +19,7 @@ Retrieves usage information for {dfeeds}.
 
 `GET _ml/datafeeds/_stats`  +
 
-`GET _ml/datafeeds/_all/_stats` 
+`GET _ml/datafeeds/_all/_stats`
 
 [[ml-get-datafeed-stats-prereqs]]
 == {api-prereq-title}
@@ -144,7 +144,7 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=search-time]
 
 `404` (Missing resources)::
   If `allow_no_match` is `false`, this code indicates that there are no
-  resources that match the request or only partial matches for the request. 
+  resources that match the request or only partial matches for the request.
 
 [[ml-get-datafeed-stats-example]]
 == {api-examples-title}
@@ -172,7 +172,7 @@ The API returns the following results:
         "transport_address" : "127.0.0.1:9300",
         "attributes" : {
           "ml.machine_memory" : "17179869184",
-          "ml.max_open_jobs" : "20"
+          "ml.max_open_jobs" : "512"
         }
       },
       "assignment_explanation" : "",

+ 12 - 12
docs/reference/ml/anomaly-detection/apis/get-job-stats.asciidoc

@@ -17,7 +17,7 @@ Retrieves usage information for {anomaly-jobs}.
 
 `GET _ml/anomaly_detectors/_stats` +
 
-`GET _ml/anomaly_detectors/_all/_stats` 
+`GET _ml/anomaly_detectors/_all/_stats`
 
 [[ml-get-job-stats-prereqs]]
 == {api-prereq-title}
@@ -147,13 +147,13 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=sparse-bucket-count]
 
 `deleting`::
 (Boolean)
-Indicates that the process of deleting the job is in progress but not yet 
+Indicates that the process of deleting the job is in progress but not yet
 completed. It is only reported when `true`.
 
 //Begin forecasts_stats
 [[forecastsstats]]`forecasts_stats`::
-(object) An object that provides statistical information about forecasts 
-belonging to this job. Some statistics are omitted if no forecasts have been 
+(object) An object that provides statistical information about forecasts
+belonging to this job. Some statistics are omitted if no forecasts have been
 made.
 +
 NOTE: Unless there is at least one forecast, `memory_bytes`, `records`,
@@ -163,24 +163,24 @@ NOTE: Unless there is at least one forecast, `memory_bytes`, `records`,
 [%collapsible%open]
 ====
 `forecasted_jobs`:::
-(long) A value of `0` indicates that forecasts do not exist for this job. A 
+(long) A value of `0` indicates that forecasts do not exist for this job. A
 value of `1` indicates that at least one forecast exists.
 
 `memory_bytes`:::
-(object) The `avg`, `min`, `max` and `total` memory usage in bytes for forecasts 
+(object) The `avg`, `min`, `max` and `total` memory usage in bytes for forecasts
 related to this job. If there are no forecasts, this property is omitted.
 
 `records`:::
-(object) The `avg`, `min`, `max` and `total` number of `model_forecast` documents 
+(object) The `avg`, `min`, `max` and `total` number of `model_forecast` documents
 written for forecasts related to this job. If there are no forecasts, this
 property is omitted.
 
 `processing_time_ms`:::
-(object) The `avg`, `min`, `max` and `total` runtime in milliseconds for 
+(object) The `avg`, `min`, `max` and `total` runtime in milliseconds for
 forecasts related to this job. If there are no forecasts, this property is omitted.
 
 `status`:::
-(object) The count of forecasts by their status. For example: 
+(object) The count of forecasts by their status. For example:
 {"finished" : 2, "started" : 1}. If there are no forecasts, this property is omitted.
 
 `total`:::
@@ -296,7 +296,7 @@ available only for open jobs.
 `attributes`:::
 (object)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-attributes]
-  
+
 `ephemeral_id`:::
 (string)
 include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-ephemeral-id]
@@ -367,7 +367,7 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=bucket-time-total]
 == {api-response-codes-title}
 
 `404` (Missing resources)::
-  If `allow_no_match` is `false`, this code indicates that there are no 
+  If `allow_no_match` is `false`, this code indicates that there are no
   resources that match the request or only partial matches for the request.
 
 [[ml-get-job-stats-example]]
@@ -460,7 +460,7 @@ The API returns the following results:
         "attributes" : {
           "ml.machine_memory" : "17179869184",
           "xpack.installed" : "true",
-          "ml.max_open_jobs" : "20"
+          "ml.max_open_jobs" : "512"
         }
       },
       "assignment_explanation" : "",

+ 1 - 1
docs/reference/settings/ml-settings.asciidoc

@@ -118,7 +118,7 @@ allowed, fewer jobs will run on a node. Prior to version 7.1, this setting was a
 per-node non-dynamic setting. It became a cluster-wide dynamic setting in
 version 7.1. As a result, changes to its value after node startup are used only 
 after every node in the cluster is running version 7.1 or higher. The minimum
-value is `1`; the maximum value is `512`. Defaults to `20`.
+value is `1`; the maximum value is `512`. Defaults to `512`.
 
 `xpack.ml.nightly_maintenance_requests_per_second`::
 (<<cluster-update-settings,Dynamic>>) The rate at which the nightly maintenance 

+ 1 - 0
x-pack/plugin/build.gradle

@@ -116,6 +116,7 @@ tasks.named("yamlRestCompatTest").configure {
     'ml/jobs_get_stats/Test get job stats after uploading data prompting the creation of some stats',
     'ml/jobs_get_stats/Test get job stats for closed job',
     'ml/jobs_get_stats/Test no exception on get job stats with missing index',
+    'ml/get_datafeed_stats/Test get stats for started datafeed',
     'ml/post_data/Test POST data job api, flush, close and verify DataCounts doc',
     'ml/post_data/Test flush with skip_time',
     'ml/set_upgrade_mode/Setting upgrade mode to disabled from enabled',

+ 3 - 3
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java

@@ -251,7 +251,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
             PersistentTask<?> task = tasks.getTask(MlTasks.jobTaskId(jobId));
 
             DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
-            assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20"));
+            assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "512"));
             JobTaskState jobTaskState = (JobTaskState) task.getState();
             assertNotNull(jobTaskState);
             assertEquals(JobState.OPENED, jobTaskState.getState());
@@ -448,7 +448,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
         assertBusy(() -> assertJobTask(jobId, JobState.OPENED, true));
     }
 
-    public void testCloseUnassignedLazyJobAndDatafeed() throws Exception {
+    public void testCloseUnassignedLazyJobAndDatafeed() {
         internalCluster().ensureAtLeastNumDataNodes(3);
         ensureStableCluster(3);
 
@@ -513,7 +513,7 @@ public class BasicDistributedJobsIT extends BaseMlIntegTestCase {
             assertNotNull(task.getExecutorNode());
             assertFalse(needsReassignment(task.getAssignment(), clusterState.nodes()));
             DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode());
-            assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20"));
+            assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "512"));
 
             JobTaskState jobTaskState = (JobTaskState) task.getState();
             assertNotNull(jobTaskState);

+ 9 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -466,7 +466,14 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
     // older nodes will not react to the dynamic changes. Therefore, in such mixed version clusters
     // allocation will be based on the value first read at node startup rather than the current value.
     public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
-            Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope);
+            Setting.intSetting(
+                "xpack.ml.max_open_jobs",
+                MAX_MAX_OPEN_JOBS_PER_NODE,
+                1,
+                MAX_MAX_OPEN_JOBS_PER_NODE,
+                Property.Dynamic,
+                Property.NodeScope
+            );
 
     public static final Setting<TimeValue> PROCESS_CONNECT_TIMEOUT =
         Setting.timeSetting("xpack.ml.process_connect_timeout", TimeValue.timeValueSeconds(10),
@@ -571,8 +578,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
         }
 
         Settings.Builder additionalSettings = Settings.builder();
-        Boolean allocationEnabled = DiscoveryNode.hasRole(settings, DiscoveryNodeRole.ML_ROLE);
-        if (allocationEnabled != null && allocationEnabled) {
+        if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.ML_ROLE)) {
             // TODO: stop setting this attribute in 8.0.0 but disallow it (like mlEnabledNodeAttrName below)
             // The ML UI will need to be changed to check machineMemoryAttrName instead before this is done
             addMlNodeAttribute(additionalSettings, maxOpenJobsPerNodeNodeAttrName,

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -619,6 +619,7 @@ public class TransportStartDataFrameAnalyticsAction
             boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
             Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
                 getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
+            // NOTE: this will return here if isMemoryTrackerRecentlyRefreshed is false, we don't allow assignment with stale memory
             if (optionalAssignment.isPresent()) {
                 return optionalAssignment.get();
             }
@@ -638,7 +639,6 @@ public class TransportStartDataFrameAnalyticsAction
                 Integer.MAX_VALUE,
                 maxMachineMemoryPercent,
                 maxNodeMemory,
-                isMemoryTrackerRecentlyRefreshed,
                 useAutoMemoryPercentage
             );
             auditRequireMemoryIfNecessary(params.getId(), auditor, assignment, jobNodeSelector, isMemoryTrackerRecentlyRefreshed);

+ 0 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java

@@ -387,7 +387,6 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
                 node,
                 maxOpenJobs,
                 maxMachineMemoryPercent,
-                true,
                 useAuto);
             if (nodeLoad.getError() != null) {
                 logger.warn("[{}] failed to gather node load limits, failure [{}]. Returning no scale",
@@ -760,7 +759,6 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
                 node,
                 maxOpenJobs,
                 maxMachineMemoryPercent,
-                true,
                 useAuto);
             if (nodeLoad.getError() != null || nodeLoad.isUseMemory() == false) {
                 return Optional.empty();

+ 53 - 71
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java

@@ -103,8 +103,7 @@ public class JobNodeSelector {
 
     public Tuple<NativeMemoryCapacity, Long> perceivedCapacityAndMaxFreeMemory(int maxMachineMemoryPercent,
                                                                                boolean useAutoMemoryPercentage,
-                                                                               int maxOpenJobs,
-                                                                               boolean isMemoryTrackerRecentlyRefreshed) {
+                                                                               int maxOpenJobs) {
         List<DiscoveryNode> capableNodes = candidateNodes.stream()
             .filter(n -> this.nodeFilter.apply(n) == null)
             .collect(Collectors.toList());
@@ -120,7 +119,6 @@ public class JobNodeSelector {
                 n,
                 maxOpenJobs,
                 maxMachineMemoryPercent,
-                isMemoryTrackerRecentlyRefreshed,
                 useAutoMemoryPercentage)
             )
             .filter(nl -> nl.remainingJobs() > 0)
@@ -134,20 +132,16 @@ public class JobNodeSelector {
                                                                int maxConcurrentJobAllocations,
                                                                int maxMachineMemoryPercent,
                                                                long maxNodeSize,
-                                                               boolean isMemoryTrackerRecentlyRefreshed,
                                                                boolean useAutoMemoryPercentage) {
-        // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe
-        // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs
-        boolean allocateByMemory = isMemoryTrackerRecentlyRefreshed;
-        if (isMemoryTrackerRecentlyRefreshed == false) {
-            logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled",
-                jobId);
+        final Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
+        if (estimatedMemoryFootprint == null) {
+            memoryTracker.asyncRefresh();
+            String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested";
+            logger.debug(reason);
+            return new PersistentTasksCustomMetadata.Assignment(null, reason);
         }
-
         List<String> reasons = new LinkedList<>();
-        long maxAvailableCount = Long.MIN_VALUE;
         long maxAvailableMemory = Long.MIN_VALUE;
-        DiscoveryNode minLoadedNodeByCount = null;
         DiscoveryNode minLoadedNodeByMemory = null;
         for (DiscoveryNode node : candidateNodes) {
 
@@ -164,7 +158,6 @@ public class JobNodeSelector {
                 node,
                 dynamicMaxOpenJobs,
                 maxMachineMemoryPercent,
-                allocateByMemory,
                 useAutoMemoryPercentage
             );
             if (currentLoad.getError() != null) {
@@ -174,7 +167,7 @@ public class JobNodeSelector {
                 continue;
             }
             // Assuming the node is eligible at all, check loading
-            allocateByMemory = currentLoad.isUseMemory();
+            boolean canAllocateByMemory = currentLoad.isUseMemory();
             int maxNumberOfOpenJobs = currentLoad.getMaxJobs();
 
             if (currentLoad.getNumAllocatingJobs() >= maxConcurrentJobAllocations) {
@@ -188,8 +181,7 @@ public class JobNodeSelector {
                 continue;
             }
 
-            long availableCount = maxNumberOfOpenJobs - currentLoad.getNumAssignedJobs();
-            if (availableCount == 0) {
+            if (currentLoad.remainingJobs() == 0) {
                 reason = createReason(jobId,
                     nodeNameAndMlAttributes(node),
                     "This node is full. Number of opened jobs [{}], {} [{}].",
@@ -201,68 +193,58 @@ public class JobNodeSelector {
                 continue;
             }
 
-            if (maxAvailableCount < availableCount) {
-                maxAvailableCount = availableCount;
-                minLoadedNodeByCount = node;
+            if (canAllocateByMemory == false) {
+                reason = createReason(jobId,
+                    nodeNameAndMlAttributes(node),
                    "This node is not providing accurate information to determine its load by memory.");
+                logger.trace(reason);
+                reasons.add(reason);
+                continue;
+            }
+
+            if (currentLoad.getMaxMlMemory() <= 0) {
+                reason = createReason(jobId,
+                    nodeNameAndMlAttributes(node),
+                    "This node is indicating that it has no native memory for machine learning.");
+                logger.trace(reason);
+                reasons.add(reason);
+                continue;
             }
 
-            if (allocateByMemory) {
-                if (currentLoad.getMaxMlMemory() > 0) {
-                    Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
-                    if (estimatedMemoryFootprint != null) {
-                        // If this will be the first job assigned to the node then it will need to
-                        // load the native code shared libraries, so add the overhead for this
-                        if (currentLoad.getNumAssignedJobs() == 0) {
-                            estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
-                        }
-                        long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory();
-                        if (estimatedMemoryFootprint > availableMemory) {
-                            reason = createReason(jobId,
-                                nodeNameAndMlAttributes(node),
-                                "This node has insufficient available memory. Available memory for ML [{} ({})], "
-                                    + "memory required by existing jobs [{} ({})], "
-                                    + "estimated memory required for this job [{} ({})].",
-                                currentLoad.getMaxMlMemory(),
-                                ByteSizeValue.ofBytes(currentLoad.getMaxMlMemory()).toString(),
-                                currentLoad.getAssignedJobMemory(),
-                                ByteSizeValue.ofBytes(currentLoad.getAssignedJobMemory()).toString(),
-                                estimatedMemoryFootprint,
-                                ByteSizeValue.ofBytes(estimatedMemoryFootprint).toString());
-                            logger.trace(reason);
-                            reasons.add(reason);
-                            continue;
-                        }
+            long requiredMemoryForJob = estimatedMemoryFootprint;
+            // If this will be the first job assigned to the node then it will need to
+            // load the native code shared libraries, so add the overhead for this
+            if (currentLoad.getNumAssignedJobs() == 0) {
+                requiredMemoryForJob += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
+            }
+            long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory();
+            if (requiredMemoryForJob > availableMemory) {
+                reason = createReason(jobId,
+                    nodeNameAndMlAttributes(node),
+                    "This node has insufficient available memory. Available memory for ML [{} ({})], "
+                        + "memory required by existing jobs [{} ({})], "
+                        + "estimated memory required for this job [{} ({})].",
+                    currentLoad.getMaxMlMemory(),
+                    ByteSizeValue.ofBytes(currentLoad.getMaxMlMemory()).toString(),
+                    currentLoad.getAssignedJobMemory(),
+                    ByteSizeValue.ofBytes(currentLoad.getAssignedJobMemory()).toString(),
+                    requiredMemoryForJob,
+                    ByteSizeValue.ofBytes(requiredMemoryForJob).toString());
+                logger.trace(reason);
+                reasons.add(reason);
+                continue;
+            }
 
-                        if (maxAvailableMemory < availableMemory) {
-                            maxAvailableMemory = availableMemory;
-                            minLoadedNodeByMemory = node;
-                        }
-                    } else {
-                        // If we cannot get the job memory requirement,
-                        // fall back to simply allocating by job count
-                        allocateByMemory = false;
-                        logger.debug(
-                            () -> new ParameterizedMessage(
-                                "Falling back to allocating job [{}] by job counts because its memory requirement was not available",
-                                jobId));
-                    }
-                } else {
-                    // If we cannot get the available memory on any machine in
-                    // the cluster, fall back to simply allocating by job count
-                    allocateByMemory = false;
-                    logger.debug(
-                        () -> new ParameterizedMessage(
-                            "Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
-                            jobId,
-                            nodeNameAndMlAttributes(node)));
-                }
+            if (maxAvailableMemory < availableMemory) {
+                maxAvailableMemory = availableMemory;
+                minLoadedNodeByMemory = node;
             }
         }
 
         return createAssignment(
-            allocateByMemory ? minLoadedNodeByMemory : minLoadedNodeByCount,
+            minLoadedNodeByMemory,
             reasons,
-            allocateByMemory && maxNodeSize > 0L ?
+            maxNodeSize > 0L ?
                 NativeMemoryCalculator.allowedBytesForMl(maxNodeSize, maxMachineMemoryPercent, useAutoMemoryPercentage) :
                 Long.MAX_VALUE);
     }
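The reworked selection loop above can be sketched in isolation. The following is a simplified, hypothetical rendering of the same idea (the NodeLoad record, overhead constant, and method here are invented for illustration; the real classes live in the Elasticsearch ML plugin): skip nodes that cannot report their native memory, add the native-code overhead for a node's first job, and pick the node with the most free ML memory.

```java
import java.util.List;

class NodeSelectionSketch {
    // Invented stand-in for the plugin's NodeLoad class.
    record NodeLoad(String nodeId, boolean useMemory, long maxMlMemory,
                    long assignedJobMemory, int numAssignedJobs) {}

    static final long NATIVE_CODE_OVERHEAD = 30L * 1024 * 1024; // illustrative value

    static String selectNode(List<NodeLoad> loads, long jobMemory) {
        String best = null;
        long maxAvailable = Long.MIN_VALUE;
        for (NodeLoad load : loads) {
            if (load.useMemory() == false || load.maxMlMemory() <= 0) {
                continue; // node cannot report its native memory accurately
            }
            long required = jobMemory;
            if (load.numAssignedJobs() == 0) {
                required += NATIVE_CODE_OVERHEAD; // first job loads shared libraries
            }
            long available = load.maxMlMemory() - load.assignedJobMemory();
            if (required > available) {
                continue; // insufficient free ML memory on this node
            }
            if (available > maxAvailable) {
                maxAvailable = available;
                best = load.nodeId();
            }
        }
        return best; // null means no node qualifies, i.e. no assignment
    }
}
```

Note there is no job-count fallback anymore: a node either provides a usable memory view or it is skipped, which is what keeps the autoscaler's view consistent with the assigner's.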

+ 1 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java

@@ -42,7 +42,6 @@ public class NodeLoadDetector {
                                    DiscoveryNode node,
                                    int dynamicMaxOpenJobs,
                                    int maxMachineMemoryPercent,
-                                   boolean isMemoryTrackerRecentlyRefreshed,
                                    boolean useAutoMachineMemoryCalculation) {
         PersistentTasksCustomMetadata persistentTasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
         Map<String, String> nodeAttributes = node.getAttributes();
@@ -71,7 +70,7 @@ public class NodeLoadDetector {
         NodeLoad.Builder nodeLoad = NodeLoad.builder(node.getId())
             .setMaxMemory(maxMlMemory.orElse(-1L))
             .setMaxJobs(maxNumberOfOpenJobs)
-            .setUseMemory(isMemoryTrackerRecentlyRefreshed);
+            .setUseMemory(true);
         if (errors.isEmpty() == false) {
             return nodeLoad.setError(Strings.collectionToCommaDelimitedString(errors)).build();
         }

+ 3 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java

@@ -84,6 +84,7 @@ public class SnapshotUpgradeTaskExecutor extends AbstractJobPersistentTasksExecu
         boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
         Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment =
             getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
+        // NOTE: this will return here if isMemoryTrackerRecentlyRefreshed is false, we don't allow assignment with stale memory
         if (optionalAssignment.isPresent()) {
             return optionalAssignment.get();
         }
@@ -91,7 +92,8 @@ public class SnapshotUpgradeTaskExecutor extends AbstractJobPersistentTasksExecu
             clusterState,
             candidateNodes,
             params.getJobId(),
-            MlTasks.JOB_SNAPSHOT_UPGRADE_TASK_NAME,
+            // Use the job_task_name for the appropriate job size
+            MlTasks.JOB_TASK_NAME,
             memoryTracker,
             0,
             node -> null);
@@ -100,7 +102,6 @@ public class SnapshotUpgradeTaskExecutor extends AbstractJobPersistentTasksExecu
             Integer.MAX_VALUE,
             maxMachineMemoryPercent,
             Long.MAX_VALUE,
-            isMemoryTrackerRecentlyRefreshed,
             useAutoMemoryPercentage);
     }
 

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

@@ -116,6 +116,7 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
         }
         boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
         Optional<Assignment> optionalAssignment = getPotentialAssignment(params, clusterState, isMemoryTrackerRecentlyRefreshed);
+        // NOTE: this will return here if isMemoryTrackerRecentlyRefreshed is false, we don't allow assignment with stale memory
         if (optionalAssignment.isPresent()) {
             return optionalAssignment.get();
         }
@@ -127,7 +128,6 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
             maxConcurrentJobAllocations,
             maxMachineMemoryPercent,
             maxNodeMemory,
-            isMemoryTrackerRecentlyRefreshed,
             useAutoMemoryPercentage);
         auditRequireMemoryIfNecessary(params.getJobId(), auditor, assignment, jobNodeSelector, isMemoryTrackerRecentlyRefreshed);
         return assignment;

+ 1 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/task/AbstractJobPersistentTasksExecutor.java

@@ -128,8 +128,7 @@ public abstract class AbstractJobPersistentTasksExecutor<Params extends Persiste
                 Tuple<NativeMemoryCapacity, Long> capacityAndFreeMemory = jobNodeSelector.perceivedCapacityAndMaxFreeMemory(
                     maxMachineMemoryPercent,
                     useAutoMemoryPercentage,
-                    maxOpenJobs,
-                    true
+                    maxOpenJobs
                 );
                 Long previouslyAuditedFreeMemory = auditedJobCapacity.get(getUniqueId(jobId));
                 if (capacityAndFreeMemory.v2().equals(previouslyAuditedFreeMemory) == false) {

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java

@@ -22,7 +22,7 @@ public class MachineLearningTests extends ESTestCase {
 
     public void testMaxOpenWorkersSetting_givenDefault() {
         int maxOpenWorkers = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY);
-        assertEquals(20, maxOpenWorkers);
+        assertEquals(512, maxOpenWorkers);
     }
 
     public void testMaxOpenWorkersSetting_givenSetting() {

+ 2 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
@@ -172,7 +173,7 @@ public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase {
             "_node_name" + i,
             "_node_id" + i,
             new TransportAddress(InetAddress.getLoopbackAddress(), 9300 + i),
-            Map.of("ml.max_open_jobs", isMlNode ? "10" : "0", "ml.machine_memory", "-1"),
+            Map.of("ml.max_open_jobs", isMlNode ? "10" : "0", "ml.machine_memory", String.valueOf(ByteSizeValue.ofGb(1).getBytes())),
             Collections.emptySet(),
             nodeVersion);
     }

+ 4 - 12
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java

@@ -86,7 +86,7 @@ public class MlAutoscalingDeciderServiceTests extends ESTestCase {
         when(mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(any())).thenReturn(DEFAULT_JOB_SIZE);
         nodeLoadDetector = mock(NodeLoadDetector.class);
         when(nodeLoadDetector.getMlMemoryTracker()).thenReturn(mlMemoryTracker);
-        when(nodeLoadDetector.detectNodeLoad(any(), anyBoolean(), any(), anyInt(), anyInt(), anyBoolean(), anyBoolean()))
+        when(nodeLoadDetector.detectNodeLoad(any(), anyBoolean(), any(), anyInt(), anyInt(), anyBoolean()))
             .thenReturn(NodeLoad.builder("any")
                 .setUseMemory(true)
                 .incAssignedJobMemory(ByteSizeValue.ofGb(1).getBytes())
@@ -455,14 +455,6 @@ public class MlAutoscalingDeciderServiceTests extends ESTestCase {
         return new MlAutoscalingDeciderService(nodeLoadDetector, settings, clusterService, timeSupplier);
     }
 
-    private void withNodesLoadedWith(ByteSizeValue value) {
-        when(nodeLoadDetector.detectNodeLoad(any(), anyBoolean(), any(), anyInt(), anyInt(), anyBoolean(), anyBoolean()))
-            .thenReturn(NodeLoad.builder("any")
-                .setUseMemory(true)
-                .incAssignedJobMemory(value.getBytes())
-                .build());
-    }
-
     private static ClusterState clusterState(List<String> anomalyTasks,
                                              List<String> batchAnomalyTasks,
                                              List<String> analyticsTasks,
@@ -473,19 +465,19 @@ public class MlAutoscalingDeciderServiceTests extends ESTestCase {
         DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
         for (DiscoveryNode node : nodeList) {
             nodesBuilder.add(node);
-        };
+        }
         PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
         for (String jobId : anomalyTasks) {
             OpenJobPersistentTasksExecutorTests.addJobTask(jobId,
                 randomFrom(nodeNames),
-                randomFrom(JobState.CLOSING, JobState.OPENED, JobState.OPENING, (JobState)null),
+                randomFrom(JobState.CLOSING, JobState.OPENED, JobState.OPENING, null),
                 tasksBuilder);
         }
         for (String jobId : batchAnomalyTasks) {
             String nodeAssignment = randomFrom(nodeNames);
             OpenJobPersistentTasksExecutorTests.addJobTask(jobId,
                 nodeAssignment,
-                randomFrom(JobState.CLOSING, JobState.OPENED, JobState.OPENING, (JobState)null),
+                randomFrom(JobState.CLOSING, JobState.OPENED, JobState.OPENING, null),
                 tasksBuilder);
             StartDatafeedAction.DatafeedParams dfParams =new StartDatafeedAction.DatafeedParams(jobId + "-datafeed", 0);
             dfParams.setEndTime(new Date().getTime());

+ 6 - 6
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java

@@ -77,28 +77,28 @@ public class JobNodeLoadDetectorTests extends ESTestCase {
         final ClusterState cs = ClusterState.builder(new ClusterName("_name")).nodes(nodes)
                 .metadata(Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasks)).build();
 
-        NodeLoad load = nodeLoadDetector.detectNodeLoad(cs, true, nodes.get("_node_id1"), 10, 30, true, false);
+        NodeLoad load = nodeLoadDetector.detectNodeLoad(cs, true, nodes.get("_node_id1"), 10, 30, false);
         assertThat(load.getAssignedJobMemory(), equalTo(52428800L));
         assertThat(load.getNumAllocatingJobs(), equalTo(2L));
         assertThat(load.getNumAssignedJobs(), equalTo(2L));
         assertThat(load.getMaxJobs(), equalTo(10));
         assertThat(load.getMaxMlMemory(), equalTo(0L));
 
-        load = nodeLoadDetector.detectNodeLoad(cs, true, nodes.get("_node_id2"), 5, 30, true, false);
+        load = nodeLoadDetector.detectNodeLoad(cs, true, nodes.get("_node_id2"), 5, 30, false);
         assertThat(load.getAssignedJobMemory(), equalTo(41943040L));
         assertThat(load.getNumAllocatingJobs(), equalTo(1L));
         assertThat(load.getNumAssignedJobs(), equalTo(1L));
         assertThat(load.getMaxJobs(), equalTo(5));
         assertThat(load.getMaxMlMemory(), equalTo(0L));
 
-        load = nodeLoadDetector.detectNodeLoad(cs, true, nodes.get("_node_id3"), 5, 30, true, false);
+        load = nodeLoadDetector.detectNodeLoad(cs, true, nodes.get("_node_id3"), 5, 30, false);
         assertThat(load.getAssignedJobMemory(), equalTo(0L));
         assertThat(load.getNumAllocatingJobs(), equalTo(0L));
         assertThat(load.getNumAssignedJobs(), equalTo(0L));
         assertThat(load.getMaxJobs(), equalTo(5));
         assertThat(load.getMaxMlMemory(), equalTo(0L));
 
-        load = nodeLoadDetector.detectNodeLoad(cs, true, nodes.get("_node_id4"), 5, 30, true, false);
+        load = nodeLoadDetector.detectNodeLoad(cs, true, nodes.get("_node_id4"), 5, 30, false);
         assertThat(load.getAssignedJobMemory(), equalTo(41943040L));
         assertThat(load.getNumAllocatingJobs(), equalTo(0L));
         assertThat(load.getNumAssignedJobs(), equalTo(1L));
@@ -120,7 +120,7 @@ public class JobNodeLoadDetectorTests extends ESTestCase {
         Metadata.Builder metadata = Metadata.builder();
         cs.metadata(metadata);
 
-        NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, 30, true, false);
+        NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, 30, false);
         assertThat(load.getError(), containsString("ml.max_open_jobs attribute [foo] is not an integer"));
     }
 
@@ -138,7 +138,7 @@ public class JobNodeLoadDetectorTests extends ESTestCase {
         Metadata.Builder metadata = Metadata.builder();
         cs.metadata(metadata);
 
-        NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true, false);
+        NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, false);
         assertThat(load.getError(), containsString("ml.machine_memory attribute [bar] is not a long"));
     }
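
The two hunks above check that a malformed `ml.max_open_jobs` or `ml.machine_memory` node attribute surfaces as an error string on the `NodeLoad` rather than as a thrown exception. As a rough illustration only (this is not the actual `NodeLoadDetector` source; the class and method names below are hypothetical), the defensive parse-or-record-error pattern those assertions imply looks like:

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of attribute validation: return an error message
// (matching the strings asserted in the tests above) instead of throwing.
final class NodeAttributeParser {
    static Optional<String> validate(Map<String, String> attrs) {
        String maxOpenJobs = attrs.get("ml.max_open_jobs");
        if (maxOpenJobs != null) {
            try {
                Integer.parseInt(maxOpenJobs);
            } catch (NumberFormatException e) {
                return Optional.of("ml.max_open_jobs attribute [" + maxOpenJobs + "] is not an integer");
            }
        }
        String machineMemory = attrs.get("ml.machine_memory");
        if (machineMemory != null) {
            try {
                Long.parseLong(machineMemory);
            } catch (NumberFormatException e) {
                return Optional.of("ml.machine_memory attribute [" + machineMemory + "] is not a long");
            }
        }
        return Optional.empty();
    }
}
```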
 

+ 5 - 65
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java

@@ -41,7 +41,6 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
 import static org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutor.nodeFilter;
 import static org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutorTests.jobWithRules;
 import static org.hamcrest.Matchers.containsString;
@@ -94,48 +93,6 @@ public class JobNodeSelectorTests extends ESTestCase {
         assertEquals("{_node_id1}{ml.machine_memory=5}", JobNodeSelector.nodeNameAndMlAttributes(node));
     }
 
-    public void testSelectLeastLoadedMlNode_byCount() {
-        Map<String, String> nodeAttr = new HashMap<>();
-        nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "10");
-        nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1");
-        // MachineLearning.MACHINE_MEMORY_NODE_ATTR negative, so this will fall back to allocating by count
-        DiscoveryNodes nodes = DiscoveryNodes.builder()
-            .add(new DiscoveryNode("_node_name1", "_node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
-                nodeAttr, Collections.emptySet(), Version.CURRENT))
-            .add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
-                nodeAttr, Collections.emptySet(), Version.CURRENT))
-            .add(new DiscoveryNode("_node_name3", "_node_id3", new TransportAddress(InetAddress.getLoopbackAddress(), 9302),
-                nodeAttr, Collections.emptySet(), Version.CURRENT))
-            .build();
-
-        PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
-        OpenJobPersistentTasksExecutorTests.addJobTask("job_id1", "_node_id1", null, tasksBuilder);
-        OpenJobPersistentTasksExecutorTests.addJobTask("job_id2", "_node_id1", null, tasksBuilder);
-        OpenJobPersistentTasksExecutorTests.addJobTask("job_id3", "_node_id2", null, tasksBuilder);
-        PersistentTasksCustomMetadata tasks = tasksBuilder.build();
-
-        ClusterState.Builder cs = ClusterState.builder(new ClusterName("_name"));
-        cs.nodes(nodes);
-        Metadata.Builder metadata = Metadata.builder();
-        metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks);
-        cs.metadata(metadata);
-
-        Job.Builder jobBuilder = buildJobBuilder("job_id4");
-        jobBuilder.setJobVersion(Version.CURRENT);
-
-        Job job = jobBuilder.build();
-        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), cs.nodes().getAllNodes(), job.getId(), MlTasks.JOB_TASK_NAME,
-            memoryTracker, 0, node -> nodeFilter(node, job));
-        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10,
-            2,
-            30,
-            MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
-            false);
-        assertEquals("", result.getExplanation());
-        assertEquals("_node_id3", result.getExecutorNode());
-    }
-
     public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityCountLimiting() {
         int numNodes = randomIntBetween(1, 10);
         int maxRunningJobsPerNode = randomIntBetween(1, 100);
@@ -156,7 +113,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             maxMachineMemoryPercent,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("node is full. Number of opened jobs ["
@@ -184,7 +140,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             maxMachineMemoryPercent,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("node is full. Number of opened jobs ["
@@ -217,7 +172,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             maxMachineMemoryPercent,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("node has insufficient available memory. "
@@ -235,7 +189,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         Map<String, String> nodeAttr = new HashMap<>();
         nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
-        nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, "-1");
+        nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, String.valueOf(ByteSizeValue.ofGb(1).getBytes()));
 
         ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, 1, JobState.OPENED, null);
 
@@ -248,7 +202,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             maxMachineMemoryPercent,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertNotNull(result.getExecutorNode());
     }
@@ -274,7 +227,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             maxMachineMemoryPercent,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("node has insufficient available memory. "
@@ -311,7 +263,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             maxMachineMemoryPercent,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("node has insufficient available memory. "
@@ -344,7 +295,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             maxMachineMemoryPercent,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("node has insufficient available memory. "
@@ -380,7 +330,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             30,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertTrue(result.getExplanation().contains("node isn't a machine learning node"));
         assertNull(result.getExecutorNode());
@@ -423,7 +372,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             30,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertEquals("_node_id3", result.getExecutorNode());
 
@@ -442,7 +390,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             30,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertNull("no node selected, because OPENING state", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("Node exceeds [2] the maximum number of jobs [2] in opening state"));
@@ -457,7 +404,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs = csBuilder.build();
         jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> nodeFilter(node, job7));
-        result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false);
+        result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, false);
         assertNull("no node selected, because stale task", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("Node exceeds [2] the maximum number of jobs [2] in opening state"));
 
@@ -470,7 +417,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs = csBuilder.build();
         jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> nodeFilter(node, job7));
-        result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false);
+        result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, false);
         assertNull("no node selected, because null state", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("Node exceeds [2] the maximum number of jobs [2] in opening state"));
     }
@@ -516,7 +463,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             30,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertEquals("_node_id1", result.getExecutorNode());
 
@@ -530,7 +476,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job8 = BaseMlIntegTestCase.createFareQuoteJob("job_id8", JOB_MEMORY_REQUIREMENT).build(new Date());
         jobNodeSelector = new JobNodeSelector(cs, cs.nodes().getAllNodes(), job8.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> nodeFilter(node, job8));
-        result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, isMemoryTrackerRecentlyRefreshed, false);
+        result = jobNodeSelector.selectNode(10, 2, 30, MAX_JOB_BYTES, false);
         assertNull("no node selected, because OPENING state", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("Node exceeds [2] the maximum number of jobs [2] in opening state"));
     }
@@ -568,7 +514,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             30,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertThat(result.getExplanation(), containsString("node does not support jobs of type [incompatible_type]"));
         assertNull(result.getExecutorNode());
@@ -605,7 +550,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             30,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertThat(result.getExplanation(), containsString(
             "job's model snapshot requires a node of version [6.3.0] or higher"));
@@ -640,7 +584,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             30,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertNotNull(result.getExecutorNode());
     }
@@ -677,7 +620,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             30,
             MAX_JOB_BYTES,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertNotNull(result.getExecutorNode());
         assertThat(result.getExecutorNode(), equalTo(candidate.getId()));
@@ -744,7 +686,6 @@ public class JobNodeSelectorTests extends ESTestCase {
             2,
             maxMachineMemoryPercent,
             10L,
-            isMemoryTrackerRecentlyRefreshed,
             false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(),
@@ -805,8 +746,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Tuple<NativeMemoryCapacity, Long> capacityAndFreeMemory = jobNodeSelector.perceivedCapacityAndMaxFreeMemory(
             10,
             false,
-            1,
-            true);
+            1);
         assertThat(capacityAndFreeMemory.v2(), equalTo(ByteSizeValue.ofGb(3).getBytes()));
         assertThat(capacityAndFreeMemory.v1(),
             equalTo(new NativeMemoryCapacity(ByteSizeValue.ofGb(7).getBytes(), ByteSizeValue.ofGb(3).getBytes(), 10L)));
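
The deletion of `testSelectLeastLoadedMlNode_byCount` and of the `isMemoryTrackerRecentlyRefreshed` argument reflects the behavioral change in this commit: a node that cannot report its native memory (e.g. `ml.machine_memory` of `-1`) is no longer a fallback target for by-count allocation; it is simply excluded from assignment. A minimal sketch of that selection rule, assuming hypothetical names (`NodePicker`, `MlNode`; not the real `JobNodeSelector` API):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: nodes without a usable memory view are skipped
// outright; among the rest, pick the node with the most free memory.
final class NodePicker {
    record MlNode(String id, long machineMemoryBytes, long assignedJobBytes) {
        boolean hasMemoryView() { return machineMemoryBytes > 0; }
        long freeBytes() { return machineMemoryBytes - assignedJobBytes; }
    }

    static Optional<MlNode> pick(List<MlNode> nodes, long jobBytes) {
        return nodes.stream()
            .filter(MlNode::hasMemoryView)            // ignore nodes with no memory view
            .filter(n -> n.freeBytes() >= jobBytes)   // must fit the job's requirement
            .max(Comparator.comparingLong(MlNode::freeBytes));
    }
}
```

This is also why the test fixtures above swap `"-1"` for `String.valueOf(ByteSizeValue.ofGb(1).getBytes())`: under the new rule, a node advertising unknown memory would never be selected, so the tests must advertise a real memory value.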

+ 0 - 1
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/get_datafeed_stats.yml

@@ -185,7 +185,6 @@ setup:
   - match: { datafeeds.0.state: "started"}
   - is_true: datafeeds.0.node.name
   - is_true: datafeeds.0.node.transport_address
-  - match: { datafeeds.0.node.attributes.ml\.max_open_jobs: "20"}
 
 ---
 "Test get stats for started datafeed contains timing stats":

+ 1 - 1
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/jobs_get_stats.yml

@@ -103,7 +103,7 @@ setup:
   - match:  { jobs.0.state: opened }
   - is_true:  jobs.0.node.name
   - is_true:  jobs.0.node.transport_address
-  - match:  { jobs.0.node.attributes.ml\.max_open_jobs: "20"}
+  - match:  { jobs.0.node.attributes.ml\.max_open_jobs: "512"}
   - is_true:  jobs.0.open_time
   - match:  { jobs.0.timing_stats.job_id: job-stats-test }
   - match:  { jobs.0.timing_stats.bucket_count: 1 }  # Records are 1h apart and bucket span is 1h so 1 bucket is produced