Add the ability to require an ingest pipeline (#46847)

This commit adds the ability to require an ingest pipeline on an
index. Today we can have a default pipeline, but that can be
overridden by a request pipeline parameter. This commit introduces a new
index setting index.required_pipeline that acts similarly to
index.default_pipeline, except that it can not be overridden by a
request pipeline parameter. Additionally, a default pipeline and a
required pipeline can not both be set. The required pipeline can be set
to _none to ensure that no pipeline ever runs for index requests on that
index.
Jason Tedor, 6 years ago
commit 19b710a02f
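
For illustration, a minimal sketch of the new behavior, written in the style of the integration tests added below (the index and pipeline names are placeholders):

    // create an index whose pipeline can not be overridden per request
    final Settings settings = Settings.builder()
        .put(IndexSettings.REQUIRED_PIPELINE.getKey(), "my_pipeline") // "index.required_pipeline"
        .build();
    createIndex("index", settings);

    // plain index requests now run "my_pipeline"; an explicit request pipeline is rejected
    final IllegalArgumentException e = expectThrows(
        IllegalArgumentException.class,
        () -> client().prepareIndex("index", "_doc", "1")
            .setSource(Map.of("field", "value"))
            .setPipeline("other_pipeline")
            .get());
    // e: request pipeline [other_pipeline] can not override required pipeline [my_pipeline]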

+ 8 - 1
docs/reference/index-modules.asciidoc

@@ -234,13 +234,20 @@ specific index module:
     The length of time that a <<delete-versioning,deleted document's version number>> remains available for <<index-versioning,further versioned operations>>.
     Defaults to `60s`.
 
-  `index.default_pipeline`::
+ `index.default_pipeline`::
 
     The default <<ingest,ingest node>> pipeline for this index. Index requests will fail
     if the default pipeline is set and the pipeline does not exist. The default may be
     overridden using the `pipeline` parameter. The special pipeline name `_none` indicates
     no ingest pipeline should be run.
 
+ `index.required_pipeline`::
+    The required <<ingest,ingest node>> pipeline for this index. Index requests
+    will fail if the required pipeline is set and the pipeline does not exist.
+    The required pipeline can not be overridden with the `pipeline` parameter. A
+    default pipeline and a required pipeline can not both be set. The special
+    pipeline name `_none` indicates no ingest pipeline will run.
+
 [float]
 === Settings in other index modules
 

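Since a default pipeline and a required pipeline are mutually exclusive, index creation fails if both settings are present. A sketch of the rejection, mirroring the integration test added in this commit:

    final Settings settings = Settings.builder()
        .put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")   // "index.default_pipeline"
        .put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline") // "index.required_pipeline"
        .build();
    // throws: index has a default pipeline [default_pipeline] and a required pipeline [required_pipeline]
    expectThrows(IllegalArgumentException.class, () -> createIndex("index", settings));
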
+ 175 - 0
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml

@@ -0,0 +1,175 @@
+---
+teardown:
+  - do:
+      ingest.delete_pipeline:
+        id: "my_pipeline"
+        ignore: 404
+
+---
+"Test index with required pipeline":
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline"
+        body:  >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "bytes" : {
+                  "field" : "bytes_source_field",
+                  "target_field" : "bytes_target_field"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+  # required pipeline via index
+  - do:
+      indices.create:
+        index: test
+        body:
+          settings:
+            index:
+              required_pipeline: "my_pipeline"
+          aliases:
+            test_alias: {}
+
+  - do:
+      index:
+        index: test
+        id: 1
+        body: {bytes_source_field: "1kb"}
+
+  - do:
+      get:
+        index: test
+        id: 1
+  - match: { _source.bytes_source_field: "1kb" }
+  - match: { _source.bytes_target_field: 1024 }
+  # required pipeline via alias
+  - do:
+      index:
+        index: test_alias
+        id: 2
+        body: {bytes_source_field: "1kb"}
+
+  - do:
+      get:
+        index: test
+        id: 2
+  - match: { _source.bytes_source_field: "1kb" }
+  - match: { _source.bytes_target_field: 1024 }
+  # required pipeline via upsert
+  - do:
+      update:
+        index: test
+        id: 3
+        body:
+          script:
+            source: "ctx._source.ran_script = true"
+            lang: "painless"
+          upsert: { "bytes_source_field":"1kb" }
+  - do:
+      get:
+        index: test
+        id: 3
+  - match: { _source.bytes_source_field: "1kb" }
+  - match: { _source.bytes_target_field: 1024 }
+  # required pipeline via scripted upsert
+  - do:
+      update:
+        index: test
+        id: 4
+        body:
+          script:
+            source: "ctx._source.bytes_source_field = '1kb'"
+            lang: "painless"
+          upsert : {}
+          scripted_upsert: true
+  - do:
+      get:
+        index: test
+        id: 4
+  - match: { _source.bytes_source_field: "1kb" }
+  - match: { _source.bytes_target_field: 1024 }
+  # required pipeline via doc_as_upsert
+  - do:
+      update:
+        index: test
+        id: 5
+        body:
+          doc: { "bytes_source_field":"1kb" }
+          doc_as_upsert: true
+  - do:
+      get:
+        index: test
+        id: 5
+  - match: { _source.bytes_source_field: "1kb" }
+  - match: { _source.bytes_target_field: 1024 }
+  # required pipeline via bulk upsert
+  # note - bulk scripted upserts execute the pipeline before the script, so any data referenced by the pipeline
+  # needs to be in the upsert, not the script
+  - do:
+      bulk:
+        refresh: true
+        body: |
+          {"update":{"_id":"6","_index":"test"}}
+          {"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
+          {"update":{"_id":"7","_index":"test"}}
+          {"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
+          {"update":{"_id":"8","_index":"test"}}
+          {"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
+          {"update":{"_id":"6_alias","_index":"test_alias"}}
+          {"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
+          {"update":{"_id":"7_alias","_index":"test_alias"}}
+          {"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
+          {"update":{"_id":"8_alias","_index":"test_alias"}}
+          {"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
+
+  - do:
+      mget:
+        body:
+          docs:
+            - { _index: "test", _id: "6" }
+            - { _index: "test", _id: "7" }
+            - { _index: "test", _id: "8" }
+            - { _index: "test", _id: "6_alias" }
+            - { _index: "test", _id: "7_alias" }
+            - { _index: "test", _id: "8_alias" }
+  - match: { docs.0._index: "test" }
+  - match: { docs.0._id: "6" }
+  - match: { docs.0._source.bytes_source_field: "1kb" }
+  - match: { docs.0._source.bytes_target_field: 1024 }
+  - is_false: docs.0._source.ran_script
+  - match: { docs.1._index: "test" }
+  - match: { docs.1._id: "7" }
+  - match: { docs.1._source.bytes_source_field: "2kb" }
+  - match: { docs.1._source.bytes_target_field: 2048 }
+  - match: { docs.2._index: "test" }
+  - match: { docs.2._id: "8" }
+  - match: { docs.2._source.bytes_source_field: "3kb" }
+  - match: { docs.2._source.bytes_target_field: 3072 }
+  - match: { docs.2._source.ran_script: true }
+  - match: { docs.3._index: "test" }
+  - match: { docs.3._id: "6_alias" }
+  - match: { docs.3._source.bytes_source_field: "1kb" }
+  - match: { docs.3._source.bytes_target_field: 1024 }
+  - is_false: docs.3._source.ran_script
+  - match: { docs.4._index: "test" }
+  - match: { docs.4._id: "7_alias" }
+  - match: { docs.4._source.bytes_source_field: "2kb" }
+  - match: { docs.4._source.bytes_target_field: 2048 }
+  - match: { docs.5._index: "test" }
+  - match: { docs.5._id: "8_alias" }
+  - match: { docs.5._source.bytes_source_field: "3kb" }
+  - match: { docs.5._source.bytes_target_field: 3072 }
+  - match: { docs.5._source.ran_script: true }
+
+  # bad request, request pipeline can not be specified
+  - do:
+      catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/
+      index:
+        index: test
+        id: 9
+        pipeline: "pipeline"
+        body: {bytes_source_field: "1kb"}

+ 83 - 18
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -21,6 +21,7 @@ package org.elasticsearch.action.bulk;
 
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.util.SparseFixedBitSet;
+import org.elasticsearch.Assertions;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceAlreadyExistsException;
@@ -76,6 +77,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -156,11 +158,14 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
         for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
             IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
+
             if (indexRequest != null) {
-                // get pipeline from request
-                String pipeline = indexRequest.getPipeline();
-                if (pipeline == null) {
-                    // start to look for default pipeline via settings found in the index meta data
+                if (indexRequest.isPipelineResolved() == false) {
+                    final String requestPipeline = indexRequest.getPipeline();
+                    indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
+                    boolean requestCanOverridePipeline = true;
+                    String requiredPipeline = null;
+                    // start to look for default or required pipelines via settings found in the index meta data
                     IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
                     // check the alias for the index request (this is how normal index requests are modeled)
                     if (indexMetaData == null && indexRequest.index() != null) {
@@ -179,34 +184,86 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                         }
                     }
                     if (indexMetaData != null) {
-                        // Find the default pipeline if one is defined from and existing index.
-                        String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
-                        indexRequest.setPipeline(defaultPipeline);
-                        if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
-                            hasIndexRequestsWithPipelines = true;
+                        final Settings indexSettings = indexMetaData.getSettings();
+                        if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
+                            // find the required pipeline if one is defined from an existing index
+                            requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
+                            assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
+                                IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
+                            indexRequest.setPipeline(requiredPipeline);
+                            requestCanOverridePipeline = false;
+                        } else {
+                            // find the default pipeline if one is defined from an existing index
+                            String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
+                            indexRequest.setPipeline(defaultPipeline);
                         }
                     } else if (indexRequest.index() != null) {
-                        // No index exists yet (and is valid request), so matching index templates to look for a default pipeline
+                        // the index does not exist yet (and this is a valid request), so match index templates to look for a default pipeline
                         List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
                         assert (templates != null);
-                        String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
-                        // order of templates are highest order first, break if we find a default_pipeline
+                        // templates are sorted highest order first, but we still have to iterate through them all
+                        String defaultPipeline = null;
                         for (IndexTemplateMetaData template : templates) {
                             final Settings settings = template.settings();
-                            if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
+                            if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
+                                requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
+                                requestCanOverridePipeline = false;
+                                // we can not break in case a lower-order template has a default pipeline that we need to reject
+                            } else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
                                 defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
-                                break;
+                                // we can not break in case a lower-order template has a required pipeline that we need to reject
                             }
                         }
-                        indexRequest.setPipeline(defaultPipeline);
-                        if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
-                            hasIndexRequestsWithPipelines = true;
+                        if (requiredPipeline != null && defaultPipeline != null) {
+                            // we can not have picked up a required and a default pipeline from applying templates
+                            final String message = String.format(
+                                Locale.ROOT,
+                                "required pipeline [%s] and default pipeline [%s] can not both be set",
+                                requiredPipeline,
+                                defaultPipeline);
+                            throw new IllegalArgumentException(message);
+                        }
+                        final String pipeline;
+                        if (requiredPipeline != null) {
+                            pipeline = requiredPipeline;
+                        } else {
+                            pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME);
+                        }
+                        indexRequest.setPipeline(pipeline);
+                    }
+
+                    if (requestPipeline != null) {
+                        if (requestCanOverridePipeline == false) {
+                            final String message = String.format(
+                                Locale.ROOT,
+                                "request pipeline [%s] can not override required pipeline [%s]",
+                                requestPipeline,
+                                requiredPipeline);
+                            throw new IllegalArgumentException(message);
+                        } else {
+                            indexRequest.setPipeline(requestPipeline);
                         }
                     }
-                } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
+
+                    if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
+                        hasIndexRequestsWithPipelines = true;
+                    }
+                    /*
+                     * We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
+                     * pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
+                     * has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
+                     * already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
+                     * can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
+                     * set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
+                     * these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
+                     * pipeline parameter too.
+                     */
+                    indexRequest.isPipelineResolved(true);
+                } else if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
                     hasIndexRequestsWithPipelines = true;
                 }
             }
+
         }
 
         if (hasIndexRequestsWithPipelines) {
@@ -217,6 +274,14 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                 if (clusterService.localNode().isIngestNode()) {
                     processBulkIndexIngestRequest(task, bulkRequest, listener);
                 } else {
+                    if (Assertions.ENABLED) {
+                        final boolean allAreForwardedRequests = bulkRequest.requests()
+                            .stream()
+                            .map(TransportBulkAction::getIndexWriteRequest)
+                            .filter(Objects::nonNull)
+                            .allMatch(IndexRequest::isPipelineResolved);
+                        assert allAreForwardedRequests : bulkRequest;
+                    }
                     ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
                 }
             } catch (Exception e) {
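
The resolution above can be condensed into a hypothetical helper (not part of this commit) that makes the precedence explicit: a required pipeline wins and rejects any request pipeline parameter, a request pipeline otherwise overrides a default, and the no-op pipeline is the fallback.

    // hypothetical summary of the pipeline resolution in TransportBulkAction above
    static String resolvePipeline(
            @Nullable final String requestPipeline,
            @Nullable final String requiredPipeline,
            @Nullable final String defaultPipeline) {
        if (requiredPipeline != null) {
            if (requestPipeline != null) {
                // same rejection as exercised by 240_required_pipeline.yml
                throw new IllegalArgumentException(String.format(
                    Locale.ROOT,
                    "request pipeline [%s] can not override required pipeline [%s]",
                    requestPipeline,
                    requiredPipeline));
            }
            return requiredPipeline;
        }
        if (requestPipeline != null) {
            return requestPipeline;
        }
        return Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME);
    }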

+ 32 - 4
server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

@@ -102,6 +102,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
 
     private String pipeline;
 
+    private boolean isPipelineResolved;
+
     /**
      * Value for {@link #getAutoGeneratedTimestamp()} if the document has an external
      * provided ID.
@@ -124,6 +126,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         version = in.readLong();
         versionType = VersionType.fromValue(in.readByte());
         pipeline = in.readOptionalString();
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            isPipelineResolved = in.readBoolean();
+        }
         isRetry = in.readBoolean();
         autoGeneratedTimestamp = in.readLong();
         if (in.readBoolean()) {
@@ -249,7 +254,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
     @Override
     public String type() {
         if (type == null) {
-            return MapperService.SINGLE_MAPPING_NAME;                    
+            return MapperService.SINGLE_MAPPING_NAME;
         }
         return type;
     }
@@ -278,7 +283,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
             type = defaultType;
         }
         return this;
-    }      
+    }
     /**
      * The id of the indexed document. If not set, will be automatically generated.
      */
@@ -333,6 +338,26 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         return this.pipeline;
     }
 
+    /**
+     * Sets if the pipeline for this request has been resolved by the coordinating node.
+     *
+     * @param isPipelineResolved true if the pipeline has been resolved
+     * @return the request
+     */
+    public IndexRequest isPipelineResolved(final boolean isPipelineResolved) {
+        this.isPipelineResolved = isPipelineResolved;
+        return this;
+    }
+
+    /**
+     * Returns whether or not the pipeline for this request has been resolved by the coordinating node.
+     *
+     * @return true if the pipeline has been resolved
+     */
+    public boolean isPipelineResolved() {
+        return this.isPipelineResolved;
+    }
+
     /**
      * The source of the document to index, recopied to a new array if it is unsafe.
      */
@@ -616,8 +641,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);
-        // A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions. 
-        // So we use the type accessor method here to make the type non-null (will default it to "_doc"). 
+        // A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions.
+        // So we use the type accessor method here to make the type non-null (will default it to "_doc").
         out.writeOptionalString(type());
         out.writeOptionalString(id);
         out.writeOptionalString(routing);
@@ -626,6 +651,9 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         out.writeLong(version);
         out.writeByte(versionType.getValue());
         out.writeOptionalString(pipeline);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeBoolean(isPipelineResolved);
+        }
         out.writeBoolean(isRetry);
         out.writeLong(autoGeneratedTimestamp);
         if (contentType != null) {
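
The new wire field follows the usual version-gated serialization idiom: the read and write sides must gate on the same version so older nodes neither send nor expect the flag. A self-contained sketch of the pattern (a toy class, not code from this commit):

    import java.io.IOException;
    import org.elasticsearch.Version;
    import org.elasticsearch.common.io.stream.StreamInput;
    import org.elasticsearch.common.io.stream.StreamOutput;

    class ToyRequest {
        private boolean isPipelineResolved;

        ToyRequest(final StreamInput in) throws IOException {
            if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
                isPipelineResolved = in.readBoolean(); // pre-8.0 nodes never send the flag
            }
        }

        void writeTo(final StreamOutput out) throws IOException {
            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
                out.writeBoolean(isPipelineResolved); // pre-8.0 nodes would not expect it
            }
        }
    }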

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

@@ -162,6 +162,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
             EngineConfig.INDEX_CODEC_SETTING,
             IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
             IndexSettings.DEFAULT_PIPELINE,
+            IndexSettings.REQUIRED_PIPELINE,
             MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
 
             // validate that built-in similarities don't get redefined

+ 74 - 7
server/src/main/java/org/elasticsearch/index/IndexSettings.java

@@ -35,8 +35,10 @@ import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.node.Node;
 
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -292,12 +294,67 @@ public final class IndexSettings {
         1000, 1, Property.Dynamic, Property.IndexScope);
 
     public static final Setting<String> DEFAULT_PIPELINE =
-       new Setting<>("index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, s -> {
-           if (s == null || s.isEmpty()) {
-               throw new IllegalArgumentException("Value for [index.default_pipeline] must be a non-empty string.");
-           }
-        return s;
-       }, Property.Dynamic, Property.IndexScope);
+        new Setting<>("index.default_pipeline",
+        IngestService.NOOP_PIPELINE_NAME,
+        Function.identity(),
+        new DefaultPipelineValidator(),
+        Property.Dynamic,
+        Property.IndexScope);
+
+    public static final Setting<String> REQUIRED_PIPELINE =
+        new Setting<>("index.required_pipeline",
+            IngestService.NOOP_PIPELINE_NAME,
+            Function.identity(),
+            new RequiredPipelineValidator(),
+            Property.Dynamic,
+            Property.IndexScope);
+
+    static class DefaultPipelineValidator implements Setting.Validator<String> {
+
+        @Override
+        public void validate(final String value) {
+
+        }
+
+        @Override
+        public void validate(final String value, final Map<Setting<String>, String> settings) {
+            final String requiredPipeline = settings.get(IndexSettings.REQUIRED_PIPELINE);
+            if (value.equals(IngestService.NOOP_PIPELINE_NAME) == false
+                && requiredPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) {
+                throw new IllegalArgumentException(
+                    "index has a default pipeline [" + value + "] and a required pipeline [" + requiredPipeline + "]");
+            }
+        }
+
+        @Override
+        public Iterator<Setting<String>> settings() {
+            return List.of(REQUIRED_PIPELINE).iterator();
+        }
+
+    }
+
+    static class RequiredPipelineValidator implements Setting.Validator<String> {
+
+        @Override
+        public void validate(final String value) {
+
+        }
+
+        @Override
+        public void validate(final String value, final Map<Setting<String>, String> settings) {
+            final String defaultPipeline = settings.get(IndexSettings.DEFAULT_PIPELINE);
+            if (value.equals(IngestService.NOOP_PIPELINE_NAME) == false && defaultPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) {
+                throw new IllegalArgumentException(
+                    "index has a required pipeline [" + value + "] and a default pipeline [" + defaultPipeline + "]");
+            }
+        }
+
+        @Override
+        public Iterator<Setting<String>> settings() {
+            return List.of(DEFAULT_PIPELINE).iterator();
+        }
+
+    }
 
     /**
      * Marks an index to be searched throttled. This means that never more than one shard of such an index will be searched concurrently
@@ -376,6 +433,7 @@ public final class IndexSettings {
     private volatile int maxAnalyzedOffset;
     private volatile int maxTermsCount;
     private volatile String defaultPipeline;
+    private volatile String requiredPipeline;
     private volatile boolean searchThrottled;
 
     /**
@@ -545,6 +603,7 @@ public final class IndexSettings {
         scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
         scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
         scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
+        scopedSettings.addSettingsUpdateConsumer(REQUIRED_PIPELINE, this::setRequiredPipeline);
         scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
         scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
         scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis);
@@ -736,7 +795,7 @@ public final class IndexSettings {
     public void setTranslogSyncInterval(TimeValue translogSyncInterval) {
         this.syncInterval = translogSyncInterval;
     }
-    
+
     /**
      * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled.
      */
@@ -964,6 +1023,14 @@ public final class IndexSettings {
         this.defaultPipeline = defaultPipeline;
     }
 
+    public String getRequiredPipeline() {
+        return requiredPipeline;
+    }
+
+    public void setRequiredPipeline(final String requiredPipeline) {
+        this.requiredPipeline = requiredPipeline;
+    }
+
     /**
      * Returns <code>true</code> if soft-delete is enabled.
      */
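
Both settings are registered with Property.Dynamic, so, assuming the standard update-settings path applies (this commit adds no dedicated test for dynamic updates), the required pipeline of a live index can be swapped or set back to _none. A sketch in the same test style:

    // hypothetical dynamic update of the required pipeline on a live index
    client().admin().indices().prepareUpdateSettings("index")
        .setSettings(Settings.builder()
            .put(IndexSettings.REQUIRED_PIPELINE.getKey(), "new_required_pipeline")
            .build())
        .get();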

+ 4 - 0
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

@@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -106,6 +107,9 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
         ClusterState state = mock(ClusterState.class);
         when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
         when(clusterService.state()).thenReturn(state);
+        DiscoveryNode localNode = mock(DiscoveryNode.class);
+        when(clusterService.localNode()).thenReturn(localNode);
+        when(localNode.isIngestNode()).thenReturn(randomBoolean());
         final ThreadPool threadPool = mock(ThreadPool.class);
         final ExecutorService direct = EsExecutors.newDirectExecutorService();
         when(threadPool.executor(anyString())).thenReturn(direct);

+ 133 - 0
server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java

@@ -0,0 +1,133 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasToString;
+
+public class RequiredPipelineIT extends ESIntegTestCase {
+
+    public void testRequiredPipeline() {
+        final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build();
+        createIndex("index", settings);
+
+        // this asserts that the required_pipeline was used, without us having to actually create the pipeline etc.
+        final IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> client().prepareIndex("index", "_doc", "1").setSource(Map.of("field", "value")).get());
+        assertThat(e, hasToString(containsString("pipeline with id [required_pipeline] does not exist")));
+    }
+
+    public void testDefaultAndRequiredPipeline() {
+        final Settings settings = Settings.builder()
+            .put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
+            .put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline")
+            .build();
+        final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> createIndex("index", settings));
+        assertThat(
+            e,
+            hasToString(containsString("index has a default pipeline [default_pipeline] and a required pipeline [required_pipeline]")));
+    }
+
+    public void testDefaultAndRequiredPipelineFromTemplates() {
+        final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1);
+        final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE);
+        final int requiredPipelineOrder;
+        final int defaultPipelineOrder;
+        if (randomBoolean()) {
+            defaultPipelineOrder = lowOrder;
+            requiredPipelineOrder = highOrder;
+        } else {
+            defaultPipelineOrder = highOrder;
+            requiredPipelineOrder = lowOrder;
+        }
+        final Settings defaultPipelineSettings =
+            Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
+        admin().indices()
+            .preparePutTemplate("default")
+            .setPatterns(List.of("index*"))
+            .setOrder(defaultPipelineOrder)
+            .setSettings(defaultPipelineSettings)
+            .get();
+        final Settings requiredPipelineSettings =
+            Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build();
+        admin().indices()
+            .preparePutTemplate("required")
+            .setPatterns(List.of("index*"))
+            .setOrder(requiredPipelineOrder)
+            .setSettings(requiredPipelineSettings)
+            .get();
+        final IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> client().prepareIndex("index", "_doc", "1").setSource(Map.of("field", "value")).get());
+        assertThat(
+            e,
+            hasToString(containsString(
+                "required pipeline [required_pipeline] and default pipeline [default_pipeline] can not both be set")));
+    }
+
+    public void testHighOrderRequiredPipelinePreferred() throws IOException {
+        final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1);
+        final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE);
+        final Settings defaultPipelineSettings =
+            Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "low_order_required_pipeline").build();
+        admin().indices()
+            .preparePutTemplate("default")
+            .setPatterns(List.of("index*"))
+            .setOrder(lowOrder)
+            .setSettings(defaultPipelineSettings)
+            .get();
+        final Settings requiredPipelineSettings =
+            Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "high_order_required_pipeline").build();
+        admin().indices()
+            .preparePutTemplate("required")
+            .setPatterns(List.of("index*"))
+            .setOrder(highOrder)
+            .setSettings(requiredPipelineSettings)
+            .get();
+
+        // this asserts that the high_order_required_pipeline was selected, without us having to actually create the pipeline etc.
+        final IllegalArgumentException e = expectThrows(
+            IllegalArgumentException.class,
+            () -> client().prepareIndex("index", "_doc", "1").setSource(Map.of("field", "value")).get());
+        assertThat(e, hasToString(containsString("pipeline with id [high_order_required_pipeline] does not exist")));
+    }
+
+    public void testRequiredPipelineAndRequestPipeline() {
+        final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build();
+        createIndex("index", settings);
+        final IndexRequestBuilder builder = client().prepareIndex("index", "_doc", "1");
+        builder.setSource(Map.of("field", "value"));
+        builder.setPipeline("request_pipeline");
+        final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::get);
+        assertThat(
+            e,
+            hasToString(containsString("request pipeline [request_pipeline] can not override required pipeline [required_pipeline]")));
+    }
+
+}

+ 1 - 0
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java

@@ -390,6 +390,7 @@ public class TransportResumeFollowAction extends TransportMasterNodeAction<Resum
             IndexSettings.MAX_TOKEN_COUNT_SETTING,
             IndexSettings.MAX_SLICES_PER_SCROLL,
             IndexSettings.DEFAULT_PIPELINE,
+            IndexSettings.REQUIRED_PIPELINE,
             IndexSettings.INDEX_SEARCH_THROTTLED,
             IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
             IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,