Browse Source

[Transform] fail to start/put on missing pipeline (#50701)

If a pipeline referenced by a transform does not exist, we should not allow the transform to be created. 

We do allow the pipeline existence check to be skipped with defer_validations, but if the pipeline still does not exist on `_start`, the pipeline will fail to start.

relates:  #50135
Benjamin Trent 5 years ago
parent
commit
207525bb70

+ 1 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TransformDocumentationIT.java

@@ -134,6 +134,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
             .setIndex("pivot-destination")
             .setPipeline("my-pipeline").build();
         // end::put-transform-dest-config
+        destConfig = DestConfig.builder().setIndex("pivot-destination").build();
         // tag::put-transform-group-config
         GroupConfig groupConfig = GroupConfig.builder()
             .groupBy("reviewer", // <1>

+ 17 - 1
docs/build.gradle

@@ -1235,7 +1235,23 @@ buildRestTests.setups['kibana_sample_data_ecommerce'] = '''
               number_of_shards: 1
               number_of_replicas: 0
 '''
-buildRestTests.setups['simple_kibana_continuous_pivot'] = buildRestTests.setups['kibana_sample_data_ecommerce'] + '''
+buildRestTests.setups['add_timestamp_pipeline'] = '''
+  - do:
+      ingest.put_pipeline:
+        id: "add_timestamp_pipeline"
+        body:  >
+          {
+            "processors": [
+              {
+                "set" : {
+                  "field" : "@timestamp",
+                  "value" : "{{_ingest.timestamp}}"
+                }
+              }
+            ]
+          }
+'''
+buildRestTests.setups['simple_kibana_continuous_pivot'] = buildRestTests.setups['kibana_sample_data_ecommerce'] + buildRestTests.setups['add_timestamp_pipeline']  + '''
   - do:
       raw:
         method: PUT

+ 1 - 1
docs/reference/transform/apis/put-transform.asciidoc

@@ -195,7 +195,7 @@ PUT _transform/ecommerce_transform
   }
 }
 --------------------------------------------------
-// TEST[setup:kibana_sample_data_ecommerce]
+// TEST[setup:kibana_sample_data_ecommerce,add_timestamp_pipeline]
 
 When the {transform} is created, you receive the following results:
 

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java

@@ -27,6 +27,7 @@ public class TransformMessages {
     public static final String REST_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
     public static final String TRANSFORM_FAILED_TO_PERSIST_STATS = "Failed to persist transform statistics for transform [{0}]";
     public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
+    public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";
 
     public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";
 

+ 49 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_crud.yml

@@ -122,6 +122,22 @@ setup:
           }
   - match: { acknowledged: true }
 
