Browse Source

[ML] Add integration tests to check persistence (#40272)

Additional checks have been added to exercise the behaviour of
persistence on graceful close of an anomaly job. In particular:

 - check that persistence does not occur for a job that is opened and
 then immediately closed, with nothing else having happened.
 - check that persistence occurs on graceful close of a job if it has
 processed data.
 - check that persistence occurs after time has been manually
 advanced, even if no additional data has been seen by the job.
 - check the edge case where persistence occurs if a job is opened, time
 is manually advanced and then the job is closed, having seen no data.

 Related to elastic/ml-cpp#393
Ed Savage 6 years ago
parent
commit
b0ce522869

+ 150 - 5
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/PersistJobIT.java

@@ -5,12 +5,17 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
 import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.junit.After;
 
@@ -18,8 +23,14 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
 public class PersistJobIT extends MlNativeAutodetectIntegTestCase {
 
+    private static final long BUCKET_SPAN_SECONDS = 300;
+    private static final TimeValue BUCKET_SPAN = TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS);
+
     @After
     public void cleanUpJobs() {
         cleanUp();
@@ -39,11 +50,141 @@ public class PersistJobIT extends MlNativeAutodetectIntegTestCase {
         });
     }
 
-    private void runJob(String jobId) throws Exception {
-        TimeValue bucketSpan = TimeValue.timeValueMinutes(5);
+    // check that state is persisted after time has been advanced even if no new data is seen in the interim
+    public void testPersistJobOnGracefulShutdown_givenTimeAdvancedAfterNoNewData() throws Exception {
+        String jobId = "time-advanced-after-no-new-data-test";
+
+        // open and run a job with a small data set
+        runJob(jobId);
+        FlushJobAction.Response flushResponse = flushJob(jobId, true);
+
+        closeJob(jobId);
+
+        // Check that state has been persisted
+        SearchResponse stateDocsResponse1 = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
+            .setFetchSource(false)
+            .setTrackTotalHits(true)
+            .setSize(10000)
+            .get();
+
+        int numQuantileRecords = 0;
+        int numStateRecords = 0;
+        for (SearchHit hit : stateDocsResponse1.getHits().getHits()) {
+            logger.info(hit.getId());
+            if (hit.getId().contains("quantiles")) {
+                ++numQuantileRecords;
+            } else if (hit.getId().contains("model_state")) {
+                ++numStateRecords;
+            }
+        }
+        assertThat(stateDocsResponse1.getHits().getTotalHits().value, equalTo(2L));
+        assertThat(numQuantileRecords, equalTo(1));
+        assertThat(numStateRecords, equalTo(1));
+
+        // re-open the job
+        openJob(jobId);
+
+        // advance time
+        long lastFinalizedBucketEnd = flushResponse.getLastFinalizedBucketEnd().getTime();
+        FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId);
+        advanceTimeRequest.setAdvanceTime(String.valueOf(lastFinalizedBucketEnd + BUCKET_SPAN_SECONDS * 1000));
+        advanceTimeRequest.setCalcInterim(false);
+        assertThat(client().execute(FlushJobAction.INSTANCE, advanceTimeRequest).actionGet().isFlushed(), is(true));
+
+        closeJob(jobId);
+
+        // Check that a new state record exists.
+        SearchResponse stateDocsResponse2 = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
+            .setFetchSource(false)
+            .setTrackTotalHits(true)
+            .setSize(10000)
+            .get();
+
+        numQuantileRecords = 0;
+        numStateRecords = 0;
+        for (SearchHit hit : stateDocsResponse2.getHits().getHits()) {
+            logger.info(hit.getId());
+            if (hit.getId().contains("quantiles")) {
+                ++numQuantileRecords;
+            } else if (hit.getId().contains("model_state")) {
+                ++numStateRecords;
+            }
+        }
+
+        assertThat(stateDocsResponse2.getHits().getTotalHits().value, equalTo(3L));
+        assertThat(numQuantileRecords, equalTo(1));
+        assertThat(numStateRecords, equalTo(2));
+
+        deleteJob(jobId);
+    }
+
+    // Check an edge case where time is manually advanced before any valid data is seen
+    public void testPersistJobOnGracefulShutdown_givenNoDataAndTimeAdvanced() throws Exception {
+        String jobId = "no-data-and-time-advanced-test";
+
+        createAndOpenJob(jobId);
+
+        // Manually advance time.
+        FlushJobAction.Request advanceTimeRequest = new FlushJobAction.Request(jobId);
+        advanceTimeRequest.setAdvanceTime(String.valueOf(BUCKET_SPAN_SECONDS * 1000));
+        advanceTimeRequest.setCalcInterim(false);
+        assertThat(client().execute(FlushJobAction.INSTANCE, advanceTimeRequest).actionGet().isFlushed(), is(true));
+
+        closeJob(jobId);
+
+        // Check that state has been persisted
+        SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
+            .setFetchSource(false)
+            .setTrackTotalHits(true)
+            .setSize(10000)
+            .get();
+
+        int numQuantileRecords = 0;
+        int numStateRecords = 0;
+        for (SearchHit hit : stateDocsResponse.getHits().getHits()) {
+            logger.info(hit.getId());
+            if (hit.getId().contains("quantiles")) {
+                ++numQuantileRecords;
+            } else if (hit.getId().contains("model_state")) {
+                ++numStateRecords;
+            }
+        }
+        assertThat(stateDocsResponse.getHits().getTotalHits().value, equalTo(2L));
+        assertThat(numQuantileRecords, equalTo(1));
+        assertThat(numStateRecords, equalTo(1));
+
+        // now check that the job can be happily restored - even though no data has been seen
+        AcknowledgedResponse ack = openJob(jobId);
+        assertTrue(ack.isAcknowledged());
+
+        closeJob(jobId);
+        deleteJob(jobId);
+    }
+
+    // Check an edge case where a job is opened and then immediately closed
+    public void testPersistJobOnGracefulShutdown_givenNoDataAndNoTimeAdvance() throws Exception {
+        String jobId = "no-data-and-no-time-advance-test";
+
+        createAndOpenJob(jobId);
+
+        closeJob(jobId);
+
+        // Check that state has not been persisted
+        SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
+            .setFetchSource(false)
+            .setTrackTotalHits(true)
+            .setSize(10000)
+            .get();
+
+        assertThat(stateDocsResponse.getHits().getTotalHits().value, equalTo(0L));
+
+        deleteJob(jobId);
+    }
+
+    private void createAndOpenJob(String jobId) throws Exception {
         Detector.Builder detector = new Detector.Builder("count", null);
         AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
-        analysisConfig.setBucketSpan(bucketSpan);
+        analysisConfig.setBucketSpan(BUCKET_SPAN);
         Job.Builder job = new Job.Builder(jobId);
         job.setAnalysisConfig(analysisConfig);
         job.setDataDescription(new DataDescription.Builder());
@@ -51,7 +192,11 @@ public class PersistJobIT extends MlNativeAutodetectIntegTestCase {
         putJob(job);
 
         openJob(job.getId());
-        List<String> data = generateData(System.currentTimeMillis(), bucketSpan, 10, bucketIndex -> randomIntBetween(10, 20));
-        postData(job.getId(), data.stream().collect(Collectors.joining()));
+    }
+
+    private void runJob(String jobId) throws Exception {
+        createAndOpenJob(jobId);
+        List<String> data = generateData(System.currentTimeMillis(), BUCKET_SPAN, 10, bucketIndex -> randomIntBetween(10, 20));
+        postData(jobId, data.stream().collect(Collectors.joining()));
     }
 }