Browse Source

[ML] Delete unused data frame analytics state (#50243)

This commit adds removal of unused data frame analytics state
from the _delete_expired_data API (and in extend th ML daily
maintenance task). At the moment the potential state docs
include the progress document and state for regression and
classification analyses.
Dimitris Athanasiou 5 years ago
parent
commit
d2aee62468
25 changed files with 235 additions and 22 deletions
  1. 10 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java
  2. 8 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Classification.java
  3. 8 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java
  4. 8 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java
  5. 5 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/ClassificationTests.java
  6. 5 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/RegressionTests.java
  7. 34 0
      x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java
  8. 2 5
      x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java
  9. 1 1
      x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
  10. 2 2
      x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java
  11. 11 0
      x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java
  12. 34 0
      x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java
  13. 2 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java
  14. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java
  15. 1 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java
  16. 14 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/StoredProgress.java
  17. 1 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java
  18. 1 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedResultsIterator.java
  19. 1 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedStateDocIdsIterator.java
  20. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java
  21. 31 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java
  22. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIterator.java
  23. 40 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/DocIdBatchedDocumentIterator.java
  24. 12 0
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/StoredProgressTests.java
  25. 1 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIteratorTests.java

+ 10 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.core.ml.dataframe;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -310,6 +311,15 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
         return TYPE + "-" + id;
     }
 
+    /**
+     * Returns the job id from the doc id. Returns {@code null} if the doc id is invalid.
+     */
+    @Nullable
+    public static String extractJobIdFromDocId(String docId) {
+        String jobId = docId.replaceAll("^" + TYPE +"-", "");
+        return jobId.equals(docId) ? null : jobId;
+    }
+
     public static class Builder {
 
         private String id;

+ 8 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Classification.java

@@ -39,6 +39,8 @@ public class Classification implements DataFrameAnalysis {
     public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
     public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");
 
+    private static final String STATE_DOC_ID_SUFFIX = "_classification_state#1";
+
     private static final ConstructingObjectParser<Classification, Void> LENIENT_PARSER = createParser(true);
     private static final ConstructingObjectParser<Classification, Void> STRICT_PARSER = createParser(false);
 
@@ -256,7 +258,12 @@ public class Classification implements DataFrameAnalysis {
 
     @Override
     public String getStateDocId(String jobId) {
-        return jobId + "_classification_state#1";
+        return jobId + STATE_DOC_ID_SUFFIX;
+    }
+
+    public static String extractJobIdFromStateDoc(String stateDocId) {
+        int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
+        return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);
     }
 
     @Override

+ 8 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/Regression.java

@@ -36,6 +36,8 @@ public class Regression implements DataFrameAnalysis {
     public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
     public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");
 
+    private static final String STATE_DOC_ID_SUFFIX = "_regression_state#1";
+
     private static final ConstructingObjectParser<Regression, Void> LENIENT_PARSER = createParser(true);
     private static final ConstructingObjectParser<Regression, Void> STRICT_PARSER = createParser(false);
 
@@ -196,7 +198,12 @@ public class Regression implements DataFrameAnalysis {
 
     @Override
     public String getStateDocId(String jobId) {
-        return jobId + "_regression_state#1";
+        return jobId + STATE_DOC_ID_SUFFIX;
+    }
+
+    public static String extractJobIdFromStateDoc(String stateDocId) {
+        int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
+        return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);
     }
 
     @Override

+ 8 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfigTests.java

@@ -53,6 +53,7 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.startsWith;
 
 public class DataFrameAnalyticsConfigTests extends AbstractSerializingTestCase<DataFrameAnalyticsConfig> {
@@ -384,6 +385,13 @@ public class DataFrameAnalyticsConfigTests extends AbstractSerializingTestCase<D
         }
     }
 
+    public void testExtractJobIdFromDocId() {
+        assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-foo"), equalTo("foo"));
+        assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-data_frame_analytics_config-foo"),
+            equalTo("data_frame_analytics_config-foo"));
+        assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue()));
+    }
+
     private static void assertTooSmall(ElasticsearchStatusException e) {
         assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb."));
     }

+ 5 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/ClassificationTests.java

@@ -216,4 +216,9 @@ public class ClassificationTests extends AbstractSerializingTestCase<Classificat
         String randomId = randomAlphaOfLength(10);
         assertThat(classification.getStateDocId(randomId), equalTo(randomId + "_classification_state#1"));
     }
+
+    public void testExtractJobIdFromStateDoc() {
+        assertThat(Classification.extractJobIdFromStateDoc("foo_bar-1_classification_state#1"), equalTo("foo_bar-1"));
+        assertThat(Classification.extractJobIdFromStateDoc("noop"), is(nullValue()));
+    }
 }

