Browse Source

Extract TimingStats-related functionality into TimingStatsReporter (#43371)

Przemysław Witek 6 years ago
parent
commit
f26f26630d

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java

@@ -864,7 +864,7 @@ public class ElasticsearchMappings {
             .startObject(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
                 .field(TYPE, DOUBLE)
             .endObject()
-            .startObject(TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName())
+            .startObject(TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
                 .field(TYPE, DOUBLE)
             .endObject();
     }

+ 3 - 33
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java

@@ -31,7 +31,7 @@ public class TimingStats implements ToXContentObject, Writeable {
     public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
     public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
     public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
-    public static final ParseField EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS =
+    public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS =
         new ParseField("exponential_average_bucket_processing_time_ms");
 
     public static final ParseField TYPE = new ParseField("timing_stats");
@@ -49,7 +49,7 @@ public class TimingStats implements ToXContentObject, Writeable {
         PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
         PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
         PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
-        PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS);
+        PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS);
     }
 
     public static String documentId(String jobId) {
@@ -185,7 +185,7 @@ public class TimingStats implements ToXContentObject, Writeable {
             builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
         }
         if (exponentialAvgBucketProcessingTimeMs != null) {
-            builder.field(EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
+            builder.field(EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
         }
         builder.endObject();
         return builder;
@@ -219,34 +219,4 @@ public class TimingStats implements ToXContentObject, Writeable {
     public String toString() {
         return Strings.toString(this);
     }
-
-    /**
-     * Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics.
-     */
-    public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
-        return differSignificantly(stats1.minBucketProcessingTimeMs, stats2.minBucketProcessingTimeMs)
-            || differSignificantly(stats1.maxBucketProcessingTimeMs, stats2.maxBucketProcessingTimeMs)
-            || differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs)
-            || differSignificantly(stats1.exponentialAvgBucketProcessingTimeMs, stats2.exponentialAvgBucketProcessingTimeMs);
-    }
-
-    /**
-     * Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO.
-     * This can be interpreted as values { value1, value2 } differing significantly from each other.
-     * This method also returns:
-     *   - {@code true} in case one value is {@code null} while the other is not.
-     *   - {@code false} in case both values are {@code null}.
-     */
-    static boolean differSignificantly(Double value1, Double value2) {
-        if (value1 != null && value2 != null) {
-            return (value2 / value1 < MIN_VALID_RATIO) || (value1 / value2 < MIN_VALID_RATIO);
-        }
-        return (value1 != null) || (value2 != null);
-    }
-
-    /**
-     * Minimum ratio of values that is interpreted as values being similar.
-     * If the values ratio is less than MIN_VALID_RATIO, the values are interpreted as significantly different.
-     */
-    private static final double MIN_VALID_RATIO = 0.9;
 }

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java

@@ -179,7 +179,7 @@ public final class ReservedFieldNames {
             TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
             TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
             TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
-            TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
+            TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
 
             GetResult._ID,
             GetResult._INDEX,

+ 0 - 28
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java

@@ -13,7 +13,6 @@ import org.hamcrest.Matcher;
 
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 
 public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
@@ -124,33 +123,6 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
         assertThat(TimingStats.documentId("my-job-id"), equalTo("my-job-id_timing_stats"));
     }
 
-    public void testTimingStatsDifferSignificantly() {
-        assertThat(
-            TimingStats.differSignificantly(
-                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0)),
-            is(false));
-        assertThat(
-            TimingStats.differSignificantly(
-                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0, 10.0)),
-            is(false));
-        assertThat(
-            TimingStats.differSignificantly(
-                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0, 10.0)),
-            is(true));
-    }
-
-    public void testValuesDifferSignificantly() {
-        assertThat(TimingStats.differSignificantly((Double) null, (Double) null), is(false));
-        assertThat(TimingStats.differSignificantly(1.0, null), is(true));
-        assertThat(TimingStats.differSignificantly(null, 1.0), is(true));
-        assertThat(TimingStats.differSignificantly(0.9, 1.0), is(false));
-        assertThat(TimingStats.differSignificantly(1.0, 0.9), is(false));
-        assertThat(TimingStats.differSignificantly(0.9, 1.000001), is(true));
-        assertThat(TimingStats.differSignificantly(1.0, 0.899999), is(true));
-        assertThat(TimingStats.differSignificantly(0.0, 1.0), is(true));
-        assertThat(TimingStats.differSignificantly(1.0, 0.0), is(true));
-    }
-
     /**
      * Creates a matcher of {@link TimingStats}s that matches when an examined stats are equal
      * to the specified <code>operand</code>, within a range of +/- <code>error</code>.

+ 79 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporter.java

@@ -0,0 +1,79 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.job.persistence;
+
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
+
+import java.util.Objects;
+
+/**
+ * {@link TimingStatsReporter} class handles the logic of persisting {@link TimingStats} if they changed significantly since the last time
+ * they were persisted.
+ *
+ * This class is not thread-safe.
+ */
+public class TimingStatsReporter {
+
+    /** Persisted timing stats. May be stale. */
+    private TimingStats persistedTimingStats;
+    /** Current timing stats. */
+    private TimingStats currentTimingStats;
+    /** Object used to persist current timing stats. */
+    private JobResultsPersister.Builder bulkResultsPersister;
+
+    public TimingStatsReporter(TimingStats timingStats, JobResultsPersister.Builder jobResultsPersister) {
+        Objects.requireNonNull(timingStats);
+        this.persistedTimingStats = new TimingStats(timingStats);
+        this.currentTimingStats = new TimingStats(timingStats);
+        this.bulkResultsPersister = Objects.requireNonNull(jobResultsPersister);
+    }
+
+    public TimingStats getCurrentTimingStats() {
+        return new TimingStats(currentTimingStats);
+    }
+
+    public void reportBucketProcessingTime(long bucketProcessingTimeMs) {
+        currentTimingStats.updateStats(bucketProcessingTimeMs);
+        if (differSignificantly(currentTimingStats, persistedTimingStats)) {
+            flush();
+        }
+    }
+
+    public void flush() {
+        persistedTimingStats = new TimingStats(currentTimingStats);
+        bulkResultsPersister.persistTimingStats(persistedTimingStats);
+    }
+
+    /**
+     * Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics.
+     */
+    public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
+        return differSignificantly(stats1.getMinBucketProcessingTimeMs(), stats2.getMinBucketProcessingTimeMs())
+            || differSignificantly(stats1.getMaxBucketProcessingTimeMs(), stats2.getMaxBucketProcessingTimeMs())
+            || differSignificantly(stats1.getAvgBucketProcessingTimeMs(), stats2.getAvgBucketProcessingTimeMs())
+            || differSignificantly(stats1.getExponentialAvgBucketProcessingTimeMs(), stats2.getExponentialAvgBucketProcessingTimeMs());
+    }
+
+    /**
+     * Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO.
+     * This can be interpreted as values { value1, value2 } differing significantly from each other.
+     * This method also returns:
+     *   - {@code true} in case one value is {@code null} while the other is not.
+     *   - {@code false} in case both values are {@code null}.
+     */
+    static boolean differSignificantly(Double value1, Double value2) {
+        if (value1 != null && value2 != null) {
+            return (value2 / value1 < MIN_VALID_RATIO) || (value1 / value2 < MIN_VALID_RATIO);
+        }
+        return (value1 != null) || (value2 != null);
+    }
+
+    /**
+     * Minimum ratio of values that is interpreted as values being similar.
+     * If the values ratio is less than MIN_VALID_RATIO, the values are interpreted as significantly different.
+     */
+    private static final double MIN_VALID_RATIO = 0.9;
+}

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -517,12 +517,13 @@ public class AutodetectProcessManager implements ClusterStateListener {
                 jobId,
                 renormalizer,
                 jobResultsPersister,
+                process,
                 autodetectParams.modelSizeStats(),
                 autodetectParams.timingStats());
         ExecutorService autodetectWorkerExecutor;
         try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
             autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService);
