Jelajahi Sumber

[ML] mark forecasts for force closed/failed jobs as failed (#57143)

forecasts that are still running should be marked as failed/finished in the following scenarios:

- Job is force closed
- Job is re-assigned to another node.

Forecasts are not "resilient". Their execution does not continue after a node failure. Consequently, forecasts marked as STARTED or SCHEDULED should be flagged as failed. These forecasts can then be deleted.

Additionally, force closing a job kills the native task directly. This means that if a forecast was running, it is not allowed to complete and could still have the status of `STARTED` in the index.

relates to https://github.com/elastic/elasticsearch/issues/56419
Benjamin Trent 5 tahun lalu
induk
melakukan
6bfd006d82

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -222,6 +222,8 @@ public final class Messages {
 
     public static final String JOB_UNKNOWN_ID = "No known job with id ''{0}''";
 
+    public static final String JOB_FORECAST_NATIVE_PROCESS_KILLED = "forecast unable to complete as native process was killed.";
+
     public static final String REST_CANNOT_DELETE_HIGHEST_PRIORITY =
             "Model snapshot ''{0}'' is the active snapshot for job ''{1}'', so cannot be deleted";
     public static final String REST_INVALID_DATETIME_PARAMS =

+ 25 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ForecastRequestStats.java

@@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
 
 import java.io.IOException;
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
@@ -90,6 +91,14 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
             return in.readEnum(ForecastRequestStatus.class);
         }
 
+        /**
+         * @return {@code true} if state matches any of the given {@code candidates}
+         */
+        public boolean isAnyOf(ForecastRequestStatus... candidates) {
+            return Arrays.stream(candidates).anyMatch(candidate -> this == candidate);
+        }
+
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeEnum(this);
@@ -120,6 +129,22 @@ public class ForecastRequestStats implements ToXContentObject, Writeable {
         this.forecastId = Objects.requireNonNull(forecastId);
     }
 
+    public ForecastRequestStats(ForecastRequestStats forecastRequestStats) {
+        this.jobId = forecastRequestStats.jobId;
+        this.forecastId = forecastRequestStats.forecastId;
+        this.recordCount = forecastRequestStats.recordCount;
+        this.messages = forecastRequestStats.messages;
+        this.timestamp = forecastRequestStats.timestamp;
+        this.startTime = forecastRequestStats.startTime;
+        this.endTime = forecastRequestStats.endTime;
+        this.createTime = forecastRequestStats.createTime;
+        this.expiryTime = forecastRequestStats.expiryTime;
+        this.progress = forecastRequestStats.progress;
+        this.processingTime = forecastRequestStats.processingTime;
+        this.memoryUsage = forecastRequestStats.memoryUsage;
+        this.status = forecastRequestStats.status;
+    }
+
     public ForecastRequestStats(StreamInput in) throws IOException {
         jobId = in.readString();
         forecastId = in.readString();

+ 48 - 2
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java

@@ -33,6 +33,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_FORECAST_NATIVE_PROCESS_KILLED;
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -380,6 +381,51 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
         }
     }
 
