Browse Source

Delete auto-generated annotations when model snapshot is reverted (#58240)

Przemysław Witek 5 years ago
parent
commit
6d31e685a9

+ 0 - 28
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java

@@ -6,16 +6,11 @@
 package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.core.ml.annotations.Annotation;
 import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -28,13 +23,9 @@ import org.junit.After;
 import org.junit.Before;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 
-import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
 import static org.elasticsearch.xpack.core.ml.annotations.AnnotationTests.randomAnnotation;
-import static org.hamcrest.Matchers.hasSize;
 
 public class DeleteJobIT extends MlNativeAutodetectIntegTestCase {
 
@@ -121,23 +112,4 @@ public class DeleteJobIT extends MlNativeAutodetectIntegTestCase {
                 .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         }
     }
-
-    private void assertThatNumberOfAnnotationsIsEqualTo(int expectedNumberOfAnnotations) throws Exception {
-        // Refresh the annotations index so that recently indexed annotation docs are visible.
-        client().admin().indices().prepareRefresh(AnnotationIndex.INDEX_NAME)
-            .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
-            .execute()
-            .actionGet();
-
-        SearchRequest searchRequest =
-            new SearchRequest(AnnotationIndex.READ_ALIAS_NAME).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
-        SearchResponse searchResponse = client().search(searchRequest).actionGet();
-        List<Annotation> annotations = new ArrayList<>();
-        for (SearchHit hit : searchResponse.getHits().getHits()) {
-            try (XContentParser parser = createParser(jsonXContent, hit.getSourceRef())) {
-                annotations.add(Annotation.fromXContent(parser, null));
-            }
-        }
-        assertThat("Hits were: " + annotations, annotations, hasSize(expectedNumberOfAnnotations));
-    }
 }

+ 27 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java

@@ -5,7 +5,9 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
@@ -45,6 +47,8 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
+import org.elasticsearch.xpack.core.ml.annotations.Annotation;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.calendars.Calendar;
 import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -70,7 +74,9 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import static org.elasticsearch.common.xcontent.json.JsonXContent.jsonXContent;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -243,8 +249,9 @@ abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {
         return response.getPage().results();
     }
 
-    protected RevertModelSnapshotAction.Response revertModelSnapshot(String jobId, String snapshotId) {
+    protected RevertModelSnapshotAction.Response revertModelSnapshot(String jobId, String snapshotId, boolean deleteInterveningResults) {
         RevertModelSnapshotAction.Request request = new RevertModelSnapshotAction.Request(jobId, snapshotId);
+        request.setDeleteInterveningResults(deleteInterveningResults);
         return client().execute(RevertModelSnapshotAction.INSTANCE, request).actionGet();
     }
 
@@ -295,6 +302,25 @@ abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {
         }, 30, TimeUnit.SECONDS);
     }
 
+    protected void assertThatNumberOfAnnotationsIsEqualTo(int expectedNumberOfAnnotations) throws IOException {
+        // Refresh the annotations index so that recently indexed annotation docs are visible.
+        client().admin().indices().prepareRefresh(AnnotationIndex.INDEX_NAME)
+            .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
+            .execute()
+            .actionGet();
+
+        SearchRequest searchRequest =
+            new SearchRequest(AnnotationIndex.READ_ALIAS_NAME).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN);
+        SearchResponse searchResponse = client().search(searchRequest).actionGet();
+        List<Annotation> annotations = new ArrayList<>();
+        for (SearchHit hit : searchResponse.getHits().getHits()) {
+            try (XContentParser parser = createParser(jsonXContent, hit.getSourceRef())) {
+                annotations.add(Annotation.fromXContent(parser, null));
+            }
+        }
+        assertThat("Annotations were: " + annotations, annotations, hasSize(expectedNumberOfAnnotations));
+    }
+
     protected ForecastRequestStats getForecastStats(String jobId, String forecastId) {
         SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
             .setQuery(QueryBuilders.idsQuery().addIds(ForecastRequestStats.documentId(jobId, forecastId)))

+ 52 - 3
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java

@@ -5,14 +5,22 @@
  */
 package org.elasticsearch.xpack.ml.integration;
 
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.xpack.core.ml.annotations.Annotation;
+import org.elasticsearch.xpack.core.ml.annotations.Annotation.Event;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
@@ -21,9 +29,12 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeSta
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
+import org.elasticsearch.xpack.core.security.user.XPackUser;
 import org.junit.After;
 
 import java.io.IOException;
+import java.sql.Date;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -33,6 +44,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.xpack.core.ml.annotations.AnnotationTests.randomAnnotation;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
@@ -50,11 +62,19 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
         cleanUp();
     }
 
