View source code

Parse bulk lines in individual steps (#114086) (#116210)

Currently our incremental bulk parsing framework only parses once both
the action line and document line are available. In addition, it will
re-search lines for line delimiters as data is received. This commit
ensures that the state is not lost in between parse attempts.
Tim Brooks — 11 months ago
parent
commit
735e6355a9

+ 186 - 88
server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java

@@ -86,13 +86,13 @@ public final class BulkRequestParser {
             .withRestApiVersion(restApiVersion);
     }
 
-    private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) {
+    private static int findNextMarker(byte marker, int from, BytesReference data, boolean lastData) {
         final int res = data.indexOf(marker, from);
         if (res != -1) {
             assert res >= 0;
             return res;
         }
-        if (from != data.length() && isIncremental == false) {
+        if (from != data.length() && lastData) {
             throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
         }
         return res;
@@ -137,13 +137,7 @@ public final class BulkRequestParser {
         Consumer<UpdateRequest> updateRequestConsumer,
         Consumer<DeleteRequest> deleteRequestConsumer
     ) throws IOException {
-        // Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
-        // deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
-        // reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
-        final Map<String, String> stringDeduplicator = new HashMap<>();
-
-        incrementalParse(
-            data,
+        IncrementalParser incrementalParser = new IncrementalParser(
             defaultIndex,
             defaultRouting,
             defaultFetchSourceContext,
@@ -155,53 +149,164 @@ public final class BulkRequestParser {
             xContentType,
             indexRequestConsumer,
             updateRequestConsumer,
-            deleteRequestConsumer,
-            false,
-            stringDeduplicator
+            deleteRequestConsumer
         );
+
+        incrementalParser.parse(data, true);
     }
 
-    public int incrementalParse(
-        BytesReference data,
-        String defaultIndex,
-        String defaultRouting,
-        FetchSourceContext defaultFetchSourceContext,
-        String defaultPipeline,
-        Boolean defaultRequireAlias,
-        Boolean defaultRequireDataStream,
-        Boolean defaultListExecutedPipelines,
+    public IncrementalParser incrementalParser(
+        @Nullable String defaultIndex,
+        @Nullable String defaultRouting,
+        @Nullable FetchSourceContext defaultFetchSourceContext,
+        @Nullable String defaultPipeline,
+        @Nullable Boolean defaultRequireAlias,
+        @Nullable Boolean defaultRequireDataStream,
+        @Nullable Boolean defaultListExecutedPipelines,
         boolean allowExplicitIndex,
         XContentType xContentType,
         BiConsumer<IndexRequest, String> indexRequestConsumer,
         Consumer<UpdateRequest> updateRequestConsumer,
-        Consumer<DeleteRequest> deleteRequestConsumer,
-        boolean isIncremental,
-        Map<String, String> stringDeduplicator
-    ) throws IOException {
-        XContent xContent = xContentType.xContent();
-        byte marker = xContent.bulkSeparator();
+        Consumer<DeleteRequest> deleteRequestConsumer
+    ) {
+        return new IncrementalParser(
+            defaultIndex,
+            defaultRouting,
+            defaultFetchSourceContext,
+            defaultPipeline,
+            defaultRequireAlias,
+            defaultRequireDataStream,
+            defaultListExecutedPipelines,
+            allowExplicitIndex,
+            xContentType,
+            indexRequestConsumer,
+            updateRequestConsumer,
+            deleteRequestConsumer
+        );
+    }
+
+    public class IncrementalParser {
+
+        // Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
+        // deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
+        // reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
+        private final Map<String, String> stringDeduplicator = new HashMap<>();
+
+        private final String defaultIndex;
+        private final String defaultRouting;
+        private final FetchSourceContext defaultFetchSourceContext;
+        private final String defaultPipeline;
+        private final Boolean defaultRequireAlias;
+        private final Boolean defaultRequireDataStream;
+        private final Boolean defaultListExecutedPipelines;
+        private final boolean allowExplicitIndex;
+
+        private final XContentType xContentType;
+        private final byte marker;
+        private final BiConsumer<IndexRequest, String> indexRequestConsumer;
+        private final Consumer<UpdateRequest> updateRequestConsumer;
+        private final Consumer<DeleteRequest> deleteRequestConsumer;
+
+        private Exception failure = null;
+        private int incrementalFromOffset = 0;
+        private int line = 0;
         boolean typesDeprecationLogged = false;
 
-        int line = 0;
-        int from = 0;
-        int consumed = 0;
+        private DocWriteRequest<?> currentRequest = null;
+        private String currentType = null;
+        private String currentPipeline = null;
+        private boolean currentListExecutedPipelines = false;
+        private FetchSourceContext currentFetchSourceContext = null;
 
-        while (true) {
-            int nextMarker = findNextMarker(marker, from, data, isIncremental);
-            if (nextMarker == -1) {
-                break;
+        private IncrementalParser(
+            @Nullable String defaultIndex,
+            @Nullable String defaultRouting,
+            @Nullable FetchSourceContext defaultFetchSourceContext,
+            @Nullable String defaultPipeline,
+            @Nullable Boolean defaultRequireAlias,
+            @Nullable Boolean defaultRequireDataStream,
+            @Nullable Boolean defaultListExecutedPipelines,
+            boolean allowExplicitIndex,
+            XContentType xContentType,
+            BiConsumer<IndexRequest, String> indexRequestConsumer,
+            Consumer<UpdateRequest> updateRequestConsumer,
+            Consumer<DeleteRequest> deleteRequestConsumer
+        ) {
+            this.defaultIndex = defaultIndex;
+            this.defaultRouting = defaultRouting;
+            this.defaultFetchSourceContext = defaultFetchSourceContext;
+            this.defaultPipeline = defaultPipeline;
+            this.defaultRequireAlias = defaultRequireAlias;
+            this.defaultRequireDataStream = defaultRequireDataStream;
+            this.defaultListExecutedPipelines = defaultListExecutedPipelines;
+            this.allowExplicitIndex = allowExplicitIndex;
+            this.xContentType = xContentType;
+            this.marker = xContentType.xContent().bulkSeparator();
+            this.indexRequestConsumer = indexRequestConsumer;
+            this.updateRequestConsumer = updateRequestConsumer;
+            this.deleteRequestConsumer = deleteRequestConsumer;
+        }
+
+        public int parse(BytesReference data, boolean lastData) throws IOException {
+            if (failure != null) {
+                assert false : failure.getMessage();
+                throw new IllegalStateException("Parser has already encountered exception", failure);
+            }
+            try {
+                return tryParse(data, lastData);
+            } catch (Exception e) {
+                failure = e;
+                throw e;
             }
-            line++;
+        }
+
+        private int tryParse(BytesReference data, boolean lastData) throws IOException {
+            int from = 0;
+            int consumed = 0;
+
+            while (true) {
+                int nextMarker = findNextMarker(marker, incrementalFromOffset, data, lastData);
+                if (nextMarker == -1) {
+                    incrementalFromOffset = data.length() - consumed;
+                    break;
+                }
+                incrementalFromOffset = nextMarker + 1;
+                line++;
+
+                if (currentRequest == null) {
+                    if (parseActionLine(data, from, nextMarker)) {
+                        if (currentRequest instanceof DeleteRequest deleteRequest) {
+                            deleteRequestConsumer.accept(deleteRequest);
+                            currentRequest = null;
+                        }
+                    }
+                } else {
+                    parseAndConsumeDocumentLine(data, from, nextMarker);
+                    currentRequest = null;
+                }
 
-            // now parse the action
-            try (XContentParser parser = createParser(xContent, data, from, nextMarker)) {
-                // move pointers
                 from = nextMarker + 1;
+                consumed = from;
+            }
+
+            return lastData ? from : consumed;
+        }
+
+        private boolean parseActionLine(BytesReference data, int from, int to) throws IOException {
+            assert currentRequest == null;
+
+            // Reset the fields which are accessed during document line parsing
+            currentType = null;
+            currentPipeline = defaultPipeline;
+            currentListExecutedPipelines = defaultListExecutedPipelines != null && defaultListExecutedPipelines;
+            currentFetchSourceContext = defaultFetchSourceContext;
+
+            try (XContentParser parser = createParser(xContentType.xContent(), data, from, to)) {
 
                 // Move to START_OBJECT
                 XContentParser.Token token = parser.nextToken();
                 if (token == null) {
-                    continue;
+                    return false;
                 }
                 if (token != XContentParser.Token.START_OBJECT) {
                     throw new IllegalArgumentException(
@@ -239,20 +344,16 @@ public final class BulkRequestParser {
                 }
 
                 String index = defaultIndex;
-                String type = null;
                 String id = null;
                 String routing = defaultRouting;
-                FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
                 String opType = null;
                 long version = Versions.MATCH_ANY;
                 VersionType versionType = VersionType.INTERNAL;
                 long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
                 long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
                 int retryOnConflict = 0;
-                String pipeline = defaultPipeline;
                 boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
                 boolean requireDataStream = defaultRequireDataStream != null && defaultRequireDataStream;
-                boolean listExecutedPipelines = defaultListExecutedPipelines != null && defaultListExecutedPipelines;
                 Map<String, String> dynamicTemplates = Map.of();
 
                 // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
@@ -283,7 +384,7 @@ public final class BulkRequestParser {
                                             "Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
                                         );
                                     }
-                                type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
+                                currentType = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                             } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
                                 id = parser.text();
                             } else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
@@ -301,15 +402,15 @@ public final class BulkRequestParser {
                             } else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
                                 retryOnConflict = parser.intValue();
                             } else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
-                                pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
+                                currentPipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
                             } else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
-                                fetchSourceContext = FetchSourceContext.fromXContent(parser);
+                                currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
                             } else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
                                 requireAlias = parser.booleanValue();
                             } else if (REQUIRE_DATA_STREAM.match(currentFieldName, parser.getDeprecationHandler())) {
                                 requireDataStream = parser.booleanValue();
                             } else if (LIST_EXECUTED_PIPELINES.match(currentFieldName, parser.getDeprecationHandler())) {
-                                listExecutedPipelines = parser.booleanValue();
+                                currentListExecutedPipelines = parser.booleanValue();
                             } else {
                                 throw new IllegalArgumentException(
                                     "Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
@@ -330,7 +431,7 @@ public final class BulkRequestParser {
                                 dynamicTemplates = parser.mapStrings();
                             } else if (token == XContentParser.Token.START_OBJECT
                                 && SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
-                                    fetchSourceContext = FetchSourceContext.fromXContent(parser);
+                                    currentFetchSourceContext = FetchSourceContext.fromXContent(parser);
                                 } else if (token != XContentParser.Token.VALUE_NULL) {
                                     throw new IllegalArgumentException(
                                         "Malformed action/metadata line ["
@@ -364,22 +465,13 @@ public final class BulkRequestParser {
                             "Delete request in line [" + line + "] does not accept " + DYNAMIC_TEMPLATES.getPreferredName()
                         );
                     }
-                    deleteRequestConsumer.accept(
-                        new DeleteRequest(index).id(id)
-                            .routing(routing)
-                            .version(version)
-                            .versionType(versionType)
-                            .setIfSeqNo(ifSeqNo)
-                            .setIfPrimaryTerm(ifPrimaryTerm)
-                    );
-                    consumed = from;
+                    currentRequest = new DeleteRequest(index).id(id)
+                        .routing(routing)
+                        .version(version)
+                        .versionType(versionType)
+                        .setIfSeqNo(ifSeqNo)
+                        .setIfPrimaryTerm(ifPrimaryTerm);
                 } else {
-                    nextMarker = findNextMarker(marker, from, data, isIncremental);
-                    if (nextMarker == -1) {
-                        break;
-                    }
-                    line++;
-
                     // we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
                     // of index request.
                     if ("index".equals(action) || "create".equals(action)) {
@@ -387,20 +479,19 @@ public final class BulkRequestParser {
                             .routing(routing)
                             .version(version)
                             .versionType(versionType)
-                            .setPipeline(pipeline)
+                            .setPipeline(currentPipeline)
                             .setIfSeqNo(ifSeqNo)
                             .setIfPrimaryTerm(ifPrimaryTerm)
-                            .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
                             .setDynamicTemplates(dynamicTemplates)
                             .setRequireAlias(requireAlias)
                             .setRequireDataStream(requireDataStream)
-                            .setListExecutedPipelines(listExecutedPipelines);
+                            .setListExecutedPipelines(currentListExecutedPipelines);
                         if ("create".equals(action)) {
                             indexRequest = indexRequest.create(true);
                         } else if (opType != null) {
                             indexRequest = indexRequest.create("create".equals(opType));
                         }
-                        indexRequestConsumer.accept(indexRequest, type);
+                        currentRequest = indexRequest;
                     } else if ("update".equals(action)) {
                         if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
                             throw new IllegalArgumentException(
@@ -427,31 +518,38 @@ public final class BulkRequestParser {
                             .setIfPrimaryTerm(ifPrimaryTerm)
                             .setRequireAlias(requireAlias)
                             .routing(routing);
-                        try (
-                            XContentParser sliceParser = createParser(
-                                xContent,
-                                sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType)
-                            )
-                        ) {
-                            updateRequest.fromXContent(sliceParser);
-                        }
-                        if (fetchSourceContext != null) {
-                            updateRequest.fetchSource(fetchSourceContext);
-                        }
-                        IndexRequest upsertRequest = updateRequest.upsertRequest();
-                        if (upsertRequest != null) {
-                            upsertRequest.setPipeline(pipeline).setListExecutedPipelines(listExecutedPipelines);
-                        }
-
-                        updateRequestConsumer.accept(updateRequest);
+                        currentRequest = updateRequest;
                     }
-                    // move pointers
-                    from = nextMarker + 1;
-                    consumed = from;
                 }
             }
+            return true;
         }
-        return isIncremental ? consumed : from;
+
+        private void parseAndConsumeDocumentLine(BytesReference data, int from, int to) throws IOException {
+            assert currentRequest != null && currentRequest instanceof DeleteRequest == false;
+            if (currentRequest instanceof IndexRequest indexRequest) {
+                indexRequest.source(sliceTrimmingCarriageReturn(data, from, to, xContentType), xContentType);
+                indexRequestConsumer.accept(indexRequest, currentType);
+            } else if (currentRequest instanceof UpdateRequest updateRequest) {
+                try (
+                    XContentParser sliceParser = createParser(
+                        xContentType.xContent(),
+                        sliceTrimmingCarriageReturn(data, from, to, xContentType)
+                    )
+                ) {
+                    updateRequest.fromXContent(sliceParser);
+                }
+                if (currentFetchSourceContext != null) {
+                    updateRequest.fetchSource(currentFetchSourceContext);
+                }
+                IndexRequest upsertRequest = updateRequest.upsertRequest();
+                if (upsertRequest != null) {
+                    upsertRequest.setPipeline(currentPipeline).setListExecutedPipelines(currentListExecutedPipelines);
+                }
+                updateRequestConsumer.accept(updateRequest);
+            }
+        }
+
     }
 
     @UpdateForV9

+ 17 - 39
server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

@@ -39,9 +39,7 @@ import org.elasticsearch.transport.Transports;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.Supplier;
 
@@ -152,19 +150,10 @@ public class RestBulkAction extends BaseRestHandler {
 
     static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
 
-        private final boolean allowExplicitIndex;
         private final RestRequest request;
 
-        private final Map<String, String> stringDeduplicator = new HashMap<>();
-        private final String defaultIndex;
-        private final String defaultRouting;
-        private final FetchSourceContext defaultFetchSourceContext;
-        private final String defaultPipeline;
-        private final boolean defaultListExecutedPipelines;
-        private final Boolean defaultRequireAlias;
-        private final boolean defaultRequireDataStream;
-        private final BulkRequestParser parser;
         private final Supplier<IncrementalBulkService.Handler> handlerSupplier;
+        private final BulkRequestParser.IncrementalParser parser;
         private IncrementalBulkService.Handler handler;
 
         private volatile RestChannel restChannel;
@@ -174,17 +163,22 @@ public class RestBulkAction extends BaseRestHandler {
         private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
 
         ChunkHandler(boolean allowExplicitIndex, RestRequest request, Supplier<IncrementalBulkService.Handler> handlerSupplier) {
-            this.allowExplicitIndex = allowExplicitIndex;
             this.request = request;
-            this.defaultIndex = request.param("index");
-            this.defaultRouting = request.param("routing");
-            this.defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
-            this.defaultPipeline = request.param("pipeline");
-            this.defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", false);
-            this.defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
-            this.defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
-            this.parser = new BulkRequestParser(true, request.getRestApiVersion());
             this.handlerSupplier = handlerSupplier;
+            this.parser = new BulkRequestParser(true, request.getRestApiVersion()).incrementalParser(
+                request.param("index"),
+                request.param("routing"),
+                FetchSourceContext.parseFromRestRequest(request),
+                request.param("pipeline"),
+                request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false),
+                request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false),
+                request.paramAsBoolean("list_executed_pipelines", false),
+                allowExplicitIndex,
+                request.getXContentType(),
+                (indexRequest, type) -> items.add(indexRequest),
+                items::add,
+                items::add
+            );
         }
 
         @Override
@@ -220,23 +214,7 @@ public class RestBulkAction extends BaseRestHandler {
 
                     // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
                     // BulkRequest#add is fine
-                    bytesConsumed = parser.incrementalParse(
-                        data,
-                        defaultIndex,
-                        defaultRouting,
-                        defaultFetchSourceContext,
-                        defaultPipeline,
-                        defaultRequireAlias,
-                        defaultRequireDataStream,
-                        defaultListExecutedPipelines,
-                        allowExplicitIndex,
-                        request.getXContentType(),
-                        (request, type) -> items.add(request),
-                        items::add,
-                        items::add,
-                        isLast == false,
-                        stringDeduplicator
-                    );
+                    bytesConsumed = parser.parse(data, isLast);
                     bytesParsed += bytesConsumed;
 
                 } catch (Exception e) {
@@ -263,7 +241,7 @@ public class RestBulkAction extends BaseRestHandler {
                 items.clear();
                 handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next());
             } else {
-                assert releasables.isEmpty();
+                Releasables.close(releasables);
                 request.contentStream().next();
             }
         }

+ 106 - 1
server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java

@@ -9,6 +9,7 @@
 
 package org.elasticsearch.action.bulk;
 
+import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.core.RestApiVersion;
@@ -21,8 +22,91 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.hamcrest.Matchers.equalTo;
+
 public class BulkRequestParserTests extends ESTestCase {
 
+    public void testParserCannotBeReusedAfterFailure() {
+        BytesArray request = new BytesArray("""
+            { "invalidaction":{ } }
+            {}
+            """);
+
+        BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current());
+        BulkRequestParser.IncrementalParser incrementalParser = parser.incrementalParser(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            false,
+            XContentType.JSON,
+            (req, type) -> fail("expected failure before we got this far"),
+            req -> fail("expected failure before we got this far"),
+            req -> fail("expected failure before we got this far")
+        );
+
+        IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> incrementalParser.parse(request, false));
+        assertEquals(
+            "Malformed action/metadata line [1], expected field [create], [delete], [index] or [update] but found [invalidaction]",
+            ex.getMessage()
+        );
+
+        BytesArray valid = new BytesArray("""
+            { "index":{ "_id": "bar" } }
+            {}
+            """);
+        expectThrows(AssertionError.class, () -> incrementalParser.parse(valid, false));
+    }
+
+    public void testIncrementalParsing() throws IOException {
+        ArrayList<DocWriteRequest<?>> indexRequests = new ArrayList<>();
+        ArrayList<DocWriteRequest<?>> updateRequests = new ArrayList<>();
+        ArrayList<DocWriteRequest<?>> deleteRequests = new ArrayList<>();
+
+        BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current());
+        BulkRequestParser.IncrementalParser incrementalParser = parser.incrementalParser(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            false,
+            XContentType.JSON,
+            (r, t) -> indexRequests.add(r),
+            updateRequests::add,
+            deleteRequests::add
+        );
+
+        BytesArray request = new BytesArray("""
+            { "index":{ "_id": "bar", "pipeline": "foo" } }
+            { "field": "value"}
+            { "index":{ "require_alias": false } }
+            { "field": "value" }
+            { "update":{ "_id": "bus", "require_alias": true } }
+            { "doc": {"field": "value" }}
+            { "delete":{ "_id": "baz" } }
+            { "index": { } }
+            { "field": "value"}
+            { "delete":{ "_id": "bop" } }
+            """);
+
+        int consumed = 0;
+        for (int i = 0; i < request.length() - 1; ++i) {
+            consumed += incrementalParser.parse(request.slice(consumed, i - consumed + 1), false);
+        }
+        consumed += incrementalParser.parse(request.slice(consumed, request.length() - consumed), true);
+        assertThat(consumed, equalTo(request.length()));
+
+        assertThat(indexRequests.size(), equalTo(3));
+        assertThat(updateRequests.size(), equalTo(1));
+        assertThat(deleteRequests.size(), equalTo(2));
+    }
+
     public void testIndexRequest() throws IOException {
         BytesArray request = new BytesArray("""
             { "index":{ "_id": "bar" } }
@@ -126,7 +210,7 @@ public class BulkRequestParserTests extends ESTestCase {
         }, req -> fail());
     }
 
-    public void testBarfOnLackOfTrailingNewline() {
+    public void testBarfOnLackOfTrailingNewline() throws IOException {
         BytesArray request = new BytesArray("""
             { "index":{ "_id": "bar" } }
             {}""");
@@ -150,6 +234,27 @@ public class BulkRequestParserTests extends ESTestCase {
             )
         );
         assertEquals("The bulk request must be terminated by a newline [\\n]", e.getMessage());
+
+        BulkRequestParser.IncrementalParser incrementalParser = parser.incrementalParser(
+            "foo",
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            false,
+            XContentType.JSON,
+            (req, type) -> {},
+            req -> {},
+            req -> {}
+        );
+
+        // Should not throw because not last
+        incrementalParser.parse(request, false);
+
+        IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, () -> incrementalParser.parse(request, true));
+        assertEquals("The bulk request must be terminated by a newline [\\n]", e2.getMessage());
     }
 
     public void testFailOnExplicitIndex() {

+ 1 - 1
server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java

@@ -405,7 +405,7 @@ public class BulkRequestTests extends ESTestCase {
             IllegalArgumentException.class,
             () -> new BulkRequest().add(updateWithDynamicTemplates, null, XContentType.JSON)
         );
-        assertThat(error.getMessage(), equalTo("Update request in line [2] does not accept dynamic_templates"));
+        assertThat(error.getMessage(), equalTo("Update request in line [1] does not accept dynamic_templates"));
 
         BytesArray invalidDynamicTemplates = new BytesArray("""
             { "index":{"_index":"test","dynamic_templates":[]}

+ 1 - 1
server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java

@@ -251,6 +251,7 @@ public class RestBulkActionTests extends ESTestCase {
         assertTrue(next.get());
         next.set(false);
         assertFalse(isLast.get());
+        assertFalse(r1.hasReferences());
 
         ReleasableBytesReference r2 = new ReleasableBytesReference(new BytesArray("{\"field\":1}"), () -> {});
         chunkHandler.handleChunk(channel, r2, false);
@@ -258,7 +259,6 @@ public class RestBulkActionTests extends ESTestCase {
         assertTrue(next.get());
         next.set(false);
         assertFalse(isLast.get());
-        assertTrue(r1.hasReferences());
         assertTrue(r2.hasReferences());
 
         ReleasableBytesReference r3 = new ReleasableBytesReference(new BytesArray("\n{\"delete\":"), () -> {});