瀏覽代碼

[ML] Require correct tier processors when multiple AZs are present (#90903)

When we added an autoscaling decider for processors we assumed wrongly
that when there are multiple availability zones ML would get the
processors it requests for the tier multiplied by the number of AZs.

This assumption was incorrect. This commit fixes this issue which should
correct autoscaling behaviour in clusters with multiple AZs.
Dimitris Athanasiou 3 年之前
父節點
當前提交
7653f6b3dd

+ 5 - 0
docs/changelog/90903.yaml

@@ -0,0 +1,5 @@
+pr: 90903
+summary: Require correct tier processors when multiple AZs are present
+area: Machine Learning
+type: bug
+issues: []

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java

@@ -68,7 +68,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
             nodeLoadDetector,
             scaleTimer
         );
-        this.processorDecider = new MlProcessorAutoscalingDecider(scaleTimer, nodeAvailabilityZoneMapper);
+        this.processorDecider = new MlProcessorAutoscalingDecider(scaleTimer);
         clusterService.addLocalNodeMasterListener(this);
     }
 

+ 1 - 11
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java

@@ -35,11 +35,9 @@ class MlProcessorAutoscalingDecider {
     private static final Logger logger = LogManager.getLogger(MlProcessorAutoscalingDecider.class);
 
     private final ScaleTimer scaleTimer;
-    private final NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper;
 
-    MlProcessorAutoscalingDecider(ScaleTimer scaleTimer, NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper) {
+    MlProcessorAutoscalingDecider(ScaleTimer scaleTimer) {
         this.scaleTimer = Objects.requireNonNull(scaleTimer);
-        this.nodeAvailabilityZoneMapper = Objects.requireNonNull(nodeAvailabilityZoneMapper);
     }
 
     public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
@@ -120,14 +118,6 @@ class MlProcessorAutoscalingDecider {
             processorCount += assignment.getTaskParams().getNumberOfAllocations() * threadsPerAllocation;
         }
 
-        final int numMlAvailabilityZones = nodeAvailabilityZoneMapper.getNumMlAvailabilityZones().orElse(1);
-        if (numMlAvailabilityZones > 1) {
-            // We assume cloud provides what we ask for tier processors for each availability zone.
-            // Thus we need to devide the total processor count required by the number of ML availability zones.
-            processorCount = (processorCount - 1) / numMlAvailabilityZones + 1;
-        }
-        processorCount = Math.max(processorCount, maxThreadsPerAllocation);
-
         return MlProcessorAutoscalingCapacity.builder(
             maxThreadsPerAllocation > 0 ? Processors.of(Double.valueOf(maxThreadsPerAllocation)) : Processors.ZERO,
             processorCount > 0 ? Processors.of(Double.valueOf(processorCount)) : Processors.ZERO

+ 1 - 115
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java

@@ -29,7 +29,6 @@ import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.inference.assignment.TrainedModelAssignmentMetadata;
 import org.junit.Before;
 
-import java.util.OptionalInt;
 import java.util.Set;
 import java.util.function.LongSupplier;
 
@@ -41,13 +40,10 @@ import static org.mockito.Mockito.when;
 public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
 
     private ScaleTimer scaleTimer;
-    private NodeAvailabilityZoneMapper nodeAvailabilityZoneMapper;
 
     @Before
     public void setup() {
         scaleTimer = new ScaleTimer(System::currentTimeMillis);
-        nodeAvailabilityZoneMapper = mock(NodeAvailabilityZoneMapper.class);
-        when(nodeAvailabilityZoneMapper.getNumMlAvailabilityZones()).thenReturn(OptionalInt.empty());
     }
 
     public void testScale_GivenCurrentCapacityIsUsedExactly() {
@@ -152,112 +148,6 @@ public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
         assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments"));
     }
 
-    public void testScale_GivenUnsatisfiedDeployments_AndThreeMlAvailabilityZones_AndNodeProcessorsMoreThanTierProcessors() {
-        givenMlAvailabilityZones(3);
-
-        String modelId1 = "model-id-1";
-        String modelId2 = "model-id-2";
-
-        String mlNodeId1 = "ml-node-id-1";
-        String mlNodeId2 = "ml-node-id-2";
-        String dataNodeId = "data-node-id";
-        DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 4);
-        DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 4);
-        DiscoveryNode dataNode = buildNode(dataNodeId, false, 24);
-
-        ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
-            .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build())
-            .metadata(
-                Metadata.builder()
-                    .putCustom(
-                        TrainedModelAssignmentMetadata.NAME,
-                        TrainedModelAssignmentMetadata.Builder.empty()
-                            .addNewAssignment(
-                                modelId1,
-                                TrainedModelAssignment.Builder.empty(
-                                    new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 8, 1, 1024, ByteSizeValue.ONE)
-                                )
-                            )
-                            .addNewAssignment(
-                                modelId2,
-                                TrainedModelAssignment.Builder.empty(
-                                    new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 4, 3, 1024, ByteSizeValue.ONE)
-                                )
-                                    .addRoutingEntry(mlNodeId1, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
-                                    .addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
-                            )
-                            .build()
-                    )
-                    .build()
-            )
-            .build();
-
-        MlProcessorAutoscalingDecider decider = newDecider();
-
-        MlProcessorAutoscalingCapacity capacity = decider.scale(
-            Settings.EMPTY,
-            newContext(clusterState),
-            new MlAutoscalingContext(clusterState)
-        );
-
-        assertThat(capacity.nodeProcessors(), equalTo(Processors.of(8.0)));
-        assertThat(capacity.tierProcessors(), equalTo(Processors.of(8.0)));
-        assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments"));
-    }
-
-    public void testScale_GivenUnsatisfiedDeployments_AndThreeMlAvailabilityZones_AndNodeProcessorsLessThanTierProcessors() {
-        givenMlAvailabilityZones(3);
-
-        String modelId1 = "model-id-1";
-        String modelId2 = "model-id-2";
-
-        String mlNodeId1 = "ml-node-id-1";
-        String mlNodeId2 = "ml-node-id-2";
-        String dataNodeId = "data-node-id";
-        DiscoveryNode mlNode1 = buildNode(mlNodeId1, true, 4);
-        DiscoveryNode mlNode2 = buildNode(mlNodeId2, true, 4);
-        DiscoveryNode dataNode = buildNode(dataNodeId, false, 24);
-
-        ClusterState clusterState = ClusterState.builder(new ClusterName("test"))
-            .nodes(DiscoveryNodes.builder().add(mlNode1).add(mlNode2).add(dataNode).build())
-            .metadata(
-                Metadata.builder()
-                    .putCustom(
-                        TrainedModelAssignmentMetadata.NAME,
-                        TrainedModelAssignmentMetadata.Builder.empty()
-                            .addNewAssignment(
-                                modelId1,
-                                TrainedModelAssignment.Builder.empty(
-                                    new StartTrainedModelDeploymentAction.TaskParams(modelId1, 42L, 8, 1, 1024, ByteSizeValue.ONE)
-                                )
-                            )
-                            .addNewAssignment(
-                                modelId2,
-                                TrainedModelAssignment.Builder.empty(
-                                    new StartTrainedModelDeploymentAction.TaskParams(modelId2, 42L, 4, 6, 1024, ByteSizeValue.ONE)
-                                )
-                                    .addRoutingEntry(mlNodeId1, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
-                                    .addRoutingEntry(mlNodeId2, new RoutingInfo(1, 1, RoutingState.STARTED, ""))
-                            )
-                            .build()
-                    )
-                    .build()
-            )
-            .build();
-
-        MlProcessorAutoscalingDecider decider = newDecider();
-
-        MlProcessorAutoscalingCapacity capacity = decider.scale(
-            Settings.EMPTY,
-            newContext(clusterState),
-            new MlAutoscalingContext(clusterState)
-        );
-
-        assertThat(capacity.nodeProcessors(), equalTo(Processors.of(8.0)));
-        assertThat(capacity.tierProcessors(), equalTo(Processors.of(11.0)));
-        assertThat(capacity.reason(), equalTo("requesting scale up as there are unsatisfied deployments"));
-    }
-
     public void testScale_GivenMoreThanHalfProcessorsAreUsed() {
         String modelId1 = "model-id-1";
         String modelId2 = "model-id-2";
@@ -429,7 +319,7 @@ public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
     }
 
     private MlProcessorAutoscalingDecider newDecider() {
-        return new MlProcessorAutoscalingDecider(scaleTimer, nodeAvailabilityZoneMapper);
+        return new MlProcessorAutoscalingDecider(scaleTimer);
     }
 
     private AutoscalingDeciderContext newContext(ClusterState clusterState) {
@@ -438,10 +328,6 @@ public class MlProcessorAutoscalingDeciderTests extends ESTestCase {
         return context;
     }
 
-    private void givenMlAvailabilityZones(int zones) {
-        when(nodeAvailabilityZoneMapper.getNumMlAvailabilityZones()).thenReturn(OptionalInt.of(zones));
-    }
-
     private static class TimeMachine implements LongSupplier {
 
         private long offsetMillis;