Browse Source

[ML] add new setting xpack.ml.use_auto_machine_memory_percent for auto calculating native memory percentage allowed for machine learning jobs (#63887)

When running ML, sometimes it is best to automatically adjust the
memory allotted for machine learning based on the nodesize
and how much space is given to the JVM

This commit adds a new static setting xpack.ml.use_auto_machine_memory_percent for
allowing this dynamic calculation. The old setting remains as a backup
just in case the limit cannot be automatically determined due to
lack of information.

Closes #63795
Benjamin Trent 5 years ago
parent
commit
165e063b50
17 changed files with 362 additions and 60 deletions
  1. 16 0
      docs/reference/settings/ml-settings.asciidoc
  2. 3 2
      x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java
  3. 21 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  4. 10 14
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java
  5. 9 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java
  6. 11 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java
  7. 4 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java
  8. 13 9
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java
  9. 1 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java
  10. 94 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/NativeMemoryCalculator.java
  11. 9 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportMlInfoActionTests.java
  12. 3 3
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java
  13. 1 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java
  14. 6 6
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java
  15. 19 19
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java
  16. 3 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java
  17. 139 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/NativeMemoryCalculatorTests.java

+ 16 - 0
docs/reference/settings/ml-settings.asciidoc

@@ -153,6 +153,22 @@ connect within the time period specified by this setting then the process is
 assumed to have failed. Defaults to `10s`. The minimum value for this setting is
 `5s`.
 
+xpack.ml.use_auto_machine_memory_percent::
+(<<static-cluster-setting,Static>>) If this setting is `true`, the
+`xpack.ml.max_machine_memory_percent` setting is ignored. Instead, the maximum
+percentage of the machine's memory that can be used for running {ml} analytics
+processes is calculated automatically and takes into account the total node size
+and the size of the JVM on the node. The default value is `false`. If this
+setting differs between nodes, the value on the current master node is heeded.
++
+TIP: If you do not have dedicated {ml} nodes (that is to say, the node has
+multiple roles), do not enable this setting. Its calculations assume that {ml}
+analytics are the main purpose of the node.
++
+IMPORTANT: The calculation assumes that dedicated {ml} nodes have at least
+`256MB` memory reserved outside of the JVM. If you have tiny {ml}
+nodes in your cluster, you shouldn't use this setting.
+
 [discrete]
 [[model-inference-circuit-breaker]]
 ==== {ml-cap} circuit breaker settings

+ 3 - 2
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java

@@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
+import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;
 
 public class TooManyJobsIT extends BaseMlIntegTestCase {
 
@@ -213,7 +214,7 @@ public class TooManyJobsIT extends BaseMlIntegTestCase {
 
     private long calculateMaxMlMemory() {
         Settings settings = internalCluster().getInstance(Settings.class);
-        return Long.parseLong(internalCluster().getInstance(TransportService.class).getLocalNode().getAttributes()
-                .get(MachineLearning.MACHINE_MEMORY_NODE_ATTR)) * MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings) / 100;
+        return NativeMemoryCalculator.allowedBytesForMl(internalCluster().getInstance(TransportService.class).getLocalNode(), settings)
+            .orElse(0L);
     }
 }

+ 21 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -405,6 +405,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
     private static final String PRE_V7_ML_ENABLED_NODE_ATTR = "ml.enabled";
     public static final String MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs";
     public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory";
+    public static final String MAX_JVM_SIZE_NODE_ATTR = "ml.max_jvm_size";
     public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
             Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
     /**
@@ -421,6 +422,22 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
     // can handle.
     public static final Setting<Integer> MAX_MACHINE_MEMORY_PERCENT =
             Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 200, Property.Dynamic, Property.NodeScope);
+    /**
+     * This boolean value indicates if `max_machine_memory_percent` should be ignored and a automatic calculation is used instead.
+     *
+     * This calculation takes into account total node size and the size of the JVM on that node.
+     *
+     * If the calculation fails, we fall back to `max_machine_memory_percent`.
+     *
+     * This setting is NOT dynamic. This allows the cluster administrator to set it on startup without worry of it
+     * being edited accidentally later.
+     * Consequently, it could be that this setting differs between nodes. But, we only ever pay attention to the value
+     * that is set on the current master. As master nodes are responsible for persistent task assignments.
+     */
+    public static final Setting<Boolean> USE_AUTO_MACHINE_MEMORY_PERCENT = Setting.boolSetting(
+        "xpack.ml.use_auto_machine_memory_percent",
+        false,
+        Property.NodeScope);
     public static final Setting<Integer> MAX_LAZY_ML_NODES =
             Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
 
@@ -508,7 +525,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
                 ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE,
                 ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
                 ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES,
-                NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND
+                NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND,
+                USE_AUTO_MACHINE_MEMORY_PERCENT
             );
     }
 
