浏览代码

Fix downsample api by returning a failure incase one or more downsample persistent tasks failed. (#103615)

Prior to this change, when a shard downsample task fails, the downsample api would return a successful response. This was due to a mistake, the FAILED status wasn't checked. Only whether COMPLETED, FAILED or CANCELLED flag were set. With this change if FAILED is the status then the downsample api will report an error.
Martijn van Groningen 1 年之前
父节点
当前提交
1ae8ae99d0

+ 5 - 0
docs/changelog/103615.yaml

@@ -0,0 +1,5 @@
+pr: 103615
+summary: Fix downsample api by returning a failure in case one or more downsample persistent tasks failed
+area: Downsampling
+type: bug
+issues: []

+ 32 - 0
x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml

@@ -407,6 +407,38 @@ setup:
             "fixed_interval": "1h"
           }
 
+---
+"Downsample failure":
+  - skip:
+      version: " - 8.12.99"
+      reason: "#103615 merged to 8.13.0 and later"
+      features: allowed_warnings
+
+  - do:
+      allowed_warnings:
+        - "index template [my-template1] has index patterns [failed-downsample-test] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template1
+        body:
+          index_patterns: [failed-downsample-test]
+          template:
+            settings:
+              index:
+                routing:
+                  allocation:
+                    include:
+                      does-not-exist: "yes"
+
+  - do:
+      catch: /downsample task \[downsample-failed-downsample-test-0-1h\] failed/
+      indices.downsample:
+        index: test
+        target_index: failed-downsample-test
+        body:  >
+          {
+            "fixed_interval": "1h"
+          }
+
 ---
 "Downsample to existing index":
   - skip:

+ 5 - 3
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

@@ -159,12 +159,13 @@ public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecut
         final DownsampleShardTaskParams params,
         final SearchHit[] lastDownsampledTsidHits
     ) {
+        DownsampleShardTask downsampleShardTask = (DownsampleShardTask) task;
         client.execute(
             DelegatingAction.INSTANCE,
-            new DelegatingAction.Request((DownsampleShardTask) task, lastDownsampledTsidHits, params),
+            new DelegatingAction.Request(downsampleShardTask, lastDownsampledTsidHits, params),
             ActionListener.wrap(empty -> {}, e -> {
                 LOGGER.error("error while delegating", e);
-                markAsFailed(task, e);
+                markAsFailed(downsampleShardTask, e);
             })
         );
     }
@@ -222,7 +223,8 @@ public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecut
         });
     }
 
-    private static void markAsFailed(AllocatedPersistentTask task, Exception e) {
+    private static void markAsFailed(DownsampleShardTask task, Exception e) {
+        task.setDownsampleShardIndexerStatus(DownsampleShardIndexerStatus.FAILED);
         task.updatePersistentTaskState(
             new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.FAILED, null),
             ActionListener.running(() -> task.markAsFailed(e))

+ 13 - 0
x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java

@@ -400,6 +400,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
 
                 @Override
                 public void onResponse(PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> persistentTask) {
+                    if (persistentTask != null) {
+                        var runningPersistentTaskState = (DownsampleShardPersistentTaskState) persistentTask.getState();
+                        if (runningPersistentTaskState != null) {
+                            if (runningPersistentTaskState.failed()) {
+                                onFailure(new ElasticsearchException("downsample task [" + persistentTaskId + "] failed"));
+                                return;
+                            } else if (runningPersistentTaskState.cancelled()) {
+                                onFailure(new ElasticsearchException("downsample task [" + persistentTaskId + "] cancelled"));
+                                return;
+                            }
+                        }
+                    }
+
                     logger.info("Downsampling task [" + persistentTaskId + " completed for shard " + params.shardId());
                     if (countDown.decrementAndGet() == 0) {
                         logger.info("All downsampling tasks completed [" + numberOfShards + "]");