-    public void test() throws Exception {
+    public void testRevertModelSnapshot() throws Exception {
+        test("revert-model-snapshot-it-job", false);
+    }
+
+    public void testRevertModelSnapshot_DeleteInterveningResults() throws Exception {
+        test("revert-model-snapshot-it-job-delete-intervening-results", true);
+    }
+
+    private void test(String jobId, boolean deleteInterveningResults) throws Exception {
         TimeValue bucketSpan = TimeValue.timeValueHours(1);
         long startTime = 1491004800000L;
 
-        Job.Builder job = buildAndRegisterJob("revert-model-snapshot-it-job", bucketSpan);
+        Job.Builder job = buildAndRegisterJob(jobId, bucketSpan);
         openJob(job.getId());
         postData(job.getId(), generateData(startTime, bucketSpan, 10, Arrays.asList("foo"),
                 (bucketIndex, series) -> bucketIndex == 5 ? 100.0 : 10.0).stream().collect(Collectors.joining()));
@@ -97,7 +117,20 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
         assertThat(getJob(job.getId()).get(0).getModelSnapshotId(), equalTo(modelSnapshots.get(0).getSnapshotId()));
         ModelSnapshot revertSnapshot = modelSnapshots.get(1);
 
-        assertThat(revertModelSnapshot(job.getId(), revertSnapshot.getSnapshotId()).status(), equalTo(RestStatus.OK));
+        // Check there are 2 annotations (one per model snapshot)
+        assertThatNumberOfAnnotationsIsEqualTo(2);
+
+        // Add 3 new annotations...
+        Instant lastResultTimestamp = revertSnapshot.getLatestResultTimeStamp().toInstant();
+        client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.plusSeconds(10), Event.DELAYED_DATA)).actionGet();
+        client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.plusSeconds(20), Event.MODEL_CHANGE)).actionGet();
+        client().index(randomAnnotationIndexRequest(job.getId(), lastResultTimestamp.minusSeconds(10), Event.MODEL_CHANGE)).actionGet();
+        // ... and check there are 5 annotations in total now
+        assertThatNumberOfAnnotationsIsEqualTo(5);
+
+        assertThat(
+            revertModelSnapshot(job.getId(), revertSnapshot.getSnapshotId(), deleteInterveningResults).status(),
+            equalTo(RestStatus.OK));
 
         // Check model_size_stats has been reverted
         assertThat(getJobStats(job.getId()).get(0).getModelSizeStats().getModelBytes(), equalTo(modelSizeStats1.getModelBytes()));
@@ -105,6 +138,9 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
         // Check quantiles have been reverted
         assertThat(getQuantiles(job.getId()).getTimestamp(), equalTo(revertSnapshot.getLatestResultTimeStamp()));
 
+        // Check annotations with event type from {delayed_data, model_change} have been removed if deleteInterveningResults flag is set
+        assertThatNumberOfAnnotationsIsEqualTo(deleteInterveningResults ? 3 : 5);
+
         // Re-run 2nd half of data
         openJob(job.getId());
         postData(job.getId(), generateData(startTime + 10 * bucketSpan.getMillis(), bucketSpan, 10, Arrays.asList("foo", "bar"),
@@ -170,4 +206,17 @@ public class RevertModelSnapshotIT extends MlNativeAutodetectIntegTestCase {
             throw new IllegalStateException(e);
         }
     }
+
+    private static IndexRequest randomAnnotationIndexRequest(String jobId, Instant timestamp, Event event) throws IOException {
+        Annotation annotation = new Annotation.Builder(randomAnnotation(jobId))
+            .setTimestamp(Date.from(timestamp))
+            .setCreateUsername(XPackUser.NAME)
+            .setEvent(event)
+            .build();
+        try (XContentBuilder xContentBuilder = annotation.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) {
+            return new IndexRequest(AnnotationIndex.WRITE_ALIAS_NAME)
+                .source(xContentBuilder)
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        }
+    }
 }

+ 1 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/SetUpgradeModeIT.java

