瀏覽代碼

Delete expired data by job (#57337)

Deleting expired data can take a long time, leading to timeouts if there
are many jobs. Often the problem is due to a few large jobs that
prevent the regular maintenance of the remaining jobs. This change adds
a job_id parameter to the delete expired data endpoint to help clean up
those problematic jobs.
David Kyle 5 年之前
父節點
當前提交
bbeda643a6
共有 18 個文件被更改,包括 339 次插入95 次删除
  1. 16 1
      docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc
  2. 21 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java
  3. 9 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java
  4. 17 4
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java
  5. 9 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java
  6. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java
  7. 7 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java
  8. 3 3
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java
  9. 43 40
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java
  10. 2 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java
  11. 3 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java
  12. 5 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java
  13. 32 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java
  14. 5 16
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java
  15. 1 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java
  16. 14 3
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java
  17. 24 0
      x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json
  18. 126 4
      x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml

+ 16 - 1
docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc

@@ -11,7 +11,9 @@ Deletes expired and unused machine learning data.
 [[ml-delete-expired-data-request]]
 ==== {api-request-title}
 
-`DELETE _ml/_delete_expired_data`
+`DELETE _ml/_delete_expired_data` +
+
+`DELETE _ml/_delete_expired_data/<job_id>`
 
 [[ml-delete-expired-data-prereqs]]
 ==== {api-prereq-title}
@@ -27,6 +29,19 @@ Deletes all job results, model snapshots and forecast data that have exceeded
 their `retention days` period. Machine learning state documents that are not
 associated with any job are also deleted.
 
+You can limit the request to a single {anomaly-job} or a set of {anomaly-jobs} by using a
+job identifier, a group name, a comma-separated list of jobs, or a wildcard expression.
+You can delete expired data for all {anomaly-jobs} by using `_all`, by specifying
+`*` as the `<job_id>`, or by omitting the `<job_id>`.
+
+[[ml-delete-expired-data-path-parms]]
+==== {api-path-parms-title}
+
+`<job_id>`::
+(Optional, string)
+Identifier for an {anomaly-job}. It can be a job identifier, a group name, a
+comma-separated list of jobs, or a wildcard expression.
+
 [[ml-delete-expired-data-request-body]]
 ==== {api-request-body-title}
 

+ 21 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java

@@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -19,6 +20,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -46,10 +48,12 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
             PARSER.declareFloat(Request::setRequestsPerSecond, REQUESTS_PER_SECOND);
             PARSER.declareString((obj, value) -> obj.setTimeout(TimeValue.parseTimeValue(value, TIMEOUT.getPreferredName())),
                 TIMEOUT);
+            PARSER.declareString(Request::setJobId, Job.ID);
         }
 
         private Float requestsPerSecond;
         private TimeValue timeout;
+        private String jobId = Metadata.ALL;
 
         public Request() {}
 
@@ -67,6 +71,9 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
                 this.requestsPerSecond = null;
                 this.timeout = null;
             }
+            if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // TODO BWC for V_7_9_0
+                jobId = in.readString();
+            }
         }
 
         public Float getRequestsPerSecond() {
@@ -77,6 +84,10 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
             return timeout;
         }
 
+        public String getJobId() {
+            return jobId;
+        }
+
         public Request setRequestsPerSecond(Float requestsPerSecond) {
             this.requestsPerSecond = requestsPerSecond;
             return this;
@@ -87,6 +98,11 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
             return this;
         }
 
+        public Request setJobId(String jobId) {
+            this.jobId = jobId;
+            return this;
+        }
+
         @Override
         public ActionRequestValidationException validate() {
             if (this.requestsPerSecond != null && this.requestsPerSecond != -1.0f && this.requestsPerSecond <= 0) {
@@ -103,12 +119,13 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
             if (o == null || getClass() != o.getClass()) return false;
             Request request = (Request) o;
             return Objects.equals(requestsPerSecond, request.requestsPerSecond)
+                && Objects.equals(jobId, request.jobId)
                 && Objects.equals(timeout, request.timeout);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(requestsPerSecond, timeout);
+            return Objects.hash(requestsPerSecond, timeout, jobId);
         }
 
         @Override
@@ -118,6 +135,9 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
                 out.writeOptionalFloat(requestsPerSecond);
                 out.writeOptionalTimeValue(timeout);
             }