-            autodetectExecutorService.submit(() -> processor.process(process));
+            autodetectExecutorService.submit(processor::process);
         } catch (EsRejectedExecutionException e) {
             // If submitting the operation to read the results from the process fails we need to close
             // the process too, so that other submitted operations to threadpool are stopped.

+ 63 - 77
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

@@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
 import org.elasticsearch.xpack.core.ml.job.results.Influencer;
 import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
+import org.elasticsearch.xpack.ml.job.persistence.TimingStatsReporter;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
 import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
 import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
@@ -53,7 +54,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 
 /**
  * A runnable class that reads the autodetect process output in the
- * {@link #process(AutodetectProcess)} method and persists parsed
+ * {@link #process()} method and persists parsed
  * results via the {@linkplain JobResultsPersister} passed in the constructor.
  * <p>
  * Has methods to register and remove alert observers.
@@ -77,6 +78,8 @@ public class AutodetectResultProcessor {
     private final String jobId;
     private final Renormalizer renormalizer;
     private final JobResultsPersister persister;
+    private final AutodetectProcess process;
+    private final TimingStatsReporter timingStatsReporter;
 
     final CountDownLatch completionLatch = new CountDownLatch(1);
     final Semaphore updateModelSnapshotSemaphore = new Semaphore(1);
@@ -84,57 +87,55 @@ public class AutodetectResultProcessor {
     private volatile boolean processKilled;
     private volatile boolean failed;
     private int bucketCount; // only used from the process() thread, so doesn't need to be volatile
+    private final JobResultsPersister.Builder bulkResultsPersister;
+    private boolean deleteInterimRequired;
 
     /**
      * New model size stats are read as the process is running
      */
     private volatile ModelSizeStats latestModelSizeStats;
 
-    /**
-     * Current timing stats
-     */
-    private volatile TimingStats timingStats;
-
-    /**
-     * Persisted timing stats. May be stale
-     */
-    private TimingStats persistedTimingStats; // only used from the process() thread, so doesn't need to be volatile
-
-    public AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
+    public AutodetectResultProcessor(Client client,
+                                     Auditor auditor,
+                                     String jobId,
+                                     Renormalizer renormalizer,
                                      JobResultsPersister persister,
+                                     AutodetectProcess process,
                                      ModelSizeStats latestModelSizeStats,
                                      TimingStats timingStats) {
-        this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, timingStats, new FlushListener());
+        this(client, auditor, jobId, renormalizer, persister, process, latestModelSizeStats, timingStats, new FlushListener());
     }
 
+    // Visible for testing
     AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
-                              JobResultsPersister persister, ModelSizeStats latestModelSizeStats, TimingStats timingStats,
+                              JobResultsPersister persister, AutodetectProcess autodetectProcess, ModelSizeStats latestModelSizeStats,
+                              TimingStats timingStats,
                               FlushListener flushListener) {
         this.client = Objects.requireNonNull(client);
         this.auditor = Objects.requireNonNull(auditor);
         this.jobId = Objects.requireNonNull(jobId);
         this.renormalizer = Objects.requireNonNull(renormalizer);
         this.persister = Objects.requireNonNull(persister);
+        this.process = Objects.requireNonNull(autodetectProcess);
         this.flushListener = Objects.requireNonNull(flushListener);
         this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats);
-        this.persistedTimingStats = Objects.requireNonNull(timingStats);
-        this.timingStats = new TimingStats(persistedTimingStats);
+        this.bulkResultsPersister = persister.bulkPersisterBuilder(jobId);
+        this.timingStatsReporter = new TimingStatsReporter(timingStats, bulkResultsPersister);
+        this.deleteInterimRequired = true;
     }
 
