Browse Source

[Transform] Report validation failure if there are no aggregations in the test search query (#95318)

Przemysław Witek 2 years ago
parent
commit
fab72ab42a
12 changed files with 107 additions and 31 deletions
  1. 6 0
      docs/changelog/95318.yaml
  2. 16 6
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityTransformIT.java
  3. 1 0
      x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_unattended.yml
  4. 33 0
      x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml
  5. 4 5
      x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java
  6. 3 1
      x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java
  7. 1 10
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java
  8. 1 0
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java
  9. 4 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java
  10. 0 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java
  11. 26 5
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java
  12. 12 1
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java

+ 6 - 0
docs/changelog/95318.yaml

@@ -0,0 +1,6 @@
+pr: 95318
+summary: Report validation failure if there are no aggregations in the test search
+  query
+area: Transform
+type: bug
+issues: []

+ 16 - 6
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityTransformIT.java

@@ -206,8 +206,7 @@ public class RemoteClusterSecurityTransformIT extends AbstractRemoteClusterSecur
             assertOK(performRequestWithRemoteTransformUser(new Request("DELETE", "/_transform/simple-remote-transform")));
 
             // Create a transform targeting an index without permission
-            final var putTransformRequest2 = new Request("PUT", "/_transform/invalid");
-            putTransformRequest2.setJsonEntity("""
+            String invalidTransformConfig = """
                 {
                   "source": { "index": "my_remote_cluster:private-transform-index" },
                   "dest": { "index": "simple-remote-transform" },
@@ -216,15 +215,26 @@ public class RemoteClusterSecurityTransformIT extends AbstractRemoteClusterSecur
                     "aggs": {"avg_stars": {"avg": {"field": "stars"}}}
                   }
                 }
-                """);
-            assertOK(performRequestWithRemoteTransformUser(putTransformRequest2));
-            // It errors when trying to preview it
+                """;
+            final var putInvalidTransformRequest = new Request("PUT", "/_transform/invalid");
+            putInvalidTransformRequest.setJsonEntity(invalidTransformConfig);
+            // It errors when trying to execute the PUT request
             final ResponseException e = expectThrows(
                 ResponseException.class,
-                () -> performRequestWithRemoteTransformUser(new Request("GET", "/_transform/invalid/_preview"))
+                () -> performRequestWithRemoteTransformUser(putInvalidTransformRequest)
             );
             assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
             assertThat(e.getMessage(), containsString("Source indices have been deleted or closed"));
+
+            final var previewInvalidTransformRequest = new Request("GET", "/_transform/_preview");
+            previewInvalidTransformRequest.setJsonEntity(invalidTransformConfig);
+            // It also errors when trying to execute _preview request
+            final ResponseException e2 = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithRemoteTransformUser(previewInvalidTransformRequest)
+            );
+            assertThat(e2.getResponse().getStatusLine().getStatusCode(), equalTo(400));
+            assertThat(e2.getMessage(), containsString("Source indices have been deleted or closed"));
         }
     }
 

+ 1 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_unattended.yml

@@ -74,6 +74,7 @@ teardown:
   - do:
       transform.put_transform:
         transform_id: "transform-unattended"
+        defer_validation: true
         body: >
           {
             "source": { "index": "airline-data*" },

+ 33 - 0
x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml

@@ -306,6 +306,7 @@ teardown:
 ---
 "Batch transform from remote cluster when the user is not authorized":
   - do:
+      catch: /Source indices have been deleted or closed./
       headers: { Authorization: "Basic Ym9iOnRyYW5zZm9ybS1wYXNzd29yZA==" }  # This is bob
       transform.put_transform:
         transform_id: "simple-remote-transform-3"
@@ -319,6 +320,37 @@ teardown:
             }
           }
 
+  - do:
+      headers: { Authorization: "Basic Ym9iOnRyYW5zZm9ybS1wYXNzd29yZA==" }  # This is bob
+      transform.put_transform:
+        transform_id: "simple-remote-transform-3"
+        # With defer_validation set, the validation is not performed, so the transform can be created
+        defer_validation: true
+        body: >
+          {
+            "source": { "index": "my_remote_cluster:remote_test_index" },
+            "dest": { "index": "simple-remote-transform-3" },
+            "pivot": {
+              "group_by": { "user": {"terms": {"field": "user"}}},
+              "aggs": { "avg_stars": {"avg": {"field": "stars"}}}
+            }
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      # bob is unauthorized to access the remote index
+      catch: /Source indices have been deleted or closed./
+      headers: { Authorization: "Basic Ym9iOnRyYW5zZm9ybS1wYXNzd29yZA==" }  # This is bob
+      transform.preview_transform:
+        transform_id: "simple-remote-transform-3"
+
+  - do:
+      # On start, we perform the validation regardless of the defer_validation flag specified on PUT, hence the failure
+      catch: /Source indices have been deleted or closed./
+      headers: { Authorization: "Basic Ym9iOnRyYW5zZm9ybS1wYXNzd29yZA==" }  # This is bob
+      transform.start_transform:
+        transform_id: "simple-remote-transform-3"
+
 ---
 "Batch transform update from remote cluster when the user is not authorized":
   - do:
@@ -336,6 +368,7 @@ teardown:
           }
   - match: { acknowledged: true }
   - do:
+      catch: /Source indices have been deleted or closed./
       headers: { Authorization: "Basic Ym9iOnRyYW5zZm9ybS1wYXNzd29yZA==" }  # This is bob
       transform.update_transform:
         transform_id: "simple-remote-transform-2"

+ 4 - 5
x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java

@@ -421,9 +421,8 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
 
         startTransform(config.getId(), RequestOptions.DEFAULT);
 
-        // transform is red with two issues
-        String noSuchIndexIssue = Strings.format("org.elasticsearch.index.IndexNotFoundException: no such index [%s]", destIndexName);
-        assertBusy(() -> assertRed(transformId, authIssue, noSuchIndexIssue), 10, TimeUnit.SECONDS);
+        // transform is red with one issue
+        assertBusy(() -> assertRed(transformId, authIssue), 10, TimeUnit.SECONDS);
 
         // update transform's credentials so that the transform has permission to access source/dest indices
         updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build());
@@ -464,8 +463,8 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
 
         startTransform(config.getId(), RequestOptions.DEFAULT);
 
-        // transform's auth state status is still RED, but the health status is GREEN (because dest index exists)
-        assertRed(transformId, authIssue);
+        // transform is red with one issue
+        assertBusy(() -> assertRed(transformId, authIssue), 10, TimeUnit.SECONDS);
 
         // update transform's credentials so that the transform has permission to access source/dest indices
         updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build());

+ 3 - 1
x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java

