소스 검색

[ML] Calculate results and snapshot retention using latest bucket timestamps (#51061)

The retention period is calculated relative to the last bucket result or snapshot
time rather than wall clock
David Kyle 5 년 전
부모
커밋
7978f0b8ef

+ 10 - 7
docs/reference/ml/ml-shared.asciidoc

@@ -863,9 +863,10 @@ example, `1575402236000 `.
 end::model-snapshot-id[]
 
 tag::model-snapshot-retention-days[]
-The time in days that model snapshots are retained for the job. Older snapshots
-are deleted. The default value is `1`, which means snapshots are retained for
-one day (twenty-four hours).
+Advanced configuration option. The period of time (in days) that model snapshots are retained.
+Age is calculated relative to the timestamp of the newest model snapshot.
+The default value is `1`, which means snapshots that are one day (twenty-four hours)
+older than the newest snapshot are deleted.
 end::model-snapshot-retention-days[]
 
 tag::multivariate-by-fields[]
@@ -963,10 +964,12 @@ is `shared`, which generates an index named `.ml-anomalies-shared`.
 end::results-index-name[]
 
 tag::results-retention-days[]
-Advanced configuration option. The number of days for which job results are 
-retained. Once per day at 00:30 (server time), results older than this period
-are deleted from {es}. The default value is null, which means results are
-retained.
+Advanced configuration option. The period of time (in days) that results are retained.
+Age is calculated relative to the timestamp of the latest bucket result.
+If this property has a non-null value, once per day at 00:30 (server time),
+results that are the specified number of days older than the latest
+bucket result are deleted from {es}. The default value is null, which means all
+results are retained.
 end::results-retention-days[]
 
 tag::retain[]

+ 8 - 10
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

@@ -33,10 +33,8 @@ import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
 import org.junit.After;
 import org.junit.Before;
 
-import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -52,20 +50,20 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
     private static final String DATA_INDEX = "delete-expired-data-test-data";
 
     @Before
-    public void setUpData() throws IOException {
+    public void setUpData()  {
         client().admin().indices().prepareCreate(DATA_INDEX)
                 .setMapping("time", "type=date,format=epoch_millis")
                 .get();
 
-        // We are going to create data for last 2 days
-        long nowMillis = System.currentTimeMillis();
+        // We are going to create 3 days of data ending 1 hr ago
+        long latestBucketTime = System.currentTimeMillis() - TimeValue.timeValueHours(1).millis();
         int totalBuckets = 3 * 24;
         int normalRate = 10;
         int anomalousRate = 100;
         int anomalousBucket = 30;
         BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
         for (int bucket = 0; bucket < totalBuckets; bucket++) {
-            long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
+            long timestamp = latestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
             int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate;
             for (int point = 0; point < bucketRate; point++) {
                 IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
@@ -120,7 +118,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
 
             String datafeedId = job.getId() + "-feed";
             DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId());
-            datafeedConfig.setIndices(Arrays.asList(DATA_INDEX));
+            datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX));
             DatafeedConfig datafeed = datafeedConfig.build();
             registerDatafeed(datafeed);
             putDatafeed(datafeed);
@@ -208,7 +206,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
         assertThat(getModelSnapshots("no-retention").size(), equalTo(2));
 
         List<Bucket> buckets = getBuckets("results-retention");
-        assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
+        assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
         assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
         assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
         assertThat(getRecords("results-retention").size(), equalTo(0));
@@ -223,7 +221,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
         assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2));
 
         buckets = getBuckets("results-and-snapshots-retention");
-        assertThat(buckets.size(), is(lessThanOrEqualTo(24)));
+        assertThat(buckets.size(), is(lessThanOrEqualTo(25)));
         assertThat(buckets.size(), is(greaterThanOrEqualTo(22)));
         assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo));
         assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0));
@@ -276,7 +274,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
     private static Job.Builder newJobBuilder(String id) {
         Detector.Builder detector = new Detector.Builder();
         detector.setFunction("count");
-        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Arrays.asList(detector.build()));
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
         analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
         DataDescription.Builder dataDescription = new DataDescription.Builder();
         dataDescription.setTimeField("time");

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