+            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {  // TODO BWC for V_7_9_0
+                out.writeString(jobId);
+            }
         }
     }
 

+ 9 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java

@@ -248,6 +248,15 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
         return ANOMALY_DETECTOR_JOB_TYPE + "-" + jobId;
     }
 
+    /**
+     * Strips the {@code ANOMALY_DETECTOR_JOB_TYPE + "-"} prefix from a doc id; returns {@code null} if the prefix is absent.
+     */
+    @Nullable
+    public static String extractJobIdFromDocumentId(String docId) {
+        String prefix = ANOMALY_DETECTOR_JOB_TYPE + "-"; // literal prefix match, no regex needed
+        return docId.startsWith(prefix) ? docId.substring(prefix.length()) : null;
+    }
+
 
     /**
      * Return the Job Id.

+ 17 - 4
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java

@@ -15,10 +15,17 @@ public class DeleteExpiredDataActionRequestTests extends AbstractBWCWireSerializ
 
     @Override
     protected Request createTestInstance() {
-        return new Request(
-            randomBoolean() ? null : randomFloat(),
-            randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test")
-        );
+        Request request = new Request();
+        if (randomBoolean()) {
+            request.setRequestsPerSecond(randomFloat());
+        }
+        if (randomBoolean()) {
+            request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), "test"));
+        }
+        if (randomBoolean()) {
+            request.setJobId(randomAlphaOfLength(5));
+        }
+        return request;
     }
 
     @Override
@@ -31,6 +38,12 @@ public class DeleteExpiredDataActionRequestTests extends AbstractBWCWireSerializ
         if (version.before(Version.V_7_8_0)) {
             return new Request();
         }
+        if (version.before(Version.V_8_0_0)) {  // TODO make V_7_9_0
+            Request request = new Request();
+            request.setRequestsPerSecond(instance.getRequestsPerSecond());
+            request.setTimeout(instance.getTimeout());
+            return request;
+        }
         return instance;
     }
 }

+ 9 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java

@@ -584,6 +584,15 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
         }
     }
 
+    public void testDocumentId() {
+        String jobFoo = "foo";
+        assertEquals("anomaly_detector-" + jobFoo, Job.documentId(jobFoo));
+        assertEquals(jobFoo, Job.extractJobIdFromDocumentId(
+            Job.documentId(jobFoo)
+        ));
+        assertNull(Job.extractJobIdFromDocumentId("some_other_type-foo"));
+    }
+
     public static Job.Builder buildJobBuilder(String id, Date date) {
         Job.Builder builder = new Job.Builder(id);
         builder.setCreateTime(date);

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java

@@ -88,9 +88,9 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
                                    Supplier<Boolean> isTimedOutSupplier) {
         AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
         List<MlDataRemover> dataRemovers = Arrays.asList(
-                new ExpiredResultsRemover(client, auditor, threadPool),
+                new ExpiredResultsRemover(client, request.getJobId(), auditor, threadPool),
                 new ExpiredForecastsRemover(client, threadPool),
-                new ExpiredModelSnapshotsRemover(client, threadPool),
+                new ExpiredModelSnapshotsRemover(client, request.getJobId(), threadPool),
                 new UnusedStateRemover(client, clusterService),
                 new EmptyStateIndexRemover(client)
         );

+ 7 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java

@@ -7,13 +7,13 @@ package org.elasticsearch.xpack.ml.job.persistence;
 
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.client.OriginSettingClient;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
@@ -23,13 +23,17 @@ import java.io.InputStream;
 
 public class BatchedJobsIterator extends BatchedDocumentsIterator<Job.Builder> {
 
-    public BatchedJobsIterator(OriginSettingClient client, String index) {
+    private final String jobIdExpression;
+
+    public BatchedJobsIterator(OriginSettingClient client, String index, String jobIdExpression) {
         super(client, index);
+        this.jobIdExpression = jobIdExpression;
     }
 
     @Override
     protected QueryBuilder getQuery() {
-        return new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
+        String [] tokens = Strings.tokenizeToStringArray(jobIdExpression, ",");
+        return JobConfigProvider.buildJobWildcardQuery(tokens, true);
     }
 
     @Override

+ 3 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java

@@ -512,7 +512,7 @@ public class JobConfigProvider {
                               boolean allowMissingConfigs,
                               ActionListener<SortedSet<String>> listener) {
         String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
-        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
+        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildJobWildcardQuery(tokens, excludeDeleting));
         sourceBuilder.sort(Job.ID.getPreferredName());
         sourceBuilder.fetchSource(false);
         sourceBuilder.docValueField(Job.ID.getPreferredName(), null);
@@ -573,7 +573,7 @@ public class JobConfigProvider {
      */
     public void expandJobs(String expression, boolean allowNoJobs, boolean excludeDeleting, ActionListener<List<Job.Builder>> listener) {
         String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
-        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens, excludeDeleting));
+        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildJobWildcardQuery(tokens, excludeDeleting));
         sourceBuilder.sort(Job.ID.getPreferredName());
 
         SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