-    public void process(AutodetectProcess process) {
-        Context context = new Context(jobId, persister.bulkPersisterBuilder(jobId));
+    public void process() {
 
         // If a function call in this throws for some reason we don't want it
         // to kill the results reader thread as autodetect will be blocked
         // trying to write its output.
         try {
-            readResults(process, context);
+            readResults();
 
             try {
                 if (processKilled == false) {
-                    context.bulkResultsPersister
-                        .persistTimingStats(timingStats)
-                        .executeRequest();
+                    timingStatsReporter.flush();
+                    bulkResultsPersister.executeRequest();
                 }
             } catch (Exception e) {
                 LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e);
@@ -164,14 +165,14 @@ public class AutodetectResultProcessor {
         }
     }
 
-    private void readResults(AutodetectProcess process, Context context) {
+    private void readResults() {
         bucketCount = 0;
         try {
             Iterator<AutodetectResult> iterator = process.readAutodetectResults();
             while (iterator.hasNext()) {
                 try {
                     AutodetectResult result = iterator.next();
-                    processResult(context, result);
+                    processResult(result);
                     if (result.getBucket() != null) {
                         LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
                     }
@@ -195,35 +196,35 @@ public class AutodetectResultProcessor {
         renormalizer.shutdown();
     }
 
-    void processResult(Context context, AutodetectResult result) {
+    void processResult(AutodetectResult result) {
         if (processKilled) {
             return;
         }
 
         Bucket bucket = result.getBucket();
         if (bucket != null) {
-            if (context.deleteInterimRequired) {
+            if (deleteInterimRequired) {
                 // Delete any existing interim results generated by a Flush command
                 // which have not been replaced or superseded by new results.
-                LOGGER.trace("[{}] Deleting interim results", context.jobId);
-                persister.deleteInterimResults(context.jobId);
-                context.deleteInterimRequired = false;
+                LOGGER.trace("[{}] Deleting interim results", jobId);
+                persister.deleteInterimResults(jobId);
+                deleteInterimRequired = false;
             }
 
             // persist after deleting interim results in case the new
             // results are also interim
-            processTimingStats(context, bucket.getProcessingTimeMs());
-            context.bulkResultsPersister.persistBucket(bucket).executeRequest();
+            timingStatsReporter.reportBucketProcessingTime(bucket.getProcessingTimeMs());
+            bulkResultsPersister.persistBucket(bucket).executeRequest();
 
             ++bucketCount;
         }
         List<AnomalyRecord> records = result.getRecords();
         if (records != null && !records.isEmpty()) {
-            context.bulkResultsPersister.persistRecords(records);
+            bulkResultsPersister.persistRecords(records);
         }
         List<Influencer> influencers = result.getInfluencers();
         if (influencers != null && !influencers.isEmpty()) {
-            context.bulkResultsPersister.persistInfluencers(influencers);
+            bulkResultsPersister.persistInfluencers(influencers);
         }
         CategoryDefinition categoryDefinition = result.getCategoryDefinition();
         if (categoryDefinition != null) {
@@ -231,16 +232,16 @@ public class AutodetectResultProcessor {
         }
         ModelPlot modelPlot = result.getModelPlot();
         if (modelPlot != null) {
-            context.bulkResultsPersister.persistModelPlot(modelPlot);
+            bulkResultsPersister.persistModelPlot(modelPlot);
         }
         Forecast forecast = result.getForecast();
         if (forecast != null) {
-            context.bulkResultsPersister.persistForecast(forecast);
+            bulkResultsPersister.persistForecast(forecast);
         }
         ForecastRequestStats forecastRequestStats = result.getForecastRequestStats();
         if (forecastRequestStats != null) {
             LOGGER.trace("Received Forecast Stats [{}]", forecastRequestStats.getId());
-            context.bulkResultsPersister.persistForecastRequestStats(forecastRequestStats);
+            bulkResultsPersister.persistForecastRequestStats(forecastRequestStats);
 
             // execute the bulk request only in some cases or in doubt
             // otherwise rely on the count-based trigger
@@ -252,13 +253,13 @@ public class AutodetectResultProcessor {
                 case SCHEDULED:
                 case FINISHED:
                 default:
-                    context.bulkResultsPersister.executeRequest();
+                    bulkResultsPersister.executeRequest();
 
             }
         }
         ModelSizeStats modelSizeStats = result.getModelSizeStats();
         if (modelSizeStats != null) {
-            processModelSizeStats(context, modelSizeStats);
+            processModelSizeStats(modelSizeStats);
         }
         ModelSnapshot modelSnapshot = result.getModelSnapshot();
         if (modelSnapshot != null) {
@@ -270,64 +271,55 @@ public class AutodetectResultProcessor {
         }
         Quantiles quantiles = result.getQuantiles();
         if (quantiles != null) {
-            LOGGER.debug("[{}] Parsed Quantiles with timestamp {}", context.jobId, quantiles.getTimestamp());
+            LOGGER.debug("[{}] Parsed Quantiles with timestamp {}", jobId, quantiles.getTimestamp());
             persister.persistQuantiles(quantiles);
-            context.bulkResultsPersister.executeRequest();
+            bulkResultsPersister.executeRequest();
 
             if (processKilled == false && renormalizer.isEnabled()) {
                 // We need to make all results written up to these quantiles available for renormalization
-                persister.commitResultWrites(context.jobId);
-                LOGGER.debug("[{}] Quantiles queued for renormalization", context.jobId);
+                persister.commitResultWrites(jobId);
+                LOGGER.debug("[{}] Quantiles queued for renormalization", jobId);
                 renormalizer.renormalize(quantiles);
             }
         }
         FlushAcknowledgement flushAcknowledgement = result.getFlushAcknowledgement();
         if (flushAcknowledgement != null) {
-            LOGGER.debug("[{}] Flush acknowledgement parsed from output for ID {}", context.jobId, flushAcknowledgement.getId());
+            LOGGER.debug("[{}] Flush acknowledgement parsed from output for ID {}", jobId, flushAcknowledgement.getId());
             // Commit previous writes here, effectively continuing
             // the flush from the C++ autodetect process right
             // through to the data store
-            context.bulkResultsPersister.executeRequest();
-            persister.commitResultWrites(context.jobId);
+            bulkResultsPersister.executeRequest();
+            persister.commitResultWrites(jobId);
             flushListener.acknowledgeFlush(flushAcknowledgement);
             // Interim results may have been produced by the flush,
             // which need to be
             // deleted when the next finalized results come through
-            context.deleteInterimRequired = true;
-        }
-    }
-
-    private void processTimingStats(Context context, long bucketProcessingTimeMs) {
-        timingStats.updateStats(bucketProcessingTimeMs);
-        if (TimingStats.differSignificantly(timingStats, persistedTimingStats)) {
-            context.bulkResultsPersister.persistTimingStats(timingStats);
-            persistedTimingStats = timingStats;
-            timingStats = new TimingStats(persistedTimingStats);
+            deleteInterimRequired = true;
         }
     }
 
-    private void processModelSizeStats(Context context, ModelSizeStats modelSizeStats) {
+    private void processModelSizeStats(ModelSizeStats modelSizeStats) {
         LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}",
-                context.jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),
+                jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(),
                 modelSizeStats.getTotalOverFieldCount(), modelSizeStats.getTotalPartitionFieldCount(),
                 modelSizeStats.getBucketAllocationFailuresCount(), modelSizeStats.getMemoryStatus());
 
         persister.persistModelSizeStats(modelSizeStats);
-        notifyModelMemoryStatusChange(context, modelSizeStats);
+        notifyModelMemoryStatusChange(modelSizeStats);
         latestModelSizeStats = modelSizeStats;
     }
 
-    private void notifyModelMemoryStatusChange(Context context, ModelSizeStats modelSizeStats) {
+    private void notifyModelMemoryStatusChange(ModelSizeStats modelSizeStats) {
         ModelSizeStats.MemoryStatus memoryStatus = modelSizeStats.getMemoryStatus();
         if (memoryStatus != latestModelSizeStats.getMemoryStatus()) {
             if (memoryStatus == ModelSizeStats.MemoryStatus.SOFT_LIMIT) {
-                auditor.warning(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT));
+                auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT));
             } else if (memoryStatus == ModelSizeStats.MemoryStatus.HARD_LIMIT) {
                 if (modelSizeStats.getModelBytesMemoryLimit() == null || modelSizeStats.getModelBytesExceeded() == null) {
-                    auditor.error(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2,
+                    auditor.error(jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT_PRE_7_2,
                         new ByteSizeValue(modelSizeStats.getModelBytes(), ByteSizeUnit.BYTES).toString()));
                 } else {
-                    auditor.error(context.jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT,
+                    auditor.error(jobId, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT,
                         new ByteSizeValue(modelSizeStats.getModelBytesMemoryLimit(), ByteSizeUnit.BYTES).toString(),
                         new ByteSizeValue(modelSizeStats.getModelBytesExceeded(), ByteSizeUnit.BYTES).toString()));
                 }
@@ -419,25 +411,19 @@ public class AutodetectResultProcessor {
         return failed;
     }
 
-    static class Context {
-
-        private final String jobId;
-        private JobResultsPersister.Builder bulkResultsPersister;
-
-        boolean deleteInterimRequired;
-
-        Context(String jobId, JobResultsPersister.Builder bulkResultsPersister) {
-            this.jobId = jobId;
-            this.deleteInterimRequired = true;
-            this.bulkResultsPersister = bulkResultsPersister;
-        }
-    }
-
     public ModelSizeStats modelSizeStats() {
         return latestModelSizeStats;
     }
 
     public TimingStats timingStats() {
-        return timingStats;
+        return timingStatsReporter.getCurrentTimingStats();
+    }
+
+    boolean isDeleteInterimRequired() {
+        return deleteInterimRequired;
+    }
+
+    void setDeleteInterimRequired(boolean deleteInterimRequired) {
+        this.deleteInterimRequired = deleteInterimRequired;
     }
 }

+ 46 - 35
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

@@ -50,7 +50,6 @@ import org.junit.After;
 import org.junit.Before;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -77,6 +76,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
     private List<ModelSnapshot> capturedUpdateModelSnapshotOnJobRequests;
     private AutodetectResultProcessor resultProcessor;
     private Renormalizer renormalizer;
+    private AutodetectProcess process;
 
     @Override
     protected Collection<Class<? extends Plugin>> getPlugins() {
@@ -90,9 +90,17 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         Auditor auditor = new Auditor(client(), "test_node");
         jobResultsProvider = new JobResultsProvider(client(), builder.build());
         renormalizer = mock(Renormalizer.class);
+        process = mock(AutodetectProcess.class);
         capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
-        resultProcessor = new AutodetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
-                new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) {
+        resultProcessor = new AutodetectResultProcessor(
+                client(),
+                auditor,
+                JOB_ID,
+                renormalizer,
+                new JobResultsPersister(client()),
+                process,
+                new ModelSizeStats.Builder(JOB_ID).build(),
+                new TimingStats(JOB_ID)) {
             @Override
             protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
                 capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot);
@@ -110,25 +118,26 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
     }
 
     public void testProcessResults() throws Exception {
-        ResultsBuilder builder = new ResultsBuilder();
+        ResultsBuilder resultsBuilder = new ResultsBuilder();
         Bucket bucket = createBucket(false);
-        builder.addBucket(bucket);
+        resultsBuilder.addBucket(bucket);
         List<AnomalyRecord> records = createRecords(false);
-        builder.addRecords(records);
+        resultsBuilder.addRecords(records);
         List<Influencer> influencers = createInfluencers(false);
-        builder.addInfluencers(influencers);
+        resultsBuilder.addInfluencers(influencers);
         CategoryDefinition categoryDefinition = createCategoryDefinition();
-        builder.addCategoryDefinition(categoryDefinition);
+        resultsBuilder.addCategoryDefinition(categoryDefinition);
         ModelPlot modelPlot = createModelPlot();
-        builder.addModelPlot(modelPlot);
+        resultsBuilder.addModelPlot(modelPlot);
         ModelSizeStats modelSizeStats = createModelSizeStats();
-        builder.addModelSizeStats(modelSizeStats);
+        resultsBuilder.addModelSizeStats(modelSizeStats);
         ModelSnapshot modelSnapshot = createModelSnapshot();
-        builder.addModelSnapshot(modelSnapshot);
+        resultsBuilder.addModelSnapshot(modelSnapshot);
         Quantiles quantiles = createQuantiles();
-        builder.addQuantiles(quantiles);
+        resultsBuilder.addQuantiles(quantiles);
+        when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
 
-        resultProcessor.process(builder.buildTestProcess());
+        resultProcessor.process();
         resultProcessor.awaitCompletion();
 
         BucketsQueryBuilder bucketsQuery = new BucketsQueryBuilder().includeInterim(true);
@@ -167,7 +176,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
     }
 
     public void testProcessResults_TimingStats() throws Exception {
-        ResultsBuilder resultBuilder = new ResultsBuilder()
+        ResultsBuilder resultsBuilder = new ResultsBuilder()
                 .addBucket(createBucket(true, 100))
                 .addBucket(createBucket(true, 1000))
                 .addBucket(createBucket(true, 100))
@@ -178,8 +187,9 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
                 .addBucket(createBucket(true, 1000))
                 .addBucket(createBucket(true, 100))
                 .addBucket(createBucket(true, 1000));
+        when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
 
-        resultProcessor.process(resultBuilder.buildTestProcess());
+        resultProcessor.process();
         resultProcessor.awaitCompletion();
 
         TimingStats timingStats = resultProcessor.timingStats();
@@ -194,11 +204,12 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
     public void testParseQuantiles_GivenRenormalizationIsEnabled() throws Exception {
         when(renormalizer.isEnabled()).thenReturn(true);
 
-        ResultsBuilder builder = new ResultsBuilder();
+        ResultsBuilder resultsBuilder = new ResultsBuilder();
         Quantiles quantiles = createQuantiles();
-        builder.addQuantiles(quantiles);
+        resultsBuilder.addQuantiles(quantiles);
+        when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
 
-        resultProcessor.process(builder.buildTestProcess());
+        resultProcessor.process();
         resultProcessor.awaitCompletion();
 
         Optional<Quantiles> persistedQuantiles = getQuantiles();
@@ -210,11 +221,12 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
     public void testParseQuantiles_GivenRenormalizationIsDisabled() throws Exception {
         when(renormalizer.isEnabled()).thenReturn(false);
 
-        ResultsBuilder builder = new ResultsBuilder();
+        ResultsBuilder resultsBuilder = new ResultsBuilder();
         Quantiles quantiles = createQuantiles();
-        builder.addQuantiles(quantiles);
+        resultsBuilder.addQuantiles(quantiles);
+        when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
 
-        resultProcessor.process(builder.buildTestProcess());
+        resultProcessor.process();
         resultProcessor.awaitCompletion();
 
         Optional<Quantiles> persistedQuantiles = getQuantiles();
@@ -227,14 +239,15 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         Bucket nonInterimBucket = createBucket(false);
         Bucket interimBucket = createBucket(true);
 
-        ResultsBuilder resultBuilder = new ResultsBuilder()
+        ResultsBuilder resultsBuilder = new ResultsBuilder()
                 .addRecords(createRecords(true))
                 .addInfluencers(createInfluencers(true))
                 .addBucket(interimBucket)  // this will persist the interim results
                 .addFlushAcknowledgement(createFlushAcknowledgement())
                 .addBucket(nonInterimBucket); // and this will delete the interim results
+        when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
 
-        resultProcessor.process(resultBuilder.buildTestProcess());
+        resultProcessor.process();
         resultProcessor.awaitCompletion();
 
         QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
@@ -255,7 +268,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         Bucket finalBucket = createBucket(true);
         List<AnomalyRecord> finalAnomalyRecords = createRecords(true);
 
-        ResultsBuilder resultBuilder = new ResultsBuilder()
+        ResultsBuilder resultsBuilder = new ResultsBuilder()
                 .addRecords(createRecords(true))
                 .addInfluencers(createInfluencers(true))
                 .addBucket(createBucket(true))  // this will persist the interim results
@@ -265,8 +278,9 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
                 .addFlushAcknowledgement(createFlushAcknowledgement())
                 .addRecords(finalAnomalyRecords)
                 .addBucket(finalBucket); // this deletes the previous interim and persists final bucket & records
+        when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
 
-        resultProcessor.process(resultBuilder.buildTestProcess());
+        resultProcessor.process();
         resultProcessor.awaitCompletion();
 
         QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
@@ -285,12 +299,13 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         List<AnomalyRecord> firstSetOfRecords = createRecords(false);
         List<AnomalyRecord> secondSetOfRecords = createRecords(false);
 
-        ResultsBuilder resultBuilder = new ResultsBuilder()
+        ResultsBuilder resultsBuilder = new ResultsBuilder()
                 .addRecords(firstSetOfRecords)
                 .addBucket(bucket)  // bucket triggers persistence
                 .addRecords(secondSetOfRecords);
+        when(process.readAutodetectResults()).thenReturn(resultsBuilder.build().iterator());
 
-        resultProcessor.process(resultBuilder.buildTestProcess());
+        resultProcessor.process();
         resultProcessor.awaitCompletion();
 
         QueryPage<Bucket> persistedBucket = getBucketQueryPage(new BucketsQueryBuilder().includeInterim(true));
@@ -389,9 +404,9 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         return new FlushAcknowledgement(randomAlphaOfLength(5), randomDate());
     }
 
-    private class ResultsBuilder {
+    private static class ResultsBuilder {
 
-        private List<AutodetectResult> results = new ArrayList<>();
+        private final List<AutodetectResult> results = new ArrayList<>();
 
         ResultsBuilder addBucket(Bucket bucket) {
             results.add(new AutodetectResult(Objects.requireNonNull(bucket), null, null, null, null, null, null, null, null, null, null));
@@ -438,12 +453,8 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
             return this;
         }
 
-
-        AutodetectProcess buildTestProcess() {
-            AutodetectResult[] results = this.results.toArray(new AutodetectResult[0]);
-            AutodetectProcess process = mock(AutodetectProcess.class);
-            when(process.readAutodetectResults()).thenReturn(Arrays.asList(results).iterator());
-            return process;
+        Iterable<AutodetectResult> build() {
+            return results;
         }
     }
 

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java

@@ -839,7 +839,7 @@ public class JobResultsProviderTests extends ESTestCase {
                     TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1.0,
                     TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1000.0,
                     TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 666.0,
-                    TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 777.0));
+                    TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 777.0));
         SearchResponse response = createSearchResponse(source);
         Client client = getMockedClient(
             queryBuilder -> assertThat(queryBuilder.getName(), equalTo("ids")),

+ 105 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/TimingStatsReporterTests.java

@@ -0,0 +1,105 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.job.persistence;
+
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
+import org.junit.Before;
+import org.mockito.InOrder;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class TimingStatsReporterTests extends ESTestCase {
+
+    private static final String JOB_ID = "my-job-id";
+
+    private JobResultsPersister.Builder bulkResultsPersister;
+
+    @Before
+    public void setUpTests() {
+        bulkResultsPersister = mock(JobResultsPersister.Builder.class);
+    }
+
+    public void testGetCurrentTimingStats() {
+        TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
+        TimingStatsReporter reporter = new TimingStatsReporter(stats, bulkResultsPersister);
+        assertThat(reporter.getCurrentTimingStats(), equalTo(stats));
+
+        verifyZeroInteractions(bulkResultsPersister);
+    }
+
+    public void testReporting() {
+        TimingStatsReporter reporter = new TimingStatsReporter(new TimingStats(JOB_ID), bulkResultsPersister);
+        assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID)));
+
+        reporter.reportBucketProcessingTime(10);
+        assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0)));
+
+        reporter.reportBucketProcessingTime(20);
+        assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 2, 10.0, 20.0, 15.0, 10.1)));
+
+        reporter.reportBucketProcessingTime(15);
+        assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 3, 10.0, 20.0, 15.0, 10.149)));
+
+        InOrder inOrder = inOrder(bulkResultsPersister);
+        inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0));
+        inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 2, 10.0, 20.0, 15.0, 10.1));
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    public void testFlush() {
+        TimingStatsReporter reporter = new TimingStatsReporter(new TimingStats(JOB_ID), bulkResultsPersister);
+        assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID)));
+
+        reporter.reportBucketProcessingTime(10);
+        assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0)));
+
+        reporter.reportBucketProcessingTime(10);
+        assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 2, 10.0, 10.0, 10.0, 10.0)));
+
+        reporter.reportBucketProcessingTime(10);
+        assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0)));
+
+        reporter.flush();
+        assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0)));
+
+        InOrder inOrder = inOrder(bulkResultsPersister);
+        inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 1, 10.0, 10.0, 10.0, 10.0));
+        inOrder.verify(bulkResultsPersister).persistTimingStats(new TimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0));
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    public void testTimingStatsDifferSignificantly() {
+        assertThat(
+            TimingStatsReporter.differSignificantly(
+                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0)),
+            is(false));
+        assertThat(
+            TimingStatsReporter.differSignificantly(
+                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0, 10.0)),
+            is(false));
+        assertThat(
+            TimingStatsReporter.differSignificantly(
+                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0, 10.0)),
+            is(true));
+    }
+
+    public void testValuesDifferSignificantly() {
+        assertThat(TimingStatsReporter.differSignificantly((Double) null, (Double) null), is(false));
+        assertThat(TimingStatsReporter.differSignificantly(1.0, null), is(true));
+        assertThat(TimingStatsReporter.differSignificantly(null, 1.0), is(true));
+        assertThat(TimingStatsReporter.differSignificantly(0.9, 1.0), is(false));
+        assertThat(TimingStatsReporter.differSignificantly(1.0, 0.9), is(false));
+        assertThat(TimingStatsReporter.differSignificantly(0.9, 1.000001), is(true));
+        assertThat(TimingStatsReporter.differSignificantly(1.0, 0.899999), is(true));
+        assertThat(TimingStatsReporter.differSignificantly(0.0, 1.0), is(true));
+        assertThat(TimingStatsReporter.differSignificantly(1.0, 0.0), is(true));
+    }
+}