@@ -81,7 +81,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
                                    Supplier<Boolean> isTimedOutSupplier) {
         AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
         List<MlDataRemover> dataRemovers = Arrays.asList(
-                new ExpiredResultsRemover(client, auditor),
+                new ExpiredResultsRemover(client, auditor, threadPool),
                 new ExpiredForecastsRemover(client, threadPool),
                 new ExpiredModelSnapshotsRemover(client, threadPool),
                 new UnusedStateRemover(client, clusterService)

+ 17 - 14
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java

@@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.job.retention;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.OriginSettingClient;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -16,12 +15,9 @@ import org.elasticsearch.xpack.core.ml.job.results.Result;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
 import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
 
-import java.time.Clock;
-import java.time.Instant;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -68,9 +64,19 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
             removeData(jobIterator, listener, isTimedOutSupplier);
             return;
         }
-        long cutoffEpochMs = calcCutoffEpochMs(retentionDays);
-        removeDataBefore(job, cutoffEpochMs,
-            ActionListener.wrap(response -> removeData(jobIterator, listener, isTimedOutSupplier), listener::onFailure));
+
+        calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap(
+                cutoffEpochMs -> {
+                    if (cutoffEpochMs == null) {
+                        removeData(jobIterator, listener, isTimedOutSupplier);
+                    } else {
+                        removeDataBefore(job, cutoffEpochMs, ActionListener.wrap(
+                                response -> removeData(jobIterator, listener, isTimedOutSupplier),
+                                listener::onFailure));
+                    }
+                },
+                listener::onFailure
+        ));
     }
 
     private WrappedBatchedJobsIterator newJobIterator() {
@@ -78,20 +84,17 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
         return new WrappedBatchedJobsIterator(jobsIterator);
     }
 
-    private long calcCutoffEpochMs(long retentionDays) {
-        long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
-        return nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
-    }
+    abstract void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener);
 
-    protected abstract Long getRetentionDays(Job job);
+    abstract Long getRetentionDays(Job job);
 
     /**
      * Template method to allow implementation details of various types of data (e.g. results, model snapshots).
      * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job.
      */
-    protected abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);
+    abstract void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener);
 
-    protected static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
+    static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
         return QueryBuilders.boolQuery()
                 .filter(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobId))
                 .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).lt(cutoffEpochMs).format("epoch_millis"));

+ 57 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java

@@ -15,10 +15,14 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.OriginSettingClient;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -27,12 +31,14 @@ import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField;
 import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Deletes all model snapshots that have expired the configured retention time
@@ -65,10 +71,59 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
     }
 
     @Override