@@ -767,7 +767,7 @@ public class JobConfigProvider {
         }
     }
 
-    private QueryBuilder buildQuery(String [] tokens, boolean excludeDeleting) {
+    public static QueryBuilder buildJobWildcardQuery(String [] tokens, boolean excludeDeleting) {
         QueryBuilder jobQuery = new TermQueryBuilder(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE);
         if (Strings.isAllOrWildcard(tokens) && excludeDeleting == false) {
             // match all

+ 43 - 40
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java

@@ -31,9 +31,11 @@ import java.util.stream.Collectors;
  */
 abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
 
-    private final OriginSettingClient client;
+    private final String jobIdExpression;
+    protected final OriginSettingClient client;
 
-    AbstractExpiredJobDataRemover(OriginSettingClient client) {
+    AbstractExpiredJobDataRemover(String jobIdExpression, OriginSettingClient client) {
+        this.jobIdExpression = jobIdExpression;
         this.client = client;
     }
 
@@ -85,7 +87,7 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
     }
 
     private WrappedBatchedJobsIterator newJobIterator() {
-        BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
+        BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName(), jobIdExpression);
         return new WrappedBatchedJobsIterator(jobsIterator);
     }
 
@@ -112,8 +114,44 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
     }
 
     /**
-     * BatchedJobsIterator efficiently returns batches of jobs using a scroll
-     * search but AbstractExpiredJobDataRemover works with one job at a time.
+     * The latest time that cutoffs are measured from is not wall clock time,
+     * but some other reference point that makes sense for the type of data
+     * being removed.  This class groups the cutoff time with its "latest"
+     * reference point.
+     */
+    protected static final class CutoffDetails {
+
+        public final long latestTimeMs;
+        public final long cutoffEpochMs;
+
+        public CutoffDetails(long latestTimeMs, long cutoffEpochMs) {
+            this.latestTimeMs = latestTimeMs;
+            this.cutoffEpochMs = cutoffEpochMs;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(latestTimeMs, cutoffEpochMs);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (other == this) {
+                return true;
+            }
+            if (other instanceof CutoffDetails == false) {
+                return false;
+            }
+            CutoffDetails that = (CutoffDetails) other;
+            return this.latestTimeMs == that.latestTimeMs &&
+                this.cutoffEpochMs == that.cutoffEpochMs;
+        }
+    }
+
+    /**
+     * A wrapper around {@link BatchedJobsIterator} that allows iterating jobs one
+     * at a time from the batches returned by {@code BatchedJobsIterator}
+     *
      * This class abstracts away the logic of pulling one job at a time from
      * multiple batches.
      */
@@ -155,39 +193,4 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
             return new VolatileCursorIterator<>(jobs);
         }
     }
