Browse Source

[ML] Validate dest pipeline exists on transform update (#63494)

Adds validation that the dest pipeline exists when a transform
is updated. Refactors the pipeline check into the `SourceDestValidator`.

Fixes #59587
Dimitris Athanasiou 5 years ago
parent
commit
31b4cde839
14 changed files with 200 additions and 74 deletions
  1. 52 19
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java
  2. 0 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java
  3. 96 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java
  4. 2 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java
  5. 2 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java
  6. 0 2
      x-pack/plugin/src/test/resources/rest-api-spec/test/transform/preview_transforms.yml
  7. 10 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_update.yml
  8. 9 6
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java
  9. 3 18
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java
  10. 2 11
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java
  11. 10 9
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java
  12. 5 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportPreviewTransformActionDeprecated.java
  13. 5 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportUpdateTransformActionDeprecated.java
  14. 4 2
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/SourceDestValidations.java

+ 52 - 19
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidator.java

@@ -19,6 +19,7 @@ import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.indices.InvalidIndexNameException;
+import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.protocol.xpack.license.LicenseStatus;
 import org.elasticsearch.transport.NoSuchRemoteClusterException;
@@ -59,10 +60,12 @@ public final class SourceDestValidator {
     public static final String REMOTE_CLUSTER_LICENSE_INACTIVE = "License check failed for remote cluster "
         + "alias [{0}], license is not active";
     public static final String REMOTE_SOURCE_INDICES_NOT_SUPPORTED = "remote source indices are not supported";
+    public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";
 
     private final IndexNameExpressionResolver indexNameExpressionResolver;
     private final RemoteClusterService remoteClusterService;
     private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
+    private final IngestService ingestService;
     private final String nodeName;
     private final String license;
 
@@ -74,8 +77,10 @@ public final class SourceDestValidator {
         private final IndexNameExpressionResolver indexNameExpressionResolver;
         private final RemoteClusterService remoteClusterService;
         private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
+        private final IngestService ingestService;
         private final String[] source;
-        private final String dest;
+        private final String destIndex;
+        private final String destPipeline;
         private final String nodeName;
         private final String license;
 
@@ -89,8 +94,10 @@ public final class SourceDestValidator {
             final IndexNameExpressionResolver indexNameExpressionResolver,
             final RemoteClusterService remoteClusterService,
             final RemoteClusterLicenseChecker remoteClusterLicenseChecker,
+            final IngestService ingestService,
             final String[] source,
-            final String dest,
+            final String destIndex,
+            final String destPipeline,
             final String nodeName,
             final String license
         ) {
@@ -98,8 +105,10 @@ public final class SourceDestValidator {
             this.indexNameExpressionResolver = indexNameExpressionResolver;
             this.remoteClusterService = remoteClusterService;
             this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
+            this.ingestService = ingestService;
             this.source = source;
-            this.dest = dest;
+            this.destIndex = destIndex;
+            this.destPipeline = destPipeline;
             this.nodeName = nodeName;
             this.license = license;
         }
@@ -120,6 +129,10 @@ public final class SourceDestValidator {
             return indexNameExpressionResolver;
         }
 
+        public IngestService getIngestService() {
+            return ingestService;
+        }
+
         public boolean isRemoteSearchEnabled() {
             return remoteClusterLicenseChecker != null;
         }
@@ -128,8 +141,8 @@ public final class SourceDestValidator {
             return source;
         }
 
-        public String getDest() {
-            return dest;
+        public String getDestIndex() {
+            return destIndex;
         }
 
         public String getNodeName() {
@@ -162,11 +175,11 @@ public final class SourceDestValidator {
                     Index singleWriteIndex = indexNameExpressionResolver.concreteWriteIndex(
                         state,
                         IndicesOptions.lenientExpandOpen(),
-                        dest,
+                        destIndex,
                         true,
                         false);
 
-                    resolvedDest = singleWriteIndex != null ? singleWriteIndex.getName() : dest;
+                    resolvedDest = singleWriteIndex != null ? singleWriteIndex.getName() : destIndex;
                 } catch (IllegalArgumentException e) {
                     // stop here as we can not return a single dest index
                     addValidationError(e.getMessage());
@@ -230,6 +243,7 @@ public final class SourceDestValidator {
     public static final SourceDestValidation DESTINATION_IN_SOURCE_VALIDATION = new DestinationInSourceValidation();
     public static final SourceDestValidation DESTINATION_SINGLE_INDEX_VALIDATION = new DestinationSingleIndexValidation();
     public static final SourceDestValidation REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION = new RemoteSourceNotSupportedValidation();
+    public static final SourceDestValidation DESTINATION_PIPELINE_MISSING_VALIDATION = new DestinationPipelineMissingValidation();
 
     /**
      * Create a new Source Dest Validator
@@ -244,29 +258,33 @@ public final class SourceDestValidator {
         IndexNameExpressionResolver indexNameExpressionResolver,
         RemoteClusterService remoteClusterService,
         RemoteClusterLicenseChecker remoteClusterLicenseChecker,
+        IngestService ingestService,
         String nodeName,
         String license
     ) {
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.remoteClusterService = remoteClusterService;
         this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
+        this.ingestService = ingestService;
         this.nodeName = nodeName;
         this.license = license;
     }
 
     /**
-     * Run validation against source and dest.
+     * Run validation against source and destIndex.
      *
      * @param clusterState The current ClusterState
      * @param source an array of source indexes
-     * @param dest destination index
+     * @param destIndex destination index
+     * @param destPipeline destination pipeline
      * @param validations list of of validations to run
      * @param listener result listener
      */
     public void validate(
         final ClusterState clusterState,
         final String[] source,
-        final String dest,
+        final String destIndex,
+        @Nullable final String destPipeline,
         final List<SourceDestValidation> validations,
         final ActionListener<Boolean> listener
     ) {
@@ -275,8 +293,10 @@ public final class SourceDestValidator {
             indexNameExpressionResolver,
             remoteClusterService,
             remoteClusterLicenseChecker,
+            ingestService,
             source,
-            dest,
+            destIndex,
+            destPipeline,
             nodeName,
             license
         );
@@ -300,7 +320,7 @@ public final class SourceDestValidator {
     }
 
     /**
-     * Validate dest request.
+     * Validate request.
      *
      * This runs a couple of simple validations at request time, to be executed from a {@link ActionRequest}}
      * implementation.
@@ -308,17 +328,17 @@ public final class SourceDestValidator {
      * Note: Source can not be validated at request time as it might contain expressions.
      *
      * @param validationException an ActionRequestValidationException for collection validation problem, can be null
-     * @param dest destination index, null if validation shall be skipped
+     * @param destIndex destination index, null if validation shall be skipped
      */
     public static ActionRequestValidationException validateRequest(
         @Nullable ActionRequestValidationException validationException,
-        @Nullable String dest
+        @Nullable String destIndex
     ) {
         try {
-            if (dest != null) {
-                validateIndexOrAliasName(dest, InvalidIndexNameException::new);
-                if (dest.toLowerCase(Locale.ROOT).equals(dest) == false) {
-                    validationException = addValidationError(getMessage(DEST_LOWERCASE, dest), validationException);
+            if (destIndex != null) {
+                validateIndexOrAliasName(destIndex, InvalidIndexNameException::new);
+                if (destIndex.toLowerCase(Locale.ROOT).equals(destIndex) == false) {
+                    validationException = addValidationError(getMessage(DEST_LOWERCASE, destIndex), validationException);
                 }
             }
         } catch (InvalidIndexNameException ex) {
@@ -402,7 +422,7 @@ public final class SourceDestValidator {
 
         @Override
         public void validate(Context context, ActionListener<Context> listener) {
-            final String destIndex = context.getDest();
+            final String destIndex = context.getDestIndex();
             boolean foundSourceInDest = false;
 
             for (String src : context.getSource()) {
@@ -456,6 +476,19 @@ public final class SourceDestValidator {
         }
     }
 
+    static class DestinationPipelineMissingValidation implements SourceDestValidation {
+
+        @Override
+        public void validate(Context context, ActionListener<Context> listener) {
+            if (context.destPipeline != null) {
+                if (context.ingestService.getPipeline(context.destPipeline) == null) {
+                    context.addValidationError(PIPELINE_MISSING, context.destPipeline);
+                }
+            }
+            listener.onResponse(context);
+        }
+    }
+
     private static String getMessage(String message, Object... args) {
         return new MessageFormat(message, Locale.ROOT).format(args);
     }

+ 0 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java

@@ -27,7 +27,6 @@ public class TransformMessages {
     public static final String REST_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
     public static final String TRANSFORM_FAILED_TO_PERSIST_STATS = "Failed to persist transform statistics for transform [{0}]";
     public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
-    public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";
 
     public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";
 

+ 96 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java

@@ -23,6 +23,11 @@ import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.ingest.ConfigurationUtils;
+import org.elasticsearch.ingest.IngestService;
+import org.elasticsearch.ingest.Pipeline;
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.ingest.TestProcessor;
 import org.elasticsearch.license.License;
 import org.elasticsearch.license.RemoteClusterLicenseChecker;
 import org.elasticsearch.license.XPackLicenseState;
@@ -44,7 +49,9 @@ import org.junit.Before;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,10 +63,12 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
 import static org.elasticsearch.mock.orig.Mockito.when;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
+import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_PIPELINE_MISSING_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 public class SourceDestValidatorTests extends ESTestCase {
@@ -79,6 +88,7 @@ public class SourceDestValidatorTests extends ESTestCase {
         SOURCE_MISSING_VALIDATION,
         DESTINATION_IN_SOURCE_VALIDATION,
         DESTINATION_SINGLE_INDEX_VALIDATION,
+        DESTINATION_PIPELINE_MISSING_VALIDATION,
         REMOTE_SOURCE_VALIDATION
     );
 
@@ -91,10 +101,13 @@ public class SourceDestValidatorTests extends ESTestCase {
     private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
     private final TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool);
     private final RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
+    private final IngestService ingestService = mock(IngestService.class);
+
     private final SourceDestValidator simpleNonRemoteValidator = new SourceDestValidator(
         new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
         remoteClusterService,
         null,
+        ingestService,
         "node_id",
         "license"
     );
@@ -205,6 +218,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1 },
                 "dest",
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -219,6 +233,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] {},
                 "dest",
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -236,6 +251,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { "missing" },
                 "dest",
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -251,6 +267,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { "missing" },
                 "dest",
+                null,
                 Collections.emptyList(),
                 listener
             ),
@@ -265,6 +282,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1, "missing" },
                 "dest",
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -280,6 +298,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1, "missing" },
                 "dest",
+                null,
                 Collections.emptyList(),
                 listener
             ),
@@ -294,6 +313,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1, "wildcard*", "missing" },
                 "dest",
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -309,6 +329,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1, "wildcard*", "missing" },
                 "dest",
+                null,
                 Collections.emptyList(),
                 listener
             ),
@@ -323,6 +344,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { "wildcard*" },
                 "dest",
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -337,6 +359,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1 },
                 SOURCE_1,
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -355,6 +378,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1 },
                 SOURCE_1,
+                null,
                 Collections.emptyList(),
                 listener
             ),
@@ -369,6 +393,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { "source-*" },
                 SOURCE_2,
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -387,6 +412,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { "source-*" },
                 SOURCE_2,
+                null,
                 Collections.emptyList(),
                 listener
             ),
@@ -401,6 +427,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { "source-1", "source-*" },
                 SOURCE_2,
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -419,6 +446,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { "source-1", "source-*" },
                 SOURCE_2,
+                null,
                 Collections.emptyList(),
                 listener
             ),
@@ -433,6 +461,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { "source-1", "source-*", "sou*" },
                 SOURCE_2,
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -457,6 +486,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1 },
                 DEST_ALIAS,
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -479,6 +509,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1 },
                 DEST_ALIAS,
+                null,
                 Collections.emptyList(),
                 listener
             ),
