Browse Source

[ML] Fix timeout bug in DBQ deletion of unused and orphan ML data (#130083) (#130101)

There was a bug in the code for deleting unused and orphan ML data. When deletion using DBQ occurred, the bug caused the request to time out. This PR resolves the issue.
Valeriy Khakhutskyy 3 months ago
parent
commit
4570ddb2db

+ 5 - 0
docs/changelog/130083.yaml

@@ -0,0 +1,5 @@
+pr: 130083
+summary: Fix timeout bug in DBQ deletion of unused and orphan ML data
+area: Machine Learning
+type: bug
+issues: []

+ 24 - 12
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

@@ -107,15 +107,7 @@ public class JobDataDeleter {
      */
     public void deleteModelSnapshots(List<ModelSnapshot> modelSnapshots, ActionListener<BulkByScrollResponse> listener) {
         if (modelSnapshots.isEmpty()) {
-            listener.onResponse(
-                new BulkByScrollResponse(
-                    TimeValue.ZERO,
-                    new BulkByScrollTask.Status(Collections.emptyList(), null),
-                    Collections.emptyList(),
-                    Collections.emptyList(),
-                    false
-                )
-            );
+            listener.onResponse(emptyBulkByScrollResponse());
             return;
         }
 
@@ -132,7 +124,12 @@ public class JobDataDeleter {
             indices.add(AnomalyDetectorsIndex.jobResultsAliasedName(modelSnapshot.getJobId()));
         }
 
-        String[] indicesToQuery = removeReadOnlyIndices(new ArrayList<>(indices), listener, "model snapshots", null);
+        String[] indicesToQuery = removeReadOnlyIndices(
+            new ArrayList<>(indices),
+            listener,
+            "model snapshots",
+            () -> listener.onResponse(emptyBulkByScrollResponse())
+        );
         if (indicesToQuery.length == 0) return;
 
         DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
@@ -145,6 +142,16 @@ public class JobDataDeleter {
         executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
     }
 
+    private static BulkByScrollResponse emptyBulkByScrollResponse() {
+        return new BulkByScrollResponse(
+            TimeValue.ZERO,
+            new BulkByScrollTask.Status(Collections.emptyList(), null),
+            Collections.emptyList(),
+            Collections.emptyList(),
+            false
+        );
+    }
+
     /**
      * Asynchronously delete the annotations
      * If the deleteUserAnnotations field is set to true then all
@@ -309,7 +316,7 @@ public class JobDataDeleter {
             List.of(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)),
             listener,
             "datafeed timing stats",
-            null
+            () -> listener.onResponse(emptyBulkByScrollResponse())
         );
         if (indicesToQuery.length == 0) return;
         DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indicesToQuery).setRefresh(true)
@@ -502,7 +509,12 @@ public class JobDataDeleter {
         ActionListener<BroadcastResponse> refreshListener = ActionListener.wrap(refreshResponse -> {
             logger.info("[{}] running delete by query on [{}]", jobId, String.join(", ", indices));
             ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
-            String[] indicesToQuery = removeReadOnlyIndices(List.of(indices), listener, "results", null);
+            String[] indicesToQuery = removeReadOnlyIndices(
+                List.of(indices),
+                listener,
+                "results",
+                () -> listener.onResponse(emptyBulkByScrollResponse())
+            );
             if (indicesToQuery.length == 0) return;
             DeleteByQueryRequest request = new DeleteByQueryRequest(indicesToQuery).setQuery(query)
                 .setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpenHidden()))