Browse Source

[ML] Add processor autoscaling decider (#89645)

This adds a processor decider to the ML autoscaling decider service.
This first implementation is simple and naive: it computes
the required processor capacity for the node as the max
`threads_per_allocation` over all trained model deployments,
and for the tier as the sum of all processors required
by trained model deployments.
Dimitris Athanasiou 3 years ago
parent
commit
407dc18eb8

+ 5 - 0
docs/changelog/89645.yaml

@@ -0,0 +1,5 @@
+pr: 89645
+summary: Add processor autoscaling decider
+area: Machine Learning
+type: enhancement
+issues: []

+ 5 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -42,6 +42,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.common.settings.SettingsModule;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.Processors;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.env.Environment;
@@ -767,7 +768,7 @@ public class MachineLearning extends Plugin
                 Long.toString(OsProbe.getInstance().osStats().getMem().getAdjustedTotal().getBytes())
             );
             addMlNodeAttribute(additionalSettings, jvmSizeAttrName, Long.toString(Runtime.getRuntime().maxMemory()));
-            addMlNodeAttribute(additionalSettings, allocatedProcessorsAttrName, Integer.toString(getAllocatedProcessors()));
+            addMlNodeAttribute(additionalSettings, allocatedProcessorsAttrName, Double.toString(getAllocatedProcessors().count()));
             // This is not used in v8 and higher, but users are still prevented from setting it directly to avoid confusion
             disallowMlNodeAttributes(maxOpenJobsPerNodeNodeAttrName);
         } else {
@@ -785,8 +786,8 @@ public class MachineLearning extends Plugin
         }
     }
 
-    private int getAllocatedProcessors() {
-        return EsExecutors.allocatedProcessors(settings);
+    private Processors getAllocatedProcessors() {
+        return EsExecutors.nodeProcessors(settings);
     }
 
     private void disallowMlNodeAttributes(String... mlNodeAttributes) {
@@ -1448,7 +1449,7 @@ public class MachineLearning extends Plugin
         ScalingExecutorBuilder pytorchComms = new ScalingExecutorBuilder(
             NATIVE_INFERENCE_COMMS_THREAD_POOL_NAME,
             3,
-            getAllocatedProcessors() * 3,
+            getAllocatedProcessors().roundUp() * 3,
             TimeValue.timeValueMinutes(1),
             false,
             "xpack.ml.native_inference_comms_thread_pool"

+ 19 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java

@@ -39,6 +39,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
 
     private final ScaleTimer scaleTimer;
     private final MlMemoryAutoscalingDecider memoryDecider;
+    private final MlProcessorAutoscalingDecider processorDecider;
 
     private volatile boolean isMaster;
 
@@ -66,6 +67,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
             nodeLoadDetector,
             scaleTimer
         );
+        this.processorDecider = new MlProcessorAutoscalingDecider(scaleTimer, nodeAvailabilityZoneMapper);
         clusterService.addLocalNodeMasterListener(this);
     }
 
@@ -91,12 +93,21 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
         final MlAutoscalingContext mlContext = new MlAutoscalingContext(clusterState);
         final NativeMemoryCapacity currentNativeMemoryCapacity = memoryDecider.currentScale(mlContext.mlNodes);
         final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
+        final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(mlContext.mlNodes);
 
         final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)
             .setCurrentMlCapacity(
                 new AutoscalingCapacity(
-                    new AutoscalingCapacity.AutoscalingResources(null, currentMemoryCapacity.tierSize(), null),
-                    new AutoscalingCapacity.AutoscalingResources(null, currentMemoryCapacity.nodeSize(), null)
+                    new AutoscalingCapacity.AutoscalingResources(
+                        null,
+                        currentMemoryCapacity.tierSize(),
+                        currentProcessorCapacity.tierProcessors()
+                    ),
+                    new AutoscalingCapacity.AutoscalingResources(
+                        null,
+                        currentMemoryCapacity.nodeSize(),
+                        currentProcessorCapacity.nodeProcessors()
+                    )
                 )
             )
             .setPassedConfiguration(configuration);
@@ -109,12 +120,15 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
         }
 
         MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext);
