@@ -7,19 +7,14 @@ package org.elasticsearch.xpack.ml.job;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
-import org.elasticsearch.xpack.core.ml.MlTasks;
-import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
-import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
-import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
-import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +52,7 @@ public class JobNodeSelector {
     private final ClusterState clusterState;
     private final MlMemoryTracker memoryTracker;
     private final Function<DiscoveryNode, String> nodeFilter;
+    private final NodeLoadDetector nodeLoadDetector;
     private final int maxLazyNodes;
 
     /**
@@ -70,6 +66,7 @@ public class JobNodeSelector {
         this.taskName = Objects.requireNonNull(taskName);
         this.clusterState = Objects.requireNonNull(clusterState);
         this.memoryTracker = Objects.requireNonNull(memoryTracker);
+        this.nodeLoadDetector = new NodeLoadDetector(Objects.requireNonNull(memoryTracker));
         this.maxLazyNodes = maxLazyNodes;
         this.nodeFilter = node -> {
             if (MachineLearning.isMlNode(node)) {
@@ -105,26 +102,42 @@ public class JobNodeSelector {
                 continue;
             }
 
-            // Assuming the node is eligible at all, check loading
-            CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory);
-            allocateByMemory = currentLoad.allocateByMemory;
+            NodeLoadDetector.NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad(
+                clusterState,
+                true, // Remove in 8.0.0
+                node,
+                dynamicMaxOpenJobs,
+                maxMachineMemoryPercent,
+                allocateByMemory
+            );
+            if (currentLoad.getError() != null) {
+                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
+                    + "], because [" + currentLoad.getError() + "]";
+                logger.trace(reason);
+                reasons.add(reason);
+                continue;
+            }
 
-            if (currentLoad.numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
-                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
-                    + currentLoad.numberOfAllocatingJobs + "] the maximum number of jobs [" + maxConcurrentJobAllocations
+            // Assuming the node is eligible at all, check loading
+            allocateByMemory = currentLoad.isUseMemory();
+            int maxNumberOfOpenJobs = currentLoad.getMaxJobs();
+
+            if (currentLoad.getNumAllocatingJobs() >= maxConcurrentJobAllocations) {
+                reason = "Not opening job ["
+                    + jobId
+                    + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
+                    + currentLoad.getNumAllocatingJobs()
+                    + "] the maximum number of jobs [" + maxConcurrentJobAllocations
                     + "] in opening state";
                 logger.trace(reason);
                 reasons.add(reason);
                 continue;
             }
 
-            Map<String, String> nodeAttributes = node.getAttributes();
-            int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
-
-            long availableCount = maxNumberOfOpenJobs - currentLoad.numberOfAssignedJobs;
+            long availableCount = maxNumberOfOpenJobs - currentLoad.getNumAssignedJobs();
             if (availableCount == 0) {
                 reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
-                    + "], because this node is full. Number of opened jobs [" + currentLoad.numberOfAssignedJobs
+                    + "], because this node is full. Number of opened jobs [" + currentLoad.getNumAssignedJobs()
                     + "], " + MAX_OPEN_JOBS_PER_NODE.getKey() + " [" + maxNumberOfOpenJobs + "]";
                 logger.trace(reason);
                 reasons.add(reason);
@@ -136,33 +149,21 @@ public class JobNodeSelector {
                 minLoadedNodeByCount = node;
             }
 
-            String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
-            long machineMemory;
-            try {
-                machineMemory = Long.parseLong(machineMemoryStr);
-            } catch (NumberFormatException e) {
-                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
-                    MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long";
-                logger.trace(reason);
-                reasons.add(reason);
-                continue;
-            }
-
             if (allocateByMemory) {
-                if (machineMemory > 0) {
-                    long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
+                if (currentLoad.getMaxMlMemory() > 0) {
                     Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
                     if (estimatedMemoryFootprint != null) {
                         // If this will be the first job assigned to the node then it will need to
                         // load the native code shared libraries, so add the overhead for this
-                        if (currentLoad.numberOfAssignedJobs == 0) {
+                        if (currentLoad.getNumAssignedJobs() == 0) {
                             estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
                         }
-                        long availableMemory = maxMlMemory - currentLoad.assignedJobMemory;
+                        long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory();
                         if (estimatedMemoryFootprint > availableMemory) {
                             reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
-                                + "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory
-                                + "], memory required by existing jobs [" + currentLoad.assignedJobMemory
+                                + "], because this node has insufficient available memory. Available memory for ML ["
+                                + currentLoad.getMaxMlMemory()
+                                + "], memory required by existing jobs [" + currentLoad.getAssignedJobMemory()
                                 + "], estimated memory required for this job [" + estimatedMemoryFootprint + "]";
                             logger.trace(reason);
                             reasons.add(reason);
@@ -177,15 +178,20 @@ public class JobNodeSelector {
                         // If we cannot get the job memory requirement,
                         // fall back to simply allocating by job count
                         allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because its memory requirement was not available",
-                            jobId);
+                        logger.debug(
+                            () -> new ParameterizedMessage(
+                                "Falling back to allocating job [{}] by job counts because its memory requirement was not available",
+                                jobId));
                     }
                 } else {
                     // If we cannot get the available memory on any machine in
                     // the cluster, fall back to simply allocating by job count
                     allocateByMemory = false;
-                    logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
-                        jobId, nodeNameAndMlAttributes(node));
+                    logger.debug(
+                        () -> new ParameterizedMessage(
+                            "Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
+                            jobId,
+                            nodeNameAndMlAttributes(node)));
                 }
             }
         }
@@ -220,67 +226,6 @@ public class JobNodeSelector {
         return currentAssignment;
     }
 
-    private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTasksCustomMetadata persistentTasks,
-                                                    final boolean allocateByMemory) {
-        CurrentLoad result = new CurrentLoad(allocateByMemory);
-
-        if (persistentTasks != null) {
-            // find all the anomaly detector job tasks assigned to this node
-            Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnomalyDetectorTasks = persistentTasks.findTasks(
-                MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
-            for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnomalyDetectorTasks) {
-                JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask);
-                if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
-                    // Don't count CLOSED or FAILED jobs, as they don't consume native memory
-                    ++result.numberOfAssignedJobs;
-                    if (jobState == JobState.OPENING) {
-                        ++result.numberOfAllocatingJobs;
-                    }
-                    OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams();
-                    Long jobMemoryRequirement = memoryTracker.getAnomalyDetectorJobMemoryRequirement(params.getJobId());
-                    if (jobMemoryRequirement == null) {
-                        result.allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because " +
-                            "the memory requirement for job [{}] was not available", jobId, params.getJobId());
-                    } else {
-                        logger.debug("adding " + jobMemoryRequirement);
-                        result.assignedJobMemory += jobMemoryRequirement;
-                    }
-                }
-            }
-            // find all the data frame analytics job tasks assigned to this node
-            Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnalyticsTasks = persistentTasks.findTasks(
-                MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
-            for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnalyticsTasks) {
-                DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask);
-
-                // Don't count stopped and failed df-analytics tasks as they don't consume native memory
-                if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {
-                    // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED
-                    // and REINDEXING states we're committed to using the memory soon, so account for it here
-                    ++result.numberOfAssignedJobs;
-                    StartDataFrameAnalyticsAction.TaskParams params =
-                        (StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams();
-                    Long jobMemoryRequirement = memoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId());
-                    if (jobMemoryRequirement == null) {
-                        result.allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because " +
-                            "the memory requirement for job [{}] was not available", jobId, params.getId());
-                    } else {
-                        result.assignedJobMemory += jobMemoryRequirement;
-                    }
-                }
-            }
-            // if any jobs are running then the native code will be loaded, but shared between all jobs,
-            // so increase the total memory usage of the assigned jobs to account for this
-            if (result.numberOfAssignedJobs > 0) {
-                result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
-            }
-        }
-
-        return result;
-    }
-
     static String nodeNameOrId(DiscoveryNode node) {
         String nodeNameOrID = node.getName();
         if (Strings.isNullOrEmpty(nodeNameOrID)) {
@@ -308,15 +253,4 @@ public class JobNodeSelector {
         return builder.toString();
     }
 
-    private static class CurrentLoad {
-
-        long numberOfAssignedJobs = 0;
-        long numberOfAllocatingJobs = 0;
-        long assignedJobMemory = 0;
-        boolean allocateByMemory;
-
-        CurrentLoad(boolean allocateByMemory) {
-            this.allocateByMemory = allocateByMemory;
-        }
-    }
 }
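
Note for reviewers: every per-node statistic the selector previously read from the ad-hoc CurrentLoad fields now arrives through NodeLoadDetector.NodeLoad accessors, which is what lets calculateCurrentLoadForNode and the CurrentLoad class disappear from this file. The sketch below is inferred purely from the call sites in this diff, not copied from the NodeLoadDetector source, so the constructor shape and the field comments are assumptions; only the accessor names and their uses come from the changes above.

// Hypothetical minimal sketch of the value object this diff assumes;
// the real class is NodeLoadDetector.NodeLoad and may differ in detail.
public final class NodeLoad {
    private final String error;            // non-null when load detection failed for the node
    private final boolean useMemory;       // whether memory-based allocation is still viable
    private final int maxJobs;             // effective max open jobs for the node
    private final long numAssignedJobs;    // jobs already assigned to the node
    private final long numAllocatingJobs;  // jobs still in the opening state
    private final long assignedJobMemory;  // memory already claimed by assigned jobs
    private final long maxMlMemory;        // memory available to ML on the node

    NodeLoad(String error, boolean useMemory, int maxJobs, long numAssignedJobs,
             long numAllocatingJobs, long assignedJobMemory, long maxMlMemory) {
        this.error = error;
        this.useMemory = useMemory;
        this.maxJobs = maxJobs;
        this.numAssignedJobs = numAssignedJobs;
        this.numAllocatingJobs = numAllocatingJobs;
        this.assignedJobMemory = assignedJobMemory;
        this.maxMlMemory = maxMlMemory;
    }

    public String getError() { return error; }
    public boolean isUseMemory() { return useMemory; }
    public int getMaxJobs() { return maxJobs; }
    public long getNumAssignedJobs() { return numAssignedJobs; }
    public long getNumAllocatingJobs() { return numAllocatingJobs; }
    public long getAssignedJobMemory() { return assignedJobMemory; }
    public long getMaxMlMemory() { return maxMlMemory; }
}

Concentrating these reads behind one detector keeps JobNodeSelector focused on choosing a node, while the load bookkeeping (including the memory-attribute parsing that the removed try/catch used to do inline) lives in one reusable place.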