Bläddra i källkod

[ML] Include ML processor limits in `_ml/info` response (#101392)

The _ml/info response now includes two extra fields in its
`limits`:

1. `max_single_ml_node_processors`
2. `total_ml_processors`

These fields are _only_ included if they can be accurately
calculated. If autoscaling is enabled and the ML nodes are
not at their maximum size then these fields _cannot_
currently be accurately calculated. (This could potentially
be improved in the future with additional settings set by
the control plane.)
David Roberts 2 år sedan
förälder
incheckning
9286716660

+ 5 - 0
docs/changelog/101392.yaml

@@ -0,0 +1,5 @@
+pr: 101392
+summary: Include ML processor limits in `_ml/info` response
+area: Machine Learning
+type: enhancement
+issues: []

+ 5 - 1
docs/reference/ml/common/apis/get-ml-info.asciidoc

@@ -120,7 +120,9 @@ This is a possible response:
   },
   "limits" : {
     "effective_max_model_memory_limit": "28961mb",
-    "total_ml_memory": "86883mb"
+    "total_ml_memory": "86883mb",
+    "total_ml_processors": 16,
+    "max_single_ml_node_processors": 16
   }
 }
 ----
@@ -129,3 +131,5 @@ This is a possible response:
 // TESTRESPONSE[s/"build_hash": "99a07c016d5a73"/"build_hash": "$body.native_code.build_hash"/]
 // TESTRESPONSE[s/"effective_max_model_memory_limit": "28961mb"/"effective_max_model_memory_limit": "$body.limits.effective_max_model_memory_limit"/]
 // TESTRESPONSE[s/"total_ml_memory": "86883mb"/"total_ml_memory": "$body.limits.total_ml_memory"/]
+// TESTRESPONSE[s/"total_ml_processors": 16/"total_ml_processors": $body.limits.total_ml_processors/]
+// TESTRESPONSE[s/"max_single_ml_node_processors": 16/"max_single_ml_node_processors": $body.limits.max_single_ml_node_processors/]

+ 40 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportMlInfoAction.java

@@ -11,11 +11,13 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.Processors;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
@@ -26,14 +28,20 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
 import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
 import org.elasticsearch.xpack.ml.process.MlControllerHolder;
+import org.elasticsearch.xpack.ml.utils.MlProcessors;
 import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.concurrent.TimeoutException;
 
 public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.Request, MlInfoAction.Response> {
@@ -137,6 +145,38 @@ public class TransportMlInfoAction extends HandledTransportAction<MlInfoAction.R
             limits.put("effective_max_model_memory_limit", effectiveMaxModelMemoryLimit.getStringRep());
         }
         limits.put("total_ml_memory", NativeMemoryCalculator.calculateTotalMlMemory(clusterSettings, nodes).getStringRep());
+
+        // Add processor information _if_ known with certainty. It won't be known with certainty if autoscaling is enabled.
+        // If we can scale up in terms of memory, assume we can also scale up in terms of processors.
+        List<DiscoveryNode> mlNodes = nodes.stream().filter(MachineLearning::isMlNode).toList();
+        if (areMlNodesBiggestSize(clusterSettings.get(MachineLearning.MAX_ML_NODE_SIZE), mlNodes)) {
+            Processors singleNodeProcessors = MlProcessors.getMaxMlNodeProcessors(
+                nodes,
+                clusterSettings.get(MachineLearning.ALLOCATED_PROCESSORS_SCALE)
+            );
+            if (singleNodeProcessors.count() > 0) {
+                limits.put("max_single_ml_node_processors", singleNodeProcessors.roundDown());
+            }
+            Processors totalMlProcessors = MlProcessors.getTotalMlNodeProcessors(
+                nodes,
+                clusterSettings.get(MachineLearning.ALLOCATED_PROCESSORS_SCALE)
+            );
+            if (totalMlProcessors.count() > 0) {
+                int potentialExtraProcessors = Math.max(0, clusterSettings.get(MachineLearning.MAX_LAZY_ML_NODES) - mlNodes.size())
+                    * singleNodeProcessors.roundDown();
+                limits.put("total_ml_processors", totalMlProcessors.roundDown() + potentialExtraProcessors);
+            }
+        }
         return limits;
     }
+
+    static boolean areMlNodesBiggestSize(ByteSizeValue maxMLNodeSize, Collection<DiscoveryNode> mlNodes) {
+        if (maxMLNodeSize.getBytes() == 0) {
+            return true;
+        }
+
+        OptionalLong smallestMLNode = mlNodes.stream().map(NodeLoadDetector::getNodeSize).flatMapToLong(OptionalLong::stream).min();
+
+        return smallestMLNode.isPresent() && smallestMLNode.getAsLong() >= maxMLNodeSize.getBytes();
+    }
 }