@@ -493,6 +524,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1 },
                 ALIAS_READ_WRITE_DEST,
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -519,6 +551,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1 },
                 SOURCE_1_ALIAS,
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -537,6 +570,7 @@ public class SourceDestValidatorTests extends ESTestCase {
                 CLUSTER_STATE,
                 new String[] { SOURCE_1 },
                 SOURCE_1_ALIAS,
+                null,
                 Collections.emptyList(),
                 listener
             ),
@@ -545,12 +579,60 @@ public class SourceDestValidatorTests extends ESTestCase {
         );
     }
 
+    public void testCheck_GivenMissingDestPipeline() throws Exception {
+        assertValidation(
+            listener -> simpleNonRemoteValidator.validate(
+                CLUSTER_STATE,
+                new String[] { SOURCE_1 },
+                "some-dest",
+                "missing-pipeline",
+                TEST_VALIDATIONS,
+                listener
+            ),
+            (Boolean) null,
+            e -> {
+                assertEquals(1, e.validationErrors().size());
+                assertThat(
+                    e.validationErrors().get(0),
+                    equalTo("Pipeline with id [missing-pipeline] could not be found")
+                );
+            }
+        );
+
+        // Let's now pretend that pipeline exists
+        Map<String, Object> processorConfig0 = new HashMap<>();
+        Map<String, Object> processorConfig1 = new HashMap<>();
+        processorConfig0.put(ConfigurationUtils.TAG_KEY, "first-processor");
+        Map<String, Object> pipelineConfig = new HashMap<>();
+        pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
+        pipelineConfig.put(Pipeline.VERSION_KEY, "1");
+        pipelineConfig.put(Pipeline.PROCESSORS_KEY,
+            Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1)));
+        Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
+        Pipeline pipeline = Pipeline.create("missing-pipeline", pipelineConfig, processorRegistry, null);
+        when(ingestService.getPipeline("missing-pipeline")).thenReturn(pipeline);
+
+        assertValidation(
+            listener -> simpleNonRemoteValidator.validate(
+                CLUSTER_STATE,
+                new String[] { SOURCE_1 },
+                "some-dest",
+                "missing-pipeline",
+                TEST_VALIDATIONS,
+                listener
+            ),
+            true,
+            null
+        );
+    }
+
     public void testCheck_MultipleValidationErrors() throws InterruptedException {
         assertValidation(
             listener -> simpleNonRemoteValidator.validate(
                 CLUSTER_STATE,
                 new String[] { SOURCE_1, "missing" },
                 SOURCE_1_ALIAS,
+                null,
                 TEST_VALIDATIONS,
                 listener
             ),
@@ -575,8 +657,10 @@ public class SourceDestValidatorTests extends ESTestCase {
                 new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
                 remoteClusterService,
                 remoteClusterLicenseCheckerBasic,
+                ingestService,
                 new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
                 "dest",
+                null,
                 "node_id",
                 "license"
             )
@@ -600,8 +684,10 @@ public class SourceDestValidatorTests extends ESTestCase {
                 remoteClusterService,
                 new RemoteClusterLicenseChecker(clientWithBasicLicense,
                     operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
+                ingestService,
                 new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
                 "dest",
+                null,
                 "node_id",
                 "platinum"
             )
@@ -630,8 +716,10 @@ public class SourceDestValidatorTests extends ESTestCase {
                 remoteClusterService,
                 new RemoteClusterLicenseChecker(clientWithPlatinumLicense,
                     operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
+                ingestService,
                 new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
                 "dest",
+                null,
                 "node_id",
                 "license"
             )
@@ -651,9 +739,11 @@ public class SourceDestValidatorTests extends ESTestCase {
                 remoteClusterService,
                 new RemoteClusterLicenseChecker(clientWithPlatinumLicense,
                     operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
+                ingestService,
                 new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
                 "dest",
                 "node_id",
+                null,
                 "platinum"
             )
         );
@@ -673,9 +763,11 @@ public class SourceDestValidatorTests extends ESTestCase {
                 remoteClusterService,
                 new RemoteClusterLicenseChecker(clientWithTrialLicense,
                     operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
+                ingestService,
                 new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
                 "dest",
                 "node_id",
+                null,
                 "trial"
             )
         );
@@ -697,8 +789,10 @@ public class SourceDestValidatorTests extends ESTestCase {
                 remoteClusterService,
                 new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense,
                     operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
+                ingestService,
                 new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
                 "dest",
+                null,
                 "node_id",
                 "license"
             )
@@ -724,8 +818,10 @@ public class SourceDestValidatorTests extends ESTestCase {
                 remoteClusterService,
                 new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense,
                     operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
+                ingestService,
                 new String[] { "non_existing_remote:" + "SOURCE_1" },
                 "dest",
+                null,
                 "node_id",
                 "license"
             )

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java

@@ -93,6 +93,7 @@ public class TransportPutDataFrameAnalyticsAction
             indexNameExpressionResolver,
             transportService.getRemoteClusterService(),
             null,
+            null,
             clusterService.getNodeName(),
             License.OperationMode.PLATINUM.description()
         );
@@ -118,7 +119,7 @@ public class TransportPutDataFrameAnalyticsAction
             listener::onFailure
         );
 
-        sourceDestValidator.validate(clusterService.state(), config.getSource().getIndex(), config.getDest().getIndex(),
+        sourceDestValidator.validate(clusterService.state(), config.getSource().getIndex(), config.getDest().getIndex(), null,
             SourceDestValidations.ALL_VALIDATIONS, sourceDestValidationListener);
     }
 

+ 2 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -133,6 +133,7 @@ public class TransportStartDataFrameAnalyticsAction
             indexNameExpressionResolver,
             transportService.getRemoteClusterService(),
             null,
+            null,
             clusterService.getNodeName(),
             License.OperationMode.PLATINUM.description()
         );
@@ -304,7 +305,7 @@ public class TransportStartDataFrameAnalyticsAction
 
                 // Validate source/dest are valid
                 sourceDestValidator.validate(clusterService.state(), startContext.config.getSource().getIndex(),
-                    startContext.config.getDest().getIndex(), SourceDestValidations.ALL_VALIDATIONS, ActionListener.wrap(
+                    startContext.config.getDest().getIndex(), null, SourceDestValidations.ALL_VALIDATIONS, ActionListener.wrap(
                         aBoolean -> toValidateExtractionPossibleListener.onResponse(startContext), finalListener::onFailure));
             },
             finalListener::onFailure

+ 0 - 2
x-pack/plugin/src/test/resources/rest-api-spec/test/transform/preview_transforms.yml

@@ -260,7 +260,6 @@ setup:
         body: >
           {
             "source": { "index": "airline-data" },
-            "dest": { "pipeline": "missing-pipeline" },
             "pivot": {
               "group_by": {
                 "time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
@@ -275,7 +274,6 @@ setup:
         body: >
           {
             "source": { "index": "airline-data" },
-            "dest": { "pipeline": "missing-pipeline" },
             "pivot": {
               "group_by": {
                 "time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},

+ 10 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_update.yml

@@ -45,6 +45,16 @@ setup:
             "description": "new description"
           }
 ---
+"Test update transform with missing pipeline":
+  - do:
+      catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
+      transform.update_transform:
+        transform_id: "updating-airline-transform"
+        body: >
+          {
+            "dest": { "index": "airline-data-by-airline", "pipeline": "missing-transform-pipeline" }
+          }
+---
 "Test update transform with frequency too low":
   - do:
       catch: /minimum permitted \[frequency\] is \[1s\]/

+ 9 - 6
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java

@@ -6,8 +6,6 @@
 
 package org.elasticsearch.xpack.transform.action;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ingest.SimulatePipelineAction;
@@ -29,6 +27,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.license.License;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.RemoteClusterLicenseChecker;
@@ -62,7 +61,6 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
     PreviewTransformAction.Request,
     PreviewTransformAction.Response> {
 
-    private static final Logger logger = LogManager.getLogger(TransportPreviewTransformAction.class);
     private static final int NUMBER_OF_PREVIEW_BUCKETS = 100;
     private final XPackLicenseState licenseState;
     private final Client client;
@@ -79,7 +77,8 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
         XPackLicenseState licenseState,
         IndexNameExpressionResolver indexNameExpressionResolver,
         ClusterService clusterService,
-        Settings settings
+        Settings settings,
+        IngestService ingestService
     ) {
         this(
             PreviewTransformAction.NAME,
@@ -90,7 +89,8 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
             licenseState,
             indexNameExpressionResolver,
             clusterService,
-            settings
+            settings,
+            ingestService
         );
     }
 
@@ -103,7 +103,8 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
         XPackLicenseState licenseState,
         IndexNameExpressionResolver indexNameExpressionResolver,
         ClusterService clusterService,
-        Settings settings
+        Settings settings,
+        IngestService ingestService
     ) {
         super(name, transportService, actionFilters, PreviewTransformAction.Request::new);
         this.licenseState = licenseState;
@@ -116,6 +117,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
             DiscoveryNode.isRemoteClusterClient(settings)
                 ? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
                 : null,
+            ingestService,
             clusterService.getNodeName(),
             License.OperationMode.BASIC.description()
         );
@@ -136,6 +138,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
             clusterState,
             config.getSource().getIndex(),
             config.getDestination().getIndex(),
+            config.getDestination().getPipeline(),
             SourceDestValidations.PREVIEW_VALIDATIONS,
             ActionListener.wrap(r -> {
                 // create the function for validation

+ 3 - 18
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java

@@ -75,7 +75,6 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
     private final SecurityContext securityContext;
     private final TransformAuditor auditor;
     private final SourceDestValidator sourceDestValidator;
-    private final IngestService ingestService;
 
     @Inject
     public TransportPutTransformAction(
@@ -141,10 +140,10 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
             DiscoveryNode.isRemoteClusterClient(settings)
                 ? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
                 : null,
+            ingestService,
             clusterService.getNodeName(),
             License.OperationMode.BASIC.description()
         );
-        this.ingestService = ingestService;
     }
 
     static HasPrivilegesRequest buildPrivilegeCheck(
@@ -218,6 +217,7 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
             clusterState,
             config.getSource().getIndex(),
             config.getDestination().getIndex(),
+            config.getDestination().getPipeline(),
             request.isDeferValidation() ? SourceDestValidations.NON_DEFERABLE_VALIDATIONS : SourceDestValidations.ALL_VALIDATIONS,
             ActionListener.wrap(
                 validationResponse -> {
@@ -311,22 +311,7 @@ public class TransportPutTransformAction extends AcknowledgedTransportMasterNode
             if (request.isDeferValidation()) {
                 validationListener.onResponse(true);
             } else {
-                if (config.getDestination().getPipeline() != null) {
-                    if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
-                        listener.onFailure(
-                            new ElasticsearchStatusException(
-                                TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
-                                RestStatus.BAD_REQUEST
-                            )
-                        );
-                        return;
-                    }
-                }
-                if (request.isDeferValidation()) {
-                    validationListener.onResponse(true);
-                } else {
-                    function.validateQuery(client, config.getSource(), validationListener);
-                }
+                function.validateQuery(client, config.getSource(), validationListener);
             }
         }, listener::onFailure));
     }

+ 2 - 11
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java

@@ -141,6 +141,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
             DiscoveryNode.isRemoteClusterClient(settings)
                 ? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
                 : null,
+            ingestService,
             clusterService.getNodeName(),
             License.OperationMode.BASIC.description()
         );
@@ -261,22 +262,12 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
                 createTransform(config.getId(), config.getVersion(), config.getFrequency(), config.getSource().requiresRemoteCluster())
             );
             transformConfigHolder.set(config);
-            if (config.getDestination().getPipeline() != null) {
-                if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
-                    listener.onFailure(
-                        new ElasticsearchStatusException(
-                            TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
-                            RestStatus.BAD_REQUEST
-                        )
-                    );
-                    return;
-                }
-            }
 
             sourceDestValidator.validate(
                 clusterService.state(),
                 config.getSource().getIndex(),
                 config.getDestination().getIndex(),
+                config.getDestination().getPipeline(),
                 SourceDestValidations.ALL_VALIDATIONS,
                 validationListener
             );

+ 10 - 9
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java

@@ -27,6 +27,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.discovery.MasterNotDiscoveredException;
+import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.license.License;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.RemoteClusterLicenseChecker;
@@ -96,7 +97,8 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
         ClusterService clusterService,
         XPackLicenseState licenseState,
         TransformServices transformServices,
-        Client client
+        Client client,
+        IngestService ingestService
     ) {
         this(
             UpdateTransformAction.NAME,
@@ -108,7 +110,8 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
             clusterService,
             licenseState,
             transformServices,
-            client
+            client,
+            ingestService
         );
     }
 