+ 5 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/dataframe/analyses/RegressionTests.java

@@ -110,6 +110,11 @@ public class RegressionTests extends AbstractSerializingTestCase<Regression> {
         assertThat(regression.getStateDocId(randomId), equalTo(randomId + "_regression_state#1"));
     }
 
+    public void testExtractJobIdFromStateDoc() {
+        assertThat(Regression.extractJobIdFromStateDoc("foo_bar-1_regression_state#1"), equalTo("foo_bar-1"));
+        assertThat(Regression.extractJobIdFromStateDoc("noop"), is(nullValue()));
+    }
+
     public void testToXContent_GivenVersionBeforeRandomizeSeedWasIntroduced() throws IOException {
         Regression regression = createRandom();
         assertThat(regression.getRandomizeSeed(), is(notNullValue()));

+ 34 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

@@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequest;
@@ -17,6 +18,7 @@ import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@@ -315,6 +317,38 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
         assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
     }
 
+    public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
+        initialize("classification_delete_expired_data");
+        indexData(sourceIndex, 100, 0, KEYWORD_FIELD);
+
+        DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
+        registerAnalytics(config);
+        putAnalytics(config);
+        startAnalytics(jobId);
+        waitUntilAnalyticsIsStopped(jobId);
+
+        assertProgress(jobId, 100, 100, 100, 100);
+        assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
+        assertModelStatePersisted(stateDocId());
+        assertInferenceModelPersisted(jobId);
+
+        // Call _delete_expired_data API and check nothing was deleted
+        assertThat(deleteExpiredData().isDeleted(), is(true));
+        assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
+        assertModelStatePersisted(stateDocId());
+
+        // Delete the config straight from the config index
+        DeleteResponse deleteResponse = client().prepareDelete(".ml-config", DataFrameAnalyticsConfig.documentId(jobId))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
+        assertThat(deleteResponse.status(), equalTo(RestStatus.OK));
+
+        // Now calling the _delete_expired_data API should remove unused state
+        assertThat(deleteExpiredData().isDeleted(), is(true));
+
+        SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
+        assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
+    }
+
     private void initialize(String jobId) {
         this.jobId = jobId;
         this.sourceIndex = jobId + "_source_index";

+ 2 - 5
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

@@ -87,7 +87,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
         cleanUp();
     }
 
-    public void testDeleteExpiredDataGivenNothingToDelete() throws Exception {
+    public void testDeleteExpiredData_GivenNothingToDelete() throws Exception {
         // Tests that nothing goes wrong when there's nothing to delete
         client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
     }
@@ -201,10 +201,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
         assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK));
 
         // Now call the action under test
-        client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
-
-        // We need to refresh to ensure the deletion is visible
-        client().admin().indices().prepareRefresh("*").get();
+        assertThat(deleteExpiredData().isDeleted(), is(true));
 
         // no-retention job should have kept all data
         assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70)));

+ 1 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java

@@ -21,6 +21,7 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.xpack.core.action.util.PageParams;
 import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
 import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
@@ -45,7 +46,6 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
-import org.elasticsearch.xpack.core.action.util.PageParams;
 import org.elasticsearch.xpack.core.ml.calendars.Calendar;
 import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;

+ 2 - 2
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java

@@ -40,7 +40,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
 import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
 import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
-import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
+import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 
@@ -205,7 +205,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
     }
 
     protected SearchResponse searchStoredProgress(String jobId) {
-        String docId = DataFrameAnalyticsTask.progressDocId(jobId);
+        String docId = StoredProgress.documentId(jobId);
         return client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
             .setQuery(QueryBuilders.idsQuery().addIds(docId))
             .get();

+ 11 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java

@@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
@@ -142,6 +143,16 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase {
         }
     }
 
