Browse Source

Standardize error code when bulk body is invalid (#114869) (#114944)

Currently the incremental and non-incremental bulk variations will
return different error codes when the json body provided is invalid.
This commit ensures both versions return status code 400. Additionally,
this renames the incremental REST tests to bulk tests and ensures that
all tests work with both bulk api versions. We set these tests to
randomize which version of the api we test each run.
Tim Brooks 1 year ago
parent
commit
9922d544a1

+ 5 - 0
docs/changelog/114869.yaml

@@ -0,0 +1,5 @@
+pr: 114869
+summary: Standardize error code when bulk body is invalid
+area: CRUD
+type: bug
+issues: []

+ 33 - 9
qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java → qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/BulkRestIT.java

@@ -9,6 +9,8 @@
 
 package org.elasticsearch.http;
 
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
 import org.elasticsearch.action.bulk.IncrementalBulkService;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
@@ -19,24 +21,30 @@ import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.xcontent.json.JsonXContent;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 
+import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
 import static org.elasticsearch.rest.RestStatus.OK;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
-public class IncrementalBulkRestIT extends HttpSmokeTestCase {
+public class BulkRestIT extends HttpSmokeTestCase {
 
     @Override
     protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
         return Settings.builder()
             .put(super.nodeSettings(nodeOrdinal, otherSettings))
-            .put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), true)
+            .put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), seventyFivePercentOfTheTime())
             .build();
     }
 
+    private static boolean seventyFivePercentOfTheTime() {
+        return (randomBoolean() && randomBoolean()) == false;
+    }
+
     public void testBulkUriMatchingDoesNotMatchBulkCapabilitiesApi() throws IOException {
         Request request = new Request("GET", "/_capabilities?method=GET&path=%2F_bulk&capabilities=failure_store_status&pretty");
         Response response = getRestClient().performRequest(request);
@@ -51,6 +59,26 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
         assertThat(responseException.getMessage(), containsString("request body is required"));
     }
 
+    public void testBulkInvalidIndexNameString() throws IOException {
+        Request request = new Request("POST", "/_bulk");
+
+        byte[] bytes1 = "{\"create\":{\"_index\":\"".getBytes(StandardCharsets.UTF_8);
+        byte[] bytes2 = new byte[] { (byte) 0xfe, (byte) 0xfe, (byte) 0xff, (byte) 0xff };
+        byte[] bytes3 = "\",\"_id\":\"1\"}}\n{\"field\":1}\n\r\n".getBytes(StandardCharsets.UTF_8);
+        byte[] bulkBody = new byte[bytes1.length + bytes2.length + bytes3.length];
+        System.arraycopy(bytes1, 0, bulkBody, 0, bytes1.length);
+        System.arraycopy(bytes2, 0, bulkBody, bytes1.length, bytes2.length);
+        System.arraycopy(bytes3, 0, bulkBody, bytes1.length + bytes2.length, bytes3.length);
+
+        request.setEntity(new ByteArrayEntity(bulkBody, ContentType.APPLICATION_JSON));
+
+        ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
+        assertThat(responseException.getResponse().getStatusLine().getStatusCode(), equalTo(BAD_REQUEST.getStatus()));
+        assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
+        assertThat(responseException.getMessage(), containsString("json_parse_exception"));
+        assertThat(responseException.getMessage(), containsString("Invalid UTF-8"));
+    }
+
     public void testBulkRequestBodyImproperlyTerminated() throws IOException {
         Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
         // missing final line of the bulk body. cannot process
@@ -61,10 +89,10 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
         );
         ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
         assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
-        assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
+        assertThat(responseException.getMessage(), containsString("The bulk request must be terminated by a newline"));
     }
 
-    public void testIncrementalBulk() throws IOException {
+    public void testBulkRequest() throws IOException {
         Request createRequest = new Request("PUT", "/index_name");
         createRequest.setJsonEntity("""
             {
@@ -81,7 +109,6 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
 
         Request firstBulkRequest = new Request("POST", "/index_name/_bulk");
 
-        // index documents for the rollup job
         String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
             + "{\"field\":1}\n"
             + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
@@ -113,7 +140,6 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
 
         Request firstBulkRequest = new Request("POST", "/index_name/_bulk");
 
-        // index documents for the rollup job
         String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
             + "{\"field\":1}\n"
             + "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
@@ -137,7 +163,7 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
         }
     }
 
-    public void testIncrementalMalformed() throws IOException {
+    public void testMalformedActionLineBulk() throws IOException {
         Request createRequest = new Request("PUT", "/index_name");
         createRequest.setJsonEntity("""
             {
@@ -154,7 +180,6 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
 
         Request bulkRequest = new Request("POST", "/index_name/_bulk");
 
-        // index documents for the rollup job
         final StringBuilder bulk = new StringBuilder();
         bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
         bulk.append("{\"field\":1}\n");
@@ -170,7 +195,6 @@ public class IncrementalBulkRestIT extends HttpSmokeTestCase {
     private static void sendLargeBulk() throws IOException {
         Request bulkRequest = new Request("POST", "/index_name/_bulk");
 
-        // index documents for the rollup job
         final StringBuilder bulk = new StringBuilder();
         bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
         int updates = 0;

+ 27 - 16
server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

@@ -110,19 +110,23 @@ public class RestBulkAction extends BaseRestHandler {
             boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
             bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
             bulkRequest.setRefreshPolicy(request.param("refresh"));
-            bulkRequest.add(
-                request.requiredContent(),
-                defaultIndex,
-                defaultRouting,
-                defaultFetchSourceContext,
-                defaultPipeline,
-                defaultRequireAlias,
-                defaultRequireDataStream,
-                defaultListExecutedPipelines,
-                allowExplicitIndex,
-                request.getXContentType(),
-                request.getRestApiVersion()
-            );
+            try {
+                bulkRequest.add(
+                    request.requiredContent(),
+                    defaultIndex,
+                    defaultRouting,
+                    defaultFetchSourceContext,
+                    defaultPipeline,
+                    defaultRequireAlias,
+                    defaultRequireDataStream,
+                    defaultListExecutedPipelines,
+                    allowExplicitIndex,
+                    request.getXContentType(),
+                    request.getRestApiVersion()
+                );
+            } catch (Exception e) {
+                return channel -> new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
+            }
 
             return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
         } else {
@@ -137,6 +141,15 @@ public class RestBulkAction extends BaseRestHandler {
         }
     }
 
+    private static Exception parseFailureException(Exception e) {
+        if (e instanceof IllegalArgumentException) {
+            return e;
+        } else {
+            // TODO: Maybe improve in follow-up to be XContentParseException and include line number and column
+            return new ElasticsearchParseException("could not parse bulk request body", e);
+        }
+    }
+
     static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
 
         private final boolean allowExplicitIndex;
@@ -229,9 +242,7 @@ public class RestBulkAction extends BaseRestHandler {
 
                 } catch (Exception e) {
                     shortCircuit();
-                    new RestToXContentListener<>(channel).onFailure(
-                        new ElasticsearchParseException("could not parse bulk request body", e)
-                    );
+                    new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
                     return;
                 }
             }