Browse Source

[ML] Wait for updates to established memory usage

Tests need to wait for changes to the job's established memory usage to
propagate and an over enthusiastic optimisation meant jobs were updated
from stale state causing recent change to be lost.
David Kyle 7 years ago
parent
commit
cfc66a1fd5

+ 11 - 15
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

@@ -63,6 +63,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -305,29 +306,22 @@ public class JobManager extends AbstractComponent {
     }
 
     private void internalJobUpdate(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
-
-        Job job = getJobOrThrowIfUnknown(request.getJobId());
-        final Job updatedJob = request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit);
-        if (updatedJob.equals(job)) {
-            // No change will results in a clusterstate update no-op so don't
-            // submit the request.
-            actionListener.onResponse(new PutJobAction.Response(updatedJob));
-            return;
-        }
-
         if (request.isWaitForAck()) {
             // Use the ack cluster state update
             clusterService.submitStateUpdateTask("update-job-" + request.getJobId(),
                     new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
+                        private AtomicReference<Job> updatedJob = new AtomicReference<>();
 
                         @Override
                         protected PutJobAction.Response newResponse(boolean acknowledged) {
-                            return new PutJobAction.Response(updatedJob);
+                            return new PutJobAction.Response(updatedJob.get());
                         }
 
                         @Override
                         public ClusterState execute(ClusterState currentState) {
-                            return updateClusterState(updatedJob, true, currentState);
+                            Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState);
+                            updatedJob.set(request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit));
+                            return updateClusterState(updatedJob.get(), true, currentState);
                         }
 
                         @Override
@@ -337,10 +331,13 @@ public class JobManager extends AbstractComponent {
                     });
         } else {
             clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), new ClusterStateUpdateTask() {
+                private AtomicReference<Job> updatedJob = new AtomicReference<>();
 
                 @Override
                 public ClusterState execute(ClusterState currentState) throws Exception {
-                    return updateClusterState(updatedJob, true, currentState);
+                    Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState);
+                    updatedJob.set(request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit));
+                    return updateClusterState(updatedJob.get(), true, currentState);
                 }
 
                 @Override
@@ -351,8 +348,7 @@ public class JobManager extends AbstractComponent {
                 @Override
                 public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
                     afterClusterStateUpdate(clusterChangedEvent.state(), request);
-                    actionListener.onResponse(new PutJobAction.Response(updatedJob));
-
+                    actionListener.onResponse(new PutJobAction.Response(updatedJob.get()));
                 }
             });
         }

+ 6 - 4
x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/BasicRenormalizationIT.java

@@ -50,10 +50,12 @@ public class BasicRenormalizationIT extends MlNativeAutodetectIntegTestCase {
 
         // Since this job ran for 50 buckets, it's a good place to assert
         // that established model memory matches model memory in the job stats
-        GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0);
-        ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
-        Job updatedJob = getJob(jobId).get(0);
-        assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
+        assertBusy(() -> {
+            GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0);
+            ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
+            Job updatedJob = getJob(jobId).get(0);
+            assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
+        });
     }
 
     public void testRenormalizationDisabled() throws Exception {

+ 6 - 4
x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java

@@ -94,10 +94,12 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
 
         // Since this job ran for 168 buckets, it's a good place to assert
         // that established model memory matches model memory in the job stats
-        GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0);
-        ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
-        Job updatedJob = getJob(job.getId()).get(0);
-        assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
+        assertBusy(() -> {
+            GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0);
+            ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
+            Job updatedJob = getJob(job.getId()).get(0);
+            assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
+        });
     }
 
     public void testRealtime() throws Exception {

+ 6 - 4
x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/OverallBucketsIT.java

@@ -103,10 +103,12 @@ public class OverallBucketsIT extends MlNativeAutodetectIntegTestCase {
 
         // Since this job ran for 3000 buckets, it's a good place to assert
         // that established model memory matches model memory in the job stats
-        GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0);
-        ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
-        Job updatedJob = getJob(job.getId()).get(0);
-        assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
+        assertBusy(() -> {
+            GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0);
+            ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
+            Job updatedJob = getJob(job.getId()).get(0);
+            assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
+        });
     }
 
     private static Map<String, Object> createRecord(long timestamp) {

+ 6 - 4
x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RestoreModelSnapshotIT.java

@@ -75,10 +75,12 @@ public class RestoreModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
 
         // Since these jobs ran for 72 buckets, it's a good place to assert
         // that established model memory matches model memory in the job stats
-        GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0);
-        ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
-        Job updatedJob = getJob(job.getId()).get(0);
-        assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
+        assertBusy(() -> {
+            GetJobsStatsAction.Response.JobStats jobStats = getJobStats(job.getId()).get(0);
+            ModelSizeStats modelSizeStats = jobStats.getModelSizeStats();
+            Job updatedJob = getJob(job.getId()).get(0);
+            assertThat(updatedJob.getEstablishedModelMemory(), equalTo(modelSizeStats.getModelBytes()));
+        });
     }
 
     private Job.Builder buildAndRegisterJob(String jobId, TimeValue bucketSpan) throws Exception {