Browse Source

[ML] Make the scale the processor count setting updatable (#98298)

David Kyle 2 years ago
parent
commit
92316f5391
15 changed files with 206 additions and 117 deletions
  1. 3 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  2. 2 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java
  3. 18 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java
  4. 4 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java
  5. 13 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java
  6. 10 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java
  7. 11 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java
  8. 18 18
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java
  9. 3 8
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java
  10. 1 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java
  11. 5 5
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java
  12. 24 7
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java
  13. 2 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java
  14. 84 57
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancerTests.java
  15. 8 4
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java

+ 3 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -596,10 +596,10 @@ public class MachineLearning extends Plugin
      * allocations that can be deployed on a node.
      */
     public static final Setting<Integer> ALLOCATED_PROCESSORS_SCALE = Setting.intSetting(
-        "ml.allocated_processors_scale",
+        "xpack.ml.allocated_processors_scale",
         1,
         1,
-        Property.OperatorDynamic,
+        Property.Dynamic,
         Property.NodeScope
     );
 
@@ -782,6 +782,7 @@ public class MachineLearning extends Plugin
     @Override
     public List<Setting<?>> getSettings() {
         return List.of(
+            ALLOCATED_PROCESSORS_SCALE,
             MachineLearningField.AUTODETECT_PROCESS,
             PROCESS_CONNECT_TIMEOUT,
             CONCURRENT_JOB_ALLOCATIONS,

+ 2 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java

@@ -77,6 +77,7 @@ public class TransportGetMlAutoscalingStats extends TransportMasterNodeAction<Re
         if (mlMemoryTracker.isRecentlyRefreshed()) {
             MlAutoscalingResourceTracker.getMlAutoscalingStats(
                 state,
+                clusterService.getClusterSettings(),
                 parentTaskAssigningClient,
                 request.timeout(),
                 mlMemoryTracker,
@@ -96,6 +97,7 @@ public class TransportGetMlAutoscalingStats extends TransportMasterNodeAction<Re
                     ActionListener.wrap(
                         ignored -> MlAutoscalingResourceTracker.getMlAutoscalingStats(
                             state,
+                            clusterService.getClusterSettings(),
                             parentTaskAssigningClient,
                             request.timeout(),
                             mlMemoryTracker,

+ 18 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java

@@ -20,6 +20,7 @@ import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingCapacity;
 import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
 import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
 import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
+import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 
@@ -43,6 +44,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
     private final MlProcessorAutoscalingDecider processorDecider;
 
     private volatile boolean isMaster;
+    private volatile int allocatedProcessorsScale;
 
     public MlAutoscalingDeciderService(
         MlMemoryTracker memoryTracker,
@@ -69,7 +71,15 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
             scaleTimer
         );
         this.processorDecider = new MlProcessorAutoscalingDecider(scaleTimer);
+        this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
+
         clusterService.addLocalNodeMasterListener(this);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, this::setAllocatedProcessorsScale);
+    }
+
+    void setAllocatedProcessorsScale(int scale) {
+        this.allocatedProcessorsScale = scale;
     }
 
     @Override
@@ -96,7 +106,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
         final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
         final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(
             mlContext.mlNodes,
-            configuration
+            allocatedProcessorsScale
         );
 
         final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)
@@ -123,7 +133,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
             return downscaleToZero(configuration, context, currentNativeMemoryCapacity, reasonBuilder);
         }
 
-        MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext);
+        MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext, allocatedProcessorsScale);
         if (memoryCapacity.isUndetermined()) {
             // If we cannot determine memory capacity we shouldn't make any autoscaling decision
             // as it could lead to undesired capacity. For example, it could be that the processor decider decides
@@ -134,7 +144,12 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
                 reasonBuilder.setSimpleReason(format("[memory_decider] %s", memoryCapacity.reason())).build()
             );
         }
-        MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(configuration, context, mlContext);
+        MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(
+            configuration,
+            context,
+            mlContext,
+            allocatedProcessorsScale
+        );
         reasonBuilder.setSimpleReason(
             format("[memory_decider] %s; [processor_decider] %s", memoryCapacity.reason(), processorCapacity.reason())
         );

+ 4 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java