+ 147 - 187
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

@@ -52,6 +52,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Matchers.any;
@@ -77,6 +78,8 @@ public class AutodetectResultProcessorTests extends ESTestCase {
     private Auditor auditor;
     private Renormalizer renormalizer;
     private JobResultsPersister persister;
+    private JobResultsPersister.Builder bulkBuilder;
+    private AutodetectProcess process;
     private FlushListener flushListener;
     private AutodetectResultProcessor processorUnderTest;
     private ScheduledThreadPoolExecutor executor;
@@ -91,8 +94,9 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         auditor = mock(Auditor.class);
         renormalizer = mock(Renormalizer.class);
         persister = mock(JobResultsPersister.class);
-        when(persister.persistModelSnapshot(any(), any()))
-                .thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true));
+        bulkBuilder = mock(JobResultsPersister.Builder.class);
+        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
+        process = mock(AutodetectProcess.class);
         flushListener = mock(FlushListener.class);
         processorUnderTest = new AutodetectResultProcessor(
             client,
@@ -100,6 +104,7 @@ public class AutodetectResultProcessorTests extends ESTestCase {
             JOB_ID,
             renormalizer,
             persister,
+            process,
             new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(),
             new TimingStats(JOB_ID),
             flushListener);
@@ -107,143 +112,120 @@ public class AutodetectResultProcessorTests extends ESTestCase {
 
     @After
     public void cleanup() {
+        verifyNoMoreInteractions(auditor, renormalizer, persister);
         executor.shutdown();
     }
 
     public void testProcess() throws TimeoutException {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
-        @SuppressWarnings("unchecked")
-        Iterator<AutodetectResult> iterator = mock(Iterator.class);
-        when(iterator.hasNext()).thenReturn(true).thenReturn(false);
-        when(iterator.next()).thenReturn(autodetectResult);
-        AutodetectProcess process = mock(AutodetectProcess.class);
-        when(process.readAutodetectResults()).thenReturn(iterator);
-        processorUnderTest.process(process);
+        when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
+
+        processorUnderTest.process();
         processorUnderTest.awaitCompletion();
-        verify(renormalizer, times(1)).waitUntilIdle();
-        assertEquals(0, processorUnderTest.completionLatch.getCount());
+        assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
+
+        verify(renormalizer).waitUntilIdle();
+        verify(persister).bulkPersisterBuilder(JOB_ID);
+        verify(persister).commitResultWrites(JOB_ID);
+        verify(persister).commitStateWrites(JOB_ID);
     }
 
     public void testProcessResult_bucket() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
         when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         Bucket bucket = mock(Bucket.class);
         when(result.getBucket()).thenReturn(bucket);
-        processorUnderTest.processResult(context, result);
 
-        verify(bulkBuilder, times(1)).persistTimingStats(any(TimingStats.class));
-        verify(bulkBuilder, times(1)).persistBucket(bucket);
-        verify(bulkBuilder, times(1)).executeRequest();
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+
+        verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
+        verify(bulkBuilder).persistBucket(bucket);
+        verify(bulkBuilder).executeRequest();
+        verify(persister).bulkPersisterBuilder(JOB_ID);
         verify(persister, never()).deleteInterimResults(JOB_ID);
-        verifyNoMoreInteractions(persister);
     }
 
     public void testProcessResult_bucket_deleteInterimRequired() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder);
         when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = true;
         AutodetectResult result = mock(AutodetectResult.class);
         Bucket bucket = mock(Bucket.class);
         when(result.getBucket()).thenReturn(bucket);