+    public void testForceStopSetsForecastToFailed() throws Exception {
+        Detector.Builder detector = new Detector.Builder("mean", "value");
+
+        TimeValue bucketSpan = TimeValue.timeValueHours(1);
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
+        analysisConfig.setBucketSpan(bucketSpan);
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        dataDescription.setTimeFormat("epoch");
+        Job.Builder job = new Job.Builder("forecast-it-test-failed-on-force-stop");
+        job.setAnalysisConfig(analysisConfig);
+        job.setDataDescription(dataDescription);
+        String jobId = job.getId();
+      
+        registerJob(job);
+        putJob(job);
+        openJob(job.getId());
+
+        long now = Instant.now().getEpochSecond();
+        long timestamp = now - 50 * bucketSpan.seconds();
+        List<String> data = new ArrayList<>();
+        while (timestamp < now) {
+            data.add(createJsonRecord(createRecord(timestamp, 10.0)));
+            data.add(createJsonRecord(createRecord(timestamp, 30.0)));
+            timestamp += bucketSpan.seconds();
+        }
+
+        postData(job.getId(), data.stream().collect(Collectors.joining()));
+        flushJob(job.getId(), false);
+      
+        String forecastId = forecast(jobId, TimeValue.timeValueDays(1000), TimeValue.ZERO);
+        waitForecastStatus(jobId, forecastId, ForecastRequestStats.ForecastRequestStatus.values());
+
+        closeJob(jobId, true);
+        // On force close job, it should always be at least failed or finished
+        waitForecastStatus(jobId,
+            forecastId,
+            ForecastRequestStats.ForecastRequestStatus.FAILED,
+            ForecastRequestStats.ForecastRequestStatus.FINISHED);
+        ForecastRequestStats forecastStats = getForecastStats(job.getId(), forecastId);
+        assertNotNull(forecastStats);
+        if (forecastStats.getStatus().equals(ForecastRequestStats.ForecastRequestStatus.FAILED)) {
+            assertThat(forecastStats.getMessages().get(0), equalTo(JOB_FORECAST_NATIVE_PROCESS_KILLED));
+        }
+    }
+
     public void testForecastWithHigherMemoryUse() throws Exception {
         Detector.Builder detector = new Detector.Builder("mean", "value");
 
@@ -407,7 +453,7 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
 
         postData(job.getId(), data.stream().collect(Collectors.joining()));
         flushJob(job.getId(), false);
-
+  
         // Now we can start doing forecast requests
 
         String forecastId = forecast(job.getId(),
@@ -426,7 +472,7 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
         assertThat(forecastDuration1HourNoExpiry.getRecordCount(), equalTo(1L));
         assertThat(forecasts.size(), equalTo(1));
     }
-
+  
     private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) {
         long now = Instant.now().getEpochSecond();
         long timestamp = now - 15 * bucketSpan.seconds();

+ 13 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java

@@ -71,6 +71,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 
@@ -145,7 +146,12 @@ abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {
     }
 
     protected CloseJobAction.Response closeJob(String jobId) {
+        return closeJob(jobId, false);
+    }
+
+    protected CloseJobAction.Response closeJob(String jobId, boolean force) {
         CloseJobAction.Request request = new CloseJobAction.Request(jobId);
+        request.setForce(force);
         return client().execute(CloseJobAction.INSTANCE, request).actionGet();
     }
 
@@ -276,10 +282,16 @@ abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {
     }
 
     protected void waitForecastToFinish(String jobId, String forecastId) throws Exception {
+        waitForecastStatus(jobId, forecastId, ForecastRequestStats.ForecastRequestStatus.FINISHED);
+    }
+
+    protected void waitForecastStatus(String jobId,
+                                      String forecastId,
+                                      ForecastRequestStats.ForecastRequestStatus... status) throws Exception {
         assertBusy(() -> {
             ForecastRequestStats forecastRequestStats = getForecastStats(jobId, forecastId);
             assertThat(forecastRequestStats, is(notNullValue()));
-            assertThat(forecastRequestStats.getStatus(), equalTo(ForecastRequestStats.ForecastRequestStatus.FINISHED));
+            assertThat(forecastRequestStats.getStatus(), in(status));
         }, 30, TimeUnit.SECONDS);
     }
 

+ 19 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
@@ -58,6 +59,7 @@ import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
 import org.elasticsearch.xpack.ml.job.JobNodeSelector;
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 
@@ -348,6 +350,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
         private final MlMemoryTracker memoryTracker;
         private final Client client;
         private final IndexNameExpressionResolver expressionResolver;
+        private final JobResultsProvider jobResultsProvider;
 
         private volatile int maxConcurrentJobAllocations;
         private volatile int maxMachineMemoryPercent;
@@ -363,6 +366,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
             this.memoryTracker = Objects.requireNonNull(memoryTracker);
             this.client = Objects.requireNonNull(client);
             this.expressionResolver = Objects.requireNonNull(expressionResolver);
+            this.jobResultsProvider = new JobResultsProvider(client, settings, expressionResolver);
             this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
             this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
             this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
@@ -440,6 +444,16 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
             jobTask.autodetectProcessManager = autodetectProcessManager;
             JobTaskState jobTaskState = (JobTaskState) state;
             JobState jobState = jobTaskState == null ? null : jobTaskState.getState();
+            jobResultsProvider.setRunningForecastsToFailed(params.getJobId(), ActionListener.wrap(
+                r -> runJob(jobTask, jobState, params),
+                e -> {
+                    logger.warn(new ParameterizedMessage("[{}] failed to set forecasts to failed", params.getJobId()), e);
+                    runJob(jobTask, jobState, params);
+                }
+            ));
+        }
+
+        private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams params) {
             // If the job is closing, simply stop and return
             if (JobState.CLOSING.equals(jobState)) {
                 // Mark as completed instead of using `stop` as stop assumes native processes have started
@@ -461,22 +475,22 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
                         FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId});
                         executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
                             ActionListener.wrap(
-                                response -> task.markAsCompleted(),
+                                response -> jobTask.markAsCompleted(),
                                 e -> {
                                     logger.error("error finalizing job [" + jobId + "]", e);
                                     Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
                                     if (unwrapped instanceof DocumentMissingException || unwrapped instanceof ResourceNotFoundException) {
-                                        task.markAsCompleted();
+                                        jobTask.markAsCompleted();
                                     } else {
-                                        task.markAsFailed(e);
+                                        jobTask.markAsFailed(e);
                                     }
                                 }
                             ));
                     } else {
-                        task.markAsCompleted();
+                        jobTask.markAsCompleted();
                     }
                 } else {
-                    task.markAsFailed(e2);
+                    jobTask.markAsFailed(e2);
                 }
             });
         }