@@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
@@ -65,6 +66,7 @@ public final class MlAutoscalingResourceTracker {
 
     public static void getMlAutoscalingStats(
         ClusterState clusterState,
+        ClusterSettings clusterSettings,
         Client client,
         TimeValue timeout,
         MlMemoryTracker mlMemoryTracker,
@@ -82,7 +84,8 @@ public final class MlAutoscalingResourceTracker {
             ? NativeMemoryCalculator.allowedBytesForMl(clusterState.nodes().get(mlNodes[0]), settings).orElse(0L)
             : 0L;
         int processorsAvailableFirstNode = mlNodes.length > 0
-            ? MlProcessors.get(clusterState.nodes().get(mlNodes[0]), settings).roundDown()
+            ? MlProcessors.get(clusterState.nodes().get(mlNodes[0]), clusterSettings.get(MachineLearning.ALLOCATED_PROCESSORS_SCALE))
+                .roundDown()
             : 0;
 
         // Todo: MAX_LOW_PRIORITY_MODELS_PER_NODE not checked yet

+ 13 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java

@@ -118,7 +118,12 @@ class MlMemoryAutoscalingDecider {
         }
     }
 
-    public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
+    public MlMemoryAutoscalingCapacity scale(
+        Settings configuration,
+        AutoscalingDeciderContext context,
+        MlAutoscalingContext mlContext,
+        int allocatedProcessorsScale
+    ) {
         final ClusterState clusterState = context.state();
 
         scaleTimer.lastScaleToScaleIntervalMillis()
@@ -258,7 +263,11 @@ class MlMemoryAutoscalingDecider {
                 }
                 // We should keep this check here as well as in the processor decider while cloud is not
                 // reacting to processor autoscaling.
-                if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes, configuration)) {
+                if (modelAssignmentsRequireMoreThanHalfCpu(
+                    mlContext.modelAssignments.values(),
+                    mlContext.mlNodes,
+                    allocatedProcessorsScale
+                )) {
                     logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
                     return null;
                 }
@@ -818,12 +827,12 @@ class MlMemoryAutoscalingDecider {
     static boolean modelAssignmentsRequireMoreThanHalfCpu(
         Collection<TrainedModelAssignment> assignments,
         List<DiscoveryNode> mlNodes,
-        Settings settings
+        int allocatedProcessorsScale
     ) {
         int totalRequiredProcessors = assignments.stream()
             .mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
             .sum();
-        int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, settings).roundUp()).sum();
+        int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, allocatedProcessorsScale).roundUp()).sum();
         return totalRequiredProcessors * 2 > totalMlProcessors;
     }
 

+ 10 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java

@@ -41,7 +41,12 @@ class MlProcessorAutoscalingDecider {
         this.scaleTimer = Objects.requireNonNull(scaleTimer);
     }
 
-    public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
+    public MlProcessorAutoscalingCapacity scale(
+        Settings configuration,
+        AutoscalingDeciderContext context,
+        MlAutoscalingContext mlContext,
+        int allocatedProcessorsScale
+    ) {
         TrainedModelAssignmentMetadata trainedModelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(context.state());
 
         if (hasUnsatisfiedDeployments(trainedModelAssignmentMetadata, mlContext.mlNodes)) {
@@ -52,7 +57,7 @@ class MlProcessorAutoscalingDecider {
             ).build();
         }
 
-        final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, configuration);
+        final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, allocatedProcessorsScale);
 
         final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();
 
@@ -65,7 +70,7 @@ class MlProcessorAutoscalingDecider {
         if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
             trainedModelAssignmentMetadata.allAssignments().values(),
             mlContext.mlNodes,
-            configuration
+            allocatedProcessorsScale
         )) {
             return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
                 .setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")
@@ -137,11 +142,11 @@ class MlProcessorAutoscalingDecider {
         );
     }
 
