Browse Source

[ML] Delete left behind docs on data frame analytics creation (#74578)

In the case a data frame analytics job gets deleted partially
(e.g. the config gets removed but not the state), creating
a job with the same id might result in problems.

This commit adds logic to delete any prior documents when
a DFA job gets created. This way any issues caused by partial
deletes get fixed automatically.
Dimitris Athanasiou 4 years ago
parent
commit
8be47c9f15

+ 12 - 11
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsCRUDIT.java

@@ -6,19 +6,11 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
-import static java.util.Collections.emptyMap;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
-
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
@@ -30,6 +22,15 @@ import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfig
 import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
 import org.junit.Before;
 
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Collections.emptyMap;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
 public class DataFrameAnalyticsCRUDIT extends MlSingleNodeTestCase {
 
     private DataFrameAnalyticsConfigProvider configProvider;
@@ -61,8 +62,8 @@ public class DataFrameAnalyticsCRUDIT extends MlSingleNodeTestCase {
         AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
-        blockingCall(
-            actionListener -> configProvider.put(config, emptyMap(), actionListener), configHolder, exceptionHolder);
+        blockingCall(actionListener -> configProvider.put(config, emptyMap(), TimeValue.timeValueSeconds(5), actionListener),
+            configHolder, exceptionHolder);
         assertThat(configHolder.get(), is(notNullValue()));
         assertThat(configHolder.get(), is(equalTo(config)));
 

+ 10 - 6
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalyticsConfigProviderIT.java

@@ -15,6 +15,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -42,6 +43,8 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf;
 
 public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase {
 
+    private static final TimeValue TIMEOUT = TimeValue.timeValueSeconds(5);
+
     private DataFrameAnalyticsConfigProvider configProvider;
 
     @Before
@@ -71,7 +74,7 @@ public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase {
             AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
             blockingCall(
-                actionListener -> configProvider.put(config, emptyMap(), actionListener), configHolder, exceptionHolder);
+                actionListener -> configProvider.put(config, emptyMap(), TIMEOUT, actionListener), configHolder, exceptionHolder);
 
             assertThat(configHolder.get(), is(notNullValue()));
             assertThat(configHolder.get(), is(equalTo(config)));
@@ -97,7 +100,8 @@ public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase {
             AtomicReference<DataFrameAnalyticsConfig> configHolder = new AtomicReference<>();
             AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
-            blockingCall(actionListener -> configProvider.put(config, securityHeaders, actionListener), configHolder, exceptionHolder);
+            blockingCall(actionListener -> configProvider.put(config, securityHeaders, TIMEOUT, actionListener),
+                configHolder, exceptionHolder);
 
             assertThat(configHolder.get(), is(notNullValue()));
             assertThat(
@@ -133,7 +137,7 @@ public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase {
 
             DataFrameAnalyticsConfig initialConfig = DataFrameAnalyticsConfigTests.createRandom(configId);
             blockingCall(
-                actionListener -> configProvider.put(initialConfig, emptyMap(), actionListener), configHolder, exceptionHolder);
+                actionListener -> configProvider.put(initialConfig, emptyMap(), TIMEOUT, actionListener), configHolder, exceptionHolder);
 
             assertThat(configHolder.get(), is(notNullValue()));
             assertThat(configHolder.get(), is(equalTo(initialConfig)));
@@ -145,7 +149,7 @@ public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase {
 
             DataFrameAnalyticsConfig configWithSameId = DataFrameAnalyticsConfigTests.createRandom(configId);
             blockingCall(
-                actionListener -> configProvider.put(configWithSameId, emptyMap(), actionListener),
+                actionListener -> configProvider.put(configWithSameId, emptyMap(), TIMEOUT, actionListener),
                 configHolder,
                 exceptionHolder);
 
@@ -163,7 +167,7 @@ public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase {
             AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
             blockingCall(
-                actionListener -> configProvider.put(initialConfig, emptyMap(), actionListener), configHolder, exceptionHolder);
+                actionListener -> configProvider.put(initialConfig, emptyMap(), TIMEOUT, actionListener), configHolder, exceptionHolder);
 
             assertNoException(exceptionHolder);
             assertThat(configHolder.get(), is(notNullValue()));
@@ -312,7 +316,7 @@ public class DataFrameAnalyticsConfigProviderIT extends MlSingleNodeTestCase {
             AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
             blockingCall(
-                actionListener -> configProvider.put(initialConfig, emptyMap(), actionListener), configHolder, exceptionHolder);
+                actionListener -> configProvider.put(initialConfig, emptyMap(), TIMEOUT, actionListener), configHolder, exceptionHolder);
 
             assertThat(configHolder.get(), is(notNullValue()));
             assertThat(configHolder.get(), is(equalTo(initialConfig)));

+ 5 - 157
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java

@@ -10,13 +10,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.delete.DeleteAction;
-import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
 import org.elasticsearch.client.Client;
@@ -27,35 +21,20 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.DeleteByQueryAction;
-import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.core.ml.MlConfigIndex;
-import org.elasticsearch.xpack.core.ml.MlStatsIndex;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
-import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
-import org.elasticsearch.xpack.core.ml.dataframe.stats.Fields;
-import org.elasticsearch.xpack.core.ml.job.messages.Messages;
-import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
 import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
+import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsDeleter;
 import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
-import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 
 import java.util.Objects;
 
@@ -158,146 +137,15 @@ public class TransportDeleteDataFrameAnalyticsAction
         // We clean up the memory tracker on delete because there is no stop; the task stops by itself
         memoryTracker.removeDataFrameAnalyticsJob(id);
 
-        // Step 4. Delete the config
-        ActionListener<BulkByScrollResponse> deleteStatsHandler = ActionListener.wrap(
-            bulkByScrollResponse -> {
-                if (bulkByScrollResponse.isTimedOut()) {
-                    logger.warn("[{}] DeleteByQuery for stats timed out", id);
-                }
-                if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
-                    logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery for stats", id,
-                        bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts());
-                    for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
-                        logger.warn("[{}] DBQ failure: {}", id, failure);
-                    }
-                }
-                deleteConfig(parentTaskClient, id, listener);
-            },
-            failure -> {
-                logger.warn(new ParameterizedMessage("[{}] failed to remove stats", id), ExceptionsHelper.unwrapCause(failure));
-                deleteConfig(parentTaskClient, id, listener);
-            }
-        );
-
-        // Step 3. Delete job docs from stats index
-        ActionListener<BulkByScrollResponse> deleteStateHandler = ActionListener.wrap(
-            bulkByScrollResponse -> {
-                if (bulkByScrollResponse.isTimedOut()) {
-                    logger.warn("[{}] DeleteByQuery for state timed out", id);
-                }
-                if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
-                    logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery for state", id,
-                        bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts());
-                    for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
-                        logger.warn("[{}] DBQ failure: {}", id, failure);
-                    }
-                }
-                deleteStats(parentTaskClient, id, request.timeout(), deleteStatsHandler);
-            },
-            listener::onFailure
-        );
-
-        // Step 2. Delete state
-        ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
-            config -> deleteState(parentTaskClient, config, request.timeout(), deleteStateHandler),
-            listener::onFailure
-        );
-
-        // Step 1. Get the config to check if it exists
-        configProvider.get(id, configListener);
-    }
-
-    private void deleteConfig(ParentTaskAssigningClient parentTaskClient, String id, ActionListener<AcknowledgedResponse> listener) {
-        DeleteRequest deleteRequest = new DeleteRequest(MlConfigIndex.indexName());
-        deleteRequest.id(DataFrameAnalyticsConfig.documentId(id));
-        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteAction.INSTANCE, deleteRequest, ActionListener.wrap(
-            deleteResponse -> {
-                if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
-                    listener.onFailure(ExceptionsHelper.missingDataFrameAnalytics(id));
-                    return;
-                }
-                assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED;
-                logger.info("[{}] Deleted", id);
-                auditor.info(id, Messages.DATA_FRAME_ANALYTICS_AUDIT_DELETED);
-                listener.onResponse(AcknowledgedResponse.TRUE);
+        configProvider.get(id, ActionListener.wrap(
+            config -> {
+                DataFrameAnalyticsDeleter deleter = new DataFrameAnalyticsDeleter(parentTaskClient, auditor);
+                deleter.deleteAllDocuments(config, request.timeout(), listener);
             },
             listener::onFailure
         ));
     }
 
-    private void deleteState(ParentTaskAssigningClient parentTaskClient,
-                             DataFrameAnalyticsConfig config,
-                             TimeValue timeout,
-                             ActionListener<BulkByScrollResponse> listener) {
-        ActionListener<Boolean> deleteModelStateListener = ActionListener.wrap(
-            r -> executeDeleteByQuery(
-                    parentTaskClient,
-                    AnomalyDetectorsIndex.jobStateIndexPattern(),
-                    QueryBuilders.idsQuery().addIds(StoredProgress.documentId(config.getId())),
-                    timeout,
-                    listener
-                )
-            , listener::onFailure
-        );
-
-        deleteModelState(parentTaskClient, config, timeout, 1, deleteModelStateListener);
-    }
-
-    private void deleteModelState(ParentTaskAssigningClient parentTaskClient,
-                                  DataFrameAnalyticsConfig config,
-                                  TimeValue timeout,
-                                  int docNum,
-                                  ActionListener<Boolean> listener) {
-        if (config.getAnalysis().persistsState() == false) {
-            listener.onResponse(true);
-            return;
-        }
-
-        IdsQueryBuilder query = QueryBuilders.idsQuery().addIds(config.getAnalysis().getStateDocIdPrefix(config.getId()) + docNum);
-        executeDeleteByQuery(
-            parentTaskClient,
-            AnomalyDetectorsIndex.jobStateIndexPattern(),
-            query,
-            timeout,
-            ActionListener.wrap(
-                response -> {
-                    if (response.getDeleted() > 0) {
-                        deleteModelState(parentTaskClient, config, timeout, docNum + 1, listener);
-                        return;
-                    }
-                    listener.onResponse(true);
-                },
-                listener::onFailure
-            )
-        );
-    }
-
-    private void deleteStats(ParentTaskAssigningClient parentTaskClient,
-                             String jobId,
-                             TimeValue timeout,
-                             ActionListener<BulkByScrollResponse> listener) {
-        executeDeleteByQuery(
-            parentTaskClient,
-            MlStatsIndex.indexPattern(),
-            QueryBuilders.termQuery(Fields.JOB_ID.getPreferredName(), jobId),
-            timeout,
-            listener
-        );
-    }
-
-    private void executeDeleteByQuery(ParentTaskAssigningClient parentTaskClient, String index, QueryBuilder query, TimeValue timeout,
-                                      ActionListener<BulkByScrollResponse> listener) {
-        DeleteByQueryRequest request = new DeleteByQueryRequest(index);
-        request.setQuery(query);
-        request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
-        request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
-        request.setAbortOnVersionConflict(false);
-        request.setRefresh(true);
-        request.setTimeout(timeout);
-        executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, listener);
-    }
-
     @Override
     protected ClusterBlockException checkBlock(DeleteDataFrameAnalyticsAction.Request request, ClusterState state) {
         return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

+ 3 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java

@@ -22,9 +22,9 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.license.License;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
@@ -204,7 +204,7 @@ public class TransportPutDataFrameAnalyticsAction
         ClusterState clusterState = clusterService.state();
         if (clusterState == null) {
             logger.warn("Cannot update doc mapping because clusterState == null");
-            configProvider.put(config, headers, listener);
+            configProvider.put(config, headers, masterNodeTimeout, listener);
             return;
         }
         ElasticsearchMappings.addDocMappingIfMissing(
@@ -214,7 +214,7 @@ public class TransportPutDataFrameAnalyticsAction
             clusterState,
             masterNodeTimeout,
             ActionListener.wrap(
-                unused -> configProvider.put(config, headers, ActionListener.wrap(
+                unused -> configProvider.put(config, headers, masterNodeTimeout, ActionListener.wrap(
                     indexResponse -> {
                         auditor.info(
                             config.getId(),

+ 64 - 9
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsConfigProvider.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.ml.dataframe.persistence;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.get.GetAction;
@@ -20,9 +21,9 @@ import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -31,11 +32,15 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 import org.elasticsearch.xpack.core.action.util.PageParams;
 import org.elasticsearch.xpack.core.ml.MlConfigIndex;
 import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -84,14 +89,64 @@ public class DataFrameAnalyticsConfigProvider {
     /**
      * Puts the given {@link DataFrameAnalyticsConfig} document into the config index.
      */
-    public void put(DataFrameAnalyticsConfig config, Map<String, String> headers, ActionListener<DataFrameAnalyticsConfig> listener) {
-        if (headers.isEmpty() == false) {
-            // Filter any values in headers that aren't security fields
-            config = new DataFrameAnalyticsConfig.Builder(config)
-                .setHeaders(filterSecurityHeaders(headers))
-                .build();
-        }
-        index(config, null, listener);
+    public void put(final DataFrameAnalyticsConfig config, Map<String, String> headers, TimeValue timeout,
+                    ActionListener<DataFrameAnalyticsConfig> listener) {
+
+        ActionListener<AcknowledgedResponse> deleteLeftOverDocsListener = ActionListener.wrap(
+            r -> index(prepareConfigForIndex(config, headers), null, listener),
+            listener::onFailure
+        );
+
+        ActionListener<Boolean> existsListener = ActionListener.wrap(
+            exists -> {
+                if (exists) {
+                    listener.onFailure(ExceptionsHelper.dataFrameAnalyticsAlreadyExists(config.getId()));
+                } else {
+                    deleteLeftOverDocs(config, timeout, deleteLeftOverDocsListener);
+                }
+            },
+            listener::onFailure
+        );
+
+        exists(config.getId(), existsListener);
+    }
+
+    private DataFrameAnalyticsConfig prepareConfigForIndex(DataFrameAnalyticsConfig config, Map<String, String> headers) {
+        return headers.isEmpty() ? config : new DataFrameAnalyticsConfig.Builder(config)
+            .setHeaders(filterSecurityHeaders(headers))
+            .build();
+    }
+
+    private void exists(String jobId, ActionListener<Boolean> listener) {
+        ActionListener<GetResponse> getListener = ActionListener.wrap(
+            getResponse -> listener.onResponse(getResponse.isExists()),
+            e -> {
+                if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
+                    listener.onResponse(false);
+                } else {
+                    listener.onFailure(e);
+                }
+            }
+        );
+
+        GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), DataFrameAnalyticsConfig.documentId(jobId));
+        getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
+        executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, getListener);
+    }
+
+    private void deleteLeftOverDocs(DataFrameAnalyticsConfig config, TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
+        DataFrameAnalyticsDeleter deleter = new DataFrameAnalyticsDeleter(client, auditor);
+        deleter.deleteAllDocuments(config, timeout, ActionListener.wrap(
+            r -> listener.onResponse(r),
+            e -> {
+                if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
+                    // This is expected
+                    listener.onResponse(AcknowledgedResponse.TRUE);
+                } else {
+                    listener.onFailure(ExceptionsHelper.serverError("error deleting prior documents", e));
+                }
+            }
+        ));
     }
 
     /**

+ 188 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/persistence/DataFrameAnalyticsDeleter.java

@@ -0,0 +1,188 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.dataframe.persistence;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.delete.DeleteAction;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.query.IdsQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryAction;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.xpack.core.ml.MlConfigIndex;
+import org.elasticsearch.xpack.core.ml.MlStatsIndex;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
+import org.elasticsearch.xpack.core.ml.dataframe.stats.Fields;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
+import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
+import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
+
+import java.util.Objects;
+
+import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
+import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
+
+public class DataFrameAnalyticsDeleter {
+
+    private static final Logger logger = LogManager.getLogger(DataFrameAnalyticsDeleter.class);
+
+    private final Client client;
+    private final DataFrameAnalyticsAuditor auditor;
+
+    public DataFrameAnalyticsDeleter(Client client, DataFrameAnalyticsAuditor auditor) {
+        this.client = Objects.requireNonNull(client);
+        this.auditor = Objects.requireNonNull(auditor);
+    }
+
+    public void deleteAllDocuments(DataFrameAnalyticsConfig config, TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
+        final String id = config.getId();
+
+        // Step 3. Delete the config
+        ActionListener<BulkByScrollResponse> deleteStatsHandler = ActionListener.wrap(
+            bulkByScrollResponse -> {
+                if (bulkByScrollResponse.isTimedOut()) {
+                    logger.warn("[{}] DeleteByQuery for stats timed out", id);
+                }
+                if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
+                    logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery for stats", id,
+                        bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts());
+                    for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
+                        logger.warn("[{}] DBQ failure: {}", id, failure);
+                    }
+                }
+                deleteConfig(id, listener);
+            },
+            failure -> {
+                logger.warn(new ParameterizedMessage("[{}] failed to remove stats", id), ExceptionsHelper.unwrapCause(failure));
+                deleteConfig(id, listener);
+            }
+        );
+
+        // Step 2. Delete job docs from stats index
+        ActionListener<BulkByScrollResponse> deleteStateHandler = ActionListener.wrap(
+            bulkByScrollResponse -> {
+                if (bulkByScrollResponse.isTimedOut()) {
+                    logger.warn("[{}] DeleteByQuery for state timed out", id);
+                }
+                if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
+                    logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery for state", id,
+                        bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts());
+                    for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
+                        logger.warn("[{}] DBQ failure: {}", id, failure);
+                    }
+                }
+                deleteStats(id, timeout, deleteStatsHandler);
+            },
+            listener::onFailure
+        );
+
+        // Step 1. Delete state
+        deleteState(config, timeout, deleteStateHandler);
+    }
+
+    private void deleteConfig(String id, ActionListener<AcknowledgedResponse> listener) {
+        DeleteRequest deleteRequest = new DeleteRequest(MlConfigIndex.indexName());
+        deleteRequest.id(DataFrameAnalyticsConfig.documentId(id));
+        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        executeAsyncWithOrigin(client, ML_ORIGIN, DeleteAction.INSTANCE, deleteRequest, ActionListener.wrap(
+            deleteResponse -> {
+                if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
+                    listener.onFailure(ExceptionsHelper.missingDataFrameAnalytics(id));
+                    return;
+                }
+                assert deleteResponse.getResult() == DocWriteResponse.Result.DELETED;
+                logger.info("[{}] Deleted", id);
+                auditor.info(id, Messages.DATA_FRAME_ANALYTICS_AUDIT_DELETED);
+                listener.onResponse(AcknowledgedResponse.TRUE);
+            },
+            e -> {
+                if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
+                    listener.onFailure(ExceptionsHelper.missingDataFrameAnalytics(id));
+                } else {
+                    listener.onFailure(e);
+                }
+            }
+        ));
+    }
+
+    private void deleteState(DataFrameAnalyticsConfig config, TimeValue timeout, ActionListener<BulkByScrollResponse> listener) {
+        ActionListener<Boolean> deleteModelStateListener = ActionListener.wrap(
+            r -> executeDeleteByQuery(
+                AnomalyDetectorsIndex.jobStateIndexPattern(),
+                QueryBuilders.idsQuery().addIds(StoredProgress.documentId(config.getId())),
+                timeout,
+                listener
+            )
+            , listener::onFailure
+        );
+
+        deleteModelState(config, timeout, 1, deleteModelStateListener);
+    }
+
+    private void deleteModelState(DataFrameAnalyticsConfig config, TimeValue timeout, int docNum, ActionListener<Boolean> listener) {
+        if (config.getAnalysis().persistsState() == false) {
+            listener.onResponse(true);
+            return;
+        }
+
+        IdsQueryBuilder query = QueryBuilders.idsQuery().addIds(config.getAnalysis().getStateDocIdPrefix(config.getId()) + docNum);
+        executeDeleteByQuery(
+            AnomalyDetectorsIndex.jobStateIndexPattern(),
+            query,
+            timeout,
+            ActionListener.wrap(
+                response -> {
+                    if (response.getDeleted() > 0) {
+                        deleteModelState(config, timeout, docNum + 1, listener);
+                        return;
+                    }
+                    listener.onResponse(true);
+                },
+                listener::onFailure
+            )
+        );
+    }
+
+    private void deleteStats(String jobId, TimeValue timeout, ActionListener<BulkByScrollResponse> listener) {
+        executeDeleteByQuery(
+            MlStatsIndex.indexPattern(),
+            QueryBuilders.termQuery(Fields.JOB_ID.getPreferredName(), jobId),
+            timeout,
+            listener
+        );
+    }
+
+    private void executeDeleteByQuery(String index, QueryBuilder query, TimeValue timeout,
+                                      ActionListener<BulkByScrollResponse> listener) {
+        DeleteByQueryRequest request = new DeleteByQueryRequest(index);
+        request.setQuery(query);
+        request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()));
+        request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
+        request.setAbortOnVersionConflict(false);
+        request.setRefresh(true);
+        request.setTimeout(timeout);
+        executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, listener);
+    }
+}