Browse Source

[ML] Maintain ml node attribute for processors as integer for BWC (#89896)

In #89645 I switched the ml node attribute for allocated processors
to write a string representation of a double value. However, that
means the attribute value cannot be parsed in a mixed cluster version
when the node that tries to read it is before 8.5.0.

This commit addresses this problem by introducing a new attribute
for the version the has a string representation of the double value
and keeps writing the older version of the attribute as integer.

It also refactors the logic of parsing the attribute value given
a node in a single place as the logic now involves BWC and the
duplication is adding complexity.
Dimitris Athanasiou 3 years ago
parent
commit
9e530e163c

+ 18 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -557,7 +557,11 @@ public class MachineLearning extends Plugin
     private static final String PRE_V8_MAX_OPEN_JOBS_NODE_ATTR = "ml.max_open_jobs";
     public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory";
     public static final String MAX_JVM_SIZE_NODE_ATTR = "ml.max_jvm_size";
-    public static final String ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors";
+
+    // TODO Remove if compatibility with 8.x is no longer necessary
+    public static final String PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors";
+
+    public static final String ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors_double";
     public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting(
         "xpack.ml.node_concurrent_job_allocations",
         2,
@@ -753,6 +757,7 @@ public class MachineLearning extends Plugin
         String maxOpenJobsPerNodeNodeAttrName = "node.attr." + PRE_V8_MAX_OPEN_JOBS_NODE_ATTR;
         String machineMemoryAttrName = "node.attr." + MACHINE_MEMORY_NODE_ATTR;
         String jvmSizeAttrName = "node.attr." + MAX_JVM_SIZE_NODE_ATTR;
+        String deprecatedAllocatedProcessorsAttrName = "node.attr." + PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR;
         String allocatedProcessorsAttrName = "node.attr." + ALLOCATED_PROCESSORS_NODE_ATTR;
 
         if (enabled == false) {
@@ -768,11 +773,22 @@ public class MachineLearning extends Plugin
                 Long.toString(OsProbe.getInstance().osStats().getMem().getAdjustedTotal().getBytes())
             );
             addMlNodeAttribute(additionalSettings, jvmSizeAttrName, Long.toString(Runtime.getRuntime().maxMemory()));
+            addMlNodeAttribute(
+                additionalSettings,
+                deprecatedAllocatedProcessorsAttrName,
+                Integer.toString(EsExecutors.allocatedProcessors(settings))
+            );
             addMlNodeAttribute(additionalSettings, allocatedProcessorsAttrName, Double.toString(getAllocatedProcessors().count()));
             // This is not used in v8 and higher, but users are still prevented from setting it directly to avoid confusion
             disallowMlNodeAttributes(maxOpenJobsPerNodeNodeAttrName);
         } else {
-            disallowMlNodeAttributes(maxOpenJobsPerNodeNodeAttrName, machineMemoryAttrName, jvmSizeAttrName, allocatedProcessorsAttrName);
+            disallowMlNodeAttributes(
+                maxOpenJobsPerNodeNodeAttrName,
+                machineMemoryAttrName,
+                jvmSizeAttrName,
+                deprecatedAllocatedProcessorsAttrName,
+                allocatedProcessorsAttrName
+            );
         }
         return additionalSettings.build();
     }

+ 2 - 15
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java

@@ -13,7 +13,6 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.Processors;
 import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
@@ -30,6 +29,7 @@ import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.NodeLoad;
 import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
+import org.elasticsearch.xpack.ml.utils.MlProcessors;
 import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;
 
 import java.time.Duration;
@@ -819,20 +819,7 @@ class MlMemoryAutoscalingDecider {
         int totalRequiredProcessors = assignments.stream()
             .mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
             .sum();
-        int totalMlProcessors = mlNodes.stream().mapToInt(node -> {
-            String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
-            try {
-                double allocatedProcessorsAsDouble = Double.parseDouble(allocatedProcessorsString);
-                return allocatedProcessorsAsDouble > 0 ? Processors.of(allocatedProcessorsAsDouble).roundUp() : 0;
-            } catch (NumberFormatException e) {
-                assert e == null
-                    : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
-                        + " should parse because we set it internally: invalid value was ["
-                        + allocatedProcessorsString
-                        + "]";
-                return 0;
-            }
-        }).sum();
+        int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node).roundUp()).sum();
         return totalRequiredProcessors * 2 > totalMlProcessors;
     }
 

+ 2 - 20
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java

@@ -16,8 +16,8 @@ import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
 import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
 import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
-import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata;
+import org.elasticsearch.xpack.ml.utils.MlProcessors;
 
 import java.time.Instant;
 import java.util.List;