-
-    /**
-     * The latest time that cutoffs are measured from is not wall clock time,
-     * but some other reference point that makes sense for the type of data
-     * being removed.  This class groups the cutoff time with it's "latest"
-     * reference point.
-     */
-    protected static final class CutoffDetails {
-
-        public final long latestTimeMs;
-        public final long cutoffEpochMs;
-
-        public CutoffDetails(long latestTimeMs, long cutoffEpochMs) {
-            this.latestTimeMs = latestTimeMs;
-            this.cutoffEpochMs = cutoffEpochMs;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(latestTimeMs, cutoffEpochMs);
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            if (other == this) {
-                return true;
-            }
-            if (other instanceof CutoffDetails == false) {
-                return false;
-            }
-            CutoffDetails that = (CutoffDetails) other;
-            return this.latestTimeMs == that.latestTimeMs &&
-                this.cutoffEpochMs == that.cutoffEpochMs;
-        }
-    }
 }

+ 2 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java

@@ -63,12 +63,10 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
      */
     private static final int MODEL_SNAPSHOT_SEARCH_SIZE = 10000;
 
-    private final OriginSettingClient client;
     private final ThreadPool threadPool;
 
-    public ExpiredModelSnapshotsRemover(OriginSettingClient client, ThreadPool threadPool) {
-        super(client);
-        this.client = Objects.requireNonNull(client);
+    public ExpiredModelSnapshotsRemover(OriginSettingClient client, String jobIdExpression, ThreadPool threadPool) {
+        super(jobIdExpression, client);
         this.threadPool = Objects.requireNonNull(threadPool);
     }
 

+ 3 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java

@@ -67,13 +67,12 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
 
     private static final Logger LOGGER = LogManager.getLogger(ExpiredResultsRemover.class);
 
-    private final OriginSettingClient client;
     private final AnomalyDetectionAuditor auditor;
     private final ThreadPool threadPool;
 
-    public ExpiredResultsRemover(OriginSettingClient client, AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
-        super(client);
-        this.client = Objects.requireNonNull(client);
+    public ExpiredResultsRemover(OriginSettingClient client, String jobIdExpression,
+                                 AnomalyDetectionAuditor auditor, ThreadPool threadPool) {
+        super(jobIdExpression, client);
         this.auditor = Objects.requireNonNull(auditor);
         this.threadPool = Objects.requireNonNull(threadPool);
     }

+ 5 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java

@@ -26,7 +26,6 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerS
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
-import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
 import org.elasticsearch.xpack.ml.utils.persistence.DocIdBatchedDocumentIterator;
 
@@ -110,10 +109,11 @@ public class UnusedStateRemover implements MlDataRemover {
         // and remove cluster service as a member all together.
         jobIds.addAll(MlMetadata.getMlMetadata(clusterService.state()).getJobs().keySet());
 
-        BatchedJobsIterator jobsIterator = new BatchedJobsIterator(client, AnomalyDetectorsIndex.configIndexName());
-        while (jobsIterator.hasNext()) {
-            Deque<Job.Builder> jobs = jobsIterator.next();
-            jobs.stream().map(Job.Builder::getId).forEach(jobIds::add);
+        DocIdBatchedDocumentIterator iterator = new DocIdBatchedDocumentIterator(client, AnomalyDetectorsIndex.configIndexName(),
+            QueryBuilders.termQuery(Job.JOB_TYPE.getPreferredName(), Job.ANOMALY_DETECTOR_JOB_TYPE));
+        while (iterator.hasNext()) {
+            Deque<String> docIds = iterator.next();
+            docIds.stream().map(Job::extractJobIdFromDocumentId).filter(Objects::nonNull).forEach(jobIds::add);
         }
         return jobIds;
     }

+ 32 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java

@@ -6,10 +6,12 @@
 package org.elasticsearch.xpack.ml.rest;
 
 import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.MachineLearning;
 
 import java.io.IOException;
@@ -22,7 +24,8 @@ public class RestDeleteExpiredDataAction extends BaseRestHandler {
 
     @Override
     public List<Route> routes() {
-        return Collections.emptyList();
+        return Collections.singletonList(
+            new Route(DELETE, MachineLearning.BASE_PATH + "_delete_expired_data/{" + Job.ID.getPreferredName() + "}"));
     }
 
     @Override
@@ -41,9 +44,34 @@ public class RestDeleteExpiredDataAction extends BaseRestHandler {
 
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
-        DeleteExpiredDataAction.Request request = restRequest.hasContent() ?
-            DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null) :
-            new DeleteExpiredDataAction.Request();
+        DeleteExpiredDataAction.Request request;
+        if (restRequest.hasContent()) {
+            request = DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null);
+        } else {
+            request = new DeleteExpiredDataAction.Request();
+            // No request body: read the optional settings from the request parameters instead.
+            String perSecondParam = restRequest.param(DeleteExpiredDataAction.Request.REQUESTS_PER_SECOND.getPreferredName());
+            if (perSecondParam != null) {
+                try {
+                    request.setRequestsPerSecond(Float.parseFloat(perSecondParam));
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Failed to parse float parameter [" +
+                        DeleteExpiredDataAction.Request.REQUESTS_PER_SECOND.getPreferredName() +
+                        "] with value [" + perSecondParam + "]", e);
+                }
+            }
+            // paramAsTime() looks the parameter up by name, so it must be given the name rather than the value.
+            String timeoutParam = restRequest.param(DeleteExpiredDataAction.Request.TIMEOUT.getPreferredName());
+            if (timeoutParam != null) {
+                request.setTimeout(restRequest.paramAsTime(DeleteExpiredDataAction.Request.TIMEOUT.getPreferredName(), null));
+            }
+        }
+        // A non-empty job id request parameter replaces whatever the body or the request default supplied.
+        String jobId = restRequest.param(Job.ID.getPreferredName());
+        if (Strings.isNullOrEmpty(jobId) == false) {
+            request.setJobId(jobId);
+        }
+
         return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 }

+ 5 - 16
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java

@@ -51,8 +51,8 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
 
         private int getRetentionDaysCallCount = 0;
 
-        ConcreteExpiredJobDataRemover(OriginSettingClient client) {
-            super(client);
+        ConcreteExpiredJobDataRemover(String jobId, OriginSettingClient client) {
+            super(jobId, client);
         }
 
         @Override
@@ -101,17 +101,6 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
         return searchResponse;
     }
 
-    @SuppressWarnings("unchecked")
-    static void givenJobs(Client client, List<Job> jobs) throws IOException {
-        SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);
-
-        doAnswer(invocationOnMock -> {
-            ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
-            listener.onResponse(response);
-            return null;
-        }).when(client).execute(eq(SearchAction.INSTANCE), any(), any());
-    }
-
     private static SearchResponse createSearchResponse(List<? extends ToXContent> toXContents, int totalHits) throws IOException {
         SearchHit[] hitsArray = new SearchHit[toXContents.size()];
         for (int i = 0; i < toXContents.size(); i++) {
@@ -131,7 +120,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
         mockSearchResponse(response);
 
         TestListener listener = new TestListener();
-        ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient);
+        ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover("*", originSettingClient);
         remover.remove(1.0f,listener, () -> false);
 
         listener.waitToCompletion();
@@ -170,7 +159,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
         }).when(client).execute(eq(SearchAction.INSTANCE), any(), any());
 
         TestListener listener = new TestListener();
