浏览代码

[ML] Filter out shutting down nodes from the zone mapper (#134899)

Avoids a warning message being logged repeatedly.
David Kyle 2 周之前
父节点
当前提交
021820e2f8

+ 1 - 13
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartTrainedModelDeploymentAction.java

@@ -27,7 +27,6 @@ import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContentObject;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
-import org.elasticsearch.xpack.core.ml.MlConfigVersion;
 import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
 import org.elasticsearch.xpack.core.ml.inference.assignment.AdaptiveAllocationsSettings;
 import org.elasticsearch.xpack.core.ml.inference.assignment.AllocationStatus;
@@ -445,17 +444,10 @@ public class StartTrainedModelDeploymentAction extends ActionType<CreateTrainedM
 
     public static class TaskParams implements MlTaskParams, Writeable, ToXContentObject {
 
-        // TODO add support for other roles? If so, it may have to be an instance method...
-        // NOTE, whatever determines assignment should not be dynamically set on the node
-        // Otherwise assignment logic might fail
         public static boolean mayAssignToNode(@Nullable DiscoveryNode node) {
-            return node != null
-                && node.getRoles().contains(DiscoveryNodeRole.ML_ROLE)
-                && MlConfigVersion.fromNode(node).onOrAfter(VERSION_INTRODUCED);
+            return node != null && node.getRoles().contains(DiscoveryNodeRole.ML_ROLE);
         }
 
-        public static final MlConfigVersion VERSION_INTRODUCED = MlConfigVersion.V_8_0_0;
-
         private static final ParseField MODEL_BYTES = new ParseField("model_bytes");
         public static final ParseField NUMBER_OF_ALLOCATIONS = new ParseField("number_of_allocations");
         public static final ParseField THREADS_PER_ALLOCATION = new ParseField("threads_per_allocation");
@@ -646,10 +638,6 @@ public class StartTrainedModelDeploymentAction extends ActionType<CreateTrainedM
             );
         }
 
-        public MlConfigVersion getMinimalSupportedVersion() {
-            return VERSION_INTRODUCED;
-        }
-
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeString(modelId);

+ 11 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/AbstractNodeAvailabilityZoneMapper.java

@@ -11,8 +11,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.common.settings.ClusterSettings;
-import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Nullable;
 
 import java.util.Collection;
 import java.util.List;
@@ -24,14 +23,18 @@ public abstract class AbstractNodeAvailabilityZoneMapper implements ClusterState
     private volatile Map<List<String>, Collection<DiscoveryNode>> allNodesByAvailabilityZone;
     private volatile Map<List<String>, Collection<DiscoveryNode>> mlNodesByAvailabilityZone;
 