+    protected DeleteExpiredDataAction.Response deleteExpiredData() throws Exception {
+        DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE,
+            new DeleteExpiredDataAction.Request()).get();
+
+        // We need to refresh to ensure the deletion is visible
+        client().admin().indices().prepareRefresh("*").get();
+
+        return response;
+    }
+
     @Override
     protected void ensureClusterStateConsistency() throws IOException {
         if (cluster() != null && cluster().size() > 0) {

+ 34 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java

@@ -7,11 +7,13 @@ package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
@@ -272,6 +274,38 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
         assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
     }
 
+    public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
+        initialize("regression_delete_expired_data");
+        indexData(sourceIndex, 100, 0);
+
+        DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
+        registerAnalytics(config);
+        putAnalytics(config);
+        startAnalytics(jobId);
+        waitUntilAnalyticsIsStopped(jobId);
+
+        assertProgress(jobId, 100, 100, 100, 100);
+        assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
+        assertModelStatePersisted(stateDocId());
+        assertInferenceModelPersisted(jobId);
+
+        // Call _delete_expired_data API and check nothing was deleted
+        assertThat(deleteExpiredData().isDeleted(), is(true));
+        assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
+        assertModelStatePersisted(stateDocId());
+
+        // Delete the config straight from the config index
+        DeleteResponse deleteResponse = client().prepareDelete(".ml-config", DataFrameAnalyticsConfig.documentId(jobId))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
+        assertThat(deleteResponse.status(), equalTo(RestStatus.OK));
+
+        // Now calling the _delete_expired_data API should remove unused state
+        assertThat(deleteExpiredData().isDeleted(), is(true));
+
+        SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
+        assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
+    }
+
     private void initialize(String jobId) {
         this.jobId = jobId;
         this.sourceIndex = jobId + "_source_index";

+ 2 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java

@@ -43,7 +43,7 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
+import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
 import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
 import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -165,7 +165,7 @@ public class TransportDeleteDataFrameAnalyticsAction
                              DataFrameAnalyticsConfig config,
                              ActionListener<BulkByScrollResponse> listener) {
         List<String> ids = new ArrayList<>();
-        ids.add(DataFrameAnalyticsTask.progressDocId(config.getId()));
+        ids.add(StoredProgress.documentId(config.getId()));
         if (config.getAnalysis().persistsState()) {
             ids.add(config.getAnalysis().getStateDocId(config.getId()));
         }

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java

@@ -187,7 +187,7 @@ public class TransportGetDataFrameAnalyticsStatsAction
             SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
             searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
             searchRequest.source().size(1);
-            searchRequest.source().query(QueryBuilders.idsQuery().addIds(DataFrameAnalyticsTask.progressDocId(configId)));
+            searchRequest.source().query(QueryBuilders.idsQuery().addIds(StoredProgress.documentId(configId)));
             multiSearchRequest.add(searchRequest);
         }
 

+ 1 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsTask.java

@@ -244,7 +244,7 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
             statsResponse -> {
                 GetDataFrameAnalyticsStatsAction.Response.Stats stats = statsResponse.getResponse().results().get(0);
                 IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
-                indexRequest.id(progressDocId(taskParams.getId()));
+                indexRequest.id(StoredProgress.documentId(taskParams.getId()));
                 indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                 try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
                     new StoredProgress(stats.getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
@@ -310,10 +310,6 @@ public class DataFrameAnalyticsTask extends AllocatedPersistentTask implements S
         }
     }
 
