Browse Source

[Transform] implement node.transform to control where to run a transform (#52712)

implement transform node attributes to disable transform on certain nodes and test which nodes are allowed to do remote connections

closes #52200
closes #50033
closes #48734
Hendrik Muhs 5 years ago
parent
commit
563d906a78
15 changed files with 744 additions and 260 deletions
  1. 18 5
      docs/reference/settings/transform-settings.asciidoc
  2. 1 1
      docs/reference/setup.asciidoc
  3. 5 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfig.java
  4. 26 6
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java
  5. 17 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfigTests.java
  6. 1 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTests.java
  7. 62 5
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java
  8. 71 49
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformUsageTransportAction.java
  9. 10 3
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java
  10. 84 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java
  11. 79 0
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java
  12. 30 25
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java
  13. 72 61
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java
  14. 266 100
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java
  15. 2 2
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java

+ 18 - 5
docs/reference/settings/data-frames-settings.asciidoc → docs/reference/settings/transform-settings.asciidoc

@@ -1,6 +1,6 @@
 
 [role="xpack"]
-[[data-frames-settings]]
+[[transform-settings]]
 === {transforms-cap}  settings in Elasticsearch
 [subs="attributes"]
 ++++
@@ -9,17 +9,30 @@
 
 You do not need to configure any settings to use {transforms}. It is enabled by default.
 
-All of these settings can be added to the `elasticsearch.yml` configuration file. 
-The dynamic settings can also be updated across a cluster with the 
+All of these settings can be added to the `elasticsearch.yml` configuration file.
+The dynamic settings can also be updated across a cluster with the
 <<cluster-update-settings,cluster update settings API>>.
 
-TIP: Dynamic settings take precedence over settings in the `elasticsearch.yml` 
+TIP: Dynamic settings take precedence over settings in the `elasticsearch.yml`
 file.
 
 [float]
-[[general-data-frames-settings]]
+[[general-transform-settings]]
 ==== General {transforms} settings
 
+`node.transform`::
+Set to `true` to identify the node as a _transform node_. The default is `false` if
+either `node.data` or `xpack.transform.enabled` is `false` for the node, and `true` otherwise. +
++
+If set to `false` in `elasticsearch.yml`, the node cannot run transforms. If set to
+`true` but `xpack.transform.enabled` is set to `false`, the `node.transform` setting is
+ignored and the node cannot run transforms. If you want to run transforms, there must be at
+least one transform node in your cluster. +
++
+IMPORTANT: It is advised to use the `node.transform` setting to constrain the execution
+of transforms to certain nodes instead of using `xpack.transform.enabled`. On dedicated
+coordinating nodes or dedicated master nodes, disable the node.transform role.
+
 `xpack.transform.enabled`::
 Set to `true` (default) to enable {transforms} on the node. +
 +

+ 1 - 1
docs/reference/setup.asciidoc

@@ -51,7 +51,7 @@ include::settings/audit-settings.asciidoc[]
 
 include::settings/ccr-settings.asciidoc[]
 
-include::settings/data-frames-settings.asciidoc[]
+include::settings/transform-settings.asciidoc[]
 
 include::settings/ilm-settings.asciidoc[]
 

+ 5 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfig.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
 
 import java.io.IOException;
@@ -98,6 +99,10 @@ public class SourceConfig implements Writeable, ToXContentObject {
         return queryConfig.isValid();
     }
 
+    public boolean requiresRemoteCluster() {
+        return Arrays.stream(index).anyMatch(RemoteClusterLicenseChecker::isRemoteIndex);
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeStringArray(index);

+ 26 - 6
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java

@@ -25,29 +25,35 @@ public class TransformTaskParams extends AbstractDiffable<TransformTaskParams> i
 
     public static final String NAME = TransformField.TASK_NAME;
     public static final ParseField FREQUENCY = TransformField.FREQUENCY;
+    public static final ParseField REQUIRES_REMOTE = new ParseField("requires_remote");
 
     private final String transformId;
     private final Version version;
     private final TimeValue frequency;
+    private final Boolean requiresRemote;
 
     public static final ConstructingObjectParser<TransformTaskParams, Void> PARSER = new ConstructingObjectParser<>(NAME, true,
-            a -> new TransformTaskParams((String) a[0], (String) a[1], (String) a[2]));
+            a -> new TransformTaskParams((String) a[0], (String) a[1], (String) a[2], (Boolean) a[3]));
 
     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), TransformField.ID);
         PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TransformField.VERSION);
         PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY);
+        PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REQUIRES_REMOTE);
     }
 
-    private TransformTaskParams(String transformId, String version, String frequency) {
+    private TransformTaskParams(String transformId, String version, String frequency, Boolean remote) {
         this(transformId, version == null ? null : Version.fromString(version),
-            frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()));
+            frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()),
+                    remote == null ? false : remote.booleanValue()
+                    );
     }
 
-    public TransformTaskParams(String transformId, Version version, TimeValue frequency) {
+    public TransformTaskParams(String transformId, Version version, TimeValue frequency, boolean remote) {
         this.transformId = transformId;
         this.version = version == null ? Version.V_7_2_0 : version;
         this.frequency = frequency;
+        this.requiresRemote = remote;
     }
 
     public TransformTaskParams(StreamInput in) throws IOException {
@@ -62,6 +68,11 @@ public class TransformTaskParams extends AbstractDiffable<TransformTaskParams> i
         } else {
             this.frequency = null;
         }
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_7_0
+            this.requiresRemote = in.readBoolean();
+        } else {
+            this.requiresRemote = false;
+        }
     }
 
     @Override
@@ -83,6 +94,9 @@ public class TransformTaskParams extends AbstractDiffable<TransformTaskParams> i
         if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
             out.writeOptionalTimeValue(frequency);
         }
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_7_0
+            out.writeBoolean(requiresRemote);
+        }
     }
 
     @Override
@@ -93,6 +107,7 @@ public class TransformTaskParams extends AbstractDiffable<TransformTaskParams> i
         if (frequency != null) {
             builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
         }
+        builder.field(REQUIRES_REMOTE.getPreferredName(), requiresRemote);
         builder.endObject();
         return builder;
     }
@@ -109,6 +124,10 @@ public class TransformTaskParams extends AbstractDiffable<TransformTaskParams> i
         return frequency;
     }
 
