Browse Source

[ML] Fix ML memory tracker for old jobs (#37311)

Jobs created in version 6.1 or earlier can have a
null model_memory_limit.  If these are parsed from
cluster state following a full cluster restart then
we replace the null with 4096mb to make the meaning
explicit.  However, if such jobs are streamed from an
older node in a mixed-version cluster, this replacement
does not happen.  Therefore we need to account for the
possibility of a null model_memory_limit in the ML
memory tracker.
David Roberts 6 years ago
parent
commit
b65006e8cd

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisLimits.java

@@ -36,7 +36,7 @@ public class AnalysisLimits implements ToXContentObject, Writeable {
      * the old default value should be used. From 6.3 onwards, the value will always be explicit.
      */
     public static final long DEFAULT_MODEL_MEMORY_LIMIT_MB = 1024L;
-    static final long PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB = 4096L;
+    public static final long PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB = 4096L;
 
     public static final long DEFAULT_CATEGORIZATION_EXAMPLES_LIMIT = 4;
 

+ 10 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java

@@ -20,6 +20,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
+import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.job.JobManager;
@@ -269,15 +270,16 @@ public class MlMemoryTracker implements LocalNodeMasterListener {
 
     private void setJobMemoryToLimit(String jobId, ActionListener<Long> listener) {
         jobManager.getJob(jobId, ActionListener.wrap(job -> {
-            Long memoryLimitMb = job.getAnalysisLimits().getModelMemoryLimit();
-            if (memoryLimitMb != null) {
-                Long memoryRequirementBytes = ByteSizeUnit.MB.toBytes(memoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes();
-                memoryRequirementByJob.put(jobId, memoryRequirementBytes);
-                listener.onResponse(memoryRequirementBytes);
-            } else {
-                memoryRequirementByJob.remove(jobId);
-                listener.onResponse(null);
+            Long memoryLimitMb = (job.getAnalysisLimits() != null) ? job.getAnalysisLimits().getModelMemoryLimit() : null;
+            // Although recent versions of the code enforce a non-null model_memory_limit
+            // when parsing, the job could have been streamed from an older version node in
+            // a mixed version cluster
+            if (memoryLimitMb == null) {
+                memoryLimitMb = AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB;
             }
+            Long memoryRequirementBytes = ByteSizeUnit.MB.toBytes(memoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes();
+            memoryRequirementByJob.put(jobId, memoryRequirementBytes);
+            listener.onResponse(memoryRequirementBytes);
         }, e -> {
             if (e instanceof ResourceNotFoundException) {
                 // TODO: does this also happen if the .ml-config index exists but is unavailable?

+ 6 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java

@@ -123,9 +123,10 @@ public class MlMemoryTrackerTests extends ESTestCase {
             return null;
         }).when(jobResultsProvider).getEstablishedMemoryUsage(eq(jobId), any(), any(), any(Consumer.class), any());
 
-        long modelMemoryLimitMb = 2;
+        boolean simulateVeryOldJob = randomBoolean();
+        long recentJobModelMemoryLimitMb = 2;
         Job job = mock(Job.class);
-        when(job.getAnalysisLimits()).thenReturn(new AnalysisLimits(modelMemoryLimitMb, 4L));
+        when(job.getAnalysisLimits()).thenReturn(simulateVeryOldJob ? null : new AnalysisLimits(recentJobModelMemoryLimitMb, 4L));
         doAnswer(invocation -> {
             @SuppressWarnings("unchecked")
             ActionListener<Job> listener = (ActionListener<Job>) invocation.getArguments()[1];
@@ -141,7 +142,9 @@ public class MlMemoryTrackerTests extends ESTestCase {
                 assertEquals(Long.valueOf(modelBytes + Job.PROCESS_MEMORY_OVERHEAD.getBytes()),
                     memoryTracker.getJobMemoryRequirement(jobId));
             } else {
-                assertEquals(Long.valueOf(ByteSizeUnit.MB.toBytes(modelMemoryLimitMb) + Job.PROCESS_MEMORY_OVERHEAD.getBytes()),
+                long expectedModelMemoryLimit =
+                    simulateVeryOldJob ? AnalysisLimits.PRE_6_1_DEFAULT_MODEL_MEMORY_LIMIT_MB : recentJobModelMemoryLimitMb;
+                assertEquals(Long.valueOf(ByteSizeUnit.MB.toBytes(expectedModelMemoryLimit) + Job.PROCESS_MEMORY_OVERHEAD.getBytes()),
                     memoryTracker.getJobMemoryRequirement(jobId));
             }
         } else {