@@ -145,7 +145,7 @@ public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase {
         expectThrowsUpgradeModeException(() -> forecast(jobId, null, null));
 
         String snapshotId = "snapshot_id";
-        expectThrowsUpgradeModeException(() -> revertModelSnapshot(jobId, snapshotId));
+        expectThrowsUpgradeModeException(() -> revertModelSnapshot(jobId, snapshotId, false));
 
         String datafeedId = "datafeed_id";
         expectThrowsUpgradeModeException(() -> putDatafeed(createDatafeed(datafeedId, jobId, Collections.singletonList("index"))));

+ 3 - 17
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

@@ -59,8 +59,6 @@ import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
 import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
 import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
-import org.elasticsearch.xpack.core.ml.annotations.Annotation;
-import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
@@ -71,7 +69,6 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerS
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.core.security.user.XPackUser;
 import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
 import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
 import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
@@ -501,20 +498,9 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
     }
 
     private void deleteAnnotations(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<Boolean> finishedHandler) {
-        ConstantScoreQueryBuilder query =
-            QueryBuilders.constantScoreQuery(
-                QueryBuilders.boolQuery()
-                    .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
-                    .filter(QueryBuilders.termQuery(Annotation.CREATE_USERNAME.getPreferredName(), XPackUser.NAME)));
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME)
-            .setQuery(query)
-            .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen()))
-            .setAbortOnVersionConflict(false)
-            .setRefresh(true);
-
-        executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap(
-            response -> finishedHandler.onResponse(true),
-            ignoreIndexNotFoundException(finishedHandler)));
+        JobDataDeleter deleter = new JobDataDeleter(parentTaskClient, jobId);
+        deleter.deleteAllAnnotations(
+            ActionListener.wrap(r -> finishedHandler.onResponse(true), ignoreIndexNotFoundException(finishedHandler)));
     }
 
     private static Consumer<Exception> ignoreIndexNotFoundException(ActionListener<Boolean> finishedHandler) {

+ 38 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java

@@ -26,6 +26,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
+import org.elasticsearch.xpack.core.ml.annotations.Annotation;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -39,6 +40,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 
 import java.io.IOException;
 import java.util.Date;
+import java.util.Set;
 import java.util.function.Consumer;
 
 public class TransportRevertModelSnapshotAction extends TransportMasterNodeAction<RevertModelSnapshotAction.Request,
@@ -101,6 +103,7 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
                 getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
                     ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
                     if (request.getDeleteInterveningResults()) {
+                        wrappedListener = wrapDeleteOldAnnotationsListener(wrappedListener, modelSnapshot, request.getJobId());
                         wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
                         wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
                     }
@@ -134,9 +137,41 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
         }, errorHandler);
     }
 
