Browse Source

Redirect transform actions to transform&remote_cluster_client node when needed (#70125)

Przemysław Witek 4 years ago
parent
commit
319548c80c
39 changed files with 1157 additions and 306 deletions
  1. 15 4
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java
  2. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java
  3. 132 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/ValidateTransformAction.java
  4. 0 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformDestIndexSettings.java
  5. 5 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java
  6. 9 7
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java
  7. 26 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ValidateTransformActionRequestTests.java
  8. 35 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ValidateTransformActionResponseTests.java
  9. 5 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformDestIndexSettingsTests.java
  10. 1 0
      x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
  11. 37 0
      x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_crud.yml
  12. 5 0
      x-pack/plugin/transform/qa/multi-cluster-tests-with-security/build.gradle
  13. 0 0
      x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/java/org/elasticsearch/multi_cluster/MultiClusterYamlTestSuiteIT.java
  14. 102 35
      x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml
  15. 0 0
      x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/remote_cluster/80_transform.yml
  16. 5 0
      x-pack/plugin/transform/qa/multi-node-tests/build.gradle
  17. 126 0
      x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoRemoteClusterClientNodeIT.java
  18. 116 11
      x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java
  19. 24 14
      x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java
  20. 3 0
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java
  21. 1 0
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java
  22. 25 10
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java
  23. 24 9
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java
  24. 5 30
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java
  25. 20 60
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java
  26. 6 3
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java
  27. 26 57
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java
  28. 165 0
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java
  29. 4 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportGetTransformStatsActionDeprecated.java
  30. 12 10
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java
  31. 2 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java
  32. 1 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformIndex.java
  33. 1 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignments.java
  34. 118 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodes.java
  35. 4 35
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java
  36. 4 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/SourceDestValidations.java
  37. 12 6
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java
  38. 33 0
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignmentsTests.java
  39. 47 1
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformNodesTests.java

+ 15 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java

@@ -43,6 +43,7 @@ import static java.util.stream.Collectors.joining;
 import static java.util.stream.Collectors.toMap;
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 import static org.elasticsearch.cluster.metadata.MetadataCreateIndexService.validateIndexOrAliasName;
+import static org.elasticsearch.cluster.node.DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE;
 
 /**
  * Validation of source indexes and destination index.
@@ -58,7 +59,7 @@ public final class SourceDestValidator {
     public static final String DEST_LOWERCASE = "Destination index [{0}] must be lowercase";
     public static final String NEEDS_REMOTE_CLUSTER_SEARCH = "Source index is configured with a remote index pattern(s) [{0}]"
         + " but the current node [{1}] is not allowed to connect to remote clusters."
-        + " Please enable remote.cluster_client for all data nodes.";
+        + " Please enable " + REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " for all {2} nodes.";
     public static final String ERROR_REMOTE_CLUSTER_SEARCH = "Error resolving remote source: {0}";
     public static final String UNKNOWN_REMOTE_CLUSTER_LICENSE = "Error during license check ({0}) for remote cluster "
         + "alias(es) {1}, error: {2}";
@@ -253,7 +254,6 @@ public final class SourceDestValidator {
         .strictExpandOpenAndForbidClosedIgnoreThrottled();
 
     public static final SourceDestValidation SOURCE_MISSING_VALIDATION = new SourceMissingValidation();
-    public static final SourceDestValidation REMOTE_SOURCE_VALIDATION = new RemoteSourceEnabledAndRemoteLicenseValidation();
     public static final SourceDestValidation DESTINATION_IN_SOURCE_VALIDATION = new DestinationInSourceValidation();
     public static final SourceDestValidation DESTINATION_SINGLE_INDEX_VALIDATION = new DestinationSingleIndexValidation();
     public static final SourceDestValidation REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION = new RemoteSourceNotSupportedValidation();
@@ -380,7 +380,14 @@ public final class SourceDestValidator {
         }
     }
 
-    static class RemoteSourceEnabledAndRemoteLicenseValidation implements SourceDestValidation {
+    public static class RemoteSourceEnabledAndRemoteLicenseValidation implements SourceDestValidation {
+
+        private final String nodeRoleThatRequiresRemoteClusterClient;
+
+        public RemoteSourceEnabledAndRemoteLicenseValidation(String nodeRoleThatRequiresRemoteClusterClient) {
+            this.nodeRoleThatRequiresRemoteClusterClient = nodeRoleThatRequiresRemoteClusterClient;
+        }
+
         @Override
         public void validate(Context context, ActionListener<Context> listener) {
             if (context.resolveRemoteSource().isEmpty()) {
@@ -392,7 +399,11 @@ public final class SourceDestValidator {
             // we can only check this node at the moment, clusters with mixed CCS enabled/disabled nodes are not supported,
             // see gh#50033
             if (context.isRemoteSearchEnabled() == false) {
-                context.addValidationError(NEEDS_REMOTE_CLUSTER_SEARCH, context.resolveRemoteSource(), context.getNodeName());
+                context.addValidationError(
+                    NEEDS_REMOTE_CLUSTER_SEARCH,
+                    context.resolveRemoteSource(),
+                    context.getNodeName(),
+                    nodeRoleThatRequiresRemoteClusterClient);
                 listener.onResponse(context);
                 return;
             }

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java

@@ -41,6 +41,7 @@ public final class TransformField {
     public static final ParseField SYNC = new ParseField("sync");
     public static final ParseField TIME = new ParseField("time");
     public static final ParseField DELAY = new ParseField("delay");
+    // TODO: Rename to "defer_data_validation" or similar to emphasize that not all validation is deferred
     public static final ParseField DEFER_VALIDATION = new ParseField("defer_validation");
     public static final ParseField RETENTION_POLICY = new ParseField("retention_policy");
     public static final ParseField MAX_AGE = new ParseField("max_age");

+ 132 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/ValidateTransformAction.java

@@ -0,0 +1,132 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.transform.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
+import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+public class ValidateTransformAction extends ActionType<ValidateTransformAction.Response> {
+
+    public static final ValidateTransformAction INSTANCE = new ValidateTransformAction();
+    public static final String NAME = "cluster:admin/transform/validate";
+
+    private ValidateTransformAction() {
+        super(NAME, ValidateTransformAction.Response::new);
+    }
+
+    public static class Request extends AcknowledgedRequest<Request> {
+
+        private final TransformConfig config;
+        private final boolean deferValidation;
+
+        public Request(TransformConfig config, boolean deferValidation) {
+            this.config = config;
+            this.deferValidation = deferValidation;
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            this.config = new TransformConfig(in);
+            this.deferValidation = in.readBoolean();
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            ActionRequestValidationException validationException = null;
+
+            validationException = config.validate(validationException);
+            validationException = SourceDestValidator.validateRequest(
+                validationException,
+                config.getDestination() != null ? config.getDestination().getIndex() : null
+            );
+
+            return validationException;
+        }
+
+        public TransformConfig getConfig() {
+            return config;
+        }
+
+        public boolean isDeferValidation() {
+            return deferValidation;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            this.config.writeTo(out);
+            out.writeBoolean(this.deferValidation);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            Request that = (Request) obj;
+            return Objects.equals(config, that.config)
+                && deferValidation == that.deferValidation;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(config, deferValidation);
+        }
+    }
+
+    public static class Response extends ActionResponse {
+
+        private final Map<String, String> destIndexMappings;
+
+        public Response(Map<String, String> destIndexMappings) {
+            this.destIndexMappings = destIndexMappings;
+        }
+
+        public Response(StreamInput in) throws IOException {
+            this.destIndexMappings = in.readMap(StreamInput::readString, StreamInput::readString);
+        }
+
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeMap(destIndexMappings, StreamOutput::writeString, StreamOutput::writeString);
+        }
+
+        public Map<String, String> getDestIndexMappings() {
+            return destIndexMappings;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null || obj.getClass() != getClass()) {
+                return false;
+            }
+            Response that = (Response) obj;
+            return Objects.equals(this.destIndexMappings, that.destIndexMappings);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(destIndexMappings);
+        }
+    }
+}

+ 0 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformDestIndexSettings.java

@@ -15,7 +15,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
-import org.elasticsearch.common.xcontent.ToXContent.Params;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;

+ 5 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java

@@ -44,10 +44,12 @@ public class TransformTaskParams extends AbstractDiffable<TransformTaskParams> i
     }
 
     private TransformTaskParams(String transformId, String version, String frequency, Boolean remote) {
-        this(transformId, version == null ? null : Version.fromString(version),
+        this(
+            transformId,
+            version == null ? null : Version.fromString(version),
             frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()),
-                    remote == null ? false : remote.booleanValue()
-                    );
+            remote == null ? false : remote.booleanValue()
+        );
     }
 
     public TransformTaskParams(String transformId, Version version, TimeValue frequency, boolean remote) {

+ 9 - 7
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java

@@ -66,7 +66,6 @@ import static org.elasticsearch.mock.orig.Mockito.when;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_PIPELINE_MISSING_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
-import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
@@ -85,6 +84,9 @@ public class SourceDestValidatorTests extends ESTestCase {
 
     private static final ClusterState CLUSTER_STATE;
 
+    private static final String DUMMY_NODE_ROLE = "dummy";
+    private static final SourceDestValidator.SourceDestValidation REMOTE_SOURCE_VALIDATION =
+        new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
     private static final List<SourceDestValidator.SourceDestValidation> TEST_VALIDATIONS = Arrays.asList(
         SOURCE_MISSING_VALIDATION,
         DESTINATION_IN_SOURCE_VALIDATION,
@@ -670,7 +672,7 @@ public class SourceDestValidatorTests extends ESTestCase {
         );
 
         when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
-        RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
+        RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
 
         assertValidationWithContext(
             listener -> validator.validate(context, listener),
@@ -697,7 +699,7 @@ public class SourceDestValidatorTests extends ESTestCase {
         );
 
         when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
-        final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
+        final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
 
         assertValidationWithContext(listener -> validator.validate(context, listener), c -> {
             assertNotNull(c.getValidationException());
@@ -752,7 +754,7 @@ public class SourceDestValidatorTests extends ESTestCase {
         );
         when(context3.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_PLATINUM));
 
-        final RemoteSourceEnabledAndRemoteLicenseValidation validator3 = new RemoteSourceEnabledAndRemoteLicenseValidation();
+        final RemoteSourceEnabledAndRemoteLicenseValidation validator3 = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
         assertValidationWithContext(
             listener -> validator3.validate(context3, listener),
             c -> { assertNull(c.getValidationException()); },
@@ -776,7 +778,7 @@ public class SourceDestValidatorTests extends ESTestCase {
         );
         when(context4.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_PLATINUM));
 
-        final RemoteSourceEnabledAndRemoteLicenseValidation validator4 = new RemoteSourceEnabledAndRemoteLicenseValidation();
+        final RemoteSourceEnabledAndRemoteLicenseValidation validator4 = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
         assertValidationWithContext(
             listener -> validator4.validate(context4, listener),
             c -> { assertNull(c.getValidationException()); },
@@ -802,7 +804,7 @@ public class SourceDestValidatorTests extends ESTestCase {
         );
 
         when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
-        final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
+        final RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
         assertValidationWithContext(listener -> validator.validate(context, listener), c -> {
             assertNotNull(c.getValidationException());
             assertEquals(1, c.getValidationException().validationErrors().size());
@@ -831,7 +833,7 @@ public class SourceDestValidatorTests extends ESTestCase {
         );
 
         when(context.getRegisteredRemoteClusterNames()).thenReturn(Collections.singleton(REMOTE_BASIC));
-        RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation();
+        RemoteSourceEnabledAndRemoteLicenseValidation validator = new RemoteSourceEnabledAndRemoteLicenseValidation(DUMMY_NODE_ROLE);
 
         assertValidationWithContext(listener -> validator.validate(context, listener), c -> {
             assertNotNull(c.getValidationException());

+ 26 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ValidateTransformActionRequestTests.java

@@ -0,0 +1,26 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.transform.action;
+
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction.Request;
+
+import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
+
+public class ValidateTransformActionRequestTests extends AbstractWireSerializingTransformTestCase<Request> {
+
+    @Override
+    protected Request createTestInstance() {
+        return new Request(randomTransformConfig(), randomBoolean());
+    }
+
+    @Override
+    protected Reader<Request> instanceReader() {
+        return Request::new;
+    }
+}

+ 35 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/ValidateTransformActionResponseTests.java

@@ -0,0 +1,35 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.transform.action;
+
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction.Response;
+
+import java.util.Map;
+import java.util.function.Function;
+
+import static java.util.stream.Collectors.toMap;
+
+public class ValidateTransformActionResponseTests extends AbstractWireSerializingTransformTestCase<Response> {
+
+    @Override
+    protected Response createTestInstance() {
+        return new Response(randomDestIndexMappings());
+    }
+
+    private Map<String, String> randomDestIndexMappings() {
+        return randomList(1, 20, () -> randomAlphaOfLengthBetween(1, 20)).stream()
+            .distinct()
+            .collect(toMap(Function.identity(), fieldName -> randomAlphaOfLengthBetween(1, 20)));
+    }
+
+    @Override
+    protected Reader<Response> instanceReader() {
+        return Response::new;
+    }
+}

+ 5 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformDestIndexSettingsTests.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.core.transform.transforms;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.settings.Settings;
@@ -19,6 +20,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import static java.util.Collections.singletonMap;
+
 public class TransformDestIndexSettingsTests extends AbstractSerializingTransformTestCase<TransformDestIndexSettings> {
 
     public static TransformDestIndexSettings randomDestIndexSettings() {
@@ -28,9 +31,9 @@ public class TransformDestIndexSettingsTests extends AbstractSerializingTransfor
 
         if (randomBoolean()) {
             mappings = new HashMap<>(size);
-
+            mappings.put("_meta", singletonMap("_transform", singletonMap("version", Version.CURRENT.toString())));
             for (int i = 0; i < size; i++) {
-                mappings.put(randomAlphaOfLength(10), Map.of("type", randomAlphaOfLength(10)));
+                mappings.put(randomAlphaOfLength(10), singletonMap("type", randomAlphaOfLength(10)));
             }
         }
 

+ 1 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -90,6 +90,7 @@ public class Constants {
         "cluster:admin/transform/start",
         "cluster:admin/transform/stop",
         "cluster:admin/transform/update",
+        "cluster:admin/transform/validate",
         // "cluster:admin/voting_config/add_exclusions",
         // "cluster:admin/voting_config/clear_exclusions",
         "cluster:admin/xpack/ccr/auto_follow_pattern/activate",

+ 37 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_crud.yml

@@ -362,6 +362,43 @@ setup:
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
             }
           }
+
+---
+"Test transform where source query is invalid":
+  - do:
+      catch: /No handler for type \[bad-type\] declared on runtime field \[rt-field\]/
+      transform.put_transform:
+        transform_id: "airline-transform"
+        body: >
+          {
+            "source": {
+              "index": ["airline-data*"],
+              "runtime_mappings": {"rt-field":{"type": "bad-type"}}
+            },
+            "dest": { "index": "dest-airline-data-by-airline" },
+            "pivot": {
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+            }
+          }
+  - do:
+      transform.put_transform:
+        transform_id: "airline-transform"
+        defer_validation: true
+        body: >
+          {
+            "source": {
+              "index": ["airline-data*"],
+              "runtime_mappings": {"rt-field":{"type": "bad-type"}}
+            },
+            "dest": { "index": "dest-airline-data-by-airline" },
+            "pivot": {
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+            }
+          }
+  - match: { acknowledged: true }
+
 ---
 "Test alias scenarios":
   - do:

+ 5 - 0
x-pack/qa/multi-cluster-tests-with-security/build.gradle → x-pack/plugin/transform/qa/multi-cluster-tests-with-security/build.gradle

@@ -30,6 +30,11 @@ testClusters {
 
   'mixed-cluster' {
     testDistribution = 'DEFAULT'
+    numberOfNodes = 2
+    // Node roles are configured this way in order to verify redirecting the transform request from the node lacking
+    // remote_cluster_client role to the node that is remote_cluster_client.
+    nodes."mixed-cluster-0".setting 'node.roles', '[data,ingest,master,transform]'
+    nodes."mixed-cluster-1".setting 'node.roles', '[data,ingest,master,transform,remote_cluster_client]'
     setting 'xpack.security.enabled', 'true'
     setting 'xpack.watcher.enabled', 'false'
     setting 'xpack.license.self_generated.type', 'trial'

+ 0 - 0
x-pack/qa/multi-cluster-tests-with-security/src/test/java/org/elasticsearch/multi_cluster/MultiClusterYamlTestSuiteIT.java → x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/java/org/elasticsearch/multi_cluster/MultiClusterYamlTestSuiteIT.java


+ 102 - 35
x-pack/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml → x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/multi_cluster/80_transform.yml

@@ -11,52 +11,59 @@ setup:
       security.put_user:
         username: "joe"
         body:  >
-            {
-              "password": "transform-password",
-              "roles" : [  "transform_admin", "x_cluster_role" ]
-            }
+          {
+            "password": "transform-password",
+            "roles" : [  "transform_admin", "x_cluster_role" ]
+          }
+  - do:
+      security.put_user:
+        username: "bob"
+        body:  >
+          {
+            "password": "transform-password",
+            "roles" : [ "transform_admin", "x_cluster_role_only_dest" ]
+          }
   - do:
       security.put_role:
         name: "x_cluster_role"
         body:  >
-            {
-              "cluster": [],
-              "indices": [
-                {
-                  "names": ["test_index", "my_remote_cluster:test_i*", "my_remote_cluster:aliased_test_index"],
-                  "privileges": ["all", "view_index_metadata"]
-                },
-                {
-                  "names": ["simple-remote-transform", "simple-local-remote-transform"],
-                  "privileges": ["all"]
-                }
-              ]
-            }
+          {
+            "cluster": [],
+            "indices": [
+              {
+                "names": ["test_index", "my_remote_cluster:test_i*", "my_remote_cluster:aliased_test_index"],
+                "privileges": ["all", "view_index_metadata"]
+              },
+              {
+                "names": ["simple-remote-transform*", "simple-local-remote-transform"],
+                "privileges": ["all"]
+              }
+            ]
+          }
+
+  - do:
+      security.put_role:
+        name: "x_cluster_role_only_dest"
+        body:  >
+          {
+            "cluster": [],
+            "indices": [
+              {
+                "names": ["simple-remote-transform*", "simple-local-remote-transform"],
+                "privileges": ["all"]
+              }
+            ]
+          }
 ---
 teardown:
   - do:
       security.delete_user:
         username: "joe"
         ignore: 404
-
----
-"Search remote cluster":
   - do:
-      headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }
-      search:
-        rest_total_hits_as_int: true
-        index: my_remote_cluster:test_index
-        body:
-          aggs:
-            user:
-              terms:
-                field: user
-
-  - match: { _shards.total: 3 }
-  - match: { hits.total: 9 }
-  - length: { aggregations.user.buckets: 3 }
-  - match: { aggregations.user.buckets.0.key: "a" }
-  - match: { aggregations.user.buckets.0.doc_count: 5 }
+      security.delete_user:
+        username: "bob"
+        ignore: 404
 
 ---
 "Batch transform from remote cluster":
@@ -283,3 +290,63 @@ teardown:
   - match: { hits.hits.1._source.user: b }
   - match: { hits.hits.3._source.count: 2 }
   - match: { hits.hits.3._source.user: d }
+
+---
+"Batch transform from remote cluster when the user is not authorized":
+  - do:
+      catch: /Cannot create transform \[simple-remote-transform\] because user bob lacks all the required permissions for indices. \[my_remote_cluster:test_index, simple-remote-transform\]/
+      headers: { Authorization: "Basic Ym9iOnRyYW5zZm9ybS1wYXNzd29yZA==" }  # This is bob
+      transform.put_transform:
+        transform_id: "simple-remote-transform"
+        body: >
+          {
+            "source": { "index": "my_remote_cluster:test_index" },
+            "dest": { "index": "simple-remote-transform" },
+            "pivot": {
+              "group_by": { "user": {"terms": {"field": "user"}}},
+              "aggs": {"avg_stars": {"avg": {"field": "stars"}}}
+            }
+          }
+
+---
+"Batch transform update from remote cluster when the user is not authorized":
+  - do:
+      headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }  # This is joe
+      transform.put_transform:
+        transform_id: "simple-remote-transform-2"
+        body: >
+          {
+            "source": { "index": "my_remote_cluster:test_index" },
+            "dest": { "index": "simple-remote-transform-2" },
+            "pivot": {
+              "group_by": { "user": {"terms": {"field": "user"}}},
+              "aggs": {"avg_stars": {"avg": {"field": "stars"}}}
+            }
+          }
+  - match: { acknowledged: true }
+  - do:
+      catch: /Cannot update transform \[simple-remote-transform-2\] because user bob lacks all the required permissions for indices. \[my_remote_cluster:test_index, simple-remote-transform-2\]/
+      headers: { Authorization: "Basic Ym9iOnRyYW5zZm9ybS1wYXNzd29yZA==" }  # This is bob
+      transform.update_transform:
+        transform_id: "simple-remote-transform-2"
+        body: >
+          {
+            "source": { "index": "my_remote_cluster:test_index" },
+            "dest": { "index": "simple-remote-transform-2" }
+          }
+
+---
+"Batch transform preview from remote cluster when the user is not authorized":
+  - do:
+      catch: /Source indices have been deleted or closed./
+      headers: { Authorization: "Basic Ym9iOnRyYW5zZm9ybS1wYXNzd29yZA==" }  # This is bob
+      transform.preview_transform:
+        body: >
+          {
+            "source": { "index": "my_remote_cluster:test_index" },
+            "dest": { "index": "simple-remote-transform-2" },
+            "pivot": {
+              "group_by": { "user": {"terms": {"field": "user"}}},
+              "aggs": {"avg_stars": {"avg": {"field": "stars"}}}
+            }
+          }

+ 0 - 0
x-pack/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/remote_cluster/80_transform.yml → x-pack/plugin/transform/qa/multi-cluster-tests-with-security/src/test/resources/rest-api-spec/test/remote_cluster/80_transform.yml


+ 5 - 0
x-pack/plugin/transform/qa/multi-node-tests/build.gradle

@@ -37,6 +37,11 @@ testClusters.javaRestTest {
   keystore 'bootstrap.password', 'x-pack-test-password'
   keystore 'xpack.security.transport.ssl.secure_key_passphrase', 'testnode'
   numberOfNodes = 3
+  // Node roles are configured this way in order to verify redirecting the transform request from the node lacking
+  // remote_cluster_client role to the node that is remote_cluster_client.
+  nodes."javaRestTest-0".setting 'node.roles', '[data,ingest,master]'
+  nodes."javaRestTest-1".setting 'node.roles', '[data,ingest,master,transform]'
+  nodes."javaRestTest-2".setting 'node.roles', '[data,ingest,master,transform,remote_cluster_client]'
 
   extraConfigFile nodeKey.name, nodeKey
   extraConfigFile nodeCert.name, nodeCert

+ 126 - 0
x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoRemoteClusterClientNodeIT.java

@@ -0,0 +1,126 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.integration;
+
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.NodeRoleSettings;
+import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
+import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
+import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
+import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
+import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
+import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase;
+import org.junit.After;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+
+public class TransformNoRemoteClusterClientNodeIT extends TransformSingleNodeTestCase {
+    @Override
+    protected Settings nodeSettings() {
+        return Settings.builder().put(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), "master, data, ingest, transform").build();
+    }
+
+    @After
+    public void preCleanup() throws Exception {
+        // Updating a transform will leave indexing an audit message in-flight, so
+        // we need to wait for that to complete or it could interfere with deleting
+        // all the indices
+        waitForPendingTasks();
+    }
+
+    public void testPreviewTransformWithRemoteIndex() {
+        String transformId = "transform-with-remote-index";
+        TransformConfig config = randomConfig(transformId, "remote_cluster:my-index");
+        PreviewTransformAction.Request request = new PreviewTransformAction.Request(config);
+        ElasticsearchStatusException e =
+            expectThrows(
+                ElasticsearchStatusException.class,
+                () -> client().execute(PreviewTransformAction.INSTANCE, request).actionGet());
+        assertThat(
+            e.getMessage(),
+            allOf(
+                containsString("No appropriate node to run on"),
+                containsString("transform requires a remote connection but remote is disabled")));
+    }
+
+    public void testPutTransformWithRemoteIndex_DeferValidation() {
+        String transformId = "transform-with-remote-index";
+        TransformConfig config = randomConfig(transformId, "remote_cluster:my-index");
+        PutTransformAction.Request request = new PutTransformAction.Request(config, true);
+        client().execute(PutTransformAction.INSTANCE, request).actionGet();
+    }
+
+    public void testPutTransformWithRemoteIndex_NoDeferValidation() {
+        String transformId = "transform-with-remote-index";
+        TransformConfig config = randomConfig(transformId, "remote_cluster:my-index");
+        PutTransformAction.Request request = new PutTransformAction.Request(config, false);
+        ElasticsearchStatusException e =
+            expectThrows(
+                ElasticsearchStatusException.class,
+                () -> client().execute(PutTransformAction.INSTANCE, request).actionGet());
+        assertThat(
+            e.getMessage(),
+            allOf(
+                containsString("No appropriate node to run on"),
+                containsString("transform requires a remote connection but remote is disabled")));
+    }
+
+    public void testUpdateTransformWithRemoteIndex_DeferValidation() {
+        String transformId = "transform-with-local-index";
+        {
+            TransformConfig config = randomConfig(transformId, "my-index");
+            PutTransformAction.Request request = new PutTransformAction.Request(config, true);
+            AcknowledgedResponse response = client().execute(PutTransformAction.INSTANCE, request).actionGet();
+            assertThat(response.isAcknowledged(), is(true));
+        }
+
+        TransformConfigUpdate update =
+            new TransformConfigUpdate(new SourceConfig("remote_cluster:my-index"), null, null, null, null, null, null);
+        UpdateTransformAction.Request request = new UpdateTransformAction.Request(update, transformId, true);
+        client().execute(UpdateTransformAction.INSTANCE, request).actionGet();
+    }
+
+    public void testUpdateTransformWithRemoteIndex_NoDeferValidation() {
+        String transformId = "transform-with-local-index";
+        {
+            TransformConfig config = randomConfig(transformId, "my-index");
+            PutTransformAction.Request request = new PutTransformAction.Request(config, true);
+            AcknowledgedResponse response = client().execute(PutTransformAction.INSTANCE, request).actionGet();
+            assertThat(response.isAcknowledged(), is(true));
+        }
+
+        TransformConfigUpdate update =
+            new TransformConfigUpdate(new SourceConfig("remote_cluster:my-index"), null, null, null, null, null, null);
+        UpdateTransformAction.Request request = new UpdateTransformAction.Request(update, transformId, false);
+        ElasticsearchStatusException e =
+            expectThrows(
+                ElasticsearchStatusException.class,
+                () -> client().execute(UpdateTransformAction.INSTANCE, request).actionGet());
+        assertThat(
+            e.getMessage(),
+            allOf(
+                containsString("No appropriate node to run on"),
+                containsString("transform requires a remote connection but remote is disabled")));
+    }
+
+    private static TransformConfig randomConfig(String transformId, String sourceIndex) {
+        return new TransformConfig.Builder()
+            .setId(transformId)
+            .setSource(new SourceConfig(sourceIndex))
+            .setDest(new DestConfig("my-dest-index", null))
+            .setPivotConfig(PivotConfigTests.randomPivotConfig())
+            .build();
+    }
+}

+ 116 - 11
x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java

@@ -7,11 +7,25 @@
 
 package org.elasticsearch.xpack.transform.integration;
 
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.node.NodeRoleSettings;
 import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
 import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction;
+import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
+import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
+import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
+import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
+import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
+import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
 import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase;
+import org.junit.After;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 
 public class TransformNoTransformNodeIT extends TransformSingleNodeTestCase {
     @Override
@@ -19,23 +33,114 @@ public class TransformNoTransformNodeIT extends TransformSingleNodeTestCase {
         return Settings.builder().put(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), "master, data, ingest").build();
     }
 
-    public void testWarningForStats() {
-        GetTransformStatsAction.Request getTransformStatsRequest = new GetTransformStatsAction.Request("_all");
-        GetTransformStatsAction.Response getTransformStatsResponse = client().execute(
-            GetTransformStatsAction.INSTANCE,
-            getTransformStatsRequest
-        ).actionGet();
+    @After
+    public void preCleanup() throws Exception {
+        // Updating a transform will leave indexing an audit message in-flight, so
+        // we need to wait for that to complete or it could interfere with deleting
+        // all the indices
+        waitForPendingTasks();
+    }
+
+    public void testGetTransformStats() {
+        GetTransformStatsAction.Request request = new GetTransformStatsAction.Request("_all");
+        ElasticsearchStatusException e =
+            expectThrows(
+                ElasticsearchStatusException.class,
+                () -> client().execute(GetTransformStatsAction.INSTANCE, request).actionGet());
+        assertThat(
+            e.getMessage(),
+            is(equalTo("Transform requires the transform node role for at least 1 node, found no transform nodes")));
+
+        assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes");
+    }
+
+    public void testGetTransform() {
+        GetTransformAction.Request request = new GetTransformAction.Request("_all");
+        GetTransformAction.Response response = client().execute(GetTransformAction.INSTANCE, request).actionGet();
+        assertEquals(0, response.getTransformConfigurations().size());
+
+        assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes");
+    }
+
+    public void testPreviewTransform() {
+        String transformId = "transform-1";
+        TransformConfig config = randomConfig(transformId);
+        PreviewTransformAction.Request request = new PreviewTransformAction.Request(config);
+        ElasticsearchStatusException e =
+            expectThrows(ElasticsearchStatusException.class, () -> client().execute(PreviewTransformAction.INSTANCE, request).actionGet());
+        assertThat(
+            e.getMessage(),
+            is(equalTo("Transform requires the transform node role for at least 1 node, found no transform nodes")));
+    }
 
-        assertEquals(0, getTransformStatsResponse.getTransformsStats().size());
+    public void testPutTransform_DeferValidation() {
+        String transformId = "transform-2";
+        TransformConfig config = randomConfig(transformId);
+        PutTransformAction.Request request = new PutTransformAction.Request(config, true);
+        AcknowledgedResponse response = client().execute(PutTransformAction.INSTANCE, request).actionGet();
+        assertThat(response.isAcknowledged(), is(true));
 
         assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes");
     }
 
-    public void testWarningForGet() {
-        GetTransformAction.Request getTransformRequest = new GetTransformAction.Request("_all");
-        GetTransformAction.Response getTransformResponse = client().execute(GetTransformAction.INSTANCE, getTransformRequest).actionGet();
-        assertEquals(0, getTransformResponse.getTransformConfigurations().size());
+    public void testPutTransform_NoDeferValidation() {
+        String transformId = "transform-2";
+        TransformConfig config = randomConfig(transformId);
+        PutTransformAction.Request request = new PutTransformAction.Request(config, false);
+        ElasticsearchStatusException e =
+            expectThrows(
+                ElasticsearchStatusException.class,
+                () -> client().execute(PutTransformAction.INSTANCE, request).actionGet());
+        assertThat(
+            e.getMessage(),
+            is(equalTo("Transform requires the transform node role for at least 1 node, found no transform nodes")));
+    }
+
+    public void testUpdateTransform_DeferValidation() {
+        String transformId = "transform-3";
+        {
+            TransformConfig config = randomConfig(transformId);
+            PutTransformAction.Request request = new PutTransformAction.Request(config, true);
+            AcknowledgedResponse response = client().execute(PutTransformAction.INSTANCE, request).actionGet();
+            assertThat(response.isAcknowledged(), is(true));
+        }
+
+        TransformConfigUpdate update =
+            new TransformConfigUpdate(new SourceConfig("my-index", "my-index-2"), null, null, null, null, null, null);
+        UpdateTransformAction.Request request = new UpdateTransformAction.Request(update, transformId, true);
+        client().execute(UpdateTransformAction.INSTANCE, request).actionGet();
 
         assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes");
     }
+
+    public void testUpdateTransform_NoDeferValidation() {
+        String transformId = "transform-3";
+        {
+            TransformConfig config = randomConfig(transformId);
+            PutTransformAction.Request request = new PutTransformAction.Request(config, true);
+            AcknowledgedResponse response = client().execute(PutTransformAction.INSTANCE, request).actionGet();
+            assertThat(response.isAcknowledged(), is(true));
+            assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes");
+        }
+
+        TransformConfigUpdate update =
+            new TransformConfigUpdate(new SourceConfig("my-index", "my-index-2"), null, null, null, null, null, null);
+        UpdateTransformAction.Request request = new UpdateTransformAction.Request(update, transformId, false);
+        ElasticsearchStatusException e =
+            expectThrows(
+                ElasticsearchStatusException.class,
+                () -> client().execute(UpdateTransformAction.INSTANCE, request).actionGet());
+        assertThat(
+            e.getMessage(),
+            is(equalTo("Transform requires the transform node role for at least 1 node, found no transform nodes")));
+    }
+
+    private static TransformConfig randomConfig(String transformId) {
+        return new TransformConfig.Builder()
+            .setId(transformId)
+            .setSource(new SourceConfig("my-index"))
+            .setDest(new DestConfig("my-dest-index", null))
+            .setPivotConfig(PivotConfigTests.randomPivotConfig())
+            .build();
+    }
 }

+ 24 - 14
x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManagerTests.java

@@ -33,11 +33,13 @@ import org.junit.Before;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static org.elasticsearch.common.collect.Tuple.tuple;
 import static org.elasticsearch.xpack.transform.persistence.TransformConfigManager.TO_XCONTENT_PARAMS;
 import static org.elasticsearch.xpack.transform.persistence.TransformInternalIndex.mappings;
 import static org.hamcrest.CoreMatchers.is;
@@ -202,7 +204,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
         // expand 1 id
         assertAsync(
             listener -> transformConfigManager.expandTransformIds(transformConfig1.getId(), PageParams.defaultParams(), true, listener),
-            new Tuple<>(1L, Collections.singletonList("transform1_expand")),
+            tuple(1L, tuple(singletonList("transform1_expand"), singletonList(transformConfig1))),
             null,
             null
         );
@@ -215,7 +217,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
                 true,
                 listener
             ),
-            new Tuple<>(2L, Arrays.asList("transform1_expand", "transform2_expand")),
+            tuple(2L, tuple(Arrays.asList("transform1_expand", "transform2_expand"), Arrays.asList(transformConfig1, transformConfig2))),
             null,
             null
         );
@@ -228,7 +230,11 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
                 true,
                 listener
             ),
-            new Tuple<>(3L, Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand")),
+            tuple(
+                3L,
+                tuple(
+                    Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand"),
+                    Arrays.asList(transformConfig1, transformConfig2, transformConfig3))),
             null,
             null
         );
@@ -236,7 +242,11 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
         // expand 3 ids _all
         assertAsync(
             listener -> transformConfigManager.expandTransformIds("_all", PageParams.defaultParams(), true, listener),
-            new Tuple<>(3L, Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand")),
+            tuple(
+                3L,
+                tuple(
+                    Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand"),
+                    Arrays.asList(transformConfig1, transformConfig2, transformConfig3))),
             null,
             null
         );
@@ -244,7 +254,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
         // expand 1 id _all with pagination
         assertAsync(
             listener -> transformConfigManager.expandTransformIds("_all", new PageParams(0, 1), true, listener),
-            new Tuple<>(3L, Collections.singletonList("transform1_expand")),
+            tuple(3L, tuple(singletonList("transform1_expand"), singletonList(transformConfig1))),
             null,
             null
         );
@@ -252,7 +262,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
         // expand 2 later ids _all with pagination
         assertAsync(
             listener -> transformConfigManager.expandTransformIds("_all", new PageParams(1, 2), true, listener),
-            new Tuple<>(3L, Arrays.asList("transform2_expand", "transform3_expand")),
+            tuple(3L, tuple(Arrays.asList("transform2_expand", "transform3_expand"), Arrays.asList(transformConfig2, transformConfig3))),
             null,
             null
         );
@@ -260,7 +270,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
         // expand 1 id explicitly that does not exist
         assertAsync(
             listener -> transformConfigManager.expandTransformIds("unknown,unknown2", new PageParams(1, 2), true, listener),
-            (Tuple<Long, List<String>>) null,
+            (Tuple<Long, Tuple<List<String>, List<TransformConfig>>>) null,
             null,
             e -> {
                 assertThat(e, instanceOf(ResourceNotFoundException.class));
@@ -274,7 +284,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
         // expand 1 id implicitly that does not exist
         assertAsync(
             listener -> transformConfigManager.expandTransformIds("unknown*", new PageParams(1, 2), false, listener),
-            (Tuple<Long, List<String>>) null,
+            (Tuple<Long, Tuple<List<String>, List<TransformConfig>>>) null,
             null,
             e -> {
                 assertThat(e, instanceOf(ResourceNotFoundException.class));
@@ -293,7 +303,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
         assertAsync(listener -> transformConfigManager.putOrUpdateTransformStoredDoc(storedDocs, null, listener), firstIndex, null, null);
         assertAsync(
             listener -> transformConfigManager.getTransformStoredDoc(transformId, listener),
-            Tuple.tuple(storedDocs, firstIndex),
+            tuple(storedDocs, firstIndex),
             null,
             null
         );
@@ -308,7 +318,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
         );
         assertAsync(
             listener -> transformConfigManager.getTransformStoredDoc(transformId, listener),
-            Tuple.tuple(updated, secondIndex),
+            tuple(updated, secondIndex),
             null,
             null
         );
@@ -445,7 +455,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
                 transformId,
                 timestamp + i * 200,
                 i,
-                Collections.emptyMap(),
+                emptyMap(),
                 timestamp - 100 + i * 200
             );
             assertAsync(listener -> transformConfigManager.putTransformCheckpoint(checkpoint, listener), true, null, null);
@@ -457,7 +467,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
             transformId,
             timestamp + randomCheckpoint * 200,
             randomCheckpoint,
-            Collections.emptyMap(),
+            emptyMap(),
             timestamp - 100 + randomCheckpoint * 200
         );
 
@@ -503,7 +513,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
         // test that the other docs are still there
         assertAsync(
             listener -> transformConfigManager.getTransformStoredDoc(transformId, listener),
-            Tuple.tuple(storedDocs, firstIndex),
+            tuple(storedDocs, firstIndex),
             null,
             null
         );

+ 3 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

@@ -68,6 +68,7 @@ import org.elasticsearch.xpack.core.transform.action.SetResetModeAction;
 import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
 import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
 import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
+import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
 import org.elasticsearch.xpack.core.transform.action.compat.DeleteTransformActionDeprecated;
 import org.elasticsearch.xpack.core.transform.action.compat.GetTransformActionDeprecated;
 import org.elasticsearch.xpack.core.transform.action.compat.GetTransformStatsActionDeprecated;
@@ -86,6 +87,7 @@ import org.elasticsearch.xpack.transform.action.TransportSetTransformResetModeAc
 import org.elasticsearch.xpack.transform.action.TransportStartTransformAction;
 import org.elasticsearch.xpack.transform.action.TransportStopTransformAction;
 import org.elasticsearch.xpack.transform.action.TransportUpdateTransformAction;
+import org.elasticsearch.xpack.transform.action.TransportValidateTransformAction;
 import org.elasticsearch.xpack.transform.action.compat.TransportDeleteTransformActionDeprecated;
 import org.elasticsearch.xpack.transform.action.compat.TransportGetTransformActionDeprecated;
 import org.elasticsearch.xpack.transform.action.compat.TransportGetTransformStatsActionDeprecated;
@@ -213,6 +215,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
             new ActionHandler<>(PreviewTransformAction.INSTANCE, TransportPreviewTransformAction.class),
             new ActionHandler<>(UpdateTransformAction.INSTANCE, TransportUpdateTransformAction.class),
             new ActionHandler<>(SetResetModeAction.INSTANCE, TransportSetTransformResetModeAction.class),
+            new ActionHandler<>(ValidateTransformAction.INSTANCE, TransportValidateTransformAction.class),
 
             // deprecated actions, to be removed for 8.0.0
             new ActionHandler<>(PutTransformActionDeprecated.INSTANCE, TransportPutTransformActionDeprecated.class),

+ 1 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java

@@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.transform.action.GetTransformAction.Request;
 import org.elasticsearch.xpack.core.transform.action.GetTransformAction.Response;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
 import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
+import org.elasticsearch.xpack.transform.transforms.TransformNodes;
 
 import static org.elasticsearch.xpack.core.transform.TransformField.INDEX_DOC_TYPE;
 

+ 25 - 10
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java

@@ -22,6 +22,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
@@ -39,6 +40,8 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
 import org.elasticsearch.xpack.transform.TransformServices;
 import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
 import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
+import org.elasticsearch.xpack.transform.transforms.TransformNodeAssignments;
+import org.elasticsearch.xpack.transform.transforms.TransformNodes;
 import org.elasticsearch.xpack.transform.transforms.TransformTask;
 
 import java.util.ArrayList;
@@ -58,6 +61,7 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
     private final TransformConfigManager transformConfigManager;
     private final TransformCheckpointService transformCheckpointService;
     private final Client client;
+    private final Settings nodeSettings;
 
     @Inject
     public TransportGetTransformStatsAction(
@@ -65,9 +69,10 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
         ActionFilters actionFilters,
         ClusterService clusterService,
         TransformServices transformServices,
-        Client client
+        Client client,
+        Settings settings
     ) {
-        this(GetTransformStatsAction.NAME, transportService, actionFilters, clusterService, transformServices, client);
+        this(GetTransformStatsAction.NAME, transportService, actionFilters, clusterService, transformServices, client, settings);
     }
 
     protected TransportGetTransformStatsAction(
@@ -76,12 +81,14 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
         ActionFilters actionFilters,
         ClusterService clusterService,
         TransformServices transformServices,
-        Client client
+        Client client,
+        Settings settings
     ) {
         super(name, clusterService, transportService, actionFilters, Request::new, Response::new, Response::new, ThreadPool.Names.SAME);
         this.transformConfigManager = transformServices.getConfigManager();
         this.transformCheckpointService = transformServices.getCheckpointService();
         this.client = client;
+        this.nodeSettings = settings;
     }
 
     @Override
@@ -132,28 +139,36 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
 
     @Override
     protected void doExecute(Task task, Request request, ActionListener<Response> finalListener) {
-        final ClusterState state = clusterService.state();
-        TransformNodes.warnIfNoTransformNodes(state);
+        final ClusterState clusterState = clusterService.state();
+        TransformNodes.warnIfNoTransformNodes(clusterState);
 
         transformConfigManager.expandTransformIds(
             request.getId(),
             request.getPageParams(),
             request.isAllowNoMatch(),
             ActionListener.wrap(hitsAndIds -> {
-                request.setExpandedIds(hitsAndIds.v2());
-                final TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state);
+                TransformNodes.throwIfNoTransformNodes(clusterState);
+                boolean requiresRemote = hitsAndIds.v2().v2().stream().anyMatch(config -> config.getSource().requiresRemoteCluster());
+                if (TransformNodes.redirectToAnotherNodeIfNeeded(
+                        clusterState, nodeSettings, requiresRemote, transportService, actionName, request, Response::new, finalListener)) {
+                    return;
+                }
+
+                request.setExpandedIds(hitsAndIds.v2().v1());
+                final TransformNodeAssignments transformNodeAssignments =
+                    TransformNodes.transformTaskNodes(hitsAndIds.v2().v1(), clusterState);
 
                 ActionListener<Response> doExecuteListener = ActionListener.wrap(response -> {
-                    PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
+                    PersistentTasksCustomMetadata tasksInProgress = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
                     if (tasksInProgress != null) {
                         // Mutates underlying state object with the assigned node attributes
-                        response.getTransformsStats().forEach(dtsasi -> setNodeAttributes(dtsasi, tasksInProgress, state));
+                        response.getTransformsStats().forEach(dtsasi -> setNodeAttributes(dtsasi, tasksInProgress, clusterState));
                     }
                     collectStatsForTransformsWithoutTasks(
                         request,
                         response,
                         transformNodeAssignments.getWaitingForAssignment(),
-                        state,
+                        clusterState,
                         ActionListener.wrap(
                             finalResponse -> finalListener.onResponse(
                                 new Response(

+ 24 - 9
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.transform.action;
 
 import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ingest.SimulatePipelineAction;
 import org.elasticsearch.action.ingest.SimulatePipelineRequest;
@@ -39,6 +40,8 @@ import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
 import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
+import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Request;
+import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Response;
 import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
 import org.elasticsearch.xpack.core.transform.transforms.SyncConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
@@ -46,6 +49,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSetti
 import org.elasticsearch.xpack.transform.persistence.TransformIndex;
 import org.elasticsearch.xpack.transform.transforms.Function;
 import org.elasticsearch.xpack.transform.transforms.FunctionFactory;
+import org.elasticsearch.xpack.transform.transforms.TransformNodes;
 import org.elasticsearch.xpack.transform.utils.SourceDestValidations;
 
 import java.time.Clock;
@@ -57,14 +61,14 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
-public class TransportPreviewTransformAction extends HandledTransportAction<
-    PreviewTransformAction.Request,
-    PreviewTransformAction.Response> {
+public class TransportPreviewTransformAction extends HandledTransportAction<Request, Response> {
 
     private static final int NUMBER_OF_PREVIEW_BUCKETS = 100;
     private final Client client;
     private final ThreadPool threadPool;
     private final ClusterService clusterService;
+    private final TransportService transportService;
+    private final Settings nodeSettings;
     private final SourceDestValidator sourceDestValidator;
 
     @Inject
@@ -102,10 +106,12 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
         Settings settings,
         IngestService ingestService
     ) {
-        super(name, transportService, actionFilters, PreviewTransformAction.Request::new);
+        super(name, transportService, actionFilters, Request::new);
         this.client = client;
         this.threadPool = threadPool;
         this.clusterService = clusterService;
+        this.transportService = transportService;
+        this.nodeSettings = settings;
         this.sourceDestValidator = new SourceDestValidator(
             indexNameExpressionResolver,
             transportService.getRemoteClusterService(),
@@ -119,9 +125,18 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
     }
 
     @Override
-    protected void doExecute(Task task, PreviewTransformAction.Request request, ActionListener<PreviewTransformAction.Response> listener) {
+    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
         final ClusterState clusterState = clusterService.state();
-        TransformNodes.warnIfNoTransformNodes(clusterState);
+        TransformNodes.throwIfNoTransformNodes(clusterState);
+
+        // Redirection can only be performed between nodes that are at least 7.13.
+        if (clusterState.nodes().getMinNodeVersion().onOrAfter(Version.V_7_13_0)) {
+            boolean requiresRemote = request.getConfig().getSource().requiresRemoteCluster();
+            if (TransformNodes.redirectToAnotherNodeIfNeeded(
+                    clusterState, nodeSettings, requiresRemote, transportService, actionName, request, Response::new, listener)) {
+                return;
+            }
+        }
 
         final TransformConfig config = request.getConfig();
         sourceDestValidator.validate(
@@ -156,7 +171,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
         String pipeline,
         String dest,
         SyncConfig syncConfig,
-        ActionListener<PreviewTransformAction.Response> listener
+        ActionListener<Response> listener
     ) {
         final SetOnce<Map<String, String>> mappings = new SetOnce<>();
 
@@ -177,7 +192,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
 
             List<String> warnings = TransformConfigLinter.getWarnings(function, source, syncConfig);
             warnings.forEach(HeaderWarning::addWarning);
-            listener.onResponse(new PreviewTransformAction.Response(docs, generatedDestIndexSettings));
+            listener.onResponse(new Response(docs, generatedDestIndexSettings));
         }, listener::onFailure);
         function.deduceMappings(client, source, ActionListener.wrap(deducedMappings -> {
             mappings.set(deducedMappings);
@@ -196,7 +211,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
                         );
                         List<String> warnings = TransformConfigLinter.getWarnings(function, source, syncConfig);
                         warnings.forEach(HeaderWarning::addWarning);
-                        listener.onResponse(new PreviewTransformAction.Response(docs, generatedDestIndexSettings));
+                        listener.onResponse(new Response(docs, generatedDestIndexSettings));
                     } else {
                         List<Map<String, Object>> results = docs.stream().map(doc -> {
                             Map<String, Object> src = new HashMap<>();

+ 5 - 30
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java

@@ -23,14 +23,11 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.ingest.IngestService;
-import org.elasticsearch.license.License;
-import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.rest.RestStatus;
@@ -40,7 +37,6 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.XPackSettings;
-import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
 import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
 import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
@@ -51,13 +47,13 @@ import org.elasticsearch.xpack.core.security.support.Exceptions;
 import org.elasticsearch.xpack.core.transform.TransformMessages;
 import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
 import org.elasticsearch.xpack.core.transform.action.PutTransformAction.Request;
+import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
 import org.elasticsearch.xpack.transform.TransformServices;
 import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
 import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
 import org.elasticsearch.xpack.transform.transforms.Function;
 import org.elasticsearch.xpack.transform.transforms.FunctionFactory;
-import org.elasticsearch.xpack.transform.utils.SourceDestValidations;
 
 import java.time.Instant;
 import java.util.ArrayList;
@@ -74,7 +70,6 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
     private final TransformConfigManager transformConfigManager;
     private final SecurityContext securityContext;
     private final TransformAuditor auditor;
-    private final SourceDestValidator sourceDestValidator;
 
     @Inject
     public TransportPutTransformAction(
@@ -134,16 +129,6 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
             ? new SecurityContext(settings, threadPool.getThreadContext())
             : null;
         this.auditor = transformServices.getAuditor();
-        this.sourceDestValidator = new SourceDestValidator(
-            indexNameExpressionResolver,
-            transportService.getRemoteClusterService(),
-            DiscoveryNode.isRemoteClusterClient(settings)
-                /* transforms are BASIC so always allowed, no need to check license */
-                ? new RemoteClusterLicenseChecker(client, mode -> true) : null,
-            ingestService,
-            clusterService.getNodeName(),
-            License.OperationMode.BASIC.description()
-        );
     }
 
     static HasPrivilegesRequest buildPrivilegeCheck(
@@ -192,7 +177,6 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
     @Override
     protected void masterOperation(Task task, Request request, ClusterState clusterState, ActionListener<AcknowledgedResponse> listener) {
         XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
-        TransformNodes.warnIfNoTransformNodes(clusterState);
 
         // set headers to run transform as calling user
         Map<String, String> filteredHeaders = ClientHelper.filterSecurityHeaders(threadPool.getThreadContext().getHeaders());
@@ -208,12 +192,9 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
             return;
         }
 
-        sourceDestValidator.validate(
-            clusterState,
-            config.getSource().getIndex(),
-            config.getDestination().getIndex(),
-            config.getDestination().getPipeline(),
-            SourceDestValidations.getValidations(request.isDeferValidation(), config.getAdditionalValidations()),
+        client.execute(
+            ValidateTransformAction.INSTANCE,
+            new ValidateTransformAction.Request(config, request.isDeferValidation()),
             ActionListener.wrap(
                 validationResponse -> {
                     // Early check to verify that the user can create the destination index and can read from the source
@@ -307,12 +288,6 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
             }
         );
 
-        function.validateConfig(ActionListener.wrap(r2 -> {
-            if (request.isDeferValidation()) {
-                validationListener.onResponse(true);
-            } else {
-                function.validateQuery(client, config.getSource(), validationListener);
-            }
-        }, listener::onFailure));
+        validationListener.onResponse(true);
     }
 }

+ 20 - 60
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java

@@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.support.ActionFilters;
@@ -22,14 +21,11 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.ingest.IngestService;
-import org.elasticsearch.license.License;
-import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.rest.RestStatus;
@@ -37,9 +33,9 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ClientHelper;
-import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
 import org.elasticsearch.xpack.core.transform.TransformMessages;
 import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
+import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings;
 import org.elasticsearch.xpack.core.transform.transforms.TransformState;
@@ -49,9 +45,7 @@ import org.elasticsearch.xpack.transform.TransformServices;
 import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
 import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
 import org.elasticsearch.xpack.transform.persistence.TransformIndex;
-import org.elasticsearch.xpack.transform.transforms.Function;
-import org.elasticsearch.xpack.transform.transforms.FunctionFactory;
-import org.elasticsearch.xpack.transform.utils.SourceDestValidations;
+import org.elasticsearch.xpack.transform.transforms.TransformNodes;
 
 import java.time.Clock;
 import java.util.Collection;
@@ -69,7 +63,6 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
     private final PersistentTasksService persistentTasksService;
     private final Client client;
     private final TransformAuditor auditor;
-    private final SourceDestValidator sourceDestValidator;
     private final IngestService ingestService;
 
     @Inject
@@ -128,16 +121,6 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
         this.persistentTasksService = persistentTasksService;
         this.client = client;
         this.auditor = transformServices.getAuditor();
-        this.sourceDestValidator = new SourceDestValidator(
-            indexNameExpressionResolver,
-            transportService.getRemoteClusterService(),
-            DiscoveryNode.isRemoteClusterClient(settings)
-                /* transforms are BASIC so always allowed, no need to check license */
-                ? new RemoteClusterLicenseChecker(client, mode -> true) : null,
-            ingestService,
-            clusterService.getNodeName(),
-            License.OperationMode.BASIC.description()
-        );
         this.ingestService = ingestService;
     }
 