+ 27 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java

@@ -8,6 +8,8 @@
 package org.elasticsearch.xpack.ml.utils;
 
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.unit.Processors;
 import org.elasticsearch.xpack.ml.MachineLearning;
 
@@ -44,4 +46,29 @@ public final class MlProcessors {
             return Processors.ZERO;
         }
     }
+
+    /**
+     * Finds the largest processor count among the nodes that have the ML role.
+     *
+     * @param nodes                   the nodes to inspect; nodes without the ML role are ignored
+     * @param allocatedProcessorScale passed through unchanged to {@link #get(DiscoveryNode, Integer)} for each ML node
+     * @return the greatest per-node value, or {@link Processors#ZERO} if no ML node has a value greater than zero
+     */
+    public static Processors getMaxMlNodeProcessors(DiscoveryNodes nodes, Integer allocatedProcessorScale) {
+        return nodes.stream()
+            .filter(candidate -> candidate.getRoles().contains(DiscoveryNodeRole.ML_ROLE))
+            .map(mlNode -> get(mlNode, allocatedProcessorScale))
+            // Only replace the running maximum when the next value is strictly greater
+            .reduce(Processors.ZERO, (largest, next) -> largest.compareTo(next) < 0 ? next : largest);
+    }
+
+    /**
+     * Sums the processors of all nodes that have the ML role, rounding each node's value down before adding it.
+     *
+     * @param nodes                   the nodes to inspect; nodes without the ML role are ignored
+     * @param allocatedProcessorScale passed through unchanged to {@link #get(DiscoveryNode, Integer)} for each ML node
+     * @return the summed whole processors, or {@link Processors#ZERO} when the sum is zero (for example when there
+     *         are no ML nodes)
+     */
+    public static Processors getTotalMlNodeProcessors(DiscoveryNodes nodes, Integer allocatedProcessorScale) {
+        int total = 0;
+        for (DiscoveryNode node : nodes) {
+            if (node.getRoles().contains(DiscoveryNodeRole.ML_ROLE)) {
+                Processors nodeProcessors = get(node, allocatedProcessorScale);
+                // Round down before summing, because ML only uses whole processors
+                total += nodeProcessors.roundDown();
+            }
+        }
+        // Represent "no processors" with the shared ZERO constant, as the rest of this class does.
+        // NOTE(review): Processors.of is understood to reject non-positive counts, so calling it with 0.0
+        // would throw for a cluster without ML nodes - confirm against the Processors class.
+        return total == 0 ? Processors.ZERO : Processors.of((double) total);
+    }
 }

+ 37 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportMlInfoActionTests.java

@@ -0,0 +1,37 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.action;
+
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.MachineLearning;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Unit tests for the static helpers of {@link TransportMlInfoAction}.
+ */
+public class TransportMlInfoActionTests extends ESTestCase {
+
+    /**
+     * All ML nodes report the same size. The result must be {@code true} when the max size equals that size and
+     * {@code false} when the max size is a multiple (2x to 5x) of it.
+     */
+    public void testAreMlNodesBiggestSize() {
+        boolean expectedResult = randomBoolean();
+        long mlNodeSize = randomLongBetween(10000000L, 10000000000L);
+        long biggestSize = expectedResult ? mlNodeSize : mlNodeSize * randomLongBetween(2, 5);
+        int numMlNodes = randomIntBetween(2, 4);
+        var nodes = Stream.generate(
+            () -> DiscoveryNodeUtils.builder("node")
+                .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                .attributes(Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(mlNodeSize)))
+                .build()
+        ).limit(numMlNodes).toList();
+        assertThat(TransportMlInfoAction.areMlNodesBiggestSize(ByteSizeValue.ofBytes(biggestSize), nodes), is(expectedResult));
+    }
+
+    /**
+     * A max ML node size of zero bytes short-circuits to {@code true}, whatever nodes are supplied (including none).
+     */
+    public void testAreMlNodesBiggestSizeGivenZeroMaxSize() {
+        long mlNodeSize = randomLongBetween(10000000L, 10000000000L);
+        int numMlNodes = randomIntBetween(0, 4);
+        var nodes = Stream.generate(
+            () -> DiscoveryNodeUtils.builder("node")
+                .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                .attributes(Map.of(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(mlNodeSize)))
+                .build()
+        ).limit(numMlNodes).toList();
+        assertThat(TransportMlInfoAction.areMlNodesBiggestSize(ByteSizeValue.ofBytes(0), nodes), is(true));
+    }
+
+    /**
+     * With a non-zero max size and no ML nodes there is no smallest node to compare, so the result is {@code false}.
+     */
+    public void testAreMlNodesBiggestSizeGivenNoMlNodes() {
+        long biggestSize = randomLongBetween(10000000L, 10000000000L);
+        assertThat(TransportMlInfoAction.areMlNodesBiggestSize(ByteSizeValue.ofBytes(biggestSize), Set.of()), is(false));
+    }
+}

