Bläddra i källkod

[ML] Revert model snapshot now waits for annotations index (#72926)

Reverting a model snapshot with the delete_intervening_results
option deletes system-generated annotations that are more recent
than the model snapshot. Doing this relies on the annotations
index being available, so the revert model snapshot action now
waits for this.

This problem is more likely to be seen in recent releases, as we
now revert to the most recent model snapshot when a job relocates
from one node to another. We are therefore more likely to be
reverting a model snapshot at a time when there has been cluster
disruption, and that disruption could also be causing the
annotations index to be temporarily unavailable.

Fixes #72917
David Roberts 4 år sedan
förälder
incheckning
0a0fc8a5eb

+ 37 - 12
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java

@@ -9,16 +9,19 @@ package org.elasticsearch.xpack.core.ml.annotations;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -40,20 +43,42 @@ public class AnnotationIndex {
 
     /**
      * Create the .ml-annotations-6 index with correct mappings if it does not already exist. This index is read and written by the UI
-     * results views, so needs to exist when there might be ML results to view.
+     * results views, so needs to exist when there might be ML results to view.  This method also waits for the index to reach at least
+     * yellow health before notifying the listener, which receives {@code false} if that wait timed out.
      */
-    public static void createAnnotationsIndexIfNecessary(Client client, ClusterState state, final ActionListener<Boolean> finalListener) {
-
-        final ActionListener<Boolean> checkMappingsListener = ActionListener.wrap(success -> {
-            ElasticsearchMappings.addDocMappingIfMissing(
-                WRITE_ALIAS_NAME,
-                AnnotationIndex::annotationsMapping,
-                client,
-                state,
-                MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT,
-                finalListener);
+    public static void createAnnotationsIndexIfNecessaryAndWaitForYellow(Client client, ClusterState state, TimeValue masterNodeTimeout,
+                                                                         final ActionListener<Boolean> finalListener) {
+
+        final ActionListener<Boolean> annotationsIndexCreatedListener = ActionListener.wrap(success -> {
+            final ClusterHealthRequest request = Requests.clusterHealthRequest(READ_ALIAS_NAME)
+                .waitForYellowStatus()
+                .masterNodeTimeout(masterNodeTimeout);
+            executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request,
+                ActionListener.<ClusterHealthResponse>wrap(
+                    r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure),
+                client.admin().cluster()::health);
         }, finalListener::onFailure);
 
+        createAnnotationsIndexIfNecessary(client, state, masterNodeTimeout, annotationsIndexCreatedListener);
+    }
+
+    /**
+     * Create the .ml-annotations-6 index with correct mappings if it does not already exist. This index is read and written by the UI
+     * results views, so needs to exist when there might be ML results to view.
+     */
+    public static void createAnnotationsIndexIfNecessary(Client client, ClusterState state, TimeValue masterNodeTimeout,
+                                                         final ActionListener<Boolean> finalListener) {
+
+        final ActionListener<Boolean> checkMappingsListener = ActionListener.wrap(success ->
+                ElasticsearchMappings.addDocMappingIfMissing(
+                    WRITE_ALIAS_NAME,
+                    AnnotationIndex::annotationsMapping,
+                    client,
+                    state,
+                    masterNodeTimeout,
+                    finalListener),
+            finalListener::onFailure);
+
         final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> {
             final IndicesAliasesRequest request =
                 client.admin().indices().prepareAliases()

+ 8 - 6
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.ml;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterStateListener;
@@ -96,12 +97,13 @@ class MlInitializationService implements ClusterStateListener {
         // The atomic flag prevents multiple simultaneous attempts to create the
         // index if there is a flurry of cluster state updates in quick succession
         if (this.isMaster && isIndexCreationInProgress.compareAndSet(false, true)) {
-            AnnotationIndex.createAnnotationsIndexIfNecessary(client, event.state(), ActionListener.wrap(
-                r -> isIndexCreationInProgress.set(false),
-                e -> {
-                    isIndexCreationInProgress.set(false);
-                    logger.error("Error creating ML annotations index or aliases", e);
-                }));
+            AnnotationIndex.createAnnotationsIndexIfNecessary(client, event.state(), MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT,
+                ActionListener.wrap(
+                    r -> isIndexCreationInProgress.set(false),
+                    e -> {
+                        isIndexCreationInProgress.set(false);
+                        logger.error("Error creating ML annotations index or aliases", e);
+                    }));
         }
     }
 

+ 10 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java

@@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.ml.MlConfigIndex;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.annotations.Annotation;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -83,8 +84,8 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
         logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}",
                 request.getSnapshotId(), jobId, request.getDeleteInterveningResults());
 
-        // 4. Revert the state
-        ActionListener<Boolean> configMappingUpdateListener = ActionListener.wrap(
+        // 5. Revert the state
+        ActionListener<Boolean> annotationsIndexUpdateListener = ActionListener.wrap(
             r -> {
                 PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
                 JobState jobState = MlTasks.getJobState(jobId, tasks);
@@ -116,6 +117,13 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
             listener::onFailure
         );
 
+        // 4. Ensure the annotations index exists with up-to-date mappings and has reached at least yellow status
+        ActionListener<Boolean> configMappingUpdateListener = ActionListener.wrap(
+            r -> AnnotationIndex.createAnnotationsIndexIfNecessaryAndWaitForYellow(client, state, request.masterNodeTimeout(),
+                annotationsIndexUpdateListener),
+            listener::onFailure
+        );
+
         // 3. Ensure the config index mappings are up to date
         ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
             r -> ElasticsearchMappings.addDocMappingIfMissing(MlConfigIndex.indexName(), MlConfigIndex::mapping,

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

@@ -504,7 +504,8 @@ public class AutodetectProcessManager implements ClusterStateListener {
         );
 
         // Create the annotations index if necessary - this also updates the mappings if an old mapping is present
-        AnnotationIndex.createAnnotationsIndexIfNecessary(client, clusterState, annotationsIndexUpdateHandler);
+        AnnotationIndex.createAnnotationsIndexIfNecessaryAndWaitForYellow(client, clusterState, masterNodeTimeout,
+            annotationsIndexUpdateHandler);
     }
 
     private void startProcess(JobTask jobTask, Job job, BiConsumer<Exception, Boolean> closeHandler) {

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java

@@ -219,7 +219,8 @@ public class SnapshotUpgradeTaskExecutor extends AbstractJobPersistentTasksExecu
         );
 
         // Create the annotations index if necessary - this also updates the mappings if an old mapping is present
-        AnnotationIndex.createAnnotationsIndexIfNecessary(client, clusterState, annotationsIndexUpdateHandler);
+        AnnotationIndex.createAnnotationsIndexIfNecessaryAndWaitForYellow(client, clusterState, MlTasks.PERSISTENT_TASK_MASTER_NODE_TIMEOUT,
+            annotationsIndexUpdateHandler);
     }
 
     @Override