-        processorUnderTest.processResult(context, result);
-
-        verify(bulkBuilder, times(1)).persistTimingStats(any(TimingStats.class));
-        verify(bulkBuilder, times(1)).persistBucket(bucket);
-        verify(bulkBuilder, times(1)).executeRequest();
-        verify(persister, times(1)).deleteInterimResults(JOB_ID);
-        verifyNoMoreInteractions(persister);
-        assertFalse(context.deleteInterimRequired);
+
+        processorUnderTest.processResult(result);
+        assertFalse(processorUnderTest.isDeleteInterimRequired());
+
+        verify(bulkBuilder).persistTimingStats(any(TimingStats.class));
+        verify(bulkBuilder).persistBucket(bucket);
+        verify(bulkBuilder).executeRequest();
+        verify(persister).bulkPersisterBuilder(JOB_ID);
+        verify(persister).deleteInterimResults(JOB_ID);
     }
 
     public void testProcessResult_records() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context("foo", bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
-        AnomalyRecord record1 = new AnomalyRecord("foo", new Date(123), 123);
-        AnomalyRecord record2 = new AnomalyRecord("foo", new Date(123), 123);
-        List<AnomalyRecord> records = Arrays.asList(record1, record2);
+        List<AnomalyRecord> records =
+            Arrays.asList(
+                new AnomalyRecord(JOB_ID, new Date(123), 123),
+                new AnomalyRecord(JOB_ID, new Date(123), 123));
         when(result.getRecords()).thenReturn(records);
-        processorUnderTest.processResult(context, result);
 
-        verify(bulkBuilder, times(1)).persistRecords(records);
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+
+        verify(bulkBuilder).persistRecords(records);
         verify(bulkBuilder, never()).executeRequest();
-        verifyNoMoreInteractions(persister);
+        verify(persister).bulkPersisterBuilder(JOB_ID);
     }
 
