Răsfoiți Sursa

[ML] Improve anomaly detection results indexing speed (#92417)

The anomaly detection results were being indexed in much smaller
batches than they could be, leading to unnecessarily poor indexing
performance.

- We used to execute a bulk request for every bucket result seen.
  This is only necessary when interim results are present. When
  running on historical data there is no need for every bucket
  result to submit a new bulk request.
- Category definitions and model size stats were indexed singly.
  These are now added to the bulk requests that index other
  results. However, there is a complication which is that two
  results with the same key may be seen in quick succession. To
  avoid keeping the older result rather than the newer one the
  bulk requests now deduplicate by document ID, such that if two
  results with the same document ID are added then only the later
  one actually gets indexed.
- When receiving new quantiles, the bulk request is only submitted
  and the index refreshed if they are to be used for normalization.
  Many quantiles are superseded before they are used, and these
  do not need to refresh the index.
David Roberts 2 ani în urmă
părinte
comite
a422d82a07
15 a modificat fișierele cu 258 adăugiri și 221 ștergeri
  1. 5 0
      docs/changelog/92417.yaml
  2. 1 1
      x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java
  3. 4 2
      x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java
  4. 12 12
      x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java
  5. 3 3
      x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java
  6. 7 7
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java
  7. 104 82
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
  8. 49 42
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java
  9. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/JobSnapshotUpgraderResultProcessor.java
  10. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Renormalizer.java
  11. 16 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java
  12. 2 5
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java
  13. 2 4
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java
  14. 48 54
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java
  15. 2 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java

+ 5 - 0
docs/changelog/92417.yaml

@@ -0,0 +1,5 @@
+pr: 92417
+summary: Improve anomaly detection results indexing speed
+area: Machine Learning
+type: enhancement
+issues: []

+ 1 - 1
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AnomalyJobCRUDIT.java

@@ -92,7 +92,7 @@ public class AnomalyJobCRUDIT extends MlSingleNodeTestCase {
             new ModelSizeStats.Builder(jobId).setTimestamp(new Date()).setLogTime(new Date()).setModelBytes(10000000).build(),
             () -> false
         );
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         ElasticsearchStatusException iae = expectThrows(
             ElasticsearchStatusException.class,

+ 4 - 2
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

@@ -110,6 +110,8 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -372,7 +374,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         Optional<Quantiles> persistedQuantiles = getQuantiles();
         assertTrue(persistedQuantiles.isPresent());
         assertEquals(quantiles, persistedQuantiles.get());
-        verify(renormalizer).renormalize(quantiles);
+        verify(renormalizer).renormalize(eq(quantiles), any(Runnable.class));
     }
 
     public void testParseQuantiles_GivenRenormalizationIsDisabled() throws Exception {
@@ -389,7 +391,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         Optional<Quantiles> persistedQuantiles = getQuantiles();
         assertTrue(persistedQuantiles.isPresent());
         assertEquals(quantiles, persistedQuantiles.get());
-        verify(renormalizer, never()).renormalize(quantiles);
+        verify(renormalizer, never()).renormalize(any(), any());
     }
 
     public void testDeleteInterimResults() throws Exception {

+ 12 - 12
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/EstablishedMemUsageIT.java

@@ -82,7 +82,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
         initClusterAndJob(jobId);
 
         createBuckets(jobId, 25);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
     }
@@ -93,7 +93,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
         initClusterAndJob(jobId);
 
         createBuckets(jobId, 5);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
     }
@@ -106,7 +106,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
         createBuckets(jobId, 19);
         createModelSizeStats(jobId, 1, 19000L);
         ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
         assertThat(queryEstablishedMemoryUsage(jobId, 19, latestModelSizeStats), equalTo(0L));
@@ -120,7 +120,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
         createBuckets(jobId, 20);
         createModelSizeStats(jobId, 1, 19000L);
         ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
         assertThat(queryEstablishedMemoryUsage(jobId, 20, latestModelSizeStats), equalTo(20000L));
@@ -134,7 +134,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
         createBuckets(jobId, 20);
         createModelSizeStats(jobId, 1, 0L);
         ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 0L);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
         assertThat(queryEstablishedMemoryUsage(jobId, 20, latestModelSizeStats), equalTo(0L));
@@ -148,7 +148,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
         createBuckets(jobId, 20);
         createModelSizeStats(jobId, 1, 1000L);
         ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
         assertThat(queryEstablishedMemoryUsage(jobId, 20, latestModelSizeStats), equalTo(0L));
@@ -162,7 +162,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
         createBuckets(jobId, 25);
         createModelSizeStats(jobId, 1, 10000L);
         ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 2, 20000L);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
         assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(20000L));
@@ -176,7 +176,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
         createBuckets(jobId, 25);
         createModelSizeStats(jobId, 1, 10000L);
         ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
         assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(20000L));
@@ -189,7 +189,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
 
         createBuckets(jobId, 25);
         ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 0L);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
         assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(0L));
@@ -202,7 +202,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
 
         createBuckets(jobId, 25);
         ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 10, 20000L);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
         assertThat(queryEstablishedMemoryUsage(jobId, 25, latestModelSizeStats), equalTo(20000L));
@@ -220,7 +220,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
         createModelSizeStats(jobId, 19, 9000L);
         createModelSizeStats(jobId, 30, 19000L);
         ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 35, 20000L);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(20000L));
         assertThat(queryEstablishedMemoryUsage(jobId, 40, latestModelSizeStats), equalTo(20000L));
@@ -238,7 +238,7 @@ public class EstablishedMemUsageIT extends BaseMlIntegTestCase {
         createModelSizeStats(jobId, 27, 39000L);
         createModelSizeStats(jobId, 30, 67000L);
         ModelSizeStats latestModelSizeStats = createModelSizeStats(jobId, 35, 95000L);
-        jobResultsPersister.commitResultWrites(jobId);
+        jobResultsPersister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
 
         assertThat(queryEstablishedMemoryUsage(jobId), equalTo(0L));
         assertThat(queryEstablishedMemoryUsage(jobId, 40, latestModelSizeStats), equalTo(0L));

+ 3 - 3
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

@@ -468,7 +468,7 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
 
         ModelSizeStats storedModelSizeStats = new ModelSizeStats.Builder(job.getId()).setModelBytes(10L).build();
         jobResultsPersister.persistModelSizeStats(storedModelSizeStats, () -> false);
-        jobResultsPersister.commitResultWrites(job.getId());
+        jobResultsPersister.commitWrites(job.getId(), JobResultsPersister.CommitType.RESULTS);
 
         setOrThrow.get();
         assertThat(dataCountsAtomicReference.get().getJobId(), equalTo(job.getId()));
@@ -479,7 +479,7 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
         storedTimingStats.updateStats(10);
 
         jobResultsPersister.bulkPersisterBuilder(job.getId()).persistTimingStats(storedTimingStats).executeRequest();
-        jobResultsPersister.commitResultWrites(job.getId());
+        jobResultsPersister.commitWrites(job.getId(), JobResultsPersister.CommitType.RESULTS);
 
         setOrThrow.get();
 
@@ -492,7 +492,7 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
         storedDataCounts.incrementMissingFieldCount(1L);
         JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client(), resultsPersisterService, auditor);
         jobDataCountsPersister.persistDataCounts(job.getId(), storedDataCounts);