@@ -122,7 +125,8 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
         ClusterService clusterService,
         XPackLicenseState licenseState,
         TransformServices transformServices,
-        Client client
+        Client client,
+        IngestService ingestService
     ) {
         super(
             name,
@@ -148,6 +152,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
             DiscoveryNode.isRemoteClusterClient(settings)
                 ? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
                 : null,
+            ingestService,
             clusterService.getNodeName(),
             License.OperationMode.BASIC.description()
         );
@@ -237,6 +242,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
                 clusterState,
                 updatedConfig.getSource().getIndex(),
                 updatedConfig.getDestination().getIndex(),
+                updatedConfig.getDestination().getPipeline(),
                 request.isDeferValidation() ? SourceDestValidations.NON_DEFERABLE_VALIDATIONS : SourceDestValidations.ALL_VALIDATIONS,
                 ActionListener.wrap(
                     validationResponse -> {
@@ -399,12 +405,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
             if (request.isDeferValidation()) {
                 functionValidationListener.onResponse(true);
             } else {
-                // TODO: it seems we are not validating ingest pipelines, consider to share code with PUT
-                if (request.isDeferValidation()) {
-                    functionValidationListener.onResponse(true);
-                } else {
-                    function.validateQuery(client, config.getSource(), functionValidationListener);
-                }
+                function.validateQuery(client, config.getSource(), functionValidationListener);
             }
         }, listener::onFailure));
     }

