Browse Source

INGEST: Enable default pipelines (#32286)

* INGEST: Enable default pipelines

* Add `default_pipeline` index setting
* `_none` is interpreted as no pipeline
* closes #21101
Armin Braun 7 years ago
parent
commit
be31cc642b

+ 73 - 0
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml

@@ -0,0 +1,73 @@
+---
+teardown:
+  # Delete the pipeline that the test below registers.
+  # `ignore: 404` presumably keeps teardown from failing when the pipeline was never created.
+  - do:
+      ingest.delete_pipeline:
+        id: "my_pipeline"
+        ignore: 404
+
+---
+"Test index with default pipeline":
+  # Register a pipeline with a single bytes processor that reads bytes_source_field
+  # and writes its result to bytes_target_field.
+  - do:
+      ingest.put_pipeline:
+        id: "my_pipeline"
+        body:  >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "bytes" : {
+                  "field" : "bytes_source_field",
+                  "target_field" : "bytes_target_field"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  # Create the index with my_pipeline configured as its index.default_pipeline setting.
+  - do:
+      indices.create:
+        index: test
+        body:
+          settings:
+            index:
+              default_pipeline: "my_pipeline"
+
+  # Document 1 is indexed without a pipeline parameter, so the default pipeline is expected to run.
+  - do:
+      index:
+        index: test
+        type: test
+        id: 1
+        body: {bytes_source_field: "1kb"}
+
+  # The target field is present: "1kb" was converted to 1024.
+  - do:
+      get:
+        index: test
+        type: test
+        id: 1
+  - match: { _source.bytes_source_field: "1kb" }
+  - match: { _source.bytes_target_field: 1024 }
+
+  # Document 2 passes pipeline "_none", which is expected to bypass the default pipeline.
+  - do:
+      index:
+        index: test
+        type: test
+        id: 2
+        pipeline: "_none"
+        body: {bytes_source_field: "1kb"}
+        
+  # The target field is absent, i.e. no pipeline processed document 2.
+  - do:
+      get:
+        index: test
+        type: test
+        id: 2
+  - match: { _source.bytes_source_field: "1kb" }
+  - is_false: _source.bytes_target_field
+  
+  # An empty pipeline name on the request is expected to be rejected with bad_request.
+  - do:
+      catch:  bad_request
+      index:
+        index: test
+        type: test
+        id: 3
+        pipeline: ""
+        body: {bytes_source_field: "1kb"}

+ 0 - 16
server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

@@ -523,22 +523,6 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
         return -1;
     }
 
-    /**
-     * @return Whether this bulk request contains index request with an ingest pipeline enabled.
-     */
-    public boolean hasIndexRequestsWithPipelines() {
-        for (DocWriteRequest<?> actionRequest : requests) {
-            if (actionRequest instanceof IndexRequest) {
-                IndexRequest indexRequest = (IndexRequest) actionRequest;
-                if (Strings.hasText(indexRequest.getPipeline())) {
-                    return true;
-                }
-            }
-        }
-
-        return false;
-    }
-
     @Override
     public ActionRequestValidationException validate() {
         ActionRequestValidationException validationException = null;

+ 25 - 1
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -47,6 +47,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -54,6 +55,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndexClosedException;
@@ -125,7 +127,29 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
 
     @Override
     protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
-        if (bulkRequest.hasIndexRequestsWithPipelines()) {
+        boolean hasIndexRequestsWithPipelines = false;
+        ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
+        for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
+            if (actionRequest instanceof IndexRequest) {
+                IndexRequest indexRequest = (IndexRequest) actionRequest;
+                String pipeline = indexRequest.getPipeline();
+                if (pipeline == null) {
+                    IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
+                    if (indexMetaData == null) {
+                        indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
+                    } else {
+                        String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
+                        indexRequest.setPipeline(defaultPipeline);
+                        if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
+                            hasIndexRequestsWithPipelines = true;
+                        }
+                    }
+                } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
+                    hasIndexRequestsWithPipelines = true;
+                }
+            }
+        }
+        if (hasIndexRequestsWithPipelines) {
             if (clusterService.localNode().isIngestNode()) {
                 processBulkIndexIngestRequest(task, bulkRequest, listener);
             } else {

+ 4 - 0
server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

@@ -185,6 +185,10 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
             validationException = addValidationError("an id must be provided if version type or value are set", validationException);
         }
 
+        if (pipeline != null && pipeline.isEmpty()) {
+            validationException = addValidationError("pipeline cannot be an empty string", validationException);
+        }
+
         return validationException;
     }
 

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

@@ -155,6 +155,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         EngineConfig.INDEX_CODEC_SETTING,
         EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
         IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