-    MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, Settings settings) {
+    MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, int allocatedProcessorsScale) {
         Processors maxNodeProcessors = Processors.ZERO;
         Processors tierProcessors = Processors.ZERO;
         for (DiscoveryNode node : mlNodes) {
-            Processors nodeProcessors = MlProcessors.get(node, settings);
+            Processors nodeProcessors = MlProcessors.get(node, allocatedProcessorsScale);
             if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
                 maxNodeProcessors = nodeProcessors;
             }

+ 11 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java

@@ -80,6 +80,7 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
     private volatile int maxOpenJobs;
     protected volatile int maxLazyMLNodes;
     protected volatile long maxMLNodeSize;
+    protected volatile int allocatedProcessorsScale;
 
     public TrainedModelAssignmentClusterService(
         Settings settings,
@@ -99,6 +100,7 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
         this.maxOpenJobs = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings);
         this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
         this.maxMLNodeSize = MachineLearning.MAX_ML_NODE_SIZE.get(settings).getBytes();
+        this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
         // Only nodes that can possibly be master nodes really need this service running
         if (DiscoveryNode.isMasterNode(settings)) {
             clusterService.addListener(this);
@@ -109,6 +111,8 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
             clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs);
             clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
             clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_ML_NODE_SIZE, this::setMaxMLNodeSize);
+            clusterService.getClusterSettings()
+                .addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, this::setAllocatedProcessorsScale);
         }
     }
 
@@ -132,6 +136,10 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
         this.maxMLNodeSize = value.getBytes();
     }
 
+    private void setAllocatedProcessorsScale(int scale) {
+        this.allocatedProcessorsScale = scale;
+    }
+
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
     private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
         clusterService.submitUnbatchedStateUpdateTask(source, task);
@@ -486,9 +494,10 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
             TrainedModelAssignmentMetadata.fromState(currentState),
             nodeLoads,
             nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState),
-            modelToAdd
+            modelToAdd,
+            allocatedProcessorsScale
         );
-        TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(clusterService.getSettings());
+        TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance();
         if (modelToAdd.isPresent()) {
             checkModelIsFullyAllocatedIfScalingIsNotPossible(modelToAdd.get().getDeploymentId(), rebalanced, nodes);
         }

+ 18 - 18
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java