-        jobResultsPersister.commitResultWrites(job.getId());
+        jobResultsPersister.commitWrites(job.getId(), JobResultsPersister.CommitType.RESULTS);
 
         setOrThrow.get();
         assertThat(dataCountsAtomicReference.get(), equalTo(storedDataCounts));

+ 7 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/annotations/AnnotationPersister.java

@@ -66,22 +66,22 @@ public class AnnotationPersister {
     }
 
     public Builder bulkPersisterBuilder(String jobId) {
-        return new Builder(jobId);
+        return new Builder(jobId, () -> true);
+    }
+
+    public Builder bulkPersisterBuilder(String jobId, Supplier<Boolean> shouldRetry) {
+        return new Builder(jobId, shouldRetry);
     }
 
     public class Builder {
 
         private final String jobId;
         private BulkRequest bulkRequest = new BulkRequest(AnnotationIndex.WRITE_ALIAS_NAME);
-        private Supplier<Boolean> shouldRetry = () -> true;
+        private final Supplier<Boolean> shouldRetry;
 
-        private Builder(String jobId) {
+        private Builder(String jobId, Supplier<Boolean> shouldRetry) {
             this.jobId = Objects.requireNonNull(jobId);
-        }
-
-        public Builder shouldRetry(Supplier<Boolean> shouldRetry) {
             this.shouldRetry = Objects.requireNonNull(shouldRetry);
-            return this;
         }
 
         public Builder persistAnnotation(Annotation annotation) {

+ 104 - 82
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java

@@ -49,9 +49,14 @@ import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
 import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.core.Strings.format;
@@ -82,30 +87,39 @@ public class JobResultsPersister {
     private final OriginSettingClient client;
     private final ResultsPersisterService resultsPersisterService;
 
+    /**
+     * The possible types of data that may be committed.
+     */
+    public enum CommitType {
+        RESULTS,
+        STATE,
+        ANNOTATIONS
+    };
+
     public JobResultsPersister(OriginSettingClient client, ResultsPersisterService resultsPersisterService) {
         this.client = client;
         this.resultsPersisterService = resultsPersisterService;
     }
 
     public Builder bulkPersisterBuilder(String jobId) {
-        return new Builder(jobId);
+        return new Builder(jobId, () -> true);
+    }
+
+    public Builder bulkPersisterBuilder(String jobId, Supplier<Boolean> shouldRetry) {
+        return new Builder(jobId, shouldRetry);
     }
 
     public class Builder {
-        private BulkRequest bulkRequest;
+        private final Map<String, IndexRequest> items;
         private final String jobId;
         private final String indexName;
-        private Supplier<Boolean> shouldRetry = () -> true;
+        private final Supplier<Boolean> shouldRetry;
 
-        private Builder(String jobId) {
-            this.bulkRequest = new BulkRequest();
+        private Builder(String jobId, Supplier<Boolean> shouldRetry) {
+            this.items = new LinkedHashMap<>();
             this.jobId = Objects.requireNonNull(jobId);
             this.indexName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
-        }
-
-        public Builder shouldRetry(Supplier<Boolean> shouldRetry) {
-            this.shouldRetry = Objects.requireNonNull(shouldRetry);
-            return this;
+            this.shouldRetry = shouldRetry;
         }
 
         /**
@@ -115,7 +129,7 @@ public class JobResultsPersister {
          * @param bucket The bucket to persist
          * @return this
          */
-        public Builder persistBucket(Bucket bucket) {
+        public synchronized Builder persistBucket(Bucket bucket) {
             // If the supplied bucket has records then create a copy with records
             // removed, because we never persist nested records in buckets
             Bucket bucketWithoutRecords = bucket;
@@ -132,7 +146,7 @@ public class JobResultsPersister {
             return this;
         }
 
-        private void persistBucketInfluencersStandalone(
+        private synchronized void persistBucketInfluencersStandalone(
             @SuppressWarnings("HiddenField") String jobId,
             List<BucketInfluencer> bucketInfluencers
         ) {
@@ -151,7 +165,7 @@ public class JobResultsPersister {
          * @param timingStats timing stats to persist
          * @return this
          */
-        public Builder persistTimingStats(TimingStats timingStats) {
+        public synchronized Builder persistTimingStats(TimingStats timingStats) {
             indexResult(
                 TimingStats.documentId(timingStats.getJobId()),
                 timingStats,
@@ -167,7 +181,7 @@ public class JobResultsPersister {
          * @param records the records to persist
          * @return this
          */
-        public Builder persistRecords(List<AnomalyRecord> records) {
+        public synchronized Builder persistRecords(List<AnomalyRecord> records) {
             for (AnomalyRecord record : records) {
                 logger.trace("[{}] ES BULK ACTION: index record to index [{}] with ID [{}]", jobId, indexName, record.getId());
                 indexResult(record.getId(), record, "record");
@@ -183,7 +197,7 @@ public class JobResultsPersister {
          * @param influencers the influencers to persist
          * @return this
          */
-        public Builder persistInfluencers(List<Influencer> influencers) {
+        public synchronized Builder persistInfluencers(List<Influencer> influencers) {
             for (Influencer influencer : influencers) {
                 logger.trace("[{}] ES BULK ACTION: index influencer to index [{}] with ID [{}]", jobId, indexName, influencer.getId());
                 indexResult(influencer.getId(), influencer, "influencer");
@@ -192,13 +206,13 @@ public class JobResultsPersister {
             return this;
         }
 
-        public Builder persistModelPlot(ModelPlot modelPlot) {
+        public synchronized Builder persistModelPlot(ModelPlot modelPlot) {
             logger.trace("[{}] ES BULK ACTION: index model plot to index [{}] with ID [{}]", jobId, indexName, modelPlot.getId());
             indexResult(modelPlot.getId(), modelPlot, "model plot");
             return this;
         }
 
-        public Builder persistCategorizerStats(CategorizerStats categorizerStats) {
+        public synchronized Builder persistCategorizerStats(CategorizerStats categorizerStats) {
             logger.trace(
                 "[{}] ES BULK ACTION: index categorizer stats to index [{}] with ID [{}]",
                 jobId,
@@ -209,20 +223,42 @@ public class JobResultsPersister {
             return this;
         }
 
-        public Builder persistForecast(Forecast forecast) {
+        public synchronized Builder persistCategoryDefinition(CategoryDefinition categoryDefinition) {
+            logger.trace(
+                "[{}] ES BULK ACTION: index category definition to index [{}] with ID [{}]",
+                jobId,
+                indexName,
+                categoryDefinition.getId()
+            );
+            indexResult(categoryDefinition.getId(), categoryDefinition, "category definition");
+            return this;
+        }
+
+        public synchronized Builder persistModelSizeStats(ModelSizeStats modelSizeStats) {
+            logger.trace(
+                "[{}] ES BULK ACTION: index model size stats to index [{}] with ID [{}]",
+                jobId,
+                indexName,
+                modelSizeStats.getId()
+            );
+            indexResult(modelSizeStats.getId(), modelSizeStats, "model size stats");
+            return this;
+        }
+
+        public synchronized Builder persistForecast(Forecast forecast) {
             logger.trace("[{}] ES BULK ACTION: index forecast to index [{}] with ID [{}]", jobId, indexName, forecast.getId());
             indexResult(forecast.getId(), forecast, Forecast.RESULT_TYPE_VALUE);
             return this;
         }
 
-        public Builder persistForecastRequestStats(ForecastRequestStats forecastRequestStats) {
+        public synchronized Builder persistForecastRequestStats(ForecastRequestStats forecastRequestStats) {
             logger.trace(
                 "[{}] ES BULK ACTION: index forecast request stats to index [{}] with ID [{}]",
                 jobId,
                 indexName,
                 forecastRequestStats.getId()
             );
-            indexResult(forecastRequestStats.getId(), forecastRequestStats, Forecast.RESULT_TYPE_VALUE);
+            indexResult(forecastRequestStats.getId(), forecastRequestStats, "forecast request stats");
             return this;
         }
 
@@ -232,12 +268,12 @@ public class JobResultsPersister {
 
         private void indexResult(String id, ToXContent resultDoc, ToXContent.Params params, String resultType) {
             try (XContentBuilder content = toXContentBuilder(resultDoc, params)) {
-                bulkRequest.add(new IndexRequest(indexName).id(id).source(content));
+                items.put(id, new IndexRequest(indexName).id(id).source(content));
             } catch (IOException e) {
                 logger.error(() -> format("[%s] Error serialising %s", jobId, resultType), e);
             }
 
-            if (bulkRequest.numberOfActions() >= JobRenormalizedResultsPersister.BULK_LIMIT) {
+            if (items.size() >= JobRenormalizedResultsPersister.BULK_LIMIT) {
                 executeRequest();
             }
         }
@@ -245,45 +281,36 @@ public class JobResultsPersister {
         /**
          * Execute the bulk action
          */
-        public void executeRequest() {
-            if (bulkRequest.numberOfActions() == 0) {
+        public synchronized void executeRequest() {
+            if (items.isEmpty()) {
                 return;
             }
-            logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, bulkRequest.numberOfActions());
+            logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, items.size());
             resultsPersisterService.bulkIndexWithRetry(
-                bulkRequest,
+                buildBulkRequest(),
                 jobId,
                 shouldRetry,
                 retryMessage -> logger.debug("[{}] Bulk indexing of results failed {}", jobId, retryMessage)
             );
-            bulkRequest = new BulkRequest();
+            clear();
         }
 
-        public void clearBulkRequest() {
-            bulkRequest = new BulkRequest();
+        private BulkRequest buildBulkRequest() {
+            BulkRequest bulkRequest = new BulkRequest();
+            for (IndexRequest item : items.values()) {
+                bulkRequest.add(item);
+            }
+            return bulkRequest;
         }
 
-        // for testing
-        BulkRequest getBulkRequest() {
-            return bulkRequest;
+        public synchronized void clear() {
+            items.clear();
         }
-    }
 
-    /**
-     * Persist the category definition
-     *
-     * @param category The category to be persisted
-     */
-    public void persistCategoryDefinition(CategoryDefinition category, Supplier<Boolean> shouldRetry) {
-        Persistable persistable = new Persistable(
-            AnomalyDetectorsIndex.resultsWriteAlias(category.getJobId()),
-            category.getJobId(),
-            category,
-            category.getId()
-        );
-        persistable.persist(shouldRetry, true);
-        // Don't commit as we expect masses of these updates and they're not
-        // read again by this process
+        // for testing
+        synchronized BulkRequest getBulkRequest() {
+            return buildBulkRequest();
+        }
     }
 
     /**
@@ -411,52 +438,47 @@ public class JobResultsPersister {
      * Once all the job data has been written this function will be
      * called to commit the writes to the datastore.
      *
-     * @param jobId The job Id
+     * @param jobId The job ID.
+     * @param commitType Which type of data will be committed?
      */
-    public void commitResultWrites(String jobId) {
-        // We refresh using the read alias in order to ensure all indices will
-        // be refreshed even if a rollover occurs in between.
-        String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
-
-        // Refresh should wait for Lucene to make the data searchable
-        logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
-        RefreshRequest refreshRequest = new RefreshRequest(indexName);
-        refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
-        try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
-            client.admin().indices().refresh(refreshRequest).actionGet();
-        }
+    public void commitWrites(String jobId, CommitType commitType) {
+        commitWrites(jobId, EnumSet.of(commitType));
     }
 
     /**
-     * Makes annotations searchable as they are considered part of a job's results
-     * to fulfil the contract that job results are searchable immediately after a
-     * close or flush.
+     * Once all the job data has been written this function will be
+     * called to commit the writes to the datastore.
+     *
+     * @param jobId The job ID.
+     * @param commitTypes Which type(s) of data will be committed?
      */
-    public void commitAnnotationWrites() {
-        // We refresh using the read alias in order to ensure all indices will
-        // be refreshed even if a rollover occurs in between.
-        RefreshRequest refreshRequest = new RefreshRequest(AnnotationIndex.READ_ALIAS_NAME);
-        refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
-        try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
-            client.admin().indices().refresh(refreshRequest).actionGet();
+    public void commitWrites(String jobId, Set<CommitType> commitTypes) {
+        if (commitTypes.isEmpty()) {
+            return;
+        }
+        List<String> indexNames = new ArrayList<>();
+        if (commitTypes.contains(CommitType.RESULTS)) {
+            // We refresh using the read alias in order to ensure all indices will
+            // be refreshed even if a rollover occurs in between.
+            indexNames.add(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
+        }
+        if (commitTypes.contains(CommitType.STATE)) {
+            indexNames.add(AnomalyDetectorsIndex.jobStateIndexPattern());
+        }
+        if (commitTypes.contains(CommitType.ANNOTATIONS)) {
+            // We refresh using the read alias in order to ensure all indices will
+            // be refreshed even if a rollover occurs in between.
+            indexNames.add(AnnotationIndex.READ_ALIAS_NAME);
         }
-    }
 
-    /**
-     * Once the job state has been written calling this function makes it
-     * immediately searchable.
-     *
-     * @param jobId The job Id
-     * */
-    public void commitStateWrites(String jobId) {
-        String indexName = AnomalyDetectorsIndex.jobStateIndexPattern();
         // Refresh should wait for Lucene to make the data searchable
-        logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
-        RefreshRequest refreshRequest = new RefreshRequest(indexName);
+        logger.trace("[{}] ES API CALL: refresh indices {}", jobId, indexNames);
+        RefreshRequest refreshRequest = new RefreshRequest(indexNames.toArray(String[]::new));
         refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
         try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
             client.admin().indices().refresh(refreshRequest).actionGet();
         }
+        logger.trace("[{}] ES API CALL: finished refresh indices {}", jobId, indexNames);
     }
 
     /**

+ 49 - 42
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

@@ -47,6 +47,7 @@ import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 import java.time.Clock;
 import java.time.Duration;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -68,7 +69,7 @@ import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_FORECAST
  * <p>
  * Has methods to register and remove alert observers.
  * Also has a method to wait for a flush to be complete.
- *
+ * <p>
  * Buckets are the written last after records, influencers etc
  * when the end of bucket is reached. Therefore results aren't persisted
  * until the bucket is read, this means that interim results for all
@@ -80,7 +81,7 @@ import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_FORECAST
  */
 public class AutodetectResultProcessor {
 
-    private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);
+    private static final Logger logger = LogManager.getLogger(AutodetectResultProcessor.class);
 
     private final Client client;
     private final AnomalyDetectionAuditor auditor;
@@ -157,8 +158,8 @@ public class AutodetectResultProcessor {
         this.process = Objects.requireNonNull(autodetectProcess);
         this.flushListener = Objects.requireNonNull(flushListener);
         this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
-        this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId).shouldRetry(this::isAlive);
-        this.bulkAnnotationsPersister = annotationPersister.bulkPersisterBuilder(jobId).shouldRetry(this::isAlive);
+        this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive);
+        this.bulkAnnotationsPersister = annotationPersister.bulkPersisterBuilder(jobId, this::isAlive);
         this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister);
         this.clock = Objects.requireNonNull(clock);
         this.deleteInterimRequired = true;
@@ -181,9 +182,9 @@ public class AutodetectResultProcessor {
                     bulkAnnotationsPersister.executeRequest();
                 }
             } catch (Exception e) {
-                LOGGER.warn(() -> "[" + jobId + "] Error persisting autodetect results", e);
+                logger.warn(() -> "[" + jobId + "] Error persisting autodetect results", e);
             }
-            LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, currentRunBucketCount);
+            logger.info("[{}] {} buckets parsed from autodetect output", jobId, currentRunBucketCount);
 
         } catch (Exception e) {
             failed = true;
@@ -193,14 +194,14 @@ public class AutodetectResultProcessor {
                 // that it would have been better to close jobs before shutting down,
                 // but we now fully expect jobs to move between nodes without doing
                 // all their graceful close activities.
-                LOGGER.warn("[{}] some results not processed due to the process being killed", jobId);
+                logger.warn("[{}] some results not processed due to the process being killed", jobId);
             } else if (process.isProcessAliveAfterWaiting() == false) {
                 // Don't log the stack trace to not shadow the root cause.
-                LOGGER.warn("[{}] some results not processed due to the termination of autodetect", jobId);
+                logger.warn("[{}] some results not processed due to the termination of autodetect", jobId);
             } else {
                 // We should only get here if the iterator throws in which
                 // case parsing the autodetect output has failed.
-                LOGGER.error(() -> "[" + jobId + "] error parsing autodetect output", e);
+                logger.error(() -> "[" + jobId + "] error parsing autodetect output", e);
             }
         } finally {
             flushListener.clear();
@@ -218,13 +219,13 @@ public class AutodetectResultProcessor {
                     AutodetectResult result = iterator.next();
                     processResult(result);
                     if (result.getBucket() != null) {
-                        LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount);
+                        logger.trace("[{}] Bucket number {} parsed from output", jobId, currentRunBucketCount);
                     }
                 } catch (Exception e) {
                     if (isAlive() == false) {
                         throw e;
                     }
-                    LOGGER.warn(() -> "[" + jobId + "] Error processing autodetect result", e);
+                    logger.warn(() -> "[" + jobId + "] Error processing autodetect result", e);
                 }
             }
         } finally {
@@ -250,9 +251,9 @@ public class AutodetectResultProcessor {
     void handleOpenForecasts() {
         try {
             if (runningForecasts.isEmpty() == false) {
-                LOGGER.warn("[{}] still had forecasts {} executing. Attempting to set them to failed.", jobId, runningForecasts.keySet());
+                logger.warn("[{}] still had forecasts {} executing. Attempting to set them to failed.", jobId, runningForecasts.keySet());
                 // There may be many docs in the results persistence queue. But we only want to bother updating the running forecasts
-                bulkResultsPersister.clearBulkRequest();
+                bulkResultsPersister.clear();
                 for (ForecastRequestStats forecastRequestStats : runningForecasts.values()) {
                     ForecastRequestStats failedStats = new ForecastRequestStats(forecastRequestStats);
                     failedStats.setStatus(ForecastRequestStats.ForecastRequestStatus.FAILED);
@@ -262,7 +263,7 @@ public class AutodetectResultProcessor {
                 bulkResultsPersister.executeRequest();
             }
         } catch (Exception ex) {
-            LOGGER.warn(() -> "[" + jobId + "] failure setting running forecasts to failed.", ex);
+            logger.warn(() -> "[" + jobId + "] failure setting running forecasts to failed.", ex);
         }
     }
 
@@ -276,19 +277,23 @@ public class AutodetectResultProcessor {
             if (deleteInterimRequired) {
                 // Delete any existing interim results generated by a Flush command
                 // which have not been replaced or superseded by new results.
-                LOGGER.trace("[{}] Deleting interim results", jobId);
+                logger.trace("[{}] Deleting interim results", jobId);
                 persister.deleteInterimResults(jobId);
-                deleteInterimRequired = false;
             }
 
             if (bucket.isInterim() == false) {
                 timingStatsReporter.reportBucket(bucket);
                 ++currentRunBucketCount;
             }
-            // persist after deleting interim results in case the new
-            // results are also interim
-            bulkResultsPersister.persistBucket(bucket).executeRequest();
-            bulkAnnotationsPersister.executeRequest();
+            bulkResultsPersister.persistBucket(bucket);
+            if (deleteInterimRequired || bucket.isInterim()) {
+                // Execute the bulk request after deleting interim results in case the new
+                // results are also interim. Also execute the bulk request after creating new
+                // interim results, so that they exist before any subsequent deletion.
+                bulkResultsPersister.executeRequest();
+                bulkAnnotationsPersister.executeRequest();
+                deleteInterimRequired = false;
+            }
         }
         List<AnomalyRecord> records = result.getRecords();
         if (records != null && records.isEmpty() == false) {
@@ -300,7 +305,7 @@ public class AutodetectResultProcessor {
         }
         CategoryDefinition categoryDefinition = result.getCategoryDefinition();
         if (categoryDefinition != null) {
-            persister.persistCategoryDefinition(categoryDefinition, this::isAlive);
+            bulkResultsPersister.persistCategoryDefinition(categoryDefinition);
         }
         CategorizerStats categorizerStats = result.getCategorizerStats();
         if (categorizerStats != null) {
@@ -321,7 +326,7 @@ public class AutodetectResultProcessor {
         }
         ForecastRequestStats forecastRequestStats = result.getForecastRequestStats();
         if (forecastRequestStats != null) {
-            LOGGER.trace("Received Forecast Stats [{}]", forecastRequestStats.getId());
+            logger.trace("Received Forecast Stats [{}]", forecastRequestStats.getId());
             bulkResultsPersister.persistForecastRequestStats(forecastRequestStats);
 
             if (forecastRequestStats.getStatus()
@@ -364,21 +369,24 @@ public class AutodetectResultProcessor {
         }
         Quantiles quantiles = result.getQuantiles();
         if (quantiles != null) {
-            LOGGER.debug("[{}] Parsed Quantiles with timestamp {}", jobId, quantiles.getTimestamp());
+            logger.debug("[{}] Parsed Quantiles with timestamp {}", jobId, quantiles.getTimestamp());
             persister.persistQuantiles(quantiles, this::isAlive);
-            bulkResultsPersister.executeRequest();
 
             // If a node is trying to shut down then don't trigger any further normalizations on the node
             if (vacating == false && processKilled == false && renormalizer.isEnabled()) {
-                // We need to make all results written up to these quantiles available for renormalization
-                persister.commitResultWrites(jobId);
-                LOGGER.debug("[{}] Quantiles queued for renormalization", jobId);
-                renormalizer.renormalize(quantiles);
+                logger.debug("[{}] Quantiles queued for renormalization", jobId);
+                renormalizer.renormalize(quantiles, () -> {
+                    // We need to make all results written up to these quantiles available for renormalization.
+                    // However, this should be done as close to the point of normalization as possible, as many
+                    // quantiles are superseded before they're used.
+                    bulkResultsPersister.executeRequest();
+                    persister.commitWrites(jobId, JobResultsPersister.CommitType.RESULTS);
+                });
             }
         }
         FlushAcknowledgement flushAcknowledgement = result.getFlushAcknowledgement();
         if (flushAcknowledgement != null) {
-            LOGGER.debug("[{}] Flush acknowledgement parsed from output for ID {}", jobId, flushAcknowledgement.getId());
+            logger.debug("[{}] Flush acknowledgement parsed from output for ID {}", jobId, flushAcknowledgement.getId());
             // Commit previous writes here, effectively continuing
             // the flush from the C++ autodetect process right
             // through to the data store
@@ -386,11 +394,12 @@ public class AutodetectResultProcessor {
             try {
                 bulkResultsPersister.executeRequest();
                 bulkAnnotationsPersister.executeRequest();
-                persister.commitResultWrites(jobId);
-                persister.commitAnnotationWrites();
-                LOGGER.debug("[{}] Flush acknowledgement sent to listener for ID {}", jobId, flushAcknowledgement.getId());
+                persister.commitWrites(
+                    jobId,
+                    EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS)
+                );
             } catch (Exception e) {
-                LOGGER.error(
+                logger.error(
                     "["
                         + jobId
                         + "] failed to bulk persist results and commit writes during flush acknowledgement for ID "
@@ -428,7 +437,7 @@ public class AutodetectResultProcessor {
     }
 
     private void processModelSizeStats(ModelSizeStats modelSizeStats) {
-        LOGGER.trace(
+        logger.trace(
             "[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
             jobId,
             modelSizeStats.getModelBytes(),
@@ -439,7 +448,7 @@ public class AutodetectResultProcessor {
             modelSizeStats.getMemoryStatus()
         );
 
-        persister.persistModelSizeStats(modelSizeStats, this::isAlive);
+        bulkResultsPersister.persistModelSizeStats(modelSizeStats);
         notifyModelMemoryStatusChange(modelSizeStats);
 
         latestModelSizeStats = modelSizeStats;
@@ -491,7 +500,7 @@ public class AutodetectResultProcessor {
             updateModelSnapshotSemaphore.acquire();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            LOGGER.info("[{}] Interrupted acquiring update model snapshot semaphore", jobId);
+            logger.info("[{}] Interrupted acquiring update model snapshot semaphore", jobId);
             return;
         }
 
@@ -499,13 +508,13 @@ public class AutodetectResultProcessor {
             @Override
             public void onResponse(PutJobAction.Response response) {
                 updateModelSnapshotSemaphore.release();
-                LOGGER.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId());
+                logger.debug("[{}] Updated job with model snapshot id [{}]", jobId, modelSnapshot.getSnapshotId());
             }
 
             @Override
             public void onFailure(Exception e) {
                 updateModelSnapshotSemaphore.release();
-                LOGGER.error("[" + jobId + "] Failed to update job with new model snapshot id [" + modelSnapshot.getSnapshotId() + "]", e);
+                logger.error("[" + jobId + "] Failed to update job with new model snapshot id [" + modelSnapshot.getSnapshotId() + "]", e);
             }
         });
     }
@@ -525,13 +534,11 @@ public class AutodetectResultProcessor {
 
             // These lines ensure that the "completion" we're awaiting includes making the results searchable
             waitUntilRenormalizerIsIdle();
-            persister.commitResultWrites(jobId);
-            persister.commitAnnotationWrites();
-            persister.commitStateWrites(jobId);
+            persister.commitWrites(jobId, EnumSet.allOf(JobResultsPersister.CommitType.class));
 
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            LOGGER.info("[{}] Interrupted waiting for results processor to complete", jobId);
+            logger.info("[{}] Interrupted waiting for results processor to complete", jobId);
         }
     }
 

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/JobSnapshotUpgraderResultProcessor.java

@@ -69,7 +69,7 @@ public class JobSnapshotUpgraderResultProcessor {
         this.snapshotId = Objects.requireNonNull(snapshotId);
         this.persister = Objects.requireNonNull(persister);
         this.process = Objects.requireNonNull(autodetectProcess);
-        this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId).shouldRetry(this::isAlive);
+        this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId, this::isAlive);
         this.flushListener = new FlushListener();
     }
 
@@ -249,7 +249,7 @@ public class JobSnapshotUpgraderResultProcessor {
             }
 
             // These lines ensure that the "completion" we're awaiting includes making the results searchable
-            persister.commitStateWrites(jobId);
+            persister.commitWrites(jobId, JobResultsPersister.CommitType.STATE);
 
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/Renormalizer.java

@@ -20,7 +20,7 @@ public interface Renormalizer {
      * Update the anomaly score field on all previously persisted buckets
      * and all contained records
      */
-    void renormalize(Quantiles quantiles);
+    void renormalize(Quantiles quantiles, Runnable setupStep);
 
     /**
      * Blocks until the renormalizer is idle and no further quantiles updates are pending.

+ 16 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizer.java

@@ -53,7 +53,7 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
     }
 
     @Override
-    public void renormalize(Quantiles quantiles) {
+    public void renormalize(Quantiles quantiles, Runnable setupStep) {
         if (isEnabled() == false) {
             return;
         }
@@ -61,8 +61,13 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
         // Needed to ensure work is not added while the tryFinishWork() method is running
         synchronized (this) {
             latestQuantilesHolder = (latestQuantilesHolder == null)
-                ? new AugmentedQuantiles(quantiles, null, new CountDownLatch(1))
-                : new AugmentedQuantiles(quantiles, latestQuantilesHolder.getEvictedTimestamp(), latestQuantilesHolder.getLatch());
+                ? new AugmentedQuantiles(quantiles, setupStep, null, new CountDownLatch(1))
+                : new AugmentedQuantiles(
+                    quantiles,
+                    setupStep,
+                    latestQuantilesHolder.getEvictedTimestamp(),
+                    latestQuantilesHolder.getLatch()
+                );
             tryStartWork();
         }
     }
@@ -153,6 +158,7 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
             AugmentedQuantiles latestAugmentedQuantiles = getLatestAugmentedQuantilesAndClear();
             assert latestAugmentedQuantiles != null;
             if (latestAugmentedQuantiles != null) { // TODO: remove this if the assert doesn't trip in CI over the next year or so
+                latestAugmentedQuantiles.runSetupStep();
                 Quantiles latestQuantiles = latestAugmentedQuantiles.getQuantiles();
                 CountDownLatch latch = latestAugmentedQuantiles.getLatch();
                 try {
@@ -181,11 +187,13 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
      */
     private class AugmentedQuantiles {
         private final Quantiles quantiles;
+        private final Runnable setupStep;
         private final Date earliestEvictedTimestamp;
         private final CountDownLatch latch;
 
-        AugmentedQuantiles(Quantiles quantiles, Date earliestEvictedTimestamp, CountDownLatch latch) {
+        AugmentedQuantiles(Quantiles quantiles, Runnable setupStep, Date earliestEvictedTimestamp, CountDownLatch latch) {
             this.quantiles = Objects.requireNonNull(quantiles);
+            this.setupStep = Objects.requireNonNull(setupStep);
             this.earliestEvictedTimestamp = earliestEvictedTimestamp;
             this.latch = Objects.requireNonNull(latch);
         }
@@ -194,6 +202,10 @@ public class ShortCircuitingRenormalizer implements Renormalizer {
             return quantiles;
         }
 
+        void runSetupStep() {
+            setupStep.run();
+        }
+
         Date getEvictedTimestamp() {
             return (earliestEvictedTimestamp != null) ? earliestEvictedTimestamp : quantiles.getTimestamp();
         }

+ 2 - 5
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java

@@ -74,7 +74,6 @@ public class JobResultsPersisterTests extends ESTestCase {
     private static final String JOB_ID = "foo";
 
     private Client client;
-    private OriginSettingClient originSettingClient;
     private ArgumentCaptor<BulkRequest> bulkRequestCaptor;
     private JobResultsPersister persister;
 
@@ -83,7 +82,7 @@ public class JobResultsPersisterTests extends ESTestCase {
         bulkRequestCaptor = ArgumentCaptor.forClass(BulkRequest.class);
         client = mock(Client.class);
         doAnswer(withResponse(mock(BulkResponse.class))).when(client).execute(eq(BulkAction.INSTANCE), any(), any());
-        originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN);
+        OriginSettingClient originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN);
         persister = new JobResultsPersister(originSettingClient, buildResultsPersisterService(originSettingClient));
     }
 
@@ -222,8 +221,8 @@ public class JobResultsPersisterTests extends ESTestCase {
 
     public void testBulkRequestExecutesWhenReachMaxDocs() {
         JobResultsPersister.Builder bulkBuilder = persister.bulkPersisterBuilder("foo");
-        ModelPlot modelPlot = new ModelPlot("foo", new Date(), 123456, 0);
         for (int i = 0; i <= JobRenormalizedResultsPersister.BULK_LIMIT; i++) {
+            ModelPlot modelPlot = new ModelPlot("foo", new Date(), 123456, i);
             bulkBuilder.persistModelPlot(modelPlot);
         }
 
@@ -282,7 +281,6 @@ public class JobResultsPersisterTests extends ESTestCase {
         );
     }
 
-    @SuppressWarnings("unchecked")
     public void testPersistDatafeedTimingStats() {
         DatafeedTimingStats timingStats = new DatafeedTimingStats(
             "foo",
@@ -325,7 +323,6 @@ public class JobResultsPersisterTests extends ESTestCase {
         );
     }
 
-    @SuppressWarnings("unchecked")
     private void testPersistQuantilesSync(SearchHits searchHits, String expectedIndexOrAlias) {
         SearchResponse searchResponse = mock(SearchResponse.class);
         when(searchResponse.status()).thenReturn(RestStatus.OK);

+ 2 - 4
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

@@ -186,13 +186,11 @@ public class AutodetectProcessManagerTests extends ESTestCase {
         jobResultsProvider = mock(JobResultsProvider.class);
         jobResultsPersister = mock(JobResultsPersister.class);
         JobResultsPersister.Builder bulkPersisterBuilder = mock(JobResultsPersister.Builder.class);
-        when(bulkPersisterBuilder.shouldRetry(any())).thenReturn(bulkPersisterBuilder);
-        when(jobResultsPersister.bulkPersisterBuilder(any())).thenReturn(bulkPersisterBuilder);
+        when(jobResultsPersister.bulkPersisterBuilder(any(), any())).thenReturn(bulkPersisterBuilder);
         jobDataCountsPersister = mock(JobDataCountsPersister.class);
         annotationPersister = mock(AnnotationPersister.class);
         AnnotationPersister.Builder bulkAnnotationsBuilder = mock(AnnotationPersister.Builder.class);
-        when(bulkAnnotationsBuilder.shouldRetry(any())).thenReturn(bulkAnnotationsBuilder);
-        when(annotationPersister.bulkPersisterBuilder(any())).thenReturn(bulkAnnotationsBuilder);
+        when(annotationPersister.bulkPersisterBuilder(any(), any())).thenReturn(bulkAnnotationsBuilder);
         autodetectCommunicator = mock(AutodetectCommunicator.class);
         autodetectFactory = mock(AutodetectProcessFactory.class);
         normalizerFactory = mock(NormalizerFactory.class);

+ 48 - 54
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

@@ -58,6 +58,7 @@ import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -84,7 +85,6 @@ public class AutodetectResultProcessorTests extends ESTestCase {
     private static final long BUCKET_SPAN_MS = 1000;
     private static final Instant CURRENT_TIME = Instant.ofEpochMilli(2000000000);
 
-    private ThreadPool threadPool;
     private Client client;
     private AnomalyDetectionAuditor auditor;
     private Renormalizer renormalizer;
@@ -101,19 +101,17 @@ public class AutodetectResultProcessorTests extends ESTestCase {
     public void setUpMocks() {
         executor = new Scheduler.SafeScheduledThreadPoolExecutor(1);
         client = mock(Client.class);
-        threadPool = mock(ThreadPool.class);
+        ThreadPool threadPool = mock(ThreadPool.class);
         when(client.threadPool()).thenReturn(threadPool);
         when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
         auditor = mock(AnomalyDetectionAuditor.class);
         renormalizer = mock(Renormalizer.class);
         persister = mock(JobResultsPersister.class);
         bulkResultsPersister = mock(JobResultsPersister.Builder.class);
-        when(bulkResultsPersister.shouldRetry(any())).thenReturn(bulkResultsPersister);
-        when(persister.bulkPersisterBuilder(eq(JOB_ID))).thenReturn(bulkResultsPersister);
+        when(persister.bulkPersisterBuilder(eq(JOB_ID), any())).thenReturn(bulkResultsPersister);
         annotationPersister = mock(AnnotationPersister.class);
         bulkAnnotationsPersister = mock(AnnotationPersister.Builder.class);
-        when(bulkAnnotationsPersister.shouldRetry(any())).thenReturn(bulkAnnotationsPersister);
-        when(annotationPersister.bulkPersisterBuilder(eq(JOB_ID))).thenReturn(bulkAnnotationsPersister);
+        when(annotationPersister.bulkPersisterBuilder(eq(JOB_ID), any())).thenReturn(bulkAnnotationsPersister);
         process = mock(AutodetectProcess.class);
         flushListener = mock(FlushListener.class);
         processorUnderTest = new AutodetectResultProcessor(
@@ -133,7 +131,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
     @After
     public void cleanup() {
-        verify(annotationPersister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(annotationPersister).bulkPersisterBuilder(eq(JOB_ID), any());
         verifyNoMoreInteractions(auditor, renormalizer, persister, annotationPersister);
         executor.shutdown();
     }
@@ -147,10 +145,8 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
 
         verify(renormalizer).waitUntilIdle();
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
-        verify(persister).commitResultWrites(JOB_ID);
-        verify(persister).commitAnnotationWrites();
-        verify(persister).commitStateWrites(JOB_ID);
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        verify(persister).commitWrites(JOB_ID, EnumSet.allOf(JobResultsPersister.CommitType.class));
     }
 
     public void testProcessResult_bucket() {
@@ -163,10 +159,10 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         processorUnderTest.setDeleteInterimRequired(false);
         processorUnderTest.processResult(result);
 
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(bulkResultsPersister).persistTimingStats(any(TimingStats.class));
         verify(bulkResultsPersister).persistBucket(bucket);
-        verify(bulkResultsPersister).executeRequest();
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(bulkResultsPersister, never()).executeRequest();
         verify(persister, never()).deleteInterimResults(JOB_ID);
     }
 
@@ -184,7 +180,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(bulkResultsPersister).persistTimingStats(any(TimingStats.class));
         verify(bulkResultsPersister).persistBucket(bucket);
         verify(bulkResultsPersister).executeRequest();
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(persister).deleteInterimResults(JOB_ID);
     }
 
@@ -203,7 +199,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(bulkResultsPersister, never()).persistTimingStats(any(TimingStats.class));
         verify(bulkResultsPersister).persistBucket(bucket);
         verify(bulkResultsPersister).executeRequest();
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
     }
 
     public void testProcessResult_records() {
@@ -219,7 +215,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
         verify(bulkResultsPersister).persistRecords(records);
         verify(bulkResultsPersister, never()).executeRequest();
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
     }
 
     public void testProcessResult_influencers() {
@@ -235,7 +231,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
         verify(bulkResultsPersister).persistInfluencers(influencers);
         verify(bulkResultsPersister, never()).executeRequest();
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
     }
 
     public void testProcessResult_categoryDefinition() {
@@ -247,9 +243,9 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         processorUnderTest.setDeleteInterimRequired(false);
         processorUnderTest.processResult(result);
 
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        verify(bulkResultsPersister).persistCategoryDefinition(eq(categoryDefinition));
         verify(bulkResultsPersister, never()).executeRequest();
-        verify(persister).persistCategoryDefinition(eq(categoryDefinition), any());
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
     }
 
     public void testProcessResult_flushAcknowledgement() {
@@ -262,17 +258,19 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         processorUnderTest.processResult(result);
         assertTrue(processorUnderTest.isDeleteInterimRequired());
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
-        verify(persister).commitResultWrites(JOB_ID);
-        verify(persister).commitAnnotationWrites();
+        verify(persister).commitWrites(
+            JOB_ID,
+            EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS)
+        );
         verify(bulkResultsPersister).executeRequest();
     }
 
     public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
-        when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
+        when(flushAcknowledgement.getId()).thenReturn(Integer.valueOf(randomInt(100)).toString());
         when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
         CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
         when(categoryDefinition.getCategoryId()).thenReturn(1L);
@@ -283,11 +281,13 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         assertTrue(processorUnderTest.isDeleteInterimRequired());
 
         InOrder inOrder = inOrder(persister, bulkResultsPersister, flushListener);
-        inOrder.verify(persister).bulkPersisterBuilder(eq(JOB_ID));
-        inOrder.verify(persister).persistCategoryDefinition(eq(categoryDefinition), any());
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        inOrder.verify(bulkResultsPersister).persistCategoryDefinition(eq(categoryDefinition));
         inOrder.verify(bulkResultsPersister).executeRequest();
-        inOrder.verify(persister).commitResultWrites(JOB_ID);
-        inOrder.verify(persister).commitAnnotationWrites();
+        verify(persister).commitWrites(
+            JOB_ID,
+            EnumSet.of(JobResultsPersister.CommitType.RESULTS, JobResultsPersister.CommitType.ANNOTATIONS)
+        );
         inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement, null);
     }
 
@@ -299,7 +299,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         processorUnderTest.setDeleteInterimRequired(false);
         processorUnderTest.processResult(result);
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(bulkResultsPersister).persistModelPlot(modelPlot);
     }
 
@@ -310,7 +310,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
         processorUnderTest.processResult(result);
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(bulkAnnotationsPersister).persistAnnotation(annotation);
         if (annotation.getEvent() == Annotation.Event.CATEGORIZATION_STATUS_CHANGE) {
             verify(auditor).warning(eq(JOB_ID), anyString());
@@ -326,8 +326,8 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         processorUnderTest.processResult(result);
         assertThat(processorUnderTest.modelSizeStats(), is(equalTo(modelSizeStats)));
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
-        verify(persister).persistModelSizeStats(eq(modelSizeStats), any());
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        verify(bulkResultsPersister).persistModelSizeStats(eq(modelSizeStats));
     }
 
     public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
@@ -357,8 +357,8 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         when(result.getModelSizeStats()).thenReturn(modelSizeStats);
         processorUnderTest.processResult(result);
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
-        verify(persister, times(4)).persistModelSizeStats(any(ModelSizeStats.class), any());
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        verify(bulkResultsPersister, times(4)).persistModelSizeStats(any(ModelSizeStats.class));
         // We should have only fired two notifications: one for soft_limit and one for hard_limit
         verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT));
         verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb"));
@@ -381,7 +381,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         when(result.getAnnotation()).thenReturn(annotation);
         processorUnderTest.processResult(result);
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(bulkAnnotationsPersister).persistAnnotation(annotation);
         verify(auditor).warning(JOB_ID, "Categorization status changed to 'warn' for partition 'foo' after 0 buckets");
     }
@@ -419,7 +419,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
             new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build()
         );
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(persister).persistModelSnapshot(eq(modelSnapshot), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any());
         verify(bulkAnnotationsPersister).persistAnnotation(ModelSnapshot.annotationDocumentId(modelSnapshot), expectedAnnotation);
         verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
@@ -434,12 +434,10 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         processorUnderTest.setDeleteInterimRequired(false);
         processorUnderTest.processResult(result);
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(persister).persistQuantiles(eq(quantiles), any());
-        verify(bulkResultsPersister).executeRequest();
-        verify(persister).commitResultWrites(JOB_ID);
         verify(renormalizer).isEnabled();
-        verify(renormalizer).renormalize(quantiles);
+        verify(renormalizer).renormalize(eq(quantiles), any(Runnable.class));
     }
 
     public void testProcessResult_quantiles_givenRenormalizationIsDisabled() {
@@ -451,9 +449,9 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         processorUnderTest.setDeleteInterimRequired(false);
         processorUnderTest.processResult(result);
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(persister).persistQuantiles(eq(quantiles), any());
-        verify(bulkResultsPersister).executeRequest();
+        verify(bulkResultsPersister, never()).executeRequest();
         verify(renormalizer).isEnabled();
     }
 
@@ -466,10 +464,8 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
         assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1)));
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
-        verify(persister).commitResultWrites(JOB_ID);
-        verify(persister).commitAnnotationWrites();
-        verify(persister).commitStateWrites(JOB_ID);
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        verify(persister).commitWrites(JOB_ID, EnumSet.allOf(JobResultsPersister.CommitType.class));
         verify(renormalizer).waitUntilIdle();
     }
 
@@ -486,7 +482,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
         processorUnderTest.process();
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE), any());
     }
 
@@ -507,7 +503,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         );
         assertThat(flushAcknowledgement, is(nullValue()));
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
     }
 
     public void testKill() throws Exception {
@@ -520,11 +516,9 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
         assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1)));
 
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
-        verify(persister).commitResultWrites(JOB_ID);
-        verify(persister).commitAnnotationWrites();
-        verify(persister).commitStateWrites(JOB_ID);
-        verify(renormalizer, never()).renormalize(any());
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
+        verify(persister).commitWrites(JOB_ID, EnumSet.allOf(JobResultsPersister.CommitType.class));
+        verify(renormalizer, never()).renormalize(any(), any());
         verify(renormalizer).shutdown();
         verify(renormalizer).waitUntilIdle();
         verify(flushListener).clear();
@@ -546,7 +540,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
         verify(bulkResultsPersister, times(2)).persistForecastRequestStats(argument.capture());
         verify(bulkResultsPersister, times(1)).executeRequest();
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(persister, never()).deleteInterimResults(JOB_ID);
 
         // Get all values is in reverse call order
@@ -578,7 +572,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
         verify(bulkResultsPersister, times(2)).persistForecastRequestStats(argument.capture());
         verify(bulkResultsPersister, times(1)).executeRequest();
-        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID), any());
         verify(persister, never()).deleteInterimResults(JOB_ID);
 
         List<ForecastRequestStats> stats = argument.getAllValues();

+ 2 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/ShortCircuitingRenormalizerTests.java

@@ -47,12 +47,12 @@ public class ShortCircuitingRenormalizerTests extends ESTestCase {
             // Blast through many sets of quantiles in quick succession, faster than the normalizer can process them
             for (int i = 1; i < TEST_SIZE / 2; ++i) {
                 Quantiles quantiles = new Quantiles(JOB_ID, new Date(), Integer.toString(i));
-                renormalizer.renormalize(quantiles);
+                renormalizer.renormalize(quantiles, () -> {});
             }
             renormalizer.waitUntilIdle();
             for (int i = TEST_SIZE / 2; i <= TEST_SIZE; ++i) {
                 Quantiles quantiles = new Quantiles(JOB_ID, new Date(), Integer.toString(i));
-                renormalizer.renormalize(quantiles);
+                renormalizer.renormalize(quantiles, () -> {});
             }
             renormalizer.waitUntilIdle();