+    public boolean requiresRemote() {
+        return requiresRemote;
+    }
+
     public static TransformTaskParams fromXContent(XContentParser parser) throws IOException {
         return PARSER.parse(parser, null);
     }
@@ -127,11 +146,12 @@ public class TransformTaskParams extends AbstractDiffable<TransformTaskParams> i
 
         return Objects.equals(this.transformId, that.transformId)
             && Objects.equals(this.version, that.version)
-            && Objects.equals(this.frequency, that.frequency);
+            && Objects.equals(this.frequency, that.frequency)
+            && this.requiresRemote == that.requiresRemote;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(transformId, version, frequency);
+        return Objects.hash(transformId, version, frequency, requiresRemote);
     }
 }

+ 17 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfigTests.java

@@ -59,4 +59,21 @@ public class SourceConfigTests extends AbstractSerializingTransformTestCase<Sour
         return SourceConfig::new;
     }
 
+    public void testRequiresRemoteCluster() {
+        assertFalse(new SourceConfig(new String [] {"index1", "index2", "index3"},
+                QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());
+
+        assertTrue(new SourceConfig(new String [] {"index1", "remote2:index2", "index3"},
+                QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());
+
+        assertTrue(new SourceConfig(new String [] {"index1", "index2", "remote3:index3"},
+                QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());
+
+        assertTrue(new SourceConfig(new String [] {"index1", "remote2:index2", "remote3:index3"},
+                QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());
+
+        assertTrue(new SourceConfig(new String [] {"remote1:index1"},
+                QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());
+    }
+
 }

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTests.java

@@ -27,7 +27,7 @@ public class TransformTests extends AbstractSerializingTransformTestCase<Transfo
     @Override
     protected TransformTaskParams createTestInstance() {
         return new TransformTaskParams(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT,
-            randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)));
+            randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), randomBoolean());
     }
 
     @Override

+ 62 - 5
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

@@ -14,12 +14,14 @@ import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.common.settings.SettingsModule;
@@ -39,6 +41,7 @@ import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.threadpool.ExecutorBuilder;
 import org.elasticsearch.threadpool.FixedExecutorBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.watcher.ResourceWatcherService;
 import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.XPackSettings;
@@ -137,6 +140,23 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
         Setting.Property.Dynamic
     );
 
+    /**
+     * Node attributes for transform, automatically created and retrievable via cluster state.
+     * These attributes should never be set directly, use the node setting counter parts instead.
+     */
+    public static final String TRANSFORM_ENABLED_NODE_ATTR = "transform.node";
+    public static final String TRANSFORM_REMOTE_ENABLED_NODE_ATTR = "transform.remote_connect";
+
+    /**
+     * Setting whether transform (the coordinator task) can run on this node and REST API's are available,
+     * respects xpack.transform.enabled (for the whole plugin) as fallback
+     */
+    public static final Setting<Boolean> TRANSFORM_ENABLED_NODE = Setting.boolSetting(
+        "node.transform",
+        settings -> Boolean.toString(XPackSettings.TRANSFORM_ENABLED.get(settings) && DiscoveryNode.isDataNode(settings)),
+        Property.NodeScope
+    );
+
     public Transform(Settings settings) {
         this.settings = settings;
         this.enabled = XPackSettings.TRANSFORM_ENABLED.get(settings);
@@ -222,8 +242,14 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
             return emptyList();
         }
 
-        FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, TASK_THREAD_POOL_NAME, 4, 4, "transform.task_thread_pool",
-            false);
+        FixedExecutorBuilder indexing = new FixedExecutorBuilder(
+            settings,
+            TASK_THREAD_POOL_NAME,
+            4,
+            4,
+            "transform.task_thread_pool",
+            false
+        );
 
         return Collections.singletonList(indexing);
     }
@@ -296,13 +322,44 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
         // the transform services should have been created
         assert transformServices.get() != null;
 
-        return Collections.singletonList(new TransformPersistentTasksExecutor(client, transformServices.get(), threadPool, clusterService,
-            settingsModule.getSettings(), expressionResolver));
+        return Collections.singletonList(
+            new TransformPersistentTasksExecutor(
+                client,
+                transformServices.get(),
+                threadPool,
+                clusterService,
+                settingsModule.getSettings(),
+                expressionResolver
+            )
+        );
     }
 
     @Override
     public List<Setting<?>> getSettings() {
-        return Collections.singletonList(NUM_FAILURE_RETRIES_SETTING);
+        return Collections.unmodifiableList(Arrays.asList(TRANSFORM_ENABLED_NODE, NUM_FAILURE_RETRIES_SETTING));
+    }
+
+    @Override
+    public Settings additionalSettings() {
+        String transformEnabledNodeAttribute = "node.attr." + TRANSFORM_ENABLED_NODE_ATTR;
+        String transformRemoteEnabledNodeAttribute = "node.attr." + TRANSFORM_REMOTE_ENABLED_NODE_ATTR;
+
+        if (settings.get(transformEnabledNodeAttribute) != null || settings.get(transformRemoteEnabledNodeAttribute) != null) {
+            throw new IllegalArgumentException(
+                "Directly setting transform node attributes is not permitted, please use the documented node settings instead"
+            );
+        }
+
+        if (enabled == false) {
+            return Settings.EMPTY;
+        }
+
+        Settings.Builder additionalSettings = Settings.builder();
+
+        additionalSettings.put(transformEnabledNodeAttribute, TRANSFORM_ENABLED_NODE.get(settings));
+        additionalSettings.put(transformRemoteEnabledNodeAttribute, RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings));
+
+        return additionalSettings.build();
     }
 
     @Override

+ 71 - 49
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformUsageTransportAction.java

@@ -54,19 +54,36 @@ public class TransformUsageTransportAction extends XPackUsageFeatureTransportAct
     private final Client client;
 
     @Inject
