瀏覽代碼

[ML] Ensure mappings are up to date before reverting state (#68746)

When a model snapshot is reverted the job document in the
config index needs to be updated. Before this is done it
is essential to ensure the config index mappings are up
to date, otherwise we will try to create dynamic mappings
for fields newly added to the job config.
David Roberts 4 年之前
父節點
當前提交
55d78ba58c

+ 23 - 14
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java

@@ -24,12 +24,14 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ml.MlConfigIndex;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.annotations.Annotation;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
@@ -71,30 +73,31 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
     @Override
     protected void masterOperation(Task task, RevertModelSnapshotAction.Request request, ClusterState state,
                                    ActionListener<RevertModelSnapshotAction.Response> listener) {
-        if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobId(), state)) {
-            listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("revert model snapshot", request.getJobId()));
+        final String jobId = request.getJobId();
+
+        if (migrationEligibilityCheck.jobIsEligibleForMigration(jobId, state)) {
+            listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("revert model snapshot", jobId));
             return;
         }
 
         logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}",
-                request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults());
-
+                request.getSnapshotId(), jobId, request.getDeleteInterveningResults());
 
-        // 3. Revert the state
-        ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
-            exists -> {
+        // 4. Revert the state
+        ActionListener<Boolean> configMappingUpdateListener = ActionListener.wrap(
+            r -> {
                 PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
-                JobState jobState = MlTasks.getJobState(request.getJobId(), tasks);
+                JobState jobState = MlTasks.getJobState(jobId, tasks);
 
                 if (request.isForce() == false && jobState.equals(JobState.CLOSED) == false) {
                     listener.onFailure(ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT)));
                     return;
                 }
 
-                if (MlTasks.getSnapshotUpgraderTask(request.getJobId(), request.getSnapshotId(), tasks) != null) {
+                if (MlTasks.getSnapshotUpgraderTask(jobId, request.getSnapshotId(), tasks) != null) {
                     listener.onFailure(ExceptionsHelper.conflictStatusException(
                         "Cannot revert job [{}] to snapshot [{}] as it is being upgraded",
-                        request.getJobId(),
+                        jobId,
                         request.getSnapshotId()
                     ));
                     return;
@@ -103,9 +106,9 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
                 getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
                     ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
                     if (request.getDeleteInterveningResults()) {
-                        wrappedListener = wrapDeleteOldAnnotationsListener(wrappedListener, modelSnapshot, request.getJobId());
-                        wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
-                        wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
+                        wrappedListener = wrapDeleteOldAnnotationsListener(wrappedListener, modelSnapshot, jobId);
+                        wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, jobId);
+                        wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, jobId);
                     }
                     jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
                 }, listener::onFailure);
@@ -113,10 +116,16 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
             listener::onFailure
         );
 
+        // 3. Ensure the config index mappings are up to date
+        ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
+            r -> ElasticsearchMappings.addDocMappingIfMissing(MlConfigIndex.indexName(), MlConfigIndex::mapping,
+                client, state, configMappingUpdateListener),
+            listener::onFailure
+        );
 
         // 2. Verify the job exists
         ActionListener<Boolean> createStateIndexListener = ActionListener.wrap(
-            r -> jobManager.jobExists(request.getJobId(), jobExistsListener),
+            r -> jobManager.jobExists(jobId, jobExistsListener),
             listener::onFailure
         );