-    protected Long getRetentionDays(Job job) {
+    Long getRetentionDays(Job job) {
         return job.getModelSnapshotRetentionDays();
     }
 
+    @Override
+    void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
+        ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
+                MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
+
+        latestSnapshotTimeStamp(jobId, ActionListener.wrap(
+                latestTime -> {
+                    if (latestTime == null) {
+                        threadedActionListener.onResponse(null);
+                    } else {
+                        long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
+                        threadedActionListener.onResponse(cutoff);
+                    }
+                },
+                listener::onFailure
+        ));
+    }
+
+    private void latestSnapshotTimeStamp(String jobId, ActionListener<Long> listener) {
+        SortBuilder<?> sortBuilder = new FieldSortBuilder(ModelSnapshot.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
+        QueryBuilder snapshotQuery = QueryBuilders.boolQuery()
+                .filter(QueryBuilders.existsQuery(ModelSnapshot.SNAPSHOT_DOC_COUNT.getPreferredName()));
+
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.sort(sortBuilder);
+        searchSourceBuilder.query(snapshotQuery);
+        searchSourceBuilder.size(1);
+        searchSourceBuilder.trackTotalHits(false);
+
+        String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
+        SearchRequest searchRequest = new SearchRequest(indexName);
+        searchRequest.source(searchSourceBuilder);
+        searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
+
+        client.search(searchRequest, ActionListener.wrap(
+                response -> {
+                    SearchHit[] hits = response.getHits().getHits();
+                    if (hits.length == 0) {
+                        // no snapshots found
+                        listener.onResponse(null);
+                    } else {
+                        ModelSnapshot snapshot = ModelSnapshot.fromJson(hits[0].getSourceRef());
+                        listener.onResponse(snapshot.getTimestamp().getTime());
+                    }
+                },
+                listener::onFailure)
+        );
+    }
+
     @Override
     protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
         if (job.getModelSnapshotId() == null) {
@@ -96,7 +151,7 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
     }
 
     private ActionListener<SearchResponse> expiredSnapshotsListener(String jobId, ActionListener<Boolean> listener) {
-        return new ActionListener<SearchResponse>() {
+        return new ActionListener<>() {
             @Override
             public void onResponse(SearchResponse searchResponse) {
                 try {

+ 79 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java

@@ -8,29 +8,50 @@ package org.elasticsearch.xpack.ml.job.retention;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.client.OriginSettingClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
+import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.Forecast;
 import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
+import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
+import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.time.Instant;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Removes all results that have expired the configured retention time
@@ -48,15 +69,17 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
 
     private final OriginSettingClient client;
     private final AnomalyDetectionAuditor auditor;
+    private final ThreadPool threadPool;
 
-    public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor) {
+    public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
         super(client);
         this.client = Objects.requireNonNull(client);
         this.auditor = Objects.requireNonNull(auditor);
+        this.threadPool = Objects.requireNonNull(threadPool);
     }
 
     @Override
-    protected Long getRetentionDays(Job job) {
+    Long getRetentionDays(Job job) {
         return job.getResultsRetentionDays();
     }
 
@@ -65,7 +88,7 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
         LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
         DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs);
 
-        client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
+        client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
             @Override
             public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
                 try {
@@ -107,6 +130,59 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
         return request;
     }
 
+    @Override
+    void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
+        ThreadedActionListener<Long> threadedActionListener = new ThreadedActionListener<>(LOGGER, threadPool,
+                MachineLearning.UTILITY_THREAD_POOL_NAME, listener, false);
+        latestBucketTime(jobId, ActionListener.wrap(
+                latestTime -> {
+                    if (latestTime == null) {
+                        threadedActionListener.onResponse(null);
+                    } else {
+                        long cutoff = latestTime - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis();
+                        threadedActionListener.onResponse(cutoff);
+                    }
+                },
+                listener::onFailure
+        ));
+    }
+
+    private void latestBucketTime(String jobId, ActionListener<Long> listener) {
+        SortBuilder<?> sortBuilder = new FieldSortBuilder(Result.TIMESTAMP.getPreferredName()).order(SortOrder.DESC);
+        QueryBuilder bucketType = QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), Bucket.RESULT_TYPE_VALUE);
+
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.sort(sortBuilder);
+        searchSourceBuilder.query(bucketType);
+        searchSourceBuilder.size(1);
+        searchSourceBuilder.trackTotalHits(false);
+
+        String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
+        SearchRequest searchRequest = new SearchRequest(indexName);
+        searchRequest.source(searchSourceBuilder);
+        searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(SearchRequest.DEFAULT_INDICES_OPTIONS));
+
+        client.search(searchRequest, ActionListener.wrap(
+                response -> {
+                    SearchHit[] hits = response.getHits().getHits();
+                    if (hits.length == 0) {
+                        // no buckets found
+                        listener.onResponse(null);
+                    } else {
+
+                        try (InputStream stream = hits[0].getSourceRef().streamInput();
+                             XContentParser parser = XContentFactory.xContent(XContentType.JSON)
+                                     .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
+                            Bucket bucket = Bucket.LENIENT_PARSER.apply(parser, null);
+                            listener.onResponse(bucket.getTimestamp().getTime());
+                        } catch (IOException e) {
+                            listener.onFailure(new ElasticsearchParseException("failed to parse bucket", e));
+                        }
+                    }
+                }, listener::onFailure
+        ));
+    }
+
     private void auditResultsWereDeleted(String jobId, long cutoffEpochMs) {
         Instant instant = Instant.ofEpochMilli(cutoffEpochMs);
         ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneOffset.systemDefault());

+ 8 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
@@ -25,6 +26,8 @@ import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.time.Clock;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -59,6 +62,11 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
             return randomBoolean() ? null : 0L;
         }
 