@@ -516,6 +534,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
         String mlEnabledNodeAttrName = "node.attr." + PRE_V7_ML_ENABLED_NODE_ATTR;
         String maxOpenJobsPerNodeNodeAttrName = "node.attr." + MAX_OPEN_JOBS_NODE_ATTR;
         String machineMemoryAttrName = "node.attr." + MACHINE_MEMORY_NODE_ATTR;
+        String jvmSizeAttrName = "node.attr." + MAX_JVM_SIZE_NODE_ATTR;
 
         if (enabled == false) {
             disallowMlNodeAttributes(mlEnabledNodeAttrName, maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName);
@@ -531,6 +550,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
                     String.valueOf(MAX_OPEN_JOBS_PER_NODE.get(settings)));
             addMlNodeAttribute(additionalSettings, machineMemoryAttrName,
                     Long.toString(machineMemoryFromStats(OsProbe.getInstance().osStats())));
+            addMlNodeAttribute(additionalSettings, jvmSizeAttrName, Long.toString(Runtime.getRuntime().maxMemory()));
             // This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion
             disallowMlNodeAttributes(mlEnabledNodeAttrName);
         } else {

+ 10 - 14
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java

@@ -14,6 +14,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.tasks.Task;
@@ -28,12 +29,14 @@ import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.process.MlControllerHolder;
+import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.concurrent.TimeoutException;
 
 public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.Request, MlInfoAction.Response> {
@@ -100,7 +103,7 @@ public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.R
         ByteSizeValue defaultLimit = ByteSizeValue.ofMb(AnalysisLimits.DEFAULT_MODEL_MEMORY_LIMIT_MB);
         ByteSizeValue maxModelMemoryLimit = clusterService.getClusterSettings().get(MachineLearningField.MAX_MODEL_MEMORY_LIMIT);
         if (maxModelMemoryLimit != null && maxModelMemoryLimit.getBytes() > 0
-                && maxModelMemoryLimit.getBytes() < defaultLimit.getBytes()) {
+            && maxModelMemoryLimit.getBytes() < defaultLimit.getBytes()) {
             return maxModelMemoryLimit;
         }
         return defaultLimit;
@@ -112,24 +115,16 @@ public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.R
         return anomalyDetectorsDefaults;
     }
 
-    static ByteSizeValue calculateEffectiveMaxModelMemoryLimit(int maxMachineMemoryPercent, DiscoveryNodes nodes) {
+    static ByteSizeValue calculateEffectiveMaxModelMemoryLimit(ClusterSettings clusterSettings, DiscoveryNodes nodes) {
 
         long maxMlMemory = -1;
 
         for (DiscoveryNode node : nodes) {
-
-            Map<String, String> nodeAttributes = node.getAttributes();
-            String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
-            if (machineMemoryStr == null) {
-                continue;
-            }
-            long machineMemory;
-            try {
-                machineMemory = Long.parseLong(machineMemoryStr);
-            } catch (NumberFormatException e) {
+            OptionalLong limit = NativeMemoryCalculator.allowedBytesForMl(node, clusterSettings);
+            if (limit.isEmpty()) {
                 continue;
             }
-            maxMlMemory = Math.max(maxMlMemory, machineMemory * maxMachineMemoryPercent / 100);
+            maxMlMemory = Math.max(maxMlMemory, limit.getAsLong());
         }
 
         if (maxMlMemory <= 0) {
@@ -146,7 +141,8 @@ public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.R
     private Map<String, Object> limits() {
         Map<String, Object> limits = new HashMap<>();
         ByteSizeValue effectiveMaxModelMemoryLimit = calculateEffectiveMaxModelMemoryLimit(
-            clusterService.getClusterSettings().get(MachineLearning.MAX_MACHINE_MEMORY_PERCENT), clusterService.state().getNodes());
+            clusterService.getClusterSettings(),
+            clusterService.state().getNodes());
         ByteSizeValue maxModelMemoryLimit = clusterService.getClusterSettings().get(MachineLearningField.MAX_MODEL_MEMORY_LIMIT);
         if (maxModelMemoryLimit != null && maxModelMemoryLimit.getBytes() > 0) {
             limits.put("max_model_memory_limit", maxModelMemoryLimit.getStringRep());

+ 9 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

@@ -74,6 +74,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
 import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;
+import static org.elasticsearch.xpack.ml.MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT;
 
 /*
  This class extends from TransportMasterNodeAction for cluster state observing purposes.
@@ -357,6 +358,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
         private final Client client;
         private final IndexNameExpressionResolver expressionResolver;
         private final JobResultsProvider jobResultsProvider;
+        private final boolean useAutoMemoryPercentage;
 
         private volatile int maxConcurrentJobAllocations;
         private volatile int maxMachineMemoryPercent;
@@ -377,6 +379,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
             this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
             this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
             this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings);
+            this.useAutoMemoryPercentage = USE_AUTO_MACHINE_MEMORY_PERCENT.get(settings);
             clusterService.getClusterSettings()
                     .addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
             clusterService.getClusterSettings()
@@ -424,7 +427,11 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
             JobNodeSelector jobNodeSelector = new JobNodeSelector(clusterState, jobId, MlTasks.JOB_TASK_NAME, memoryTracker,
                 job.allowLazyOpen() ? Integer.MAX_VALUE : maxLazyMLNodes, node -> nodeFilter(node, job));
             return jobNodeSelector.selectNode(
-                maxOpenJobs, maxConcurrentJobAllocations, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+                maxOpenJobs,
+                maxConcurrentJobAllocations,
+                maxMachineMemoryPercent,
+                isMemoryTrackerRecentlyRefreshed,
+                useAutoMemoryPercentage);
         }
 
         @Override
@@ -523,6 +530,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
         void setMaxOpenJobs(int maxOpenJobs) {
             this.maxOpenJobs = maxOpenJobs;
         }
+
     }
 
     public static class JobTask extends AllocatedPersistentTask implements OpenJobAction.JobTaskMatcher {

+ 11 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -95,6 +95,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
 import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;
+import static org.elasticsearch.xpack.ml.MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT;
 
 /**
  * Starts the persistent task for running data frame analytics.
@@ -599,6 +600,7 @@ public class TransportStartDataFrameAnalyticsAction
         private final MlMemoryTracker memoryTracker;
         private final IndexNameExpressionResolver resolver;
         private final IndexTemplateConfig inferenceIndexTemplate;
+        private final boolean useAutoMemoryPercentage;
 
         private volatile int maxMachineMemoryPercent;
         private volatile int maxLazyMLNodes;
@@ -619,6 +621,7 @@ public class TransportStartDataFrameAnalyticsAction
             this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
             this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
             this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings);
+            this.useAutoMemoryPercentage = USE_AUTO_MACHINE_MEMORY_PERCENT.get(settings);
             clusterService.getClusterSettings()
                 .addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
             clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
@@ -683,7 +686,13 @@ public class TransportStartDataFrameAnalyticsAction
                     node -> nodeFilter(node, params));
             // Pass an effectively infinite value for max concurrent opening jobs, because data frame analytics jobs do
             // not have an "opening" state so would never be rejected for causing too many jobs in the "opening" state
-            return jobNodeSelector.selectNode(maxOpenJobs, Integer.MAX_VALUE, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+            return jobNodeSelector.selectNode(
+                maxOpenJobs,
+                Integer.MAX_VALUE,
+                maxMachineMemoryPercent,
+                isMemoryTrackerRecentlyRefreshed,
+                useAutoMemoryPercentage
+            );
         }
 
         @Override
@@ -757,5 +766,6 @@ public class TransportStartDataFrameAnalyticsAction
         void setMaxOpenJobs(int maxOpenJobs) {
             this.maxOpenJobs = maxOpenJobs;
         }
+
     }
 }

+ 4 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java

@@ -77,7 +77,8 @@ public class JobNodeSelector {
     }
 
     public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJobs, int maxConcurrentJobAllocations,
-                                                               int maxMachineMemoryPercent, boolean isMemoryTrackerRecentlyRefreshed) {
+                                                               int maxMachineMemoryPercent, boolean isMemoryTrackerRecentlyRefreshed,
+                                                               boolean useAutoMemoryPercentage) {
         // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe
         // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs
         boolean allocateByMemory = isMemoryTrackerRecentlyRefreshed;
@@ -107,7 +108,8 @@ public class JobNodeSelector {
                 node,
                 dynamicMaxOpenJobs,
                 maxMachineMemoryPercent,
-                allocateByMemory
+                allocateByMemory,
+                useAutoMemoryPercentage
             );
             if (currentLoad.getError() != null) {
                 reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)

+ 13 - 9
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/NodeLoadDetector.java

@@ -20,11 +20,13 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
+import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 
 
 public class NodeLoadDetector {
@@ -45,7 +47,8 @@ public class NodeLoadDetector {
                                    DiscoveryNode node,
                                    int dynamicMaxOpenJobs,
                                    int maxMachineMemoryPercent,
-                                   boolean isMemoryTrackerRecentlyRefreshed) {
+                                   boolean isMemoryTrackerRecentlyRefreshed,
+                                   boolean useAutoMachineMemoryCalculation) {
         PersistentTasksCustomMetadata persistentTasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
         Map<String, String> nodeAttributes = node.getAttributes();
         List<String> errors = new ArrayList<>();
@@ -60,16 +63,17 @@ public class NodeLoadDetector {
                 maxNumberOfOpenJobs = -1;
             }
         }
-        String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
-        long machineMemory = -1;
-        try {
-            machineMemory = Long.parseLong(machineMemoryStr);
-        } catch (NumberFormatException e) {
-            errors.add(MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long");
+        OptionalLong maxMlMemory = NativeMemoryCalculator.allowedBytesForMl(node,
+            maxMachineMemoryPercent,
+            useAutoMachineMemoryCalculation);
+        if (maxMlMemory.isEmpty()) {
+            errors.add(MachineLearning.MACHINE_MEMORY_NODE_ATTR
+                + " attribute ["
+                + nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR)
+                + "] is not a long");
         }
-        long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
 
-        NodeLoad nodeLoad = new NodeLoad(node.getId(), maxMlMemory, maxNumberOfOpenJobs, isMemoryTrackerRecentlyRefreshed);
+        NodeLoad nodeLoad = new NodeLoad(node.getId(), maxMlMemory.orElse(-1L), maxNumberOfOpenJobs, isMemoryTrackerRecentlyRefreshed);
         if (errors.isEmpty() == false) {
             nodeLoad.error = Strings.collectionToCommaDelimitedString(errors);
             return nodeLoad;

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java

@@ -102,6 +102,7 @@ public class MlMemoryTracker implements LocalNodeMasterListener {
     public void onMaster() {
         isMaster = true;
         logger.trace("ML memory tracker on master");
+        asyncRefresh();
     }
 
     @Override

+ 94 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/NativeMemoryCalculator.java

@@ -0,0 +1,94 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ml.utils;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+
+import java.util.OptionalLong;
+
+import static org.elasticsearch.xpack.ml.MachineLearning.MACHINE_MEMORY_NODE_ATTR;
+import static org.elasticsearch.xpack.ml.MachineLearning.MAX_JVM_SIZE_NODE_ATTR;
+import static org.elasticsearch.xpack.ml.MachineLearning.MAX_MACHINE_MEMORY_PERCENT;
+import static org.elasticsearch.xpack.ml.MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT;
+
+public final class NativeMemoryCalculator {
+
+    private static final long OS_OVERHEAD = ByteSizeValue.ofMb(200L).getBytes();
+
+    private NativeMemoryCalculator() { }
+
+    public static OptionalLong allowedBytesForMl(DiscoveryNode node, Settings settings) {
+        return allowedBytesForMl(
+            node.getAttributes().get(MACHINE_MEMORY_NODE_ATTR),
+            node.getAttributes().get(MAX_JVM_SIZE_NODE_ATTR),
+            MAX_MACHINE_MEMORY_PERCENT.get(settings),
+            USE_AUTO_MACHINE_MEMORY_PERCENT.get(settings));
+    }
+
+    public static OptionalLong allowedBytesForMl(DiscoveryNode node, ClusterSettings settings) {
+        return allowedBytesForMl(
+            node.getAttributes().get(MACHINE_MEMORY_NODE_ATTR),
+            node.getAttributes().get(MAX_JVM_SIZE_NODE_ATTR),
+            settings.get(MAX_MACHINE_MEMORY_PERCENT),
+            settings.get(USE_AUTO_MACHINE_MEMORY_PERCENT));
+    }
+
+    public static OptionalLong allowedBytesForMl(DiscoveryNode node, int maxMemoryPercent, boolean useAuto) {
+        return allowedBytesForMl(
+            node.getAttributes().get(MACHINE_MEMORY_NODE_ATTR),
+            node.getAttributes().get(MAX_JVM_SIZE_NODE_ATTR),
+            maxMemoryPercent,
+            useAuto);
+    }
+
+    private static OptionalLong allowedBytesForMl(String nodeBytes, String jvmBytes, int maxMemoryPercent, boolean useAuto) {
+        if (nodeBytes == null) {
+            return OptionalLong.empty();
+        }
+        final long machineMemory;
+        try {
+            machineMemory = Long.parseLong(nodeBytes);
+        } catch (NumberFormatException e) {
+            return OptionalLong.empty();
+        }
+        Long jvmMemory = null;
+        try {
+            if (Strings.isNullOrEmpty(jvmBytes) == false) {
+                jvmMemory = Long.parseLong(jvmBytes);
+            }
+        } catch (NumberFormatException e) {
+            return OptionalLong.empty();
+        }
+        return OptionalLong.of(allowedBytesForMl(machineMemory, jvmMemory, maxMemoryPercent, useAuto));
+    }
+
+    private static long allowedBytesForMl(long machineMemory, Long jvmSize, int maxMemoryPercent, boolean useAuto) {
+        if (useAuto && jvmSize != null) {
+            // It is conceivable that there is a machine smaller than 200MB.
+            // If the administrator wants to use the auto configuration, the node should be larger.
+            if (machineMemory - jvmSize < OS_OVERHEAD || machineMemory == 0) {
+                return machineMemory / 100;
+            }
+            // This calculation is dynamic and designed to maximally take advantage of the underlying machine for machine learning
+            // We only allow 200MB for the Operating system itself and take up to 90% of the underlying native memory left
+            // Example calculations:
+            // 1GB node -> 41%
+            // 2GB node -> 66%
+            // 16GB node -> 87%
+            // 64GB node -> 90%
+            long memoryPercent = Math.min(90, (int)Math.ceil(((machineMemory - jvmSize - OS_OVERHEAD) / (double)machineMemory) * 100.0D));
+            return machineMemory * memoryPercent / 100;
+        }
+
+        return machineMemory * maxMemoryPercent / 100;
+    }
+
+}

+ 9 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportMlInfoActionTests.java

@@ -9,8 +9,11 @@ package org.elasticsearch.xpack.ml.action;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -19,6 +22,8 @@ import org.elasticsearch.xpack.ml.MachineLearning;
 import java.net.InetAddress;
 import java.util.Collections;
 
+import static org.elasticsearch.xpack.ml.MachineLearning.MAX_MACHINE_MEMORY_PERCENT;
+import static org.elasticsearch.xpack.ml.MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -28,6 +33,9 @@ public class TransportMlInfoActionTests extends ESTestCase {
     public void testCalculateEffectiveMaxModelMemoryLimit() {
 
         int mlMemoryPercent = randomIntBetween(5, 90);
+        ClusterSettings clusterSettings = new ClusterSettings(
+            Settings.builder().put(MAX_MACHINE_MEMORY_PERCENT.getKey(), mlMemoryPercent).build(),
+            Sets.newHashSet(MAX_MACHINE_MEMORY_PERCENT, USE_AUTO_MACHINE_MEMORY_PERCENT));
         long highestMlMachineMemory = -1;
 
         DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
@@ -49,8 +57,7 @@ public class TransportMlInfoActionTests extends ESTestCase {
         }
         DiscoveryNodes nodes = builder.build();
 
-        ByteSizeValue effectiveMaxModelMemoryLimit =
-            TransportMlInfoAction.calculateEffectiveMaxModelMemoryLimit(mlMemoryPercent, nodes);
+        ByteSizeValue effectiveMaxModelMemoryLimit = TransportMlInfoAction.calculateEffectiveMaxModelMemoryLimit(clusterSettings, nodes);
 
         if (highestMlMachineMemory < 0) {
             assertThat(effectiveMaxModelMemoryLimit, nullValue());

+ 3 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java

@@ -152,7 +152,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
         ClusterService clusterService = mock(ClusterService.class);
         ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY,
             Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
-                MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE)
+                MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE, MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT)
         );
         when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
 
@@ -170,7 +170,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
         ClusterService clusterService = mock(ClusterService.class);
         ClusterSettings clusterSettings = new ClusterSettings(settings,
             Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
-                MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE)
+                MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE, MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT)
         );
         when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
 
@@ -198,7 +198,7 @@ public class TransportOpenJobActionTests extends ESTestCase {
         ClusterService clusterService = mock(ClusterService.class);
         ClusterSettings clusterSettings = new ClusterSettings(settings,
             Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
-                MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE)
+                MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE, MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT)
         );
         when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
 

+ 1 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java

@@ -220,6 +220,7 @@ public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase {
                 Sets.newHashSet(
                     MachineLearning.CONCURRENT_JOB_ALLOCATIONS,
                     MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
+                    MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT,
                     MachineLearning.MAX_LAZY_ML_NODES,
                     MachineLearning.MAX_OPEN_JOBS_PER_NODE));
         when(clusterService.getClusterSettings()).thenReturn(clusterSettings);

+ 6 - 6
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeLoadDetectorTests.java

@@ -79,28 +79,28 @@ public class JobNodeLoadDetectorTests extends ESTestCase {
         metadata.putCustom(PersistentTasksCustomMetadata.TYPE, tasks);
         cs.metadata(metadata);
 
-        NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id1"), 10, -1, true);
+        NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id1"), 10, 30, true, false);
         assertThat(load.getAssignedJobMemory(), equalTo(52428800L));
         assertThat(load.getNumAllocatingJobs(), equalTo(2L));
         assertThat(load.getNumAssignedJobs(), equalTo(2L));
         assertThat(load.getMaxJobs(), equalTo(10));
         assertThat(load.getMaxMlMemory(), equalTo(0L));
 
-        load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id2"), 5, -1, true);
+        load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id2"), 5, 30, true, false);
         assertThat(load.getAssignedJobMemory(), equalTo(41943040L));
         assertThat(load.getNumAllocatingJobs(), equalTo(1L));
         assertThat(load.getNumAssignedJobs(), equalTo(1L));
         assertThat(load.getMaxJobs(), equalTo(5));
         assertThat(load.getMaxMlMemory(), equalTo(0L));
 
-        load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id3"), 5, -1, true);
+        load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id3"), 5, 30, true, false);
         assertThat(load.getAssignedJobMemory(), equalTo(0L));
         assertThat(load.getNumAllocatingJobs(), equalTo(0L));
         assertThat(load.getNumAssignedJobs(), equalTo(0L));
         assertThat(load.getMaxJobs(), equalTo(5));
         assertThat(load.getMaxMlMemory(), equalTo(0L));
 
-        load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id4"), 5, -1, true);
+        load = nodeLoadDetector.detectNodeLoad(cs.build(), true, nodes.get("_node_id4"), 5, 30, true, false);
         assertThat(load.getAssignedJobMemory(), equalTo(41943040L));
         assertThat(load.getNumAllocatingJobs(), equalTo(0L));
         assertThat(load.getNumAssignedJobs(), equalTo(1L));
@@ -122,7 +122,7 @@ public class JobNodeLoadDetectorTests extends ESTestCase {
         Metadata.Builder metadata = Metadata.builder();
         cs.metadata(metadata);
 
-        NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true);
+        NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, 30, true, false);
         assertThat(load.getError(), containsString("ml.max_open_jobs attribute [foo] is not an integer"));
     }
 
@@ -140,7 +140,7 @@ public class JobNodeLoadDetectorTests extends ESTestCase {
         Metadata.Builder metadata = Metadata.builder();
         cs.metadata(metadata);
 
-        NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true);
+        NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(cs.build(), false, nodes.get("_node_id1"), 10, -1, true, false);
         assertThat(load.getError(), containsString("ml.machine_memory attribute [bar] is not a long"));
     }
 

+ 19 - 19
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java

@@ -57,7 +57,7 @@ public class JobNodeSelectorTests extends ESTestCase {
     public void setup() {
         memoryTracker = mock(MlMemoryTracker.class);
         isMemoryTrackerRecentlyRefreshed = true;
-        when(memoryTracker.isRecentlyRefreshed()).thenReturn(isMemoryTrackerRecentlyRefreshed);
+        when(memoryTracker.isRecentlyRefreshed()).thenReturn(isMemoryTrackerRecentlyRefreshed, false);
         when(memoryTracker.getAnomalyDetectorJobMemoryRequirement(anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes());
         when(memoryTracker.getDataFrameAnalyticsJobMemoryRequirement(anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes());
         when(memoryTracker.getJobMemoryRequirement(anyString(), anyString())).thenReturn(JOB_MEMORY_REQUIREMENT.getBytes());
@@ -118,7 +118,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job = jobBuilder.build();
         JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job));
-        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed);
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
         assertEquals("", result.getExplanation());
         assertEquals("_node_id3", result.getExecutorNode());
     }
@@ -140,7 +140,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result =
-            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("because this node is full. Number of opened jobs ["
             + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));
@@ -164,7 +164,7 @@ public class JobNodeSelectorTests extends ESTestCase {
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
             node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId)));
         PersistentTasksCustomMetadata.Assignment result =
-            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("because this node is full. Number of opened jobs ["
             + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));
@@ -193,7 +193,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result =
-            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
             + "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs ["
@@ -217,7 +217,7 @@ public class JobNodeSelectorTests extends ESTestCase {
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
             node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId)));
         PersistentTasksCustomMetadata.Assignment result =
-            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false);
         assertNotNull(result.getExecutorNode());
     }
 
@@ -239,7 +239,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker,
             0, node -> TransportOpenJobAction.nodeFilter(node, job));
         PersistentTasksCustomMetadata.Assignment result =
-            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
             + "Available memory for ML [" + (firstJobTotalMemory - 1)
@@ -270,7 +270,7 @@ public class JobNodeSelectorTests extends ESTestCase {
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
             node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId)));
         PersistentTasksCustomMetadata.Assignment result =
-            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
             + "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs ["
@@ -296,7 +296,7 @@ public class JobNodeSelectorTests extends ESTestCase {
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
             node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, createTaskParams(dataFrameAnalyticsId)));
         PersistentTasksCustomMetadata.Assignment result =
-            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed, false);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
             + "Available memory for ML [" + (firstJobTotalMemory - 1)
@@ -325,7 +325,7 @@ public class JobNodeSelectorTests extends ESTestCase {
 
         JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job));
-        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(20, 2, 30, isMemoryTrackerRecentlyRefreshed);
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(20, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
         assertTrue(result.getExplanation().contains("because this node isn't a ml node"));
         assertNull(result.getExecutorNode());
     }
@@ -362,7 +362,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         ClusterState cs = csBuilder.build();
         JobNodeSelector jobNodeSelector = new JobNodeSelector(cs, job6.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job6));
-        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed);
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
         assertEquals("_node_id3", result.getExecutorNode());
 
         tasksBuilder = PersistentTasksCustomMetadata.builder(tasks);
@@ -376,7 +376,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job7 = BaseMlIntegTestCase.createFareQuoteJob("job_id7", JOB_MEMORY_REQUIREMENT).build(new Date());
         jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job7));
-        result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed);
+        result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
         assertNull("no node selected, because OPENING state", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
 
@@ -390,7 +390,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs = csBuilder.build();
         jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job7));
-        result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed);
+        result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
         assertNull("no node selected, because stale task", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
 
@@ -403,7 +403,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs = csBuilder.build();
         jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job7));
-        result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed);
+        result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
         assertNull("no node selected, because null state", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
     }
@@ -445,7 +445,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         // Allocation won't be possible if the stale failed job is treated as opening
         JobNodeSelector jobNodeSelector = new JobNodeSelector(cs, job7.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job7));
-        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed);
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
         assertEquals("_node_id1", result.getExecutorNode());
 
         tasksBuilder = PersistentTasksCustomMetadata.builder(tasks);
@@ -458,7 +458,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job8 = BaseMlIntegTestCase.createFareQuoteJob("job_id8", JOB_MEMORY_REQUIREMENT).build(new Date());
         jobNodeSelector = new JobNodeSelector(cs, job8.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job8));
-        result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed);
+        result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
         assertNull("no node selected, because OPENING state", result.getExecutorNode());
         assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
     }
@@ -492,7 +492,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs.metadata(metadata);
         JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job));
-        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed);
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
         assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]"));
         assertNull(result.getExecutorNode());
     }
@@ -524,7 +524,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         cs.metadata(metadata);
         JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(),
             MlTasks.JOB_TASK_NAME, memoryTracker, 0, node -> TransportOpenJobAction.nodeFilter(node, job));
-        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed);
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
         assertThat(result.getExplanation(), containsString(
             "because the job's model snapshot requires a node of version [6.3.0] or higher"));
         assertNull(result.getExecutorNode());
@@ -554,7 +554,7 @@ public class JobNodeSelectorTests extends ESTestCase {
         Job job = TransportOpenJobActionTests.jobWithRules("job_with_rules");
         JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, 0,
             node -> TransportOpenJobAction.nodeFilter(node, job));
-        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed);
+        PersistentTasksCustomMetadata.Assignment result = jobNodeSelector.selectNode(10, 2, 30, isMemoryTrackerRecentlyRefreshed, false);
         assertNotNull(result.getExecutorNode());
     }
 

+ 3 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.process;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -62,7 +63,9 @@ public class MlMemoryTrackerTests extends ESTestCase {
         ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY,
             Collections.singleton(PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING));
         ClusterService clusterService = mock(ClusterService.class);
+        ClusterState clusterState = ClusterState.EMPTY_STATE;
         when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+        when(clusterService.state()).thenReturn(clusterState);
         ThreadPool threadPool = mock(ThreadPool.class);
         ExecutorService executorService = mock(ExecutorService.class);
         doAnswer(invocation -> {

+ 139 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/NativeMemoryCalculatorTests.java

@@ -0,0 +1,139 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.ml.utils;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.OptionalLong;
+
+import static org.elasticsearch.xpack.ml.MachineLearning.MACHINE_MEMORY_NODE_ATTR;
+import static org.elasticsearch.xpack.ml.MachineLearning.MAX_JVM_SIZE_NODE_ATTR;
+import static org.elasticsearch.xpack.ml.MachineLearning.MAX_MACHINE_MEMORY_PERCENT;
+import static org.elasticsearch.xpack.ml.MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT;
+import static org.hamcrest.Matchers.equalTo;
+
+public class NativeMemoryCalculatorTests extends ESTestCase{
+
+    private static final int NUM_TEST_RUNS = 10;
+    public void testAllowedBytesForMLWhenAutoIsFalse() {
+        for (int i = 0; i < NUM_TEST_RUNS; i++) {
+            long nodeSize = randomLongBetween(ByteSizeValue.ofMb(500).getBytes(), ByteSizeValue.ofGb(64).getBytes());
+            int percent = randomIntBetween(5, 200);
+            DiscoveryNode node = newNode(randomBoolean() ? null : randomNonNegativeLong(), nodeSize);
+            Settings settings = newSettings(percent, false);
+            ClusterSettings clusterSettings = newClusterSettings(percent, false);
+
+            long expected = nodeSize * percent / 100;
+
+            assertThat(NativeMemoryCalculator.allowedBytesForMl(node, settings).getAsLong(), equalTo(expected));
+            assertThat(NativeMemoryCalculator.allowedBytesForMl(node, clusterSettings).getAsLong(), equalTo(expected));
+            assertThat(NativeMemoryCalculator.allowedBytesForMl(node, percent, false).getAsLong(), equalTo(expected));
+        }
+    }
+
+    public void testAllowedBytesForMlWhenAutoIsTrue() {
+        for (int i = 0; i < NUM_TEST_RUNS; i++) {
+            long nodeSize = randomLongBetween(ByteSizeValue.ofMb(500).getBytes(), ByteSizeValue.ofGb(64).getBytes());
+            long jvmSize = randomLongBetween(ByteSizeValue.ofMb(250).getBytes(), nodeSize - ByteSizeValue.ofMb(200).getBytes());
+            int percent = randomIntBetween(5, 200);
+            DiscoveryNode node = newNode(jvmSize, nodeSize);
+            Settings settings = newSettings(percent, true);
+            ClusterSettings clusterSettings = newClusterSettings(percent, true);
+
+            int truePercent = Math.min(
+                90,
+                (int)Math.ceil(((nodeSize - jvmSize - ByteSizeValue.ofMb(200).getBytes()) / (double)nodeSize) * 100.0D));
+            long expected = nodeSize * truePercent / 100;
+
+            assertThat(NativeMemoryCalculator.allowedBytesForMl(node, settings).getAsLong(), equalTo(expected));
+            assertThat(NativeMemoryCalculator.allowedBytesForMl(node, clusterSettings).getAsLong(), equalTo(expected));
+            assertThat(NativeMemoryCalculator.allowedBytesForMl(node, percent, true).getAsLong(), equalTo(expected));
+        }
+    }
+
+    public void testAllowedBytesForMlWhenAutoIsTrueButJVMSizeIsUnknown() {
+        long nodeSize = randomLongBetween(ByteSizeValue.ofMb(500).getBytes(), ByteSizeValue.ofGb(64).getBytes());
+        int percent = randomIntBetween(5, 200);
+        DiscoveryNode node = newNode(null, nodeSize);
+        Settings settings = newSettings(percent, true);
+        ClusterSettings clusterSettings = newClusterSettings(percent, true);
+
+        long expected = nodeSize * percent / 100;
+
+        assertThat(NativeMemoryCalculator.allowedBytesForMl(node, settings).getAsLong(), equalTo(expected));
+        assertThat(NativeMemoryCalculator.allowedBytesForMl(node, clusterSettings).getAsLong(), equalTo(expected));
+        assertThat(NativeMemoryCalculator.allowedBytesForMl(node, percent, false).getAsLong(), equalTo(expected));
+    }
+
+    public void testAllowedBytesForMlWhenBothJVMAndNodeSizeAreUnknown() {
+        int percent = randomIntBetween(5, 200);
+        DiscoveryNode node = newNode(null, null);
+        Settings settings = newSettings(percent, randomBoolean());
+        ClusterSettings clusterSettings = newClusterSettings(percent, randomBoolean());
+
+        assertThat(NativeMemoryCalculator.allowedBytesForMl(node, settings), equalTo(OptionalLong.empty()));
+        assertThat(NativeMemoryCalculator.allowedBytesForMl(node, clusterSettings), equalTo(OptionalLong.empty()));
+        assertThat(NativeMemoryCalculator.allowedBytesForMl(node, percent, randomBoolean()), equalTo(OptionalLong.empty()));
+    }
+
+    public void testTinyNode() {
+        for (int i = 0; i < NUM_TEST_RUNS; i++) {
+            long nodeSize = randomLongBetween(0, ByteSizeValue.ofMb(200).getBytes());
+            long jvmSize = randomLongBetween(0, ByteSizeValue.ofMb(200).getBytes());
+            int percent = randomIntBetween(5, 200);
+            DiscoveryNode node = newNode(jvmSize, nodeSize);
+            Settings settings = newSettings(percent, true);
+            ClusterSettings clusterSettings = newClusterSettings(percent, true);
+            long expected = nodeSize / 100;
+            assertThat(NativeMemoryCalculator.allowedBytesForMl(node, settings).getAsLong(), equalTo(expected));
+            assertThat(NativeMemoryCalculator.allowedBytesForMl(node, clusterSettings).getAsLong(), equalTo(expected));
+            assertThat(NativeMemoryCalculator.allowedBytesForMl(node, percent, true).getAsLong(), equalTo(expected));
+        }
+    }
+
+    private static Settings newSettings(int maxMemoryPercent, boolean useAuto) {
+        return Settings.builder()
+            .put(USE_AUTO_MACHINE_MEMORY_PERCENT.getKey(), useAuto)
+            .put(MAX_MACHINE_MEMORY_PERCENT.getKey(), maxMemoryPercent)
+            .build();
+    }
+
+    private static ClusterSettings newClusterSettings(int maxMemoryPercent, boolean useAuto) {
+        return new ClusterSettings(
+            newSettings(maxMemoryPercent, useAuto),
+            Sets.newHashSet(USE_AUTO_MACHINE_MEMORY_PERCENT, MAX_MACHINE_MEMORY_PERCENT));
+    }
+
+    private static DiscoveryNode newNode(Long jvmSizeLong, Long nodeSizeLong) {
+        String jvmSize = jvmSizeLong != null ? jvmSizeLong.toString() : null;
+        String nodeSize = nodeSizeLong != null ? nodeSizeLong.toString() : null;
+        Map<String, String> attrs = new HashMap<>();
+        if (jvmSize != null) {
+            attrs.put(MAX_JVM_SIZE_NODE_ATTR, jvmSize);
+        }
+        if (nodeSize != null) {
+            attrs.put(MACHINE_MEMORY_NODE_ATTR, nodeSize);
+        }
+        return new DiscoveryNode(
+            "node",
+            ESTestCase.buildNewFakeTransportAddress(),
+            attrs,
+            DiscoveryNodeRole.BUILT_IN_ROLES,
+            Version.CURRENT
+        );
+    }
+
+}