-    public AbstractNodeAvailabilityZoneMapper(Settings settings, ClusterSettings clusterSettings) {
-        this(settings, clusterSettings, null);
-    }
-
-    public AbstractNodeAvailabilityZoneMapper(Settings settings, ClusterSettings clusterSettings, DiscoveryNodes discoveryNodes) {
+    public AbstractNodeAvailabilityZoneMapper(@Nullable DiscoveryNodes discoveryNodes) {
         lastDiscoveryNodes = discoveryNodes;
     }
 
+    /**
+     * Build a node by zone map for all nodes in the cluster and
+     * for just the ml nodes.
+     * @param discoveryNodes All the nodes in the cluster
+     * @return All nodes by zone and ml nodes by zone.
+     */
+    protected abstract NodesByAvailabilityZone buildNodesByAvailabilityZone(Collection<DiscoveryNode> discoveryNodes);
+
     /**
      * @return A map whose keys are lists of attributes that together define an availability zone, and whose values are
      *         collections of nodes that have that combination of attributes. If availability zones
@@ -86,7 +89,7 @@ public abstract class AbstractNodeAvailabilityZoneMapper implements ClusterState
             mlNodesByAvailabilityZone = allNodesByAvailabilityZone;
             return;
         }
-        NodesByAvailabilityZone nodesByAvailabilityZone = buildNodesByAvailabilityZone(lastDiscoveryNodes);
+        NodesByAvailabilityZone nodesByAvailabilityZone = buildNodesByAvailabilityZone(lastDiscoveryNodes.getAllNodes());
         this.allNodesByAvailabilityZone = nodesByAvailabilityZone.allNodes();
         this.mlNodesByAvailabilityZone = nodesByAvailabilityZone.mlNodes();
     }

+ 2 - 6
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/NodeAvailabilityZoneMapper.java

@@ -7,9 +7,7 @@
 
 package org.elasticsearch.xpack.ml.autoscaling;
 
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
 
 import java.util.Collection;
 import java.util.List;
@@ -18,17 +16,15 @@ import java.util.OptionalInt;
 
 public interface NodeAvailabilityZoneMapper {
     /**
-     * @param clusterState The specific cluster state whose nodes will be used to detect ML nodes by availability zone.
+     * @param mlNodes The nodes which will be used to detect ML nodes by availability zone.
      * @return A map whose keys are conceptually lists of availability zone attributes, and whose values are collections
      *         of nodes corresponding to the availability zone attributes.
      *         An empty map will be returned if there are no ML nodes in the cluster.
      */
-    Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(ClusterState clusterState);
+    Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(List<DiscoveryNode> mlNodes);
 
     OptionalInt getNumMlAvailabilityZones();
 
-    NodesByAvailabilityZone buildNodesByAvailabilityZone(DiscoveryNodes discoveryNodes);
-
     record NodesByAvailabilityZone(
         Map<List<String>, Collection<DiscoveryNode>> allNodes,
         Map<List<String>, Collection<DiscoveryNode>> mlNodes

+ 7 - 9
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/NodeFakeAvailabilityZoneMapper.java

@@ -7,7 +7,6 @@
 
 package org.elasticsearch.xpack.ml.autoscaling;
 
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -31,7 +30,7 @@ public class NodeFakeAvailabilityZoneMapper extends AbstractNodeAvailabilityZone
 
     @SuppressWarnings("this-escape")
     public NodeFakeAvailabilityZoneMapper(Settings settings, ClusterSettings clusterSettings, DiscoveryNodes discoveryNodes) {
-        super(settings, clusterSettings, discoveryNodes);
+        super(discoveryNodes);
         updateNodesByAvailabilityZone();
     }
 
@@ -41,12 +40,11 @@ public class NodeFakeAvailabilityZoneMapper extends AbstractNodeAvailabilityZone
      *         of nodes corresponding to the node ids. An empty map will be returned if there are no ML nodes in the
      *         cluster.
      */
-    public NodesByAvailabilityZone buildNodesByAvailabilityZone(DiscoveryNodes discoveryNodes) {
-        Collection<DiscoveryNode> nodes = discoveryNodes.getNodes().values();
-
+    @Override
+    protected NodesByAvailabilityZone buildNodesByAvailabilityZone(Collection<DiscoveryNode> discoveryNodes) {
         Map<List<String>, Collection<DiscoveryNode>> allNodesByAvailabilityZone = new HashMap<>();
         Map<List<String>, Collection<DiscoveryNode>> mlNodesByAvailabilityZone = new HashMap<>();
-        for (DiscoveryNode node : nodes) {
+        for (DiscoveryNode node : discoveryNodes) {
             List<String> nodeIdValues = List.of(node.getId());
             List<DiscoveryNode> nodeList = List.of(node);
             allNodesByAvailabilityZone.put(nodeIdValues, nodeList);
@@ -61,13 +59,13 @@ public class NodeFakeAvailabilityZoneMapper extends AbstractNodeAvailabilityZone
      * This is different to {@link #getMlNodesByAvailabilityZone()} in that the latter returns the ML nodes by (fake) availability zone
      * of the latest cluster state, while this method does the same for a specific cluster state.
      *
-     * @param clusterState The cluster state whose nodes will be used to detect ML nodes by fake availability zone.
+     * @param discoveryNodes The nodes used to detect ML nodes by fake availability zone.
      * @return A map whose keys are single item lists of node id values, and whose values are single item collections
      *         of nodes corresponding to the node ids. An empty map will be returned if there are no ML nodes in the
      *         cluster.
      */
     @Override
-    public Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(ClusterState clusterState) {
-        return buildNodesByAvailabilityZone(clusterState.nodes()).mlNodes();
+    public Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(List<DiscoveryNode> discoveryNodes) {
+        return buildNodesByAvailabilityZone(discoveryNodes).mlNodes();
     }
 }

+ 7 - 9
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/NodeRealAvailabilityZoneMapper.java

@@ -9,7 +9,6 @@ package org.elasticsearch.xpack.ml.autoscaling;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -40,7 +39,7 @@ public class NodeRealAvailabilityZoneMapper extends AbstractNodeAvailabilityZone
 
     @SuppressWarnings("this-escape")
     public NodeRealAvailabilityZoneMapper(Settings settings, ClusterSettings clusterSettings, DiscoveryNodes discoveryNodes) {
-        super(settings, clusterSettings, discoveryNodes);
+        super(discoveryNodes);
         awarenessAttributes = AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
         updateNodesByAvailabilityZone();
         clusterSettings.addSettingsUpdateConsumer(
@@ -58,13 +57,12 @@ public class NodeRealAvailabilityZoneMapper extends AbstractNodeAvailabilityZone
         return awarenessAttributes;
     }
 
-    public NodesByAvailabilityZone buildNodesByAvailabilityZone(DiscoveryNodes discoveryNodes) {
+    @Override
+    protected NodesByAvailabilityZone buildNodesByAvailabilityZone(Collection<DiscoveryNode> discoveryNodes) {
         return buildNodesByAvailabilityZone(discoveryNodes, awarenessAttributes);
     }
 
-    private static NodesByAvailabilityZone buildNodesByAvailabilityZone(DiscoveryNodes discoveryNodes, List<String> awarenessAttributes) {
-        Collection<DiscoveryNode> nodes = discoveryNodes.getNodes().values();
-
+    private static NodesByAvailabilityZone buildNodesByAvailabilityZone(Collection<DiscoveryNode> nodes, List<String> awarenessAttributes) {
         if (awarenessAttributes.isEmpty()) {
             return new NodesByAvailabilityZone(
                 Map.of(List.of(), nodes),
@@ -110,7 +108,7 @@ public class NodeRealAvailabilityZoneMapper extends AbstractNodeAvailabilityZone
      * This is different to {@link #getMlNodesByAvailabilityZone()} in that the latter returns the ML nodes by availability zone
      * of the latest cluster state, while this method does the same for a specific cluster state.
      *
-     * @param clusterState The cluster state whose nodes will be used to detect ML nodes by availability zone.
+     * @param nodes The nodes which will be used to detect ML nodes by availability zone.
      * @return A map whose keys are lists of awareness attribute values in the same order as the configured awareness attribute
      *         names, and whose values are collections of nodes that have that combination of attributes. If availability zones
      *         are not configured then the map will contain one entry mapping an empty list to a collection of all nodes. If
@@ -119,7 +117,7 @@ public class NodeRealAvailabilityZoneMapper extends AbstractNodeAvailabilityZone
      *         distinguished by calling one of the other methods.)
      */
     @Override
-    public Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(ClusterState clusterState) {
-        return buildNodesByAvailabilityZone(clusterState.nodes(), awarenessAttributes).mlNodes();
+    public Map<List<String>, Collection<DiscoveryNode>> buildMlNodesByAvailabilityZone(List<DiscoveryNode> nodes) {
+        return buildNodesByAvailabilityZone(nodes, awarenessAttributes).mlNodes();
     }
 }

+ 9 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java

@@ -613,21 +613,21 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
     private TrainedModelAssignmentMetadata.Builder rebalanceAssignments(
         ClusterState currentState,
         Optional<CreateTrainedModelAssignmentAction.Request> createAssignmentRequest
-    ) throws Exception {
-        List<DiscoveryNode> nodes = getAssignableNodes(currentState);
-        logger.debug(() -> format("assignable nodes are %s", nodes.stream().map(DiscoveryNode::getId).toList()));
-        Map<DiscoveryNode, NodeLoad> nodeLoads = detectNodeLoads(nodes, currentState);
+    ) {
+        List<DiscoveryNode> assignableNodes = getAssignableNodes(currentState);
+        logger.debug(() -> format("assignable nodes are %s", assignableNodes.stream().map(DiscoveryNode::getId).toList()));
+        Map<DiscoveryNode, NodeLoad> nodeLoads = detectNodeLoads(assignableNodes, currentState);
         TrainedModelAssignmentMetadata currentMetadata = TrainedModelAssignmentMetadata.fromState(currentState);
 
         TrainedModelAssignmentRebalancer rebalancer = new TrainedModelAssignmentRebalancer(
             currentMetadata,
             nodeLoads,
-            nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState),
+            nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(assignableNodes),
             createAssignmentRequest,
             allocatedProcessorsScale
         );
 
-        Set<String> shuttingDownNodeIds = currentState.metadata().nodeShutdowns().getAllNodeIds();
+        Set<String> shuttingDownNodeIds = nodesShuttingDown(currentState);
         /*
          * To signal that we should gracefully stop the deployments routed to a particular node we set the routing state to stopping.
          * The TrainedModelAssignmentNodeService will see that the route is in stopping for a shutting down node and gracefully shut down
@@ -643,7 +643,7 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
             checkModelIsFullyAllocatedIfScalingIsNotPossible(
                 createAssignmentRequest.get().getTaskParams().getDeploymentId(),
                 rebalanced,
-                nodes
+                assignableNodes
             );
         }
 
@@ -968,8 +968,9 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
         AdaptiveAllocationsSettings adaptiveAllocationsSettings,
         ActionListener<TrainedModelAssignmentMetadata.Builder> listener
     ) {
+        List<DiscoveryNode> assignableNodes = getAssignableNodes(clusterState);
         TrainedModelAssignment.Builder updatedAssignment = numberOfAllocations < assignment.totalTargetAllocations()
-            ? new AllocationReducer(assignment, nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(clusterState)).reduceTo(
+            ? new AllocationReducer(assignment, nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(assignableNodes)).reduceTo(
                 numberOfAllocations
             )
             : TrainedModelAssignment.Builder.fromAssignment(assignment).setNumberOfAllocations(numberOfAllocations);

+ 0 - 14
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java

@@ -15,7 +15,6 @@ import org.apache.logging.log4j.message.Message;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.ResourceNotFoundException;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.client.internal.Client;
@@ -439,7 +438,6 @@ public class TrainedModelAssignmentClusterServiceTests extends ESTestCase {
             .add(buildNode("ml-node-without-room", true, 1000L, 2))
             .add(buildNode("not-ml-node", false, ByteSizeValue.ofGb(4).getBytes(), 2))
             .add(buildNode("ml-node-shutting-down", true, ByteSizeValue.ofGb(4).getBytes(), 2))
-            .add(buildOldNode("old-ml-node-with-room", true, ByteSizeValue.ofGb(4).getBytes(), 2))
             .build();
         nodeAvailabilityZoneMapper = randomFrom(
             new NodeRealAvailabilityZoneMapper(settings, clusterSettings, discoveryNodes),
@@ -489,7 +487,6 @@ public class TrainedModelAssignmentClusterServiceTests extends ESTestCase {
             .add(buildNode("ml-node-without-room", true, 1000L, 2))
             .add(buildNode("not-ml-node", false, ByteSizeValue.ofGb(4).getBytes(), 2))
             .add(buildNode("ml-node-shutting-down", true, ByteSizeValue.ofGb(4).getBytes(), 2))
-            .add(buildOldNode("old-ml-node-with-room", true, ByteSizeValue.ofGb(4).getBytes(), 2))
             .build();
         nodeAvailabilityZoneMapper = randomFrom(
             new NodeRealAvailabilityZoneMapper(settings, clusterSettings, discoveryNodes),
@@ -2139,17 +2136,6 @@ public class TrainedModelAssignmentClusterServiceTests extends ESTestCase {
         return RoutingInfoUpdate.updateStateAndReason(new RoutingStateAndReason(RoutingState.STARTED, ""));
     }
 
-    private static DiscoveryNode buildOldNode(String name, boolean isML, long nativeMemory, int allocatedProcessors) {
-        return buildNode(
-            name,
-            isML,
-            nativeMemory,
-            allocatedProcessors,
-            VersionInformation.inferVersions(Version.V_7_15_0),
-            MlConfigVersion.V_7_15_0
-        );
-    }
-
     private static StartTrainedModelDeploymentAction.TaskParams newParams(String modelId, long modelSize) {
         return newParams(modelId, modelSize, 1, 1);
     }