@@ -138,7 +138,7 @@ class MlProcessorAutoscalingDecider {
         Processors maxNodeProcessors = Processors.ZERO;
         Processors tierProcessors = Processors.ZERO;
         for (DiscoveryNode node : mlNodes) {
-            Processors nodeProcessors = getProcessors(node);
+            Processors nodeProcessors = MlProcessors.get(node);
             if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
                 maxNodeProcessors = nodeProcessors;
             }
@@ -146,22 +146,4 @@ class MlProcessorAutoscalingDecider {
         }
         return MlProcessorAutoscalingCapacity.builder(maxNodeProcessors, tierProcessors).build();
     }
-
-    private Processors getProcessors(DiscoveryNode node) {
-        String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
-        if (allocatedProcessorsString == null) {
-            return Processors.ZERO;
-        }
-        try {
-            double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
-            return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
-        } catch (NumberFormatException e) {
-            assert e == null
-                : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
-                    + " should parse because we set it internally: invalid value was ["
-                    + allocatedProcessorsString
-                    + "]";
-            return Processors.ZERO;
-        }
-    }
 }

+ 4 - 18
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java

@@ -14,7 +14,6 @@ import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.unit.Processors;
 import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
 import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState;
@@ -24,6 +23,7 @@ import org.elasticsearch.xpack.ml.autoscaling.NodeAvailabilityZoneMapper;
 import org.elasticsearch.xpack.ml.inference.assignment.planning.AssignmentPlan;
 import org.elasticsearch.xpack.ml.inference.assignment.planning.ZoneAwareAssignmentPlanner;
 import org.elasticsearch.xpack.ml.job.NodeLoad;
+import org.elasticsearch.xpack.ml.utils.MlProcessors;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -142,7 +142,7 @@ class TrainedModelAssignmentRebalancer {
                                 // We subtract native inference memory as the planner expects available memory for
                                 // native inference including current assignments.
                                 getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load),
-                                getNodeAllocatedProcessors(discoveryNode).roundUp()
+                                MlProcessors.get(discoveryNode).roundUp()
                             )
                         );
                     } else {
@@ -158,20 +158,6 @@ class TrainedModelAssignmentRebalancer {
         }));
     }
 
-    private static Processors getNodeAllocatedProcessors(DiscoveryNode node) {
-        String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
-        try {
-            double allocatedProcessorsAsDouble = allocatedProcessorsString == null ? 0.0 : Double.parseDouble(allocatedProcessorsString);
-            return allocatedProcessorsAsDouble > 0 ? Processors.of(allocatedProcessorsAsDouble) : Processors.ZERO;
-        } catch (NumberFormatException e) {
-            assert e == null
-                : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
-                    + " should parse because we set it internally: invalid value was "
-                    + allocatedProcessorsString;
-            return Processors.ZERO;
-        }
-    }
-
     private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(NodeLoad load) {
         return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory();
     }
@@ -267,7 +253,7 @@ class TrainedModelAssignmentRebalancer {
             // But we should also check if we managed to assign a model during the rebalance for which
             // we check if the node has used up any of its allocated processors.
             boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0
-                || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < getNodeAllocatedProcessors(node).roundUp();
+                || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node).roundUp();
             long requiredMemory = model.memoryBytes() + (isPerNodeOverheadAccountedFor
                 ? 0
                 : MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes());
@@ -296,7 +282,7 @@ class TrainedModelAssignmentRebalancer {
                     "This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
                         + "processors required for each allocation of this model [{}]",
                     new Object[] {
-                        getNodeAllocatedProcessors(node).roundUp(),
+                        MlProcessors.get(node).roundUp(),
                         assignmentPlan.getRemainingNodeCores(node.getId()),
                         model.threadsPerAllocation() }
                 )

+ 38 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java

@@ -0,0 +1,38 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.utils;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.unit.Processors;
+import org.elasticsearch.xpack.ml.MachineLearning;
+
+public final class MlProcessors {
+
+    private MlProcessors() {}
+
+    public static Processors get(DiscoveryNode node) {
+        String allocatedProcessorsString = node.getVersion().onOrAfter(Version.V_8_5_0)
+            ? node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR)
+            : node.getAttributes().get(MachineLearning.PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR);
+        if (allocatedProcessorsString == null) {
+            return Processors.ZERO;
+        }
+        try {
+            double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
+            return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
+        } catch (NumberFormatException e) {
+            assert e == null
+                : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
+                    + " should parse because we set it internally: invalid value was ["
+                    + allocatedProcessorsString
+                    + "]";
+            return Processors.ZERO;
+        }
+    }
+}