-
     public void testProcessResult_influencers() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
-        Influencer influencer1 = new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123);
-        Influencer influencer2 = new Influencer(JOB_ID, "infField2", "infValue2", new Date(123), 123);
-        List<Influencer> influencers = Arrays.asList(influencer1, influencer2);
+        List<Influencer> influencers =
+            Arrays.asList(
+                new Influencer(JOB_ID, "infField", "infValue", new Date(123), 123),
+                new Influencer(JOB_ID, "infField2", "infValue2", new Date(123), 123));
         when(result.getInfluencers()).thenReturn(influencers);
-        processorUnderTest.processResult(context, result);
 
-        verify(bulkBuilder, times(1)).persistInfluencers(influencers);
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+
+        verify(bulkBuilder).persistInfluencers(influencers);
         verify(bulkBuilder, never()).executeRequest();
-        verifyNoMoreInteractions(persister);
+        verify(persister).bulkPersisterBuilder(JOB_ID);
     }
 
     public void testProcessResult_categoryDefinition() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
         when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
-        processorUnderTest.processResult(context, result);
+
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
 
         verify(bulkBuilder, never()).executeRequest();
-        verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
-        verifyNoMoreInteractions(persister);
+        verify(persister).persistCategoryDefinition(categoryDefinition);
+        verify(persister).bulkPersisterBuilder(JOB_ID);
     }
 
     public void testProcessResult_flushAcknowledgement() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
         when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
         when(result.getFlushAcknowledgement()).thenReturn(flushAcknowledgement);
-        processorUnderTest.processResult(context, result);
 
