Browse Source

Fix combine result for `ingest_took` (#132088) (#132278)

If we combined two `BulkResponse`s with `ingest_took` set to
`NO_INGEST_TOOK` we'd get an `ingest_took` of `-2`. Which doesn't make
any sense. This fixes it to be set to `NO_INGEST_TOOK` properly.
Nik Everett 2 months ago
parent
commit
c8662eb8f9

+ 5 - 0
docs/changelog/132088.yaml

@@ -0,0 +1,5 @@
+pr: 132088
+summary: Fix combine result for `ingest_took`
+area: ES|QL
+type: bug
+issues: []

+ 29 - 0
server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java

@@ -21,6 +21,7 @@ import org.elasticsearch.xcontent.ToXContent;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 
 /**
  * A response of a bulk execution. Holding a response for each item responding (in order) of the
@@ -167,4 +168,32 @@ public class BulkResponse extends ActionResponse implements Iterable<BulkItemRes
             return b;
         }).array(ITEMS, Iterators.forArray(responses)));
     }
+
+    /**
+     * Combine many bulk responses into one.
+     */
+    public static BulkResponse combine(List<BulkResponse> responses) {
+        long tookInMillis = 0;
+        long ingestTookInMillis = NO_INGEST_TOOK;
+        int itemResponseCount = 0;
+        for (BulkResponse response : responses) {
+            tookInMillis += response.getTookInMillis();
+            if (response.getIngestTookInMillis() != NO_INGEST_TOOK) {
+                if (ingestTookInMillis == NO_INGEST_TOOK) {
+                    ingestTookInMillis = 0;
+                }
+                ingestTookInMillis += response.getIngestTookInMillis();
+            }
+            itemResponseCount += response.getItems().length;
+        }
+        BulkItemResponse[] bulkItemResponses = new BulkItemResponse[itemResponseCount];
+        int i = 0;
+        for (BulkResponse response : responses) {
+            for (BulkItemResponse itemResponse : response.getItems()) {
+                bulkItemResponses[i++] = itemResponse;
+            }
+        }
+
+        return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis);
+    }
 }

+ 2 - 22
server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

@@ -191,7 +191,7 @@ public class IncrementalBulkService {
                         @Override
                         public void onResponse(BulkResponse bulkResponse) {
                             handleBulkSuccess(bulkResponse);
-                            listener.onResponse(combineResponses());
+                            listener.onResponse(BulkResponse.combine(responses));
                         }
 
                         @Override
@@ -233,7 +233,7 @@ public class IncrementalBulkService {
             if (globalFailure) {
                 listener.onFailure(bulkActionLevelFailure);
             } else {
-                listener.onResponse(combineResponses());
+                listener.onResponse(BulkResponse.combine(responses));
             }
         }
 
@@ -292,25 +292,5 @@ public class IncrementalBulkService {
                 bulkRequest.setRefreshPolicy(refresh);
             }
         }
-
-        private BulkResponse combineResponses() {
-            long tookInMillis = 0;
-            long ingestTookInMillis = 0;
-            int itemResponseCount = 0;
-            for (BulkResponse response : responses) {
-                tookInMillis += response.getTookInMillis();
-                ingestTookInMillis += response.getIngestTookInMillis();
-                itemResponseCount += response.getItems().length;
-            }
-            BulkItemResponse[] bulkItemResponses = new BulkItemResponse[itemResponseCount];
-            int i = 0;
-            for (BulkResponse response : responses) {
-                for (BulkItemResponse itemResponse : response.getItems()) {
-                    bulkItemResponses[i++] = itemResponse;
-                }
-            }
-
-            return new BulkResponse(bulkItemResponses, tookInMillis, ingestTookInMillis);
-        }
     }
 }

+ 12 - 0
server/src/test/java/org/elasticsearch/action/bulk/BulkResponseTests.java

@@ -144,6 +144,18 @@ public class BulkResponseTests extends ESTestCase {
         }
     }
 
+    public void testCombineNoIngest() {
+        BulkResponse first = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK);
+        BulkResponse second = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK);
+        assertThat(BulkResponse.combine(List.of(first, second)).getIngestTookInMillis(), equalTo(NO_INGEST_TOOK));
+    }
+
+    public void testCombineOneIngest() {
+        BulkResponse first = new BulkResponse(new BulkItemResponse[0], 1, NO_INGEST_TOOK);
+        BulkResponse second = new BulkResponse(new BulkItemResponse[0], 1, 2);
+        assertThat(BulkResponse.combine(List.of(first, second)).getIngestTookInMillis(), equalTo(2L));
+    }
+
     private static Tuple<? extends DocWriteResponse, ? extends DocWriteResponse> success(
         DocWriteRequest.OpType opType,
         XContentType xContentType