-    public TransformUsageTransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
-                                         ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
-                                         Settings settings, XPackLicenseState licenseState, Client client) {
-        super(XPackUsageFeatureAction.TRANSFORM.name(), transportService, clusterService,
-            threadPool, actionFilters, indexNameExpressionResolver);
+    public TransformUsageTransportAction(
+        TransportService transportService,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        ActionFilters actionFilters,
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        Settings settings,
+        XPackLicenseState licenseState,
+        Client client
+    ) {
+        super(
+            XPackUsageFeatureAction.TRANSFORM.name(),
+            transportService,
+            clusterService,
+            threadPool,
+            actionFilters,
+            indexNameExpressionResolver
+        );
         this.enabled = XPackSettings.TRANSFORM_ENABLED.get(settings);
         this.licenseState = licenseState;
         this.client = client;
     }
 
     @Override
-    protected void masterOperation(Task task, XPackUsageRequest request, ClusterState state,
-                                   ActionListener<XPackUsageFeatureResponse> listener) {
+    protected void masterOperation(
+        Task task,
+        XPackUsageRequest request,
+        ClusterState state,
+        ActionListener<XPackUsageFeatureResponse> listener
+    ) {
         boolean available = licenseState.isTransformAllowed();
         if (enabled == false) {
             var usage = new TransformFeatureSetUsage(available, enabled, Collections.emptyMap(), new TransformIndexerStats());
@@ -75,61 +92,66 @@ public class TransformUsageTransportAction extends XPackUsageFeatureTransportAct
         }
 
         PersistentTasksCustomMetaData taskMetadata = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(state);
-        Collection<PersistentTasksCustomMetaData.PersistentTask<?>> transformTasks = taskMetadata == null ?
-            Collections.emptyList() :
-            taskMetadata.findTasks(TransformTaskParams.NAME, (t) -> true);
+        Collection<PersistentTasksCustomMetaData.PersistentTask<?>> transformTasks = taskMetadata == null
+            ? Collections.emptyList()
+            : taskMetadata.findTasks(TransformTaskParams.NAME, (t) -> true);
         final int taskCount = transformTasks.size();
         final Map<String, Long> transformsCountByState = new HashMap<>();
-        for(PersistentTasksCustomMetaData.PersistentTask<?> transformTask : transformTasks) {
-            TransformState transformState = (TransformState)transformTask.getState();
-            transformsCountByState.merge(transformState.getTaskState().value(), 1L, Long::sum);
+        for (PersistentTasksCustomMetaData.PersistentTask<?> transformTask : transformTasks) {
+            TransformState transformState = (TransformState) transformTask.getState();
+            TransformTaskState taskState = transformState.getTaskState();
+            if (taskState != null) {
+                transformsCountByState.merge(taskState.value(), 1L, Long::sum);
+            }
         }
 
-        ActionListener<TransformIndexerStats> totalStatsListener = ActionListener.wrap(
-            statSummations -> {
-                var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState, statSummations);
-                listener.onResponse(new XPackUsageFeatureResponse(usage));
-            },
-            listener::onFailure
-        );
+        ActionListener<TransformIndexerStats> totalStatsListener = ActionListener.wrap(statSummations -> {
+            var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState, statSummations);
+            listener.onResponse(new XPackUsageFeatureResponse(usage));
+        }, listener::onFailure);
 
-        ActionListener<SearchResponse> totalTransformCountListener = ActionListener.wrap(
-            transformCountSuccess -> {
-                if (transformCountSuccess.getShardFailures().length > 0) {
-                    logger.error("total transform count search returned shard failures: {}",
-                        Arrays.toString(transformCountSuccess.getShardFailures()));
-                }
-                long totalTransforms = transformCountSuccess.getHits().getTotalHits().value;
-                if (totalTransforms == 0) {
-                    var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState,
-                        new TransformIndexerStats());
-                    listener.onResponse(new XPackUsageFeatureResponse(usage));
-                    return;
-                }
-                transformsCountByState.merge(TransformTaskState.STOPPED.value(), totalTransforms - taskCount, Long::sum);
+        ActionListener<SearchResponse> totalTransformCountListener = ActionListener.wrap(transformCountSuccess -> {
+            if (transformCountSuccess.getShardFailures().length > 0) {
+                logger.error(
+                    "total transform count search returned shard failures: {}",
+                    Arrays.toString(transformCountSuccess.getShardFailures())
+                );
+            }
+            long totalTransforms = transformCountSuccess.getHits().getTotalHits().value;
+            if (totalTransforms == 0) {
+                var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState, new TransformIndexerStats());
+                listener.onResponse(new XPackUsageFeatureResponse(usage));
+                return;
+            }
+            transformsCountByState.merge(TransformTaskState.STOPPED.value(), totalTransforms - taskCount, Long::sum);
+            TransformInfoTransportAction.getStatisticSummations(client, totalStatsListener);
+        }, transformCountFailure -> {
+            if (transformCountFailure instanceof ResourceNotFoundException) {
                 TransformInfoTransportAction.getStatisticSummations(client, totalStatsListener);
-            },
-            transformCountFailure -> {
-                if (transformCountFailure instanceof ResourceNotFoundException) {
-                    TransformInfoTransportAction.getStatisticSummations(client, totalStatsListener);
-                } else {
-                    listener.onFailure(transformCountFailure);
-                }
+            } else {
+                listener.onFailure(transformCountFailure);
             }
-        );
+        });
 
-        SearchRequest totalTransformCount = client
-            .prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
-                TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
+        SearchRequest totalTransformCount = client.prepareSearch(
+            TransformInternalIndexConstants.INDEX_NAME_PATTERN,
+            TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
+        )
             .setTrackTotalHits(true)
-            .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
-                .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformConfig.NAME))))
+            .setQuery(
+                QueryBuilders.constantScoreQuery(
+                    QueryBuilders.boolQuery()
+                        .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformConfig.NAME))
+                )
+            )
             .request();
 
-        ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+        ClientHelper.executeAsyncWithOrigin(
+            client.threadPool().getThreadContext(),
             ClientHelper.TRANSFORM_ORIGIN,
             totalTransformCount,
             totalTransformCountListener,
-            client::search);
+            client::search
+        );
     }
 }

+ 10 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java

@@ -265,7 +265,9 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
                 );
                 return;
             }
-            transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency()));
+            transformTaskHolder.set(
+                createTransform(config.getId(), config.getVersion(), config.getFrequency(), config.getSource().requiresRemoteCluster())
+            );
             transformConfigHolder.set(config);
             if (config.getDestination().getPipeline() != null) {
                 if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
@@ -311,8 +313,13 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
         return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
     }
 