-        ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient);
+        ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover("*", originSettingClient);
         remover.remove(1.0f,listener, () -> false);
 
         listener.waitToCompletion();
@@ -194,7 +183,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
         mockSearchResponse(response);
 
         TestListener listener = new TestListener();
-        ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient);
+        ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover("*", originSettingClient);
         remover.remove(1.0f,listener, () -> attemptsLeft.getAndDecrement() <= 0);
 
         listener.waitToCompletion();

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java

@@ -256,7 +256,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
                     return null;
                 }
         ).when(executor).execute(any());
-        return new ExpiredModelSnapshotsRemover(originSettingClient, threadPool);
+        return new ExpiredModelSnapshotsRemover(originSettingClient, "*", threadPool);
     }
 
     private static ModelSnapshot createModelSnapshot(String jobId, String snapshotId, Date date) {

+ 14 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java

@@ -63,7 +63,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
 
     public void testRemove_GivenNoJobs() throws IOException {
         givenDBQRequestsSucceed();
-        AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList());
+        givenJobs(client, Collections.emptyList());
 
         createExpiredResultsRemover().remove(1.0f, listener, () -> false);
 
@@ -73,7 +73,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
 
     public void testRemove_GivenJobsWithoutRetentionPolicy() throws IOException {
         givenDBQRequestsSucceed();
-        AbstractExpiredJobDataRemoverTests.givenJobs(client,
+        givenJobs(client,
                 Arrays.asList(
                 JobTests.buildJobBuilder("foo").build(),
                 JobTests.buildJobBuilder("bar").build()
@@ -153,6 +153,17 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
         verify(cutoffListener).onResponse(eq(new AbstractExpiredJobDataRemover.CutoffDetails(latest.getTime(), expectedCutoffTime)));
     }
 
+    @SuppressWarnings("unchecked")
+    static void givenJobs(Client client, List<Job> jobs) throws IOException {
+        SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs);
+
+        doAnswer(invocationOnMock -> {
+            ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocationOnMock.getArguments()[2];
+            listener.onResponse(response);
+            return null;
+        }).when(client).execute(eq(SearchAction.INSTANCE), any(), any());
+    }
+
     private void givenDBQRequestsSucceed() {
         givenDBQRequest(true);
     }
@@ -208,6 +219,6 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
             }
         ).when(executor).execute(any());
 
-        return new ExpiredResultsRemover(originSettingClient, mock(AnomalyDetectionAuditor.class), threadPool);
+        return new ExpiredResultsRemover(originSettingClient, "*", mock(AnomalyDetectionAuditor.class), threadPool);
     }
 }