-    public static String progressDocId(String id) {
-        return "data_frame_analytics-" + id + "-progress";
-    }
-
     public static class ProgressTracker {
 
         public static final String REINDEXING = "reindexing";

+ 14 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/StoredProgress.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.dataframe;
 
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
@@ -14,6 +15,8 @@ import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
 import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class StoredProgress implements ToXContentObject {
 
@@ -57,4 +60,15 @@ public class StoredProgress implements ToXContentObject {
     public int hashCode() {
         return Objects.hash(progress);
     }
+
+    public static String documentId(String id) {
+        return "data_frame_analytics-" + id + "-progress";
+    }
+
+    @Nullable
+    public static String extractJobIdFromDocId(String docId) {
+        Pattern pattern = Pattern.compile("^data_frame_analytics-(.*)-progress$");
+        Matcher matcher = pattern.matcher(docId);
+        return matcher.find() ? matcher.group(1) : null;
+    }
 }

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedJobsIterator.java

@@ -16,6 +16,7 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
 
 import java.io.IOException;
 import java.io.InputStream;

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedResultsIterator.java

@@ -10,6 +10,7 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.TermsQueryBuilder;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
+import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
 
 public abstract class BatchedResultsIterator<T> extends BatchedDocumentsIterator<Result<T>> {
 

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedStateDocIdsIterator.java

@@ -9,6 +9,7 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
 
 /**
  * Iterates through the state doc ids

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/ScoresUpdater.java

@@ -12,7 +12,7 @@ import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
 import org.elasticsearch.xpack.core.ml.job.results.Bucket;
 import org.elasticsearch.xpack.core.ml.job.results.Influencer;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
-import org.elasticsearch.xpack.ml.job.persistence.BatchedDocumentsIterator;
+import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;
 import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
 

+ 31 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java

@@ -16,14 +16,19 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
+import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
+import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
+import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
+import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
+import org.elasticsearch.xpack.ml.utils.persistence.DocIdBatchedDocumentIterator;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -93,6 +98,13 @@ public class UnusedStateRemover implements MlDataRemover {
 
     private Set<String> getJobIds() {
         Set<String> jobIds = new HashSet<>();
+        jobIds.addAll(getAnamalyDetectionJobIds());
+        jobIds.addAll(getDataFrameAnalyticsJobIds());
+        return jobIds;
+    }
+
+    private Set<String> getAnamalyDetectionJobIds() {
+        Set<String> jobIds = new HashSet<>();
 
         // TODO Once at 8.0, we can stop searching for jobs in cluster state
         // and remove cluster service as a member all together.
@@ -106,6 +118,18 @@ public class UnusedStateRemover implements MlDataRemover {
         return jobIds;
     }
 
+    private Set<String> getDataFrameAnalyticsJobIds() {
+        Set<String> jobIds = new HashSet<>();
+
+        DocIdBatchedDocumentIterator iterator = new DocIdBatchedDocumentIterator(client, AnomalyDetectorsIndex.configIndexName(),
+            QueryBuilders.termQuery(DataFrameAnalyticsConfig.CONFIG_TYPE.getPreferredName(), DataFrameAnalyticsConfig.TYPE));
+        while (iterator.hasNext()) {
+            Deque<String> docIds = iterator.next();
+            docIds.stream().map(DataFrameAnalyticsConfig::extractJobIdFromDocId).filter(Objects::nonNull).forEach(jobIds::add);
+        }
+        return jobIds;
+    }
+
     private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, ActionListener<Boolean> listener) {
         LOGGER.info("Found [{}] unused state documents; attempting to delete",
                 unusedDocIds.size());
@@ -137,7 +161,13 @@ public class UnusedStateRemover implements MlDataRemover {
     private static class JobIdExtractor {
 
         private static List<Function<String, String>> extractors = Arrays.asList(
-            ModelState::extractJobId, Quantiles::extractJobId, CategorizerState::extractJobId);
+            ModelState::extractJobId,
+            Quantiles::extractJobId,
+            CategorizerState::extractJobId,
+            Classification::extractJobIdFromStateDoc,
+            Regression::extractJobIdFromStateDoc,
+            StoredProgress::extractJobIdFromDocId
+        );
 
         private static String extractJobId(String docId) {
             String jobId;

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIterator.java → x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIterator.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.persistence;
+package org.elasticsearch.xpack.ml.utils.persistence;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

+ 40 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/DocIdBatchedDocumentIterator.java

@@ -0,0 +1,40 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.utils.persistence;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+
+import java.util.Objects;
+
+/**
+ * This is a document iterator that returns just the id of each matched document.
+ */
+public class DocIdBatchedDocumentIterator extends BatchedDocumentsIterator<String> {
+
+    private final QueryBuilder query;
+
+    public DocIdBatchedDocumentIterator(Client client, String index, QueryBuilder query) {
+        super(client, index);
+        this.query = Objects.requireNonNull(query);
+    }
+
+    @Override
+    protected QueryBuilder getQuery() {
+        return query;
+    }
+
+    @Override
+    protected String map(SearchHit hit) {
+        return hit.getId();
+    }
+
+    @Override
+    protected boolean shouldFetchSource() {
+        return false;
+    }
+}

+ 12 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/StoredProgressTests.java

@@ -13,6 +13,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.hamcrest.Matchers.equalTo;
+
 public class StoredProgressTests extends AbstractXContentTestCase<StoredProgress> {
 
     @Override
@@ -34,4 +36,14 @@ public class StoredProgressTests extends AbstractXContentTestCase<StoredProgress
         }
         return new StoredProgress(progress);
     }
+
+    public void testDocumentId() {
+        assertThat(StoredProgress.documentId("foo"), equalTo("data_frame_analytics-foo-progress"));
+    }
+
+    public void testExtractJobIdFromDocId() {
+        assertThat(StoredProgress.extractJobIdFromDocId("data_frame_analytics-foo-progress"), equalTo("foo"));
+        assertThat(StoredProgress.extractJobIdFromDocId("data_frame_analytics-data_frame_analytics-bar-progress-progress"),
+            equalTo("data_frame_analytics-bar-progress"));
+    }
 }

+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/BatchedDocumentsIteratorTests.java → x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIteratorTests.java

@@ -3,7 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
-package org.elasticsearch.xpack.ml.job.persistence;
+package org.elasticsearch.xpack.ml.utils.persistence;
 
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.action.ActionFuture;