-        verify(flushListener, times(1)).acknowledgeFlush(flushAcknowledgement);
-        verify(persister, times(1)).commitResultWrites(JOB_ID);
-        verify(bulkBuilder, times(1)).executeRequest();
-        verifyNoMoreInteractions(persister);
-        assertTrue(context.deleteInterimRequired);
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+        assertTrue(processorUnderTest.isDeleteInterimRequired());
+
+        verify(persister).bulkPersisterBuilder(JOB_ID);
+        verify(flushListener).acknowledgeFlush(flushAcknowledgement);
+        verify(persister).commitResultWrites(JOB_ID);
+        verify(bulkBuilder).executeRequest();
     }
 
     public void testProcessResult_flushAcknowledgementMustBeProcessedLast() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class);
         when(flushAcknowledgement.getId()).thenReturn(JOB_ID);
@@ -251,64 +233,61 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         CategoryDefinition categoryDefinition = mock(CategoryDefinition.class);
         when(result.getCategoryDefinition()).thenReturn(categoryDefinition);
 
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+        assertTrue(processorUnderTest.isDeleteInterimRequired());
+
         InOrder inOrder = inOrder(persister, bulkBuilder, flushListener);
-        processorUnderTest.processResult(context, result);
-
-        inOrder.verify(persister, times(1)).persistCategoryDefinition(categoryDefinition);
-        inOrder.verify(bulkBuilder, times(1)).executeRequest();
-        inOrder.verify(persister, times(1)).commitResultWrites(JOB_ID);
-        inOrder.verify(flushListener, times(1)).acknowledgeFlush(flushAcknowledgement);
-        verifyNoMoreInteractions(persister);
-        assertTrue(context.deleteInterimRequired);
+        inOrder.verify(persister).bulkPersisterBuilder(JOB_ID);
+        inOrder.verify(persister).persistCategoryDefinition(categoryDefinition);
+        inOrder.verify(bulkBuilder).executeRequest();
+        inOrder.verify(persister).commitResultWrites(JOB_ID);
+        inOrder.verify(flushListener).acknowledgeFlush(flushAcknowledgement);
     }
 
     public void testProcessResult_modelPlot() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelPlot modelPlot = mock(ModelPlot.class);
         when(result.getModelPlot()).thenReturn(modelPlot);
-        processorUnderTest.processResult(context, result);
 
-        verify(bulkBuilder, times(1)).persistModelPlot(modelPlot);
-        verifyNoMoreInteractions(persister);
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+
+        verify(persister).bulkPersisterBuilder(JOB_ID);
+        verify(bulkBuilder).persistModelPlot(modelPlot);
     }
 
     public void testProcessResult_modelSizeStats() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSizeStats modelSizeStats = mock(ModelSizeStats.class);
         when(result.getModelSizeStats()).thenReturn(modelSizeStats);
-        processorUnderTest.processResult(context, result);
 
-        verify(persister, times(1)).persistModelSizeStats(modelSizeStats);
-        verifyNoMoreInteractions(persister);
-        assertEquals(modelSizeStats, processorUnderTest.modelSizeStats());
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+        assertThat(processorUnderTest.modelSizeStats(), is(equalTo(modelSizeStats)));
+
+        verify(persister).bulkPersisterBuilder(JOB_ID);
+        verify(persister).persistModelSizeStats(modelSizeStats);
     }
 
     public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-
-        setupScheduleDelayTime(TimeValue.timeValueSeconds(5));
+        TimeValue delay = TimeValue.timeValueSeconds(5);
+        // Set up schedule delay time
+        when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
+            .thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[0], delay.nanos(), TimeUnit.NANOSECONDS));
 
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
+        processorUnderTest.setDeleteInterimRequired(false);
 
         // First one with soft_limit
         ModelSizeStats modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.SOFT_LIMIT).build();
         when(result.getModelSizeStats()).thenReturn(modelSizeStats);
-        processorUnderTest.processResult(context, result);
+        processorUnderTest.processResult(result);
 
         // Another with soft_limit
         modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.SOFT_LIMIT).build();
         when(result.getModelSizeStats()).thenReturn(modelSizeStats);
-        processorUnderTest.processResult(context, result);
+        processorUnderTest.processResult(result);
 
         // Now with hard_limit
         modelSizeStats = new ModelSizeStats.Builder(JOB_ID)
@@ -317,115 +296,104 @@ public class AutodetectResultProcessorTests extends ESTestCase {
                 .setModelBytesExceeded(new ByteSizeValue(1, ByteSizeUnit.KB).getBytes())
                 .build();
         when(result.getModelSizeStats()).thenReturn(modelSizeStats);
-        processorUnderTest.processResult(context, result);
+        processorUnderTest.processResult(result);
 
         // And another with hard_limit
         modelSizeStats = new ModelSizeStats.Builder(JOB_ID).setMemoryStatus(ModelSizeStats.MemoryStatus.HARD_LIMIT).build();
         when(result.getModelSizeStats()).thenReturn(modelSizeStats);
-        processorUnderTest.processResult(context, result);
+        processorUnderTest.processResult(result);
 
+        verify(persister).bulkPersisterBuilder(JOB_ID);
+        verify(persister, times(4)).persistModelSizeStats(any(ModelSizeStats.class));
         // We should have only fired two notifications: one for soft_limit and one for hard_limit
         verify(auditor).warning(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_SOFT_LIMIT));
         verify(auditor).error(JOB_ID, Messages.getMessage(Messages.JOB_AUDIT_MEMORY_STATUS_HARD_LIMIT, "512mb", "1kb"));
-        verifyNoMoreInteractions(auditor);
     }
 
     public void testProcessResult_modelSnapshot() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID)
-                .setSnapshotId("a_snapshot_id")
-                .setMinVersion(Version.CURRENT)
-                .build();
+            .setSnapshotId("a_snapshot_id")
+            .setMinVersion(Version.CURRENT)
+            .build();
         when(result.getModelSnapshot()).thenReturn(modelSnapshot);
-        processorUnderTest.processResult(context, result);
 
-        verify(persister, times(1)).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE);
+        when(persister.persistModelSnapshot(any(), any()))
+            .thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true));
+
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+
+        verify(persister).bulkPersisterBuilder(JOB_ID);
+        verify(persister).persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE);
 
         UpdateJobAction.Request expectedJobUpdateRequest = UpdateJobAction.Request.internal(JOB_ID,
                 new JobUpdate.Builder(JOB_ID).setModelSnapshotId("a_snapshot_id").build());
 
         verify(client).execute(same(UpdateJobAction.INSTANCE), eq(expectedJobUpdateRequest), any());
-        verifyNoMoreInteractions(persister);
     }
 
     public void testProcessResult_quantiles_givenRenormalizationIsEnabled() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         Quantiles quantiles = mock(Quantiles.class);
         when(result.getQuantiles()).thenReturn(quantiles);
         when(renormalizer.isEnabled()).thenReturn(true);
-        processorUnderTest.processResult(context, result);
 
