瀏覽代碼

ingest: support default pipelines + bulk upserts (#36618)

This commit adds support to enable bulk upserts to use an index's
default pipeline. Bulk upsert, doc_as_upsert, and script_as_upsert
are all supported.

However, bulk script_as_upsert has slightly surprising behavior since
the pipeline is executed _before_ the script is evaluated. This means
that the pipeline only has access the data found in the upsert field
of the script_as_upsert. The non-bulk script_as_upsert (existing behavior)
runs the pipeline _after_ the script is executed. This commit
does _not_ attempt to consolidate the bulk and non-bulk behavior for
script_as_upsert.

This commit also adds additional testing for the non-bulk behavior,
which remains unchanged with this commit.

fixes #36219
Jake Landis 6 年之前
父節點
當前提交
384757deff

+ 95 - 6
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml

@@ -23,7 +23,7 @@ teardown:
             ]
           }
   - match: { acknowledged: true }
-
+# default pipeline via index
   - do:
       indices.create:
         index: test
@@ -48,7 +48,7 @@ teardown:
         id: 1
   - match: { _source.bytes_source_field: "1kb" }
   - match: { _source.bytes_target_field: 1024 }
-
+# default pipeline via alias
   - do:
       index:
         index: test_alias
@@ -63,12 +63,101 @@ teardown:
         id: 2
   - match: { _source.bytes_source_field: "1kb" }
   - match: { _source.bytes_target_field: 1024 }