+ 5 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportPreviewTransformActionDeprecated.java

@@ -12,6 +12,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -29,7 +30,8 @@ public class TransportPreviewTransformActionDeprecated extends TransportPreviewT
         XPackLicenseState licenseState,
         IndexNameExpressionResolver indexNameExpressionResolver,
         ClusterService clusterService,
-        Settings settings
+        Settings settings,
+        IngestService ingestService
     ) {
         super(
             PreviewTransformActionDeprecated.NAME,
@@ -40,7 +42,8 @@ public class TransportPreviewTransformActionDeprecated extends TransportPreviewT
             licenseState,
             indexNameExpressionResolver,
             clusterService,
-            settings
+            settings,
+            ingestService
         );
     }
 

+ 5 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportUpdateTransformActionDeprecated.java

@@ -12,6 +12,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -31,7 +32,8 @@ public class TransportUpdateTransformActionDeprecated extends TransportUpdateTra
         ClusterService clusterService,
         XPackLicenseState licenseState,
         TransformServices transformServices,
-        Client client
+        Client client,
+        IngestService ingestService
     ) {
         super(
             UpdateTransformActionDeprecated.NAME,
@@ -43,7 +45,8 @@ public class TransportUpdateTransformActionDeprecated extends TransportUpdateTra
             clusterService,
             licenseState,
             transformServices,
-            client
+            client,
+            ingestService
         );
     }
 