+        IndexSettings.DEFAULT_PIPELINE,
 
         // validate that built-in similarities don't get redefined
         Setting.groupSetting("index.similarity.", (s) -> {

+ 20 - 0
server/src/main/java/org/elasticsearch/index/IndexSettings.java

@@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.node.Node;
 
 import java.util.Collections;
@@ -254,6 +255,14 @@ public final class IndexSettings {
     public static final Setting<Integer> MAX_REGEX_LENGTH_SETTING = Setting.intSetting("index.max_regex_length",
         1000, 1, Property.Dynamic, Property.IndexScope);
 
+    /**
+     * Index-scoped, dynamically updatable setting {@code index.default_pipeline}. It is declared with
+     * {@link IngestService#NOOP_PIPELINE_NAME} as its default value. The parser rejects a {@code null} or
+     * empty value with an {@link IllegalArgumentException} and otherwise returns the string unchanged.
+     */
+    public static final Setting<String> DEFAULT_PIPELINE =
+       new Setting<>("index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, s -> {
+           if (s == null || s.isEmpty()) {
+               throw new IllegalArgumentException("Value for [index.default_pipeline] must be a non-empty string.");
+           }
+        return s;
+       }, Property.Dynamic, Property.IndexScope);
+
     private final Index index;
     private final Version version;
     private final Logger logger;
@@ -293,6 +302,7 @@ public final class IndexSettings {
     private volatile TimeValue searchIdleAfter;
     private volatile int maxAnalyzedOffset;
     private volatile int maxTermsCount;
+    private volatile String defaultPipeline;
 
     /**
      * The maximum number of refresh listeners allowed on this shard.
@@ -408,6 +418,7 @@ public final class IndexSettings {
         this.mergePolicyConfig = new MergePolicyConfig(logger, this);
         this.indexSortConfig = new IndexSortConfig(this);
         searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
+        defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);
 
         scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
         scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
@@ -446,6 +457,7 @@ public final class IndexSettings {
         scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
         scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
         scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
+        scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
     }
 
     private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
@@ -821,4 +833,12 @@ public final class IndexSettings {
      * Returns the time that an index shard becomes search idle unless it's accessed in between
      */
     public TimeValue getSearchIdleAfter() { return searchIdleAfter; }
+
+    /**
+     * Returns the currently cached value of {@link #DEFAULT_PIPELINE} for this index. The field is
+     * initialized from the scoped settings and replaced whenever the setting is updated.
+     */
+    public String getDefaultPipeline() {
+        return defaultPipeline;
+    }
+
+    /**
+     * Replaces the cached default pipeline name. This method is registered as the settings-update
+     * consumer for {@link #DEFAULT_PIPELINE}.
+     *
+     * @param defaultPipeline the new value of {@code index.default_pipeline}
+     */
+    public void setDefaultPipeline(String defaultPipeline) {
+        this.defaultPipeline = defaultPipeline;
+    }
 }

+ 3 - 0
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -39,6 +39,9 @@ import org.elasticsearch.threadpool.ThreadPool;
  * Holder class for several ingest related services.
  */
 public class IngestService {
+
+    /**
+     * Reserved pipeline name ({@code "_none"}) that is interpreted as "no pipeline": index requests
+     * carrying this name are not run through an ingest pipeline.
+     */
+    public static final String NOOP_PIPELINE_NAME = "_none";
+
     private final PipelineStore pipelineStore;
     private final PipelineExecutionService pipelineExecutionService;
 

+ 6 - 3
server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java

@@ -24,7 +24,6 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterStateApplier;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -73,12 +72,16 @@ public class PipelineExecutionService implements ClusterStateApplier {
                         UpdateRequest updateRequest = (UpdateRequest) actionRequest;
                         indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
                     }
-                    if (indexRequest != null && Strings.hasText(indexRequest.getPipeline())) {
+                    if (indexRequest == null) {
+                        continue;
+                    }
+                    String pipeline = indexRequest.getPipeline();
+                    if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
                         try {
                             innerExecute(indexRequest, getPipeline(indexRequest.getPipeline()));
                             //this shouldn't be needed here but we do it for consistency with index api
                             // which requires it to prevent double execution
-                            indexRequest.setPipeline(null);
+                            indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
                         } catch (Exception e) {
                             itemFailureHandler.accept(indexRequest, e);
                         }

+ 7 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

@@ -26,6 +26,7 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -45,6 +46,7 @@ import java.util.function.Function;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCase {
     public void testNonExceptional() {
@@ -97,7 +99,11 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
 
     private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
             BulkRequest bulkRequest, Function<String, Boolean> shouldAutoCreate) {
-        TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), mock(ClusterService.class),
+        ClusterService clusterService = mock(ClusterService.class);
+        ClusterState state = mock(ClusterState.class);
+        when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
+        when(clusterService.state()).thenReturn(state);
+        TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), clusterService,
                 null, null, null, mock(ActionFilters.class), null, null) {
             @Override
             void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,

+ 51 - 2
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.bulk;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.index.IndexAction;
@@ -28,6 +29,8 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateApplier;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -35,6 +38,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.ingest.PipelineExecutionService;
 import org.elasticsearch.tasks.Task;
@@ -68,6 +72,11 @@ import static org.mockito.Mockito.when;
 
 public class TransportBulkActionIngestTests extends ESTestCase {
 
+    /**
+     * Name of the index whose mocked index metadata sets {@code index.default_pipeline}
+     * (to {@code "default_pipeline"}) in the cluster state returned by the mocked cluster service.
+     */
+    private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline";
+
     /** Services needed by bulk action */
     TransportService transportService;
     ClusterService clusterService;
@@ -153,6 +162,15 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         when(nodes.getIngestNodes()).thenReturn(ingestNodes);
         ClusterState state = mock(ClusterState.class);
         when(state.getNodes()).thenReturn(nodes);
+        when(state.getMetaData()).thenReturn(MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
+            .putAll(
+                Collections.singletonMap(
+                    WITH_DEFAULT_PIPELINE,
+                    IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings(
+                        settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
+                            .build()
+                    ).numberOfShards(1).numberOfReplicas(1).build()))
+            .build()).build());
         when(clusterService.state()).thenReturn(state);
         doAnswer(invocation -> {
             ClusterChangedEvent event = mock(ClusterChangedEvent.class);
@@ -227,7 +245,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         // now check success
         Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
         failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request
-        indexRequest2.setPipeline(null); // this is done by the real pipeline execution service when processing
+        indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
         completionHandler.getValue().accept(null);
         assertTrue(action.isExecuted);
         assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@@ -259,7 +277,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         assertTrue(failureCalled.get());
 
         // now check success
-        indexRequest.setPipeline(null); // this is done by the real pipeline execution service when processing
+        indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
         completionHandler.getValue().accept(null);
         assertTrue(action.isExecuted);
         assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
@@ -359,4 +377,35 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         }
     }
 
+    /**
+     * Verifies that an index request without an explicit pipeline, addressed to an index whose settings
+     * define a default pipeline, is handed to the pipeline execution service. A failure passed to the
+     * completion handler must reach the listener. On success the action is executed without any
+     * interaction with the transport service.
+     */
+    public void testUseDefaultPipeline() throws Exception {
+        Exception exception = new Exception("fake exception");
+        // no pipeline is set on the request; only the target index carries a default pipeline
+        IndexRequest indexRequest = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id");
+        indexRequest.source(Collections.emptyMap());
+        AtomicBoolean responseCalled = new AtomicBoolean(false);
+        AtomicBoolean failureCalled = new AtomicBoolean(false);
+        singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
+            response -> {
+                responseCalled.set(true);
+            },
+            e -> {
+                assertThat(e, sameInstance(exception));
+                failureCalled.set(true);
+            }));
+
+        // check failure works, and passes through to the listener
+        assertFalse(action.isExecuted); // haven't executed yet
+        assertFalse(responseCalled.get());
+        assertFalse(failureCalled.get());
+        verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
+        completionHandler.getValue().accept(exception);
+        assertTrue(failureCalled.get());
+
+        // now check success
+        indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
+        completionHandler.getValue().accept(null);
+        assertTrue(action.isExecuted);
+        assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
+        verifyZeroInteractions(transportService);
+    }
+
 }

+ 10 - 0
server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java

@@ -199,4 +199,14 @@ public class IndexRequestTests extends ESTestCase {
         assertEquals("index {[index][type][null], source[n/a, actual length: [" + new ByteSizeValue(actualBytes).toString() +
                 "], max length: " + new ByteSizeValue(IndexRequest.MAX_SOURCE_LENGTH_IN_TOSTRING).toString() + "]}", request.toString());
     }
+
+    /**
+     * Verifies that {@code validate()} returns a validation error whose message contains
+     * "pipeline cannot be an empty string" when the request's pipeline is set to the empty string.
+     */
+    public void testRejectsEmptyStringPipeline() {
+        IndexRequest request = new IndexRequest("index", "type");
+        request.source("{}", XContentType.JSON);
+        request.setPipeline("");
+        ActionRequestValidationException validate = request.validate();
+        assertThat(validate, notNullValue());
+        assertThat(validate.getMessage(),
+            containsString("pipeline cannot be an empty string"));
+    }
 }