@@ -13,7 +13,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
@@ -51,20 +50,23 @@ class TrainedModelAssignmentRebalancer {
     private final Map<DiscoveryNode, NodeLoad> nodeLoads;
     private final Map<List<String>, Collection<DiscoveryNode>> mlNodesByZone;
     private final Optional<StartTrainedModelDeploymentAction.TaskParams> deploymentToAdd;
+    private final int allocatedProcessorsScale;
 
     TrainedModelAssignmentRebalancer(
         TrainedModelAssignmentMetadata currentMetadata,
         Map<DiscoveryNode, NodeLoad> nodeLoads,
         Map<List<String>, Collection<DiscoveryNode>> mlNodesByZone,
-        Optional<StartTrainedModelDeploymentAction.TaskParams> deploymentToAdd
+        Optional<StartTrainedModelDeploymentAction.TaskParams> deploymentToAdd,
+        int allocatedProcessorsScale
     ) {
         this.currentMetadata = Objects.requireNonNull(currentMetadata);
         this.nodeLoads = Objects.requireNonNull(nodeLoads);
         this.mlNodesByZone = Objects.requireNonNull(mlNodesByZone);
         this.deploymentToAdd = Objects.requireNonNull(deploymentToAdd);
+        this.allocatedProcessorsScale = allocatedProcessorsScale;
     }
 
-    TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) {
+    TrainedModelAssignmentMetadata.Builder rebalance() {
         if (deploymentToAdd.isPresent() && currentMetadata.hasDeployment(deploymentToAdd.get().getDeploymentId())) {
             throw new ResourceAlreadyExistsException(
                 "[{}] assignment for deployment with model [{}] already exists",
@@ -78,8 +80,8 @@ class TrainedModelAssignmentRebalancer {
             return TrainedModelAssignmentMetadata.Builder.fromMetadata(currentMetadata);
         }
 
-        AssignmentPlan assignmentPlan = computeAssignmentPlan(settings);
-        return buildAssignmentsFromPlan(assignmentPlan, settings);
+        AssignmentPlan assignmentPlan = computeAssignmentPlan();
+        return buildAssignmentsFromPlan(assignmentPlan);
     }
 
     private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
@@ -92,8 +94,8 @@ class TrainedModelAssignmentRebalancer {
         return true;
     }
 
-    AssignmentPlan computeAssignmentPlan(Settings settings) {
-        final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap(settings);
+    AssignmentPlan computeAssignmentPlan() {
+        final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap();
         final Set<String> assignableNodeIds = nodesByZone.values()
             .stream()
             .flatMap(List::stream)
@@ -271,7 +273,7 @@ class TrainedModelAssignmentRebalancer {
         return fittingAssignments;
     }
 
-    private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap(Settings settings) {
+    private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
         return mlNodesByZone.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> {
             Collection<DiscoveryNode> discoveryNodes = e.getValue();
             List<AssignmentPlan.Node> nodes = new ArrayList<>();
@@ -285,7 +287,7 @@ class TrainedModelAssignmentRebalancer {
                                 // We subtract native inference memory as the planner expects available memory for
                                 // native inference including current assignments.
                                 getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load),
-                                MlProcessors.get(discoveryNode, settings).roundUp()
+                                MlProcessors.get(discoveryNode, allocatedProcessorsScale).roundUp()
                             )
                         );
                     } else {
@@ -305,7 +307,7 @@ class TrainedModelAssignmentRebalancer {
         return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory();
     }
 
-    private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan, Settings settings) {
+    private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan) {
         TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.Builder.empty();
         for (AssignmentPlan.Deployment deployment : assignmentPlan.models()) {
             TrainedModelAssignment existingAssignment = currentMetadata.getDeploymentAssignment(deployment.id());
@@ -343,7 +345,7 @@ class TrainedModelAssignmentRebalancer {
             }
             assignmentBuilder.calculateAndSetAssignmentState();
 
-            explainAssignments(assignmentPlan, nodeLoads, deployment, settings).ifPresent(assignmentBuilder::setReason);
+            explainAssignments(assignmentPlan, nodeLoads, deployment).ifPresent(assignmentBuilder::setReason);
             builder.addNewAssignment(deployment.id(), assignmentBuilder);
         }
         return builder;
@@ -352,8 +354,7 @@ class TrainedModelAssignmentRebalancer {
     private Optional<String> explainAssignments(
         AssignmentPlan assignmentPlan,
         Map<DiscoveryNode, NodeLoad> nodeLoads,
-        AssignmentPlan.Deployment deployment,
-        Settings settings
+        AssignmentPlan.Deployment deployment
     ) {
         if (assignmentPlan.satisfiesAllocations(deployment)) {
             return Optional.empty();
@@ -365,7 +366,7 @@ class TrainedModelAssignmentRebalancer {
 
         Map<String, String> nodeToReason = new TreeMap<>();
         for (Map.Entry<DiscoveryNode, NodeLoad> nodeAndLoad : nodeLoads.entrySet()) {
-            Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment, settings);
+            Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment);
             reason.ifPresent(s -> nodeToReason.put(nodeAndLoad.getKey().getId(), s));
         }
 
@@ -384,8 +385,7 @@ class TrainedModelAssignmentRebalancer {
         AssignmentPlan assignmentPlan,
         DiscoveryNode node,
         NodeLoad load,
-        AssignmentPlan.Deployment deployment,
-        Settings settings
+        AssignmentPlan.Deployment deployment
     ) {
         if (Strings.isNullOrEmpty(load.getError()) == false) {
             return Optional.of(load.getError());
@@ -398,7 +398,7 @@ class TrainedModelAssignmentRebalancer {
             // But we should also check if we managed to assign a model during the rebalance for which
             // we check if the node has used up any of its allocated processors.
             boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0
-                || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, settings).roundUp();
+                || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, allocatedProcessorsScale).roundUp();
             long requiredMemory = deployment.memoryBytes() + (isPerNodeOverheadAccountedFor
                 ? 0
                 : MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes());
@@ -427,7 +427,7 @@ class TrainedModelAssignmentRebalancer {
                     "This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
                         + "processors required for each allocation of this model [{}]",
                     new Object[] {
-                        MlProcessors.get(node, settings).roundUp(),
+                        MlProcessors.get(node, allocatedProcessorsScale).roundUp(),
                         assignmentPlan.getRemainingNodeCores(node.getId()),
                         deployment.threadsPerAllocation() }
                 )

+ 3 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java

@@ -8,7 +8,6 @@
 package org.elasticsearch.xpack.ml.utils;
 
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.Processors;
 import org.elasticsearch.xpack.ml.MachineLearning;
 
@@ -16,7 +15,7 @@ public final class MlProcessors {
 
     private MlProcessors() {}
 
-    public static Processors get(DiscoveryNode node, Settings settings) {
+    public static Processors get(DiscoveryNode node, Integer allocatedProcessorScale) {
         // Try getting the most modern setting, and if that's null then instead get the older setting. (If both are null then return zero.)
         String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
         if (allocatedProcessorsString == null) {
@@ -31,12 +30,8 @@ public final class MlProcessors {
                 return Processors.ZERO;
             }
 
-            Integer scale = null;
-            if (settings != null) {
-                scale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
-            }
-            if (scale != null) {
-                processorsAsDouble = processorsAsDouble / scale;
+            if (allocatedProcessorScale != null) {
+                processorsAsDouble = processorsAsDouble / allocatedProcessorScale;
             }
             return Processors.of(processorsAsDouble);
 

+ 1 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java

@@ -132,6 +132,7 @@ public class MlAutoscalingDeciderServiceTests extends ESTestCase {
                 MachineLearning.MAX_OPEN_JOBS_PER_NODE,
                 MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT,
                 MachineLearning.MAX_ML_NODE_SIZE,
+                MachineLearning.ALLOCATED_PROCESSORS_SCALE,
                 AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING
             )
         );

+ 5 - 5
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java

@@ -1080,7 +1080,7 @@ public class MlMemoryAutoscalingDeciderTests extends ESTestCase {
                     ).build()
                 ),
                 withMlNodes("ml_node_1", "ml_node_2"),
-                Settings.EMPTY
+                1
             )
         );
         assertTrue(
@@ -1112,7 +1112,7 @@ public class MlMemoryAutoscalingDeciderTests extends ESTestCase {
                     ).build()
                 ),
                 withMlNodes("ml_node_1", "ml_node_2"),
-                Settings.EMPTY
+                1
             )
         );
         assertFalse(
@@ -1144,7 +1144,7 @@ public class MlMemoryAutoscalingDeciderTests extends ESTestCase {
                     ).build()
                 ),
                 withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4"),
-                Settings.EMPTY
+                1
             )
         );
     }
@@ -1240,7 +1240,7 @@ public class MlMemoryAutoscalingDeciderTests extends ESTestCase {
         DeciderContext deciderContext = new DeciderContext(clusterState, autoscalingCapacity);
         MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(clusterState);
 
-        MlMemoryAutoscalingCapacity result = decider.scale(settings, deciderContext, mlAutoscalingContext);
+        MlMemoryAutoscalingCapacity result = decider.scale(settings, deciderContext, mlAutoscalingContext, 1);
         assertThat(result.reason(), containsString("but the number in the queue is less than the configured maximum allowed"));
         assertThat(result.nodeSize(), equalTo(ByteSizeValue.ofGb(1)));
         assertThat(result.tierSize(), equalTo(ByteSizeValue.ofGb(1)));
@@ -1269,7 +1269,7 @@ public class MlMemoryAutoscalingDeciderTests extends ESTestCase {
         DeciderContext deciderContext = new DeciderContext(clusterState, AutoscalingCapacity.ZERO);
         MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(clusterState);
 
-        MlMemoryAutoscalingCapacity result = decider.scale(settings, deciderContext, mlAutoscalingContext);
+        MlMemoryAutoscalingCapacity result = decider.scale(settings, deciderContext, mlAutoscalingContext, 1);
         assertThat(
             result.reason(),
             containsString(

+ 24 - 7
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java

@@ -108,7 +108,8 @@ public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
         MlProcessorAutoscalingCapacity capacity = decider.scale(
             Settings.EMPTY,
             newContext(clusterState),
-            new MlAutoscalingContext(clusterState)
+            new MlAutoscalingContext(clusterState),
+            1
         );
 
         assertThat(capacity.nodeProcessors(), equalTo(Processors.of(7.8)));
@@ -177,7 +178,8 @@ public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
         MlProcessorAutoscalingCapacity capacity = decider.scale(
             Settings.EMPTY,
             newContext(clusterState),
-            new MlAutoscalingContext(clusterState)
+            new MlAutoscalingContext(clusterState),
+            1
         );
 
         assertThat(capacity.nodeProcessors(), equalTo(Processors.of(8.0)));
@@ -246,7 +248,8 @@ public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
         MlProcessorAutoscalingCapacity capacity = decider.scale(
             Settings.EMPTY,
             newContext(clusterState),
-            new MlAutoscalingContext(clusterState)
+            new MlAutoscalingContext(clusterState),
+            1
         );
 
         assertThat(capacity.nodeProcessors(), equalTo(Processors.of(4.0)));
@@ -313,7 +316,8 @@ public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
         MlProcessorAutoscalingCapacity capacity = decider.scale(
             Settings.EMPTY,
             newContext(clusterState),
-            new MlAutoscalingContext(clusterState)
+            new MlAutoscalingContext(clusterState),
+            1
         );
 
         assertThat(capacity.nodeProcessors(), equalTo(Processors.of(3.8)));
@@ -322,6 +326,16 @@ public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
             capacity.reason(),
             equalTo("not scaling down as model assignments require more than half of the ML tier's allocated processors")
         );
+
+        // test with allocated processor scaling
+        capacity = decider.scale(Settings.EMPTY, newContext(clusterState), new MlAutoscalingContext(clusterState), 2);
+
+        assertThat(capacity.nodeProcessors(), equalTo(Processors.of(1.9)));
+        assertThat(capacity.tierProcessors(), equalTo(Processors.of(3.8)));
+        assertThat(
+            capacity.reason(),
+            equalTo("not scaling down as model assignments require more than half of the ML tier's allocated processors")
+        );
     }
 
     public void testScale_GivenDownScalePossible_DelayNotSatisfied() {
@@ -384,7 +398,8 @@ public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
         MlProcessorAutoscalingCapacity capacity = decider.scale(
             Settings.EMPTY,
             newContext(clusterState),
-            new MlAutoscalingContext(clusterState)
+            new MlAutoscalingContext(clusterState),
+            1
         );
 
         assertThat(capacity.nodeProcessors(), equalTo(Processors.of(7.9)));
@@ -456,7 +471,8 @@ public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
         MlProcessorAutoscalingCapacity capacity = decider.scale(
             Settings.EMPTY,
             newContext(clusterState),
-            new MlAutoscalingContext(clusterState)
+            new MlAutoscalingContext(clusterState),
+            1
         );
 
         assertThat(capacity.nodeProcessors(), equalTo(Processors.of(2.0)));
@@ -528,7 +544,8 @@ public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
         MlProcessorAutoscalingCapacity capacity = decider.scale(
             Settings.EMPTY,
             newContext(clusterState),
-            new MlAutoscalingContext(clusterState)
+            new MlAutoscalingContext(clusterState),
+            1
         );
 
         assertThat(capacity.nodeProcessors(), equalTo(Processors.ZERO));

+ 2 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java

@@ -98,7 +98,8 @@ public class TrainedModelAssignmentClusterServiceTests extends ESTestCase {
                 MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT,
                 MachineLearning.MAX_OPEN_JOBS_PER_NODE,
                 MachineLearning.MAX_LAZY_ML_NODES,
-                MachineLearning.MAX_ML_NODE_SIZE
+                MachineLearning.MAX_ML_NODE_SIZE,
+                MachineLearning.ALLOCATED_PROCESSORS_SCALE
             )
         );
         when(clusterService.getClusterSettings()).thenReturn(clusterSettings);

+ 84 - 57
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancerTests.java

@@ -10,7 +10,6 @@ package org.elasticsearch.xpack.ml.inference.assignment;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
@@ -44,8 +43,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             TrainedModelAssignmentMetadata.Builder.empty().build(),
             Map.of(),
             Map.of(),
-            Optional.empty()
-        ).rebalance(Settings.EMPTY).build();
+            Optional.empty(),
+            1
+        ).rebalance().build();
         assertThat(result.allAssignments().isEmpty(), is(true));
     }
 
@@ -73,9 +73,13 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
         nodeLoads.put(buildNode("node-1", oneGbBytes, 4), NodeLoad.builder("node-1").setMaxMemory(oneGbBytes).build());
         nodeLoads.put(buildNode("node-2", oneGbBytes, 4), NodeLoad.builder("node-2").setMaxMemory(oneGbBytes).build());
 
-        TrainedModelAssignmentMetadata result = new TrainedModelAssignmentRebalancer(currentMetadata, nodeLoads, Map.of(), Optional.empty())
-            .rebalance(Settings.EMPTY)
-            .build();
+        TrainedModelAssignmentMetadata result = new TrainedModelAssignmentRebalancer(
+            currentMetadata,
+            nodeLoads,
+            Map.of(),
+            Optional.empty(),
+            1
+        ).rebalance().build();
 
         assertThat(currentMetadata, equalTo(result));
     }
@@ -111,8 +115,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1, node2)),
-            Optional.empty()
-        ).rebalance(Settings.EMPTY).build();
+            Optional.empty(),
+            1
+        ).rebalance().build();
 
         assertThat(result.allAssignments(), is(aMapWithSize(2)));
 
