Browse Source

ESQL: Enable async get to support formatting (#111104) (#118257)

I've updated the listener for GET /_query/async/{id} to EsqlResponseListener, so it now accepts parameters (delimiter, drop_null_columns and format) like the POST /_query API. Additionally, I have added tests to verify the correctness of the code.

You can now set the format in the request parameters to specify the return style.

Closes #110926

Co-authored-by: kanoshiou <73424326+kanoshiou@users.noreply.github.com>
Bogdan Pintea 10 tháng trước cách đây
mục cha
commit
510ca5d1eb

+ 6 - 0
docs/changelog/111104.yaml

@@ -0,0 +1,6 @@
+pr: 111104
+summary: "ESQL: Enable async get to support formatting"
+area: ES|QL
+type: feature
+issues:
+ - 110926

+ 4 - 0
docs/reference/esql/esql-async-query-get-api.asciidoc

@@ -39,6 +39,10 @@ parameter is `true`.
 [[esql-async-query-get-api-query-params]]
 ==== {api-query-parms-title}
 
+The API accepts the same parameters as the synchronous
+<<esql-query-api-query-params,query API>>, along with the following
+parameters:
+
 `wait_for_completion_timeout`::
 (Optional, <<time-units,time value>>)
 Timeout duration to wait for the request to finish. Defaults to no timeout,

+ 1 - 1
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java

@@ -208,7 +208,7 @@ public class AsyncTaskManagementService<
         ActionListener<Response> listener
     ) {
         AtomicReference<ActionListener<Response>> exclusiveListener = new AtomicReference<>(listener);
-        // This is will performed in case of timeout
+        // This will be performed in case of timeout
         Scheduler.ScheduledCancellable timeoutHandler = threadPool.schedule(() -> {
             ActionListener<Response> acquiredListener = exclusiveListener.getAndSet(null);
             if (acquiredListener != null) {

+ 160 - 43
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

@@ -350,21 +350,21 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         int count = randomIntBetween(0, 100);
         bulkLoadTestData(count);
         var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
-        assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null));
+        assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null, mode));
     }
 
     public void testCSVMode() throws IOException {
         int count = randomIntBetween(0, 100);
         bulkLoadTestData(count);
         var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
-        assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|'));
+        assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|', mode));
     }
 
     public void testTSVMode() throws IOException {
         int count = randomIntBetween(0, 100);
         bulkLoadTestData(count);
         var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
-        assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null));
+        assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null, mode));
     }
 
     public void testCSVNoHeaderMode() throws IOException {
@@ -1004,53 +1004,35 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
     }
 
     public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject) throws IOException {
-        return runEsqlAsync(requestObject, new AssertWarnings.NoWarnings());
+        return runEsqlAsync(requestObject, randomBoolean(), new AssertWarnings.NoWarnings());
     }
 
     static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) throws IOException {
         if (mode == ASYNC) {
-            return runEsqlAsync(requestObject, assertWarnings);
+            return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
         } else {
             return runEsqlSync(requestObject, assertWarnings);
         }
     }
 
     public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
-        requestObject.build();
-        Request request = prepareRequest(SYNC);
-        String mediaType = attachBody(requestObject, request);
-
-        RequestOptions.Builder options = request.getOptions().toBuilder();
-        options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
-        options.addHeader("Content-Type", mediaType);
-
-        if (randomBoolean()) {
-            options.addHeader("Accept", mediaType);
-        } else {
-            request.addParameter("format", requestObject.contentType().queryParameter());
-        }
-        request.setOptions(options);
+        Request request = prepareRequestWithOptions(requestObject, SYNC);
 
         HttpEntity entity = performRequest(request, assertWarnings);
         return entityToMap(entity, requestObject.contentType());
     }
 
     public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
-        addAsyncParameters(requestObject);
-        requestObject.build();
-        Request request = prepareRequest(ASYNC);
-        String mediaType = attachBody(requestObject, request);
-
-        RequestOptions.Builder options = request.getOptions().toBuilder();
-        options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
-        options.addHeader("Content-Type", mediaType);
+        return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
+    }
 
