浏览代码

[ML] Check dest index is empty when starting DF analytics (#45094)

If one tries to start a DF analytics job that has already run,
the result will be that the task will fail after reindexing the
dest index from the source index. The results of the prior run
will be gone and the task state is not properly set to failed
with the failure reason.

This commit improves the behavior in this scenario. First, we
set the task state to `failed` in a set of failures that were
missed. Second, a validation is added that if the destination
index exists, it must be empty.
Dimitris Athanasiou 6 年之前
父节点
当前提交
530ccc055d

+ 1 - 0
x-pack/plugin/ml/qa/ml-with-security/build.gradle

@@ -142,6 +142,7 @@ integTest.runner  {
     'ml/start_data_frame_analytics/Test start given missing source index',
     'ml/start_data_frame_analytics/Test start given source index has no compatible fields',
     'ml/start_data_frame_analytics/Test start with inconsistent body/param ids',
+    'ml/start_data_frame_analytics/Test start given dest index is not empty',
     'ml/start_stop_datafeed/Test start datafeed job, but not open',
     'ml/start_stop_datafeed/Test start non existing datafeed',
     'ml/start_stop_datafeed/Test stop non existing datafeed',

+ 50 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
@@ -14,6 +15,8 @@ import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -31,6 +34,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -44,6 +48,7 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -178,26 +183,57 @@ public class TransportStartDataFrameAnalyticsAction
     }
 
     private void getConfigAndValidate(String id, ActionListener<DataFrameAnalyticsConfig> finalListener) {
-        // Validate mappings can be merged
-        ActionListener<DataFrameAnalyticsConfig> firstValidationListener = ActionListener.wrap(
+        // Step 4. Validate mappings can be merged
+        ActionListener<DataFrameAnalyticsConfig> toValidateMappingsListener = ActionListener.wrap(
             config -> MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource().getIndex(), ActionListener.wrap(
                     mappings -> finalListener.onResponse(config), finalListener::onFailure)),
             finalListener::onFailure
         );
 
-        // Validate source and dest; check data extraction is possible
+        // Step 3. Validate dest index is empty
+        ActionListener<DataFrameAnalyticsConfig> toValidateDestEmptyListener = ActionListener.wrap(
+            config -> checkDestIndexIsEmptyIfExists(config, toValidateMappingsListener),
+            finalListener::onFailure
+        );
+
+        // Step 2. Validate source and dest; check data extraction is possible
         ActionListener<DataFrameAnalyticsConfig> getConfigListener = ActionListener.wrap(
             config -> {
                 new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config);
-                DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, firstValidationListener);
+                DataFrameDataExtractorFactory.validateConfigAndSourceIndex(client, config, toValidateDestEmptyListener);
             },
             finalListener::onFailure
         );
 
-        // First, get the config
+        // Step 1. Get the config
         configProvider.get(id, getConfigListener);
     }
 
+    private void checkDestIndexIsEmptyIfExists(DataFrameAnalyticsConfig config, ActionListener<DataFrameAnalyticsConfig> listener) {
+        String destIndex = config.getDest().getIndex();
+        SearchRequest destEmptySearch = new SearchRequest(destIndex);
+        destEmptySearch.source().size(0);
+        destEmptySearch.allowPartialSearchResults(false);
+        ClientHelper.executeWithHeadersAsync(config.getHeaders(), ClientHelper.ML_ORIGIN, client, SearchAction.INSTANCE,
+            destEmptySearch, ActionListener.wrap(
+                searchResponse -> {
+                    if (searchResponse.getHits().getTotalHits().value > 0) {
+                        listener.onFailure(ExceptionsHelper.badRequestException("dest index [{}] must be empty", destIndex));
+                    } else {
+                        listener.onResponse(config);
+                    }
+                },
+                e -> {
+                    if (e instanceof IndexNotFoundException) {
+                        listener.onResponse(config);
+                    } else {
+                        listener.onFailure(e);
+                    }
+                }
+            )
+        );
+    }
+
     private void waitForAnalyticsStarted(PersistentTasksCustomMetaData.PersistentTask<StartDataFrameAnalyticsAction.TaskParams> task,
                                          TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
         AnalyticsPredicate predicate = new AnalyticsPredicate();
@@ -373,6 +409,15 @@ public class TransportStartDataFrameAnalyticsAction
                 LOGGER.debug("[{}] Reindex task was successfully cancelled", taskParams.getId());
             }
         }