-    private static TransformTaskParams createTransform(String transformId, Version transformVersion, TimeValue frequency) {
-        return new TransformTaskParams(transformId, transformVersion, frequency);
+    private static TransformTaskParams createTransform(
+        String transformId,
+        Version transformVersion,
+        TimeValue frequency,
+        Boolean requiresRemoteCluster
+    ) {
+        return new TransformTaskParams(transformId, transformVersion, frequency, requiresRemoteCluster);
     }
 
     @SuppressWarnings("unchecked")

+ 84 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java

@@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.LatchedActionListener;
 import org.elasticsearch.action.support.IndicesOptions;
@@ -49,8 +50,10 @@ import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<TransformTaskParams> {
 
@@ -100,9 +103,88 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
         }
         DiscoveryNode discoveryNode = selectLeastLoadedNode(
             clusterState,
-            (node) -> node.isDataNode() && node.getVersion().onOrAfter(params.getVersion())
+            (node) -> node.getVersion().onOrAfter(Version.V_8_0_0)
+                ? nodeCanRunThisTransform(node, params, null)
+                : nodeCanRunThisTransformPre77(node, params, null)
         );
-        return discoveryNode == null ? NO_NODE_FOUND : new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), "");
+
+        if (discoveryNode == null) {
+            Map<String, String> explainWhyAssignmentFailed = new TreeMap<>();
+            for (DiscoveryNode node : clusterState.getNodes()) {
+                if (node.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_7_0, remove from 8.0
+                    nodeCanRunThisTransform(node, params, explainWhyAssignmentFailed);
+                } else {
+                    nodeCanRunThisTransformPre77(node, params, explainWhyAssignmentFailed);
+                }
+            }
+            String reason = "Not starting transform ["
+                + params.getId()
+                + "], reasons ["
+                + explainWhyAssignmentFailed.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining("|"))
+                + "]";
+
+            logger.debug(reason);
+            return new PersistentTasksCustomMetaData.Assignment(null, reason);
+        }
+
+        return new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), "");
+    }
+
+    // todo: this can be removed for 8.0 after backport
+    public static boolean nodeCanRunThisTransformPre77(DiscoveryNode node, TransformTaskParams params, Map<String, String> explain) {
+        if (node.isDataNode() == false) {
+            if (explain != null) {
+                explain.put(node.getId(), "not a data node");
+            }
+            return false;
+        }
+
+        // version of the transform run on a node that has at least the same version
+        if (node.getVersion().onOrAfter(params.getVersion()) == false) {
+            if (explain != null) {
+                explain.put(
+                    node.getId(),
+                    "node has version: " + node.getVersion() + " but transform requires at least " + params.getVersion()
+                );
+            }
+            return false;
+        }
+
+        return true;
+    }
+
+    public static boolean nodeCanRunThisTransform(DiscoveryNode node, TransformTaskParams params, Map<String, String> explain) {
+        // version of the transform run on a node that has at least the same version
+        if (node.getVersion().onOrAfter(params.getVersion()) == false) {
+            if (explain != null) {
+                explain.put(
+                    node.getId(),
+                    "node has version: " + node.getVersion() + " but transform requires at least " + params.getVersion()
+                );
+            }
+            return false;
+        }
+
+        final Map<String, String> nodeAttributes = node.getAttributes();
+
+        // transform enabled?
+        if (Boolean.parseBoolean(nodeAttributes.get(Transform.TRANSFORM_ENABLED_NODE_ATTR)) == false) {
+            if (explain != null) {
+                explain.put(node.getId(), "not a transform node");
+            }
+            return false;
+        }
+
+        // does the transform require a remote and remote is enabled?
+        if (params.requiresRemote() && Boolean.parseBoolean(nodeAttributes.get(Transform.TRANSFORM_REMOTE_ENABLED_NODE_ATTR)) == false) {
+            if (explain != null) {
+                explain.put(node.getId(), "transform requires a remote connection but remote is disabled");
+            }
+            return false;
+        }
+
+        // we found no reason that the transform can not run on this node
+        return true;
     }
 
     static List<String> verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, IndexNameExpressionResolver resolver) {

+ 79 - 0
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java

@@ -0,0 +1,79 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.transform;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+
+public class TransformTests extends ESTestCase {
+
+    public void testNodeAttributes() {
+        Settings.Builder builder = Settings.builder();
+        boolean transformEnabled = true;
+        boolean remoteEnabled = true;
+
+        if (randomBoolean()) {
+            transformEnabled = randomBoolean();
+            if (randomBoolean()) {
+                builder.put("node.transform", transformEnabled);
+                if (randomBoolean()) {
+                    // note: the case where node.transform: true and xpack.transform.enabled: false is benign
+                    builder.put("xpack.transform.enabled", randomBoolean());
+                }
+            } else {
+                builder.put("xpack.transform.enabled", transformEnabled);
+            }
+        }
+
+        if (randomBoolean()) {
+            remoteEnabled = randomBoolean();
+            builder.put("cluster.remote.connect", remoteEnabled);
+        }
+
+        builder.put("node.attr.some_other_attrib", "value");
+        Transform transform = createTransform(builder.build());
+        assertNotNull(transform.additionalSettings());
+        assertEquals(transformEnabled, Boolean.parseBoolean(transform.additionalSettings().get("node.attr.transform.node")));
+        assertEquals(
+            transformEnabled && remoteEnabled,
+            Boolean.parseBoolean(transform.additionalSettings().get("node.attr.transform.remote_connect"))
+        );
+    }
+
+    public void testNodeAttributesDirectlyGiven() {
+        Settings.Builder builder = Settings.builder();
+
+        if (randomBoolean()) {
+            builder.put("node.attr.transform.node", randomBoolean());
+        } else {
+            builder.put("node.attr.transform.remote_connect", randomBoolean());
+        }
+
+        Transform transform = createTransform(builder.build());
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> transform.additionalSettings());
+        assertThat(
+            e.getMessage(),
+            equalTo("Directly setting transform node attributes is not permitted, please use the documented node settings instead")
+        );
+    }
+
+    private Transform createTransform(Settings settings) {
+        XPackLicenseState licenseState = mock(XPackLicenseState.class);
+
+        return new Transform(settings) {
+            @Override
+            protected XPackLicenseState getLicenseState() {
+                return licenseState;
+            }
+        };
+    }
+
+}

+ 30 - 25
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java