+# default pipeline via upsert
+  - do:
+      update:
+        index: test
+        type: test
+        id: 3
+        body:
+          script:
+            source: "ctx._source.ran_script = true"
+            lang: "painless"
+          upsert: { "bytes_source_field":"1kb" }
+  - do:
+      get:
+        index: test
+        type: test
+        id: 3
+  - match: { _source.bytes_source_field: "1kb" }
+  - match: { _source.bytes_target_field: 1024 }
+# default pipeline via scripted upsert
+  - do:
+      update:
+        index: test
+        type: test
+        id: 4
+        body:
+          script:
+            source: "ctx._source.bytes_source_field = '1kb'"
+            lang: "painless"
+          upsert : {}
+          scripted_upsert: true
+  - do:
+      get:
+        index: test
+        type: test
+        id: 4
+  - match: { _source.bytes_source_field: "1kb" }
+  - match: { _source.bytes_target_field: 1024 }
+# default pipeline via doc_as_upsert
+  - do:
+      update:
+        index: test
+        type: test
+        id: 5
+        body:
+          doc: { "bytes_source_field":"1kb" }
+          doc_as_upsert: true
+  - do:
+      get:
+        index: test
+        type: test
+        id: 5
+  - match: { _source.bytes_source_field: "1kb" }
+  - match: { _source.bytes_target_field: 1024 }
+# default pipeline via bulk upsert
+# note - bulk scripted upsert's execute the pipeline before the script, so any data referenced by the pipeline
+# needs to be in the upsert, not the script
+  - do:
+      bulk:
+        refresh: true
+        body: |
+          {"update":{"_id":"6","_index":"test","_type":"test"}}
+          {"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
+          {"update":{"_id":"7","_index":"test","_type":"test"}}
+          {"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
+          {"update":{"_id":"8","_index":"test","_type":"test"}}
+          {"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
 
+  - do:
+      mget:
+        body:
+          docs:
+          - { _index: "test", _type: "_doc", _id: "6" }
+          - { _index: "test", _type: "_doc", _id: "7" }
+          - { _index: "test", _type: "_doc", _id: "8" }
+  - match: { docs.0._index: "test" }
+  - match: { docs.0._id: "6" }
+  - match: { docs.0._source.bytes_source_field: "1kb" }
+  - match: { docs.0._source.bytes_target_field: 1024 }
+  - is_false: docs.0._source.ran_script
+  - match: { docs.1._index: "test" }
+  - match: { docs.1._id: "7" }
+  - match: { docs.1._source.bytes_source_field: "2kb" }
+  - match: { docs.1._source.bytes_target_field: 2048 }
+  - match: { docs.2._index: "test" }
+  - match: { docs.2._id: "8" }
+  - match: { docs.2._source.bytes_source_field: "3kb" }
+  - match: { docs.2._source.bytes_target_field: 3072 }
+  - match: { docs.2._source.ran_script: true }
+
+# explicit no default pipeline
   - do:
       index:
         index: test
         type: test
-        id: 3
+        id: 9
         pipeline: "_none"
         body: {bytes_source_field: "1kb"}
         
@@ -76,15 +165,15 @@ teardown:
       get:
         index: test
         type: test
-        id: 3
+        id: 9
   - match: { _source.bytes_source_field: "1kb" }
   - is_false: _source.bytes_target_field
-
+# bad request
   - do:
       catch:  bad_request
       index:
         index: test
         type: test
-        id: 4
+        id: 10
         pipeline: ""
         body: {bytes_source_field: "1kb"}

+ 24 - 6
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -127,6 +127,24 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         clusterService.addStateApplier(this.ingestForwarder);
     }
 
+    /**
+     * Retrieves the {@link IndexRequest} from the provided {@link DocWriteRequest} for index or upsert actions.  Upserts are
+     * modeled as {@link IndexRequest} inside the {@link UpdateRequest}. Ignores {@link org.elasticsearch.action.delete.DeleteRequest}'s
+     *
+     * @param docWriteRequest The request to find the {@link IndexRequest}
+     * @return the found {@link IndexRequest} or {@code null} if one can not be found.
+     */
+    public static IndexRequest getIndexWriteRequest(DocWriteRequest docWriteRequest) {
+        IndexRequest indexRequest = null;
+        if (docWriteRequest instanceof IndexRequest) {
+            indexRequest = (IndexRequest) docWriteRequest;
+        } else if (docWriteRequest instanceof UpdateRequest) {
+            UpdateRequest updateRequest = (UpdateRequest) docWriteRequest;
+            indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
+        }
+        return indexRequest;
+    }
+
     @Override
     protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
         final long startTime = relativeTime();
@@ -207,12 +225,12 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         final MetaData metaData = clusterService.state().getMetaData();
         ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
         for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
-            if (actionRequest instanceof IndexRequest) {
-                IndexRequest indexRequest = (IndexRequest) actionRequest;
+            IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
+            if(indexRequest != null){
                 String pipeline = indexRequest.getPipeline();
                 if (pipeline == null) {
-                    IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
-                    if (indexMetaData == null) {
+                    IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
+                    if (indexMetaData == null && indexRequest.index() != null) {
                         //check the alias
                         AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
                         if (indexOrAlias != null && indexOrAlias.isAlias()) {
@@ -626,7 +644,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         }
 
         void markCurrentItemAsDropped() {
-            IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
+            IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
             failedSlots.set(currentSlot);
             itemResponses.add(
                 new BulkItemResponse(currentSlot, indexRequest.opType(),
@@ -639,7 +657,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
         }
 
         void markCurrentItemAsFailed(Exception e) {
-            IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
+            IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
             // We hit a error during preprocessing a request, so we:
             // 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
             // 2) Add a bulk item failure for this request

+ 2 - 8
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -24,11 +24,11 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.DeletePipelineRequest;
 import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
@@ -388,13 +388,7 @@ public class IngestService implements ClusterStateApplier {
             @Override
             protected void doRun() {
                 for (DocWriteRequest<?> actionRequest : actionRequests) {
-                    IndexRequest indexRequest = null;
-                    if (actionRequest instanceof IndexRequest) {
-                        indexRequest = (IndexRequest) actionRequest;
-                    } else if (actionRequest instanceof UpdateRequest) {
-                        UpdateRequest updateRequest = (UpdateRequest) actionRequest;
-                        indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
-                    }
+                    IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
                     if (indexRequest == null) {
                         continue;
                     }

+ 54 - 0
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

@@ -28,6 +28,7 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.AutoCreateIndex;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateApplier;
@@ -408,6 +409,57 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE_ALIAS, "type", "id"));
     }
 
+    public void testUseDefaultPipelineWithBulkUpsert() throws Exception {
+        Exception exception = new Exception("fake exception");
+        BulkRequest bulkRequest = new BulkRequest();
+        IndexRequest indexRequest1 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id1").source(Collections.emptyMap());
+        IndexRequest indexRequest2 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id2").source(Collections.emptyMap());
+        IndexRequest indexRequest3 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id3").source(Collections.emptyMap());
+        UpdateRequest upsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id1").upsert(indexRequest1).script(mockScript("1"));
+        UpdateRequest docAsUpsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").doc(indexRequest2).docAsUpsert(true);
+        // this test only covers the mechanics that scripted bulk upserts will execute a default pipeline. However, in practice scripted
+        // bulk upserts with a default pipeline are a bit surprising since the script executes AFTER the pipeline.
+        UpdateRequest scriptedUpsert = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").upsert(indexRequest3).script(mockScript("1"))
+            .scriptedUpsert(true);
+        bulkRequest.add(upsertRequest).add(docAsUpsertRequest).add(scriptedUpsert);
+
+        AtomicBoolean responseCalled = new AtomicBoolean(false);
+        AtomicBoolean failureCalled = new AtomicBoolean(false);
+        assertNull(indexRequest1.getPipeline());
+        assertNull(indexRequest2.getPipeline());
+        assertNull(indexRequest3.getPipeline());
+        action.execute(null, bulkRequest, ActionListener.wrap(
+            response -> {
+                BulkItemResponse itemResponse = response.iterator().next();
+                assertThat(itemResponse.getFailure().getMessage(), containsString("fake exception"));
+                responseCalled.set(true);
+            },
+            e -> {
+                assertThat(e, sameInstance(exception));
+                failureCalled.set(true);
+            }));
+
+        // check failure works, and passes through to the listener
+        assertFalse(action.isExecuted); // haven't executed yet
+        assertFalse(responseCalled.get());
+        assertFalse(failureCalled.get());
+        verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
+        assertEquals(indexRequest1.getPipeline(), "default_pipeline");
+        assertEquals(indexRequest2.getPipeline(), "default_pipeline");
+        assertEquals(indexRequest3.getPipeline(), "default_pipeline");
+        completionHandler.getValue().accept(exception);
+        assertTrue(failureCalled.get());
+
+        // now check success of the transport bulk action
+        indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
+        indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
+        indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
+        completionHandler.getValue().accept(null);
+        assertTrue(action.isExecuted);
+        assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
+        verifyZeroInteractions(transportService);
+    }
+
     public void testCreateIndexBeforeRunPipeline() throws Exception {
         Exception exception = new Exception("fake exception");
         IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
@@ -445,6 +497,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         indexRequest.source(Collections.emptyMap());
         AtomicBoolean responseCalled = new AtomicBoolean(false);
         AtomicBoolean failureCalled = new AtomicBoolean(false);
+        assertNull(indexRequest.getPipeline());
         singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
             response -> {
                 responseCalled.set(true);
@@ -459,6 +512,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         assertFalse(responseCalled.get());
         assertFalse(failureCalled.get());
         verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
+        assertEquals(indexRequest.getPipeline(), "default_pipeline");
         completionHandler.getValue().accept(exception);
         assertTrue(failureCalled.get());
 

+ 21 - 0
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

@@ -23,8 +23,10 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver;
 import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.AutoCreateIndex;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -132,4 +134,23 @@ public class TransportBulkActionTests extends ESTestCase {
             throw new AssertionError(exception);
         }));
     }
+
+    public void testGetIndexWriteRequest() throws Exception {
+        IndexRequest indexRequest = new IndexRequest("index", "type", "id1").source(Collections.emptyMap());
+        UpdateRequest upsertRequest = new UpdateRequest("index", "type", "id1").upsert(indexRequest).script(mockScript("1"));
+        UpdateRequest docAsUpsertRequest = new UpdateRequest("index", "type", "id2").doc(indexRequest).docAsUpsert(true);
+        UpdateRequest scriptedUpsert = new UpdateRequest("index", "type", "id2").upsert(indexRequest).script(mockScript("1"))
+            .scriptedUpsert(true);
+
+        assertEquals(TransportBulkAction.getIndexWriteRequest(indexRequest), indexRequest);
+        assertEquals(TransportBulkAction.getIndexWriteRequest(upsertRequest), indexRequest);
+        assertEquals(TransportBulkAction.getIndexWriteRequest(docAsUpsertRequest), indexRequest);
+        assertEquals(TransportBulkAction.getIndexWriteRequest(scriptedUpsert), indexRequest);
+
+        DeleteRequest deleteRequest = new DeleteRequest("index", "id");
+        assertNull(TransportBulkAction.getIndexWriteRequest(deleteRequest));
+
+        UpdateRequest badUpsertRequest = new UpdateRequest("index", "type", "id1");
+        assertNull(TransportBulkAction.getIndexWriteRequest(badUpsertRequest));
+    }
 }