+ 24 - 0
x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json

@@ -7,6 +7,18 @@
     "stability":"stable",
     "url":{
       "paths":[
+        {
+          "path":"/_ml/_delete_expired_data/{job_id}",
+          "methods":[
+            "DELETE"
+          ],
+          "parts":{
+            "job_id":{
+              "type":"string",
+              "description":"The ID of the job(s) to perform expired data hygiene for"
+            }
+          }
+        },
         {
           "path":"/_ml/_delete_expired_data",
           "methods":[
@@ -15,6 +27,18 @@
         }
       ]
     },
+    "params":{
+      "requests_per_second":{
+        "type":"number",
+        "required":false,
+        "description":"The desired requests per second for the deletion processes."
+      },
+      "timeout":{
+        "type":"time",
+        "required":false,
+        "description":"How long can the underlying delete processes run until they are canceled"
+      }
+    },
     "body":{
       "description":"deleting expired data parameters"
     }

+ 126 - 4
x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml

@@ -3,12 +3,11 @@ setup:
       features: headers
   - do:
       headers:
-        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
       ml.put_job:
-        job_id: delete-expired-data
+        job_id: delete-expired-data-a
         body:  >
           {
-            "job_id": "delete-expired-data",
             "description":"Analysis of response time by airline",
             "analysis_config" : {
                 "bucket_span" : "1h",
@@ -18,7 +17,30 @@ setup:
                 "field_delimiter":",",
                 "time_field":"time",
                 "time_format":"yyyy-MM-dd HH:mm:ssX"
-            }
+            },
+            "results_retention_days" : 1,
+            "model_snapshot_retention_days" : 1
+          }
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
+      ml.put_job:
+        job_id: delete-expired-data-b
+        body:  >
+          {
+            "description":"Analysis of response time by airline",
+            "analysis_config" : {
+                "bucket_span" : "1h",
+                "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
+            },
+            "data_description" : {
+                "field_delimiter":",",
+                "time_field":"time",
+                "time_format":"yyyy-MM-dd HH:mm:ssX"
+            },
+            "results_retention_days" : 1,
+            "model_snapshot_retention_days" : 1
           }
 
 ---