+        void calcCutoffEpochMs(String jobId, long retentionDays, ActionListener<Long> listener) {
+            long nowEpochMs = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
+            listener.onResponse(nowEpochMs - new TimeValue(retentionDays, TimeUnit.DAYS).getMillis());
+        }
+
         @Override
         protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener<Boolean> listener) {
             listener.onResponse(Boolean.TRUE);

+ 120 - 114
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java

@@ -12,19 +12,16 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.OriginSettingClient;
-import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
-import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobTests;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;
-import org.junit.After;
 import org.junit.Before;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -33,8 +30,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.elasticsearch.xpack.ml.job.retention.AbstractExpiredJobDataRemoverTests.TestListener;
@@ -45,114 +43,97 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
 
     private Client client;
     private OriginSettingClient originSettingClient;
-    private ThreadPool threadPool;
     private List<SearchRequest> capturedSearchRequests;
     private List<DeleteModelSnapshotAction.Request> capturedDeleteModelSnapshotRequests;
-    private List<SearchResponse> searchResponsesPerCall;
     private TestListener listener;
 
     @Before
     public void setUpTests() {
         capturedSearchRequests = new ArrayList<>();
         capturedDeleteModelSnapshotRequests = new ArrayList<>();
-        searchResponsesPerCall = new ArrayList<>();
 
         client = mock(Client.class);
         originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN);
 
         listener = new TestListener();
-
-        // Init thread pool
-        Settings settings = Settings.builder()
-                .put("node.name", "expired_model_snapshots_remover_test")
-                .build();
-        threadPool = new ThreadPool(settings,
-                new FixedExecutorBuilder(settings, MachineLearning.UTILITY_THREAD_POOL_NAME, 1, 1000, ""));
-    }
-
-    @After
-    public void shutdownThreadPool() {
-        terminate(threadPool);
-    }
-
-    public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException {
-        givenClientRequestsSucceed(Arrays.asList(
-                JobTests.buildJobBuilder("foo").build(),
-                JobTests.buildJobBuilder("bar").build()
-        ));
-
-        createExpiredModelSnapshotsRemover().remove(listener, () -> false);
-
-        listener.waitToCompletion();
-        assertThat(listener.success, is(true));
-        verify(client).execute(eq(SearchAction.INSTANCE), any(), any());
     }
 
     public void testRemove_GivenJobWithoutActiveSnapshot() throws IOException {
-        givenClientRequestsSucceed(Collections.singletonList(JobTests.buildJobBuilder("foo").setModelSnapshotRetentionDays(7L).build()));
+        List<SearchResponse> responses = Arrays.asList(
+                AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(JobTests.buildJobBuilder("foo")
+                        .setModelSnapshotRetentionDays(7L).build())),
+                AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList()));
+
+        givenClientRequestsSucceed(responses);
 
         createExpiredModelSnapshotsRemover().remove(listener, () -> false);
 
         listener.waitToCompletion();
         assertThat(listener.success, is(true));
-        verify(client).execute(eq(SearchAction.INSTANCE), any(), any());
+        verify(client, times(2)).execute(eq(SearchAction.INSTANCE), any(), any());
     }
 
     public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException {
-        givenClientRequestsSucceed(
-                Arrays.asList(
-                JobTests.buildJobBuilder("none").build(),
-                JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
-                JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
-        ));
-
-        List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
-                createModelSnapshot("snapshots-1", "snapshots-1_2"));
-        List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
-        searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
-        searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
-
+        List<SearchResponse> searchResponses = new ArrayList<>();
+        searchResponses.add(
+                AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList(
+                        JobTests.buildJobBuilder("job-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
+                        JobTests.buildJobBuilder("job-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
+        )));
+
+        Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis());
+        ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "fresh-snapshot", oneDayAgo);
+        searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
+
+        Date eightDaysAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(8).getMillis());
+        ModelSnapshot snapshotToBeDeleted = createModelSnapshot("job-1", "old-snapshot", eightDaysAgo);
+        searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshotToBeDeleted)));
+
+        ModelSnapshot snapshot2_1 = createModelSnapshot("job-1", "snapshots-1_1", eightDaysAgo);
+        searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_1)));
+        searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList()));
+
+        givenClientRequestsSucceed(searchResponses);
         createExpiredModelSnapshotsRemover().remove(listener, () -> false);
 
         listener.waitToCompletion();
         assertThat(listener.success, is(true));
 