-        if (randomBoolean()) {
-            options.addHeader("Accept", mediaType);
-        } else {
-            request.addParameter("format", requestObject.contentType().queryParameter());
-        }
-        request.setOptions(options);
+    public static Map<String, Object> runEsqlAsync(
+        RequestObjectBuilder requestObject,
+        boolean keepOnCompletion,
+        AssertWarnings assertWarnings
+    ) throws IOException {
+        addAsyncParameters(requestObject, keepOnCompletion);
+        Request request = prepareRequestWithOptions(requestObject, ASYNC);
 
         if (shouldLog()) {
             LOGGER.info("REQUEST={}", request);
@@ -1062,7 +1044,7 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         Object initialColumns = null;
         Object initialValues = null;
         var json = entityToMap(entity, requestObject.contentType());
-        checkKeepOnCompletion(requestObject, json);
+        checkKeepOnCompletion(requestObject, json, keepOnCompletion);
         String id = (String) json.get("id");
 
         var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
@@ -1102,7 +1084,7 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
 
             // issue a second request to "async get" the results
             Request getRequest = prepareAsyncGetRequest(id);
-            getRequest.setOptions(options);
+            getRequest.setOptions(request.getOptions());
             response = performRequest(getRequest);
             entity = response.getEntity();
         }
@@ -1120,6 +1102,66 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         return removeAsyncProperties(result);
     }
 
+    public void testAsyncGetWithoutContentType() throws IOException {
+        int count = randomIntBetween(0, 100);
+        bulkLoadTestData(count);
+        var requestObject = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
+
+        addAsyncParameters(requestObject, true);
+        Request request = prepareRequestWithOptions(requestObject, ASYNC);
+
+        if (shouldLog()) {
+            LOGGER.info("REQUEST={}", request);
+        }
+
+        Response response = performRequest(request);
+        HttpEntity entity = response.getEntity();
+
+        var json = entityToMap(entity, requestObject.contentType());
+        checkKeepOnCompletion(requestObject, json, true);
+        String id = (String) json.get("id");
+        // results won't be returned since keepOnCompletion is true
+        assertThat(id, is(not(emptyOrNullString())));
+
+        // issue an "async get" request with no Content-Type
+        Request getRequest = prepareAsyncGetRequest(id);
+        response = performRequest(getRequest);
+        entity = response.getEntity();
+        var result = entityToMap(entity, XContentType.JSON);
+
+        ListMatcher values = matchesList();
+        for (int i = 0; i < count; i++) {
+            values = values.item(matchesList().item("keyword" + i).item(i));
+        }
+        assertMap(
+            result,
+            matchesMap().entry(
+                "columns",
+                matchesList().item(matchesMap().entry("name", "keyword").entry("type", "keyword"))
+                    .item(matchesMap().entry("name", "integer").entry("type", "integer"))
+            ).entry("values", values).entry("took", greaterThanOrEqualTo(0)).entry("id", id).entry("is_running", false)
+        );
+
+    }
+
+    static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mode mode) throws IOException {
+        requestObject.build();
+        Request request = prepareRequest(mode);
+        String mediaType = attachBody(requestObject, request);
+
+        RequestOptions.Builder options = request.getOptions().toBuilder();
+        options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
+        options.addHeader("Content-Type", mediaType);
+
+        if (randomBoolean()) {
+            options.addHeader("Accept", mediaType);
+        } else {
+            request.addParameter("format", requestObject.contentType().queryParameter());
+        }
+        request.setOptions(options);
+        return request;
+    }
+
     // Removes async properties, otherwise consuming assertions would need to handle sync and async differences
     static Map<String, Object> removeAsyncProperties(Map<String, Object> map) {
         Map<String, Object> copy = new HashMap<>(map);
@@ -1140,17 +1182,20 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         }
     }
 
-    static void addAsyncParameters(RequestObjectBuilder requestObject) throws IOException {
+    static void addAsyncParameters(RequestObjectBuilder requestObject, boolean keepOnCompletion) throws IOException {
         // deliberately short in order to frequently trigger return without results
         requestObject.waitForCompletion(TimeValue.timeValueNanos(randomIntBetween(1, 100)));
-        requestObject.keepOnCompletion(randomBoolean());
+        requestObject.keepOnCompletion(keepOnCompletion);
         requestObject.keepAlive(TimeValue.timeValueDays(randomIntBetween(1, 10)));
     }
 
     // If keep_on_completion is set then an id must always be present, regardless of the value of any other property.
-    static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json) {
+    static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json, boolean keepOnCompletion) {
         if (requestObject.keepOnCompletion()) {
+            assertTrue(keepOnCompletion);
             assertThat((String) json.get("id"), not(emptyOrNullString()));
+        } else {
+            assertFalse(keepOnCompletion);
         }
     }
 
