[ML] Make job data deletion reusable (#73602)

This commit moves the logic for deleting job data
from `TransportDeleteJobAction` into the `JobDataDeleter`
class so that it can be reused. In particular, the
upcoming reset API will need this logic.
Dimitris Athanasiou · 4 years ago · parent commit 443e09ffdd
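
The whole change amounts to replacing roughly 300 lines in the action with a two-line delegation. As a preview of the diff below, the new call site looks like this (all identifiers are the ones visible in the diff; the per-argument comments are editorial):

    // The action now delegates to JobDataDeleter; each argument supplies a
    // dependency the method previously read from the action's own fields.
    new JobDataDeleter(parentTaskClient, jobId).deleteJobDocuments(
        jobId,                        // the job id again (the deleter also stores it from its constructor)
        jobConfigProvider,            // looks the job up (Step 5 of the flow)
        indexNameExpressionResolver,  // resolves the results alias to concrete indices (Step 6)
        clusterService.state(),       // cluster state used for that resolution
        removeFromCalendarsHandler,   // success continuation: the next stage of job deletion
        listener::onFailure);         // failure continuation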

+2 -318
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

@@ -6,24 +6,12 @@
  */
 package org.elasticsearch.xpack.ml.action;
 
-import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.search.MultiSearchAction;
-import org.elasticsearch.action.search.MultiSearchRequest;
-import org.elasticsearch.action.search.MultiSearchResponse;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
 import org.elasticsearch.client.Client;
@@ -31,43 +19,24 @@ import org.elasticsearch.client.ParentTaskAssigningClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
-import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.TermQueryBuilder;
-import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.DeleteByQueryAction;
-import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksService;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.core.action.util.PageParams;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
-import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
 import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
-import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
-import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
-import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
-import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
 import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
@@ -76,17 +45,12 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
-import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@@ -95,8 +59,6 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
 
     private static final Logger logger = LogManager.getLogger(TransportDeleteJobAction.class);
 
-    private static final int MAX_SNAPSHOTS_TO_DELETE = 10000;
-
     private final Client client;
     private final PersistentTasksService persistentTasksService;
     private final AnomalyDetectionAuditor auditor;
@@ -266,286 +228,8 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
 
 
         // Step 1. Delete the physical storage
-        deleteJobDocuments(parentTaskClient, jobId, removeFromCalendarsHandler, listener::onFailure);
-    }
-
-    private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, String jobId,
-                                    CheckedConsumer<Boolean, Exception> finishedHandler, Consumer<Exception> failureHandler) {
-
-        AtomicReference<String[]> indexNames = new AtomicReference<>();
-
-        final ActionListener<AcknowledgedResponse> completionHandler = ActionListener.wrap(
-                response -> finishedHandler.accept(response.isAcknowledged()),
-                failureHandler);
-
-        // Step 9. If we did not drop the indices and after DBQ state done, we delete the aliases
-        ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
-                bulkByScrollResponse -> {
-                    if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume indices were deleted
-                        completionHandler.onResponse(AcknowledgedResponse.TRUE);
-                    } else {
-                        if (bulkByScrollResponse.isTimedOut()) {
-                            logger.warn("[{}] DeleteByQuery for indices [{}] timed out.", jobId, String.join(", ", indexNames.get()));
-                        }
-                        if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
-                            logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}].",
-                                    jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(),
-                                    String.join(", ", indexNames.get()));
-                            for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
-                                logger.warn("DBQ failure: " + failure);
-                            }
-                        }
-                        deleteAliases(parentTaskClient, jobId, completionHandler);
-                    }
-                },
-                failureHandler);
-
-        // Step 8. If we did not delete the indices, we run a delete by query
-        ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
-                response -> {
-                    if (response && indexNames.get().length > 0) {
-                        logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indexNames.get()));
-                        ConstantScoreQueryBuilder query =
-                            new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
-                        DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get())
-                            .setQuery(query)
-                            .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
-                            .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
-                            .setAbortOnVersionConflict(false)
-                            .setRefresh(true);
-
-                        executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
-                    } else { // We did not execute DBQ, no need to delete aliases or check the response
-                        dbqHandler.onResponse(null);
-                    }
-                },
-                failureHandler);
-
-        // Step 7. Handle each multi-search response. There should be one response for each underlying index.
-        // For each underlying index that contains results ONLY for the current job, we will delete that index.
-        // If there exists at least 1 index that has another job's results, we will run DBQ.
-        ActionListener<MultiSearchResponse> customIndexSearchHandler = ActionListener.wrap(
-           multiSearchResponse -> {
-               if (multiSearchResponse == null) {
-                   deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
-                   return;
-               }
-               String defaultSharedIndex = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
-                   AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
-               List<String> indicesToDelete = new ArrayList<>();
-               boolean needToRunDBQTemp = false;
-               assert multiSearchResponse.getResponses().length == indexNames.get().length;
-               int i = 0;
-               for (MultiSearchResponse.Item item : multiSearchResponse.getResponses()) {
-                   if (item.isFailure()) {
-                       ++i;
-                       if (ExceptionsHelper.unwrapCause(item.getFailure()) instanceof IndexNotFoundException) {
-                           // index is already deleted, no need to take action against it
-                           continue;
-                       } else {
-                           failureHandler.accept(item.getFailure());
-                           return;
-                       }
-                   }
-                   SearchResponse searchResponse = item.getResponse();
-                   if (searchResponse.getHits().getTotalHits().value > 0 || indexNames.get()[i].equals(defaultSharedIndex)) {
-                       needToRunDBQTemp = true;
-                   } else {
-                       indicesToDelete.add(indexNames.get()[i]);
-                   }
-                   ++i;
-               }
-               final boolean needToRunDBQ = needToRunDBQTemp;
-               if (indicesToDelete.isEmpty()) {
-                   deleteByQueryExecutor.onResponse(needToRunDBQ);
-                   return;
-               }
-               logger.info("[{}] deleting the following indices directly {}", jobId, indicesToDelete);
-               DeleteIndexRequest request = new DeleteIndexRequest(indicesToDelete.toArray(String[]::new));
-               request.indicesOptions(IndicesOptions.lenientExpandOpenHidden());
-               executeAsyncWithOrigin(
-                   parentTaskClient.threadPool().getThreadContext(),
-                   ML_ORIGIN,
-                   request,
-                   ActionListener.<AcknowledgedResponse>wrap(
-                       response -> deleteByQueryExecutor.onResponse(needToRunDBQ), // only run DBQ if there is a shared index
-                       failureHandler),
-                   parentTaskClient.admin().indices()::delete);
-           },
-           failure -> {
-               if (ExceptionsHelper.unwrapCause(failure) instanceof IndexNotFoundException) { // assume the index is already deleted
-                   deleteByQueryExecutor.onResponse(false); // skip DBQ && Alias
-               } else {
-                   failureHandler.accept(failure);
-               }
-           }
-        );
-
-        // Step 6. If we successfully find a job, gather information about its result indices.
-        // This will execute a multi-search action for every concrete index behind the job results alias.
-        // If there are no concrete indices, take no action and go to the next step.
-        ActionListener<Job.Builder> getJobHandler = ActionListener.wrap(
-            builder -> {
-                indexNames.set(indexNameExpressionResolver.concreteIndexNames(clusterService.state(),
-                    IndicesOptions.lenientExpandOpen(), AnomalyDetectorsIndex.jobResultsAliasedName(jobId)));
-                if (indexNames.get().length == 0) {
-                    // don't bother searching the index any further - it's already been closed or deleted
-                    customIndexSearchHandler.onResponse(null);
-                    return;
-                }
-                MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
-                // It is important that the requests are in the same order as the index names.
-                // This is because responses are ordered according to their requests.
-                for (String indexName : indexNames.get()) {
-                    SearchSourceBuilder source = new SearchSourceBuilder()
-                        .size(0)
-                        // if we have just one hit we cannot delete the index
-                        .trackTotalHitsUpTo(1)
-                        .query(QueryBuilders.boolQuery().filter(
-                            QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));
-                    multiSearchRequest.add(new SearchRequest(indexName).source(source));
-                }
-                executeAsyncWithOrigin(parentTaskClient,
-                    ML_ORIGIN,
-                    MultiSearchAction.INSTANCE,
-                    multiSearchRequest,
-                    customIndexSearchHandler);
-            },
-            failureHandler
-        );
-
-        // Step 5. Get the job as the initial result index name is required
-        ActionListener<Boolean> deleteAnnotationsHandler = ActionListener.wrap(
-                response -> jobConfigProvider.getJob(jobId, getJobHandler),
-                failureHandler);
-
-        // Step 4. Delete annotations associated with the job
-        ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
-                response -> deleteAnnotations(parentTaskClient, jobId, deleteAnnotationsHandler),
-                failureHandler);
-
-        // Step 3. Delete quantiles done, delete the categorizer state
-        ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
-                response -> deleteCategorizerState(parentTaskClient, jobId, 1, deleteCategorizerStateHandler),
-                failureHandler);
-
-        // Step 2. Delete state done, delete the quantiles
-        ActionListener<BulkByScrollResponse> deleteStateHandler = ActionListener.wrap(
-                bulkResponse -> deleteQuantiles(parentTaskClient, jobId, deleteQuantilesHandler),
-                failureHandler);
-
-        // Step 1. Delete the model state
-        deleteModelState(parentTaskClient, jobId, deleteStateHandler);
-    }
-
-    private void deleteQuantiles(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<Boolean> finishedHandler) {
-        // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
-        IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
-            .setQuery(query)
-            .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
-            .setAbortOnVersionConflict(false)
-            .setRefresh(true);
-
-        executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
-                response -> finishedHandler.onResponse(true),
-                ignoreIndexNotFoundException(finishedHandler)));
-    }
-
-    private void deleteModelState(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<BulkByScrollResponse> listener) {
-        GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
-        request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE));
-        executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap(
-                response -> {
-                    List<ModelSnapshot> deleteCandidates = response.getPage().results();
-                    JobDataDeleter deleter = new JobDataDeleter(parentTaskClient, jobId);
-                    deleter.deleteModelSnapshots(deleteCandidates, listener);
-                },
-                listener::onFailure));
-    }
-
-    private void deleteCategorizerState(ParentTaskAssigningClient parentTaskClient, String jobId, int docNum,
-                                        ActionListener<Boolean> finishedHandler) {
-        // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
-        IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
-            .setQuery(query)
-            .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
-            .setAbortOnVersionConflict(false)
-            .setRefresh(true);
-
-        executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
-                response -> {
-                    // If we successfully deleted a document try the next one; if not we're done
-                    if (response.getDeleted() > 0) {
-                        // There's an assumption here that there won't be very many categorizer
-                        // state documents, so the recursion won't go more than, say, 5 levels deep
-                        deleteCategorizerState(parentTaskClient, jobId, docNum + 1, finishedHandler);
-                        return;
-                    }
-                    finishedHandler.onResponse(true);
-                },
-                ignoreIndexNotFoundException(finishedHandler)));
-    }
-
-    private void deleteAnnotations(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<Boolean> finishedHandler) {
-        JobDataDeleter deleter = new JobDataDeleter(parentTaskClient, jobId);
-        deleter.deleteAllAnnotations(
-            ActionListener.wrap(r -> finishedHandler.onResponse(true), ignoreIndexNotFoundException(finishedHandler)));
-    }
-
-    private static Consumer<Exception> ignoreIndexNotFoundException(ActionListener<Boolean> finishedHandler) {
-        return e -> {
-            // It's not a problem for us if the index wasn't found - it's equivalent to document not found
-            if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
-                finishedHandler.onResponse(true);
-            } else {
-                finishedHandler.onFailure(e);
-            }
-        };
-    }
-
-    private void deleteAliases(ParentTaskAssigningClient parentTaskClient, String jobId,
-                               ActionListener<AcknowledgedResponse> finishedHandler) {
-        final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
-        final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
-
-        // first find the concrete indices associated with the aliases
-        GetAliasesRequest aliasesRequest = new GetAliasesRequest().aliases(readAliasName, writeAliasName)
-                .indicesOptions(IndicesOptions.lenientExpandOpenHidden());
-        executeAsyncWithOrigin(parentTaskClient.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest,
-                ActionListener.<GetAliasesResponse>wrap(
-                        getAliasesResponse -> {
-                            // remove the aliases from the concrete indices found in the first step
-                            IndicesAliasesRequest removeRequest = buildRemoveAliasesRequest(getAliasesResponse);
-                            if (removeRequest == null) {
-                                // don't error if the job's aliases have already been deleted - carry on and delete the
-                                // rest of the job's data
-                                finishedHandler.onResponse(AcknowledgedResponse.TRUE);
-                                return;
-                            }
-                            executeAsyncWithOrigin(parentTaskClient.threadPool().getThreadContext(), ML_ORIGIN, removeRequest,
-                                    finishedHandler,
-                                    parentTaskClient.admin().indices()::aliases);
-                        },
-                        finishedHandler::onFailure), parentTaskClient.admin().indices()::getAliases);
-    }
-
-    private IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesResponse getAliasesResponse) {
-        Set<String> aliases = new HashSet<>();
-        List<String> indices = new ArrayList<>();
-        for (ObjectObjectCursor<String, List<AliasMetadata>> entry : getAliasesResponse.getAliases()) {
-            // The response includes _all_ indices, but only those associated with
-            // the aliases we asked about will have associated AliasMetadata
-            if (entry.value.isEmpty() == false) {
-                indices.add(entry.key);
-                entry.value.forEach(metadata -> aliases.add(metadata.getAlias()));
-            }
-        }
-        return aliases.isEmpty() ? null : new IndicesAliasesRequest().addAliasAction(
-                IndicesAliasesRequest.AliasActions.remove()
-                        .aliases(aliases.toArray(new String[aliases.size()]))
-                        .indices(indices.toArray(new String[indices.size()])));
+        new JobDataDeleter(parentTaskClient, jobId).deleteJobDocuments(
+            jobId, jobConfigProvider, indexNameExpressionResolver, clusterService.state(), removeFromCalendarsHandler, listener::onFailure);
     }
 
     private void forceDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,
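
Per the commit message, the point of the move is reuse, with the planned reset API as the first consumer. A hypothetical sketch of such a caller (no reset action exists in this commit; the method and handler bodies below are illustrative only):

    // Hypothetical future caller, e.g. a reset action; illustrative only.
    private void deleteDocumentsForReset(ParentTaskAssigningClient client, String jobId) {
        new JobDataDeleter(client, jobId).deleteJobDocuments(
            jobId, jobConfigProvider, indexNameExpressionResolver, clusterService.state(),
            acknowledged -> logger.info("[{}] job documents deleted: {}", jobId, acknowledged),
            e -> logger.error("[" + jobId + "] failed to delete job documents", e));
    }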

+309 -2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

@@ -6,31 +6,59 @@
  */
 package org.elasticsearch.xpack.ml.job.persistence;
 
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.search.MultiSearchAction;
+import org.elasticsearch.action.search.MultiSearchRequest;
+import org.elasticsearch.action.search.MultiSearchResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.AliasMetadata;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
+import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.BulkByScrollTask;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.xpack.core.action.util.PageParams;
+import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
 import org.elasticsearch.xpack.core.ml.annotations.Annotation;
 import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.security.user.XPackUser;
+import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,13 +66,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 
 public class JobDataDeleter {
 
-    private static final Logger LOGGER = LogManager.getLogger(JobDataDeleter.class);
+    private static final Logger logger = LogManager.getLogger(JobDataDeleter.class);
+
+    private static final int MAX_SNAPSHOTS_TO_DELETE = 10000;
 
     private final Client client;
     private final String jobId;
@@ -188,7 +220,7 @@ public class JobDataDeleter {
         try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
             client.execute(DeleteByQueryAction.INSTANCE, dbqRequest).get();
         } catch (Exception e) {
-            LOGGER.error("[" + jobId + "] An error occurred while deleting interim results", e);
+            logger.error("[" + jobId + "] An error occurred while deleting interim results", e);
         }
     }
 
@@ -208,4 +240,279 @@ public class JobDataDeleter {
 
         executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
     }
+
+    /**
+     * Deletes all documents associated with a job except user annotations and notifications
+     */
+    public void deleteJobDocuments(String jobId, JobConfigProvider jobConfigProvider,
+                                   IndexNameExpressionResolver indexNameExpressionResolver, ClusterState clusterState,
+                                   CheckedConsumer<Boolean, Exception> finishedHandler, Consumer<Exception> failureHandler) {
+
+        AtomicReference<String[]> indexNames = new AtomicReference<>();
+
+        final ActionListener<AcknowledgedResponse> completionHandler = ActionListener.wrap(
+            response -> finishedHandler.accept(response.isAcknowledged()),
+            failureHandler);
+
+        // Step 9. If we did not drop the indices and after DBQ state done, we delete the aliases
+        ActionListener<BulkByScrollResponse> dbqHandler = ActionListener.wrap(
+            bulkByScrollResponse -> {
+                if (bulkByScrollResponse == null) { // no action was taken by DBQ, assume indices were deleted
+                    completionHandler.onResponse(AcknowledgedResponse.TRUE);
+                } else {
+                    if (bulkByScrollResponse.isTimedOut()) {
+                        logger.warn("[{}] DeleteByQuery for indices [{}] timed out.", jobId, String.join(", ", indexNames.get()));
+                    }
+                    if (bulkByScrollResponse.getBulkFailures().isEmpty() == false) {
+                        logger.warn("[{}] {} failures and {} conflicts encountered while running DeleteByQuery on indices [{}].",
+                            jobId, bulkByScrollResponse.getBulkFailures().size(), bulkByScrollResponse.getVersionConflicts(),
+                            String.join(", ", indexNames.get()));
+                        for (BulkItemResponse.Failure failure : bulkByScrollResponse.getBulkFailures()) {
+                            logger.warn("DBQ failure: " + failure);
+                        }
+                    }
+                    deleteAliases(jobId, completionHandler);
+                }
+            },
+            failureHandler);
+
+        // Step 8. If we did not delete the indices, we run a delete by query
+        ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
+            response -> {
+                if (response && indexNames.get().length > 0) {
+                    logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indexNames.get()));
+                    ConstantScoreQueryBuilder query =
+                        new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
+                    DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get())
+                        .setQuery(query)
+                        .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
+                        .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
+                        .setAbortOnVersionConflict(false)
+                        .setRefresh(true);
+
+                    executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
+                } else { // We did not execute DBQ, no need to delete aliases or check the response
+                    dbqHandler.onResponse(null);
+                }
+            },
+            failureHandler);
+
+        // Step 7. Handle each multi-search response. There should be one response for each underlying index.
+        // For each underlying index that contains results ONLY for the current job, we will delete that index.
+        // If there exists at least 1 index that has another job's results, we will run DBQ.
+        ActionListener<MultiSearchResponse> customIndexSearchHandler = ActionListener.wrap(
+            multiSearchResponse -> {
+                if (multiSearchResponse == null) {
+                    deleteByQueryExecutor.onResponse(true); // We need to run DBQ and alias deletion
+                    return;
+                }
+                String defaultSharedIndex = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX +
+                    AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
+                List<String> indicesToDelete = new ArrayList<>();
+                boolean needToRunDBQTemp = false;
+                assert multiSearchResponse.getResponses().length == indexNames.get().length;
+                int i = 0;
+                for (MultiSearchResponse.Item item : multiSearchResponse.getResponses()) {
+                    if (item.isFailure()) {
+                        ++i;
+                        if (ExceptionsHelper.unwrapCause(item.getFailure()) instanceof IndexNotFoundException) {
+                            // index is already deleted, no need to take action against it
+                            continue;
+                        } else {
+                            failureHandler.accept(item.getFailure());
+                            return;
+                        }
+                    }
+                    SearchResponse searchResponse = item.getResponse();
+                    if (searchResponse.getHits().getTotalHits().value > 0 || indexNames.get()[i].equals(defaultSharedIndex)) {
+                        needToRunDBQTemp = true;
+                    } else {
+                        indicesToDelete.add(indexNames.get()[i]);
+                    }
+                    ++i;
+                }
+                final boolean needToRunDBQ = needToRunDBQTemp;
+                if (indicesToDelete.isEmpty()) {
+                    deleteByQueryExecutor.onResponse(needToRunDBQ);
+                    return;
+                }
+                logger.info("[{}] deleting the following indices directly {}", jobId, indicesToDelete);
+                DeleteIndexRequest request = new DeleteIndexRequest(indicesToDelete.toArray(String[]::new));
+                request.indicesOptions(IndicesOptions.lenientExpandOpenHidden());
+                executeAsyncWithOrigin(
+                    client.threadPool().getThreadContext(),
+                    ML_ORIGIN,
+                    request,
+                    ActionListener.<AcknowledgedResponse>wrap(
+                        response -> deleteByQueryExecutor.onResponse(needToRunDBQ), // only run DBQ if there is a shared index
+                        failureHandler),
+                    client.admin().indices()::delete);
+            },
+            failure -> {
+                if (ExceptionsHelper.unwrapCause(failure) instanceof IndexNotFoundException) { // assume the index is already deleted
+                    deleteByQueryExecutor.onResponse(false); // skip DBQ && Alias
+                } else {
+                    failureHandler.accept(failure);
+                }
+            }
+        );
+
+        // Step 6. If we successfully find a job, gather information about its result indices.
+        // This will execute a multi-search action for every concrete index behind the job results alias.
+        // If there are no concrete indices, take no action and go to the next step.
+        ActionListener<Job.Builder> getJobHandler = ActionListener.wrap(
+            builder -> {
+                indexNames.set(indexNameExpressionResolver.concreteIndexNames(clusterState,
+                    IndicesOptions.lenientExpandOpen(), AnomalyDetectorsIndex.jobResultsAliasedName(jobId)));
+                if (indexNames.get().length == 0) {
+                    // don't bother searching the index any further - it's already been closed or deleted
+                    customIndexSearchHandler.onResponse(null);
+                    return;
+                }
+                MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
+                // It is important that the requests are in the same order as the index names.
+                // This is because responses are ordered according to their requests.
+                for (String indexName : indexNames.get()) {
+                    SearchSourceBuilder source = new SearchSourceBuilder()
+                        .size(0)
+                        // if we have just one hit we cannot delete the index
+                        .trackTotalHitsUpTo(1)
+                        .query(QueryBuilders.boolQuery().filter(
+                            QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))));
+                    multiSearchRequest.add(new SearchRequest(indexName).source(source));
+                }
+                executeAsyncWithOrigin(client,
+                    ML_ORIGIN,
+                    MultiSearchAction.INSTANCE,
+                    multiSearchRequest,
+                    customIndexSearchHandler);
+            },
+            failureHandler
+        );
+
+        // Step 5. Get the job as the initial result index name is required
+        ActionListener<Boolean> deleteAnnotationsHandler = ActionListener.wrap(
+            response -> jobConfigProvider.getJob(jobId, getJobHandler),
+            failureHandler);
+
+        // Step 4. Delete annotations associated with the job
+        ActionListener<Boolean> deleteCategorizerStateHandler = ActionListener.wrap(
+            response -> deleteAllAnnotations(deleteAnnotationsHandler),
+            failureHandler);
+
+        // Step 3. Delete quantiles done, delete the categorizer state
+        ActionListener<Boolean> deleteQuantilesHandler = ActionListener.wrap(
+            response -> deleteCategorizerState(jobId, 1, deleteCategorizerStateHandler),
+            failureHandler);
+
+        // Step 2. Delete state done, delete the quantiles
+        ActionListener<BulkByScrollResponse> deleteStateHandler = ActionListener.wrap(
+            bulkResponse -> deleteQuantiles(jobId, deleteQuantilesHandler),
+            failureHandler);
+
+        // Step 1. Delete the model state
+        deleteModelState(jobId, deleteStateHandler);
+    }
+
+    private void deleteAliases(String jobId, ActionListener<AcknowledgedResponse> finishedHandler) {
+        final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
+        final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
+
+        // first find the concrete indices associated with the aliases
+        GetAliasesRequest aliasesRequest = new GetAliasesRequest().aliases(readAliasName, writeAliasName)
+            .indicesOptions(IndicesOptions.lenientExpandOpenHidden());
+        executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest,
+            ActionListener.<GetAliasesResponse>wrap(
+                getAliasesResponse -> {
+                    // remove the aliases from the concrete indices found in the first step
+                    IndicesAliasesRequest removeRequest = buildRemoveAliasesRequest(getAliasesResponse);
+                    if (removeRequest == null) {
+                        // don't error if the job's aliases have already been deleted - carry on and delete the
+                        // rest of the job's data
+                        finishedHandler.onResponse(AcknowledgedResponse.TRUE);
+                        return;
+                    }
+                    executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, removeRequest,
+                        finishedHandler,
+                        client.admin().indices()::aliases);
+                },
+                finishedHandler::onFailure), client.admin().indices()::getAliases);
+    }
+
+    private IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesResponse getAliasesResponse) {
+        Set<String> aliases = new HashSet<>();
+        List<String> indices = new ArrayList<>();
+        for (ObjectObjectCursor<String, List<AliasMetadata>> entry : getAliasesResponse.getAliases()) {
+            // The response includes _all_ indices, but only those associated with
+            // the aliases we asked about will have associated AliasMetadata
+            if (entry.value.isEmpty() == false) {
+                indices.add(entry.key);
+                entry.value.forEach(metadata -> aliases.add(metadata.getAlias()));
+            }
+        }
+        return aliases.isEmpty() ? null : new IndicesAliasesRequest().addAliasAction(
+            IndicesAliasesRequest.AliasActions.remove()
+                .aliases(aliases.toArray(new String[aliases.size()]))
+                .indices(indices.toArray(new String[indices.size()])));
+    }
+
+    private void deleteQuantiles(String jobId, ActionListener<Boolean> finishedHandler) {
+        // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
+        IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));
+        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
+            .setQuery(query)
+            .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
+            .setAbortOnVersionConflict(false)
+            .setRefresh(true);
+
+        executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
+            response -> finishedHandler.onResponse(true),
+            ignoreIndexNotFoundException(finishedHandler)));
+    }
+
+    private void deleteModelState(String jobId, ActionListener<BulkByScrollResponse> listener) {
+        GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
+        request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE));
+        executeAsyncWithOrigin(client, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap(
+            response -> {
+                List<ModelSnapshot> deleteCandidates = response.getPage().results();
+                JobDataDeleter deleter = new JobDataDeleter(client, jobId);
+                deleter.deleteModelSnapshots(deleteCandidates, listener);
+            },
+            listener::onFailure));
+    }
+
+    private void deleteCategorizerState(String jobId, int docNum, ActionListener<Boolean> finishedHandler) {
+        // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
+        IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
+        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
+            .setQuery(query)
+            .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
+            .setAbortOnVersionConflict(false)
+            .setRefresh(true);
+
+        executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
+            response -> {
+                // If we successfully deleted a document try the next one; if not we're done
+                if (response.getDeleted() > 0) {
+                    // There's an assumption here that there won't be very many categorizer
+                    // state documents, so the recursion won't go more than, say, 5 levels deep
+                    deleteCategorizerState(jobId, docNum + 1, finishedHandler);
+                    return;
+                }
+                finishedHandler.onResponse(true);
+            },
+            ignoreIndexNotFoundException(finishedHandler)));
+    }
+
+    private static Consumer<Exception> ignoreIndexNotFoundException(ActionListener<Boolean> finishedHandler) {
+        return e -> {
+            // It's not a problem for us if the index wasn't found - it's equivalent to document not found
+            if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
+                finishedHandler.onResponse(true);
+            } else {
+                finishedHandler.onFailure(e);
+            }
+        };
+    }
 }
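
A note on the structure that moved across: deleteJobDocuments declares its nine steps in reverse order (the listener for Step 9 first, the call for Step 1 last) so that each step's listener can reference the one that follows it, and the chain then executes forward from Step 1. A condensed, self-contained demonstration of the pattern with three steps instead of nine, using a stripped-down stand-in for org.elasticsearch.action.ActionListener:

    import java.util.function.Consumer;

    public class ListenerChainDemo {

        // Minimal stand-in for org.elasticsearch.action.ActionListener.
        interface Listener<T> {
            void onResponse(T response);
            void onFailure(Exception e);

            static <T> Listener<T> wrap(Consumer<T> responseHandler, Consumer<Exception> failureHandler) {
                return new Listener<T>() {
                    @Override
                    public void onResponse(T response) {
                        responseHandler.accept(response);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        failureHandler.accept(e);
                    }
                };
            }
        }

        public static void main(String[] args) {
            Consumer<Exception> failureHandler = e -> System.err.println("chain failed: " + e);

            // Declared in reverse, as in deleteJobDocuments: the last step comes
            // first so that each earlier step can hand control to the next one.
            Listener<Boolean> step3 = Listener.wrap(
                r -> System.out.println("step 3: delete aliases"), failureHandler);
            Listener<Boolean> step2 = Listener.wrap(
                r -> { System.out.println("step 2: delete by query"); step3.onResponse(true); },
                failureHandler);

            // Step 1 kicks the chain off; output order is 1, 2, 3.
            System.out.println("step 1: delete model state");
            step2.onResponse(true);
        }
    }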