-        assertThat(capturedSearchRequests.size(), equalTo(2));
-        SearchRequest searchRequest = capturedSearchRequests.get(0);
-        assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
-        searchRequest = capturedSearchRequests.get(1);
-        assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-2")}));
+        assertThat(capturedSearchRequests.size(), equalTo(5));
+        SearchRequest searchRequest = capturedSearchRequests.get(1);
+        assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-1")}));
+        searchRequest = capturedSearchRequests.get(3);
+        assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("job-2")}));
 
-        assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(3));
+        assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1));
         DeleteModelSnapshotAction.Request deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(0);
-        assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
-        assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
-        deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(1);
-        assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-1"));
-        assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_2"));
-        deleteSnapshotRequest = capturedDeleteModelSnapshotRequests.get(2);
-        assertThat(deleteSnapshotRequest.getJobId(), equalTo("snapshots-2"));
-        assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-2_1"));
+        assertThat(deleteSnapshotRequest.getJobId(), equalTo("job-1"));
+        assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("old-snapshot"));
     }
 
     public void testRemove_GivenTimeout() throws IOException {
-        givenClientRequestsSucceed(
-                Arrays.asList(
+        List<SearchResponse> searchResponses = new ArrayList<>();
+        searchResponses.add(
+                AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList(
             JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
             JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
-        ));
+        )));
 
         List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
             createModelSnapshot("snapshots-1", "snapshots-1_2"));
         List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
-        searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
-        searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
+        searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
+        searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
+
+        givenClientRequestsSucceed(searchResponses);
 
         final int timeoutAfter = randomIntBetween(0, 1);
         AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter);
@@ -164,52 +145,53 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
     }
 
     public void testRemove_GivenClientSearchRequestsFail() throws IOException {
-        givenClientSearchRequestsFail(
-                Arrays.asList(
-                JobTests.buildJobBuilder("none").build(),
+        List<SearchResponse> searchResponses = new ArrayList<>();
+        searchResponses.add(
+                AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList(
                 JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
                 JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
-        ));
-
-        List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
-                createModelSnapshot("snapshots-1", "snapshots-1_2"));
-        List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
-        searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
-        searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
+        )));
 
+        givenClientSearchRequestsFail(searchResponses);
         createExpiredModelSnapshotsRemover().remove(listener, () -> false);
 
         listener.waitToCompletion();
         assertThat(listener.success, is(false));
 
-        assertThat(capturedSearchRequests.size(), equalTo(1));
-        SearchRequest searchRequest = capturedSearchRequests.get(0);
+        assertThat(capturedSearchRequests.size(), equalTo(2));
+        SearchRequest searchRequest = capturedSearchRequests.get(1);
         assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
 
         assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(0));
     }
 
     public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOException {
-        givenClientDeleteModelSnapshotRequestsFail(
-                Arrays.asList(
-                JobTests.buildJobBuilder("none").build(),
+        List<SearchResponse> searchResponses = new ArrayList<>();
+        searchResponses.add(
+                AbstractExpiredJobDataRemoverTests.createSearchResponse(Arrays.asList(
                 JobTests.buildJobBuilder("snapshots-1").setModelSnapshotRetentionDays(7L).setModelSnapshotId("active").build(),
                 JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build()
-        ));
+        )));
 
-        List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"),
+        ModelSnapshot snapshot1_1 = createModelSnapshot("snapshots-1", "snapshots-1_1");
+        searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
+
+        List<ModelSnapshot> snapshots1JobSnapshots = Arrays.asList(
+                snapshot1_1,
                 createModelSnapshot("snapshots-1", "snapshots-1_2"));
-        List<ModelSnapshot> snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1"));
-        searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
-        searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots));
+        searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots));
+
+        ModelSnapshot snapshot2_2 = createModelSnapshot("snapshots-2", "snapshots-2_1");
+        searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2)));
 