@@ -150,13 +133,13 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
     ) throws Exception {
         TransformNodes.warnIfNoTransformNodes(state);
 
-        final AtomicReference<TransformTaskParams> transformTaskHolder = new AtomicReference<>();
+        final AtomicReference<TransformTaskParams> transformTaskParamsHolder = new AtomicReference<>();
         final AtomicReference<TransformConfig> transformConfigHolder = new AtomicReference<>();
 
         // <5> Wait for the allocated task's state to STARTED
         ActionListener<PersistentTasksCustomMetadata.PersistentTask<TransformTaskParams>> newPersistentTaskActionListener = ActionListener
             .wrap(task -> {
-                TransformTaskParams transformTask = transformTaskHolder.get();
+                TransformTaskParams transformTask = transformTaskParamsHolder.get();
                 assert transformTask != null;
                 waitForTransformTaskStarted(
                     task.getId(),
@@ -168,7 +151,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
 
         // <4> Create the task in cluster state so that it will start executing on the node
         ActionListener<Boolean> createOrGetIndexListener = ActionListener.wrap(unused -> {
-            TransformTaskParams transformTask = transformTaskHolder.get();
+            TransformTaskParams transformTask = transformTaskParamsHolder.get();
             assert transformTask != null;
             PersistentTasksCustomMetadata.PersistentTask<TransformTaskParams> existingTask = getExistingTask(transformTask.getId(), state);
             if (existingTask == null) {
@@ -203,12 +186,12 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
         }, listener::onFailure);
 
         // <2> If the destination index exists, start the task, otherwise deduce our mappings for the destination index and create it
-        ActionListener<Boolean> validationListener = ActionListener.wrap(validationResponse -> {
+        ActionListener<ValidateTransformAction.Response> validationListener = ActionListener.wrap(validationResponse -> {
             final String destinationIndex = transformConfigHolder.get().getDestination().getIndex();
             String[] dest = indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), destinationIndex);
 
             if (dest.length == 0) {
-                createDestinationIndex(transformConfigHolder.get(), ActionListener.wrap(r -> {
+                createDestinationIndex(transformConfigHolder.get(), validationResponse.getDestIndexMappings(), ActionListener.wrap(r -> {
                     auditor.info(request.getId(), "Created destination index [" + destinationIndex + "] with deduced mappings.");
                     createOrGetIndexListener.onResponse(r);
                 }, createOrGetIndexListener::onFailure));
@@ -249,43 +232,29 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
                 );
                 return;
             }
-            transformTaskHolder.set(
-                createTransform(config.getId(), config.getVersion(), config.getFrequency(), config.getSource().requiresRemoteCluster())
+            transformTaskParamsHolder.set(
+                new TransformTaskParams(
+                    config.getId(), config.getVersion(), config.getFrequency(), config.getSource().requiresRemoteCluster()
+                )
             );
             transformConfigHolder.set(config);
-
-            sourceDestValidator.validate(
-                clusterService.state(),
-                config.getSource().getIndex(),
-                config.getDestination().getIndex(),
-                config.getDestination().getPipeline(),
-                SourceDestValidations.getValidations(false, config.getAdditionalValidations()),
-                validationListener
-            );
+            client.execute(ValidateTransformAction.INSTANCE, new ValidateTransformAction.Request(config, false), validationListener);
         }, listener::onFailure);
 
         // <1> Get the config to verify it exists and is valid
         transformConfigManager.getTransformConfiguration(request.getId(), getTransformListener);
     }
 
-    private void createDestinationIndex(final TransformConfig config, final ActionListener<Boolean> listener) {
-
-        final Function function = FunctionFactory.create(config);
+    private void createDestinationIndex(final TransformConfig config,
+                                        final Map<String, String> mappings,
+                                        final ActionListener<Boolean> listener) {
 
-        ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(mappings -> {
-            TransformDestIndexSettings generateddestIndexSettings = TransformIndex.createTransformDestIndexSettings(
-                mappings,
-                config.getId(),
-                Clock.systemUTC()
-            );
-            TransformIndex.createDestinationIndex(client, config, generateddestIndexSettings, listener);
-        },
-            deduceTargetMappingsException -> listener.onFailure(
-                new RuntimeException(TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_DEDUCE_DEST_MAPPINGS, deduceTargetMappingsException)
-            )
+        TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
+            mappings,
+            config.getId(),
+            Clock.systemUTC()
         );
-
-        function.deduceMappings(client, config.getSource(), deduceMappingsListener);
+        TransformIndex.createDestinationIndex(client, config, generatedDestIndexSettings, listener);
     }
 
     @Override
@@ -293,15 +262,6 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
         return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
     }
 
-    private static TransformTaskParams createTransform(
-        String transformId,
-        Version transformVersion,
-        TimeValue frequency,
-        Boolean requiresRemoteCluster
-    ) {
-        return new TransformTaskParams(transformId, transformVersion, frequency, requiresRemoteCluster);
-    }
-
     @SuppressWarnings("unchecked")
     private static PersistentTasksCustomMetadata.PersistentTask<TransformTaskParams> getExistingTask(String id, ClusterState state) {
         PersistentTasksCustomMetadata pTasksMeta = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);

+ 6 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java

@@ -45,6 +45,8 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 import org.elasticsearch.xpack.transform.TransformServices;
 import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
+import org.elasticsearch.xpack.transform.transforms.TransformNodeAssignments;
+import org.elasticsearch.xpack.transform.transforms.TransformNodes;
 import org.elasticsearch.xpack.transform.transforms.TransformTask;
 
 import java.util.ArrayList;
@@ -163,9 +165,10 @@ public class TransportStopTransformAction extends TransportTasksAction<Transform
                 new PageParams(0, 10_000),
                 request.isAllowNoMatch(),
                 ActionListener.wrap(hitsAndIds -> {
-                    validateTaskState(state, hitsAndIds.v2(), request.isForce());
-                    request.setExpandedIds(new HashSet<>(hitsAndIds.v2()));
-                    final TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state);
+                    validateTaskState(state, hitsAndIds.v2().v1(), request.isForce());
+                    request.setExpandedIds(new HashSet<>(hitsAndIds.v2().v1()));
+                    final TransformNodeAssignments transformNodeAssignments =
+                        TransformNodes.transformTaskNodes(hitsAndIds.v2().v1(), state);
 
                     final ActionListener<Response> doExecuteListener;
                     if (transformNodeAssignments.getWaitingForAssignment().size() > 0) {

+ 26 - 57
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java

@@ -22,7 +22,6 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
@@ -30,8 +29,6 @@ import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.ingest.IngestService;
-import org.elasticsearch.license.License;
-import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.rest.RestStatus;
@@ -41,7 +38,6 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.XPackSettings;
-import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
 import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
 import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
@@ -52,6 +48,7 @@ import org.elasticsearch.xpack.core.transform.TransformMessages;
 import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
 import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Request;
 import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Response;
+import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
 import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings;
@@ -66,7 +63,6 @@ import org.elasticsearch.xpack.transform.persistence.TransformIndex;
 import org.elasticsearch.xpack.transform.transforms.Function;
 import org.elasticsearch.xpack.transform.transforms.FunctionFactory;
 import org.elasticsearch.xpack.transform.transforms.TransformTask;
-import org.elasticsearch.xpack.transform.utils.SourceDestValidations;
 
 import java.time.Clock;
 import java.util.List;
@@ -83,7 +79,6 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
     private final TransformConfigManager transformConfigManager;
     private final SecurityContext securityContext;
     private final TransformAuditor auditor;
-    private final SourceDestValidator sourceDestValidator;
     private final ThreadPool threadPool;
     private final IndexNameExpressionResolver indexNameExpressionResolver;
 
@@ -146,16 +141,6 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
             ? new SecurityContext(settings, threadPool.getThreadContext())
             : null;
         this.auditor = transformServices.getAuditor();
-        this.sourceDestValidator = new SourceDestValidator(
-            indexNameExpressionResolver,
-            transportService.getRemoteClusterService(),
-            DiscoveryNode.isRemoteClusterClient(settings)
-                /* transforms are BASIC so always allowed, no need to check license */
-                ? new RemoteClusterLicenseChecker(client, mode -> true) : null,
-            ingestService,
-            clusterService.getNodeName(),
-            License.OperationMode.BASIC.description()
-        );
         this.threadPool = threadPool;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
     }
@@ -181,7 +166,6 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
             }
             return;
         }
-        TransformNodes.warnIfNoTransformNodes(clusterState);
 
         // set headers to run transform as calling user
         Map<String, String> filteredHeaders = ClientHelper.filterSecurityHeaders(threadPool.getThreadContext().getHeaders());
@@ -235,15 +219,18 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
                 updateListener = listener;
             }
 
-            sourceDestValidator.validate(
-                clusterState,
-                updatedConfig.getSource().getIndex(),
-                updatedConfig.getDestination().getIndex(),
-                updatedConfig.getDestination().getPipeline(),
-                SourceDestValidations.getValidations(request.isDeferValidation(), config.getAdditionalValidations()),
+            client.execute(
+                ValidateTransformAction.INSTANCE,
+                new ValidateTransformAction.Request(updatedConfig, request.isDeferValidation()),
                 ActionListener.wrap(
                     validationResponse -> {
-                        checkPriviledgesAndUpdateTransform(request, clusterState, updatedConfig, configAndVersion.v2(), updateListener);
+                        checkPriviledgesAndUpdateTransform(
+                            request,
+                            clusterState,
+                            updatedConfig,
+                            validationResponse.getDestIndexMappings(),
+                            configAndVersion.v2(),
+                            updateListener);
                     },
                     listener::onFailure
                 )
@@ -274,13 +261,14 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
         String username,
         Request request,
         TransformConfig config,
+        Map<String, String> mappings,
         SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
         ClusterState clusterState,
         HasPrivilegesResponse privilegesResponse,
         ActionListener<Response> listener
     ) {
         if (privilegesResponse.isCompleteMatch()) {
-            updateTransform(request, config, seqNoPrimaryTermAndIndex, clusterState, listener);
+            updateTransform(request, config, mappings, seqNoPrimaryTermAndIndex, clusterState, listener);
         } else {
             List<String> indices = privilegesResponse.getIndexPrivileges()
                 .stream()
@@ -302,6 +290,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
         Request request,
         ClusterState clusterState,
         TransformConfig config,
+        Map<String, String> mappings,
         SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
         ActionListener<Response> listener
     ) {
@@ -310,19 +299,20 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
             final String username = securityContext.getUser().principal();
             HasPrivilegesRequest privRequest = buildPrivilegeCheck(config, indexNameExpressionResolver, clusterState, username);
             ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
-                r -> handlePrivsResponse(username, request, config, seqNoPrimaryTermAndIndex, clusterState, r, listener),
+                r -> handlePrivsResponse(username, request, config, mappings, seqNoPrimaryTermAndIndex, clusterState, r, listener),
                 listener::onFailure
             );
 
             client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
         } else { // No security enabled, just create the transform
-            updateTransform(request, config, seqNoPrimaryTermAndIndex, clusterState, listener);
+            updateTransform(request, config, mappings, seqNoPrimaryTermAndIndex, clusterState, listener);
         }
     }
 
     private void updateTransform(
         Request request,
         TransformConfig config,
+        Map<String, String> mappings,
         SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
         ClusterState clusterState,
         ActionListener<Response> listener
@@ -351,7 +341,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
         );
 
         // <2> Update our transform
-        ActionListener<Void> createDestinationListener = ActionListener.wrap(
+        ActionListener<Boolean> createDestinationListener = ActionListener.wrap(
             createDestResponse -> transformConfigManager.updateTransformConfiguration(
                 config,
                 seqNoPrimaryTermAndIndex,
@@ -379,7 +369,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
             // we allow source indices to disappear. If the source and destination indices do not exist, don't do anything
             // the transform will just have to dynamically create the destination index without special mapping.
                 && src.length > 0) {
-                createDestination(function, config, createDestinationListener);
+                createDestinationIndex(config, mappings, createDestinationListener);
             } else {
                 createDestinationListener.onResponse(null);
             }
@@ -403,36 +393,15 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
             }
         });
 
-        function.validateConfig(ActionListener.wrap(r2 -> {
-            if (request.isDeferValidation()) {
-                functionValidationListener.onResponse(true);
-            } else {
-                function.validateQuery(client, config.getSource(), functionValidationListener);
-            }
-        }, listener::onFailure));
+        functionValidationListener.onResponse(true);
     }
 
-    private void createDestination(Function function, TransformConfig config, ActionListener<Void> listener) {
-        ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(mappings -> {
-            TransformDestIndexSettings generateddestIndexSettings = TransformIndex.createTransformDestIndexSettings(
-                mappings,
-                config.getId(),
-                Clock.systemUTC()
-            );
-            TransformIndex.createDestinationIndex(
-                client,
-                config,
-                generateddestIndexSettings,
-                ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure)
-            );
-        },
-
-            deduceTargetMappingsException -> listener.onFailure(
-                new RuntimeException(TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_DEDUCE_DEST_MAPPINGS, deduceTargetMappingsException)
-            )
+    private void createDestinationIndex(TransformConfig config, Map<String, String> mappings, ActionListener<Boolean> listener) {
+        TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
+            mappings,
+            config.getId(),
+            Clock.systemUTC()
         );
-
-        function.deduceMappings(client, config.getSource(), deduceMappingsListener);
+        TransformIndex.createDestinationIndex(client, config, generatedDestIndexSettings, listener);
     }
-
 }

+ 165 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java

@@ -0,0 +1,165 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.ingest.IngestService;
+import org.elasticsearch.license.License;
+import org.elasticsearch.license.RemoteClusterLicenseChecker;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
+import org.elasticsearch.xpack.core.transform.TransformMessages;
+import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
+import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction.Request;
+import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction.Response;
+import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
+import org.elasticsearch.xpack.transform.transforms.Function;
+import org.elasticsearch.xpack.transform.transforms.FunctionFactory;
+import org.elasticsearch.xpack.transform.transforms.TransformNodes;
+import org.elasticsearch.xpack.transform.utils.SourceDestValidations;
+
+import java.util.Map;
+
+public class TransportValidateTransformAction extends HandledTransportAction<Request, Response> {
+
+    private final Client client;
+    private final ClusterService clusterService;
+    private final TransportService transportService;
+    private final Settings nodeSettings;
+    private final SourceDestValidator sourceDestValidator;
+
+    @Inject
+    public TransportValidateTransformAction(
+        TransportService transportService,
+        ActionFilters actionFilters,
+        Client client,
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        ClusterService clusterService,
+        Settings settings,
+        IngestService ingestService
+    ) {
+        this(
+            ValidateTransformAction.NAME,
+            transportService,
+            actionFilters,
+            client,
+            indexNameExpressionResolver,
+            clusterService,
+            settings,
+            ingestService);
+    }
+
+    protected TransportValidateTransformAction(
+        String name,
+        TransportService transportService,
+        ActionFilters actionFilters,
+        Client client,
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        ClusterService clusterService,
+        Settings settings,
+        IngestService ingestService
+    ) {
+        super(name, transportService, actionFilters, Request::new);
+        this.client = client;
+        this.clusterService = clusterService;
+        this.transportService = transportService;
+        this.nodeSettings = settings;
+        this.sourceDestValidator = new SourceDestValidator(
+            indexNameExpressionResolver,
+            transportService.getRemoteClusterService(),
+            DiscoveryNode.isRemoteClusterClient(settings)
+                /* transforms are BASIC so always allowed, no need to check license */
+                ? new RemoteClusterLicenseChecker(client, mode -> true) : null,
+            ingestService,
+            clusterService.getNodeName(),
+            License.OperationMode.BASIC.description()
+        );
+    }
+
+    @Override
+    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
+        final ClusterState clusterState = clusterService.state();
+        if (request.isDeferValidation() == false) {
+            TransformNodes.throwIfNoTransformNodes(clusterState);
+            boolean requiresRemote = request.getConfig().getSource().requiresRemoteCluster();
+            if (TransformNodes.redirectToAnotherNodeIfNeeded(
+                    clusterState, nodeSettings, requiresRemote, transportService, actionName, request, Response::new, listener)) {
+                return;
+            }
+        }
+
+        TransformNodes.warnIfNoTransformNodes(clusterState);
+
+        final TransformConfig config = request.getConfig();
+        final Function function = FunctionFactory.create(config);
+
+        // <5> Final listener
+        ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(
+            mappings -> {
+                listener.onResponse(new Response(mappings));
+            },
+            deduceTargetMappingsException -> listener.onFailure(
+                new RuntimeException(
+                    TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_DEDUCE_DEST_MAPPINGS,
+                    deduceTargetMappingsException)
+            )
+        );
+
+        // <4> Deduce destination index mappings
+        ActionListener<Boolean> validateQueryListener = ActionListener.wrap(
+            validateQueryResponse -> {
+                if (request.isDeferValidation()) {
+                    deduceMappingsListener.onResponse(null);
+                } else {
+                    function.deduceMappings(client, config.getSource(), deduceMappingsListener);
+                }
+            },
+            listener::onFailure
+        );
+
+        // <3> Validate transform query
+        ActionListener<Boolean> validateConfigListener = ActionListener.wrap(
+            validateConfigResponse -> {
+                if (request.isDeferValidation()) {
+                    validateQueryListener.onResponse(true);
+                } else {
+                    function.validateQuery(client, config.getSource(), validateQueryListener);
+                }
+            },
+            listener::onFailure
+        );
+
+        // <2> Validate transform function config
+        ActionListener<Boolean> validateSourceDestListener = ActionListener.wrap(
+            validateSourceDestResponse -> {
+                function.validateConfig(validateConfigListener);
+            },
+            listener::onFailure
+        );
+
+        // <1> Validate source and destination indices
+        sourceDestValidator.validate(
+            clusterState,
+            config.getSource().getIndex(),
+            config.getDestination().getIndex(),
+            config.getDestination().getPipeline(),
+            SourceDestValidations.getValidations(request.isDeferValidation(), config.getAdditionalValidations()),
+            validateSourceDestListener
+        );
+    }
+}

+ 4 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportGetTransformStatsActionDeprecated.java

@@ -11,6 +11,7 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.transform.action.compat.GetTransformStatsActionDeprecated;
 import org.elasticsearch.xpack.transform.TransformServices;
@@ -24,8 +25,9 @@ public class TransportGetTransformStatsActionDeprecated extends TransportGetTran
         ActionFilters actionFilters,
         ClusterService clusterService,
         TransformServices transformServices,
-        Client client
+        Client client,
+        Settings settings
     ) {
-        super(GetTransformStatsActionDeprecated.NAME, transportService, actionFilters, clusterService, transformServices, client);
+        super(GetTransformStatsActionDeprecated.NAME, transportService, actionFilters, clusterService, transformServices, client, settings);
     }
 }

