|
@@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.common.Strings;
|
|
|
import org.elasticsearch.common.collect.Tuple;
|
|
|
+import org.elasticsearch.common.unit.ByteSizeValue;
|
|
|
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
|
|
import org.elasticsearch.xpack.ml.MachineLearning;
|
|
|
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService;
|
|
@@ -23,6 +24,7 @@ import org.elasticsearch.xpack.ml.utils.NativeMemoryCalculator;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
+import java.util.Locale;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.function.Function;
|
|
@@ -54,6 +56,14 @@ public class JobNodeSelector {
|
|
|
|
|
|
private static final Logger logger = LogManager.getLogger(JobNodeSelector.class);
|
|
|
|
|
|
+ private static String createReason(String job, String node, String msg, Object... params) {
|
|
|
+ String preamble = String.format(
|
|
|
+ Locale.ROOT,
|
|
|
+ "Not opening job [%s] on node [%s], because ",
|
|
|
+ job,
|
|
|
+ node);
|
|
|
+ return preamble + ParameterizedMessage.format(msg, params);
|
|
|
+ }
|
|
|
private final String jobId;
|
|
|
private final String taskName;
|
|
|
private final ClusterState clusterState;
|
|
@@ -83,7 +93,7 @@ public class JobNodeSelector {
|
|
|
if (MachineLearning.isMlNode(node)) {
|
|
|
return (nodeFilter != null) ? nodeFilter.apply(node) : null;
|
|
|
}
|
|
|
- return "Not opening job [" + jobId + "] on node [" + nodeNameOrId(node) + "], because this node isn't a ml node.";
|
|
|
+ return createReason(jobId, nodeNameOrId(node), "this node isn't a ml node.");
|
|
|
};
|
|
|
}
|
|
|
|
|
@@ -155,8 +165,7 @@ public class JobNodeSelector {
|
|
|
useAutoMemoryPercentage
|
|
|
);
|
|
|
if (currentLoad.getError() != null) {
|
|
|
- reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
|
|
|
- + "], because [" + currentLoad.getError() + "]";
|
|
|
+ reason = createReason(jobId, nodeNameAndMlAttributes(node), currentLoad.getError());
|
|
|
logger.trace(reason);
|
|
|
reasons.add(reason);
|
|
|
continue;
|
|
@@ -166,12 +175,11 @@ public class JobNodeSelector {
|
|
|
int maxNumberOfOpenJobs = currentLoad.getMaxJobs();
|
|
|
|
|
|
if (currentLoad.getNumAllocatingJobs() >= maxConcurrentJobAllocations) {
|
|
|
- reason = "Not opening job ["
|
|
|
- + jobId
|
|
|
- + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
|
|
|
- + currentLoad.getNumAllocatingJobs()
|
|
|
- + "] the maximum number of jobs [" + maxConcurrentJobAllocations
|
|
|
- + "] in opening state";
|
|
|
+ reason = createReason(jobId,
|
|
|
+ nodeNameAndMlAttributes(node),
|
|
|
+ "node exceeds [{}] the maximum number of jobs [{}] in opening state",
|
|
|
+ currentLoad.getNumAllocatingJobs(),
|
|
|
+ maxConcurrentJobAllocations);
|
|
|
logger.trace(reason);
|
|
|
reasons.add(reason);
|
|
|
continue;
|
|
@@ -179,9 +187,12 @@ public class JobNodeSelector {
|
|
|
|
|
|
long availableCount = maxNumberOfOpenJobs - currentLoad.getNumAssignedJobs();
|
|
|
if (availableCount == 0) {
|
|
|
- reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
|
|
|
- + "], because this node is full. Number of opened jobs [" + currentLoad.getNumAssignedJobs()
|
|
|
- + "], " + MAX_OPEN_JOBS_PER_NODE.getKey() + " [" + maxNumberOfOpenJobs + "]";
|
|
|
+ reason = createReason(jobId,
|
|
|
+ nodeNameAndMlAttributes(node),
|
|
|
+ "this node is full. Number of opened jobs [{}], {} [{}]",
|
|
|
+ currentLoad.getNumAssignedJobs(),
|
|
|
+ MAX_OPEN_JOBS_PER_NODE.getKey(),
|
|
|
+ maxNumberOfOpenJobs);
|
|
|
logger.trace(reason);
|
|
|
reasons.add(reason);
|
|
|
continue;
|
|
@@ -203,11 +214,17 @@ public class JobNodeSelector {
|
|
|
}
|
|
|
long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory();
|
|
|
if (estimatedMemoryFootprint > availableMemory) {
|
|
|
- reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
|
|
|
- + "], because this node has insufficient available memory. Available memory for ML ["
|
|
|
- + currentLoad.getMaxMlMemory()
|
|
|
- + "], memory required by existing jobs [" + currentLoad.getAssignedJobMemory()
|
|
|
- + "], estimated memory required for this job [" + estimatedMemoryFootprint + "]";
|
|
|
+ reason = createReason(jobId,
|
|
|
+ nodeNameAndMlAttributes(node),
|
|
|
+ "this node has insufficient available memory. Available memory for ML [{} ({})], "
|
|
|
+ + "memory required by existing jobs [{} ({})], "
|
|
|
+ + "estimated memory required for this job [{} ({})]",
|
|
|
+ currentLoad.getMaxMlMemory(),
|
|
|
+ ByteSizeValue.ofBytes(currentLoad.getMaxMlMemory()).toString(),
|
|
|
+ currentLoad.getAssignedJobMemory(),
|
|
|
+ ByteSizeValue.ofBytes(currentLoad.getAssignedJobMemory()).toString(),
|
|
|
+ estimatedMemoryFootprint,
|
|
|
+ ByteSizeValue.ofBytes(estimatedMemoryFootprint).toString());
|
|
|
logger.trace(reason);
|
|
|
reasons.add(reason);
|
|
|
continue;
|