@@ -1168,14 +1213,19 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
-    static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter) throws IOException {
-        Request request = prepareRequest(SYNC);
+    static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter, Mode mode)
+        throws IOException {
+        Request request = prepareRequest(mode);
+        if (mode == ASYNC) {
+            addAsyncParameters(builder, randomBoolean());
+        }
         String mediaType = attachBody(builder.build(), request);
 
         RequestOptions.Builder options = request.getOptions().toBuilder();
         options.addHeader("Content-Type", mediaType);
 
-        if (randomBoolean()) {
+        boolean addParam = randomBoolean();
+        if (addParam) {
             request.addParameter("format", format);
         } else {
             switch (format) {
@@ -1189,8 +1239,75 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         }
         request.setOptions(options);
 
-        HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
-        return Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
+        if (shouldLog()) {
+            LOGGER.info("REQUEST={}", request);
+        }
+
+        Response response = performRequest(request);
+        HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());
+
+        // get the content, it could be empty because the request might have not completed
+        String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
+        String id = response.getHeader("X-Elasticsearch-Async-Id");
+
+        if (mode == SYNC) {
+            assertThat(id, is(emptyOrNullString()));
+            return initialValue;
+        }
+
+        if (id == null) {
+            // no id returned from an async call, must have completed immediately and without keep_on_completion
+            assertThat(builder.keepOnCompletion(), either(nullValue()).or(is(false)));
+            assertNull(response.getHeader("is_running"));
+            // the content cant be empty
+            assertThat(initialValue, not(emptyOrNullString()));
+            return initialValue;
+        } else {
+            // async may not return results immediately, so may need an async get
+            assertThat(id, is(not(emptyOrNullString())));
+            String isRunning = response.getHeader("X-Elasticsearch-Async-Is-Running");
+            if ("?0".equals(isRunning)) {
+                // must have completed immediately so keep_on_completion must be true
+                assertThat(builder.keepOnCompletion(), is(true));
+            } else {
+                // did not return results immediately, so we will need an async get
+                // Also, different format modes return different results.
+                switch (format) {
+                    case "txt" -> assertThat(initialValue, emptyOrNullString());
+                    case "csv" -> {
+                        assertEquals(initialValue, "\r\n");
+                        initialValue = "";
+                    }
+                    case "tsv" -> {
+                        assertEquals(initialValue, "\n");
+                        initialValue = "";
+                    }
+                }
+            }
+            // issue a second request to "async get" the results
+            Request getRequest = prepareAsyncGetRequest(id);
+            if (delimiter != null) {
+                getRequest.addParameter("delimiter", String.valueOf(delimiter));
+            }
+            // If the `format` parameter is not added, the GET request will return a response
+            // with the `Content-Type` type due to the lack of an `Accept` header.
+            if (addParam) {
+                getRequest.addParameter("format", format);
+            }
+            // if `addParam` is false, `options` will already have an `Accept` header
+            getRequest.setOptions(options);
+            response = performRequest(getRequest);
+            entity = assertWarnings(response, new AssertWarnings.NoWarnings());
+        }
+        String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
+
+        // assert initial contents, if any, are the same as async get contents
+        if (initialValue != null && initialValue.isEmpty() == false) {
+            assertEquals(initialValue, newValue);
+        }
+
+        assertDeletable(id);
+        return newValue;
     }
 
     private static Request prepareRequest(Mode mode) {

+ 42 - 24
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java

@@ -22,6 +22,7 @@ import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
 import org.elasticsearch.xcontent.MediaType;
+import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.esql.arrow.ArrowFormat;
 import org.elasticsearch.xpack.esql.arrow.ArrowResponse;
 import org.elasticsearch.xpack.esql.formatter.TextFormat;
@@ -87,7 +88,7 @@ public final class EsqlResponseListener extends RestRefCountedChunkedToXContentL
     /**
      * Keep the initial query for logging purposes.
      */
-    private final String esqlQuery;
+    private final String esqlQueryOrId;
     /**
      * Stop the time it took to build a response to later log it. Use something thread-safe here because stopping time requires state and
      * {@link EsqlResponseListener} might be used from different threads.
@@ -98,29 +99,23 @@ public final class EsqlResponseListener extends RestRefCountedChunkedToXContentL
      * To correctly time the execution of a request, a {@link EsqlResponseListener} must be constructed immediately before execution begins.
      */
     public EsqlResponseListener(RestChannel channel, RestRequest restRequest, EsqlQueryRequest esqlRequest) {
-        super(channel);
+        this(channel, restRequest, esqlRequest.query(), EsqlMediaTypeParser.getResponseMediaType(restRequest, esqlRequest));
+    }
 
+    /**
+     * Async query GET API does not have an EsqlQueryRequest.
+     */
+    public EsqlResponseListener(RestChannel channel, RestRequest getRequest) {
+        this(channel, getRequest, getRequest.param("id"), EsqlMediaTypeParser.getResponseMediaType(getRequest, XContentType.JSON));
+    }
+
+    private EsqlResponseListener(RestChannel channel, RestRequest restRequest, String esqlQueryOrId, MediaType mediaType) {
+        super(channel);
         this.channel = channel;
         this.restRequest = restRequest;
-        this.esqlQuery = esqlRequest.query();
-        mediaType = EsqlMediaTypeParser.getResponseMediaType(restRequest, esqlRequest);
-
-        /*
-         * Special handling for the "delimiter" parameter which should only be
-         * checked for being present or not in the case of CSV format. We cannot
-         * override {@link BaseRestHandler#responseParams()} because this
-         * parameter should only be checked for CSV, not other formats.
-         */
-        if (mediaType != CSV && restRequest.hasParam(URL_PARAM_DELIMITER)) {
-            String message = String.format(
-                Locale.ROOT,
-                "parameter: [%s] can only be used with the format [%s] for request [%s]",
-                URL_PARAM_DELIMITER,
-                CSV.queryParameter(),
-                restRequest.path()
-            );
-            throw new IllegalArgumentException(message);
-        }
+        this.esqlQueryOrId = esqlQueryOrId;
+        this.mediaType = mediaType;
+        checkDelimiter();
     }
 
     @Override
@@ -197,14 +192,18 @@ public final class EsqlResponseListener extends RestRefCountedChunkedToXContentL
             listener.onResponse(r);
             // At this point, the StopWatch should already have been stopped, so we log a consistent time.
             LOGGER.debug(
-                "Finished execution of ESQL query.\nQuery string: [{}]\nExecution time: [{}]ms",
-                esqlQuery,
+                "Finished execution of ESQL query.\nQuery string or async ID: [{}]\nExecution time: [{}]ms",
+                esqlQueryOrId,
                 getTook(r, TimeUnit.MILLISECONDS)
             );
         }, ex -> {
             // In case of failure, stop the time manually before sending out the response.
             long timeMillis = getTook(null, TimeUnit.MILLISECONDS);
-            LOGGER.debug("Failed execution of ESQL query.\nQuery string: [{}]\nExecution time: [{}]ms", esqlQuery, timeMillis);
+            LOGGER.debug(
+                "Failed execution of ESQL query.\nQuery string or async ID: [{}]\nExecution time: [{}]ms",
+                esqlQueryOrId,
+                timeMillis
+            );
             listener.onFailure(ex);
         });
     }