+        givenClientDeleteModelSnapshotRequestsFail(searchResponses);
         createExpiredModelSnapshotsRemover().remove(listener, () -> false);
 
         listener.waitToCompletion();
         assertThat(listener.success, is(false));
 
-        assertThat(capturedSearchRequests.size(), equalTo(1));
-        SearchRequest searchRequest = capturedSearchRequests.get(0);
+        assertThat(capturedSearchRequests.size(), equalTo(3));
+        SearchRequest searchRequest = capturedSearchRequests.get(1);
         assertThat(searchRequest.indices(), equalTo(new String[] {AnomalyDetectorsIndex.jobResultsAliasedName("snapshots-1")}));
 
         assertThat(capturedDeleteModelSnapshotRequests.size(), equalTo(1));
@@ -218,49 +200,76 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
         assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1"));
     }
 
+    @SuppressWarnings("unchecked")
+    public void testCalcCutoffEpochMs() throws IOException {
+        List<SearchResponse> searchResponses = new ArrayList<>();
+
+        Date oneDayAgo = new Date(new Date().getTime() - TimeValue.timeValueDays(1).getMillis());
+        ModelSnapshot snapshot1_1 = createModelSnapshot("job-1", "newest-snapshot", oneDayAgo);
+        searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot1_1)));
+
+        givenClientRequests(searchResponses, true, true);
+
+        long retentionDays = 3L;
+        ActionListener<Long> cutoffListener = mock(ActionListener.class);
+        createExpiredModelSnapshotsRemover().calcCutoffEpochMs("job-1", retentionDays, cutoffListener);
+
+        long dayInMills = 60 * 60 * 24 * 1000;
+        long expectedCutoffTime = oneDayAgo.getTime() - (dayInMills * retentionDays);
+        verify(cutoffListener).onResponse(eq(expectedCutoffTime));
+    }
+
     private ExpiredModelSnapshotsRemover createExpiredModelSnapshotsRemover() {
+        ThreadPool threadPool = mock(ThreadPool.class);
+        ExecutorService executor = mock(ExecutorService.class);
+
+        when(threadPool.executor(eq(MachineLearning.UTILITY_THREAD_POOL_NAME))).thenReturn(executor);
+
+        doAnswer(invocationOnMock -> {
+                    Runnable run = (Runnable) invocationOnMock.getArguments()[0];
+                    run.run();
+                    return null;
+                }
+        ).when(executor).execute(any());
         return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool);
     }
 
     private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId) {
-        return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).build();
+        return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(new Date()).build();
     }
 
-    private void givenClientRequestsSucceed(List<Job> jobs) throws IOException {
-        givenClientRequests(jobs, true, true);
+    private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) {
+        return new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(date).build();
     }
 
