Browse Source

[ML] Ignore failed jobs in serverless autoscaling (#101692)

Failed jobs have persistent tasks but do not have corresponding
native processes running, so should not count towards the
memory requirements of the ML tier.

This PR filters failed jobs before calculating memory requirements
for serverless autoscaling. (This was already accounted for
correctly in stateful autoscaling.)

Also adds some missing tests.
David Roberts 2 years ago
parent
commit
07a68f2f63

+ 30 - 14
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java

@@ -22,6 +22,8 @@ import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.autoscaling.MlAutoscalingStats;
 import org.elasticsearch.xpack.core.ml.inference.assignment.AssignmentState;
 import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
+import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
+import org.elasticsearch.xpack.core.ml.utils.MemoryTrackedTaskState;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -130,22 +132,27 @@ public final class MlAutoscalingResourceTracker {
             autoscalingContext.modelAssignments.size()
         );
 
-        // start with `minNodes = 1` if any ML job is started, further adjustments are made for trained models below
-        int minNodes = autoscalingContext.anomalyDetectionTasks.isEmpty()
-            && autoscalingContext.dataframeAnalyticsTasks.isEmpty()
-            && autoscalingContext.modelAssignments.isEmpty() ? 0 : 1;
+        // Start with `minNodes = 0`. If any ML job is started this will be increased to 1 in the loops below,
+        // and further adjustments are made for trained models depending on allocations.
+        int minNodes = 0;
 
         // anomaly detection
         for (var task : autoscalingContext.anomalyDetectionTasks) {
+            MemoryTrackedTaskState state = MlTasks.getMemoryTrackedTaskState(task);
+            if (state != null && state.consumesMemory() == false) {
+                continue;
+            }
+
             String jobId = ((OpenJobAction.JobParams) task.getParams()).getJobId();
             Long jobMemory = mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(jobId);
-
             if (jobMemory == null) {
                 logger.debug("could not find memory requirement for job [{}], returning no-scale", jobId);
                 listener.onResponse(noScaleStats(numberMlNodes));
                 return;
             }
 
+            minNodes = 1;
+
             if (AWAITING_LAZY_ASSIGNMENT.equals(task.getAssignment())) {
                 logger.debug("job [{}] lacks assignment , memory required [{}]", jobId, jobMemory);
 
@@ -165,15 +172,21 @@ public final class MlAutoscalingResourceTracker {
 
         // data frame analytics
         for (var task : autoscalingContext.dataframeAnalyticsTasks) {
+            MemoryTrackedTaskState state = MlTasks.getMemoryTrackedTaskState(task);
+            if (state != null && state.consumesMemory() == false) {
+                continue;
+            }
+
             String jobId = MlTasks.dataFrameAnalyticsId(task.getId());
             Long jobMemory = mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(jobId);
-
             if (jobMemory == null) {
                 logger.debug("could not find memory requirement for job [{}], returning no-scale", jobId);
                 listener.onResponse(noScaleStats(numberMlNodes));
                 return;
             }
 
+            minNodes = 1;
+
             if (AWAITING_LAZY_ASSIGNMENT.equals(task.getAssignment())) {
                 logger.debug("dfa job [{}] lacks assignment , memory required [{}]", jobId, jobMemory);
 
@@ -192,12 +205,12 @@ public final class MlAutoscalingResourceTracker {
 
         // trained models
         for (var modelAssignment : autoscalingContext.modelAssignments.entrySet()) {
-            final int numberOfAllocations = modelAssignment.getValue().getTaskParams().getNumberOfAllocations();
-            final int numberOfThreadsPerAllocation = modelAssignment.getValue().getTaskParams().getThreadsPerAllocation();
-            final long estimatedMemoryUsage = modelAssignment.getValue().getTaskParams().estimateMemoryUsageBytes();
+            TrainedModelAssignment assignment = modelAssignment.getValue();
+            final int numberOfAllocations = assignment.getTaskParams().getNumberOfAllocations();
+            final int numberOfThreadsPerAllocation = assignment.getTaskParams().getThreadsPerAllocation();
+            final long estimatedMemoryUsage = assignment.getTaskParams().estimateMemoryUsageBytes();
 
-            if (AssignmentState.STARTING.equals(modelAssignment.getValue().getAssignmentState())
-                && modelAssignment.getValue().getNodeRoutingTable().isEmpty()) {
+            if (AssignmentState.STARTING.equals(assignment.getAssignmentState()) && assignment.getNodeRoutingTable().isEmpty()) {
 
                 logger.debug(
                     () -> format(
@@ -216,6 +229,9 @@ public final class MlAutoscalingResourceTracker {
                     extraSingleNodeProcessors = Math.max(extraSingleNodeProcessors, numberOfThreadsPerAllocation);
                     extraProcessors += numberOfAllocations * numberOfThreadsPerAllocation;
                 }
+            } else if (assignment.getNodeRoutingTable().values().stream().allMatch(r -> r.getState().consumesMemory() == false)) {
+                // Ignore states that don't consume memory, for example when all allocations have failed
+                continue;
             } else {
                 logger.debug(
                     () -> format(
@@ -229,9 +245,6 @@ public final class MlAutoscalingResourceTracker {
                 modelMemoryBytesSum += estimatedMemoryUsage;
                 processorsSum += numberOfAllocations * numberOfThreadsPerAllocation;
 
-                // min(3, max(number of allocations over all deployed models)
-                minNodes = Math.min(3, Math.max(minNodes, numberOfAllocations));
-
                 for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
                     perNodeModelMemoryInBytes.computeIfAbsent(node, k -> new ArrayList<>())
                         .add(
@@ -244,6 +257,9 @@ public final class MlAutoscalingResourceTracker {
                         );
                 }
             }
+
+            // min(3, max(number of allocations over all deployed models))
+            minNodes = Math.min(3, Math.max(minNodes, numberOfAllocations));
         }
 
         // check for downscaling

+ 137 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTrackerTests.java

@@ -14,12 +14,16 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.autoscaling.MlAutoscalingStats;
 import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
 import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
 import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingState;
 import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
+import org.elasticsearch.xpack.core.ml.job.config.JobState;
+import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 
@@ -34,7 +38,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
 import static org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingResourceTracker.MlJobRequirements;
+import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class MlAutoscalingResourceTrackerTests extends ESTestCase {
 
@@ -83,6 +89,137 @@ public class MlAutoscalingResourceTrackerTests extends ESTestCase {
         );
     }
 
+    public void testGetMemoryAndProcessorsScaleUpGivenAwaitingLazyAssignment() throws InterruptedException {
+        long memory = 1000000000;
+        Map<String, String> nodeAttr = Map.of(
+            MachineLearning.MACHINE_MEMORY_NODE_ATTR,
+            Long.toString(memory),
+            MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
+            "400000000",
+            MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
+            "7.2.0"
+        );
+        String jobId = "lazy-job";
+        MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
+            List.of(
+                new PersistentTasksCustomMetadata.PersistentTask<>(
+                    MlTasks.jobTaskId(jobId),
+                    MlTasks.JOB_TASK_NAME,
+                    new OpenJobAction.JobParams(jobId),
+                    1,
+                    AWAITING_LAZY_ASSIGNMENT
+                )
+            ),
+            List.of(),
+            List.of(),
+            Map.of(),
+            List.of(
+                DiscoveryNodeUtils.builder("ml-1")
+                    .name("ml-1")
+                    .address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
+                    .attributes(nodeAttr)
+                    .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                    .build(),
+                DiscoveryNodeUtils.builder("ml-2")
+                    .name("ml-2")
+                    .address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
+                    .attributes(nodeAttr)
+                    .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                    .build()
+            ),
+            PersistentTasksCustomMetadata.builder().build()
+        );
+        MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);
+        when(mockTracker.getAnomalyDetectorJobMemoryRequirement(jobId)).thenReturn(memory / 4);
+        this.<MlAutoscalingStats>assertAsync(
+            listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
+                mlAutoscalingContext,
+                mockTracker,
+                Map.of("ml-1", memory, "ml-2", memory),
+                memory / 2,
+                10,
+                MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
+                listener
+            ),
+            stats -> {
+                assertEquals(memory, stats.perNodeMemoryInBytes());
+                assertEquals(2, stats.nodes());
+                assertEquals(1, stats.minNodes());
+                assertEquals(0, stats.extraSingleNodeProcessors());
+                assertEquals(memory / 4, stats.extraSingleNodeModelMemoryInBytes());
+                assertEquals(memory / 4, stats.extraModelMemoryInBytes());
+                assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.perNodeMemoryOverheadInBytes());
+            }
+        );
+    }
+
+    public void testGetMemoryAndProcessorsScaleUpGivenAwaitingLazyAssignmentButFailed() throws InterruptedException {
+        long memory = 1000000000;
+        Map<String, String> nodeAttr = Map.of(
+            MachineLearning.MACHINE_MEMORY_NODE_ATTR,
+            Long.toString(memory),
+            MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
+            "400000000",
+            MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
+            "7.2.0"
+        );
+        String jobId = "lazy-job";
+        MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
+            List.of(
+                new PersistentTasksCustomMetadata.PersistentTask<>(
+                    new PersistentTasksCustomMetadata.PersistentTask<>(
+                        MlTasks.jobTaskId(jobId),
+                        MlTasks.JOB_TASK_NAME,
+                        new OpenJobAction.JobParams(jobId),
+                        1,
+                        AWAITING_LAZY_ASSIGNMENT
+                    ),
+                    new JobTaskState(JobState.FAILED, 1, "a nasty bug")
+                )
+            ),
+            List.of(),
+            List.of(),
+            Map.of(),
+            List.of(
+                DiscoveryNodeUtils.builder("ml-1")
+                    .name("ml-1")
+                    .address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
+                    .attributes(nodeAttr)
+                    .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                    .build(),
+                DiscoveryNodeUtils.builder("ml-2")
+                    .name("ml-2")
+                    .address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
+                    .attributes(nodeAttr)
+                    .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                    .build()
+            ),
+            PersistentTasksCustomMetadata.builder().build()
+        );
+        MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);
+        when(mockTracker.getAnomalyDetectorJobMemoryRequirement(jobId)).thenReturn(memory / 4);
+        this.<MlAutoscalingStats>assertAsync(
+            listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
+                mlAutoscalingContext,
+                mockTracker,
+                Map.of("ml-1", memory, "ml-2", memory),
+                memory / 2,
+                10,
+                MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
+                listener
+            ),
+            stats -> {
+                assertEquals(memory, stats.perNodeMemoryInBytes());
+                assertEquals(2, stats.nodes());
+                assertEquals(0, stats.minNodes());
+                assertEquals(0, stats.extraSingleNodeProcessors());
+                assertEquals(0, stats.extraSingleNodeModelMemoryInBytes());
+                assertEquals(0, stats.extraModelMemoryInBytes());
+                assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.perNodeMemoryOverheadInBytes());
+            }
+        );
+    }
+
     public void testCheckIfJobsCanBeMovedInLeastEfficientWayMemoryOnly() {
         assertEquals(
             0L,
@@ -897,7 +1034,6 @@ public class MlAutoscalingResourceTrackerTests extends ESTestCase {
                     )
                 ).addRoutingEntry("ml-node-3", new RoutingInfo(1, 1, RoutingState.STARTED, "")).build()
             ),
-
             List.of(
                 DiscoveryNodeUtils.builder("ml-node-1")
                     .name("ml-node-name-1")