Quellcode durchsuchen

[ML] Refresh results indices before running delete by query (#74292)

Test failures in #74101 revealed that the last documents persisted from
the job running on a node before it goes down may not be deleted
when the reset action is executed. The reason is that the results
index has not been refreshed, so those docs are not yet visible to
the search that the delete by query action performs.

This commit adds a call to the refresh API on the results indices
before running delete by query on them.

Closes #74101
Dimitris Athanasiou vor 4 Jahren
Ursprung
Commit
c9ad768536

+ 0 - 1
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java

@@ -464,7 +464,6 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
         });
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/74101")
     public void testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown() throws Exception {
         internalCluster().ensureAtMostNumDataNodes(0);
         logger.info("Starting dedicated master node...");

+ 32 - 12
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.ml.job.persistence;
 
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
@@ -14,6 +15,9 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.search.MultiSearchAction;
 import org.elasticsearch.action.search.MultiSearchRequest;
@@ -26,10 +30,10 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
@@ -280,17 +284,7 @@ public class JobDataDeleter {
         ActionListener<Boolean> deleteByQueryExecutor = ActionListener.wrap(
             response -> {
                 if (response && indexNames.get().length > 0) {
-                    logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indexNames.get()));
-                    ConstantScoreQueryBuilder query =
-                        new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
-                    DeleteByQueryRequest request = new DeleteByQueryRequest(indexNames.get())
-                        .setQuery(query)
-                        .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
-                        .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
-                        .setAbortOnVersionConflict(false)
-                        .setRefresh(true);
-
-                    executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, dbqHandler);
+                    deleteResultsByQuery(jobId, indexNames.get(), dbqHandler);
                 } else { // We did not execute DBQ, no need to delete aliases or check the response
                     dbqHandler.onResponse(null);
                 }
@@ -414,6 +408,32 @@ public class JobDataDeleter {
         deleteModelState(jobId, deleteStateHandler);
     }
 
+    private void deleteResultsByQuery(String jobId, String[] indices, ActionListener<BulkByScrollResponse> listener) {
+        assert indices.length > 0;
+
+        ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
+            refreshResponse -> {
+                logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices));
+                ConstantScoreQueryBuilder query =
+                    new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
+                DeleteByQueryRequest request = new DeleteByQueryRequest(indices)
+                    .setQuery(query)
+                    .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))
+                    .setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
+                    .setAbortOnVersionConflict(false)
+                    .setRefresh(true);
+
+                executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, listener);
+            },
+            listener::onFailure
+        );
+
+        // First, we refresh the indices to ensure any in-flight docs become visible
+        RefreshRequest refreshRequest = new RefreshRequest(indices);
+        refreshRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()));
+        executeAsyncWithOrigin(client, ML_ORIGIN, RefreshAction.INSTANCE, refreshRequest, refreshListener);
+    }
+
     private void deleteAliases(String jobId, ActionListener<AcknowledgedResponse> finishedHandler) {
         final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
         final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId);