Forráskód Böngészése

[9.1] ESQL: Fix async query inconsistent headers (#135273)

* ESQL: Fix async query inconsistent headers (#135078)

Fixes https://github.com/elastic/elasticsearch/issues/135042

This PR:
- Fixes the get-result not always returning the expected headers
- Fixes the non-async query incorrectly returning the "is running" async header

* Revert rest tests file

* Added main changes on test class

* Backported test for the headers fix

* Remove unrelated changes
Iván Cea Fontenla 1 hete
szülő
commit
5e689d8928

+ 6 - 0
docs/changelog/135078.yaml

@@ -0,0 +1,6 @@
+pr: 135078
+summary: Fix async get results with inconsistent headers
+area: ES|QL
+type: bug
+issues:
+ - 135042

+ 76 - 24
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

@@ -16,6 +16,7 @@ import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.WarningsHandler;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.io.Streams;
@@ -41,6 +42,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -51,6 +53,8 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.IntFunction;
 
 import static java.util.Collections.emptySet;
@@ -60,11 +64,11 @@ import static org.elasticsearch.test.ListMatcher.matchesList;
 import static org.elasticsearch.test.MapMatcher.assertMap;
 import static org.elasticsearch.test.MapMatcher.matchesMap;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
-import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertNotPartial;
 import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC;
 import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
 import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString;
 import static org.hamcrest.Matchers.any;
+import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.emptyOrNullString;
@@ -396,7 +400,9 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         options.addHeader("Content-Type", mediaType);
         options.addHeader("Accept", "text/csv; header=absent");
         request.setOptions(options);
-        HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
+        Response response = performRequest(request);
+        assertWarnings(response, new AssertWarnings.NoWarnings());
+        HttpEntity entity = response.getEntity();
         String actual = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
         assertEquals("keyword0,0\r\n", actual);
     }
@@ -1258,7 +1264,10 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         var results = mode == ASYNC
             ? runEsqlAsync(requestObject, randomBoolean(), assertWarnings)
             : runEsqlSync(requestObject, assertWarnings);
-        return checkPartialResults ? assertNotPartial(results) : results;
+        if (checkPartialResults) {
+            assertNotPartial(results);
+        }
+        return results;
     }
 
     public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode)
@@ -1269,8 +1278,17 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
     public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
         Request request = prepareRequestWithOptions(requestObject, SYNC);
 
-        HttpEntity entity = performRequest(request, assertWarnings);
-        return entityToMap(entity, requestObject.contentType());
+        Response response = performRequest(request);
+        HttpEntity entity = response.getEntity();
+        Map<String, Object> json = entityToMap(entity, requestObject.contentType());
+
+        var supportsAsyncHeadersFix = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
+        if (supportsAsyncHeadersFix) {
+            assertNoAsyncHeaders(response);
+        }
+        assertWarnings(response, assertWarnings);
+
+        return json;
     }
 
     public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
@@ -1298,17 +1316,18 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         checkKeepOnCompletion(requestObject, json, keepOnCompletion);
         String id = (String) json.get("id");
 
-        var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
-        var supportsSuggestedCast = clusterHasCapability("POST", "/_query", List.of(), List.of("suggested_cast")).orElse(false);
+        var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
+        var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast"));
+
+        // Check headers on initial query call
+        if (supportsAsyncHeaders) {
+            assertAsyncHeaders(response, id, (boolean) json.get("is_running"));
+        }
 
         if (id == null) {
             // no id returned from an async call, must have completed immediately and without keep_on_completion
             assertThat(requestObject.keepOnCompletion(), either(nullValue()).or(is(false)));
             assertThat((boolean) json.get("is_running"), is(false));
-            if (supportsAsyncHeaders) {
-                assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
-                assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is("?0"));
-            }
             assertWarnings(response, assertWarnings);
             json.remove("is_running"); // remove this to not mess up later map assertions
             return Collections.unmodifiableMap(json);
@@ -1329,11 +1348,6 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
                 assertThat(json.get("pages"), nullValue());
             }
 
-            if (supportsAsyncHeaders) {
-                assertThat(response.getHeader("X-Elasticsearch-Async-Id"), is(id));
-                assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is(isRunning ? "?1" : "?0"));
-            }
-
             // issue a second request to "async get" the results
             Request getRequest = prepareAsyncGetRequest(id);
             getRequest.setOptions(request.getOptions());
@@ -1343,6 +1357,11 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
 
         var result = entityToMap(entity, requestObject.contentType());
 
+        // Check headers on get call
+        if (supportsAsyncHeaders) {
+            assertAsyncHeaders(response, id, (boolean) result.get("is_running"));
+        }
+
         // assert initial contents, if any, are the same as async get contents
         if (initialColumns != null) {
             if (supportsSuggestedCast == false) {
@@ -1361,6 +1380,26 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         return removeAsyncProperties(result);
     }
 
+    record CapabilitesCacheKey(RestClient client, List<String> capabilities) {}
+
+    /**
+     * Cache of capabilities.
+     */
+    private static final ConcurrentMap<CapabilitesCacheKey, Boolean> capabilities = new ConcurrentHashMap<>();
+
+    public static boolean hasCapabilities(RestClient client, List<String> requiredCapabilities) {
+        if (requiredCapabilities.isEmpty()) {
+            return true;
+        }
+        return capabilities.computeIfAbsent(new CapabilitesCacheKey(client, requiredCapabilities), r -> {
+            try {
+                return clusterHasCapability(client, "POST", "/_query", List.of(), requiredCapabilities).orElse(false);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        });
+    }
+
     private static Object removeOriginalTypesAndSuggestedCast(Object response) {
         if (response instanceof ArrayList<?> columns) {
             var newColumns = new ArrayList<>();
@@ -1589,7 +1628,8 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         }
 
         Response response = performRequest(request);
-        HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());
+        assertWarnings(response, new AssertWarnings.NoWarnings());
+        HttpEntity entity = response.getEntity();
 
         // get the content, it could be empty because the request might have not completed
         String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
@@ -1642,7 +1682,8 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
             // if `addParam` is false, `options` will already have an `Accept` header
             getRequest.setOptions(options);
             response = performRequest(getRequest);
-            entity = assertWarnings(response, new AssertWarnings.NoWarnings());
+            assertWarnings(response, new AssertWarnings.NoWarnings());
+            entity = response.getEntity();
         }
         String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
 
@@ -1681,10 +1722,6 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         return mediaType;
     }
 
-    private static HttpEntity performRequest(Request request, AssertWarnings assertWarnings) throws IOException {
-        return assertWarnings(performRequest(request), assertWarnings);
-    }
-
     protected static Response performRequest(Request request) throws IOException {
         Response response = client().performRequest(request);
         if (shouldLog()) {
@@ -1695,14 +1732,19 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         return response;
     }
 
-    private static HttpEntity assertWarnings(Response response, AssertWarnings assertWarnings) {
+    static void assertNotPartial(Map<String, Object> answer) {
+        var clusters = answer.get("_clusters");
+        var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "");
+        assertThat(reason, answer.get("is_partial"), anyOf(nullValue(), is(false)));
+    }
+
+    private static void assertWarnings(Response response, AssertWarnings assertWarnings) {
         List<String> warnings = new ArrayList<>(response.getWarnings());
         warnings.removeAll(mutedWarnings());
         if (shouldLog()) {
             LOGGER.info("RESPONSE warnings (after muted)={}", warnings);
         }
         assertWarnings.assertWarnings(warnings);
-        return response.getEntity();
     }
 
     private static Set<String> mutedWarnings() {
@@ -1813,6 +1855,16 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
         assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
     }
 
+    private static void assertAsyncHeaders(Response response, @Nullable String asyncId, boolean isRunning) {
+        assertThat(response.getHeader("X-Elasticsearch-Async-Id"), asyncId == null ? nullValue() : equalTo(asyncId));
+        assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), isRunning ? is("?1") : is("?0"));
+    }
+
+    private static void assertNoAsyncHeaders(Response response) {
+        assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
+        assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), nullValue());
+    }
+
     public static RequestObjectBuilder requestObjectBuilder() throws IOException {
         return new RequestObjectBuilder();
     }