+    private ActionListener<RevertModelSnapshotAction.Response> wrapDeleteOldAnnotationsListener(
+            ActionListener<RevertModelSnapshotAction.Response> listener,
+            ModelSnapshot modelSnapshot,
+            String jobId) {
+
+        return ActionListener.wrap(response -> {
+            Date deleteAfter = modelSnapshot.getLatestResultTimeStamp();
+            logger.info("[{}] Removing intervening annotations after reverting model: deleting annotations after [{}]", jobId, deleteAfter);
+
+            JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId);
+            Set<String> eventsToDelete =
+                Set.of(
+                    // Because the results based on the delayed data are being deleted, the fact that the data was originally delayed is
+                    // not relevant
+                    Annotation.Event.DELAYED_DATA.toString(),
+                    // Because the model that changed is no longer in use as it has been rolled back to a time before those changes occurred
+                    Annotation.Event.MODEL_CHANGE.toString());
+            dataDeleter.deleteAnnotationsFromTime(deleteAfter.getTime() + 1, eventsToDelete, new ActionListener<Boolean>() {
+                @Override
+                public void onResponse(Boolean success) {
+                    listener.onResponse(response);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(e);
+                }
+            });
+        }, listener::onFailure);
+    }
+
     private ActionListener<RevertModelSnapshotAction.Response> wrapDeleteOldDataListener(
             ActionListener<RevertModelSnapshotAction.Response> listener,
-            ModelSnapshot modelSnapshot, String jobId) {
+            ModelSnapshot modelSnapshot,
+            String jobId) {
 
         // If we need to delete buckets that occurred after the snapshot, we
         // wrap the listener with one that invokes the OldDataRemover on
@@ -162,8 +197,8 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
 
     private ActionListener<RevertModelSnapshotAction.Response> wrapRevertDataCountsListener(
             ActionListener<RevertModelSnapshotAction.Response> listener,
-            ModelSnapshot modelSnapshot, String jobId) {
-
+            ModelSnapshot modelSnapshot,
+            String jobId) {
 
         return ActionListener.wrap(response -> {
             jobResultsProvider.dataCounts(jobId, counts -> {

+ 82 - 37
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

@@ -10,10 +10,10 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
-import org.elasticsearch.index.query.IdsQueryBuilder;
+import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
@@ -21,12 +21,15 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.BulkByScrollTask;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.xpack.core.ml.annotations.Annotation;
 import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
+import org.elasticsearch.xpack.core.security.user.XPackUser;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -81,7 +84,7 @@ public class JobDataDeleter {
         DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indices.toArray(new String[0]))
             .setRefresh(true)
             .setIndicesOptions(IndicesOptions.lenientExpandOpen())
-            .setQuery(new IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0])));
+            .setQuery(QueryBuilders.idsQuery().addIds(idsToDelete.toArray(new String[0])));
 
         // _doc is the most efficient sort order and will also disable scoring
         deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
@@ -89,6 +92,55 @@ public class JobDataDeleter {
         executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
     }
 
+    /**
+     * Asynchronously delete all the auto-generated (i.e. created by the _xpack user) annotations
+     *
+     * @param listener Response listener
+     */
+    public void deleteAllAnnotations(ActionListener<Boolean> listener) {
+        deleteAnnotationsFromTime(null, null, listener);
+    }
+
+    /**
+     * Asynchronously delete all the auto-generated (i.e. created by the _xpack user) annotations starting from {@code cutOffTime}
+     *
+     * @param cutoffEpochMs Only annotations at and after this time will be deleted. If {@code null}, no cutoff is applied
+     * @param eventsToDelete Only annotations with one of the provided event types will be deleted.
+     *                       If {@code null} or empty, no event-related filtering is applied
+     * @param listener Response listener
+     */
+    public void deleteAnnotationsFromTime(@Nullable Long cutoffEpochMs,
+                                          @Nullable Set<String> eventsToDelete,
+                                          ActionListener<Boolean> listener) {
+        BoolQueryBuilder boolQuery =
+            QueryBuilders.boolQuery()
+                .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
+                .filter(QueryBuilders.termQuery(Annotation.CREATE_USERNAME.getPreferredName(), XPackUser.NAME));
+        if (cutoffEpochMs != null) {
+            boolQuery.filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
+        }
+        if (eventsToDelete != null && eventsToDelete.isEmpty() == false) {
+            boolQuery.filter(QueryBuilders.termsQuery(Annotation.EVENT.getPreferredName(), eventsToDelete));
+        }
+        QueryBuilder query = QueryBuilders.constantScoreQuery(boolQuery);
+        DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnnotationIndex.READ_ALIAS_NAME)
+            .setQuery(query)
+            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
+            .setAbortOnVersionConflict(false)
+            .setRefresh(true)
+            .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
+
+        // _doc is the most efficient sort order and will also disable scoring
+        dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
+
+        executeAsyncWithOrigin(
+            client,
+            ML_ORIGIN,
+            DeleteByQueryAction.INSTANCE,
+            dbqRequest,
+            ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
+    }
+
     /**
      * Asynchronously delete all result types (Buckets, Records, Influencers) from {@code cutOffTime}
      *
@@ -96,43 +148,49 @@ public class JobDataDeleter {
      * @param listener Response listener
      */
     public void deleteResultsFromTime(long cutoffEpochMs, ActionListener<Boolean> listener) {
-        DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
-        deleteByQueryHolder.dbqRequest.setRefresh(true);
-
         QueryBuilder query = QueryBuilders.boolQuery()
-                .filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
-                .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
-        deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen());
-        deleteByQueryHolder.dbqRequest.setQuery(query);
+            .filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName()))
+            .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs));
+        DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
+            .setQuery(query)
+            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
+            .setAbortOnVersionConflict(false)
+            .setRefresh(true)
+            .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
 
         // _doc is the most efficient sort order and will also disable scoring
-        deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
-
-        executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest,
-                ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
+        dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
+
+        executeAsyncWithOrigin(
+            client,
+            ML_ORIGIN,
+            DeleteByQueryAction.INSTANCE,
+            dbqRequest,
+            ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
     }
 
     /**
      * Delete all results marked as interim
      */
     public void deleteInterimResults() {
-        DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
-        deleteByQueryHolder.dbqRequest.setRefresh(false);
-
-        deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen());
-        QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true);
-        deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb));
+        QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true));
+        DeleteByQueryRequest dbqRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
+            .setQuery(query)
+            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
+            .setAbortOnVersionConflict(false)
+            .setRefresh(false)
+            .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
 
         // _doc is the most efficient sort order and will also disable scoring
-        deleteByQueryHolder.dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
+        dbqRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
 
         try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) {
-            client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get();
+            client.execute(DeleteByQueryAction.INSTANCE, dbqRequest).get();
         } catch (Exception e) {
             LOGGER.error("[" + jobId + "] An error occurred while deleting interim results", e);
         }
     }