+  - do:
+      ingest.put_pipeline:
+        id: "airline-pipeline"
+        body:  >
+          {
+            "processors": [
+              {
+                "set" : {
+                  "field" : "some_field",
+                  "value" : 42
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
   - do:
       transform.put_transform:
         transform_id: "airline-transform-dos"
@@ -631,3 +647,36 @@ setup:
         transform_id: "airline-transform-start-delete"
         force: true
   - match: { acknowledged: true }
+---
+"Test put transform with missing pipeline":
+  - do:
+      catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
+      transform.put_transform:
+        transform_id: "airline-transform-with-missing-pipeline-crud"
+        body: >
+          {
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline-with-pipeline", "pipeline": "missing-transform-pipeline" },
+            "pivot": {
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+            },
+            "description": "yaml test transform on airline-data"
+          }
+---
+"Test put transform with missing pipeline and defer validations":
+  - do:
+      transform.put_transform:
+        defer_validation: true
+        transform_id: "airline-transform-with-missing-pipeline-crud-defer"
+        body: >
+          {
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline", "pipeline": "missing-transform-pipeline" },
+            "pivot": {
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+            },
+            "description": "yaml test transform on airline-data"
+          }
+  - match: {acknowledged: true}

+ 56 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_start_stop.yml

@@ -376,3 +376,59 @@ teardown:
         index: airline-data-time-alias
   - match: { airline-data-time-alias.mappings.properties.time.type: date }
   - match: { airline-data-time-alias.mappings.properties.avg_response.type: double }
+---
+"Test start transform with missing pipeline":
+  - do:
+      transform.put_transform:
+        defer_validation: true
+        transform_id: "airline-transform-with-missing-pipeline"
+        body: >
+          {
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline-pipeline", "pipeline": "missing-transform-pipeline" },
+            "pivot": {
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+            },
+            "description": "yaml test transform on airline-data"
+          }
+  - match: {acknowledged: true}
+  - do:
+      catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
+      transform.start_transform:
+        transform_id: "airline-transform-with-missing-pipeline"
+---
+"Test start transform with pipeline":
+  - do:
+      ingest.put_pipeline:
+        id: "transform-pipeline"
+        body:  >
+          {
+            "processors": [
+              {
+                "set" : {
+                  "field" : "some_field",
+                  "value" : 42
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      transform.put_transform:
+        transform_id: "airline-transform-with-pipeline"
+        body: >
+          {
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-data-by-airline-pipeline", "pipeline": "transform-pipeline" },
+            "pivot": {
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+            },
+            "description": "yaml test transform on airline-data"
+          }
+  - match: {acknowledged: true}
+  - do:
+      transform.start_transform:
+        transform_id: "airline-transform-with-pipeline"

+ 19 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.license.License;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.RemoteClusterLicenseChecker;
@@ -74,6 +75,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
     private final SecurityContext securityContext;
     private final TransformAuditor auditor;
     private final SourceDestValidator sourceDestValidator;
+    private final IngestService ingestService;
 
     @Inject
     public TransportPutTransformAction(
@@ -85,7 +87,8 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
         ClusterService clusterService,
         XPackLicenseState licenseState,
         TransformServices transformServices,
-        Client client
+        Client client,
+        IngestService ingestService
     ) {
         this(
             PutTransformAction.NAME,
@@ -97,7 +100,8 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
             clusterService,
             licenseState,
             transformServices,
-            client
+            client,
+            ingestService
         );
     }
 
@@ -111,7 +115,8 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
         ClusterService clusterService,
         XPackLicenseState licenseState,
         TransformServices transformServices,
-        Client client
+        Client client,
+        IngestService ingestService
     ) {
         super(
             name,
@@ -138,6 +143,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
             clusterService.getNodeName(),
             License.OperationMode.BASIC.description()
         );
+        this.ingestService = ingestService;
     }
 
     static HasPrivilegesRequest buildPrivilegeCheck(
@@ -335,6 +341,16 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
         if (request.isDeferValidation()) {
             pivotValidationListener.onResponse(true);
         } else {
+            if (config.getDestination().getPipeline() != null) {
+                if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
+                    listener.onFailure(new ElasticsearchStatusException(
+                        TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
+                        RestStatus.BAD_REQUEST
+                        )
+                    );
+                    return;
+                }
+            }
             pivot.validateQuery(client, config.getSource(), pivotValidationListener);
         }
     }

+ 19 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.license.License;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.RemoteClusterLicenseChecker;
@@ -71,6 +72,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
     private final Client client;
     private final TransformAuditor auditor;
     private final SourceDestValidator sourceDestValidator;
+    private final IngestService ingestService;
 
     @Inject
     public TransportStartTransformAction(
@@ -83,7 +85,8 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
         TransformServices transformServices,
         PersistentTasksService persistentTasksService,
         Client client,
-        Settings settings
+        Settings settings,
+        IngestService ingestService
     ) {
         this(
             StartTransformAction.NAME,
@@ -96,7 +99,8 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
             transformServices,
             persistentTasksService,
             client,
-            settings
+            settings,
+            ingestService
         );
     }
 
@@ -111,7 +115,8 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
         TransformServices transformServices,
         PersistentTasksService persistentTasksService,
         Client client,
-        Settings settings
+        Settings settings,
+        IngestService ingestService
     ) {
         super(
             name,
@@ -136,6 +141,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
             clusterService.getNodeName(),
             License.OperationMode.BASIC.description()
         );
+        this.ingestService = ingestService;
     }
 
     @Override
@@ -258,6 +264,16 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
             }
             transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency()));
             transformConfigHolder.set(config);
+            if (config.getDestination().getPipeline() != null) {
+                if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
+                    listener.onFailure(new ElasticsearchStatusException(
+                        TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
+                        RestStatus.BAD_REQUEST
+                        )
+                    );
+                    return;
+                }
+            }
 
             sourceDestValidator.validate(
                 clusterService.state(),

+ 5 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportPutTransformActionDeprecated.java

@@ -12,6 +12,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -31,7 +32,8 @@ public class TransportPutTransformActionDeprecated extends TransportPutTransform
         ClusterService clusterService,
         XPackLicenseState licenseState,
         TransformServices transformServices,
-        Client client
+        Client client,
+        IngestService ingestService
     ) {
         super(
             PutTransformActionDeprecated.NAME,
@@ -43,7 +45,8 @@ public class TransportPutTransformActionDeprecated extends TransportPutTransform
             clusterService,
             licenseState,
             transformServices,
-            client
+            client,
+            ingestService
         );
     }
 

+ 5 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportStartTransformActionDeprecated.java

@@ -12,6 +12,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -33,7 +34,8 @@ public class TransportStartTransformActionDeprecated extends TransportStartTrans
         TransformServices transformServices,
         PersistentTasksService persistentTasksService,
         Client client,
-        Settings settings
+        Settings settings,
+        IngestService ingestService
     ) {
         super(
             StartTransformActionDeprecated.NAME,
@@ -46,7 +48,8 @@ public class TransportStartTransformActionDeprecated extends TransportStartTrans
             transformServices,
             persistentTasksService,
             client,
-            settings
+            settings,
+            ingestService
         );
     }
 }