Browse Source

[ML] Refactor assignment planner code (#104260)

This PR simplifies the code in a few places. In other places where I had the TODO comment, the possible simplification would lead to undesired consequences, so I removed the TODO comment with the reference to issue #101612.

Closes #101612
Valeriy Khakhutskyy 1 year ago
parent
commit
971cfb9317

+ 3 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java

@@ -142,13 +142,9 @@ class TrainedModelAssignmentRebalancer {
             for (Map.Entry<AssignmentPlan.Node, Integer> assignment : nodeAssignments.entrySet()) {
                 AssignmentPlan.Node originalNode = originalNodeById.get(assignment.getKey().id());
                 dest.assignModelToNode(m, originalNode, assignment.getValue());
-                if (m.currentAllocationsByNodeId().containsKey(originalNode.id())) {
-                    // TODO (#101612) requiredMemory should be calculated by the AssignmentPlan.Builder
-                    // As the node has all its available memory we need to manually account memory of models with
-                    // current allocations.
-                    long requiredMemory = m.estimateMemoryUsageBytes(m.currentAllocationsByNodeId().get(originalNode.id()));
-                    dest.accountMemory(m, originalNode, requiredMemory);
-                }
+                // As the node has all its available memory we need to manually account memory of models with
+                // current allocations.
+                dest.accountMemory(m, originalNode);
             }
         }
     }

+ 1 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AbstractPreserveAllocations.java

@@ -68,7 +68,7 @@ abstract class AbstractPreserveAllocations {
     AssignmentPlan mergePreservedAllocations(AssignmentPlan assignmentPlan) {
         // As the model/node objects the assignment plan are the modified ones,
         // they will not match the models/nodes members we have in this class.
-        // Therefore, we build a lookup table based on the ids so we can merge the plan
+        // Therefore, we build a lookup table based on the ids, so we can merge the plan
         // with its preserved allocations.
         final Map<Tuple<String, String>, Integer> plannedAssignmentsByModelNodeIdPair = new HashMap<>();
         for (Deployment m : assignmentPlan.models()) {
@@ -80,7 +80,6 @@ abstract class AbstractPreserveAllocations {
 
         AssignmentPlan.Builder mergedPlanBuilder = AssignmentPlan.builder(nodes, deployments);
         for (Node n : nodes) {
-            // TODO (#101612) Should the first loop happen in the builder constructor?
             for (Deployment deploymentAllocationsToPreserve : deployments) {
 
                 // if the model m is already allocated on the node n and I want to preserve this allocation

+ 7 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/AssignmentPlan.java

@@ -401,8 +401,7 @@ public class AssignmentPlan implements Comparable<AssignmentPlan> {
             if (allocations <= 0) {
                 return this;
             }
-            if (/*isAlreadyAssigned(deployment, node) == false
-                &&*/ requiredMemory > remainingNodeMemory.get(node)) {
+            if (requiredMemory > remainingNodeMemory.get(node)) {
                 throw new IllegalArgumentException(
                     "not enough memory on node ["
                         + node.id()
@@ -448,13 +447,14 @@ public class AssignmentPlan implements Comparable<AssignmentPlan> {
         }
 
         public void accountMemory(Deployment m, Node n) {
-            // TODO (#101612) remove or refactor unused method
-            long requiredMemory = getDeploymentMemoryRequirement(m, n, getCurrentAllocations(m, n));
-            accountMemory(m, n, requiredMemory);
+            if (m.currentAllocationsByNodeId().containsKey(n.id())) {
+                int allocations = m.currentAllocationsByNodeId().get(n.id());
+                long requiredMemory = m.estimateMemoryUsageBytes(allocations);
+                accountMemory(m, n, requiredMemory);
+            }
         }
 
-        public void accountMemory(Deployment m, Node n, long requiredMemory) {
-            // TODO (#101612) computation of required memory should be done internally
+        private void accountMemory(Deployment m, Node n, long requiredMemory) {
             remainingNodeMemory.computeIfPresent(n, (k, v) -> v - requiredMemory);
             if (remainingNodeMemory.containsKey(n) && remainingNodeMemory.get(n) < 0) {
                 throw new IllegalArgumentException("not enough memory on node [" + n.id() + "] to assign model [" + m.id() + "]");

+ 0 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/RandomizedAssignmentRounding.java

@@ -310,8 +310,6 @@ class RandomizedAssignmentRounding {
         private AssignmentPlan toPlan() {
             AssignmentPlan.Builder builder = AssignmentPlan.builder(nodes, deployments);
             for (Map.Entry<Tuple<AssignmentPlan.Deployment, Node>, Integer> assignment : tryAssigningRemainingCores().entrySet()) {
-                // TODO (#101612) The model should be assigned to the node only when it is possible. This means, that canAssign should be
-                // integrated into the assignModelToNode.
                 if (builder.canAssign(assignment.getKey().v1(), assignment.getKey().v2(), assignment.getValue())) {
                     builder.assignModelToNode(assignment.getKey().v1(), assignment.getKey().v2(), assignment.getValue());
                 }

+ 3 - 9
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/planning/ZoneAwareAssignmentPlanner.java

@@ -183,15 +183,9 @@ public class ZoneAwareAssignmentPlanner {
             for (Map.Entry<Node, Integer> assignment : nodeAssignments.entrySet()) {
                 Node originalNode = originalNodeById.get(assignment.getKey().id());
                 planBuilder.assignModelToNode(originalDeployment, originalNode, assignment.getValue());
-                if (originalDeployment.currentAllocationsByNodeId().containsKey(originalNode.id())) {
-                    // TODO (#101612) requiredMemory should be calculated by the AssignmentPlan.Builder
-                    // As the node has all its available memory we need to manually account memory of models with
-                    // current allocations.
-                    long requiredMemory = originalDeployment.estimateMemoryUsageBytes(
-                        originalDeployment.currentAllocationsByNodeId().get(originalNode.id())
-                    );
-                    planBuilder.accountMemory(m, originalNode, requiredMemory);
-                }
+                // As the node has all its available memory we need to manually account memory of models with
+                // current allocations.
+                planBuilder.accountMemory(originalDeployment, originalNode);
             }
         }
         return planBuilder.build();