@@ -30,38 +30,43 @@ public class TransformNodesTests extends ESTestCase {
         String transformIdBar = "df-id-bar";
 
         PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
-        tasksBuilder.addTask(transformIdFoo,
-                TransformField.TASK_NAME, new TransformTaskParams(transformIdFoo, Version.CURRENT, null),
-                new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
-        tasksBuilder.addTask(transformIdBar,
-                TransformField.TASK_NAME, new TransformTaskParams(transformIdBar, Version.CURRENT, null),
-                new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment"));
+        tasksBuilder.addTask(
+            transformIdFoo,
+            TransformField.TASK_NAME,
+            new TransformTaskParams(transformIdFoo, Version.CURRENT, null, false),
+            new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")
+        );
+        tasksBuilder.addTask(
+            transformIdBar,
+            TransformField.TASK_NAME,
+            new TransformTaskParams(transformIdBar, Version.CURRENT, null, false),
+            new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment")
+        );
         tasksBuilder.addTask("test-task1", "testTasks", new PersistentTaskParams() {
-                @Override
-                public String getWriteableName() {
-                    return "testTasks";
-                }
+            @Override
+            public String getWriteableName() {
+                return "testTasks";
+            }
 
-                @Override
-                public Version getMinimalSupportedVersion() {
-                    return null;
-                }
+            @Override
+            public Version getMinimalSupportedVersion() {
+                return null;
+            }
 
-                @Override
-                public void writeTo(StreamOutput out) {
+            @Override
+            public void writeTo(StreamOutput out) {
 
-                }
+            }
 
-                @Override
-                public XContentBuilder toXContent(XContentBuilder builder, Params params) {
-                    return null;
-                }
-            },
-            new PersistentTasksCustomMetaData.Assignment("node-3", "test assignment"));
+            @Override
+            public XContentBuilder toXContent(XContentBuilder builder, Params params) {
+                return null;
+            }
+        }, new PersistentTasksCustomMetaData.Assignment("node-3", "test assignment"));
 
         ClusterState cs = ClusterState.builder(new ClusterName("_name"))
-                .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
-                .build();
+            .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
+            .build();
 
         String[] nodes = TransformNodes.transformTaskNodes(Arrays.asList(transformIdFoo, transformIdBar), cs);
         assertEquals(2, nodes.length);

+ 72 - 61
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java

@@ -17,8 +17,8 @@ import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.transform.TransformMessages;
-import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
 import org.elasticsearch.xpack.core.transform.transforms.TransformState;
+import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 
 import java.util.ArrayList;
@@ -48,35 +48,35 @@ public class TransportStopTransformActionTests extends ESTestCase {
     public void testTaskStateValidationWithTransformTasks() {
         // Test with the task state being null
         PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
-            .addTask("non-failed-task",
+            .addTask(
+                "non-failed-task",
                 TransformTaskParams.NAME,
-                new TransformTaskParams("transform-task-1", Version.CURRENT, null),
-                new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", ""));
+                new TransformTaskParams("transform-task-1", Version.CURRENT, null, false),
+                new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")
+            );
         ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
 
         TransportStopTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
 
         // test again with a non failed task but this time it has internal state
-        pTasksBuilder.updateTaskState("non-failed-task", new TransformState(TransformTaskState.STOPPED,
-            IndexerState.STOPPED,
-            null,
-            0L,
-            null,
-            null));
+        pTasksBuilder.updateTaskState(
+            "non-failed-task",
+            new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null, null)
+        );
         csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
 
         TransportStopTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
 
-        pTasksBuilder.addTask("failed-task",
+        pTasksBuilder.addTask(
+            "failed-task",
             TransformTaskParams.NAME,
-            new TransformTaskParams("transform-task-1", Version.CURRENT, null),
-            new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", ""))
-            .updateTaskState("failed-task", new TransformState(TransformTaskState.FAILED,
-                IndexerState.STOPPED,
-                null,
-                0L,
-                "task has failed",
-                null));
+            new TransformTaskParams("transform-task-1", Version.CURRENT, null, false),
+            new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")
+        )
+            .updateTaskState(
+                "failed-task",
+                new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0L, "task has failed", null)
+            );
         csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
 
         TransportStopTransformAction.validateTaskState(csBuilder.build(), Arrays.asList("non-failed-task", "failed-task"), true);
@@ -84,51 +84,59 @@ public class TransportStopTransformActionTests extends ESTestCase {
         TransportStopTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false);
 
         ClusterState.Builder csBuilderFinal = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build()));
-        ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class,
-            () -> TransportStopTransformAction.validateTaskState(csBuilderFinal.build(),
-                Collections.singletonList("failed-task"),
-                false));
+        ElasticsearchStatusException ex = expectThrows(
+            ElasticsearchStatusException.class,
+            () -> TransportStopTransformAction.validateTaskState(csBuilderFinal.build(), Collections.singletonList("failed-task"), false)
+        );
 
         assertThat(ex.status(), equalTo(CONFLICT));
