
ML: update .ml-state actions to support > 1 index (#37307)

* ML: Updating .ml-state calls to be able to support > 1 index

* Matching bulk delete behavior with delete-by-query (DBQ)

* Adjusting state name

* Refreshing indices before search

* Fixing line length

* Adjusting index expansion options
Benjamin Trent · 6 years ago · commit 19a7e0f4eb
17 changed files with 184 additions and 106 deletions
  1. + 9 - 1    x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java
  2. + 1 - 1    x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndexFields.java
  3. + 2 - 1    x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java
  4. + 1 - 1    x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  5. + 4 - 5    x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java
  6. + 3 - 3    x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java
  7. + 5 - 3    x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java
  8. + 27 - 18  x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java
  9. + 1 - 1    x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java
  10. + 5 - 5   x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java
  11. + 15 - 8  x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamer.java
  12. + 29 - 24 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java
  13. + 1 - 1   x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java
  14. + 12 - 5  x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java
  15. + 4 - 2   x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java
  16. + 5 - 0   x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java
  17. + 60 - 27 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamerTests.java

+ 9 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java

@@ -44,7 +44,15 @@ public final class AnomalyDetectorsIndex {
      * @return The index name
      */
     public static String jobStateIndexName() {
-        return AnomalyDetectorsIndexFields.STATE_INDEX_NAME;
+        return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX;
+    }
+
+    /**
+     * The name pattern to capture all .ml-state prefixed indices
+     * @return The .ml-state index pattern
+     */
+    public static String jobStateIndexPattern() {
+        return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "*";
     }
 
     /**

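For orientation (this note and the sketch below are editorial, not part of the commit): jobStateIndexName() keeps returning the bare ".ml-state" prefix, which the index template and writes still target, while the new jobStateIndexPattern() returns ".ml-state*" so that searches and deletes cover every state index. A minimal sketch of that split, with the wrapper class name purely illustrative:

    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.support.IndicesOptions;
    import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

    class StateIndexNamingSketch {
        // Reads should span every index matched by ".ml-state*"; writes and the
        // template definition keep using jobStateIndexName(), i.e. ".ml-state".
        static SearchRequest searchAllStateIndices() {
            return new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
                    .indicesOptions(IndicesOptions.lenientExpandOpen());
        }
    }
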
+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndexFields.java

@@ -9,7 +9,7 @@ public final class AnomalyDetectorsIndexFields {
 
     public static final String CONFIG_INDEX = ".ml-config";
     public static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";
-    public static final String STATE_INDEX_NAME = ".ml-state";
+    public static final String STATE_INDEX_PREFIX = ".ml-state";
     public static final String RESULTS_INDEX_DEFAULT = "shared";
 
     private AnomalyDetectorsIndexFields() {}

+ 2 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java

@@ -239,8 +239,9 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
         }
 
         // Verify .ml-state doesn't contain unused state documents
-        SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
+        SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                 .setFetchSource(false)
+                .setTrackTotalHits(true)
                 .setSize(10000)
                 .get();
 

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -702,7 +702,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
 
             try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) {
                 IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName())
-                        .patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexName()))
+                        .patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexPattern()))
                         // TODO review these settings
                         .settings(Settings.builder()
                                 .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")

+ 4 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

@@ -14,7 +14,6 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
@@ -387,7 +386,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
                 failureHandler);
 
         // Step 2. Delete state done, delete the quantiles
-        ActionListener<BulkResponse> deleteStateHandler = ActionListener.wrap(
+        ActionListener<BulkByScrollResponse> deleteStateHandler = ActionListener.wrap(
                 bulkResponse -> deleteQuantiles(parentTaskClient, jobId, deleteQuantilesHandler),
                 failureHandler);
 
@@ -397,7 +396,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
 
     private void deleteQuantiles(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<Boolean> finishedHandler) {
         // The quantiles type and doc ID changed in v5.5 so delete both the old and new format
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
+        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
         // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
         IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId));
         request.setQuery(query);
@@ -417,7 +416,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
                 }));
     }
 
-    private void deleteModelState(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<BulkResponse> listener) {
+    private void deleteModelState(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener<BulkByScrollResponse> listener) {
         GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
         request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE));
         executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap(
@@ -432,7 +431,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
     private void deleteCategorizerState(ParentTaskAssigningClient parentTaskClient, String jobId, int docNum,
                                         ActionListener<Boolean> finishedHandler) {
         // The categorizer state type and doc ID changed in v5.5 so delete both the old and new format
-        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName());
+        DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
         // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace
         IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum));
         request.setQuery(query);

+ 3 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteModelSnapshotAction.java

@@ -7,12 +7,12 @@ package org.elasticsearch.xpack.ml.action;
 
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
@@ -79,9 +79,9 @@ public class TransportDeleteModelSnapshotAction extends HandledTransportAction<D
                                 // Delete the snapshot and any associated state files
                                 JobDataDeleter deleter = new JobDataDeleter(client, request.getJobId());
                                 deleter.deleteModelSnapshots(Collections.singletonList(deleteCandidate),
-                                        new ActionListener<BulkResponse>() {
+                                        new ActionListener<BulkByScrollResponse>() {
                                             @Override
-                                            public void onResponse(BulkResponse bulkResponse) {
+                                            public void onResponse(BulkByScrollResponse bulkResponse) {
                                                 String msg = Messages.getMessage(Messages.JOB_AUDIT_SNAPSHOT_DELETED,
                                                         deleteCandidate.getSnapshotId(), deleteCandidate.getDescription());
 

+ 5 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

@@ -16,6 +16,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.client.Client;
@@ -368,13 +369,14 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
 
     static String[] indicesOfInterest(String resultsIndex) {
         if (resultsIndex == null) {
-            return new String[]{AnomalyDetectorsIndex.jobStateIndexName(), MlMetaIndex.INDEX_NAME};
+            return new String[]{AnomalyDetectorsIndex.jobStateIndexPattern(), MlMetaIndex.INDEX_NAME};
         }
-        return new String[]{AnomalyDetectorsIndex.jobStateIndexName(), resultsIndex, MlMetaIndex.INDEX_NAME};
+        return new String[]{AnomalyDetectorsIndex.jobStateIndexPattern(), resultsIndex, MlMetaIndex.INDEX_NAME};
     }
 
     static List<String> verifyIndicesPrimaryShardsAreActive(String resultsIndex, ClusterState clusterState) {
-        String[] indices = indicesOfInterest(resultsIndex);
+        IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
+        String[] indices = resolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), indicesOfInterest(resultsIndex));
         List<String> unavailableIndices = new ArrayList<>(indices.length);
         for (String index : indices) {
             // Indices are created on demand from templates.

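The resolver call above is needed because a wildcard cannot be checked directly against the routing table; it has to be expanded to concrete index names first, and lenient options let a not-yet-created state index simply drop out instead of failing. A rough editorial sketch of that expansion, simplified to a boolean (the real method collects the names of the unavailable indices):

    import org.elasticsearch.action.support.IndicesOptions;
    import org.elasticsearch.cluster.ClusterState;
    import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
    import org.elasticsearch.cluster.routing.IndexRoutingTable;
    import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

    class PrimaryShardCheckSketch {
        static boolean allStatePrimariesActive(ClusterState clusterState) {
            IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
            // ".ml-state*" expands to whichever concrete state indices currently exist.
            String[] concreteIndices = resolver.concreteIndexNames(clusterState,
                    IndicesOptions.lenientExpandOpen(),
                    AnomalyDetectorsIndex.jobStateIndexPattern());
            for (String index : concreteIndices) {
                IndexRoutingTable routingTable = clusterState.getRoutingTable().index(index);
                if (routingTable == null || routingTable.allPrimaryShardsActive() == false) {
                    return false;
                }
            }
            return true;
        }
    }
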
+ 27 - 18
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

@@ -8,26 +8,28 @@ package org.elasticsearch.xpack.ml.job.persistence;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.BulkAction;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
+import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.BulkByScrollTask;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
-import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.results.Result;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@@ -50,27 +52,34 @@ public class JobDataDeleter {
      *
      * @param modelSnapshots the model snapshots to delete
      */
-    public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListener<BulkResponse> listener) {
+    public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListener<BulkByScrollResponse> listener) {
         if (modelSnapshots.isEmpty()) {
-            listener.onResponse(new BulkResponse(new BulkItemResponse[0], 0L));
+            listener.onResponse(new BulkByScrollResponse(TimeValue.ZERO,
+                new BulkByScrollTask.Status(Collections.emptyList(), null),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                false));
             return;
         }
 
-        String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
+        String stateIndexName = AnomalyDetectorsIndex.jobStateIndexPattern();
 
-        BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+        List<String> idsToDelete = new ArrayList<>();
+        Set<String> indices = new HashSet<>();
+        indices.add(stateIndexName);
         for (ModelSnapshot modelSnapshot : modelSnapshots) {
-            for (String stateDocId : modelSnapshot.stateDocumentIds()) {
-                bulkRequestBuilder.add(client.prepareDelete(stateIndexName, ElasticsearchMappings.DOC_TYPE, stateDocId));
-            }
-
-            bulkRequestBuilder.add(client.prepareDelete(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()),
-                    ElasticsearchMappings.DOC_TYPE, ModelSnapshot.documentId(modelSnapshot)));
+            idsToDelete.addAll(modelSnapshot.stateDocumentIds());
+            idsToDelete.add(ModelSnapshot.documentId(modelSnapshot));
+            indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
         }
 
-        bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indices.toArray(new String[0]))
+            .setRefresh(true)
+            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
+            .setQuery(new IdsQueryBuilder().addIds(idsToDelete.toArray(new String[0])));
+
         try {
-            executeAsyncWithOrigin(client, ML_ORIGIN, BulkAction.INSTANCE, bulkRequestBuilder.request(), listener);
+            executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
         } catch (Exception e) {
             listener.onFailure(e);
         }

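On the caller side, the switch from BulkResponse to BulkByScrollResponse changes what failure handling looks like; an editorial sketch of the listener shape now expected (wrapper class name illustrative):

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.index.reindex.BulkByScrollResponse;

    class DeleteSnapshotListenerSketch {
        static ActionListener<BulkByScrollResponse> listener() {
            return ActionListener.wrap(
                response -> {
                    // Delete-by-query reports per-document bulk failures and search failures
                    // separately; the callers in this commit log them and still complete,
                    // instead of checking BulkResponse.hasFailures().
                    boolean hadFailures = response.getBulkFailures().isEmpty() == false
                            || response.getSearchFailures().isEmpty() == false;
                    long deleted = response.getDeleted(); // documents actually removed
                },
                e -> {
                    // Request-level failures are surfaced to the caller unchanged.
                });
        }
    }
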
+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java

@@ -305,7 +305,7 @@ public class JobResultsPersister {
      * @param jobId The job Id
      * */
     public void commitStateWrites(String jobId) {
-        String indexName = AnomalyDetectorsIndex.jobStateIndexName();
+        String indexName = AnomalyDetectorsIndex.jobStateIndexPattern();
         // Refresh should wait for Lucene to make the data searchable
         logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
         RefreshRequest refreshRequest = new RefreshRequest(indexName);

+ 5 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java

@@ -157,14 +157,14 @@ public class JobResultsProvider {
      */
     public void checkForLeftOverDocuments(Job job, ActionListener<Boolean> listener) {
 
-        SearchRequestBuilder stateDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
+        SearchRequestBuilder stateDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                 .setQuery(QueryBuilders.idsQuery().addIds(CategorizerState.documentId(job.getId(), 1),
                         CategorizerState.v54DocumentId(job.getId(), 1)))
-                .setIndicesOptions(IndicesOptions.lenientExpandOpen());
+                .setIndicesOptions(IndicesOptions.strictExpand());
 
-        SearchRequestBuilder quantilesDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
+        SearchRequestBuilder quantilesDocSearch = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                 .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId()), Quantiles.v54DocumentId(job.getId())))
-                .setIndicesOptions(IndicesOptions.lenientExpandOpen());
+                .setIndicesOptions(IndicesOptions.strictExpand());
 
         String resultsIndexName = job.getResultsIndexName();
         SearchRequestBuilder resultDocSearch = client.prepareSearch(resultsIndexName)
@@ -396,7 +396,7 @@ public class JobResultsProvider {
 
         AutodetectParams.Builder paramsBuilder = new AutodetectParams.Builder(job.getId());
         String resultsIndex = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
-        String stateIndex = AnomalyDetectorsIndex.jobStateIndexName();
+        String stateIndex = AnomalyDetectorsIndex.jobStateIndexPattern();
 
         MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
                 .add(createLatestDataCountsSearch(resultsIndex, jobId))

+ 15 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamer.java

@@ -9,10 +9,11 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefIterator;
-import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
@@ -62,7 +63,7 @@ public class StateStreamer {
      * @param restoreStream the stream to write the state to
      */
     public void restoreStateToStream(String jobId, ModelSnapshot modelSnapshot, OutputStream restoreStream) throws IOException {
-        String indexName = AnomalyDetectorsIndex.jobStateIndexName();
+        String indexName = AnomalyDetectorsIndex.jobStateIndexPattern();
 
         // First try to restore model state.
         for (String stateDocId : modelSnapshot.stateDocumentIds()) {
@@ -73,13 +74,16 @@ public class StateStreamer {
             LOGGER.trace("ES API CALL: get ID {} from index {}", stateDocId, indexName);
 
             try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
-                GetResponse stateResponse = client.prepareGet(indexName, ElasticsearchMappings.DOC_TYPE, stateDocId).get();
-                if (!stateResponse.isExists()) {
+                SearchResponse stateResponse = client.prepareSearch(indexName)
+                    .setTypes(ElasticsearchMappings.DOC_TYPE)
+                    .setSize(1)
+                    .setQuery(QueryBuilders.idsQuery().addIds(stateDocId)).get();
+                if (stateResponse.getHits().getHits().length == 0) {
                     LOGGER.error("Expected {} documents for model state for {} snapshot {} but failed to find {}",
                             modelSnapshot.getSnapshotDocCount(), jobId, modelSnapshot.getSnapshotId(), stateDocId);
                     break;
                 }
-                writeStateToStream(stateResponse.getSourceAsBytesRef(), restoreStream);
+                writeStateToStream(stateResponse.getHits().getAt(0).getSourceRef(), restoreStream);
             }
         }
 
@@ -97,11 +101,14 @@ public class StateStreamer {
             LOGGER.trace("ES API CALL: get ID {} from index {}", docId, indexName);
 
             try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) {
-                GetResponse stateResponse = client.prepareGet(indexName, ElasticsearchMappings.DOC_TYPE, docId).get();
-                if (!stateResponse.isExists()) {
+                SearchResponse stateResponse = client.prepareSearch(indexName)
+                    .setTypes(ElasticsearchMappings.DOC_TYPE)
+                    .setSize(1)
+                    .setQuery(QueryBuilders.idsQuery().addIds(docId)).get();
+                if (stateResponse.getHits().getHits().length == 0) {
                     break;
                 }
-                writeStateToStream(stateResponse.getSourceAsBytesRef(), restoreStream);
+                writeStateToStream(stateResponse.getHits().getAt(0).getSourceRef(), restoreStream);
             }
         }
 

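The motivation for replacing prepareGet with prepareSearch in this class: a GET must name a single concrete index, while an ids query can run against the ".ml-state*" pattern and finds the document in whichever state index holds it. A minimal editorial sketch of that lookup (class name illustrative):

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

    class StateDocLookupSketch {
        static BytesReference findStateDoc(Client client, String stateDocId) {
            SearchResponse response = client.prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                    .setSize(1)
                    .setQuery(QueryBuilders.idsQuery().addIds(stateDocId))
                    .get();
            // Empty hits means the document does not exist in any state index.
            return response.getHits().getHits().length == 0
                    ? null
                    : response.getHits().getAt(0).getSourceRef();
        }
    }
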
+ 29 - 24
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java

@@ -8,11 +8,13 @@ package org.elasticsearch.xpack.ml.job.retention;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.DeleteByQueryAction;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -23,6 +25,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedJobsIterator;
 import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.HashSet;
@@ -51,9 +54,9 @@ public class UnusedStateRemover implements MlDataRemover {
     @Override
     public void remove(ActionListener<Boolean> listener) {
         try {
-            BulkRequestBuilder deleteUnusedStateRequestBuilder = findUnusedStateDocs();
-            if (deleteUnusedStateRequestBuilder.numberOfActions() > 0) {
-                executeDeleteUnusedStateDocs(deleteUnusedStateRequestBuilder, listener);
+            List<String> unusedStateDocIds = findUnusedStateDocIds();
+            if (unusedStateDocIds.size() > 0) {
+                executeDeleteUnusedStateDocs(unusedStateDocIds, listener);
             } else {
                 listener.onResponse(true);
             }
@@ -62,10 +65,11 @@ public class UnusedStateRemover implements MlDataRemover {
         }
     }
 
-    private BulkRequestBuilder findUnusedStateDocs() {
+    private List<String> findUnusedStateDocIds() {
         Set<String> jobIds = getJobIds();
-        BulkRequestBuilder deleteUnusedStateRequestBuilder = client.prepareBulk();
-        BatchedStateDocIdsIterator stateDocIdsIterator = new BatchedStateDocIdsIterator(client, AnomalyDetectorsIndex.jobStateIndexName());
+        List<String> stateDocIdsToDelete = new ArrayList<>();
+        BatchedStateDocIdsIterator stateDocIdsIterator = new BatchedStateDocIdsIterator(client,
+            AnomalyDetectorsIndex.jobStateIndexPattern());
         while (stateDocIdsIterator.hasNext()) {
             Deque<String> stateDocIds = stateDocIdsIterator.next();
             for (String stateDocId : stateDocIds) {
@@ -75,12 +79,11 @@ public class UnusedStateRemover implements MlDataRemover {
                     continue;
                 }
                 if (jobIds.contains(jobId) == false) {
-                    deleteUnusedStateRequestBuilder.add(new DeleteRequest(
-                            AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, stateDocId));
+                    stateDocIdsToDelete.add(stateDocId);
                 }
             }
         }
-        return deleteUnusedStateRequestBuilder;
+        return stateDocIdsToDelete;
     }
 
     private Set<String> getJobIds() {
@@ -98,27 +101,29 @@ public class UnusedStateRemover implements MlDataRemover {
         return jobIds;
     }
 
-    private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener<Boolean> listener) {
+    private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, ActionListener<Boolean> listener) {
         LOGGER.info("Found [{}] unused state documents; attempting to delete",
-                deleteUnusedStateRequestBuilder.numberOfActions());
-        deleteUnusedStateRequestBuilder.execute(new ActionListener<BulkResponse>() {
-            @Override
-            public void onResponse(BulkResponse bulkItemResponses) {
-                if (bulkItemResponses.hasFailures()) {
+                unusedDocIds.size());
+        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
+            .types(ElasticsearchMappings.DOC_TYPE)
+            .setIndicesOptions(IndicesOptions.lenientExpandOpen())
+            .setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0])));
+        client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryRequest, ActionListener.wrap(
+            response -> {
+                if (response.getBulkFailures().size() > 0 || response.getSearchFailures().size() > 0) {
                     LOGGER.error("Some unused state documents could not be deleted due to failures: {}",
-                            bulkItemResponses.buildFailureMessage());
+                        Strings.collectionToCommaDelimitedString(response.getBulkFailures()) +
+                            "," + Strings.collectionToCommaDelimitedString(response.getSearchFailures()));
                 } else {
                     LOGGER.info("Successfully deleted all unused state documents");
                 }
                 listener.onResponse(true);
-            }
-
-            @Override
-            public void onFailure(Exception e) {
+            },
+            e -> {
                 LOGGER.error("Error deleting unused model state documents: ", e);
                 listener.onFailure(e);
             }
-        });
+        ));
     }
 
     private static class JobIdExtractor {

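Taken together, the remover now issues one delete-by-query over the collected IDs rather than a hand-built bulk request. A compact, editorial sketch of the request shape (the doc IDs are assumed to come from findUnusedStateDocIds(), and the type setting is omitted for brevity):

    import org.elasticsearch.action.support.IndicesOptions;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.index.reindex.DeleteByQueryRequest;
    import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;

    import java.util.List;

    class UnusedStateDeleteSketch {
        static DeleteByQueryRequest buildRequest(List<String> unusedDocIds) {
            // Lenient expansion keeps the request from failing when no state index exists yet.
            return new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
                    .setIndicesOptions(IndicesOptions.lenientExpandOpen())
                    .setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0])));
        }
    }
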
+ 1 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

@@ -379,7 +379,7 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
         Quantiles quantiles = new Quantiles(jobId, new Date(), "quantile-state");
         indexQuantiles(quantiles);
 
-        client().admin().indices().prepareRefresh(MlMetaIndex.INDEX_NAME, AnomalyDetectorsIndex.jobStateIndexName(),
+        client().admin().indices().prepareRefresh(MlMetaIndex.INDEX_NAME, AnomalyDetectorsIndex.jobStateIndexPattern(),
                 AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).get();
 
 

+ 12 - 5
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java

@@ -6,8 +6,8 @@
 package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.Version;
-import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -52,6 +53,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.mockito.Matchers.eq;
@@ -308,12 +310,17 @@ public class MlConfigMigratorIT extends MlSingleNodeTestCase {
     }
 
     public void assertSnapshot(MlMetadata expectedMlMetadata) throws IOException {
-        GetResponse getResponse = client()
-                .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, "ml-config").get();
+        client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.jobStateIndexPattern()).execute();
+        SearchResponse searchResponse = client()
+            .prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
+            .setTypes(ElasticsearchMappings.DOC_TYPE)
+            .setSize(1)
+            .setQuery(QueryBuilders.idsQuery().addIds("ml-config"))
+            .get();
 
-        assertTrue(getResponse.isExists());
+        assertThat(searchResponse.getHits().getHits().length, greaterThan(0));
 
-        try (InputStream stream = getResponse.getSourceAsBytesRef().streamInput();
+        try (InputStream stream = searchResponse.getHits().getAt(0).getSourceRef().streamInput();
              XContentParser parser = XContentFactory.xContent(XContentType.JSON)
                      .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
             MlMetadata recoveredMeta = MlMetadata.LENIENT_PARSER.apply(parser, null).build();

+ 4 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/NetworkDisruptionIT.java

@@ -93,8 +93,9 @@ public class NetworkDisruptionIT extends BaseMlIntegTestCase {
         assertEquals(newJobNode, finalJobNode);
 
         // The job running on the original node should have been killed, and hence should not have persisted quantiles
-        SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
+        SearchResponse searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                 .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId())))
+                .setTrackTotalHits(true)
                 .setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet();
         assertEquals(0L, searchResponse.getHits().getTotalHits().value);
 
@@ -103,8 +104,9 @@ public class NetworkDisruptionIT extends BaseMlIntegTestCase {
         assertTrue(closeJobResponse.isClosed());
 
         // The relocated job was closed rather than killed, and hence should have persisted quantiles
-        searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
+        searchResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
                 .setQuery(QueryBuilders.idsQuery().addIds(Quantiles.documentId(job.getId())))
+                .setTrackTotalHits(true)
                 .setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet();
         assertEquals(1L, searchResponse.getHits().getTotalHits().value);
     }

+ 5 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/MockClientBuilder.java

@@ -271,6 +271,11 @@ public class MockClientBuilder {
         return this;
     }
 
+    public MockClientBuilder prepareSearches(String index, SearchRequestBuilder first, SearchRequestBuilder... searches) {
+        when(client.prepareSearch(eq(index))).thenReturn(first, searches);
+        return this;
+    }
+
     /**
      * Creates a {@link SearchResponse} with a {@link SearchHit} for each element of {@code docs}
      * @param indexName Index being searched

+ 60 - 27
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/StateStreamerTests.java

@@ -5,14 +5,20 @@
  */
 package org.elasticsearch.xpack.ml.job.persistence;
 
-import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.mock.orig.Mockito;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
-import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
@@ -21,9 +27,14 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -36,25 +47,25 @@ public class StateStreamerTests extends ESTestCase {
         String snapshotId = "123";
         Map<String, Object> categorizerState = new HashMap<>();
         categorizerState.put("catName", "catVal");
-        GetResponse categorizerStateGetResponse1 = createGetResponse(true, categorizerState);
-        GetResponse categorizerStateGetResponse2 = createGetResponse(false, null);
-        Map<String, Object> modelState = new HashMap<>();
-        modelState.put("modName", "modVal1");
-        GetResponse modelStateGetResponse1 = createGetResponse(true, modelState);
-        modelState.put("modName", "modVal2");
-        GetResponse modelStateGetResponse2 = createGetResponse(true, modelState);
-
-        MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME).addClusterStatusYellowResponse()
-                .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
-                        CategorizerState.documentId(JOB_ID, 1), categorizerStateGetResponse1)
-                .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
-                        CategorizerState.documentId(JOB_ID, 2), categorizerStateGetResponse2)
-                .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
-                        ModelState.documentId(JOB_ID, snapshotId, 1), modelStateGetResponse1)
-                .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE,
-                        ModelState.documentId(JOB_ID, snapshotId, 2), modelStateGetResponse2);
+        Map<String, Object> modelState1 = new HashMap<>();
+        modelState1.put("modName1", "modVal1");
+        Map<String, Object> modelState2 = new HashMap<>();
+        modelState2.put("modName2", "modVal2");
 
 
+        SearchRequestBuilder builder1 = prepareSearchBuilder(createSearchResponse(Collections.singletonList(modelState1)),
+            QueryBuilders.idsQuery().addIds(ModelState.documentId(JOB_ID, snapshotId, 1)));
+        SearchRequestBuilder builder2 = prepareSearchBuilder(createSearchResponse(Collections.singletonList(modelState2)),
+            QueryBuilders.idsQuery().addIds(ModelState.documentId(JOB_ID, snapshotId, 2)));
+        SearchRequestBuilder builder3 = prepareSearchBuilder(createSearchResponse(Collections.singletonList(categorizerState)),
+            QueryBuilders.idsQuery().addIds(CategorizerState.documentId(JOB_ID, 1)));
+        SearchRequestBuilder builder4 = prepareSearchBuilder(createSearchResponse(Collections.emptyList()),
+            QueryBuilders.idsQuery().addIds(CategorizerState.documentId(JOB_ID, 2)));
+
+        MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
+            .addClusterStatusYellowResponse()
+            .prepareSearches(AnomalyDetectorsIndex.jobStateIndexPattern(), builder1, builder2, builder3, builder4);
+
         ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(JOB_ID).setSnapshotId(snapshotId).setSnapshotDocCount(2).build();
 
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
@@ -64,8 +75,8 @@ public class StateStreamerTests extends ESTestCase {
 
         String[] restoreData = stream.toString(StandardCharsets.UTF_8.name()).split("\0");
         assertEquals(3, restoreData.length);
-        assertEquals("{\"modName\":\"modVal1\"}", restoreData[0]);
-        assertEquals("{\"modName\":\"modVal2\"}", restoreData[1]);
+        assertEquals("{\"modName1\":\"modVal1\"}", restoreData[0]);
+        assertEquals("{\"modName2\":\"modVal2\"}", restoreData[1]);
         assertEquals("{\"catName\":\"catVal\"}", restoreData[2]);
     }
 
@@ -80,10 +91,32 @@ public class StateStreamerTests extends ESTestCase {
         Mockito.verifyNoMoreInteractions(outputStream);
     }
 
-    private static GetResponse createGetResponse(boolean exists, Map<String, Object> source) throws IOException {
-        GetResponse getResponse = mock(GetResponse.class);
-        when(getResponse.isExists()).thenReturn(exists);
-        when(getResponse.getSourceAsBytesRef()).thenReturn(BytesReference.bytes(XContentFactory.jsonBuilder().map(source)));
-        return getResponse;
+    private static SearchResponse createSearchResponse(List<Map<String, Object>> source) throws IOException {
+        SearchResponse searchResponse = mock(SearchResponse.class);
+        SearchHit[] hits = new SearchHit[source.size()];
+        int i = 0;
+        for (Map<String, Object> s : source) {
+            SearchHit hit = new SearchHit(1).sourceRef(BytesReference.bytes(XContentFactory.jsonBuilder().map(s)));
+            hits[i++] = hit;
+        }
+        SearchHits searchHits = new SearchHits(hits, null, (float)0.0);
+        when(searchResponse.getHits()).thenReturn(searchHits);
+        return searchResponse;
+    }
+
+    private static SearchRequestBuilder prepareSearchBuilder(SearchResponse response, QueryBuilder queryBuilder) {
+        SearchRequestBuilder builder = mock(SearchRequestBuilder.class);
+        when(builder.setTypes(any())).thenReturn(builder);
+        when(builder.addSort(any(SortBuilder.class))).thenReturn(builder);
+        when(builder.setQuery(queryBuilder)).thenReturn(builder);
+        when(builder.setPostFilter(any())).thenReturn(builder);
+        when(builder.setFrom(anyInt())).thenReturn(builder);
+        when(builder.setSize(anyInt())).thenReturn(builder);
+        when(builder.setFetchSource(eq(true))).thenReturn(builder);
+        when(builder.addDocValueField(any(String.class))).thenReturn(builder);
+        when(builder.addDocValueField(any(String.class), any(String.class))).thenReturn(builder);
+        when(builder.addSort(any(String.class), any(SortOrder.class))).thenReturn(builder);
+        when(builder.get()).thenReturn(response);
+        return builder;
     }
-}
+}