Browse Source

[Transform] Report _preview warning in the face of pipeline failure. (#81972)

Przemysław Witek 3 years ago
parent
commit
e9af7c608f

+ 72 - 0
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

@@ -10,8 +10,10 @@ package org.elasticsearch.xpack.transform.integration;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
 import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.WarningsHandler;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.xcontent.XContentBuilder;
@@ -28,7 +30,9 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasSize;
@@ -1013,6 +1017,74 @@ public class TransformPivotRestIT extends TransformRestTestCase {
         });
     }
 
+    @SuppressWarnings("unchecked")
+    public void testPreviewTransformWithPipelineScript() throws Exception {
+        String pipelineId = "my-preview-pivot-pipeline-script";
+        Request pipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId);
+        pipelineRequest.setJsonEntity("""
+            {
+              "description": "my pivot preview pipeline",
+              "processors": [
+                {
+                  "script": {
+                    "lang": "painless",
+                    "source": "ctx._id = ctx['non']['existing'];"
+                  }
+                }
+              ]
+            }
+            """);
+        client().performRequest(pipelineRequest);
+
+        setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
+        final Request createPreviewRequest = createRequestWithAuth("POST", getTransformEndpoint() + "_preview", null);
+        createPreviewRequest.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(WarningsHandler.PERMISSIVE));
+
+        String config = """
+            {
+              "source": {
+                "index": "%s"
+              },
+              "dest": {
+                "pipeline": "%s"
+              },
+              "pivot": {
+                "group_by": {
+                  "user.id": {
+                    "terms": {
+                      "field": "user_id"
+                    }
+                  },
+                  "by_day": {
+                    "date_histogram": {
+                      "fixed_interval": "1d",
+                      "field": "timestamp"
+                    }
+                  }
+                },
+                "aggregations": {
+                  "user.avg_rating": {
+                    "avg": {
+                      "field": "stars"
+                    }
+                  }
+                }
+              }
+            }""".formatted(REVIEWS_INDEX_NAME, pipelineId);
+        createPreviewRequest.setJsonEntity(config);
+
+        Response createPreviewResponse = client().performRequest(createPreviewRequest);
+        Map<String, Object> previewTransformResponse = entityAsMap(createPreviewResponse);
+        List<Map<String, Object>> preview = (List<Map<String, Object>>) previewTransformResponse.get("preview");
+        // Pipeline failed for all the docs so the preview is empty
+        assertThat(preview, is(empty()));
+        assertThat(createPreviewResponse.getWarnings(), hasSize(1));
+        assertThat(
+            createPreviewResponse.getWarnings().get(0),
+            allOf(containsString("Pipeline returned 100 errors, first error:"), containsString("type=script_exception"))
+        );
+    }
+
     public void testPivotWithMaxOnDateField() throws Exception {
         String transformId = "simple_date_histogram_pivot_with_max_time";
         String transformIndex = "pivot_reviews_via_date_histogram_with_max_time";

+ 12 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java

@@ -203,13 +203,24 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
 
         ActionListener<SimulatePipelineResponse> pipelineResponseActionListener = ActionListener.wrap(simulatePipelineResponse -> {
             List<Map<String, Object>> docs = new ArrayList<>(simulatePipelineResponse.getResults().size());
+            List<Map<String, Object>> errors = new ArrayList<>();
             for (var simulateDocumentResult : simulatePipelineResponse.getResults()) {
                 try (XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
                     XContentBuilder content = simulateDocumentResult.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
                     Map<String, Object> tempMap = XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON).v2();
-                    docs.add((Map<String, Object>) XContentMapValues.extractValue("doc._source", tempMap));
+                    Map<String, Object> doc = (Map<String, Object>) XContentMapValues.extractValue("doc._source", tempMap);
+                    if (doc != null) {
+                        docs.add(doc);
+                    }
+                    Map<String, Object> error = (Map<String, Object>) XContentMapValues.extractValue("error", tempMap);
+                    if (error != null) {
+                        errors.add(error);
+                    }
                 }
             }
+            if (errors.isEmpty() == false) {
+                HeaderWarning.addWarning("Pipeline returned " + errors.size() + " errors, first error: " + errors.get(0));
+            }
             TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
                 mappings.get(),
                 transformId,