-        assertThat(ex.getMessage(),
-            equalTo(TransformMessages.getMessage(TransformMessages.CANNOT_STOP_FAILED_TRANSFORM,
-                "failed-task",
-                "task has failed")));
+        assertThat(
+            ex.getMessage(),
+            equalTo(TransformMessages.getMessage(TransformMessages.CANNOT_STOP_FAILED_TRANSFORM, "failed-task", "task has failed"))
+        );
     }
 
     public void testFirstNotOKStatus() {
         List<ElasticsearchException> nodeFailures = new ArrayList<>();
         List<TaskOperationFailure> taskOperationFailures = new ArrayList<>();
 
-        nodeFailures.add(new ElasticsearchException("nodefailure",
-            new ElasticsearchStatusException("failure", RestStatus.UNPROCESSABLE_ENTITY)));
-        taskOperationFailures.add(new TaskOperationFailure("node",
-            1,
-            new ElasticsearchStatusException("failure", RestStatus.BAD_REQUEST)));
-
-        assertThat(TransportStopTransformAction.firstNotOKStatus(Collections.emptyList(), Collections.emptyList()),
-            equalTo(RestStatus.INTERNAL_SERVER_ERROR));
-
-        assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, Collections.emptyList()),
-            equalTo(RestStatus.BAD_REQUEST));
-        assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, nodeFailures),
-            equalTo(RestStatus.BAD_REQUEST));
-        assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures,
-            Collections.singletonList(new ElasticsearchException(new ElasticsearchStatusException("not failure", RestStatus.OK)))),
-            equalTo(RestStatus.BAD_REQUEST));
-
-        assertThat(TransportStopTransformAction.firstNotOKStatus(
-            Collections.singletonList(new TaskOperationFailure(
-                "node",
-                1,
-                new ElasticsearchStatusException("not failure", RestStatus.OK))),
-            nodeFailures),
-            equalTo(RestStatus.INTERNAL_SERVER_ERROR));
-
-        assertThat(TransportStopTransformAction.firstNotOKStatus(
-            Collections.emptyList(),
-            nodeFailures),
-            equalTo(RestStatus.INTERNAL_SERVER_ERROR));
+        nodeFailures.add(
+            new ElasticsearchException("nodefailure", new ElasticsearchStatusException("failure", RestStatus.UNPROCESSABLE_ENTITY))
+        );
+        taskOperationFailures.add(new TaskOperationFailure("node", 1, new ElasticsearchStatusException("failure", RestStatus.BAD_REQUEST)));
+
+        assertThat(
+            TransportStopTransformAction.firstNotOKStatus(Collections.emptyList(), Collections.emptyList()),
+            equalTo(RestStatus.INTERNAL_SERVER_ERROR)
+        );
+
+        assertThat(
+            TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, Collections.emptyList()),
+            equalTo(RestStatus.BAD_REQUEST)
+        );
+        assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, nodeFailures), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(
+            TransportStopTransformAction.firstNotOKStatus(
+                taskOperationFailures,
+                Collections.singletonList(new ElasticsearchException(new ElasticsearchStatusException("not failure", RestStatus.OK)))
+            ),
+            equalTo(RestStatus.BAD_REQUEST)
+        );
+
+        assertThat(
+            TransportStopTransformAction.firstNotOKStatus(
+                Collections.singletonList(
+                    new TaskOperationFailure("node", 1, new ElasticsearchStatusException("not failure", RestStatus.OK))
+                ),
+                nodeFailures
+            ),
+            equalTo(RestStatus.INTERNAL_SERVER_ERROR)
+        );
+
+        assertThat(
+            TransportStopTransformAction.firstNotOKStatus(Collections.emptyList(), nodeFailures),
+            equalTo(RestStatus.INTERNAL_SERVER_ERROR)
+        );
     }
 
     public void testBuildException() {
@@ -136,13 +144,16 @@ public class TransportStopTransformActionTests extends ESTestCase {
         List<TaskOperationFailure> taskOperationFailures = new ArrayList<>();
 
         nodeFailures.add(new ElasticsearchException("node failure"));
-        taskOperationFailures.add(new TaskOperationFailure("node",
-            1,
-            new ElasticsearchStatusException("task failure", RestStatus.BAD_REQUEST)));
+        taskOperationFailures.add(
+            new TaskOperationFailure("node", 1, new ElasticsearchStatusException("task failure", RestStatus.BAD_REQUEST))
+        );
 
         RestStatus status = CONFLICT;
-        ElasticsearchStatusException statusException =
-            TransportStopTransformAction.buildException(taskOperationFailures, nodeFailures, status);
+        ElasticsearchStatusException statusException = TransportStopTransformAction.buildException(
+            taskOperationFailures,
+            nodeFailures,
+            status
+        );
 
         assertThat(statusException.status(), equalTo(status));
         assertThat(statusException.getMessage(), equalTo(taskOperationFailures.get(0).getCause().getMessage()));

+ 266 - 100
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java

@@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
+import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
@@ -41,8 +42,12 @@ import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigMa
 import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -52,114 +57,115 @@ import static org.mockito.Mockito.when;
 public class TransformPersistentTasksExecutorTests extends ESTestCase {
 
     public void testNodeVersionAssignment() {
-        MetaData.Builder metaData = MetaData.builder();
-        RoutingTable.Builder routingTable = RoutingTable.builder();
-        addIndices(metaData, routingTable);
-        PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
-            .addTask(
-                "transform-task-1",
-                TransformTaskParams.NAME,
-                new TransformTaskParams("transform-task-1", Version.CURRENT, null),
-                new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")
-            )
-            .addTask(
-                "transform-task-2",
-                TransformTaskParams.NAME,
-                new TransformTaskParams("transform-task-2", Version.CURRENT, null),
-                new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "")
-            )
-            .addTask(
-                "transform-task-3",
-                TransformTaskParams.NAME,
-                new TransformTaskParams("transform-task-3", Version.CURRENT, null),
-                new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "")
-            );
+        DiscoveryNodes.Builder nodes = buildNodes(false, true, true, true, true);
+        ClusterState cs = buildClusterState(nodes);
+        TransformPersistentTasksExecutor executor = buildTaskExecutor();
 
-        PersistentTasksCustomMetaData pTasks = pTasksBuilder.build();
+        assertThat(
+            executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, true), cs).getExecutorNode(),
+            equalTo("current-data-node-with-1-tasks")
+        );
+        assertThat(
+            executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs).getExecutorNode(),
+            equalTo("current-data-node-with-0-tasks-transform-remote-disabled")
+        );
+        assertThat(
+            executor.getAssignment(new TransformTaskParams("new-old-task-id", Version.V_7_5_0, null, true), cs).getExecutorNode(),
+            equalTo("past-data-node-1")
+        );
+    }
 
-        metaData.putCustom(PersistentTasksCustomMetaData.TYPE, pTasks);
+    public void testNodeAssignmentProblems() {
+        // no data nodes
+        DiscoveryNodes.Builder nodes = buildNodes(false, false, false, false, true);
+        ClusterState cs = buildClusterState(nodes);
+        TransformPersistentTasksExecutor executor = buildTaskExecutor();
 
-        DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
-            .add(
-                new DiscoveryNode(
-                    "past-data-node-1",
-                    buildNewFakeTransportAddress(),
-                    Collections.emptyMap(),
-                    Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE),
-                    Version.V_7_2_0
-                )
-            )
-            .add(
-                new DiscoveryNode(
-                    "current-data-node-with-2-tasks",
-                    buildNewFakeTransportAddress(),
-                    Collections.emptyMap(),
-                    Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE),
-                    Version.CURRENT
-                )
-            )
-            .add(
-                new DiscoveryNode(
-                    "non-data-node-1",
-                    buildNewFakeTransportAddress(),
-                    Collections.emptyMap(),
-                    Set.of(DiscoveryNodeRole.MASTER_ROLE),
-                    Version.CURRENT
-                )
-            )
-            .add(
-                new DiscoveryNode(
-                    "current-data-node-with-1-tasks",
-                    buildNewFakeTransportAddress(),
-                    Collections.emptyMap(),
-                    Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE),
-                    Version.CURRENT
-                )
-            );
+        Assignment assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs);
+        assertNull(assignment.getExecutorNode());
+        assertThat(
+            assignment.getExplanation(),
+            equalTo("Not starting transform [new-task-id], reasons [current-data-node-with-transform-disabled:not a transform node]")
+        );
 
