Browse Source

[ML] ensure results index is updated before writing forecast documents (#68748)

When a job opens, we update the current forecasts that are opened to failed.

But, before doing so, we need to make sure the results index is up to date.

This is to protect us from any forecast document changes in the future.
Benjamin Trent 4 years ago
parent
commit
2e36ba1656

+ 20 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

@@ -40,6 +40,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MachineLearning;
@@ -195,13 +196,27 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
         jobTask.setAutodetectProcessManager(autodetectProcessManager);
         JobTaskState jobTaskState = (JobTaskState) state;
         JobState jobState = jobTaskState == null ? null : jobTaskState.getState();
-        jobResultsProvider.setRunningForecastsToFailed(params.getJobId(), ActionListener.wrap(
-            r -> runJob(jobTask, jobState, params),
+        ActionListener<Boolean> resultsMappingUpdateHandler = ActionListener.wrap(
+            mappingsUpdate -> jobResultsProvider.setRunningForecastsToFailed(params.getJobId(), ActionListener.wrap(
+                r -> runJob(jobTask, jobState, params),
+                e -> {
+                    logger.warn(new ParameterizedMessage("[{}] failed to set forecasts to failed", params.getJobId()), e);
+                    runJob(jobTask, jobState, params);
+                }
+            )),
             e -> {
-                logger.warn(new ParameterizedMessage("[{}] failed to set forecasts to failed", params.getJobId()), e);
-                runJob(jobTask, jobState, params);
+                logger.error(new ParameterizedMessage("[{}] Failed to update results mapping", params.getJobId()), e);
+                jobTask.markAsFailed(e);
             }
-        ));
+        );
+        // We need to update the results index as we MAY update the current forecast results, setting the running forcasts to failed
+        // This writes to the results index, which might need updating
+        ElasticsearchMappings.addDocMappingIfMissing(
+            AnomalyDetectorsIndex.jobResultsAliasedName(params.getJobId()),
+            AnomalyDetectorsIndex::resultsMapping,
+            client,
+            clusterState,
+            resultsMappingUpdateHandler);
     }
 
     private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams params) {