Procházet zdrojové kódy

INGEST: Create Index Before Pipeline Execute (#32786)

* INGEST: Create Index Before Pipeline Execute

* Ensures that indices are created before the default pipeline setting is read to correcly handle the case of an index template containing a default pipeline (without the fix the first document does not get the pipeline applied as explained in #32758)
* closes #32758
Armin Braun před 7 roky
rodič
revize
124c1f1358

+ 44 - 35
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -127,37 +127,6 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
 
     @Override
     protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
-        boolean hasIndexRequestsWithPipelines = false;
-        ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
-        for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
-            if (actionRequest instanceof IndexRequest) {
-                IndexRequest indexRequest = (IndexRequest) actionRequest;
-                String pipeline = indexRequest.getPipeline();
-                if (pipeline == null) {
-                    IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
-                    if (indexMetaData == null) {
-                        indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
-                    } else {
-                        String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
-                        indexRequest.setPipeline(defaultPipeline);
-                        if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
-                            hasIndexRequestsWithPipelines = true;
-                        }
-                    }
-                } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
-                    hasIndexRequestsWithPipelines = true;
-                }
-            }
-        }
-        if (hasIndexRequestsWithPipelines) {
-            if (clusterService.localNode().isIngestNode()) {
-                processBulkIndexIngestRequest(task, bulkRequest, listener);
-            } else {
-                ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
-            }
-            return;
-        }
-
         final long startTime = relativeTime();
         final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
 
@@ -191,7 +160,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
             }
             // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
             if (autoCreateIndices.isEmpty()) {
-                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
+                executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
             } else {
                 final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
                 for (String index : autoCreateIndices) {
@@ -199,7 +168,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                         @Override
                         public void onResponse(CreateIndexResponse result) {
                             if (counter.decrementAndGet() == 0) {
-                                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
+                                executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
                             }
                         }
 
@@ -215,7 +184,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                                 }
                             }
                             if (counter.decrementAndGet() == 0) {
-                                executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
+                                executeIngestAndBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
                                     inner.addSuppressed(e);
                                     listener.onFailure(inner);
                                 }), responses, indicesThatCannotBeCreated);
@@ -225,7 +194,47 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
                 }
             }
         } else {
-            executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
+            executeIngestAndBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
+        }
+    }
+
+    private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos,
+        final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses,
+        Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
+        boolean hasIndexRequestsWithPipelines = false;
+        ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
+        for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
+            if (actionRequest instanceof IndexRequest) {
+                IndexRequest indexRequest = (IndexRequest) actionRequest;
+                String pipeline = indexRequest.getPipeline();
+                if (pipeline == null) {
+                    IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
+                    if (indexMetaData == null) {
+                        indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
+                    } else {
+                        String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
+                        indexRequest.setPipeline(defaultPipeline);
+                        if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
+                            hasIndexRequestsWithPipelines = true;
+                        }
+                    }
+                } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
+                    hasIndexRequestsWithPipelines = true;
+                }
+            }
+        }
+        if (hasIndexRequestsWithPipelines) {
+            try {
+                if (clusterService.localNode().isIngestNode()) {
+                    processBulkIndexIngestRequest(task, bulkRequest, listener);
+                } else {
+                    ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
+                }
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
+        } else {
+            executeBulk(task, bulkRequest, startTimeNanos, listener, responses, indicesThatCannotBeCreated);
         }
     }
 

+ 65 - 6
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

@@ -22,20 +22,25 @@ package org.elasticsearch.action.bulk;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.AutoCreateIndex;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateApplier;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexSettings;
@@ -77,6 +82,9 @@ public class TransportBulkActionIngestTests extends ESTestCase {
      */
     private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline";
 
+    private static final Settings SETTINGS =
+        Settings.builder().put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), true).build();
+
     /** Services needed by bulk action */
     TransportService transportService;
     ClusterService clusterService;
@@ -112,25 +120,42 @@ public class TransportBulkActionIngestTests extends ESTestCase {
     /** A subclass of the real bulk action to allow skipping real bulk indexing, and marking when it would have happened. */
     class TestTransportBulkAction extends TransportBulkAction {
         boolean isExecuted = false; // set when the "real" bulk execution happens
+
+        boolean needToCheck; // pluggable return value for `needToCheck`
+
+        boolean indexCreated = true; // If set to false, will be set to true by call to createIndex
+
         TestTransportBulkAction() {
-            super(Settings.EMPTY, null, transportService, clusterService, ingestService,
-                null, null, new ActionFilters(Collections.emptySet()), null, null);
+            super(SETTINGS, null, transportService, clusterService, ingestService,
+                null, null, new ActionFilters(Collections.emptySet()), null,
+                new AutoCreateIndex(
+                    SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+                    new IndexNameExpressionResolver(SETTINGS)
+                )
+            );
         }
         @Override
         protected boolean needToCheck() {
-            return false;
+            return needToCheck;
         }
         @Override
         void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
                 final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
+            assertTrue(indexCreated);
             isExecuted = true;
         }
+
+        @Override
+        void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResponse> listener) {
+            indexCreated = true;
+            listener.onResponse(null);
+        }
     }
 
     class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {
 
         TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) {
-            super(Settings.EMPTY, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
+            super(SETTINGS, IndexAction.NAME, TransportBulkActionIngestTests.this.transportService,
                     TransportBulkActionIngestTests.this.clusterService,
                     null, null, null, new ActionFilters(Collections.emptySet()), null,
                     IndexRequest::new, IndexRequest::new, ThreadPool.Names.WRITE, bulkAction, null);
@@ -162,7 +187,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         when(nodes.getIngestNodes()).thenReturn(ingestNodes);
         ClusterState state = mock(ClusterState.class);
         when(state.getNodes()).thenReturn(nodes);
-        when(state.getMetaData()).thenReturn(MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
+        MetaData metaData = MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
             .putAll(
                 Collections.singletonMap(
                     WITH_DEFAULT_PIPELINE,
@@ -170,7 +195,9 @@ public class TransportBulkActionIngestTests extends ESTestCase {
                         settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
                             .build()
                     ).numberOfShards(1).numberOfReplicas(1).build()))
-            .build()).build());
+            .build()).build();
+        when(state.getMetaData()).thenReturn(metaData);
+        when(state.metaData()).thenReturn(metaData);
         when(clusterService.state()).thenReturn(state);
         doAnswer(invocation -> {
             ClusterChangedEvent event = mock(ClusterChangedEvent.class);
@@ -408,4 +435,36 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         verifyZeroInteractions(transportService);
     }
 
+    public void testCreateIndexBeforeRunPipeline() throws Exception {
+        Exception exception = new Exception("fake exception");
+        IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
+        indexRequest.setPipeline("testpipeline");
+        indexRequest.source(Collections.emptyMap());
+        AtomicBoolean responseCalled = new AtomicBoolean(false);
+        AtomicBoolean failureCalled = new AtomicBoolean(false);
+        action.needToCheck = true;
+        action.indexCreated = false;
+        singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
+            response -> responseCalled.set(true),
+            e -> {
+                assertThat(e, sameInstance(exception));
+                failureCalled.set(true);
+            }));
+
+        // check failure works, and passes through to the listener
+        assertFalse(action.isExecuted); // haven't executed yet
+        assertFalse(responseCalled.get());
+        assertFalse(failureCalled.get());
+        verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
+        completionHandler.getValue().accept(exception);
+        assertTrue(failureCalled.get());
+
+        // now check success
+        indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
+        completionHandler.getValue().accept(null);
+        assertTrue(action.isExecuted);
+        assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
+        verifyZeroInteractions(transportService);
+    }
+
 }