-        ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).nodes(nodes);
-        csBuilder.routingTable(routingTable.build());
-        csBuilder.metaData(metaData);
+        // dedicated transform node
+        nodes = buildNodes(true, false, false, false, true);
+        cs = buildClusterState(nodes);
+        executor = buildTaskExecutor();
 
-        ClusterState cs = csBuilder.build();
-        Client client = mock(Client.class);
-        TransformAuditor mockAuditor = mock(TransformAuditor.class);
-        IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(client, xContentRegistry());
-        TransformCheckpointService transformCheckpointService = new TransformCheckpointService(
-            client,
-            Settings.EMPTY,
-            new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
-            transformsConfigManager,
-            mockAuditor
-        );
-        TransformServices transformServices = new TransformServices(
-            transformsConfigManager,
-            transformCheckpointService,
-            mockAuditor,
-            mock(SchedulerEngine.class)
-        );
+        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs);
+        assertNotNull(assignment.getExecutorNode());
+        assertThat(assignment.getExecutorNode(), equalTo("dedicated-transform-node"));
 
-        ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY, Collections.singleton(Transform.NUM_FAILURE_RETRIES_SETTING));
-        ClusterService clusterService = mock(ClusterService.class);
-        when(clusterService.getClusterSettings()).thenReturn(cSettings);
-        when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE);
-        TransformPersistentTasksExecutor executor = new TransformPersistentTasksExecutor(
-            client,
-            transformServices,
-            mock(ThreadPool.class),
-            clusterService,
-            Settings.EMPTY,
-            new IndexNameExpressionResolver()
+        // only an old node
+        nodes = buildNodes(false, true, false, false, true);
+        cs = buildClusterState(nodes);
+        executor = buildTaskExecutor();
+
+        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_7_0, null, false), cs);
+        assertNull(assignment.getExecutorNode());
+        assertThat(
+            assignment.getExplanation(),
+            equalTo(
+                "Not starting transform [new-task-id], reasons ["
+                    + "current-data-node-with-transform-disabled:not a transform node"
+                    + "|"
+                    + "past-data-node-1:node has version: 7.5.0 but transform requires at least 7.7.0"
+                    + "]"
+            )
         );
 
+        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, false), cs);
+        assertNotNull(assignment.getExecutorNode());
+        assertThat(assignment.getExecutorNode(), equalTo("past-data-node-1"));
+
+        // no remote
+        nodes = buildNodes(false, false, false, true, false);
+        cs = buildClusterState(nodes);
+        executor = buildTaskExecutor();
+
+        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true), cs);
+        assertNull(assignment.getExecutorNode());
         assertThat(
-            executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null), cs).getExecutorNode(),
-            equalTo("current-data-node-with-1-tasks")
+            assignment.getExplanation(),
+            equalTo(
+                "Not starting transform [new-task-id], reasons ["
+                    + "current-data-node-with-0-tasks-transform-remote-disabled:"
+                    + "transform requires a remote connection but remote is disabled"
+                    + "]"
+            )
         );
+
+        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs);
+        assertNotNull(assignment.getExecutorNode());
+        assertThat(assignment.getExecutorNode(), equalTo("current-data-node-with-0-tasks-transform-remote-disabled"));
+
+        // no remote and disabled
+        nodes = buildNodes(false, false, false, true, true);
+        cs = buildClusterState(nodes);
+        executor = buildTaskExecutor();
+
+        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true), cs);
+        assertNull(assignment.getExecutorNode());
         assertThat(
-            executor.getAssignment(new TransformTaskParams("new-old-task-id", Version.V_7_2_0, null), cs).getExecutorNode(),
-            equalTo("past-data-node-1")
+            assignment.getExplanation(),
+            equalTo(
+                "Not starting transform [new-task-id], reasons ["
+                    + "current-data-node-with-0-tasks-transform-remote-disabled:"
+                    + "transform requires a remote connection but remote is disabled"
+                    + "|"
+                    + "current-data-node-with-transform-disabled:not a transform node"
+                    + "]"
+            )
         );
+        // old node, we do not know if remote is enabled
+        nodes = buildNodes(false, true, false, true, false);
+        cs = buildClusterState(nodes);
+        executor = buildTaskExecutor();
+
+        assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true), cs);
+        assertNotNull(assignment.getExecutorNode());
+        assertThat(assignment.getExecutorNode(), equalTo("past-data-node-1"));
     }
 
     public void testVerifyIndicesPrimaryShardsAreActive() {
@@ -172,8 +178,7 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
         csBuilder.metaData(metaData);
 
         ClusterState cs = csBuilder.build();
-        assertEquals(0,
-            TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs, new IndexNameExpressionResolver()).size());
+        assertEquals(0, TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs, new IndexNameExpressionResolver()).size());
 
         metaData = new MetaData.Builder(cs.metaData());
         routingTable = new RoutingTable.Builder(cs.routingTable());
@@ -197,8 +202,10 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
 
         csBuilder.routingTable(routingTable.build());
         csBuilder.metaData(metaData);
-        List<String> result =
-            TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(csBuilder.build(), new IndexNameExpressionResolver());
+        List<String> result = TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(
+            csBuilder.build(),
+            new IndexNameExpressionResolver()
+        );
         assertEquals(1, result.size());
         assertEquals(indexToRemove, result.get(0));
     }
@@ -232,4 +239,163 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase {
         }
     }
 