+ 12 - 10
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/IndexBasedTransformConfigManager.java

@@ -403,7 +403,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
         String transformIdsExpression,
         PageParams pageParams,
         boolean allowNoMatch,
-        ActionListener<Tuple<Long, List<String>>> foundIdsListener
+        ActionListener<Tuple<Long, Tuple<List<String>, List<TransformConfig>>>> foundConfigsListener
     ) {
         String[] idTokens = ExpandedIdsMatcher.tokenizeExpression(transformIdsExpression);
         QueryBuilder queryBuilder = buildQueryFromTokenizedIds(idTokens, TransformConfig.NAME);
@@ -417,8 +417,6 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
             .setTrackTotalHits(true)
             .setSize(pageParams.getSize())
             .setQuery(queryBuilder)
-            // We only care about the `id` field, small optimization
-            .setFetchSource(TransformField.ID.getPreferredName(), "")
             .request();
 
         final ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(idTokens, allowNoMatch);
@@ -431,23 +429,26 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
                 long totalHits = searchResponse.getHits().getTotalHits().value;
                 // important: preserve order
                 Set<String> ids = new LinkedHashSet<>(searchResponse.getHits().getHits().length);
+                Set<TransformConfig> configs = new LinkedHashSet<>(searchResponse.getHits().getHits().length);
                 for (SearchHit hit : searchResponse.getHits().getHits()) {
                     BytesReference source = hit.getSourceRef();
                     try (
                         InputStream stream = source.streamInput();
                         XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                            .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
+                            .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)
                     ) {
-                        ids.add((String) parser.map().get(TransformField.ID.getPreferredName()));
+                        TransformConfig config = TransformConfig.fromXContent(parser, null, true);
+                        ids.add(config.getId());
+                        configs.add(config);
                     } catch (IOException e) {
-                        foundIdsListener.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e));
+                        foundConfigsListener.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e));
                         return;
                     }
                 }
                 requiredMatches.filterMatchedIds(ids);
                 if (requiredMatches.hasUnmatchedIds()) {
                     // some required Ids were not found
-                    foundIdsListener.onFailure(
+                    foundConfigsListener.onFailure(
                         new ResourceNotFoundException(
                             TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, requiredMatches.unmatchedIdsString())
                         )
@@ -457,11 +458,12 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
                 // if only exact ids have been given, take the count from docs to avoid potential duplicates
                 // in versioned indexes (like transform)
                 if (requiredMatches.isOnlyExact()) {
-                    foundIdsListener.onResponse(new Tuple<>((long) ids.size(), new ArrayList<>(ids)));
+                    foundConfigsListener.onResponse(
+                        new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs))));
                 } else {
-                    foundIdsListener.onResponse(new Tuple<>(totalHits, new ArrayList<>(ids)));
+                    foundConfigsListener.onResponse(new Tuple<>(totalHits, Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs))));
                 }