@@ -135,9 +140,7 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             .build();
         expectThrows(
             ResourceAlreadyExistsException.class,
-            () -> new TrainedModelAssignmentRebalancer(currentMetadata, Map.of(), Map.of(), Optional.of(taskParams)).rebalance(
-                Settings.EMPTY
-            )
+            () -> new TrainedModelAssignmentRebalancer(currentMetadata, Map.of(), Map.of(), Optional.of(taskParams), 1).rebalance()
         );
     }
 
@@ -150,8 +153,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             Map.of(),
             Map.of(),
-            Optional.of(taskParams)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams),
+            1
+        ).rebalance().build();
 
         TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId);
         assertThat(assignment, is(notNullValue()));
@@ -176,8 +180,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node)),
-            Optional.of(taskParams)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams),
+            1
+        ).rebalance().build();
 
         TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId);
         assertThat(assignment, is(notNullValue()));
@@ -211,8 +216,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(),
-            Optional.of(taskParams)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams),
+            1
+        ).rebalance().build();
 
         TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId);
         assertThat(assignment, is(notNullValue()));
@@ -246,8 +252,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(),
-            Optional.of(taskParams)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams),
+            1
+        ).rebalance().build();
 
         TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId);
         assertThat(assignment, is(notNullValue()));