+    private DiscoveryNodes.Builder buildNodes(
+        boolean dedicatedTransformNode,
+        boolean pastDataNode,
+        boolean transformRemoteNodes,
+        boolean transformLocanOnlyNodes,
+        boolean currentDataNode
+    ) {
+
+        Map<String, String> transformNodeAttributes = new HashMap<>();
+        transformNodeAttributes.put(Transform.TRANSFORM_ENABLED_NODE_ATTR, "true");
+        transformNodeAttributes.put(Transform.TRANSFORM_REMOTE_ENABLED_NODE_ATTR, "true");
+        Map<String, String> transformNodeAttributesDisabled = new HashMap<>();
+        transformNodeAttributesDisabled.put(Transform.TRANSFORM_ENABLED_NODE_ATTR, "false");
+        transformNodeAttributesDisabled.put(Transform.TRANSFORM_REMOTE_ENABLED_NODE_ATTR, "true");
+        Map<String, String> transformNodeAttributesNoRemote = new HashMap<>();
+        transformNodeAttributesNoRemote.put(Transform.TRANSFORM_ENABLED_NODE_ATTR, "true");
+        transformNodeAttributesNoRemote.put(Transform.TRANSFORM_REMOTE_ENABLED_NODE_ATTR, "false");
+
+        DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
+
+        if (dedicatedTransformNode) {
+            nodes.add(
+                new DiscoveryNode(
+                    "dedicated-transform-node",
+                    buildNewFakeTransportAddress(),
+                    transformNodeAttributes,
+                    Collections.singleton(DiscoveryNodeRole.MASTER_ROLE),
+                    Version.CURRENT
+                )
+            );
+        }
+
+        if (pastDataNode) {
+            nodes.add(
+                new DiscoveryNode(
+                    "past-data-node-1",
+                    buildNewFakeTransportAddress(),
+                    Collections.emptyMap(),
+                    new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
+                    Version.V_7_5_0
+                )
+            );
+        }
+
+        if (transformRemoteNodes) {
+            nodes.add(
+                new DiscoveryNode(
+                    "current-data-node-with-2-tasks",
+                    buildNewFakeTransportAddress(),
+                    transformNodeAttributes,
+                    new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE)),
+                    Version.CURRENT
+                )
+            )
+                .add(
+                    new DiscoveryNode(
+                        "current-data-node-with-1-tasks",
+                        buildNewFakeTransportAddress(),
+                        transformNodeAttributes,
+                        new HashSet<>(Arrays.asList(DiscoveryNodeRole.MASTER_ROLE)),
+                        Version.CURRENT
+                    )
+                );
+        }
+
+        if (transformLocanOnlyNodes) {
+            nodes.add(
+                new DiscoveryNode(
+                    "current-data-node-with-0-tasks-transform-remote-disabled",
+                    buildNewFakeTransportAddress(),
+                    transformNodeAttributesNoRemote,
+                    new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
+                    Version.CURRENT
+                )
+            );
+        }
+
+        if (currentDataNode) {
+            nodes.add(
+                new DiscoveryNode(
+                    "current-data-node-with-transform-disabled",
+                    buildNewFakeTransportAddress(),
+                    transformNodeAttributesDisabled,
+                    Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE),
+                    Version.CURRENT
+                )
+            );
+        }
+
+        return nodes;
+    }
+
+    private ClusterState buildClusterState(DiscoveryNodes.Builder nodes) {
+        MetaData.Builder metaData = MetaData.builder();
+        RoutingTable.Builder routingTable = RoutingTable.builder();
+        addIndices(metaData, routingTable);
+        PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
+            .addTask(
+                "transform-task-1",
+                TransformTaskParams.NAME,
+                new TransformTaskParams("transform-task-1", Version.CURRENT, null, false),
+                new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")
+            )
+            .addTask(
+                "transform-task-2",
+                TransformTaskParams.NAME,
+                new TransformTaskParams("transform-task-2", Version.CURRENT, null, false),
+                new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "")
+            )
+            .addTask(
+                "transform-task-3",
+                TransformTaskParams.NAME,
+                new TransformTaskParams("transform-task-3", Version.CURRENT, null, false),
+                new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "")
+            );
+
+        PersistentTasksCustomMetaData pTasks = pTasksBuilder.build();
+        metaData.putCustom(PersistentTasksCustomMetaData.TYPE, pTasks);
+
+        ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).nodes(nodes);
+        csBuilder.routingTable(routingTable.build());
+        csBuilder.metaData(metaData);
+
+        return csBuilder.build();
+
+    }
+
+    public TransformPersistentTasksExecutor buildTaskExecutor() {
+        Client client = mock(Client.class);
+        TransformAuditor mockAuditor = mock(TransformAuditor.class);
+        IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(client, xContentRegistry());
+        TransformCheckpointService transformCheckpointService = new TransformCheckpointService(
+            client,
+            Settings.EMPTY,
+            new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null),
+            transformsConfigManager,
+            mockAuditor
+        );
+        TransformServices transformServices = new TransformServices(
+            transformsConfigManager,
+            transformCheckpointService,
+            mockAuditor,
+            mock(SchedulerEngine.class)
+        );
+
+        ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY, Collections.singleton(Transform.NUM_FAILURE_RETRIES_SETTING));
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.getClusterSettings()).thenReturn(cSettings);
+        when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE);
+
+        return new TransformPersistentTasksExecutor(
+            client,
+            transformServices,
+            mock(ThreadPool.class),
+            clusterService,
+            Settings.EMPTY,
+            new IndexNameExpressionResolver()
+        );
+    }
 }

+ 2 - 2
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java

@@ -96,7 +96,7 @@ public class TransformTaskTests extends ESTestCase {
             "some_type",
             "some_action",
             TaskId.EMPTY_TASK_ID,
-            new TransformTaskParams(transformConfig.getId(), Version.CURRENT, TimeValue.timeValueSeconds(10)),
+            new TransformTaskParams(transformConfig.getId(), Version.CURRENT, TimeValue.timeValueSeconds(10), false),
             transformState,
             mock(SchedulerEngine.class),
             auditor,
@@ -176,7 +176,7 @@ public class TransformTaskTests extends ESTestCase {
             "some_type",
             "some_action",
             TaskId.EMPTY_TASK_ID,
-            new TransformTaskParams(transformConfig.getId(), Version.CURRENT, TimeValue.timeValueSeconds(10)),
+            new TransformTaskParams(transformConfig.getId(), Version.CURRENT, TimeValue.timeValueSeconds(10), false),
             transformState,
             mock(SchedulerEngine.class),
             auditor,