+
+        public void updateState(DataFrameAnalyticsState state, @Nullable String reason) {
+            DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, getAllocationId(), reason);
+            updatePersistentTaskState(newTaskState, ActionListener.wrap(
+                updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", getParams().getId(), state),
+                e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]",
+                    getParams().getId(), state), e)
+            ));
+        }
     }
 
     static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, String... indexNames) {

+ 6 - 6
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

@@ -63,7 +63,7 @@ public class DataFrameAnalyticsManager {
     public void execute(DataFrameAnalyticsTask task, DataFrameAnalyticsState currentState) {
         ActionListener<DataFrameAnalyticsConfig> reindexingStateListener = ActionListener.wrap(
             config -> reindexDataframeAndStartAnalysis(task, config),
-            task::markAsFailed
+            error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
         );
 
         // With config in hand, determine action to take
@@ -129,7 +129,7 @@ public class DataFrameAnalyticsManager {
                 task.setReindexingTaskId(null);
                 startAnalytics(task, config, false);
             },
-            task::markAsFailed
+            error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
         );
 
         // Refresh to ensure copied index is fully searchable
@@ -140,7 +140,7 @@ public class DataFrameAnalyticsManager {
                     RefreshAction.INSTANCE,
                     new RefreshRequest(config.getDest().getIndex()),
                     refreshListener),
-            task::markAsFailed
+            error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
         );
 
         // Reindex
@@ -196,15 +196,15 @@ public class DataFrameAnalyticsManager {
                     updatedTask -> processManager.runJob(task, config, dataExtractorFactory,
                         error -> {
                             if (error != null) {
-                                task.markAsFailed(error);
+                                task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
                             } else {
                                 task.markAsCompleted();
                             }
                         }),
-                    task::markAsFailed
+                    error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
                 ));
             },
-            task::markAsFailed
+            error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
         );
 
         // TODO This could fail with errors. In that case we get stuck with the copied index.

+ 1 - 11
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.dataframe.process;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.client.Client;
@@ -17,7 +16,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
-import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.action.TransportStartDataFrameAnalyticsAction.DataFrameAnalyticsTask;
@@ -114,7 +112,7 @@ public class AnalyticsProcessManager {
                 finishHandler.accept(null);
             } else {
                 LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
-                updateTaskState(task, DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
+                task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
             }
         }
     }
@@ -176,14 +174,6 @@ public class AnalyticsProcessManager {
         };
     }
 
-    private void updateTaskState(DataFrameAnalyticsTask task, DataFrameAnalyticsState state, @Nullable String reason) {
-        DataFrameAnalyticsTaskState newTaskState = new DataFrameAnalyticsTaskState(state, task.getAllocationId(), reason);
-        task.updatePersistentTaskState(newTaskState, ActionListener.wrap(
-            updatedTask -> LOGGER.info("[{}] Successfully update task state to [{}]", task.getParams().getId(), state),
-            e -> LOGGER.error(new ParameterizedMessage("[{}] Could not update task state to [{}]", task.getParams().getId(), state), e)
-        ));
-    }
-
     @Nullable
     public Integer getProgressPercent(long allocationId) {
         ProcessContext processContext = processContextByAllocation.get(allocationId);

+ 40 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/ml/start_data_frame_analytics.yml

@@ -72,3 +72,43 @@
           {
             "id": "body_id"
           }
+
+---
+"Test start given dest index is not empty":
+
+  - do:
+      index:
+        index: non-empty-source
+        refresh: ""
+        body:  >
+          {
+            "numeric": 42.0
+          }
+
+  - do:
+      index:
+        index: non-empty-dest
+        refresh: ""
+        body:  >
+          {
+            "numeric": 42.0
+          }
+
+  - do:
+      ml.put_data_frame_analytics:
+        id: "start_given_empty_dest_index"
+        body: >
+          {
+            "source": {
+              "index": "non-empty-source"
+            },
+            "dest": {
+              "index": "non-empty-dest"
+            },
+            "analysis": {"outlier_detection":{}}
+          }
+
+  - do:
+      catch: /dest index \[non-empty-dest\] must be empty/
+      ml.start_data_frame_analytics:
+        id: "start_given_empty_dest_index"