ソースを参照

[ML] Ensure config index mappings are up-to-date before updating configs (#58916)

We already had code to ensure the config index mappings were
up-to-date before creating a new config.  However, it's also
possible that an update to a config could add the latest
settings that require the latest mappings to index correctly.

This change checks that the latest config index mappings are
in place in the 3 update actions in the same way as the checks
are done in the 3 put actions.
David Roberts 5 年 前
コミット
be4bc336ec

+ 22 - 11
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDataFrameAnalyticsAction.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.action;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -23,8 +24,10 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.ml.MlConfigIndex;
 import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
 
@@ -39,10 +42,11 @@ public class TransportUpdateDataFrameAnalyticsAction
     private final XPackLicenseState licenseState;
     private final DataFrameAnalyticsConfigProvider configProvider;
     private final SecurityContext securityContext;
+    private final Client client;
 
     @Inject
     public TransportUpdateDataFrameAnalyticsAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
-                                                   XPackLicenseState licenseState, ThreadPool threadPool,
+                                                   XPackLicenseState licenseState, ThreadPool threadPool, Client client,
                                                    ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
                                                    DataFrameAnalyticsConfigProvider configProvider) {
         super(UpdateDataFrameAnalyticsAction.NAME, transportService, clusterService, threadPool, actionFilters,
@@ -52,6 +56,7 @@ public class TransportUpdateDataFrameAnalyticsAction
         this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings)
             ? new SecurityContext(settings, threadPool.getThreadContext())
             : null;
+        this.client = client;
     }
 
     @Override
@@ -73,17 +78,23 @@ public class TransportUpdateDataFrameAnalyticsAction
     protected void masterOperation(Task task, UpdateDataFrameAnalyticsAction.Request request, ClusterState state,
                                    ActionListener<PutDataFrameAnalyticsAction.Response> listener) {
 
-        useSecondaryAuthIfAvailable(securityContext, () -> {
-            Map<String, String> headers = threadPool.getThreadContext().getHeaders();
-            configProvider.update(
-                request.getUpdate(),
-                headers,
-                state,
-                ActionListener.wrap(
-                    updatedConfig -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(updatedConfig)),
-                    listener::onFailure));
-        });
+        Runnable doUpdate = () ->
+            useSecondaryAuthIfAvailable(securityContext, () -> {
+                Map<String, String> headers = threadPool.getThreadContext().getHeaders();
+                configProvider.update(
+                    request.getUpdate(),
+                    headers,
+                    state,
+                    ActionListener.wrap(
+                        updatedConfig -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(updatedConfig)),
+                        listener::onFailure));
+            });
 
+        // Obviously if we're updating a job it's impossible that the config index has no mappings at
+        // all, but if we rewrite the job config we may add new fields that require the latest mappings
+        ElasticsearchMappings.addDocMappingIfMissing(
+            MlConfigIndex.indexName(), MlConfigIndex::mapping, client, state,
+            ActionListener.wrap(bool -> doUpdate.run(), listener::onFailure));
     }
 
     @Override

+ 23 - 12
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java

@@ -23,11 +23,13 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.ml.MlConfigIndex;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
+import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
@@ -46,6 +48,7 @@ public class TransportUpdateDatafeedAction extends
     private final JobConfigProvider jobConfigProvider;
     private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
     private final SecurityContext securityContext;
+    private final Client client;
 
     @Inject
     public TransportUpdateDatafeedAction(Settings settings, TransportService transportService, ClusterService clusterService,
@@ -60,6 +63,7 @@ public class TransportUpdateDatafeedAction extends
         this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
         this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
             new SecurityContext(settings, threadPool.getThreadContext()) : null;
+        this.client = client;
     }
 
     @Override
@@ -74,7 +78,7 @@ public class TransportUpdateDatafeedAction extends
 
     @Override
     protected void masterOperation(Task task, UpdateDatafeedAction.Request request, ClusterState state,
-                                   ActionListener<PutDatafeedAction.Response> listener) throws Exception {
+                                   ActionListener<PutDatafeedAction.Response> listener) {
 
         if (migrationEligibilityCheck.datafeedIsEligibleForMigration(request.getUpdate().getId(), state)) {
             listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update datafeed", request.getUpdate().getId()));
@@ -89,17 +93,24 @@ public class TransportUpdateDatafeedAction extends
             return;
         }
 
-        useSecondaryAuthIfAvailable(securityContext, () -> {
-            final Map<String, String> headers = threadPool.getThreadContext().getHeaders();
-            datafeedConfigProvider.updateDatefeedConfig(
-                request.getUpdate().getId(),
-                request.getUpdate(),
-                headers,
-                jobConfigProvider::validateDatafeedJob,
-                ActionListener.wrap(
-                    updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
-                    listener::onFailure));
-        });
+        Runnable doUpdate = () ->
+            useSecondaryAuthIfAvailable(securityContext, () -> {
+                final Map<String, String> headers = threadPool.getThreadContext().getHeaders();
+                datafeedConfigProvider.updateDatefeedConfig(
+                    request.getUpdate().getId(),
+                    request.getUpdate(),
+                    headers,
+                    jobConfigProvider::validateDatafeedJob,
+                    ActionListener.wrap(
+                        updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
+                        listener::onFailure));
+            });
+
+        // Obviously if we're updating a datafeed it's impossible that the config index has no mappings at
+        // all, but if we rewrite the datafeed config we may add new fields that require the latest mappings
+        ElasticsearchMappings.addDocMappingIfMissing(
+            MlConfigIndex.indexName(), MlConfigIndex::mapping, client, state,
+            ActionListener.wrap(bool -> doUpdate.run(), listener::onFailure));
     }
 
     @Override

+ 16 - 10
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

@@ -349,27 +349,33 @@ public class JobManager {
 
     public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
 
-        Runnable doUpdate = () -> {
-                jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit,
-                        this::validate, ActionListener.wrap(
-                                updatedJob -> postJobUpdate(request, updatedJob, actionListener),
-                                actionListener::onFailure
-                        ));
-        };
-
         ClusterState clusterState = clusterService.state();
         if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobId(), clusterState)) {
             actionListener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update job", request.getJobId()));
             return;
         }
 
+        Runnable doUpdate = () ->
+            jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit,
+                this::validate, ActionListener.wrap(
+                    updatedJob -> postJobUpdate(request, updatedJob, actionListener),
+                    actionListener::onFailure
+                ));
+
+        // Obviously if we're updating a job it's impossible that the config index has no mappings at
+        // all, but if we rewrite the job config we may add new fields that require the latest mappings
+        Runnable checkMappingsAreUpToDate = () ->
+            ElasticsearchMappings.addDocMappingIfMissing(
+                MlConfigIndex.indexName(), MlConfigIndex::mapping, client, clusterState,
+                ActionListener.wrap(bool -> doUpdate.run(), actionListener::onFailure));
+
         if (request.getJobUpdate().getGroups() != null && request.getJobUpdate().getGroups().isEmpty() == false) {
 
             // check the new groups are not job Ids
             jobConfigProvider.jobIdMatches(request.getJobUpdate().getGroups(), ActionListener.wrap(
                     matchingIds -> {
                         if (matchingIds.isEmpty()) {
-                            doUpdate.run();
+                            checkMappingsAreUpToDate.run();
                         } else {
                             actionListener.onFailure(new ResourceAlreadyExistsException(
                                     Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, matchingIds.get(0))));
@@ -378,7 +384,7 @@ public class JobManager {
                     actionListener::onFailure
             ));
         } else {
-            doUpdate.run();
+            checkMappingsAreUpToDate.run();
         }
     }