@@ -281,8 +288,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1, node2)),
-            Optional.of(taskParams)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams),
+            1
+        ).rebalance().build();
 
         TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId);
         assertThat(assignment, is(notNullValue()));
@@ -313,8 +321,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1)),
-            Optional.of(taskParams)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams),
+            1
+        ).rebalance().build();
 
         TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId);
         assertThat(assignment, is(notNullValue()));
@@ -351,8 +360,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1, node2)),
-            Optional.of(taskParams)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams),
+            1
+        ).rebalance().build();
 
         assertThat(result.allAssignments(), is(aMapWithSize(2)));
 
@@ -414,8 +424,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1, node2, node3)),
-            Optional.empty()
-        ).rebalance(Settings.EMPTY).build();
+            Optional.empty(),
+            1
+        ).rebalance().build();
 
         assertThat(result.allAssignments(), is(aMapWithSize(2)));
 
@@ -477,8 +488,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1)),
-            Optional.empty()
-        ).rebalance(Settings.EMPTY).build();
+            Optional.empty(),
+            1
+        ).rebalance().build();
 
         assertThat(result.allAssignments(), is(aMapWithSize(2)));
 
@@ -546,8 +558,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1)),
-            Optional.empty()
-        ).rebalance(Settings.EMPTY).build();
+            Optional.empty(),
+            1
+        ).rebalance().build();
 
         assertThat(result.allAssignments(), is(aMapWithSize(2)));
 