-        reasonBuilder.setSimpleReason(memoryCapacity.reason());
+        MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(configuration, context, mlContext);
+        reasonBuilder.setSimpleReason(
+            String.format(Locale.ROOT, "[memory_decider] %s; [processor_decider] %s", memoryCapacity.reason(), processorCapacity.reason())
+        );
 
         return new AutoscalingDeciderResult(
             new AutoscalingCapacity(
-                new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.tierSize(), null),
-                new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.nodeSize(), null)
+                new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.tierSize(), processorCapacity.tierProcessors()),
+                new AutoscalingCapacity.AutoscalingResources(null, memoryCapacity.nodeSize(), processorCapacity.nodeProcessors())
             ),
             reasonBuilder.build()
         );

+ 5 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java

@@ -13,6 +13,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.Processors;
 import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
@@ -185,8 +186,6 @@ class MlMemoryAutoscalingDecider {
 
         final List<String> partiallyAllocatedModels = mlContext.findPartiallyAllocatedModels();
 
-        // TODO for autoscaling by memory, we only care about if the model is allocated to at least one node (see above)
-        // We should do this check in our autoscaling by processor count service, which will be a separate decider for readability's sake
         if (mlContext.waitingAnalyticsJobs.isEmpty() == false
             || mlContext.waitingSnapshotUpgrades.isEmpty() == false
             || mlContext.waitingAnomalyJobs.isEmpty() == false
@@ -257,7 +256,8 @@ class MlMemoryAutoscalingDecider {
                 if (capacity == null) {
                     return null;
                 }
-                // TODO we should remove this when we can auto-scale (down and up) via a new CPU auto-scaling decider
+                // We should keep this check here as well as in the processor decider while cloud is not
+                // reacting to processor autoscaling.
                 if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes)) {
                     logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
                     return null;
@@ -822,7 +822,8 @@ class MlMemoryAutoscalingDecider {
         int totalMlProcessors = mlNodes.stream().mapToInt(node -> {
             String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
             try {
-                return Integer.parseInt(allocatedProcessorsString);
+                double allocatedProcessorsAsDouble = Double.parseDouble(allocatedProcessorsString);
+                return allocatedProcessorsAsDouble > 0 ? Processors.of(allocatedProcessorsAsDouble).roundUp() : 0;
             } catch (NumberFormatException e) {
                 assert e == null
                     : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR

+ 51 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingCapacity.java

@@ -0,0 +1,51 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.autoscaling;
+
+import org.elasticsearch.common.unit.Processors;
+
+public record MlProcessorAutoscalingCapacity(Processors nodeProcessors, Processors tierProcessors, String reason) {
+
+    public static Builder builder(Processors nodeProcessors, Processors tierProcessors) {
+        return new Builder(nodeProcessors, tierProcessors);
+    }
+
+    @Override
+    public String toString() {
+        return "MlProcessorAutoscalingCapacity{"
+            + "nodeProcessors="
+            + nodeProcessors
+            + ", tierProcessors="
+            + tierProcessors
+            + ", reason='"
+            + reason
+            + '\''
+            + '}';
+    }
+
+    public static class Builder {
+
+        private Processors nodeProcessors;
+        private Processors tierProcessors;
+        private String reason;
+
+        public Builder(Processors nodeProcessors, Processors tierProcessors) {
+            this.nodeProcessors = nodeProcessors;
+            this.tierProcessors = tierProcessors;
+        }
+
+        public Builder setReason(String reason) {
+            this.reason = reason;
+            return this;
+        }
+
+        MlProcessorAutoscalingCapacity build() {
+            return new MlProcessorAutoscalingCapacity(nodeProcessors, tierProcessors, reason);
+        }
+    }
+}

+ 167 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java

@@ -0,0 +1,167 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.autoscaling;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.Processors;
+import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
+import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
+import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.time.Instant.ofEpochMilli;
+import static org.elasticsearch.common.xcontent.XContentElasticsearchExtension.DEFAULT_FORMATTER;
+import static org.elasticsearch.core.Strings.format;
+
+class MlProcessorAutoscalingDecider {
+
+    private static final Logger logger = LogManager.getLogger(MlProcessorAutoscalingDecider.class);
+
+    private final ScaleTimer scaleTimer;
+    private final NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper;
+
+    MlProcessorAutoscalingDecider(ScaleTimer scaleTimer, NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper) {
+        this.scaleTimer = Objects.requireNonNull(scaleTimer);
+        this.nodeAvailabilityZoneMapper = Objects.requireNonNull(nodeAvailabilityZoneMapper);
+    }
+
+    public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
+        TrainedModelAssignmentMetadata trainedModelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(context.state());
+
+        if (hasUnsatisfiedDeployments(trainedModelAssignmentMetadata, mlContext.mlNodes)) {
+            logger.debug(() -> "Computing required capacity as there are partially allocated deployments");
+            scaleTimer.resetScaleDownCoolDown();
+            return computeRequiredCapacity(trainedModelAssignmentMetadata).setReason(
+                "requesting scale up as there are unsatisfied deployments"
+            ).build();
+        }
+
+        final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes);
+
+        final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();
+
+        if (requiredCapacity.tierProcessors().roundUp() == currentCapacity.tierProcessors().roundUp()) {
+            return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
+                .setReason("passing currently perceived capacity as it is fully used")
+                .build();
+        }
+
+        if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
+            trainedModelAssignmentMetadata.modelAssignments().values(),
+            mlContext.mlNodes
+        )) {
+            return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
+                .setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")
+                .build();
+        }
+
+        long msLeftToScale = scaleTimer.markDownScaleAndGetMillisLeftFromDelay(configuration);
+        if (msLeftToScale <= 0) {
+            return MlProcessorAutoscalingCapacity.builder(requiredCapacity.nodeProcessors(), requiredCapacity.tierProcessors())
+                .setReason("requesting scale down as tier and/or node size could be smaller")
+                .build();
+        }
+
+        TimeValue downScaleDelay = MlAutoscalingDeciderService.DOWN_SCALE_DELAY.get(configuration);
+        logger.debug(
+            () -> format(
+                "not scaling down as the current scale down delay [%s] is not satisfied."
+                    + " The last time scale down was detected [%s]. Calculated scaled down capacity [%s] ",
+                downScaleDelay.getStringRep(),
+                DEFAULT_FORMATTER.format(ofEpochMilli(scaleTimer.downScaleDetectedMillis())),
+                requiredCapacity
+            )
+        );
+        return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
+            .setReason(
+                String.format(
+                    Locale.ROOT,
+                    "Passing currently perceived capacity as down scale delay has not been satisfied; configured delay [%s] "
+                        + "last detected scale down event [%s]. Will request scale down in approximately [%s]",
+                    downScaleDelay.getStringRep(),
+                    XContentElasticsearchExtension.DEFAULT_FORMATTER.format(Instant.ofEpochMilli(scaleTimer.downScaleDetectedMillis())),
+                    TimeValue.timeValueMillis(msLeftToScale).getStringRep()
+                )
+            )
+            .build();
+    }
+
+    private boolean hasUnsatisfiedDeployments(TrainedModelAssignmentMetadata trainedModelAssignmentMetadata, List<DiscoveryNode> mlNodes) {
+        final Set<String> mlNodeIds = mlNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
+        return trainedModelAssignmentMetadata.modelAssignments()
+            .values()
+            .stream()
+            .anyMatch(deployment -> deployment.isSatisfied(mlNodeIds) == false);
+    }
+
+    private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedModelAssignmentMetadata trainedModelAssignmentMetadata) {
+        int maxThreadsPerAllocation = 0;
+        int processorCount = 0;
+        for (TrainedModelAssignment assignment : trainedModelAssignmentMetadata.modelAssignments().values()) {
+            int threadsPerAllocation = assignment.getTaskParams().getThreadsPerAllocation();
+            maxThreadsPerAllocation = Math.max(maxThreadsPerAllocation, threadsPerAllocation);
+            processorCount += assignment.getTaskParams().getNumberOfAllocations() * threadsPerAllocation;
+        }
+
+        final int numMlAvailabilityZones = nodeAvailabilityZoneMapper.getNumMlAvailabilityZones().orElse(1);
+        if (numMlAvailabilityZones > 1) {
+            // We assume the cloud provider supplies the requested tier processor capacity in each availability zone.
+            // Thus we need to divide the total required processor count by the number of ML availability zones.
+            processorCount = (processorCount - 1) / numMlAvailabilityZones + 1;
+        }
+        processorCount = Math.max(processorCount, maxThreadsPerAllocation);
+
+        return MlProcessorAutoscalingCapacity.builder(
+            maxThreadsPerAllocation > 0 ? Processors.of(Double.valueOf(maxThreadsPerAllocation)) : Processors.ZERO,
+            processorCount > 0 ? Processors.of(Double.valueOf(processorCount)) : Processors.ZERO
+        );
+    }
+
+    MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes) {
+        Processors maxNodeProcessors = Processors.ZERO;
+        Processors tierProcessors = Processors.ZERO;
+        for (DiscoveryNode node : mlNodes) {
+            Processors nodeProcessors = getProcessors(node);
+            if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
+                maxNodeProcessors = nodeProcessors;
+            }
+            tierProcessors = tierProcessors.plus(nodeProcessors);
+        }
+        return MlProcessorAutoscalingCapacity.builder(maxNodeProcessors, tierProcessors).build();
+    }
+
+    private Processors getProcessors(DiscoveryNode node) {
+        String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
+        if (allocatedProcessorsString == null) {
+            return Processors.ZERO;
+        }
+        try {
+            double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
+            return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
+        } catch (NumberFormatException e) {
+            assert e == null
+                : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
+                    + " should parse because we set it internally: invalid value was ["
+                    + allocatedProcessorsString
+                    + "]";
+            return Processors.ZERO;
+        }
+    }
+}