+ 4 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/SourceDestValidations.java

@@ -13,6 +13,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
+import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_PIPELINE_MISSING_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_VALIDATION;
 import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
@@ -25,13 +26,14 @@ public final class SourceDestValidations {
     private SourceDestValidations() {}
 
     public static final List<SourceDestValidator.SourceDestValidation> PREVIEW_VALIDATIONS = Arrays.asList(
-        SOURCE_MISSING_VALIDATION, REMOTE_SOURCE_VALIDATION);
+        SOURCE_MISSING_VALIDATION, REMOTE_SOURCE_VALIDATION, DESTINATION_PIPELINE_MISSING_VALIDATION);
 
     public static final List<SourceDestValidator.SourceDestValidation> ALL_VALIDATIONS = Arrays.asList(
         SOURCE_MISSING_VALIDATION,
         REMOTE_SOURCE_VALIDATION,
         DESTINATION_IN_SOURCE_VALIDATION,
-        DESTINATION_SINGLE_INDEX_VALIDATION
+        DESTINATION_SINGLE_INDEX_VALIDATION,
+        DESTINATION_PIPELINE_MISSING_VALIDATION
     );
 
     public static final List<SourceDestValidator.SourceDestValidation> NON_DEFERABLE_VALIDATIONS = Collections.singletonList(