-            }, foundIdsListener::onFailure),
+            }, foundConfigsListener::onFailure),
             client::search
         );
     }

+ 2 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java

@@ -123,13 +123,13 @@ public interface TransformConfigManager {
      *
      * @param transformIdsExpression The id expression. Can be _all, *, or comma delimited list of simple regex strings
      * @param pageParams             The paging params
-     * @param foundIdsListener       The listener on signal on success or failure
+     * @param foundConfigsListener   The listener on signal on success or failure
      */
     void expandTransformIds(
         String transformIdsExpression,
         PageParams pageParams,
         boolean allowNoMatch,
-        ActionListener<Tuple<Long, List<String>>> foundIdsListener
+        ActionListener<Tuple<Long, Tuple<List<String>, List<TransformConfig>>>> foundConfigsListener
     );
 
     /**

+ 1 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformIndex.java

@@ -100,7 +100,7 @@ public final class TransformIndex {
 
         Map<String, Object> transformMetadata = new HashMap<>();
         transformMetadata.put(TransformField.CREATION_DATE_MILLIS, clock.millis());
-        transformMetadata.put(TransformField.VERSION.getPreferredName(), Map.of(TransformField.CREATED, Version.CURRENT));
+        transformMetadata.put(TransformField.VERSION.getPreferredName(), Map.of(TransformField.CREATED, Version.CURRENT.toString()));
         transformMetadata.put(TransformField.TRANSFORM, id);
 
         metadata.put(TransformField.META_FIELDNAME, transformMetadata);

+ 1 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodeAssignments.java → x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignments.java

@@ -5,7 +5,7 @@
  * 2.0.
  */
 
-package org.elasticsearch.xpack.transform.action;
+package org.elasticsearch.xpack.transform.transforms;
 
 import java.util.Collections;
 import java.util.Set;

+ 118 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java → x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformNodes.java

@@ -5,16 +5,27 @@
  * 2.0.
  */
 
-package org.elasticsearch.xpack.transform.action;
+package org.elasticsearch.xpack.transform.transforms;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.logging.HeaderWarning;
 import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
+import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.transport.TransportResponse;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.TransformMessages;
 import org.elasticsearch.xpack.core.transform.TransformMetadata;
@@ -24,7 +35,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -152,4 +166,107 @@ public final class TransformNodes {
             }
         }
     }