+ 189 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java

@@ -7,11 +7,14 @@
 
 package org.elasticsearch.xpack.ml.utils;
 
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.MachineLearning;
 
 import java.util.Map;
+import java.util.Set;
 
 import static org.hamcrest.Matchers.equalTo;
 
@@ -34,4 +37,190 @@ public class MlProcessorsTests extends ESTestCase {
         var processor = MlProcessors.get(node, null);
         assertThat(processor.count(), equalTo(8.0));
     }
+
+    /** The maximum is taken over ML nodes only: the data node's 9.0 must not win over the ML node's 8.0. */
+    public void testGetMaxMlNodeProcessors() {
+        var nodes = buildMixedRoleNodes("8.0", "9.0", "7.0", "6.0");
+        var processor = MlProcessors.getMaxMlNodeProcessors(nodes, 1);
+        assertThat(processor.count(), equalTo(8.0));
+    }
+
+    /** With a scale of 2 the largest ML node (12.0) is reported as 6.0. */
+    public void testGetMaxMlNodeProcessorsWithScale() {
+        var nodes = buildMixedRoleNodes("8.0", "9.0", "12.0", "10.0");
+        var processor = MlProcessors.getMaxMlNodeProcessors(nodes, 2);
+        assertThat(processor.count(), equalTo(6.0));
+    }
+
+    /** A null scale leaves the per-node values unscaled. */
+    public void testGetMaxMlNodeProcessorsWithNull() {
+        var nodes = buildMixedRoleNodes("6.0", "9.0", "7.0", "6.0");
+        var processor = MlProcessors.getMaxMlNodeProcessors(nodes, null);
+        assertThat(processor.count(), equalTo(7.0));
+    }
+
+    /** The total covers ML nodes only: 8 + 7, ignoring the data and master nodes. */
+    public void testGetTotalMlNodeProcessors() {
+        var nodes = buildMixedRoleNodes("8.0", "9.0", "7.0", "6.0");
+        var processor = MlProcessors.getTotalMlNodeProcessors(nodes, 1);
+        assertThat(processor.count(), equalTo(15.0));
+    }
+
+    /** With a scale of 2 the ML nodes contribute 4 and 3 (3.5 rounded down), giving 7. */
+    public void testGetTotalMlNodeProcessorsWithScale() {
+        var nodes = buildMixedRoleNodes("8.0", "9.0", "7.0", "6.0");
+        var processor = MlProcessors.getTotalMlNodeProcessors(nodes, 2);
+        assertThat(processor.count(), equalTo(7.0));
+    }
+
+    /** Fractional processors are rounded down per node before summing: 6 (from 6.5) + 7. */
+    public void testGetTotalMlNodeProcessorsWithNull() {
+        var nodes = buildMixedRoleNodes("6.5", "9.0", "7.0", "6.0");
+        var processor = MlProcessors.getTotalMlNodeProcessors(nodes, null);
+        assertThat(processor.count(), equalTo(13.0));
+    }
+
+    /**
+     * Builds a four-node cluster in which n1 and n3 are ML nodes, n2 is a data node and n4 is a master node.
+     * Each argument is the allocated processors attribute value of the corresponding node.
+     */
+    private static DiscoveryNodes buildMixedRoleNodes(
+        String n1MlProcessors,
+        String n2DataProcessors,
+        String n3MlProcessors,
+        String n4MasterProcessors
+    ) {
+        var builder = DiscoveryNodes.builder();
+        addNode(builder, "n1", DiscoveryNodeRole.ML_ROLE, n1MlProcessors);
+        addNode(builder, "n2", DiscoveryNodeRole.DATA_ROLE, n2DataProcessors);
+        addNode(builder, "n3", DiscoveryNodeRole.ML_ROLE, n3MlProcessors);
+        addNode(builder, "n4", DiscoveryNodeRole.MASTER_ROLE, n4MasterProcessors);
+        return builder.build();
+    }
+
+    /** Adds a single-role node with the given id and allocated processors attribute to {@code builder}. */
+    private static void addNode(DiscoveryNodes.Builder builder, String nodeId, DiscoveryNodeRole role, String allocatedProcessors) {
+        builder.add(
+            DiscoveryNodeUtils.builder(nodeId)
+                .roles(Set.of(role))
+                .attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, allocatedProcessors))
+                .build()
+        );
+    }
 }