@@ -34,3 +56,103 @@ setup:
         body:  >
            { "timeout": "10h", "requests_per_second": 100000.0 }
   - match: { deleted: true}
+---
+"Test delete expired data with path parameters":
+  - do:
+      ml.delete_expired_data:
+        timeout: "10h"
+        requests_per_second: 100000.0
+  - match: { deleted: true}
+
+---
+"Test delete expired data with job id":
+  - do:
+      headers:
+        Content-Type: application/json
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
+      index:
+        index:  .ml-anomalies-shared
+        id:     "delete-expired-data-a_model_snapshot_inactive-snapshot"
+        body: >
+          {
+            "job_id": "delete-expired-data-a",
+            "timestamp": "2020-05-01T00:00:00Z",
+            "snapshot_id": "inactive-snapshot",
+            "description": "first",
+            "latest_record_time_stamp": "2020-05-01T00:00:00Z",
+            "latest_result_time_stamp": "2020-05-01T00:00:00Z",
+            "snapshot_doc_count": 1
+          }
+
+  - do:
+      headers:
+        Content-Type: application/json
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
+      index:
+        index:  .ml-anomalies-shared
+        id:     "delete-expired-data-a_model_snapshot_active-snapshot"
+        body: >
+          {
+            "job_id": "delete-expired-data-a",
+            "timestamp": "2020-05-10T00:00:00Z",
+            "snapshot_id": "active-snapshot",
+            "description": "second",
+            "latest_record_time_stamp": "2020-05-10T00:00:00Z",
+            "latest_result_time_stamp": "2020-05-10T00:00:00Z",
+            "snapshot_doc_count": 1,
+            "model_size_stats": {
+                "job_id" : "delete-expired-data-a",
+                "result_type" : "model_size_stats",
+                "model_bytes" : 0
+            },
+            "quantiles": {
+              "job_id": "delete-expired-data-a",
+              "timestamp": 1,
+              "quantile_state": "quantiles-1"
+            }
+          }
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
+      indices.refresh:
+        index: [.ml-anomalies-shared]
+
+  - do:
+      ml.get_model_snapshots:
+        job_id: delete-expired-data-a
+  - match: { count: 2 }
+
+# Revert to "active-snapshot" so that it becomes the job's current model snapshot
+  - do:
+      ml.revert_model_snapshot:
+        job_id: delete-expired-data-a
+        snapshot_id: active-snapshot
+
+  - do:
+      ml.get_model_snapshots:
+        job_id: delete-expired-data-a
+  - match: { count: 2 }
+
+  - do:
+      ml.delete_expired_data:
+        job_id: delete-expired-data-b
+
+  - do:
+      ml.get_model_snapshots:
+        job_id: delete-expired-data-a
+  - match: { count: 2 }
+
+  - do:
+      ml.delete_expired_data:
+        job_id: delete-expired-data-a
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA=="
+      indices.refresh: {}
+
+  - do:
+      ml.get_model_snapshots:
+        job_id: delete-expired-data-a
+  - match: { count: 1 }