@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
 public class TransformUpdateIT extends TransformRestTestCase {
@@ -246,7 +247,8 @@ public class TransformUpdateIT extends TransformRestTestCase {
             }
             fail("request should have failed");
         } catch (ResponseException e) {
-            assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(500));
+            assertThat("Error was: " + e.getMessage(), e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
+            assertThat(e.getMessage(), containsString("Source indices have been deleted or closed."));
         }
         assertBusy(() -> {
             Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);

+ 1 - 10
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java

@@ -69,7 +69,6 @@ import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtil
 
 public class TransportPreviewTransformAction extends HandledTransportAction<Request, Response> {
 
-    private static final int NUMBER_OF_PREVIEW_BUCKETS = 100;
     private final SecurityContext securityContext;
     private final IndexNameExpressionResolver indexNameExpressionResolver;
     private final Client client;
@@ -279,15 +278,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
 
         ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(deducedMappings -> {
             mappings.set(deducedMappings);
-            function.preview(
-                parentTaskAssigningClient,
-                timeout,
-                filteredHeaders,
-                source,
-                deducedMappings,
-                NUMBER_OF_PREVIEW_BUCKETS,
-                previewListener
-            );
+            function.preview(parentTaskAssigningClient, timeout, filteredHeaders, source, deducedMappings, previewListener);
         }, listener::onFailure);
 
         function.deduceMappings(parentTaskAssigningClient, filteredHeaders, source, deduceMappingsListener);

+ 1 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java

@@ -196,6 +196,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
             );
         }, e -> {
             if (Boolean.TRUE.equals(transformConfigHolder.get().getSettings().getUnattended())) {
+                logger.debug(() -> format("[%s] Validation failed: %s", transformConfigHolder.get().getId(), e.getMessage()));
                 logger.debug(
                     () -> format("[%s] Skip dest index creation as this is an unattended transform", transformConfigHolder.get().getId())
                 );

+ 4 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java

@@ -274,12 +274,15 @@ class ClientTransformIndexer extends TransformIndexer {
         SchemaUtil.getDestinationFieldMappings(client, getConfig().getDestination().getIndex(), fieldMappingsListener);
     }
 
+    @Override
     void validate(ActionListener<Void> listener) {
+        assert Boolean.TRUE.equals(transformConfig.getSettings().getUnattended());
         ClientHelper.executeAsyncWithOrigin(
             client,
             ClientHelper.TRANSFORM_ORIGIN,
             ValidateTransformAction.INSTANCE,
-            new ValidateTransformAction.Request(transformConfig, false, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT),
+            // Since the transform is unattended, we defer the deferrable validations.
+            new ValidateTransformAction.Request(transformConfig, true, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT),
             ActionListener.wrap(response -> listener.onResponse(null), listener::onFailure)
         );
     }

+ 0 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java

@@ -136,7 +136,6 @@ public interface Function {
      * @param headers headers to be used to query only for what the caller is allowed to
      * @param sourceConfig the source configuration
      * @param fieldTypeMap mapping of field types
-     * @param numberOfRows number of rows to produce for the preview
      * @param listener listener that takes a list, where every entry corresponds to 1 row/doc in the preview
      */
     void preview(
@@ -145,7 +144,6 @@ public interface Function {
         Map<String, String> headers,
         SourceConfig sourceConfig,
         Map<String, String> fieldTypeMap,
-        int numberOfRows,
         ActionListener<List<Map<String, Object>>> listener
     );
 

+ 26 - 5
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java

@@ -45,7 +45,9 @@ import static org.elasticsearch.core.Strings.format;
  */
 public abstract class AbstractCompositeAggFunction implements Function {
 
-    public static final int TEST_QUERY_PAGE_SIZE = 50;
+    private static final int TEST_QUERY_PAGE_SIZE = 50;
+    private static final int PREVIEW_QUERY_PAGE_SIZE = 100;
+
     public static final String COMPOSITE_AGGREGATION_NAME = "_transform";
 
     private final CompositeAggregationBuilder cachedCompositeAggregation;
@@ -68,19 +70,31 @@ public abstract class AbstractCompositeAggFunction implements Function {
         Map<String, String> headers,
         SourceConfig sourceConfig,
         Map<String, String> fieldTypeMap,
-        int numberOfBuckets,
         ActionListener<List<Map<String, Object>>> listener
     ) {
         ClientHelper.assertNoAuthorizationHeader(headers);
+        SearchRequest searchRequest = buildSearchRequest(sourceConfig, timeout, PREVIEW_QUERY_PAGE_SIZE);
         ClientHelper.executeWithHeadersAsync(
             headers,
             ClientHelper.TRANSFORM_ORIGIN,
             client,
             SearchAction.INSTANCE,
-            buildSearchRequest(sourceConfig, timeout, numberOfBuckets),
-            ActionListener.wrap(r -> {
+            searchRequest,
+            ActionListener.wrap(response -> {
                 try {
-                    final Aggregations aggregations = r.getAggregations();
+                    if (response == null) {
+                        listener.onFailure(new ValidationException().addValidationError("Unexpected null response from preview query"));
+                        return;
+                    }
+                    if (response.status() != RestStatus.OK) {
+                        listener.onFailure(
+                            new ValidationException().addValidationError(
+                                format("Unexpected status from response of preview query: %s", response.status())
+                            )
+                        );
+                        return;
+                    }
+                    final Aggregations aggregations = response.getAggregations();
                     if (aggregations == null) {
                         listener.onFailure(
                             new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST)
@@ -131,6 +145,13 @@ public abstract class AbstractCompositeAggFunction implements Function {
                     );
                     return;
                 }
+                final Aggregations aggregations = response.getAggregations();
+                if (aggregations == null) {
+                    listener.onFailure(
+                        new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST)
+                    );
+                    return;
+                }
                 listener.onResponse(true);
             }, e -> {
                 Throwable unwrapped = ExceptionsHelper.unwrapCause(e);

+ 12 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java

@@ -55,6 +55,7 @@ import org.junit.Before;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -130,6 +131,13 @@ public class PivotTests extends ESTestCase {
         assertInvalidTransform(client, source, pivot);
     }
 
+    public void testValidateAggregationsBeingNullInSearchResponse() throws Exception {
+        SourceConfig source = new SourceConfig("no_permissions");
+        Function pivot = new Pivot(getValidPivotConfig(), new SettingsConfig(), Version.CURRENT, Collections.emptySet());
+
+        assertInvalidTransform(client, source, pivot);
+    }
+
     public void testInitialPageSize() throws Exception {
         int expectedPageSize = 1000;
 
@@ -298,9 +306,12 @@ public class PivotTests extends ESTestCase {
                     }
                 }
 
+                final Aggregations aggregations = Arrays.stream(searchRequest.indices()).anyMatch(index -> index.contains("no_permissions"))
+                    ? null
+                    : new Aggregations(List.of());
                 final SearchResponseSections sections = new SearchResponseSections(
                     new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0),
-                    null,
+                    aggregations,
                     null,
                     false,
                     null,