@@ -213,4 +212,23 @@ public final class EsqlResponseListener extends RestRefCountedChunkedToXContentL
         RestStatus status = ExceptionsHelper.status(throwable);
         LOGGER.log(status.getStatus() >= 500 ? Level.WARN : Level.DEBUG, () -> "Request failed with status [" + status + "]: ", throwable);
     }
+
+    /*
+     * Special handling for the "delimiter" parameter which should only be
+     * checked for being present or not in the case of CSV format. We cannot
+     * override {@link BaseRestHandler#responseParams()} because this
+     * parameter should only be checked for CSV, not other formats.
+     */
+    private void checkDelimiter() {
+        if (mediaType != CSV && restRequest.hasParam(URL_PARAM_DELIMITER)) {
+            String message = String.format(
+                Locale.ROOT,
+                "parameter: [%s] can only be used with the format [%s] for request [%s]",
+                URL_PARAM_DELIMITER,
+                CSV.queryParameter(),
+                restRequest.path()
+            );
+            throw new IllegalArgumentException(message);
+        }
+    }
 }

+ 1 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlGetAsyncResultAction.java

@@ -12,7 +12,6 @@ import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.Scope;
 import org.elasticsearch.rest.ServerlessScope;
-import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
 import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
 
 import java.util.List;