+ 4 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java

@@ -239,6 +239,10 @@ public class JobResultsPersister {
             bulkRequest = new BulkRequest();
         }
 
+        public void clearBulkRequest() {
+            bulkRequest = new BulkRequest();
+        }
+
         // for testing
         BulkRequest getBulkRequest() {
             return bulkRequest;

+ 39 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java

@@ -20,6 +20,7 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.bulk.BulkAction;
+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.get.GetRequest;
@@ -63,7 +64,10 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.TermsQueryBuilder;
+import org.elasticsearch.index.reindex.UpdateByQueryAction;
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.script.Script;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregation;
@@ -136,6 +140,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
+import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_FORECAST_NATIVE_PROCESS_KILLED;
 
 public class JobResultsProvider {
     private static final Logger LOGGER = LogManager.getLogger(JobResultsProvider.class);
@@ -1272,6 +1277,40 @@ public class JobResultsProvider {
                 client::search);
     }
 
+    public void setRunningForecastsToFailed(String jobId, ActionListener<Boolean> listener) {
+        QueryBuilder forecastQuery = QueryBuilders.boolQuery()
+            .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE))
+            .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
+            .filter(QueryBuilders.termsQuery(ForecastRequestStats.STATUS.getPreferredName(),
+                ForecastRequestStats.ForecastRequestStatus.SCHEDULED.toString(),
+                ForecastRequestStats.ForecastRequestStatus.STARTED.toString()));
+
+        UpdateByQueryRequest request = new UpdateByQueryRequest(AnomalyDetectorsIndex.resultsWriteAlias(jobId))
+            .setQuery(forecastQuery)
+            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
+            .setAbortOnVersionConflict(false)
+            .setMaxRetries(3)
+            .setRefresh(true)
+            .setScript(new Script("ctx._source.forecast_status='failed';" +
+                "ctx._source.forecast_messages=['" + JOB_FORECAST_NATIVE_PROCESS_KILLED + "']"));
+
+        client.execute(UpdateByQueryAction.INSTANCE, request, ActionListener.wrap(
+            response -> {
+                LOGGER.info("[{}] set [{}] forecasts to failed", jobId, response.getUpdated());
+                if (response.getBulkFailures().size() > 0) {
+                    LOGGER.warn(
+                        "[{}] failed to set [{}] forecasts to failed. Bulk failures experienced {}",
+                        jobId,
+                        response.getTotal() - response.getUpdated(),
+                        response.getBulkFailures().stream().map(BulkItemResponse.Failure::getMessage).collect(Collectors.toList())
+                    );
+                }
+                listener.onResponse(true);
+            },
+            listener::onFailure
+        ));
+    }
+
     public void getForecastRequestStats(String jobId, String forecastId, Consumer<ForecastRequestStats> handler,
             Consumer<Exception> errorHandler) {
         String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);