+ 5 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -628,6 +628,11 @@ public class EsqlCapabilities {
          */
         ASYNC_QUERY_STATUS_HEADERS,
 
+        /**
+         * Fix async headers not being sent on "get" requests
+         */
+        ASYNC_QUERY_STATUS_HEADERS_FIX,
+
         /**
          * Consider the upper bound when computing the interval in BUCKET auto mode.
          */

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

@@ -222,7 +222,7 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
     }
 
     public boolean isAsync() {
-        return isRunning;
+        return isAsync;
     }
 
     public boolean isPartial() {

+ 18 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java

@@ -23,6 +23,7 @@ import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.async.AsyncExecutionId;
 import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
@@ -35,6 +36,7 @@ import org.elasticsearch.xpack.esql.parser.ParsingException;
 public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction<EsqlQueryResponse, EsqlQueryTask> {
 
     private final BlockFactory blockFactory;
+    private final ThreadPool threadPool;
 
     @Inject
     public TransportEsqlAsyncGetResultsAction(
@@ -43,9 +45,9 @@ public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsync
         ClusterService clusterService,
         NamedWriteableRegistry registry,
         Client client,
-        ThreadPool threadPool,
         BigArrays bigArrays,
-        BlockFactoryProvider blockFactoryProvider
+        BlockFactoryProvider blockFactoryProvider,
+        ThreadPool threadPool
     ) {
         super(
             EsqlAsyncGetResultAction.NAME,
@@ -59,11 +61,12 @@ public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsync
             EsqlQueryTask.class
         );
         this.blockFactory = blockFactoryProvider.blockFactory();
+        this.threadPool = threadPool;
     }
 
     @Override
     protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener<EsqlQueryResponse> listener) {
-        super.doExecute(task, request, unwrapListener(listener));
+        super.doExecute(task, request, unwrapListener(request.getId(), listener));
     }
 
     @Override
@@ -75,14 +78,21 @@ public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsync
     static final String VERIFY_EX_NAME = ElasticsearchException.getExceptionName(new VerificationException(""));
 
     /**
-     * Unwraps the exception in the case of failure. This keeps the exception types
-     * the same as the sync API, namely ParsingException and VerificationException.
+     * Adds async headers, and unwraps the exception in the case of failure.
+     * <p>
+     *     This keeps the exception types the same as the sync API, namely ParsingException and VerificationException.
+     * </p>
      */
-    static <R> ActionListener<R> unwrapListener(ActionListener<R> listener) {
+    ActionListener<EsqlQueryResponse> unwrapListener(String asyncExecutionId, ActionListener<EsqlQueryResponse> listener) {
         return new ActionListener<>() {
             @Override
-            public void onResponse(R o) {
-                listener.onResponse(o);
+            public void onResponse(EsqlQueryResponse response) {
+                boolean isRunning = response.isRunning();
+                threadPool.getThreadContext()
+                    .addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0");
+                threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
+
+                listener.onResponse(response);
             }
 
             @Override

+ 14 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -252,7 +252,20 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             ActionListener.wrap(result -> {
                 recordCCSTelemetry(task, executionInfo, request, null);
                 planExecutor.metrics().recordTook(executionInfo.overallTook().millis());
-                listener.onResponse(toResponse(task, request, configuration, result));
+                var response = toResponse(task, request, configuration, result);
+                assert response.isAsync() == request.async() : "The response must be async if the request was async";
+
+                if (response.isAsync()) {
+                    if (response.asyncExecutionId().isPresent()) {
+                        String asyncExecutionId = response.asyncExecutionId().get();
+                        threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
+                    }
+                    boolean isRunning = response.isRunning();
+                    threadPool.getThreadContext()
+                        .addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0");
+                }
+
+                listener.onResponse(response);
             }, ex -> {
                 recordCCSTelemetry(task, executionInfo, request, ex);
                 listener.onFailure(ex);
@@ -338,10 +351,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
         EsqlQueryResponse.Profile profile = configuration.profile()
             ? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles())
             : null;
-        threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0");
         if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
             String asyncExecutionId = asyncTask.getExecutionId().getEncoded();
-            threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
             return new EsqlQueryResponse(
                 columns,
                 result.pages(),