+
+    /**
+     * Check if cluster has at least 1 transform nodes and throw an exception if not.
+     * To be used by transport actions only.
+     *
+     * @param clusterState state
+     */
+    public static void throwIfNoTransformNodes(ClusterState clusterState) {
+        long transformNodes = getNumberOfTransformNodes(clusterState);
+        if (transformNodes == 0) {
+            throw ExceptionsHelper.badRequestException(TransformMessages.REST_WARN_NO_TRANSFORM_NODES);
+        }
+    }
+
+    public static <Request extends TransportRequest, Response extends TransportResponse> boolean redirectToAnotherNodeIfNeeded(
+        ClusterState clusterState,
+        Settings nodeSettings,
+        boolean requiresRemote,
+        TransportService transportService,
+        String actionName,
+        Request request,
+        Writeable.Reader<Response> reader,
+        ActionListener<Response> listener
+    ) {
+        final boolean isTransformNode = DiscoveryNode.hasRole(nodeSettings, DiscoveryNodeRole.TRANSFORM_ROLE);
+        final boolean isRemoteClusterClientNode = DiscoveryNode.isRemoteClusterClient(nodeSettings);
+        final DiscoveryNodes nodes = clusterState.nodes();
+
+        if ((isTransformNode == false) || (requiresRemote && (isRemoteClusterClientNode == false))) {
+            Optional<DiscoveryNode> appropriateNode = selectAnyNodeThatCanRunThisTransform(nodes, requiresRemote);
+            if (appropriateNode.isPresent()) {
+                // Redirect the request to an appropriate node
+                transportService.sendRequest(
+                    appropriateNode.get(),
+                    actionName,
+                    request,
+                    new ActionListenerResponseHandler<>(listener, reader));
+            } else {
+                Map<String, String> explain = new TreeMap<>();
+                for (DiscoveryNode node : nodes) {
+                    nodeCanRunThisTransform(node, Version.V_7_13_0, requiresRemote, explain);
+                }
+                // There are no appropriate nodes in the cluster, fail
+                listener.onFailure(
+                    ExceptionsHelper.badRequestException(
+                        "No appropriate node to run on, reasons [{}]",
+                        explain.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining("|"))));
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Select any node among provided nodes that satisfies all of the following:
+     *  - is a transform node
+     *  - is a remote_cluster_client node (in case this transform uses CCS, i.e. requires access to remote indices)
+     *  - runs at least version 7.13
+     *    This is needed as version 7.13 contains changes in wire format of {@code TransformDestIndexSettings} which are needed to correctly
+     *    read the redirected response.
+     *
+     * @param nodes nodes to select from
+     * @param requiresRemote whether this transform requires access to remote indices
+     * @return selected node or {@code Optional.empty()} if none of the nodes satisfy the conditions
+     */
+    static Optional<DiscoveryNode> selectAnyNodeThatCanRunThisTransform(DiscoveryNodes nodes, boolean requiresRemote) {
+        return StreamSupport.stream(nodes.spliterator(), false)
+            .filter(node -> nodeCanRunThisTransform(node, Version.V_7_13_0, requiresRemote, null))
+            .findAny();
+    }
+
+    public static boolean nodeCanRunThisTransform(DiscoveryNode node,
+                                                  Version minRequiredVersion,
+                                                  boolean requiresRemote,
+                                                  Map<String, String> explain) {
+        // version of the transform run on a node that has at least the same version
+        if (node.getVersion().onOrAfter(minRequiredVersion) == false) {
+            if (explain != null) {
+                explain.put(
+                    node.getId(), "node has version: " + node.getVersion() + " but transform requires at least " + minRequiredVersion);
+            }
+            return false;
+        }
+
+        // transform enabled?
+        if (node.getRoles().contains(DiscoveryNodeRole.TRANSFORM_ROLE) == false) {
+            if (explain != null) {
+                explain.put(node.getId(), "not a transform node");
+            }
+            return false;
+        }
+
+        // does the transform require a remote and remote is enabled?
+        if (requiresRemote && node.isRemoteClusterClient() == false) {
+            if (explain != null) {
+                explain.put(node.getId(), "transform requires a remote connection but remote is disabled");
+            }
+            return false;
+        }
+
+        // we found no reason that the transform can not run on this node
+        return true;
+    }
 }

+ 4 - 35
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java

@@ -19,7 +19,6 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
@@ -57,6 +56,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.xpack.transform.transforms.TransformNodes.nodeCanRunThisTransform;
+
 public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<TransformTaskParams> {
 
     private static final Logger logger = LogManager.getLogger(TransformPersistentTasksExecutor.class);
@@ -109,13 +110,13 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
         }
         DiscoveryNode discoveryNode = selectLeastLoadedNode(
             clusterState,
-            (node) -> nodeCanRunThisTransform(node, params, null)
+            node -> nodeCanRunThisTransform(node, params.getVersion(), params.requiresRemote(), null)
         );
 
         if (discoveryNode == null) {
             Map<String, String> explainWhyAssignmentFailed = new TreeMap<>();
             for (DiscoveryNode node : clusterState.getNodes()) {
-                nodeCanRunThisTransform(node, params, explainWhyAssignmentFailed);
+                nodeCanRunThisTransform(node, params.getVersion(), params.requiresRemote(), explainWhyAssignmentFailed);
             }
             String reason = "Not starting transform ["
                 + params.getId()
@@ -130,38 +131,6 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
         return new PersistentTasksCustomMetadata.Assignment(discoveryNode.getId(), "");
     }
 
-    public static boolean nodeCanRunThisTransform(DiscoveryNode node, TransformTaskParams params, Map<String, String> explain) {
-        // version of the transform run on a node that has at least the same version
-        if (node.getVersion().onOrAfter(params.getVersion()) == false) {
-            if (explain != null) {
-                explain.put(
-                    node.getId(),
-                    "node has version: " + node.getVersion() + " but transform requires at least " + params.getVersion()
-                );
-            }
-            return false;
-        }
-
-        // transform enabled?
-        if (node.getRoles().contains(DiscoveryNodeRole.TRANSFORM_ROLE) == false) {
-            if (explain != null) {
-                explain.put(node.getId(), "not a transform node");
-            }
-            return false;
-        }
-
-        // does the transform require a remote and remote is enabled?
-        if (params.requiresRemote() && node.isRemoteClusterClient() == false) {
-            if (explain != null) {
-                explain.put(node.getId(), "transform requires a remote connection but remote is disabled");
-            }
-            return false;
-        }
-
-        // we found no reason that the transform can not run on this node
-        return true;
-    }
-
     static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, IndexNameExpressionResolver resolver) {
         String[] indices = resolver.concreteIndexNames(
             clusterState,

+ 4 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/SourceDestValidations.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.transform.utils;
 
+import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
 import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SourceDestValidation;
 
 import java.util.ArrayList;
@@ -17,7 +18,6 @@ import java.util.List;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_PIPELINE_MISSING_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
-import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
 
 /**
@@ -27,6 +27,9 @@ public final class SourceDestValidations {
 
     private SourceDestValidations() {}
 
+    private static final SourceDestValidation REMOTE_SOURCE_VALIDATION =
+        new SourceDestValidator.RemoteSourceEnabledAndRemoteLicenseValidation("transform");
+
     private static final List<SourceDestValidation> PREVIEW_VALIDATIONS = Arrays.asList(
         SOURCE_MISSING_VALIDATION, REMOTE_SOURCE_VALIDATION, DESTINATION_PIPELINE_MISSING_VALIDATION);
 

+ 12 - 6
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/InMemoryTransformConfigManager.java

@@ -19,13 +19,15 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+
 /**
  * Simple in-memory based TransformConfigManager
  *
@@ -140,30 +142,34 @@ public class InMemoryTransformConfigManager implements TransformConfigManager {
         String transformIdsExpression,
         PageParams pageParams,
         boolean allowNoMatch,
-        ActionListener<Tuple<Long, List<String>>> foundIdsListener
+        ActionListener<Tuple<Long, Tuple<List<String>, List<TransformConfig>>>> foundConfigsListener
     ) {
 
         if (Regex.isMatchAllPattern(transformIdsExpression)) {
             List<String> ids = new ArrayList<>(configs.keySet());
-            foundIdsListener.onResponse(new Tuple<>((long) ids.size(), ids));
+            foundConfigsListener.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(ids, new ArrayList<>(configs.values()))));
             return;
         }
 
         if (Regex.isSimpleMatchPattern(transformIdsExpression) == false) {
             if (configs.containsKey(transformIdsExpression)) {
-                foundIdsListener.onResponse(new Tuple<>(1L, Collections.singletonList(transformIdsExpression)));
+                foundConfigsListener.onResponse(
+                    new Tuple<>(
+                        1L, Tuple.tuple(singletonList(transformIdsExpression), singletonList(configs.get(transformIdsExpression)))));
             } else {
-                foundIdsListener.onResponse(new Tuple<>(0L, Collections.emptyList()));
+                foundConfigsListener.onResponse(new Tuple<>(0L, Tuple.tuple(emptyList(), emptyList())));
             }
             return;
         }
         Set<String> ids = new LinkedHashSet<>();
+        Set<TransformConfig> matchedConfigs = new LinkedHashSet<>();
         configs.keySet().forEach(id -> {
             if (Regex.simpleMatch(transformIdsExpression, id)) {
                 ids.add(id);
+                matchedConfigs.add(configs.get(id));
             }
         });
-        foundIdsListener.onResponse(new Tuple<>((long) ids.size(), new ArrayList<>(ids)));
+        foundConfigsListener.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(matchedConfigs))));
     }
 
     @Override

+ 33 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformNodeAssignmentsTests.java

@@ -0,0 +1,33 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.transform.transforms;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class TransformNodeAssignmentsTests extends ESTestCase {
+
+    public void testConstructorAndGetters() {
+        Set<String> executorNodes = new HashSet<>(Arrays.asList("executor-1", "executor-2"));
+        Set<String> assigned = new HashSet<>(Arrays.asList("assigned-1", "assigned-2"));
+        Set<String> waitingForAssignment = new HashSet<>(Arrays.asList("waiting-1", "waitingv-2"));
+        Set<String> stopped = new HashSet<>(Arrays.asList("stopped-1", "stopped-2"));
+        TransformNodeAssignments assignments = new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped);
+
+        assertThat(assignments.getExecutorNodes(), is(equalTo(executorNodes)));
+        assertThat(assignments.getAssigned(), is(equalTo(assigned)));
+        assertThat(assignments.getWaitingForAssignment(), is(equalTo(waitingForAssignment)));
+        assertThat(assignments.getStopped(), is(equalTo(stopped)));
+    }
+}

+ 47 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java → x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformNodesTests.java

@@ -5,12 +5,15 @@
  * 2.0.
  */
 
-package org.elasticsearch.xpack.transform.action;
+package org.elasticsearch.xpack.transform.transforms;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.persistent.PersistentTaskParams;
@@ -21,6 +24,15 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+
+import static java.util.Collections.emptyMap;
+import static org.elasticsearch.cluster.node.DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE;
+import static org.elasticsearch.cluster.node.DiscoveryNodeRole.TRANSFORM_ROLE;
+import static org.elasticsearch.test.hamcrest.OptionalMatchers.isEmpty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.oneOf;
 
 public class TransformNodesTests extends ESTestCase {
 
@@ -201,4 +213,38 @@ public class TransformNodesTests extends ESTestCase {
         assertEquals(0, transformNodeAssignments.getAssigned().size());
         assertEquals(0, transformNodeAssignments.getStopped().size());
     }
+
+    public void testSelectAnyNodeThatCanRunThisTransform() {
+        DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES;
+        assertThat(TransformNodes.selectAnyNodeThatCanRunThisTransform(nodes, true), isEmpty());
+        assertThat(TransformNodes.selectAnyNodeThatCanRunThisTransform(nodes, false), isEmpty());
+
+        nodes =
+            DiscoveryNodes.builder()
+                .add(newDiscoveryNode("node-1", Version.V_7_12_0, TRANSFORM_ROLE, REMOTE_CLUSTER_CLIENT_ROLE))
+                .add(newDiscoveryNode("node-2", Version.V_7_13_0, TRANSFORM_ROLE))
+                .add(newDiscoveryNode("node-3", Version.V_7_13_0, REMOTE_CLUSTER_CLIENT_ROLE))
+                .build();
+        assertThat(TransformNodes.selectAnyNodeThatCanRunThisTransform(nodes, true), isEmpty());
+        assertThat(TransformNodes.selectAnyNodeThatCanRunThisTransform(nodes, false).get().getId(), is(equalTo("node-2")));
+
+        nodes =
+            DiscoveryNodes.builder()
+                .add(newDiscoveryNode("node-1", Version.V_7_12_0, TRANSFORM_ROLE, REMOTE_CLUSTER_CLIENT_ROLE))
+                .add(newDiscoveryNode("node-2", Version.V_7_13_0, TRANSFORM_ROLE))
+                .add(newDiscoveryNode("node-3", Version.V_7_13_0, REMOTE_CLUSTER_CLIENT_ROLE))
+                .add(newDiscoveryNode("node-4", Version.V_7_13_0, TRANSFORM_ROLE, REMOTE_CLUSTER_CLIENT_ROLE))
+                .build();
+        assertThat(TransformNodes.selectAnyNodeThatCanRunThisTransform(nodes, true).get().getId(), is(equalTo("node-4")));
+        assertThat(TransformNodes.selectAnyNodeThatCanRunThisTransform(nodes, false).get().getId(), is(oneOf("node-2", "node-4")));
+    }
+
+    private static DiscoveryNode newDiscoveryNode(String id, Version version, DiscoveryNodeRole... roles) {
+        return new DiscoveryNode(
+            id,
+            buildNewFakeTransportAddress(),
+            emptyMap(),
+            new HashSet<>(Arrays.asList(roles)),
+            version);
+    }
 }