Pārlūkot izejas kodu

Support ES|QL requests through the NodeClient::execute (#106244)

This commit adds support for executing ES|QL requests through the NodeClient::execute.

A subset of ES|QL's transport request and response APIs has been added x-pack/core. This offers basic functionality to run ES|QL queries without depending upon the ES|QL plugin directly. The API is deliberately small so as to not expose any unnecessary parts of the ES|QL implementation. It can be expanded later if and when needed, e.g. adding an explicitly Page, and Block types.
Chris Hegarty 1 gadu atpakaļ
vecāks
revīzija
aa6222a408
29 mainītis faili ar 741 papildinājumiem un 34 dzēšanām
  1. 5 0
      docs/changelog/106244.yaml
  2. 2 0
      x-pack/plugin/core/src/main/java/module-info.java
  3. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/ColumnInfo.java
  4. 27 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/EsqlQueryRequest.java
  5. 35 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/EsqlQueryRequestBuilder.java
  6. 39 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/EsqlQueryResponse.java
  7. 47 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/EsqlResponse.java
  8. 41 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/internal/SharedSecrets.java
  9. 22 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/esql/action/EsqlQueryRequestBuilderTests.java
  10. 16 0
      x-pack/plugin/esql/qa/action/build.gradle
  11. 178 0
      x-pack/plugin/esql/qa/action/src/internalClusterTest/java/org/elasticsearch/test/esql/qa/action/CoreEsqlActionIT.java
  12. 1 0
      x-pack/plugin/esql/qa/testFixtures/build.gradle
  13. 1 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java
  14. 1 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java
  15. 1 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java
  16. 1 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncActionIT.java
  17. 3 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java
  18. 10 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestBuilder.java
  19. 35 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java
  20. 89 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseImpl.java
  21. 1 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java
  22. 57 23
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java
  23. 1 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java
  24. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/formatter/TextFormat.java
  25. 12 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java
  26. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
  27. 111 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java
  28. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java
  29. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java

+ 5 - 0
docs/changelog/106244.yaml

@@ -0,0 +1,5 @@
+pr: 106244
+summary: Support ES|QL requests through the `NodeClient::execute`
+area: ES|QL
+type: feature
+issues: []

+ 2 - 0
x-pack/plugin/core/src/main/java/module-info.java

@@ -65,6 +65,8 @@ module org.elasticsearch.xcore {
     exports org.elasticsearch.xpack.core.enrich;
     exports org.elasticsearch.xpack.core.eql;
     exports org.elasticsearch.xpack.core.esql;
+    exports org.elasticsearch.xpack.core.esql.action;
+    exports org.elasticsearch.xpack.core.esql.action.internal; // TODO: qualify to esql when modularized
     exports org.elasticsearch.xpack.core.frozen.action;
     exports org.elasticsearch.xpack.core.frozen;
     exports org.elasticsearch.xpack.core.graph.action;

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfo.java → x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/ColumnInfo.java

@@ -5,7 +5,7 @@
  * 2.0.
  */
 
-package org.elasticsearch.xpack.esql.action;
+package org.elasticsearch.xpack.core.esql.action;
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;

+ 27 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/EsqlQueryRequest.java

@@ -0,0 +1,27 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.esql.action;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.index.query.QueryBuilder;
+
+import java.io.IOException;
+
+public abstract class EsqlQueryRequest extends ActionRequest {
+
+    protected EsqlQueryRequest() {}
+
+    protected EsqlQueryRequest(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    public abstract String query();
+
+    public abstract QueryBuilder filter();
+}

+ 35 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/EsqlQueryRequestBuilder.java

@@ -0,0 +1,35 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.esql.action;
+
+import org.elasticsearch.action.ActionRequestBuilder;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.client.internal.ElasticsearchClient;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.xpack.core.esql.action.internal.SharedSecrets;
+
+public abstract class EsqlQueryRequestBuilder<Request extends EsqlQueryRequest, Response extends EsqlQueryResponse> extends
+    ActionRequestBuilder<Request, Response> {
+
+    /** Creates a new ES|QL query request builder. */
+    public static EsqlQueryRequestBuilder<? extends EsqlQueryRequest, ? extends EsqlQueryResponse> newRequestBuilder(
+        ElasticsearchClient client
+    ) {
+        return SharedSecrets.getEsqlQueryRequestBuilderAccess().newEsqlQueryRequestBuilder(client);
+    }
+
+    // not for direct use
+    protected EsqlQueryRequestBuilder(ElasticsearchClient client, ActionType<Response> action, Request request) {
+        super(client, action, request);
+    }
+
+    public abstract EsqlQueryRequestBuilder<Request, Response> query(String query);
+
+    public abstract EsqlQueryRequestBuilder<Request, Response> filter(QueryBuilder filter);
+
+}

+ 39 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/EsqlQueryResponse.java

@@ -0,0 +1,39 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.esql.action;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.core.Releasable;
+
+/**
+ * Response to an ES|QL query request.
+ *
+ * This query response must be closed when the consumer of its response
+ * object is finished. Closing the query response closes and invalidates
+ * the response object. Calling {@link #response()} on a closed query
+ * response results in an IllegalStateException.
+ */
+public abstract class EsqlQueryResponse extends ActionResponse implements Releasable {
+
+    private boolean closed;
+
+    /** Returns the response object. */
+    public EsqlResponse response() {
+        if (closed) {
+            throw new IllegalStateException("closed");
+        }
+        return responseInternal();
+    }
+
+    protected abstract EsqlResponse responseInternal();
+
+    @Override
+    public void close() {
+        closed = true;
+    }
+}

+ 47 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/EsqlResponse.java

@@ -0,0 +1,47 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.esql.action;
+
+import org.elasticsearch.core.Releasable;
+
+import java.util.List;
+
+/**
+ * An ES|QL Response object.
+ *
+ * <p> Iterator based access to values of type T has the following properties:
+ * <ol>
+ *   <li>single-value is of type {@code T}</li>
+ *   <li>multi-value is of type {@code List<T>}</li>
+ *   <li>absent value is {@code null}</li>
+ * </ol>
+ *
+ * <p> This response object should be closed when the consumer of its values
+ * is finished. Closing the response object invalidates any iterators of its
+ * values. An invalidated iterator, if not already exhausted, will eventually
+ * throw an IllegalStateException. Once a response object is closed, calling
+ * {@link #rows()}, {@link #column(int)}, or operating on an Iterable return
+ * from the aforementioned value accessor methods, results in an
+ * IllegalStateException.
+ */
+public interface EsqlResponse extends Releasable {
+
+    /** Returns the column info. */
+    List<? extends ColumnInfo> columns();
+
+    /**
+     * Returns an iterable that allows to iterator over the values in all rows
+     * of the response, this is the rows-iterator. A further iterator can be
+     * retrieved from the rows-iterator, which iterates over the actual values
+     * in the row, one row at a time, column-wise from left to right.
+     */
+    Iterable<Iterable<Object>> rows();
+
+    /** Returns an iterable over the values in the given column. */
+    Iterable<Object> column(int columnIndex);
+}

+ 41 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/internal/SharedSecrets.java

@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.esql.action.internal;
+
+import org.elasticsearch.client.internal.ElasticsearchClient;
+import org.elasticsearch.xpack.core.esql.action.EsqlQueryRequest;
+import org.elasticsearch.xpack.core.esql.action.EsqlQueryRequestBuilder;
+import org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse;
+
+/**
+ * For secret access to ES|QL internals only. Do not use.
+ * TODO qualify export when ES|QL is modularized
+ */
+public class SharedSecrets {
+
+    private static EsqlQueryRequestBuilderAccess esqlQueryRequestBuilderAccess;
+
+    public static void setEsqlQueryRequestBuilderAccess(EsqlQueryRequestBuilderAccess access) {
+        esqlQueryRequestBuilderAccess = access;
+    }
+
+    public static EsqlQueryRequestBuilderAccess getEsqlQueryRequestBuilderAccess() {
+        var access = esqlQueryRequestBuilderAccess;
+        if (access == null) {
+            throw new IllegalStateException("ESQL module not present or initialized");
+        }
+        return access;
+    }
+
+    public interface EsqlQueryRequestBuilderAccess {
+
+        EsqlQueryRequestBuilder<? extends EsqlQueryRequest, ? extends EsqlQueryResponse> newEsqlQueryRequestBuilder(
+            ElasticsearchClient client
+        );
+    }
+}

+ 22 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/esql/action/EsqlQueryRequestBuilderTests.java

@@ -0,0 +1,22 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.esql.action;
+
+import org.elasticsearch.test.ESIntegTestCase;
+
+import static org.hamcrest.core.IsEqual.equalTo;
+
+public class EsqlQueryRequestBuilderTests extends ESIntegTestCase {
+
+    // This is a trivial test that asserts IAE when the ES|QL module is
+    // not present.
+    public void testIllegalStateException() {
+        var e = expectThrows(IllegalStateException.class, () -> EsqlQueryRequestBuilder.newRequestBuilder(client()));
+        assertThat(e.getMessage(), equalTo("ESQL module not present or initialized"));
+    }
+}

+ 16 - 0
x-pack/plugin/esql/qa/action/build.gradle

@@ -0,0 +1,16 @@
+apply plugin: 'elasticsearch.java'
+apply plugin: 'elasticsearch.internal-cluster-test'
+
+description = 'Tests for requests made through the Node Client request API'
+
+dependencies {
+    api project(":test:framework")
+    api project(':server')
+    compileOnly project(path: xpackModule('core'))
+
+    testImplementation(testArtifact(project(xpackModule('core'))))
+    // runtime only - since the test source should not explicitly depend
+    // upon any types from ES|QL (only xpack core)
+    testImplementation project(':x-pack:plugin:ql')
+    testImplementation project(':x-pack:plugin:esql')
+}

+ 178 - 0
x-pack/plugin/esql/qa/action/src/internalClusterTest/java/org/elasticsearch/test/esql/qa/action/CoreEsqlActionIT.java

@@ -0,0 +1,178 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.test.esql.qa.action;
+
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
+import org.elasticsearch.xpack.core.esql.action.EsqlQueryRequest;
+import org.elasticsearch.xpack.core.esql.action.EsqlQueryRequestBuilder;
+import org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.contains;
+
+// A subset of test scenarios exercised through the xpack core ES|QL
+// transport API (rather than through the ES|QL request API).
+// Tests here have no static dependencies on types from the ES|QL plugin.
+public class CoreEsqlActionIT extends ESIntegTestCase {
+
+    @Before
+    public void setupIndex() {
+        createAndPopulateIndex("test");
+    }
+
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        try {
+            @SuppressWarnings("unchecked")
+            var c = (Class<? extends Plugin>) Class.forName("org.elasticsearch.xpack.esql.plugin.EsqlPlugin");
+            return List.of(c);
+        } catch (ClassNotFoundException e) {
+            throw new AssertionError(e); // the ES|QL plugin must be present
+        }
+    }
+
+    public void testRowTypesAndValues() {
+        var query = "row a = 1, b = \"x\", c = 1000000000000, d = 1.1";
+        var request = EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query);
+        try (var queryResp = run(request)) {
+            logger.info("response=" + queryResp);
+            var resp = queryResp.response();
+            assertThat(resp.columns().stream().map(ColumnInfo::name).toList(), contains("a", "b", "c", "d"));
+            assertThat(resp.columns().stream().map(ColumnInfo::type).toList(), contains("integer", "keyword", "long", "double"));
+            assertThat(getValuesList(resp.rows()), contains(List.of(1, "x", 1000000000000L, 1.1d)));
+        }
+    }
+
+    public void testRowStatsProjectGroupByInt() {
+        var query = "row a = 1, b = 2 | stats count(b) by a | keep a";
+        var request = EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query);
+        try (var queryResp = run(request)) {
+            logger.info("response=" + queryResp);
+            var resp = queryResp.response();
+            assertThat(resp.columns().stream().map(ColumnInfo::name).toList(), contains("a"));
+            assertThat(resp.columns().stream().map(ColumnInfo::type).toList(), contains("integer"));
+            assertThat(getValuesList(resp.rows()), contains(List.of(1)));
+        }
+    }
+
+    public void testFrom() {
+        var query = "from test | keep item, cost, color, sale | sort item";
+        var request = EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query);
+        try (var queryResp = run(request)) {
+            var resp = queryResp.response();
+            logger.info("response=" + queryResp);
+            assertThat(resp.columns().stream().map(ColumnInfo::name).toList(), contains("item", "cost", "color", "sale"));
+            assertThat(resp.columns().stream().map(ColumnInfo::type).toList(), contains("long", "double", "keyword", "date"));
+            // columnar values
+            assertThat(columnValues(resp.column(0)), contains(1L, 2L, 3L, 4L));
+            assertThat(columnValues(resp.column(1)), contains(1.1d, 2.1d, 3.1d, 4.1d));
+            assertThat(columnValues(resp.column(2)), contains("red", "blue", "green", "red"));
+            var d = List.of("2004-03-02T00:00:00.000Z", "1992-06-01T00:00:00.000Z", "1965-06-01T00:00:00.000Z", "2000-03-15T00:00:00.000Z");
+            assertThat(columnValues(resp.column(3)), contains(d.toArray()));
+            // row values
+            List<List<Object>> values = getValuesList(resp.rows());
+            assertThat(values.get(0), contains(1L, 1.1d, "red", "2004-03-02T00:00:00.000Z"));
+            assertThat(values.get(1), contains(2L, 2.1d, "blue", "1992-06-01T00:00:00.000Z"));
+            assertThat(values.get(2), contains(3L, 3.1d, "green", "1965-06-01T00:00:00.000Z"));
+            assertThat(values.get(3), contains(4L, 4.1d, "red", "2000-03-15T00:00:00.000Z"));
+        }
+    }
+
+    public void testAccessAfterClose() {
+        for (var closedQueryResp : new boolean[] { true, false }) {
+            var query = "row a = 1";
+            var request = EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query);
+            var queryResp = run(request);
+            var resp = queryResp.response();
+            var rows = resp.rows();
+            var rowItr = rows.iterator();
+            var cols = resp.column(0);
+            var colItr = cols.iterator();
+
+            // must close at least one of them
+            if (closedQueryResp) queryResp.close();
+            if (randomBoolean() || closedQueryResp == false) resp.close();
+
+            assertThrows(IllegalStateException.class, resp::rows);
+            assertThrows(IllegalStateException.class, () -> resp.column(0));
+            assertThrows(IllegalStateException.class, () -> rows.iterator());
+            assertThrows(IllegalStateException.class, () -> cols.iterator());
+            assertThrows(IllegalStateException.class, () -> queryResp.response().rows());
+            assertThrows(IllegalStateException.class, () -> queryResp.response().column(0));
+            assertThrows(IllegalStateException.class, () -> rowItr.next().iterator().next());
+            assertThrows(IllegalStateException.class, () -> colItr.next());
+            if (closedQueryResp) {
+                assertThrows(IllegalStateException.class, () -> queryResp.response());
+            } else {
+                queryResp.close(); // we must close the query response if not already closed
+            }
+        }
+    }
+
+    protected EsqlQueryResponse run(EsqlQueryRequestBuilder<? extends EsqlQueryRequest, ? extends EsqlQueryResponse> request) {
+        try {
+            if (randomBoolean()) {
+                return request.execute().actionGet(30, SECONDS);
+            } else {
+                return ClientHelper.executeWithHeaders(
+                    Map.of("Foo", "bar"),
+                    "origin",
+                    client(),
+                    () -> request.execute().actionGet(30, SECONDS)
+                );
+            }
+        } catch (ElasticsearchTimeoutException e) {
+            throw new AssertionError("timeout", e);
+        }
+    }
+
+    static List<List<Object>> getValuesList(Iterable<Iterable<Object>> values) {
+        var valuesList = new ArrayList<List<Object>>();
+        values.forEach(row -> {
+            var rowValues = new ArrayList<>();
+            row.forEach(rowValues::add);
+            valuesList.add(rowValues);
+        });
+        return valuesList;
+    }
+
+    static List<Object> columnValues(Iterable<Object> values) {
+        List<Object> l = new ArrayList<>();
+        values.forEach(l::add);
+        return l;
+    }
+
+    private void createAndPopulateIndex(String indexName) {
+        var client = client().admin().indices();
+        var CreateRequest = client.prepareCreate(indexName)
+            .setSettings(Settings.builder().put("index.number_of_shards", 1))
+            .setMapping("item", "type=long", "cost", "type=double", "color", "type=keyword", "sale", "type=date");
+        assertAcked(CreateRequest);
+        client().prepareBulk()
+            .add(new IndexRequest(indexName).id("1").source("item", 1, "cost", 1.1d, "color", "red", "sale", "2004-03-02T00:00:00.000Z"))
+            .add(new IndexRequest(indexName).id("2").source("item", 2, "cost", 2.1d, "color", "blue", "sale", "1992-06-01T00:00:00.000Z"))
+            .add(new IndexRequest(indexName).id("3").source("item", 3, "cost", 3.1d, "color", "green", "sale", "1965-06-01T00:00:00.000Z"))
+            .add(new IndexRequest(indexName).id("4").source("item", 4, "cost", 4.1d, "color", "red", "sale", "2000-03-15T00:00:00.000Z"))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+        ensureYellow(indexName);
+    }
+}

+ 1 - 0
x-pack/plugin/esql/qa/testFixtures/build.gradle

@@ -4,6 +4,7 @@ apply plugin: 'elasticsearch.java'
 dependencies {
     implementation project(':x-pack:plugin:esql:compute')
     compileOnly project(':x-pack:plugin:esql')
+    compileOnly project(path: xpackModule('core'))
     implementation project(":libs:elasticsearch-x-content")
     implementation project(':client:rest')
     implementation project(':libs:elasticsearch-logging')

+ 1 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java

@@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
 import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.hamcrest.core.IsEqual;
 

+ 1 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java

@@ -40,6 +40,7 @@ import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
 import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.enrich.EnrichPlugin;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;

+ 1 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java

@@ -31,6 +31,7 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ListMatcher;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.parser.ParsingException;

+ 1 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncActionIT.java

@@ -23,6 +23,7 @@ import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
 import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.TestBlockFactory;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.parser.ParsingException;

+ 3 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java

@@ -8,7 +8,6 @@
 package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.Build;
-import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.CompositeIndicesRequest;
 import org.elasticsearch.common.Strings;
@@ -29,7 +28,7 @@ import java.util.Map;
 
 import static org.elasticsearch.action.ValidateActions.addValidationError;
 
-public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesRequest {
+public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.EsqlQueryRequest implements CompositeIndicesRequest {
 
     public static TimeValue DEFAULT_KEEP_ALIVE = TimeValue.timeValueDays(5);
     public static TimeValue DEFAULT_WAIT_FOR_COMPLETION = TimeValue.timeValueSeconds(1);
@@ -81,6 +80,7 @@ public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesR
         this.query = query;
     }
 
+    @Override
     public String query() {
         return query;
     }
@@ -124,6 +124,7 @@ public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesR
         this.filter = filter;
     }
 
+    @Override
     public QueryBuilder filter() {
         return filter;
     }

+ 10 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestBuilder.java

@@ -7,13 +7,15 @@
 
 package org.elasticsearch.xpack.esql.action;
 
-import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.client.internal.ElasticsearchClient;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.xpack.core.esql.action.internal.SharedSecrets;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 
-public class EsqlQueryRequestBuilder extends ActionRequestBuilder<EsqlQueryRequest, EsqlQueryResponse> {
+public class EsqlQueryRequestBuilder extends org.elasticsearch.xpack.core.esql.action.EsqlQueryRequestBuilder<
+    EsqlQueryRequest,
+    EsqlQueryResponse> {
 
     public static EsqlQueryRequestBuilder newAsyncEsqlQueryRequestBuilder(ElasticsearchClient client) {
         return new EsqlQueryRequestBuilder(client, EsqlQueryRequest.asyncEsqlQueryRequest());
@@ -27,6 +29,7 @@ public class EsqlQueryRequestBuilder extends ActionRequestBuilder<EsqlQueryReque
         super(client, EsqlQueryAction.INSTANCE, request);
     }
 
+    @Override
     public EsqlQueryRequestBuilder query(String query) {
         request.query(query);
         return this;
@@ -37,6 +40,7 @@ public class EsqlQueryRequestBuilder extends ActionRequestBuilder<EsqlQueryReque
         return this;
     }
 
+    @Override
     public EsqlQueryRequestBuilder filter(QueryBuilder filter) {
         request.filter(filter);
         return this;
@@ -61,4 +65,8 @@ public class EsqlQueryRequestBuilder extends ActionRequestBuilder<EsqlQueryReque
         request.keepOnCompletion(keepOnCompletion);
         return this;
     }
+
+    static { // plumb access from x-pack core
+        SharedSecrets.setEsqlQueryRequestBuilderAccess(EsqlQueryRequestBuilder::newSyncEsqlQueryRequestBuilder);
+    }
 }

+ 35 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

@@ -8,7 +8,6 @@
 package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.TransportVersions;
-import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -26,6 +25,8 @@ import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
+import org.elasticsearch.xpack.core.esql.action.EsqlResponse;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -34,7 +35,10 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 
-public class EsqlQueryResponse extends ActionResponse implements ChunkedToXContentObject, Releasable {
+public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse
+    implements
+        ChunkedToXContentObject,
+        Releasable {
 
     @SuppressWarnings("this-escape")
     private final AbstractRefCounted counted = AbstractRefCounted.of(this::closeInternal);
@@ -130,6 +134,16 @@ public class EsqlQueryResponse extends ActionResponse implements ChunkedToXConte
         return ResponseValueUtils.pagesToValues(dataTypes, pages);
     }
 
+    public Iterable<Iterable<Object>> rows() {
+        List<String> dataTypes = columns.stream().map(ColumnInfo::type).toList();
+        return ResponseValueUtils.valuesForRowsInPages(dataTypes, pages);
+    }
+
+    public Iterator<Object> column(int columnIndex) {
+        if (columnIndex < 0 || columnIndex >= columns.size()) throw new IllegalArgumentException();
+        return ResponseValueUtils.valuesForColumn(columnIndex, columns.get(columnIndex).type(), pages);
+    }
+
     public Profile profile() {
         return profile;
     }
@@ -261,13 +275,32 @@ public class EsqlQueryResponse extends ActionResponse implements ChunkedToXConte
 
     @Override
     public void close() {
+        super.close();
         decRef();
+        if (esqlResponse != null) {
+            esqlResponse.setClosedState();
+        }
     }
 
     void closeInternal() {
         Releasables.close(() -> Iterators.map(pages.iterator(), p -> p::releaseBlocks));
     }
 
+    // singleton lazy set view over this response
+    private EsqlResponseImpl esqlResponse;
+
+    @Override
+    public EsqlResponse responseInternal() {
+        if (hasReferences() == false) {
+            throw new IllegalStateException("closed");
+        }
+        if (esqlResponse != null) {
+            return esqlResponse;
+        }
+        esqlResponse = new EsqlResponseImpl(this);
+        return esqlResponse;
+    }
+
     public static class Profile implements Writeable, ChunkedToXContentObject {
         private final List<DriverProfile> drivers;
 

+ 89 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseImpl.java

@@ -0,0 +1,89 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
+import org.elasticsearch.xpack.core.esql.action.EsqlResponse;
+
+import java.util.Iterator;
+import java.util.List;
+
+/** View over the response, that supports the xpack core transport API. */
+public class EsqlResponseImpl implements EsqlResponse {
+
+    private final EsqlQueryResponse queryResponse;
+    private boolean closed;
+
+    EsqlResponseImpl(EsqlQueryResponse queryResponse) {
+        this.queryResponse = queryResponse;
+    }
+
+    @Override
+    public List<? extends ColumnInfo> columns() {
+        return queryResponse.columns();
+    }
+
+    @Override
+    public Iterable<Iterable<Object>> rows() {
+        ensureOpen();
+        return () -> {
+            ensureOpen();
+            return new DelegatingIterator<>(queryResponse.rows().iterator());
+        };
+    }
+
+    @Override
+    public Iterable<Object> column(int columnIndex) {
+        ensureOpen();
+        return () -> {
+            ensureOpen();
+            return new DelegatingIterator<>(queryResponse.column(columnIndex));
+        };
+    }
+
+    @Override
+    public void close() {
+        setClosedState();
+    }
+
+    public void setClosedState() {
+        closed = true;
+    }
+
+    private void ensureOpen() {
+        if (closed || queryResponse.hasReferences() == false) {
+            throw new IllegalStateException("closed");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "EsqlResponse[response=" + queryResponse + "]";
+    }
+
+    /** A delegating iterator, that first checks the closed state before delegating. */
+    final class DelegatingIterator<T> implements Iterator<T> {
+        final Iterator<T> delegate;
+
+        DelegatingIterator(Iterator<T> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public boolean hasNext() {
+            ensureOpen();
+            return delegate.hasNext();
+        }
+
+        @Override
+        public T next() {
+            ensureOpen();
+            return delegate.next();
+        }
+    }
+}

+ 1 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java

@@ -23,6 +23,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.versionfield.Version;
 
 import java.io.IOException;

+ 57 - 23
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java

@@ -26,6 +26,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
@@ -58,32 +59,65 @@ public final class ResponseValueUtils {
         BytesRef scratch = new BytesRef();
         return Iterators.flatMap(
             pages.iterator(),
-            page -> Iterators.forRange(0, page.getPositionCount(), p -> Iterators.forRange(0, page.getBlockCount(), b -> {
-                Block block = page.getBlock(b);
-                if (block.isNull(p)) {
-                    return null;
-                }
-                /*
-                 * Use the ESQL data type to map to the output to make sure compute engine
-                 * respects its types. See the INTEGER clause where is doesn't always
-                 * respect it.
-                 */
-                int count = block.getValueCount(p);
-                int start = block.getFirstValueIndex(p);
-                String dataType = dataTypes.get(b);
-                if (count == 1) {
-                    return valueAt(dataType, block, start, scratch);
-                }
-                List<Object> thisResult = new ArrayList<>(count);
-                int end = count + start;
-                for (int i = start; i < end; i++) {
-                    thisResult.add(valueAt(dataType, block, i, scratch));
-                }
-                return thisResult;
-            }))
+            page -> Iterators.forRange(
+                0,
+                page.getPositionCount(),
+                pos -> Iterators.forRange(0, page.getBlockCount(), b -> valueAtPosition(page.getBlock(b), pos, dataTypes.get(b), scratch))
+            )
+        );
+    }
+
+    /** Returns an iterable of iterables over the values in the given pages. There is one iterables for each row. */
+    static Iterable<Iterable<Object>> valuesForRowsInPages(List<String> dataTypes, List<Page> pages) {
+        BytesRef scratch = new BytesRef();
+        return () -> Iterators.flatMap(pages.iterator(), page -> valuesForRowsInPage(dataTypes, page, scratch));
+    }
+
+    /** Returns an iterable of iterables over the values in the given page. There is one iterables for each row. */
+    static Iterator<Iterable<Object>> valuesForRowsInPage(List<String> dataTypes, Page page, BytesRef scratch) {
+        return Iterators.forRange(0, page.getPositionCount(), position -> valuesForRow(dataTypes, page, position, scratch));
+    }
+
+    /** Returns an iterable over the values in the given row in a page. */
+    static Iterable<Object> valuesForRow(List<String> dataTypes, Page page, int position, BytesRef scratch) {
+        return () -> Iterators.forRange(
+            0,
+            page.getBlockCount(),
+            blockIdx -> valueAtPosition(page.getBlock(blockIdx), position, dataTypes.get(blockIdx), scratch)
         );
     }
 
+    /**  Returns an iterator of values for the given column. */
+    static Iterator<Object> valuesForColumn(int columnIndex, String dataType, List<Page> pages) {
+        BytesRef scratch = new BytesRef();
+        return Iterators.flatMap(
+            pages.iterator(),
+            page -> Iterators.forRange(
+                0,
+                page.getPositionCount(),
+                pos -> valueAtPosition(page.getBlock(columnIndex), pos, dataType, scratch)
+            )
+        );
+    }
+
+    /** Returns the value that the position and with the given data type, in the block. */
+    static Object valueAtPosition(Block block, int position, String dataType, BytesRef scratch) {
+        if (block.isNull(position)) {
+            return null;
+        }
+        int count = block.getValueCount(position);
+        int start = block.getFirstValueIndex(position);
+        if (count == 1) {
+            return valueAt(dataType, block, start, scratch);
+        }
+        List<Object> values = new ArrayList<>(count);
+        int end = count + start;
+        for (int i = start; i < end; i++) {
+            values.add(valueAt(dataType, block, i, scratch));
+        }
+        return values;
+    }
+
     private static Object valueAt(String dataType, Block block, int offset, BytesRef scratch) {
         return switch (dataType) {
             case "unsigned_long" -> unsignedLongAsNumber(((LongBlock) block).getLong(offset));

+ 1 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java

@@ -12,6 +12,7 @@ import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 
 import java.util.Collections;
 import java.util.Iterator;

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/formatter/TextFormat.java

@@ -11,7 +11,7 @@ import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.xcontent.MediaType;
-import org.elasticsearch.xpack.esql.action.ColumnInfo;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
 import org.elasticsearch.xpack.ql.util.StringUtils;
 

+ 12 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -49,6 +49,7 @@ import org.elasticsearch.xpack.esql.EsqlInfoTransportAction;
 import org.elasticsearch.xpack.esql.EsqlUsageTransportAction;
 import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
+import org.elasticsearch.xpack.esql.action.EsqlQueryRequestBuilder;
 import org.elasticsearch.xpack.esql.action.RestEsqlAsyncQueryAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlDeleteAsyncResultAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlGetAsyncResultAction;
@@ -60,6 +61,7 @@ import org.elasticsearch.xpack.esql.session.EsqlIndexResolver;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypeRegistry;
 import org.elasticsearch.xpack.ql.index.IndexResolver;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
@@ -101,6 +103,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
         );
         BigArrays bigArrays = services.indicesService().getBigArrays().withCircuitBreaking();
         BlockFactory blockFactory = new BlockFactory(circuitBreaker, bigArrays, maxPrimitiveArrayBlockSize);
+        setupSharedSecrets();
         return List.of(
             new PlanExecutor(
                 new IndexResolver(
@@ -116,6 +119,15 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
         );
     }
 
+    private void setupSharedSecrets() {
+        try {
+            // EsqlQueryRequestBuilder.<clinit> initializes the shared secret access
+            MethodHandles.lookup().ensureInitialized(EsqlQueryRequestBuilder.class);
+        } catch (IllegalAccessException e) {
+            throw new AssertionError(e);
+        }
+    }
+
     /**
      * The settings defined by the ESQL plugin.
      *

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -28,7 +28,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.async.AsyncExecutionId;
-import org.elasticsearch.xpack.esql.action.ColumnInfo;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
 import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;

+ 111 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java

@@ -44,6 +44,7 @@ import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.TestBlockFactory;
 import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
@@ -56,6 +57,7 @@ import org.junit.Before;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -67,6 +69,7 @@ import static org.elasticsearch.xpack.esql.action.EsqlQueryResponse.DROP_NULL_CO
 import static org.elasticsearch.xpack.esql.action.ResponseValueUtils.valuesToPage;
 import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.CARTESIAN;
 import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.GEO;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 
 public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<EsqlQueryResponse> {
@@ -513,4 +516,112 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
     protected void dispose(EsqlQueryResponse esqlQueryResponse) {
         esqlQueryResponse.close();
     }
+
+    // Tests for response::column
+    public void testColumns() {
+        var intBlk1 = blockFactory.newIntArrayVector(new int[] { 10, 20 }, 2).asBlock();
+        var intBlk2 = blockFactory.newIntArrayVector(new int[] { 30, 40, 50 }, 3).asBlock();
+        var longBlk1 = blockFactory.newLongArrayVector(new long[] { 100L, 200L }, 2).asBlock();
+        var longBlk2 = blockFactory.newLongArrayVector(new long[] { 300L, 400L, 500L }, 3).asBlock();
+        var columnInfo = List.of(new ColumnInfo("foo", "integer"), new ColumnInfo("bar", "long"));
+        var pages = List.of(new Page(intBlk1, longBlk1), new Page(intBlk2, longBlk2));
+        try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false)) {
+            assertThat(columnValues(response.column(0)), contains(10, 20, 30, 40, 50));
+            assertThat(columnValues(response.column(1)), contains(100L, 200L, 300L, 400L, 500L));
+            expectThrows(IllegalArgumentException.class, () -> response.column(-1));
+            expectThrows(IllegalArgumentException.class, () -> response.column(2));
+        }
+    }
+
+    public void testColumnsIllegalArg() {
+        var intBlk1 = blockFactory.newIntArrayVector(new int[] { 10 }, 1).asBlock();
+        var columnInfo = List.of(new ColumnInfo("foo", "integer"));
+        var pages = List.of(new Page(intBlk1));
+        try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false)) {
+            expectThrows(IllegalArgumentException.class, () -> response.column(-1));
+            expectThrows(IllegalArgumentException.class, () -> response.column(1));
+        }
+    }
+
+    public void testColumnsWithNull() {
+        IntBlock blk1, blk2, blk3;
+        try (
+            var bb1 = blockFactory.newIntBlockBuilder(2);
+            var bb2 = blockFactory.newIntBlockBuilder(4);
+            var bb3 = blockFactory.newIntBlockBuilder(4)
+        ) {
+            blk1 = bb1.appendInt(10).appendNull().build();
+            blk2 = bb2.appendInt(30).appendNull().appendNull().appendInt(60).build();
+            blk3 = bb3.appendNull().appendInt(80).appendInt(90).appendNull().build();
+        }
+        var columnInfo = List.of(new ColumnInfo("foo", "integer"));
+        var pages = List.of(new Page(blk1), new Page(blk2), new Page(blk3));
+        try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false)) {
+            assertThat(columnValues(response.column(0)), contains(10, null, 30, null, null, 60, null, 80, 90, null));
+            expectThrows(IllegalArgumentException.class, () -> response.column(-1));
+            expectThrows(IllegalArgumentException.class, () -> response.column(2));
+        }
+    }
+
+    public void testColumnsWithMultiValue() {
+        IntBlock blk1, blk2, blk3;
+        try (
+            var bb1 = blockFactory.newIntBlockBuilder(2);
+            var bb2 = blockFactory.newIntBlockBuilder(4);
+            var bb3 = blockFactory.newIntBlockBuilder(4)
+        ) {
+            blk1 = bb1.beginPositionEntry().appendInt(10).appendInt(20).endPositionEntry().appendNull().build();
+            blk2 = bb2.beginPositionEntry().appendInt(40).appendInt(50).endPositionEntry().build();
+            blk3 = bb3.appendNull().appendInt(70).appendInt(80).appendNull().build();
+        }
+        var columnInfo = List.of(new ColumnInfo("foo", "integer"));
+        var pages = List.of(new Page(blk1), new Page(blk2), new Page(blk3));
+        try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false)) {
+            assertThat(columnValues(response.column(0)), contains(List.of(10, 20), null, List.of(40, 50), null, 70, 80, null));
+            expectThrows(IllegalArgumentException.class, () -> response.column(-1));
+            expectThrows(IllegalArgumentException.class, () -> response.column(2));
+        }
+    }
+
+    public void testRowValues() {
+        for (int times = 0; times < 10; times++) {
+            int numColumns = randomIntBetween(1, 10);
+            List<ColumnInfo> columns = randomList(numColumns, numColumns, this::randomColumnInfo);
+            int noPages = randomIntBetween(1, 20);
+            List<Page> pages = randomList(noPages, noPages, () -> randomPage(columns));
+            try (var resp = new EsqlQueryResponse(columns, pages, null, false, "", false, false)) {
+                var rowValues = getValuesList(resp.rows());
+                var valValues = getValuesList(resp.values());
+                for (int i = 0; i < rowValues.size(); i++) {
+                    assertThat(rowValues.get(i), equalTo(valValues.get(i)));
+                }
+            }
+        }
+    }
+
+    static List<List<Object>> getValuesList(Iterator<Iterator<Object>> values) {
+        var valuesList = new ArrayList<List<Object>>();
+        values.forEachRemaining(row -> {
+            var rowValues = new ArrayList<>();
+            row.forEachRemaining(rowValues::add);
+            valuesList.add(rowValues);
+        });
+        return valuesList;
+    }
+
+    static List<List<Object>> getValuesList(Iterable<Iterable<Object>> values) {
+        var valuesList = new ArrayList<List<Object>>();
+        values.forEach(row -> {
+            var rowValues = new ArrayList<>();
+            row.forEach(rowValues::add);
+            valuesList.add(rowValues);
+        });
+        return valuesList;
+    }
+
+    static List<Object> columnValues(Iterator<Object> values) {
+        List<Object> l = new ArrayList<>();
+        values.forEachRemaining(l::add);
+        return l;
+    }
 }

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java

@@ -17,8 +17,8 @@ import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.rest.FakeRestRequest;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.TestBlockFactory;
-import org.elasticsearch.xpack.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
 import org.elasticsearch.xpack.ql.util.StringUtils;
 

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java

@@ -15,8 +15,8 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.geometry.Point;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.TestBlockFactory;
-import org.elasticsearch.xpack.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
 
 import java.util.Arrays;