@@ -31,9 +31,12 @@ import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.DatafeedParams;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
+import org.elasticsearch.xpack.core.ml.inference.allocation.AllocationState;
+import org.elasticsearch.xpack.core.ml.inference.allocation.TrainedModelAllocation;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.inference.allocation.TrainedModelAllocationMetadata;
 import org.elasticsearch.xpack.ml.job.NodeLoad;
 import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -348,6 +351,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
         PersistentTasksCustomMetadata tasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
         Collection<PersistentTask<?>> anomalyDetectionTasks = anomalyDetectionTasks(tasks);
         Collection<PersistentTask<?>> dataframeAnalyticsTasks = dataframeAnalyticsTasks(tasks);
+        Map<String, TrainedModelAllocation> modelAllocations = TrainedModelAllocationMetadata.fromState(clusterState).modelAllocations();
         final List<String> waitingAnomalyJobs = anomalyDetectionTasks.stream()
             .filter(t -> AWAITING_LAZY_ASSIGNMENT.equals(t.getAssignment()))
             .map(t -> MlTasks.jobId(t.getId()))
@@ -356,6 +360,13 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
             .filter(t -> AWAITING_LAZY_ASSIGNMENT.equals(t.getAssignment()))
             .map(t -> MlTasks.dataFrameAnalyticsId(t.getId()))
             .collect(Collectors.toList());
+        final List<String> waitingAllocatedModels = modelAllocations
+            .entrySet()
+            .stream()
+            // TODO: Eventually care about those that are STARTED but not FULLY_ALLOCATED
+            .filter(e -> e.getValue().getAllocationState().equals(AllocationState.STARTING) && e.getValue().getNodeRoutingTable().isEmpty())
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
 
         final int numAnalyticsJobsInQueue = NUM_ANALYTICS_JOBS_IN_QUEUE.get(configuration);
         final int numAnomalyJobsInQueue = NUM_ANOMALY_JOBS_IN_QUEUE.get(configuration);
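The stream above defines what "waiting" means for a trained model: an allocation that is still STARTING and whose routing table names no node. A minimal sketch of that predicate pulled out as a standalone helper (the method name is illustrative, not part of the patch):

```java
import org.elasticsearch.xpack.core.ml.inference.allocation.AllocationState;
import org.elasticsearch.xpack.core.ml.inference.allocation.TrainedModelAllocation;

// Hypothetical helper mirroring the filter in the hunk above: a model
// counts as waiting only while its allocation is STARTING and no node
// has been routed for it yet. Per the TODO, allocations that are
// STARTED but not FULLY_ALLOCATED are deliberately ignored for now.
static boolean isWaitingForAllocation(TrainedModelAllocation allocation) {
    return allocation.getAllocationState().equals(AllocationState.STARTING)
        && allocation.getNodeRoutingTable().isEmpty();
}
```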
@@ -366,20 +377,22 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
         final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder()
             .setWaitingAnomalyJobs(waitingAnomalyJobs)
             .setWaitingAnalyticsJobs(waitingAnalyticsJobs)
+            .setWaitingModels(waitingAllocatedModels)
             .setCurrentMlCapacity(currentScale.autoscalingCapacity(maxMachineMemoryPercent, useAuto))
             .setPassedConfiguration(configuration);
 
         // There are no ML nodes, scale up as quick as possible, no matter if memory is stale or not
         if (nodes.isEmpty()
             && (waitingAnomalyJobs.isEmpty() == false
-            || waitingAnalyticsJobs.isEmpty() == false)) {
-            return scaleUpFromZero(waitingAnomalyJobs, waitingAnalyticsJobs, reasonBuilder);
+            || waitingAnalyticsJobs.isEmpty() == false
+            || waitingAllocatedModels.isEmpty() == false)) {
+            return scaleUpFromZero(waitingAnomalyJobs, waitingAnalyticsJobs, waitingAllocatedModels, reasonBuilder);
         }
 
         // We don't need to check anything as there are no tasks
         // This is a quick path to downscale.
         // simply return `0` for scale down if delay is satisfied
-        if (anomalyDetectionTasks.isEmpty() && dataframeAnalyticsTasks.isEmpty()) {
+        if (anomalyDetectionTasks.isEmpty() && dataframeAnalyticsTasks.isEmpty() && modelAllocations.isEmpty()) {
             long msLeftToScale = msLeftToDownScale(configuration);
             if (msLeftToScale > 0) {
                 return new AutoscalingDeciderResult(
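These two early returns now treat waiting model allocations the same way as waiting jobs. A compact, illustrative restatement of the gating (not patch code):

```java
// If there are no ML nodes but anything is waiting, scale up from zero
// immediately; if nothing at all is running or allocated, take the
// quick downscale path once the configured delay has elapsed.
boolean anythingWaiting = waitingAnomalyJobs.isEmpty() == false
    || waitingAnalyticsJobs.isEmpty() == false
    || waitingAllocatedModels.isEmpty() == false;
boolean nothingRunning = anomalyDetectionTasks.isEmpty()
    && dataframeAnalyticsTasks.isEmpty()
    && modelAllocations.isEmpty();
```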
@@ -462,6 +475,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
             nodeLoads,
             waitingAnomalyJobs,
             waitingAnalyticsJobs,
+            waitingAllocatedModels,
             futureFreedCapacity.orElse(null),
             currentScale,
             reasonBuilder
@@ -492,7 +506,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
                     .build()));
         }
 
-        long largestJob = Math.max(
+        long largestJobOrModel = Math.max(
             anomalyDetectionTasks.stream()
                 .filter(PersistentTask::isAssigned)
                 // Memory SHOULD be recently refreshed, so in our current state, we should at least have an idea of the memory used
@@ -513,15 +527,20 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
                 })
                 .max()
                 .orElse(0L));
+        largestJobOrModel = Math.max(
+            largestJobOrModel,
+            modelAllocations.values().stream().mapToLong(t -> t.getTaskParams().estimateMemoryUsageBytes()).max().orElse(0L)
+        );
 
         // This is an exceptionally weird state
         // Our view of the memory is stale or we have tasks where the required job memory is 0, which should be impossible
-        if (largestJob == 0L && (dataframeAnalyticsTasks.size() + anomalyDetectionTasks.size() > 0)) {
+        if (largestJobOrModel == 0L && (dataframeAnalyticsTasks.size() + anomalyDetectionTasks.size() + modelAllocations.size() > 0)) {
             logger.warn(
                 "The calculated minimum required node size was unexpectedly [0] as there are "
-                    + "[{}] anomaly job tasks and [{}] data frame analytics tasks",
+                    + "[{}] anomaly job tasks, [{}] data frame analytics tasks and [{}] model allocations",
                 anomalyDetectionTasks.size(),
-                dataframeAnalyticsTasks.size()
+                dataframeAnalyticsTasks.size(),
+                modelAllocations.size()
             );
             return noScaleResultOrRefresh(reasonBuilder, true, new AutoscalingDeciderResult(
                 context.currentCapacity(),
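A quick worked example of the node-size floor computed above, with illustrative numbers:

```java
// Suppose the largest assigned anomaly or analytics task needs 2 GB and
// one model allocation estimates 3 GB via estimateMemoryUsageBytes().
// The minimum viable node size is then driven by the model.
long gb = 1024L * 1024L * 1024L;
long largestTask = 2 * gb;                                    // from the two task streams
long largestModel = 3 * gb;                                   // from modelAllocations
long largestJobOrModel = Math.max(largestTask, largestModel); // 3 GB
```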
@@ -531,7 +550,12 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
                     .build()));
         }
 
-        final Optional<AutoscalingDeciderResult> maybeScaleDown = checkForScaleDown(nodeLoads, largestJob, currentScale, reasonBuilder)
+        final Optional<AutoscalingDeciderResult> maybeScaleDown = checkForScaleDown(
+            nodeLoads,
+            largestJobOrModel,
+            currentScale,
+            reasonBuilder
+        )
             // Due to weird rounding errors, it may be that a scale down result COULD cause a scale up
             // Ensuring the scaleDown here forces the scale down result to always be lower than the current capacity.
             // This is safe as we know that ALL jobs are assigned at the current capacity
@@ -643,6 +667,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
     // can eventually start, and given the current cluster, no job can eventually start.
     AutoscalingDeciderResult scaleUpFromZero(List<String> waitingAnomalyJobs,
                                              List<String> waitingAnalyticsJobs,
+                                             List<String> waitingAllocatedModels,
                                              MlScalingReason.Builder reasonBuilder) {
         final Optional<NativeMemoryCapacity> analyticsCapacity = requiredCapacityForUnassignedJobs(waitingAnalyticsJobs,
             this::getAnalyticsMemoryRequirement,
@@ -650,9 +675,13 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
         final Optional<NativeMemoryCapacity> anomalyCapacity = requiredCapacityForUnassignedJobs(waitingAnomalyJobs,
             this::getAnomalyMemoryRequirement,
             0);
+        final Optional<NativeMemoryCapacity> allocatedModelCapacity = requiredCapacityForUnassignedJobs(waitingAllocatedModels,
+            this::getAllocatedModelRequirement,
+            0);
         NativeMemoryCapacity updatedCapacity = NativeMemoryCapacity.ZERO
             .merge(anomalyCapacity.orElse(NativeMemoryCapacity.ZERO))
-            .merge(analyticsCapacity.orElse(NativeMemoryCapacity.ZERO));
+            .merge(analyticsCapacity.orElse(NativeMemoryCapacity.ZERO))
+            .merge(allocatedModelCapacity.orElse(NativeMemoryCapacity.ZERO));
         // If we still have calculated zero, this means the ml memory tracker does not have the required info.
         // So, request a scale for the default. This is only for the 0 -> N scaling case.
         if (updatedCapacity.getNode() == 0L) {
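How the three optionals combine depends on `NativeMemoryCapacity.merge`. Assuming it sums tier-wide requirements and keeps the largest single-node requirement (an assumption about the existing class, not something this patch shows), a worked example looks like:

```java
long gb = 1024L * 1024L * 1024L;
// NativeMemoryCapacity(tier, node): hypothetical values for illustration.
NativeMemoryCapacity anomaly = new NativeMemoryCapacity(2 * gb, 1 * gb);
NativeMemoryCapacity analytics = new NativeMemoryCapacity(3 * gb, 3 * gb);
NativeMemoryCapacity models = new NativeMemoryCapacity(1 * gb, 1 * gb);
// Assumed merge semantics: tier = 2 + 3 + 1 = 6 GB, node = max(1, 3, 1) = 3 GB.
NativeMemoryCapacity updated = NativeMemoryCapacity.ZERO
    .merge(anomaly)
    .merge(analytics)
    .merge(models);
```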
@@ -681,13 +710,15 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
                                                        List<NodeLoad> nodeLoads,
                                                        List<String> waitingAnomalyJobs,
                                                        List<String> waitingAnalyticsJobs,
+                                                       List<String> waitingAllocatedModels,
                                                        @Nullable NativeMemoryCapacity futureFreedCapacity,
                                                        NativeMemoryCapacity currentScale,
                                                        MlScalingReason.Builder reasonBuilder) {
 
         // Are we in breach of maximum waiting jobs?
         if (waitingAnalyticsJobs.size() > numAnalyticsJobsInQueue
-            || waitingAnomalyJobs.size() > numAnomalyJobsInQueue) {
+            || waitingAnomalyJobs.size() > numAnomalyJobsInQueue
+            || waitingAllocatedModels.size() > 0) {
 
             Tuple<NativeMemoryCapacity, List<NodeLoad>> anomalyCapacityAndNewLoad = determineUnassignableJobs(
                 waitingAnomalyJobs,
@@ -701,8 +732,15 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
                 numAnalyticsJobsInQueue,
                 anomalyCapacityAndNewLoad.v2()).orElse(Tuple.tuple(NativeMemoryCapacity.ZERO, anomalyCapacityAndNewLoad.v2()));
 
+            Tuple<NativeMemoryCapacity, List<NodeLoad>> modelCapacityAndNewLoad = determineUnassignableJobs(
+                waitingAllocatedModels,
+                this::getAllocatedModelRequirement,
+                0,
+                analyticsCapacityAndNewLoad.v2()).orElse(Tuple.tuple(NativeMemoryCapacity.ZERO, analyticsCapacityAndNewLoad.v2()));
+
             if (analyticsCapacityAndNewLoad.v1().equals(NativeMemoryCapacity.ZERO)
-                && anomalyCapacityAndNewLoad.v1().equals(NativeMemoryCapacity.ZERO)) {
+                && anomalyCapacityAndNewLoad.v1().equals(NativeMemoryCapacity.ZERO)
+                && modelCapacityAndNewLoad.v1().equals(NativeMemoryCapacity.ZERO)) {
                 logger.debug("no_scale event as current capacity, even though there are waiting jobs, is adequate to run the queued jobs");
                 return Optional.empty();
             }
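Note the asymmetry in the scale-up trigger across the last two hunks: jobs tolerate a configurable queue before capacity is requested, while the model allowance is hard-coded to 0, so a single unassignable model is enough. Restated (illustrative, not patch code):

```java
boolean jobsOverQueue = waitingAnalyticsJobs.size() > numAnalyticsJobsInQueue
    || waitingAnomalyJobs.size() > numAnomalyJobsInQueue;  // configurable allowances
boolean modelsWaiting = waitingAllocatedModels.size() > 0; // zero allowance for models
boolean mustConsiderScaleUp = jobsOverQueue || modelsWaiting;
```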
@@ -710,6 +748,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
             NativeMemoryCapacity updatedCapacity = NativeMemoryCapacity.from(currentScale)
                 .merge(analyticsCapacityAndNewLoad.v1())
                 .merge(anomalyCapacityAndNewLoad.v1())
+                .merge(modelCapacityAndNewLoad.v1())
                 // Since we require new capacity, it COULD be we require a brand new node
                 // We should account for overhead in the tier capacity just in case.
                 .merge(new NativeMemoryCapacity(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), 0));
@@ -720,13 +759,15 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
                     .setRequiredCapacity(requiredCapacity)
                     .setSimpleReason(
                         "requesting scale up as number of jobs in queues exceeded configured limit "
-                            + "and current capacity is not large enough for waiting jobs"
+                            + "or there is at least one trained model waiting for allocation "
+                            + "and current capacity is not large enough for waiting jobs or models"
                     )
                     .build()
             ));
         }
 
         // Could the currently waiting jobs ever be assigned?
+        // NOTE: the previous predicate catches if an allocated model isn't assigned
         if (waitingAnalyticsJobs.isEmpty() == false || waitingAnomalyJobs.isEmpty() == false) {
             // we are unable to determine new tier size, but maybe we can see if our nodes are big enough.
             if (futureFreedCapacity == null) {
@@ -861,6 +902,10 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService,
         return mlMemoryTracker.getDataFrameAnalyticsJobMemoryRequirement(analyticsId);
     }
 
+    private Long getAllocatedModelRequirement(String modelId) {
+        return mlMemoryTracker.getTrainedModelAllocationMemoryRequirement(modelId);
+    }
+
     private Long getAnalyticsMemoryRequirement(PersistentTask<?> task) {
         return getAnalyticsMemoryRequirement(MlTasks.dataFrameAnalyticsId(task.getId()));
     }
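One subtlety: the node-sizing path earlier reads memory straight from `TrainedModelAllocation.getTaskParams().estimateMemoryUsageBytes()`, while the scale-up paths look the model up through `MlMemoryTracker` via this new helper, matching the `Function<String, Long>` shape that `requiredCapacityForUnassignedJobs` and `determineUnassignableJobs` already accept. A hedged usage sketch (the null contract is assumed from the analogous job lookups, and the model id is hypothetical):

```java
// Assumed contract: the tracker returns null until it has an estimate,
// mirroring the anomaly and analytics memory lookups above.
Long required = getAllocatedModelRequirement("example-model");
long bytes = required == null ? 0L : required; // callers must tolerate null
```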