-        verify(persister, times(1)).persistQuantiles(quantiles);
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+
+        verify(persister).bulkPersisterBuilder(JOB_ID);
+        verify(persister).persistQuantiles(quantiles);
         verify(bulkBuilder).executeRequest();
         verify(persister).commitResultWrites(JOB_ID);
-        verify(renormalizer, times(1)).isEnabled();
-        verify(renormalizer, times(1)).renormalize(quantiles);
-        verifyNoMoreInteractions(persister);
-        verifyNoMoreInteractions(renormalizer);
+        verify(renormalizer).isEnabled();
+        verify(renormalizer).renormalize(quantiles);
     }
 
     public void testProcessResult_quantiles_givenRenormalizationIsDisabled() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-
-        AutodetectResultProcessor.Context context = new AutodetectResultProcessor.Context(JOB_ID, bulkBuilder);
-        context.deleteInterimRequired = false;
         AutodetectResult result = mock(AutodetectResult.class);
         Quantiles quantiles = mock(Quantiles.class);
         when(result.getQuantiles()).thenReturn(quantiles);
         when(renormalizer.isEnabled()).thenReturn(false);
-        processorUnderTest.processResult(context, result);
 
-        verify(persister, times(1)).persistQuantiles(quantiles);
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+
+        verify(persister).bulkPersisterBuilder(JOB_ID);
+        verify(persister).persistQuantiles(quantiles);
         verify(bulkBuilder).executeRequest();
-        verify(renormalizer, times(1)).isEnabled();
-        verifyNoMoreInteractions(persister);
-        verifyNoMoreInteractions(renormalizer);
+        verify(renormalizer).isEnabled();
     }
 
     public void testAwaitCompletion() throws TimeoutException {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
-        @SuppressWarnings("unchecked")
-        Iterator<AutodetectResult> iterator = mock(Iterator.class);
-        when(iterator.hasNext()).thenReturn(true).thenReturn(false);
-        when(iterator.next()).thenReturn(autodetectResult);
-        AutodetectProcess process = mock(AutodetectProcess.class);
-        when(process.readAutodetectResults()).thenReturn(iterator);
-        processorUnderTest.process(process);
+        when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
 
+        processorUnderTest.process();
         processorUnderTest.awaitCompletion();
-        assertEquals(0, processorUnderTest.completionLatch.getCount());
-        assertEquals(1, processorUnderTest.updateModelSnapshotSemaphore.availablePermits());
+        assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
+        assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1)));
+
+        verify(persister).bulkPersisterBuilder(JOB_ID);
+        verify(persister).commitResultWrites(JOB_ID);
+        verify(persister).commitStateWrites(JOB_ID);
+        verify(renormalizer).waitUntilIdle();
     }
 
     public void testPersisterThrowingDoesntBlockProcessing() {
-        JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class);
-        when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder);
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
         ModelSnapshot modelSnapshot = mock(ModelSnapshot.class);
         when(autodetectResult.getModelSnapshot()).thenReturn(modelSnapshot);
 
-        @SuppressWarnings("unchecked")
-        Iterator<AutodetectResult> iterator = mock(Iterator.class);
-        when(iterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false);
-        when(iterator.next()).thenReturn(autodetectResult);
-        AutodetectProcess process = mock(AutodetectProcess.class);
         when(process.isProcessAlive()).thenReturn(true);
         when(process.isProcessAliveAfterWaiting()).thenReturn(true);
-        when(process.readAutodetectResults()).thenReturn(iterator);
+        when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult, autodetectResult).iterator());
 
         doThrow(new ElasticsearchException("this test throws")).when(persister).persistModelSnapshot(any(), any());
 
-        processorUnderTest.process(process);
+        processorUnderTest.process();
+
+        verify(persister).bulkPersisterBuilder(JOB_ID);
         verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE));
     }
 
@@ -433,44 +401,36 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         @SuppressWarnings("unchecked")
         Iterator<AutodetectResult> iterator = mock(Iterator.class);
         when(iterator.hasNext()).thenThrow(new ElasticsearchParseException("this test throws"));
-        AutodetectProcess process = mock(AutodetectProcess.class);
         when(process.readAutodetectResults()).thenReturn(iterator);
 
         assertFalse(processorUnderTest.isFailed());
-        processorUnderTest.process(process);
+        processorUnderTest.process();
         assertTrue(processorUnderTest.isFailed());
 
         // Wait for flush should return immediately
-        FlushAcknowledgement flushAcknowledgement = processorUnderTest.waitForFlushAcknowledgement(
-                "foo", Duration.of(300, ChronoUnit.SECONDS));
+        FlushAcknowledgement flushAcknowledgement =
+            processorUnderTest.waitForFlushAcknowledgement(JOB_ID, Duration.of(300, ChronoUnit.SECONDS));
         assertThat(flushAcknowledgement, is(nullValue()));
+
+        verify(persister).bulkPersisterBuilder(JOB_ID);
     }
 
     public void testKill() throws TimeoutException {
         AutodetectResult autodetectResult = mock(AutodetectResult.class);
-        @SuppressWarnings("unchecked")
-        Iterator<AutodetectResult> iterator = mock(Iterator.class);
-        when(iterator.hasNext()).thenReturn(true).thenReturn(false);
-        when(iterator.next()).thenReturn(autodetectResult);
-        AutodetectProcess process = mock(AutodetectProcess.class);
-        when(process.readAutodetectResults()).thenReturn(iterator);
-        processorUnderTest.setProcessKilled();
-        processorUnderTest.process(process);
+        when(process.readAutodetectResults()).thenReturn(Arrays.asList(autodetectResult).iterator());
 
+        processorUnderTest.setProcessKilled();
+        processorUnderTest.process();
         processorUnderTest.awaitCompletion();
-        assertEquals(0, processorUnderTest.completionLatch.getCount());
-        assertEquals(1, processorUnderTest.updateModelSnapshotSemaphore.availablePermits());
+        assertThat(processorUnderTest.completionLatch.getCount(), is(equalTo(0L)));
+        assertThat(processorUnderTest.updateModelSnapshotSemaphore.availablePermits(), is(equalTo(1)));
 
-        verify(persister, times(1)).commitResultWrites(JOB_ID);
-        verify(persister, times(1)).commitStateWrites(JOB_ID);
+        verify(persister).bulkPersisterBuilder(JOB_ID);
+        verify(persister).commitResultWrites(JOB_ID);
+        verify(persister).commitStateWrites(JOB_ID);
         verify(renormalizer, never()).renormalize(any());
         verify(renormalizer).shutdown();
-        verify(renormalizer, times(1)).waitUntilIdle();
-        verify(flushListener, times(1)).clear();
-    }
-
-    private void setupScheduleDelayTime(TimeValue delay) {
-        when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString()))
-            .thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[0], delay.nanos(), TimeUnit.NANOSECONDS));
+        verify(renormalizer).waitUntilIdle();
+        verify(flushListener).clear();
     }
 }