-    
+
     /**
      * Delete the datafeed timing stats document from all the job results indices
      *
@@ -142,24 +200,11 @@ public class JobDataDeleter {
         DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
             .setRefresh(true)
             .setIndicesOptions(IndicesOptions.lenientExpandOpen())
-            .setQuery(new IdsQueryBuilder().addIds(DatafeedTimingStats.documentId(jobId)));
+            .setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId)));
 
         // _doc is the most efficient sort order and will also disable scoring
         deleteByQueryRequest.getSearchRequest().source().sort(ElasticsearchMappings.ES_DOC);
 
         executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
     }
-
-    // Wrapper to ensure safety
-    private static class DeleteByQueryHolder {
-
-        private final DeleteByQueryRequest dbqRequest;
-
-        private DeleteByQueryHolder(String index) {
-            dbqRequest = new DeleteByQueryRequest();
-            dbqRequest.indices(index);
-            dbqRequest.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
-            dbqRequest.setAbortOnVersionConflict(false);
-        }
-    }
 }

+ 64 - 6
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleterTests.java

@@ -7,17 +7,25 @@ package org.elasticsearch.xpack.ml.job.persistence;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.junit.After;
 import org.junit.Before;
 import org.mockito.ArgumentCaptor;
 
+import java.util.Set;
+
 import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -30,13 +38,66 @@ public class JobDataDeleterTests extends ESTestCase {
     private static final String JOB_ID = "my-job-id";
 
     private Client client;
+    private ArgumentCaptor<DeleteByQueryRequest> deleteRequestCaptor;
 
     @Before
     public void setUpTests() {
-        client = mock(Client.class);
         ThreadPool threadPool = mock(ThreadPool.class);
-        when(client.threadPool()).thenReturn(threadPool);
         when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
+        client = mock(Client.class);
+        when(client.threadPool()).thenReturn(threadPool);
+        deleteRequestCaptor = ArgumentCaptor.forClass(DeleteByQueryRequest.class);
+    }
+
+    @After
+    public void verifyNoMoreInteractionsWithClient() {
+        verify(client).threadPool();
+        verifyNoMoreInteractions(client);
+    }
+
+    public void testDeleteAllAnnotations() {
+        JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID);
+        jobDataDeleter.deleteAllAnnotations(ActionListener.wrap(
+            deleteResponse -> {},
+            e -> fail(e.toString())
+        ));
+
+        verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any());
+
+        DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue();
+        assertThat(deleteRequest.indices(), is(arrayContaining(AnnotationIndex.READ_ALIAS_NAME)));
+        assertThat(Strings.toString(deleteRequest), not(containsString("timestamp")));
+        assertThat(Strings.toString(deleteRequest), not(containsString("event")));
+    }
+
+    public void testDeleteAnnotationsFromTime_TimestampFiltering() {
+        JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID);
+        jobDataDeleter.deleteAnnotationsFromTime(1_000_000_000L, null, ActionListener.wrap(
+            deleteResponse -> {},
+            e -> fail(e.toString())
+        ));
+
+        verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any());
+
+        DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue();
+        assertThat(deleteRequest.indices(), is(arrayContaining(AnnotationIndex.READ_ALIAS_NAME)));
+        assertThat(Strings.toString(deleteRequest), containsString("timestamp"));
+        assertThat(Strings.toString(deleteRequest), not(containsString("event")));
+    }
+
+    public void testDeleteAnnotationsFromTime_EventFiltering() {
+        JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID);
+        jobDataDeleter.deleteAnnotationsFromTime(null, Set.of("dummy_event"), ActionListener.wrap(
+            deleteResponse -> {},
+            e -> fail(e.toString())
+        ));
+
+        verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any());
+
+        DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue();
+        assertThat(deleteRequest.indices(), is(arrayContaining(AnnotationIndex.READ_ALIAS_NAME)));
+        assertThat(Strings.toString(deleteRequest), not(containsString("timestamp")));
+        assertThat(Strings.toString(deleteRequest), containsString("event"));
     }
 
     public void testDeleteDatafeedTimingStats() {
@@ -46,12 +107,9 @@ public class JobDataDeleterTests extends ESTestCase {
             e -> fail(e.toString())
         ));
 
-        ArgumentCaptor<DeleteByQueryRequest> deleteRequestCaptor = ArgumentCaptor.forClass(DeleteByQueryRequest.class);
-        verify(client).threadPool();
         verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any());
-        verifyNoMoreInteractions(client);
 
         DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue();
-        assertThat(deleteRequest.indices(), arrayContaining(AnomalyDetectorsIndex.jobResultsAliasedName(JOB_ID)));
+        assertThat(deleteRequest.indices(), is(arrayContaining(AnomalyDetectorsIndex.jobResultsAliasedName(JOB_ID))));
     }
 }