@@ -594,8 +607,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1)),
-            Optional.empty()
-        ).rebalance(Settings.EMPTY).build();
+            Optional.empty(),
+            1
+        ).rebalance().build();
 
         assertThat(result.allAssignments(), is(aMapWithSize(1)));
 
@@ -627,8 +641,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(),
-            Optional.of(taskParams)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams),
+            1
+        ).rebalance().build();
 
         TrainedModelAssignment assignment = result.getDeploymentAssignment(deploymentId);
         assertThat(assignment, is(notNullValue()));
@@ -672,8 +687,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of("zone-1"), List.of(node1), List.of("zone-2"), List.of(node2)),
-            Optional.of(taskParams1)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams1),
+            1
+        ).rebalance().build();
 
         TrainedModelAssignment assignment = result.getDeploymentAssignment(deployment1);
         assertThat(assignment, is(notNullValue()));
@@ -710,8 +726,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1)),
-            Optional.empty()
-        ).rebalance(Settings.EMPTY).build();
+            Optional.empty(),
+            1
+        ).rebalance().build();
 
         {
             TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1);
@@ -762,8 +779,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of("zone-1"), List.of(node1), List.of("zone-2"), List.of(node2)),
-            Optional.empty()
-        ).rebalance(Settings.EMPTY).build();
+            Optional.empty(),
+            1
+        ).rebalance().build();
 
         List<String> assignedNodes = new ArrayList<>();
 