+ 33 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessor.java

@@ -49,7 +49,9 @@ import java.time.Duration;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -57,6 +59,7 @@ import java.util.concurrent.TimeoutException;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
+import static org.elasticsearch.xpack.core.ml.job.messages.Messages.JOB_FORECAST_NATIVE_PROCESS_KILLED;
 
 /**
  * A runnable class that reads the autodetect process output in the
@@ -94,6 +97,7 @@ public class AutodetectResultProcessor {
     private final FlushListener flushListener;
     private volatile boolean processKilled;
     private volatile boolean failed;
+    private final Map<String, ForecastRequestStats> runningForecasts;
     private final long priorRunsBucketCount;
     private long currentRunBucketCount; // only used from the process() thread, so doesn't need to be volatile
     private final JobResultsPersister.Builder bulkResultsPersister;
@@ -135,6 +139,7 @@ public class AutodetectResultProcessor {
         this.clock = Objects.requireNonNull(clock);
         this.deleteInterimRequired = true;
         this.priorRunsBucketCount = timingStats.getBucketCount();
+        this.runningForecasts = new ConcurrentHashMap<>();
     }
 
     public void process() {
@@ -174,6 +179,7 @@ public class AutodetectResultProcessor {
             }
         } finally {
             flushListener.clear();
+            handleOpenForecasts();
             completionLatch.countDown();
         }
     }
@@ -206,6 +212,27 @@ public class AutodetectResultProcessor {
         renormalizer.shutdown();
     }
 
+    void handleOpenForecasts() {
+        try {
+            if (runningForecasts.isEmpty() == false) {
+                LOGGER.warn("[{}] still had forecasts {} executing. Attempting to set them to failed.",
+                    jobId,
+                    runningForecasts.keySet());
+                // There may be many docs in the results persistence queue. But we only want to bother updating the running forecasts
+                bulkResultsPersister.clearBulkRequest();
+                for (ForecastRequestStats forecastRequestStats : runningForecasts.values()) {
+                    ForecastRequestStats failedStats = new ForecastRequestStats(forecastRequestStats);
+                    failedStats.setStatus(ForecastRequestStats.ForecastRequestStatus.FAILED);
+                    failedStats.setMessages(List.of(JOB_FORECAST_NATIVE_PROCESS_KILLED));
+                    bulkResultsPersister.persistForecastRequestStats(failedStats);
+                }
+                bulkResultsPersister.executeRequest();
+            }
+        } catch (Exception ex) {
+            LOGGER.warn(new ParameterizedMessage("[{}] failure setting running forecasts to failed.", jobId), ex);
+        }
+    }
+
     void processResult(AutodetectResult result) {
         if (processKilled) {
             return;
@@ -252,6 +279,12 @@ public class AutodetectResultProcessor {
             LOGGER.trace("Received Forecast Stats [{}]", forecastRequestStats.getId());
             bulkResultsPersister.persistForecastRequestStats(forecastRequestStats);
 
+            if (forecastRequestStats.getStatus()
+                .isAnyOf(ForecastRequestStats.ForecastRequestStatus.FAILED, ForecastRequestStats.ForecastRequestStatus.FINISHED)) {
+                runningForecasts.remove(forecastRequestStats.getForecastId());
+            } else {
+                runningForecasts.put(forecastRequestStats.getForecastId(), forecastRequestStats);
+            }
             // execute the bulk request only in some cases or in doubt
             // otherwise rely on the count-based trigger
             switch (forecastRequestStats.getStatus()) {

+ 58 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectResultProcessorTests.java

@@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
 import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
+import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
 import org.elasticsearch.xpack.core.ml.job.results.Influencer;
 import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
 import org.elasticsearch.xpack.core.security.user.XPackUser;
@@ -506,4 +507,61 @@ public class AutodetectResultProcessorTests extends ESTestCase {
         verify(flushListener).clear();
     }
 
+    public void testProcessingOpenedForecasts() {
+        when(bulkBuilder.persistForecastRequestStats(any(ForecastRequestStats.class))).thenReturn(bulkBuilder);
+        AutodetectResult result = mock(AutodetectResult.class);
+        ForecastRequestStats forecastRequestStats = new ForecastRequestStats("foo", "forecast");
+        forecastRequestStats.setStatus(ForecastRequestStats.ForecastRequestStatus.OK);
+        when(result.getForecastRequestStats()).thenReturn(forecastRequestStats);
+
+        ArgumentCaptor<ForecastRequestStats> argument = ArgumentCaptor.forClass(ForecastRequestStats.class);
+
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+
+        processorUnderTest.handleOpenForecasts();
+
+        verify(bulkBuilder, times(2)).persistForecastRequestStats(argument.capture());
+        verify(bulkBuilder, times(1)).executeRequest();
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister, never()).deleteInterimResults(JOB_ID);
+
+        // Get all values is in reverse call order
+        List<ForecastRequestStats> stats = argument.getAllValues();
+        assertThat(stats.get(0).getStatus(), equalTo(ForecastRequestStats.ForecastRequestStatus.OK));
+        assertThat(stats.get(1).getStatus(), equalTo(ForecastRequestStats.ForecastRequestStatus.FAILED));
+    }
+
+    public void testProcessingForecasts() {
+        when(bulkBuilder.persistForecastRequestStats(any(ForecastRequestStats.class))).thenReturn(bulkBuilder);
+        AutodetectResult result = mock(AutodetectResult.class);
+        ForecastRequestStats forecastRequestStats = new ForecastRequestStats("foo", "forecast");
+        forecastRequestStats.setStatus(ForecastRequestStats.ForecastRequestStatus.OK);
+        when(result.getForecastRequestStats()).thenReturn(forecastRequestStats);
+
+        ArgumentCaptor<ForecastRequestStats> argument = ArgumentCaptor.forClass(ForecastRequestStats.class);
+
+        processorUnderTest.setDeleteInterimRequired(false);
+        processorUnderTest.processResult(result);
+
+
+        result = mock(AutodetectResult.class);
+        forecastRequestStats = new ForecastRequestStats("foo", "forecast");
+        forecastRequestStats.setStatus(ForecastRequestStats.ForecastRequestStatus.FINISHED);
+        when(result.getForecastRequestStats()).thenReturn(forecastRequestStats);
+
+        processorUnderTest.processResult(result);
+        // There shouldn't be any opened forecasts. This call should do nothing
+        processorUnderTest.handleOpenForecasts();
+
+        verify(bulkBuilder, times(2)).persistForecastRequestStats(argument.capture());
+        verify(bulkBuilder, times(1)).executeRequest();
+        verify(persister).bulkPersisterBuilder(eq(JOB_ID));
+        verify(persister, never()).deleteInterimResults(JOB_ID);
+
+        List<ForecastRequestStats> stats = argument.getAllValues();
+        assertThat(stats.get(0).getStatus(), equalTo(ForecastRequestStats.ForecastRequestStatus.OK));
+        assertThat(stats.get(1).getStatus(), equalTo(ForecastRequestStats.ForecastRequestStatus.FINISHED));
+    }
+
 }

+ 9 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/ForecastRequestStatsTests.java

@@ -18,6 +18,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 
 public class ForecastRequestStatsTests extends AbstractSerializingTestCase<ForecastRequestStats> {
 
@@ -97,4 +98,12 @@ public class ForecastRequestStatsTests extends AbstractSerializingTestCase<Forec
             ForecastRequestStats.LENIENT_PARSER.apply(parser, null);
         }
     }
+
+    public void testCopyCtor() {
+        for (int i = 0; i < NUMBER_OF_TEST_RUNS; ++i) {
+            ForecastRequestStats forecastRequestStats = createTestInstance();
+            ForecastRequestStats clone = new ForecastRequestStats(forecastRequestStats);
+            assertThat(clone, equalTo(forecastRequestStats));
+        }
+    }
 }