@@ -43,7 +42,7 @@ public class RestEsqlGetAsyncResultAction extends BaseRestHandler {
         if (request.hasParam("keep_alive")) {
             get.setKeepAlive(request.paramAsTime("keep_alive", get.getKeepAlive()));
         }
-        return channel -> client.execute(EsqlAsyncGetResultAction.INSTANCE, get, new RestRefCountedChunkedToXContentListener<>(channel));
+        return channel -> client.execute(EsqlAsyncGetResultAction.INSTANCE, get, new EsqlResponseListener(channel, request));
     }
 
     @Override

+ 10 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParser.java

@@ -42,16 +42,23 @@ public class EsqlMediaTypeParser {
      * combinations are detected.
      */
     public static MediaType getResponseMediaType(RestRequest request, EsqlQueryRequest esqlRequest) {
-        var mediaType = request.hasParam(URL_PARAM_FORMAT) ? mediaTypeFromParams(request) : mediaTypeFromHeaders(request);
+        var mediaType = getResponseMediaType(request, (MediaType) null);
         validateColumnarRequest(esqlRequest.columnar(), mediaType);
         validateIncludeCCSMetadata(esqlRequest.includeCCSMetadata(), mediaType);
         return checkNonNullMediaType(mediaType, request);
     }
 
+    /*
+     * Retrieve the mediaType of a REST request. If no mediaType can be established from the request, return the provided default.
+     */
+    public static MediaType getResponseMediaType(RestRequest request, MediaType defaultMediaType) {
+        var mediaType = request.hasParam(URL_PARAM_FORMAT) ? mediaTypeFromParams(request) : mediaTypeFromHeaders(request);
+        return mediaType == null ? defaultMediaType : mediaType;
+    }
+
     private static MediaType mediaTypeFromHeaders(RestRequest request) {
         ParsedMediaType acceptType = request.getParsedAccept();
-        MediaType mediaType = acceptType != null ? acceptType.toMediaType(MEDIA_TYPE_REGISTRY) : request.getXContentType();
-        return checkNonNullMediaType(mediaType, request);
+        return acceptType != null ? acceptType.toMediaType(MEDIA_TYPE_REGISTRY) : request.getXContentType();
     }
 
     private static MediaType mediaTypeFromParams(RestRequest request) {

+ 12 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParserTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
 import java.util.Collections;
 import java.util.Map;
 
+import static org.elasticsearch.xcontent.XContentType.JSON;
 import static org.elasticsearch.xpack.esql.formatter.TextFormat.CSV;
 import static org.elasticsearch.xpack.esql.formatter.TextFormat.PLAIN_TEXT;
 import static org.elasticsearch.xpack.esql.formatter.TextFormat.TSV;
@@ -123,11 +124,17 @@ public class EsqlMediaTypeParserTests extends ESTestCase {
     public void testNoFormat() {
         IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> getResponseMediaType(new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).build(), createTestInstance(false))
+            () -> getResponseMediaType(emptyRequest(), createTestInstance(false))
         );
         assertEquals(e.getMessage(), "Invalid request content type: Accept=[null], Content-Type=[null], format=[null]");
     }
 
+    public void testNoContentType() {
+        RestRequest fakeRestRequest = emptyRequest();
+        assertThat(getResponseMediaType(fakeRestRequest, CSV), is(CSV));
+        assertThat(getResponseMediaType(fakeRestRequest, JSON), is(JSON));
+    }
+
     private static RestRequest reqWithAccept(String acceptHeader) {
         return new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withHeaders(
             Map.of("Content-Type", Collections.singletonList("application/json"), "Accept", Collections.singletonList(acceptHeader))
@@ -140,6 +147,10 @@ public class EsqlMediaTypeParserTests extends ESTestCase {
         ).withParams(params).build();
     }
 
+    private static RestRequest emptyRequest() {
+        return new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).build();
+    }
+
     protected EsqlQueryRequest createTestInstance(boolean columnar) {
         var request = new EsqlQueryRequest();
         request.columnar(columnar);