@@ -815,8 +833,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1)),
-            Optional.empty()
-        ).rebalance(Settings.EMPTY).build();
+            Optional.empty(),
+            1
+        ).rebalance().build();
 
         {
             TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1);
@@ -864,8 +883,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1, node2)),
-            Optional.empty()
-        ).rebalance(Settings.EMPTY).build();
+            Optional.empty(),
+            1
+        ).rebalance().build();
 
         {
             TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1);
@@ -913,8 +933,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1)),
-            Optional.of(taskParams2)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams2),
+            1
+        ).rebalance().build();
 
         {
             TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1);
@@ -964,8 +985,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1, node2)),
-            Optional.of(taskParams2)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams2),
+            1
+        ).rebalance().build();
 
         {
             TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1);
@@ -1015,8 +1037,9 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node1, node2)),
-            Optional.of(taskParams2)
-        ).rebalance(Settings.EMPTY).build();
+            Optional.of(taskParams2),
+            1
+        ).rebalance().build();
 
         {
             TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1);
@@ -1056,13 +1079,13 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
         // The deployment wants 4 threads, the node has 4 CPUs but with
         // the scaling setting(2) that is divided by 2. Now the model
         // assignment cannot be satisfied.
-        var settings = Settings.builder().put(MachineLearning.ALLOCATED_PROCESSORS_SCALE.getKey(), 2).build();
         TrainedModelAssignmentMetadata result = new TrainedModelAssignmentRebalancer(
             currentMetadata,
             nodeLoads,
             Map.of(List.of(), List.of(node)),
-            Optional.of(taskParams)
-        ).rebalance(settings).build();
+            Optional.of(taskParams),
+            2
+        ).rebalance().build();
 
         TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId);
         assertThat(assignment, is(notNullValue()));
@@ -1078,9 +1101,13 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase {
         );
 
         // Without the scaling factor the assignment is satisfied.
-        result = new TrainedModelAssignmentRebalancer(currentMetadata, nodeLoads, Map.of(List.of(), List.of(node)), Optional.of(taskParams))
-            .rebalance(Settings.EMPTY)
-            .build();
+        result = new TrainedModelAssignmentRebalancer(
+            currentMetadata,
+            nodeLoads,
+            Map.of(List.of(), List.of(node)),
+            Optional.of(taskParams),
+            1
+        ).rebalance().build();
 
         assignment = result.getDeploymentAssignment(modelId);
         assertThat(assignment.getReason().isPresent(), is(false));

+ 8 - 4
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java

@@ -8,7 +8,6 @@
 package org.elasticsearch.xpack.ml.utils;
 
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.MachineLearning;
 
@@ -20,14 +19,19 @@ public class MlProcessorsTests extends ESTestCase {
 
     public void testGet() {
         var node = DiscoveryNodeUtils.builder("foo").attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")).build();
-        var processor = MlProcessors.get(node, Settings.EMPTY);
+        var processor = MlProcessors.get(node, 1);
         assertThat(processor.count(), equalTo(8.0));
     }
 
     public void testGetWithScale() {
         var node = DiscoveryNodeUtils.builder("foo").attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")).build();
-        var settings = Settings.builder().put(MachineLearning.ALLOCATED_PROCESSORS_SCALE.getKey(), 2).build();
-        var processor = MlProcessors.get(node, settings);
+        var processor = MlProcessors.get(node, 2);
         assertThat(processor.count(), equalTo(4.0));
     }
+
+    public void testGetWithNull() {
+        var node = DiscoveryNodeUtils.builder("foo").attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")).build();
+        var processor = MlProcessors.get(node, null);
+        assertThat(processor.count(), equalTo(8.0));
+    }
 }