Explorar el Código

ingest: support default pipeline through an alias (#36231)

This commit allows writes that go through an alias to use the default
pipeline defined on the backing index.

Fixes #35817
Jake Landis hace 6 años
padre
commit
190ac8e9bf

+ 20 - 3
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml

@@ -31,6 +31,8 @@ teardown:
           settings:
             index:
               default_pipeline: "my_pipeline"
+          aliases:
+            test_alias: {}
 
   - do:
       index:
@@ -49,9 +51,24 @@ teardown:
 
   - do:
       index:
+        index: test_alias
+        type: test
+        id: 2
+        body: {bytes_source_field: "1kb"}
+
+  - do:
+      get:
         index: test
         type: test
         id: 2
+  - match: { _source.bytes_source_field: "1kb" }
+  - match: { _source.bytes_target_field: 1024 }
+
+  - do:
+      index:
+        index: test
+        type: test
+        id: 3
         pipeline: "_none"
         body: {bytes_source_field: "1kb"}
         
@@ -59,15 +76,15 @@ teardown:
       get:
         index: test
         type: test
-        id: 2
+        id: 3
   - match: { _source.bytes_source_field: "1kb" }
   - is_false: _source.bytes_target_field
-  
+
   - do:
       catch:  bad_request
       index:
         index: test
         type: test
-        id: 3
+        id: 4
         pipeline: ""
         body: {bytes_source_field: "1kb"}

+ 11 - 1
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -44,6 +44,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.AliasOrIndex;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -203,13 +204,22 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
         Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
         boolean hasIndexRequestsWithPipelines = false;
-        ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
+        final MetaData metaData = clusterService.state().getMetaData();
+        ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
         for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
             if (actionRequest instanceof IndexRequest) {
                 IndexRequest indexRequest = (IndexRequest) actionRequest;
                 String pipeline = indexRequest.getPipeline();
                 if (pipeline == null) {
                     IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
+                    if (indexMetaData == null) {
+                        //check the alias
+                        AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
+                        if (indexOrAlias != null && indexOrAlias.isAlias()) {
+                            AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
+                            indexMetaData = alias.getWriteIndex();
+                        }
+                    }
                     if (indexMetaData == null) {
                         indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
                     } else {

+ 20 - 12
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

@@ -31,6 +31,7 @@ import org.elasticsearch.action.support.AutoCreateIndex;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateApplier;
+import org.elasticsearch.cluster.metadata.AliasMetaData;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -80,6 +81,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
      * Index for which mock settings contain a default pipeline.
      */
     private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline";
+    private static final String WITH_DEFAULT_PIPELINE_ALIAS = "alias_for_index_with_default_pipeline";
 
     private static final Settings SETTINGS =
         Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build();
@@ -190,7 +192,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
                     IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings(
                         settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
                             .build()
-                    ).numberOfShards(1).numberOfReplicas(1).build()))
+                    ).putAlias(AliasMetaData.builder(WITH_DEFAULT_PIPELINE_ALIAS).build()).numberOfShards(1).numberOfReplicas(1).build()))
             .build()).build();
         when(state.getMetaData()).thenReturn(metaData);
         when(state.metaData()).thenReturn(metaData);
@@ -399,15 +401,24 @@ public class TransportBulkActionIngestTests extends ESTestCase {
     }
 
     public void testUseDefaultPipeline() throws Exception {
+        validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id"));
+    }
+
+    public void testUseDefaultPipelineWithAlias() throws Exception {
+        validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE_ALIAS, "type", "id"));
+    }
+
+    public void testCreateIndexBeforeRunPipeline() throws Exception {
         Exception exception = new Exception("fake exception");
-        IndexRequest indexRequest = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id");
+        IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
+        indexRequest.setPipeline("testpipeline");
         indexRequest.source(Collections.emptyMap());
         AtomicBoolean responseCalled = new AtomicBoolean(false);
         AtomicBoolean failureCalled = new AtomicBoolean(false);
+        action.needToCheck = true;
+        action.indexCreated = false;
         singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
-            response -> {
-                responseCalled.set(true);
-            },
+            response -> responseCalled.set(true),
             e -> {
                 assertThat(e, sameInstance(exception));
                 failureCalled.set(true);
@@ -429,17 +440,15 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         verifyZeroInteractions(transportService);
     }
 
-    public void testCreateIndexBeforeRunPipeline() throws Exception {
+    private void validateDefaultPipeline(IndexRequest indexRequest) {
         Exception exception = new Exception("fake exception");
-        IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
-        indexRequest.setPipeline("testpipeline");
         indexRequest.source(Collections.emptyMap());
         AtomicBoolean responseCalled = new AtomicBoolean(false);
         AtomicBoolean failureCalled = new AtomicBoolean(false);
-        action.needToCheck = true;
-        action.indexCreated = false;
         singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
-            response -> responseCalled.set(true),
+            response -> {
+                responseCalled.set(true);
+            },
             e -> {
                 assertThat(e, sameInstance(exception));
                 failureCalled.set(true);
@@ -460,5 +469,4 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
         verifyZeroInteractions(transportService);
     }
-
 }