+ 8 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java

@@ -14,6 +14,7 @@ import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.Processors;
 import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
 import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState;
@@ -30,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.OptionalInt;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -142,7 +142,7 @@ class TrainedModelAssignmentRebalancer {
                                 // We subtract native inference memory as the planner expects available memory for
                                 // native inference including current assignments.
                                 getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load),
-                                getNodeAllocatedProcessors(discoveryNode).orElse(0)
+                                getNodeAllocatedProcessors(discoveryNode).roundUp()
                             )
                         );
                     } else {
@@ -158,16 +158,17 @@ class TrainedModelAssignmentRebalancer {
         }));
     }
 
-    private static OptionalInt getNodeAllocatedProcessors(DiscoveryNode node) {
+    private static Processors getNodeAllocatedProcessors(DiscoveryNode node) {
         String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
         try {
-            return OptionalInt.of(Integer.parseInt(allocatedProcessorsString));
+            double allocatedProcessorsAsDouble = allocatedProcessorsString == null ? 0.0 : Double.parseDouble(allocatedProcessorsString);
+            return allocatedProcessorsAsDouble > 0 ? Processors.of(allocatedProcessorsAsDouble) : Processors.ZERO;
         } catch (NumberFormatException e) {
             assert e == null
                 : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
                     + " should parse because we set it internally: invalid value was "
                     + allocatedProcessorsString;
-            return OptionalInt.empty();
+            return Processors.ZERO;
         }
     }
 
@@ -266,7 +267,7 @@ class TrainedModelAssignmentRebalancer {
             // But we should also check if we managed to assign a model during the rebalance for which
             // we check if the node has used up any of its allocated processors.
             boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0
-                || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < getNodeAllocatedProcessors(node).orElse(0);
+                || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < getNodeAllocatedProcessors(node).roundUp();
             long requiredMemory = model.memoryBytes() + (isPerNodeOverheadAccountedFor
                 ? 0
                 : MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes());
@@ -295,7 +296,7 @@ class TrainedModelAssignmentRebalancer {
                     "This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
                         + "processors required for each allocation of this model [{}]",
                     new Object[] {
-                        getNodeAllocatedProcessors(node).orElse(0),
+                        getNodeAllocatedProcessors(node).roundUp(),
                         assignmentPlan.getRemainingNodeCores(node.getId()),
                         model.threadsPerAllocation() }
                 )

+ 0 - 14
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java

@@ -194,15 +194,6 @@ public class MlMemoryAutoscalingDeciderTests extends ESTestCase {
                 .incNumAssignedAnomalyDetectorJobs()
                 .build()
         );
-        MlScalingReason.Builder reasonBuilder = new MlScalingReason.Builder(new MlAutoscalingContext()).setPassedConfiguration(
-            Settings.EMPTY
-        )
-            .setCurrentMlCapacity(
-                AutoscalingCapacity.builder()
-                    .node(null, AUTO_NODE_TIERS_NO_MONITORING.get(0).v1(), null)
-                    .total(null, AUTO_NODE_TIERS_NO_MONITORING.get(0).v1(), null)
-                    .build()
-            );
         MlMemoryAutoscalingDecider decider = buildDecider();
         decider.setUseAuto(true);
         MlMemoryAutoscalingCapacity scaleUpResult = decider.checkForScaleUp(
@@ -244,8 +235,6 @@ public class MlMemoryAutoscalingDeciderTests extends ESTestCase {
                 .incNumAssignedAnomalyDetectorJobs()
                 .build()
         );
-        reasonBuilder = new MlScalingReason.Builder(new MlAutoscalingContext()).setPassedConfiguration(Settings.EMPTY)
-            .setCurrentMlCapacity(AutoscalingCapacity.builder().node(null, 2147483648L, null).total(null, 2147483648L, null).build());
         MlMemoryAutoscalingCapacity result = decider.checkForScaleDown(
             nodeForScaleDown,
             ByteSizeValue.ofMb(200).getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes(),
@@ -956,9 +945,6 @@ public class MlMemoryAutoscalingDeciderTests extends ESTestCase {
         when(nodeAvailabilityZoneMapper.getNumMlAvailabilityZones()).thenReturn(OptionalInt.of(3));
         MlMemoryAutoscalingDecider decider = buildDecider();
         decider.setMaxMachineMemoryPercent(25);
-        MlScalingReason.Builder reasonBuilder = new MlScalingReason.Builder(new MlAutoscalingContext()).setPassedConfiguration(
-            Settings.EMPTY
-        ).setCurrentMlCapacity(AutoscalingCapacity.ZERO);
         { // Current capacity allows for smaller node
             List<NodeLoad> nodeLoads = List.of(
                 NodeLoad.builder("foo")

+ 458 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java

@@ -0,0 +1,458 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.autoscaling;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.Processors;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
+import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
+import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
+import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState;
+import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
+import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata;
+import org.junit.Before;
+
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.LongSupplier;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
+
+    private ScaleTimer scaleTimer;
+    private NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper;
+
+    @Before
+    public void setup() {
+        scaleTimer = new ScaleTimer(System::currentTimeMillis);
+        nodeAvailabilityZoneMapper = mock(NodeAvailabilityZoneMapper.class);
+        when(nodeAvailabilityZoneMapper.getNumMlAvailabilityZones()).thenReturn(OptionalInt.empty());
+    }
+
+    public void testScale_GivenCurrentCapacityIsUsedExactly() {
+        String modelId1 = "model-id-1";
+        String modelId2 = "model-id-2";
+
+        String mlNodeId1 = "ml-node-id-1";
+        String mlNodeId2 = "ml-node-id-2";
+        String dataNodeId = "data-node-id";
+        DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 7.8);
+        DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 7.6);
+        DiscoveryNode dataNode = buildNode(dataNodeId, false, 24.0);
+
+        ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
+            .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build())
+            .metadata(
+                Metadata.builder()
+                    .putCustom(
+                        TrainedModelAssignmentMetadata.NAME,
+                        TrainedModelAssignmentMetadata.Builder.empty()
+                            .addNewAssignment(
+                                modelId1,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 3, 2, 1024, ByteSizeValue.ONE)
+                                ).addRoutingEntry(mlNodeId1, new RoutingInfo(2, 2, RoutingState.STARTED, ""))
+                            )
+                            .addNewAssignment(
+                                modelId2,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 1, 10, 1024, ByteSizeValue.ONE)
+                                )
+                                    .addRoutingEntry(mlNodeId1, new RoutingInfo(2, 2, RoutingState.STARTED, ""))
+                                    .addRoutingEntry(mlNodeId2, new RoutingInfo(8, 8, RoutingState.STARTED, ""))
+                            )
+                            .build()
+                    )
+                    .build()
+            )
+            .build();
+
+        MlProcessorAutoscalingDecider decider = newDecider();
+
+        MlProcessorAutoscalingCapacity capacity = decider.scale(
+            Settings.EMPTY,
+            newContext(clusterState),
+            new MlAutoscalingContext(clusterState)
+        );
+
+        assertThat(capacity.nodeProcessors(), equalTo(Processors.of(7.8)));
+        assertThat(capacity.tierProcessors(), equalTo(Processors.of(15.4)));
+        assertThat(capacity.reason(), equalTo("passing currently perceived capacity as it is fully used"));
+    }
+
+    public void testScale_GivenUnsatisfiedDeployments() {
+        String modelId1 = "model-id-1";
+        String modelId2 = "model-id-2";
+
+        String mlNodeId1 = "ml-node-id-1";
+        String mlNodeId2 = "ml-node-id-2";
+        String dataNodeId = "data-node-id";
+        DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 4);
+        DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 4);
+        DiscoveryNode dataNode = buildNode(dataNodeId, false, 24);
+
+        ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
+            .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build())
+            .metadata(
+                Metadata.builder()
+                    .putCustom(
+                        TrainedModelAssignmentMetadata.NAME,
+                        TrainedModelAssignmentMetadata.Builder.empty()
+                            .addNewAssignment(
+                                modelId1,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 8, 1, 1024, ByteSizeValue.ONE)
+                                )
+                            )
+                            .addNewAssignment(
+                                modelId2,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 4, 3, 1024, ByteSizeValue.ONE)
+                                )
+                                    .addRoutingEntry(mlNodeId1, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
+                                    .addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
+                            )
+                            .build()
+                    )
+                    .build()
+            )
+            .build();
+
+        MlProcessorAutoscalingDecider decider = newDecider();
+
+        MlProcessorAutoscalingCapacity capacity = decider.scale(
+            Settings.EMPTY,
+            newContext(clusterState),
+            new MlAutoscalingContext(clusterState)
+        );
+
+        assertThat(capacity.nodeProcessors(), equalTo(Processors.of(8.0)));
+        assertThat(capacity.tierProcessors(), equalTo(Processors.of(20.0)));
+        assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments"));
+    }
+
+    public void testScale_GivenUnsatisfiedDeployments_AndThreeMlAvailabilityZones_AndNodeProcessorsMoreThanTierProcessors() {
+        givenMlAvailabilityZones(3);
+
+        String modelId1 = "model-id-1";
+        String modelId2 = "model-id-2";
+
+        String mlNodeId1 = "ml-node-id-1";
+        String mlNodeId2 = "ml-node-id-2";
+        String dataNodeId = "data-node-id";
+        DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 4);
+        DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 4);
+        DiscoveryNode dataNode = buildNode(dataNodeId, false, 24);
+
+        ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
+            .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build())
+            .metadata(
+                Metadata.builder()
+                    .putCustom(
+                        TrainedModelAssignmentMetadata.NAME,
+                        TrainedModelAssignmentMetadata.Builder.empty()
+                            .addNewAssignment(
+                                modelId1,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 8, 1, 1024, ByteSizeValue.ONE)
+                                )
+                            )
+                            .addNewAssignment(
+                                modelId2,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 4, 3, 1024, ByteSizeValue.ONE)
+                                )
+                                    .addRoutingEntry(mlNodeId1, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
+                                    .addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
+                            )
+                            .build()
+                    )
+                    .build()
+            )
+            .build();
+
+        MlProcessorAutoscalingDecider decider = newDecider();
+
+        MlProcessorAutoscalingCapacity capacity = decider.scale(
+            Settings.EMPTY,
+            newContext(clusterState),
+            new MlAutoscalingContext(clusterState)
+        );
+
+        assertThat(capacity.nodeProcessors(), equalTo(Processors.of(8.0)));
+        assertThat(capacity.tierProcessors(), equalTo(Processors.of(8.0)));
+        assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments"));
+    }
+
+    public void testScale_GivenUnsatisfiedDeployments_AndThreeMlAvailabilityZones_AndNodeProcessorsLessThanTierProcessors() {
+        givenMlAvailabilityZones(3);
+
+        String modelId1 = "model-id-1";
+        String modelId2 = "model-id-2";
+
+        String mlNodeId1 = "ml-node-id-1";
+        String mlNodeId2 = "ml-node-id-2";
+        String dataNodeId = "data-node-id";
+        DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 4);
+        DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 4);
+        DiscoveryNode dataNode = buildNode(dataNodeId, false, 24);
+
+        ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
+            .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build())
+            .metadata(
+                Metadata.builder()
+                    .putCustom(
+                        TrainedModelAssignmentMetadata.NAME,
+                        TrainedModelAssignmentMetadata.Builder.empty()
+                            .addNewAssignment(
+                                modelId1,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 8, 1, 1024, ByteSizeValue.ONE)
+                                )
+                            )
+                            .addNewAssignment(
+                                modelId2,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 4, 6, 1024, ByteSizeValue.ONE)
+                                )
+                                    .addRoutingEntry(mlNodeId1, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
+                                    .addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
+                            )
+                            .build()
+                    )
+                    .build()
+            )
+            .build();
+
+        MlProcessorAutoscalingDecider decider = newDecider();
+
+        MlProcessorAutoscalingCapacity capacity = decider.scale(
+            Settings.EMPTY,
+            newContext(clusterState),
+            new MlAutoscalingContext(clusterState)
+        );
+
+        assertThat(capacity.nodeProcessors(), equalTo(Processors.of(8.0)));
+        assertThat(capacity.tierProcessors(), equalTo(Processors.of(11.0)));
+        assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments"));
+    }
+
+    public void testScale_GivenMoreThanHalfProcessorsAreUsed() {
+        String modelId1 = "model-id-1";
+        String modelId2 = "model-id-2";
+
+        String mlNodeId1 = "ml-node-id-1";
+        String mlNodeId2 = "ml-node-id-2";
+        String dataNodeId = "data-node-id";
+        DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 3.8);
+        DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 3.8);
+        DiscoveryNode dataNode = buildNode(dataNodeId, false, 24);
+
+        ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
+            .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build())
+            .metadata(
+                Metadata.builder()
+                    .putCustom(
+                        TrainedModelAssignmentMetadata.NAME,
+                        TrainedModelAssignmentMetadata.Builder.empty()
+                            .addNewAssignment(
+                                modelId1,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 2, 2, 1024, ByteSizeValue.ONE)
+                                ).addRoutingEntry(mlNodeId1, new RoutingInfo(2, 2, RoutingState.STARTED, ""))
+                            )
+                            .addNewAssignment(
+                                modelId2,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 1, 1, 1024, ByteSizeValue.ONE)
+                                ).addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
+                            )
+                            .build()
+                    )
+                    .build()
+            )
+            .build();
+
+        MlProcessorAutoscalingDecider decider = newDecider();
+
+        MlProcessorAutoscalingCapacity capacity = decider.scale(
+            Settings.EMPTY,
+            newContext(clusterState),
+            new MlAutoscalingContext(clusterState)
+        );
+
+        assertThat(capacity.nodeProcessors(), equalTo(Processors.of(3.8)));
+        assertThat(capacity.tierProcessors(), equalTo(Processors.of(7.6)));
+        assertThat(
+            capacity.reason(),
+            equalTo("not scaling down as model assignments require more than half of the ML tier's allocated processors")
+        );
+    }
+
+    public void testScale_GivenDownScalePossible_DelayNotSatisfied() {
+        String modelId1 = "model-id-1";
+        String modelId2 = "model-id-2";
+
+        String mlNodeId1 = "ml-node-id-1";
+        String mlNodeId2 = "ml-node-id-2";
+        String dataNodeId = "data-node-id";
+        DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 7.9);
+        DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 7.9);
+        DiscoveryNode dataNode = buildNode(dataNodeId, false, 24);
+
+        ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
+            .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build())
+            .metadata(
+                Metadata.builder()
+                    .putCustom(
+                        TrainedModelAssignmentMetadata.NAME,
+                        TrainedModelAssignmentMetadata.Builder.empty()
+                            .addNewAssignment(
+                                modelId1,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 2, 2, 1024, ByteSizeValue.ONE)
+                                ).addRoutingEntry(mlNodeId1, new RoutingInfo(2, 2, RoutingState.STARTED, ""))
+                            )
+                            .addNewAssignment(
+                                modelId2,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 1, 1, 1024, ByteSizeValue.ONE)
+                                ).addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
+                            )
+                            .build()
+                    )
+                    .build()
+            )
+            .build();
+
+        MlProcessorAutoscalingDecider decider = newDecider();
+        scaleTimer.markScale();
+
+        MlProcessorAutoscalingCapacity capacity = decider.scale(
+            Settings.EMPTY,
+            newContext(clusterState),
+            new MlAutoscalingContext(clusterState)
+        );
+
+        assertThat(capacity.nodeProcessors(), equalTo(Processors.of(7.9)));
+        assertThat(capacity.tierProcessors(), equalTo(Processors.of(15.8)));
+        assertThat(capacity.reason(), containsString("Passing currently perceived capacity as down scale delay has not been satisfied"));
+    }
+
+    public void testScale_GivenDownScalePossible_DelaySatisfied() {
+        String modelId1 = "model-id-1";
+        String modelId2 = "model-id-2";
+
+        String mlNodeId1 = "ml-node-id-1";
+        String mlNodeId2 = "ml-node-id-2";
+        String dataNodeId = "data-node-id";
+        DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 8);
+        DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 8);
+        DiscoveryNode dataNode = buildNode(dataNodeId, false, 24);
+
+        ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
+            .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build())
+            .metadata(
+                Metadata.builder()
+                    .putCustom(
+                        TrainedModelAssignmentMetadata.NAME,
+                        TrainedModelAssignmentMetadata.Builder.empty()
+                            .addNewAssignment(
+                                modelId1,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 2, 2, 1024, ByteSizeValue.ONE)
+                                ).addRoutingEntry(mlNodeId1, new RoutingInfo(2, 2, RoutingState.STARTED, ""))
+                            )
+                            .addNewAssignment(
+                                modelId2,
+                                TrainedModelAssignment.Builder.empty(
+                                    new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 1, 1, 1024, ByteSizeValue.ONE)
+                                ).addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
+                            )
+                            .build()
+                    )
+                    .build()
+            )
+            .build();
+
+        TimeMachine timeMachine = new TimeMachine();
+        scaleTimer = new ScaleTimer(timeMachine);
+        MlProcessorAutoscalingDecider decider = newDecider();
+        scaleTimer.markScale();
+        scaleTimer.markDownScaleAndGetMillisLeftFromDelay(Settings.EMPTY);
+        timeMachine.setOffset(TimeValue.timeValueHours(1));
+
+        MlProcessorAutoscalingCapacity capacity = decider.scale(
+            Settings.EMPTY,
+            newContext(clusterState),
+            new MlAutoscalingContext(clusterState)
+        );
+
+        assertThat(capacity.nodeProcessors(), equalTo(Processors.of(2.0)));
+        assertThat(capacity.tierProcessors(), equalTo(Processors.of(5.0)));
+        assertThat(capacity.reason(), containsString("requesting scale down as tier and/or node size could be smaller"));
+    }
+
+    private static DiscoveryNode buildNode(String name, boolean isML, double allocatedProcessors) {
+        return new DiscoveryNode(
+            name,
+            name,
+            buildNewFakeTransportAddress(),
+            MapBuilder.<String, String>newMapBuilder()
+                .put(MachineLearning.MAX_JVM_SIZE_NODE_ATTR, String.valueOf(10))
+                .put(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, String.valueOf(allocatedProcessors))
+                .map(),
+            isML ? DiscoveryNodeRole.roles() : Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE),
+            Version.CURRENT
+        );
+    }
+
+    private MlProcessorAutoscalingDecider newDecider() {
+        return new MlProcessorAutoscalingDecider(scaleTimer, nodeAvailabilityZoneMapper);
+    }
+
+    private AutoscalingDeciderContext newContext(ClusterState clusterState) {
+        AutoscalingDeciderContext context = mock(AutoscalingDeciderContext.class);
+        when(context.state()).thenReturn(clusterState);
+        return context;
+    }
+
+    private void givenMlAvailabilityZones(int zones) {
+        when(nodeAvailabilityZoneMapper.getNumMlAvailabilityZones()).thenReturn(OptionalInt.of(zones));
+    }
+
+    private static class TimeMachine implements LongSupplier {
+
+        private long offsetMillis;
+
+        void setOffset(TimeValue timeValue) {
+            this.offsetMillis = timeValue.millis();
+        }
+
+        @Override
+        public long getAsLong() {
+            return System.currentTimeMillis() + offsetMillis;
+        }
+    }
+}