-    private void givenClientSearchRequestsFail(List<Job> jobs) throws IOException {
-        givenClientRequests(jobs, false, true);
+    private void givenClientRequestsSucceed(List<SearchResponse> searchResponses)  {
+        givenClientRequests(searchResponses, true, true);
     }
 
-    private void givenClientDeleteModelSnapshotRequestsFail(List<Job> jobs) throws IOException {
-        givenClientRequests(jobs, true, false);
+    private void givenClientSearchRequestsFail(List<SearchResponse> searchResponses) {
+        givenClientRequests(searchResponses, false, true);
+    }
+
+    private void givenClientDeleteModelSnapshotRequestsFail(List<SearchResponse> searchResponses) {
+        givenClientRequests(searchResponses, true, false);
     }
 
     @SuppressWarnings("unchecked")
-    private void givenClientRequests(List<Job> jobs,
-                                     boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) throws IOException {
-        SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);
+    private void givenClientRequests(List<SearchResponse> searchResponses,
+                                     boolean shouldSearchRequestsSucceed, boolean shouldDeleteSnapshotRequestsSucceed) {
 
         doAnswer(new Answer<Void>() {
-            int callCount = 0;
-            AtomicBoolean isJobQuery = new AtomicBoolean(true);
+            AtomicInteger callCount = new AtomicInteger();
 
             @Override
             public Void answer(InvocationOnMock invocationOnMock) {
                 ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
 
-                if (isJobQuery.get()) {
-                    listener.onResponse(response);
-                    isJobQuery.set(false);
-                    return null;
-                }
-
                 SearchRequest searchRequest = (SearchRequest) invocationOnMock.getArguments()[1];
                 capturedSearchRequests.add(searchRequest);
-                if (shouldSearchRequestsSucceed) {
-                    listener.onResponse(searchResponsesPerCall.get(callCount++));
+                // Only the last search request should fail
+                if (shouldSearchRequestsSucceed || callCount.get() < searchResponses.size()) {
+                    listener.onResponse(searchResponses.get(callCount.getAndIncrement()));
                 } else {
                     listener.onFailure(new RuntimeException("search failed"));
                 }
@@ -268,9 +277,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
             }
         }).when(client).execute(same(SearchAction.INSTANCE), any(), any());
 
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocationOnMock) {
+        doAnswer(invocationOnMock -> {
                 capturedDeleteModelSnapshotRequests.add((DeleteModelSnapshotAction.Request) invocationOnMock.getArguments()[1]);
                 ActionListener<AcknowledgedResponse> listener =
                         (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[2];
@@ -281,7 +288,6 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
                 }
                 return null;
             }
-        }).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any());
+        ).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any());
     }
-
 }

+ 82 - 35
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java

@@ -7,26 +7,32 @@ package org.elasticsearch.xpack.ml.job.retention;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobTests;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
+import org.elasticsearch.xpack.core.ml.job.results.Bucket;
+import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
 import org.elasticsearch.xpack.ml.test.MockOriginSettingClient;
 import org.junit.Before;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -50,13 +56,13 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
     public void setUpTests() {
         capturedDeleteByQueryRequests = new ArrayList<>();
 
-        client = org.mockito.Mockito.mock(Client.class);
+        client = mock(Client.class);
         originSettingClient = MockOriginSettingClient.mockOriginSettingClient(client, ClientHelper.ML_ORIGIN);
         listener = mock(ActionListener.class);
     }
 
     public void testRemove_GivenNoJobs() throws IOException {
-        givenClientRequestsSucceed();
+        givenDBQRequestsSucceed();
         AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList());
 
         createExpiredResultsRemover().remove(listener, () -> false);
