Browse Source

Fix wrong result when executing bulk requests with and without pipeline (#60818)

bellengao 5 years ago
parent
commit
651242b207

+ 25 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/bulk/90_pipeline.yml

@@ -0,0 +1,25 @@
+---
+"One request has pipeline and another not":
+  - skip:
+      version: " - 7.99.99"
+      reason: "change after backporting"
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - '{"index": {"_index": "test_index", "_id": "test_id1"}}'
+          - '{"f1": "v1", "f2": 42}'
+          - '{"index": {"_index": "test_index", "_id": "test_id2", "pipeline": "mypipeline"}}'
+          - '{"f1": "v2", "f2": 47}'
+
+  - match: { errors: true }
+  - match: { items.0.index.result: created }
+  - match: { items.1.index.status: 400 }
+  - match: { items.1.index.error.type: illegal_argument_exception }
+  - match: { items.1.index.error.reason: "pipeline with id [mypipeline] does not exist" }
+
+  - do:
+      count:
+        index: test_index
+
+  - match: {count: 1}

+ 2 - 0
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -472,6 +472,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                             onCompletion.accept(originalThread, null);
                         }
                         assert counter.get() >= 0;
+                        i++;
                         continue;
                     }
 
@@ -494,6 +495,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                             onCompletion.accept(originalThread, null);
                         }
                         assert counter.get() >= 0;
+                        i++;
                         continue;
                     }
 

+ 32 - 9
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -636,17 +636,27 @@ public class IngestServiceTests extends ESTestCase {
         clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         final SetOnce<Boolean> failure = new SetOnce<>();
-        final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline(id).setFinalPipeline("_none");
+
+        BulkRequest bulkRequest = new BulkRequest();
+        final IndexRequest indexRequest1 =
+            new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_none").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest1);
+        IndexRequest indexRequest2 =
+            new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline(id).setFinalPipeline("_none");
+        bulkRequest.add(indexRequest2);
+
         final BiConsumer<Integer, Exception> failureHandler = (slot, e) -> {
             assertThat(e.getCause(), instanceOf(IllegalStateException.class));
             assertThat(e.getCause().getMessage(), equalTo("error"));
             failure.set(true);
+            assertThat(slot, equalTo(1));
         };
 
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
 
-        ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
+        ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
+            completionHandler, indexReq -> {});
 
         assertTrue(failure.get());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
@@ -665,11 +675,15 @@ public class IngestServiceTests extends ESTestCase {
 
         BulkRequest bulkRequest = new BulkRequest();
 
-        IndexRequest indexRequest1 = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
+        IndexRequest indexRequest1 =
+            new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_none").setFinalPipeline("_none");
         bulkRequest.add(indexRequest1);
         IndexRequest indexRequest2 =
-            new IndexRequest("_index").id("_id").source(Collections.emptyMap()).setPipeline("does_not_exist").setFinalPipeline("_none");
+            new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
         bulkRequest.add(indexRequest2);
+        IndexRequest indexRequest3 =
+            new IndexRequest("_index").id("_id3").source(Collections.emptyMap()).setPipeline("does_not_exist").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest3);
         @SuppressWarnings("unchecked")
         BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
@@ -680,7 +694,7 @@ public class IngestServiceTests extends ESTestCase {
             argThat(new CustomTypeSafeMatcher<>("failure handler was not called with the expected arguments") {
                 @Override
                 protected boolean matchesSafely(Integer item) {
-                    return item == 1;
+                    return item == 2;
                 }
 
             }),
@@ -1176,18 +1190,27 @@ public class IngestServiceTests extends ESTestCase {
         ClusterState previousClusterState = clusterState;
         clusterState = IngestService.innerPut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
-        final IndexRequest indexRequest =
-            new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
+
+        BulkRequest bulkRequest = new BulkRequest();
+        final IndexRequest indexRequest1 =
+            new IndexRequest("_index").id("_id1").source(Collections.emptyMap()).setPipeline("_none").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest1);
+
+        IndexRequest indexRequest2 =
+            new IndexRequest("_index").id("_id2").source(Collections.emptyMap()).setPipeline("_id").setFinalPipeline("_none");
+        bulkRequest.add(indexRequest2);
+
         @SuppressWarnings("unchecked")
         final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
         @SuppressWarnings("unchecked")
         final IntConsumer dropHandler = mock(IntConsumer.class);
-        ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, dropHandler);
+        ingestService.executeBulkRequest(bulkRequest.numberOfActions(), bulkRequest.requests(), failureHandler,
+            completionHandler, dropHandler);
         verify(failureHandler, never()).accept(any(), any());
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
-        verify(dropHandler, times(1)).accept(0);
+        verify(dropHandler, times(1)).accept(1);
     }
 
     public void testIngestClusterStateListeners_orderOfExecution() {