@@ -66,7 +72,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
     }
 
     public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException {
-        givenClientRequestsSucceed();
+        givenDBQRequestsSucceed();
         AbstractExpiredJobDataRemoverTests.givenJobs(client,
                 Arrays.asList(
                 JobTests.buildJobBuilder("foo").build(),
@@ -79,14 +85,14 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
         verify(client).execute(eq(SearchAction.INSTANCE), any(), any());
     }
 
-    public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() throws Exception {
-        givenClientRequestsSucceed();
-        AbstractExpiredJobDataRemoverTests.givenJobs(client,
-                Arrays.asList(
+    public void testRemove_GivenJobsWithAndWithoutRetentionPolicy() {
+        givenDBQRequestsSucceed();
+
+        givenSearchResponses(Arrays.asList(
                 JobTests.buildJobBuilder("none").build(),
                 JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(),
-                JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
-        ));
+                JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()),
+                new Bucket("id_not_important", new Date(), 60));
 
         createExpiredResultsRemover().remove(listener, () -> false);
 
@@ -98,13 +104,12 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
         verify(listener).onResponse(true);
     }
 
-    public void testRemove_GivenTimeout() throws Exception {
-        givenClientRequestsSucceed();
-        AbstractExpiredJobDataRemoverTests.givenJobs(client,
-                Arrays.asList(
-            JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(),
-            JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
-        ));
+    public void testRemove_GivenTimeout() {
+        givenDBQRequestsSucceed();
+        givenSearchResponses(Arrays.asList(
+                JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(),
+                JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
+        ), new Bucket("id_not_important", new Date(), 60));
 
         final int timeoutAfter = randomIntBetween(0, 1);
         AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter);
@@ -115,14 +120,14 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
         verify(listener).onResponse(false);
     }
 
-    public void testRemove_GivenClientRequestsFailed() throws IOException {
-        givenClientRequestsFailed();
-        AbstractExpiredJobDataRemoverTests.givenJobs(client,
+    public void testRemove_GivenClientRequestsFailed() {
+        givenDBQRequestsFailed();
+        givenSearchResponses(
                 Arrays.asList(
-                JobTests.buildJobBuilder("none").build(),
-                JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(),
-                JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()
-        ));
+                        JobTests.buildJobBuilder("none").build(),
+                        JobTests.buildJobBuilder("results-1").setResultsRetentionDays(10L).build(),
+                        JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()),
+                new Bucket("id_not_important", new Date(), 60));
 
         createExpiredResultsRemover().remove(listener, () -> false);
 
@@ -132,19 +137,33 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
         verify(listener).onFailure(any());
     }
 
-    private void givenClientRequestsSucceed() {
-        givenClientRequests(true);
+    @SuppressWarnings("unchecked")
+    public void testCalcCutoffEpochMs() {
+        String jobId = "calc-cutoff";
+        Date latest = new Date();
+
+        givenSearchResponses(Collections.singletonList(JobTests.buildJobBuilder(jobId).setResultsRetentionDays(1L).build()),
+                new Bucket(jobId, latest, 60));
+
+        ActionListener<Long> cutoffListener = mock(ActionListener.class);
+        createExpiredResultsRemover().calcCutoffEpochMs(jobId, 1L, cutoffListener);
+
+        long dayInMills = 60 * 60 * 24 * 1000;
+        long expectedCutoffTime = latest.getTime() - dayInMills;
+        verify(cutoffListener).onResponse(eq(expectedCutoffTime));
     }
 
-    private void givenClientRequestsFailed() {
-        givenClientRequests(false);
+    private void givenDBQRequestsSucceed() {
+        givenDBQRequest(true);
+    }
+
+    private void givenDBQRequestsFailed() {
+        givenDBQRequest(false);
     }
 
     @SuppressWarnings("unchecked")
-    private void givenClientRequests(boolean shouldSucceed) {
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocationOnMock) {
+    private void givenDBQRequest(boolean shouldSucceed) {
+        doAnswer(invocationOnMock -> {
                 capturedDeleteByQueryRequests.add((DeleteByQueryRequest) invocationOnMock.getArguments()[1]);
                 ActionListener<BulkByScrollResponse> listener =
                         (ActionListener<BulkByScrollResponse>) invocationOnMock.getArguments()[2];
@@ -157,10 +176,38 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
                 }
                 return null;
             }
-        }).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any());
+        ).when(client).execute(same(DeleteByQueryAction.INSTANCE), any(), any());
+    }
+
+    @SuppressWarnings("unchecked")
+    private void givenSearchResponses(List<Job> jobs, Bucket bucket) {
+        doAnswer(invocationOnMock -> {
+            SearchRequest request = (SearchRequest) invocationOnMock.getArguments()[1];
+            ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
+
+            if (request.indices()[0].startsWith(AnomalyDetectorsIndex.jobResultsIndexPrefix())) {
+                // asking for the bucket result
+                listener.onResponse(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(bucket)));
+            } else {
+                listener.onResponse(AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs));
+            }
+            return null;
+        }).when(client).execute(eq(SearchAction.INSTANCE), any(), any());
     }
 
     private ExpiredResultsRemover createExpiredResultsRemover() {
-        return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class));
+        ThreadPool threadPool = mock(ThreadPool.class);
+        ExecutorService executor = mock(ExecutorService.class);
+
+        when(threadPool.executor(eq(MachineLearning.UTILITY_THREAD_POOL_NAME))).thenReturn(executor);
+
+        doAnswer(invocationOnMock -> {
+                Runnable run = (Runnable) invocationOnMock.getArguments()[0];
+                run.run();
+                return null;
+            }
+        ).when(executor).execute(any());
+
+        return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class), threadPool);
     }
 }