Explorar o código

[ESQL] Use DataType instead of Strings in ColumnInfo (#110288)

The ColumnInfo class, which is used for request and response serialization, stores a data type. Prior to this PR, it stored that type as a String, and there were several places (seen below) where we needed to switch behavior based on that string. Switching on strings is brittle, as there's no way for the compiler to enforce that all cases are covered, so we have to rely on a default -> throw pattern, and hope that a test catches that path. On the other hand, we can instead switch on the actual DataType enum value, which the compiler can then enforce checking all values.

Eventually, it would make sense for most or all of these switches to become functions on DataType, but the visibility between esql.core and esql right now makes that difficult.
Mark Tozzi hai 1 ano
pai
achega
95da99ee6d
Modificáronse 17 ficheiros con 426 adicións e 319 borrados
  1. 8 37
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/ColumnInfo.java
  2. 14 5
      x-pack/plugin/esql/qa/action/src/internalClusterTest/java/org/elasticsearch/test/esql/qa/action/CoreEsqlActionIT.java
  3. 2 1
      x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java
  4. 4 5
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java
  5. 10 10
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java
  6. 106 90
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java
  7. 42 36
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java
  8. 105 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfoImpl.java
  9. 8 8
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java
  10. 16 16
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java
  11. 36 41
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java
  12. 5 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java
  13. 3 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
  14. 3 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
  15. 43 39
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java
  16. 8 8
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java
  17. 13 13
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java

+ 8 - 37
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/esql/action/ColumnInfo.java

@@ -7,52 +7,23 @@
 
 package org.elasticsearch.xpack.core.esql.action;
 
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.xcontent.InstantiatingObjectParser;
-import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
 
-import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
-
-public record ColumnInfo(String name, String type) implements Writeable {
-
-    private static final InstantiatingObjectParser<ColumnInfo, Void> PARSER;
-    static {
-        InstantiatingObjectParser.Builder<ColumnInfo, Void> parser = InstantiatingObjectParser.builder(
-            "esql/column_info",
-            true,
-            ColumnInfo.class
-        );
-        parser.declareString(constructorArg(), new ParseField("name"));
-        parser.declareString(constructorArg(), new ParseField("type"));
-        PARSER = parser.build();
+public interface ColumnInfo extends Writeable {
+    /*
+    static ColumnInfo fromXContent(XContentParser parser) {
+        return ColumnInfoImpl.PARSER.apply(parser, null);
     }
 
-    public static ColumnInfo fromXContent(XContentParser parser) {
-        return PARSER.apply(parser, null);
-    }
+     */
 
-    public ColumnInfo(StreamInput in) throws IOException {
-        this(in.readString(), in.readString());
-    }
+    XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException;
 
-    @Override
-    public void writeTo(StreamOutput out) throws IOException {
-        out.writeString(name);
-        out.writeString(type);
-    }
+    String name();
 
-    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
-        builder.startObject();
-        builder.field("name", name);
-        builder.field("type", type);
-        builder.endObject();
-        return builder;
-    }
+    String outputType();
 }

+ 14 - 5
x-pack/plugin/esql/qa/action/src/internalClusterTest/java/org/elasticsearch/test/esql/qa/action/CoreEsqlActionIT.java

@@ -18,6 +18,9 @@ import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.core.esql.action.EsqlQueryRequest;
 import org.elasticsearch.xpack.core.esql.action.EsqlQueryRequestBuilder;
 import org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse;
+import org.elasticsearch.xpack.core.esql.action.EsqlResponse;
+import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
+import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.junit.Before;
 
 import java.util.ArrayList;
@@ -52,11 +55,14 @@ public class CoreEsqlActionIT extends ESIntegTestCase {
     public void testRowTypesAndValues() {
         var query = "row a = 1, b = \"x\", c = 1000000000000, d = 1.1";
         var request = EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query);
-        try (var queryResp = run(request)) {
+        try (EsqlQueryResponse queryResp = run(request)) {
             logger.info("response=" + queryResp);
-            var resp = queryResp.response();
+            EsqlResponse resp = queryResp.response();
             assertThat(resp.columns().stream().map(ColumnInfo::name).toList(), contains("a", "b", "c", "d"));
-            assertThat(resp.columns().stream().map(ColumnInfo::type).toList(), contains("integer", "keyword", "long", "double"));
+            assertThat(
+                resp.columns().stream().map(c -> ((ColumnInfoImpl) c).type()).toList(),
+                contains(DataType.INTEGER, DataType.KEYWORD, DataType.LONG, DataType.DOUBLE)
+            );
             assertThat(getValuesList(resp.rows()), contains(List.of(1, "x", 1000000000000L, 1.1d)));
         }
     }
@@ -68,7 +74,7 @@ public class CoreEsqlActionIT extends ESIntegTestCase {
             logger.info("response=" + queryResp);
             var resp = queryResp.response();
             assertThat(resp.columns().stream().map(ColumnInfo::name).toList(), contains("a"));
-            assertThat(resp.columns().stream().map(ColumnInfo::type).toList(), contains("integer"));
+            assertThat(resp.columns().stream().map(c -> ((ColumnInfoImpl) c).type()).toList(), contains(DataType.INTEGER));
             assertThat(getValuesList(resp.rows()), contains(List.of(1)));
         }
     }
@@ -80,7 +86,10 @@ public class CoreEsqlActionIT extends ESIntegTestCase {
             var resp = queryResp.response();
             logger.info("response=" + queryResp);
             assertThat(resp.columns().stream().map(ColumnInfo::name).toList(), contains("item", "cost", "color", "sale"));
-            assertThat(resp.columns().stream().map(ColumnInfo::type).toList(), contains("long", "double", "keyword", "date"));
+            assertThat(
+                resp.columns().stream().map(c -> ((ColumnInfoImpl) c).type()).toList(),
+                contains(DataType.LONG, DataType.DOUBLE, DataType.KEYWORD, DataType.DATETIME)
+            );
             // columnar values
             assertThat(columnValues(resp.column(0)), contains(1L, 2L, 3L, 4L));
             assertThat(columnValues(resp.column(1)), contains(1.1d, 2.1d, 3.1d, 4.1d));

+ 2 - 1
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java

@@ -28,6 +28,7 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.logging.Logger;
 import org.elasticsearch.test.VersionUtils;
 import org.elasticsearch.xpack.esql.action.ResponseValueUtils;
+import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.util.StringUtils;
 import org.supercsv.io.CsvListReader;
 import org.supercsv.prefs.CsvPreference;
@@ -537,7 +538,7 @@ public final class CsvTestUtils {
     record ActualResults(
         List<String> columnNames,
         List<Type> columnTypes,
-        List<String> dataTypes,
+        List<DataType> dataTypes,
         List<Page> pages,
         Map<String, List<String>> responseHeaders
     ) {

+ 4 - 5
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java

@@ -18,7 +18,6 @@ import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
 import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
-import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.hamcrest.core.IsEqual;
 
@@ -90,7 +89,7 @@ public class AsyncEsqlQueryActionIT extends AbstractPausableIntegTestCase {
             try (var finalResponse = future.get()) {
                 assertThat(finalResponse, notNullValue());
                 assertThat(finalResponse.isRunning(), is(false));
-                assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfo("sum(pause_me)", "long"))));
+                assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
                 assertThat(getValuesList(finalResponse).size(), equalTo(1));
             }
 
@@ -99,7 +98,7 @@ public class AsyncEsqlQueryActionIT extends AbstractPausableIntegTestCase {
             try (var finalResponse = again.get()) {
                 assertThat(finalResponse, notNullValue());
                 assertThat(finalResponse.isRunning(), is(false));
-                assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfo("sum(pause_me)", "long"))));
+                assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
                 assertThat(getValuesList(finalResponse).size(), equalTo(1));
             }
 
@@ -174,7 +173,7 @@ public class AsyncEsqlQueryActionIT extends AbstractPausableIntegTestCase {
 
         try (var response = request.execute().actionGet(60, TimeUnit.SECONDS)) {
             assertThat(response.isRunning(), is(false));
-            assertThat(response.columns(), equalTo(List.of(new ColumnInfo("sum(pause_me)", "long"))));
+            assertThat(response.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
             assertThat(getValuesList(response).size(), equalTo(1));
 
             if (keepOnCompletion) {
@@ -187,7 +186,7 @@ public class AsyncEsqlQueryActionIT extends AbstractPausableIntegTestCase {
                 try (var resp = future.actionGet(60, TimeUnit.SECONDS)) {
                     assertThat(resp.asyncExecutionId().get(), equalTo(id));
                     assertThat(resp.isRunning(), is(false));
-                    assertThat(resp.columns(), equalTo(List.of(new ColumnInfo("sum(pause_me)", "long"))));
+                    assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(pause_me)", "long"))));
                     assertThat(getValuesList(resp).size(), equalTo(1));
                 }
             } else {

+ 10 - 10
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java

@@ -40,9 +40,9 @@ import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
 import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
 import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
-import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.enrich.EnrichPlugin;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
+import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.junit.After;
@@ -226,12 +226,12 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {
 
     public void testSumDurationByArtist() {
         Function<EsqlQueryResponse, Map<String, Double>> extractStats = resp -> {
-            List<ColumnInfo> columns = resp.columns();
+            List<ColumnInfoImpl> columns = resp.columns();
             assertThat(columns, hasSize(2));
             assertThat(columns.get(0).name(), equalTo("sum(duration)"));
-            assertThat(columns.get(0).type(), equalTo("double"));
+            assertThat(columns.get(0).type(), equalTo(DataType.DOUBLE));
             assertThat(columns.get(1).name(), equalTo("artist"));
-            assertThat(columns.get(1).type(), equalTo("keyword"));
+            assertThat(columns.get(1).type(), equalTo(DataType.KEYWORD));
             Iterator<Iterator<Object>> rows = resp.values();
             Map<String, Double> actualValues = new HashMap<>();
             while (rows.hasNext()) {
@@ -256,12 +256,12 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {
 
     public void testAvgDurationByArtist() {
         Function<EsqlQueryResponse, Map<String, Double>> extractStats = resp -> {
-            List<ColumnInfo> columns = resp.columns();
+            List<ColumnInfoImpl> columns = resp.columns();
             assertThat(columns, hasSize(2));
             assertThat(columns.get(0).name(), equalTo("avg(duration)"));
-            assertThat(columns.get(0).type(), equalTo("double"));
+            assertThat(columns.get(0).type(), equalTo(DataType.DOUBLE));
             assertThat(columns.get(1).name(), equalTo("artist"));
-            assertThat(columns.get(1).type(), equalTo("keyword"));
+            assertThat(columns.get(1).type(), equalTo(DataType.KEYWORD));
             Iterator<Iterator<Object>> rows = resp.values();
             Map<String, Double> actualValues = new HashMap<>();
             while (rows.hasNext()) {
@@ -282,12 +282,12 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {
 
     public void testListeningRatio() {
         Function<EsqlQueryResponse, Map<String, Double>> extractStats = resp -> {
-            List<ColumnInfo> columns = resp.columns();
+            List<ColumnInfoImpl> columns = resp.columns();
             assertThat(columns, hasSize(2));
             assertThat(columns.get(0).name(), equalTo("ratio"));
-            assertThat(columns.get(0).type(), equalTo("double"));
+            assertThat(columns.get(0).type(), equalTo(DataType.DOUBLE));
             assertThat(columns.get(1).name(), equalTo("artist"));
-            assertThat(columns.get(1).type(), equalTo("keyword"));
+            assertThat(columns.get(1).type(), equalTo(DataType.KEYWORD));
             Iterator<Iterator<Object>> rows = resp.values();
             Map<String, Double> actualValues = new HashMap<>();
             while (rows.hasNext()) {

+ 106 - 90
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java

@@ -33,6 +33,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.VerificationException;
+import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.parser.ParsingException;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
@@ -100,7 +101,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
     public void testProjectConstant() {
         try (EsqlQueryResponse results = run("from test | eval x = 1 | keep x")) {
-            assertThat(results.columns(), equalTo(List.of(new ColumnInfo("x", "integer"))));
+            assertThat(results.columns(), equalTo(List.of(new ColumnInfoImpl("x", "integer"))));
             assertThat(getValuesList(results).size(), equalTo(40));
             assertThat(getValuesList(results).get(0).get(0), equalTo(1));
         }
@@ -108,7 +109,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
     public void testStatsOverConstant() {
         try (EsqlQueryResponse results = run("from test | eval x = 1 | stats x = count(x)")) {
-            assertThat(results.columns(), equalTo(List.of(new ColumnInfo("x", "long"))));
+            assertThat(results.columns(), equalTo(List.of(new ColumnInfoImpl("x", "long"))));
             assertThat(getValuesList(results).size(), equalTo(1));
             assertThat(getValuesList(results).get(0).get(0), equalTo(40L));
         }
@@ -139,12 +140,12 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             assertEquals(2, results.columns().size());
 
             // assert column metadata
-            ColumnInfo valuesColumn = results.columns().get(0);
+            ColumnInfoImpl valuesColumn = results.columns().get(0);
             assertEquals(expectedFieldName, valuesColumn.name());
-            assertEquals("double", valuesColumn.type());
-            ColumnInfo groupColumn = results.columns().get(1);
+            assertEquals(DataType.DOUBLE, valuesColumn.type());
+            ColumnInfoImpl groupColumn = results.columns().get(1);
             assertEquals(expectedGroupName, groupColumn.name());
-            assertEquals("long", groupColumn.type());
+            assertEquals(DataType.LONG, groupColumn.type());
 
             // assert column values
             List<List<Object>> valueValues = getValuesList(results);
@@ -178,12 +179,12 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             assertEquals(2, results.columns().size());
 
             // assert column metadata
-            ColumnInfo groupColumn = results.columns().get(0);
+            ColumnInfoImpl groupColumn = results.columns().get(0);
             assertEquals(expectedGroupName, groupColumn.name());
-            assertEquals("long", groupColumn.type());
-            ColumnInfo valuesColumn = results.columns().get(1);
+            assertEquals(DataType.LONG, groupColumn.type());
+            ColumnInfoImpl valuesColumn = results.columns().get(1);
             assertEquals(expectedFieldName, valuesColumn.name());
-            assertEquals("long", valuesColumn.type());
+            assertEquals(DataType.LONG, valuesColumn.type());
 
             // assert column values
             List<List<Object>> valueValues = getValuesList(results);
@@ -212,9 +213,9 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
             // assert column metadata
             assertEquals("avg(count)", results.columns().get(0).name());
-            assertEquals("double", results.columns().get(0).type());
+            assertEquals(DataType.DOUBLE, results.columns().get(0).type());
             assertEquals("time", results.columns().get(1).name());
-            assertEquals("long", results.columns().get(1).type());
+            assertEquals(DataType.LONG, results.columns().get(1).type());
 
             // assert column values
             List<Long> expectedValues = LongStream.range(0, 40).map(i -> epoch + i).sorted().boxed().toList();
@@ -244,9 +245,9 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
             assertThat(results.columns(), hasSize(2));
             assertEquals("avg(count)", results.columns().get(0).name());
-            assertEquals("double", results.columns().get(0).type());
+            assertEquals(DataType.DOUBLE, results.columns().get(0).type());
             assertEquals("data", results.columns().get(1).name());
-            assertEquals("long", results.columns().get(1).type());
+            assertEquals(DataType.LONG, results.columns().get(1).type());
 
             record Group(Long data, Double avg) {}
             List<Group> expectedGroups = List.of(new Group(1L, 42.0), new Group(2L, 44.0), new Group(99L, null), new Group(null, 12.0));
@@ -263,9 +264,9 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
             // assert column metadata
             assertEquals("avg(count)", results.columns().get(0).name());
-            assertEquals("double", results.columns().get(0).type());
+            assertEquals(DataType.DOUBLE, results.columns().get(0).type());
             assertEquals("color", results.columns().get(1).name());
-            assertEquals("keyword", results.columns().get(1).type());
+            assertEquals(DataType.KEYWORD, results.columns().get(1).type());
             record Group(String color, double avg) {
 
             }
@@ -298,9 +299,9 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
                 // assert column metadata
                 assertEquals("avg", results.columns().get(0).name());
-                assertEquals("double", results.columns().get(0).type());
+                assertEquals(DataType.DOUBLE, results.columns().get(0).type());
                 assertEquals("color", results.columns().get(1).name());
-                assertEquals("keyword", results.columns().get(1).type());
+                assertEquals(DataType.KEYWORD, results.columns().get(1).type());
                 record Group(String color, Double avg) {
 
                 }
@@ -332,17 +333,17 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
             // assert column metadata
             assertEquals("a", results.columns().get(0).name());
-            assertEquals("double", results.columns().get(0).type());
+            assertEquals(DataType.DOUBLE, results.columns().get(0).type());
             assertEquals("mi", results.columns().get(1).name());
-            assertEquals("long", results.columns().get(1).type());
+            assertEquals(DataType.LONG, results.columns().get(1).type());
             assertEquals("ma", results.columns().get(2).name());
-            assertEquals("long", results.columns().get(2).type());
+            assertEquals(DataType.LONG, results.columns().get(2).type());
             assertEquals("s", results.columns().get(3).name());
-            assertEquals("long", results.columns().get(3).type());
+            assertEquals(DataType.LONG, results.columns().get(3).type());
             assertEquals("c", results.columns().get(4).name());
-            assertEquals("long", results.columns().get(4).type());
+            assertEquals(DataType.LONG, results.columns().get(4).type());
             assertEquals("color", results.columns().get(5).name());
-            assertEquals("keyword", results.columns().get(5).type());
+            assertEquals(DataType.KEYWORD, results.columns().get(5).type());
             record Group(double avg, long mi, long ma, long s, long c, String color) {}
             List<Group> expectedGroups = List.of(
                 new Group(42, 42, 42, 420, 10, "blue"),
@@ -380,7 +381,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | stats avg_count = avg(count) by data | keep data")) {
             logger.info(results);
             assertThat(results.columns().stream().map(ColumnInfo::name).toList(), contains("data"));
-            assertThat(results.columns().stream().map(ColumnInfo::type).toList(), contains("long"));
+            assertThat(results.columns().stream().map(ColumnInfoImpl::type).toList(), contains(DataType.LONG));
             assertThat(getValuesList(results), containsInAnyOrder(List.of(1L), List.of(2L)));
         }
     }
@@ -389,7 +390,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("row a = 1, b = 2 | stats count(b) by a | keep a")) {
             logger.info(results);
             assertThat(results.columns().stream().map(ColumnInfo::name).toList(), contains("a"));
-            assertThat(results.columns().stream().map(ColumnInfo::type).toList(), contains("integer"));
+            assertThat(results.columns().stream().map(ColumnInfoImpl::type).toList(), contains(DataType.INTEGER));
             assertThat(getValuesList(results), contains(List.of(1)));
         }
     }
@@ -398,7 +399,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("row a = 1000000000000, b = 2 | stats count(b) by a | keep a")) {
             logger.info(results);
             assertThat(results.columns().stream().map(ColumnInfo::name).toList(), contains("a"));
-            assertThat(results.columns().stream().map(ColumnInfo::type).toList(), contains("long"));
+            assertThat(results.columns().stream().map(ColumnInfoImpl::type).toList(), contains(DataType.LONG));
             assertThat(getValuesList(results), contains(List.of(1000000000000L)));
         }
     }
@@ -407,7 +408,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("row a = 1.0, b = 2 | stats count(b) by a | keep a")) {
             logger.info(results);
             assertThat(results.columns().stream().map(ColumnInfo::name).toList(), contains("a"));
-            assertThat(results.columns().stream().map(ColumnInfo::type).toList(), contains("double"));
+            assertThat(results.columns().stream().map(ColumnInfoImpl::type).toList(), contains(DataType.DOUBLE));
             assertThat(getValuesList(results), contains(List.of(1.0)));
         }
     }
@@ -416,7 +417,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("row a = \"hello\", b = 2 | stats count(b) by a | keep a")) {
             logger.info(results);
             assertThat(results.columns().stream().map(ColumnInfo::name).toList(), contains("a"));
-            assertThat(results.columns().stream().map(ColumnInfo::type).toList(), contains("keyword"));
+            assertThat(results.columns().stream().map(ColumnInfoImpl::type).toList(), contains(DataType.KEYWORD));
             assertThat(getValuesList(results), contains(List.of("hello")));
         }
     }
@@ -425,7 +426,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | stats count(count) by data_d | keep data_d")) {
             logger.info(results);
             assertThat(results.columns().stream().map(ColumnInfo::name).toList(), contains("data_d"));
-            assertThat(results.columns().stream().map(ColumnInfo::type).toList(), contains("double"));
+            assertThat(results.columns().stream().map(ColumnInfoImpl::type).toList(), contains(DataType.DOUBLE));
             assertThat(getValuesList(results), containsInAnyOrder(List.of(1.0), List.of(2.0)));
         }
     }
@@ -435,7 +436,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run(query)) {
             logger.info(results);
             assertThat(results.columns().stream().map(ColumnInfo::name).toList(), contains("d", "d2"));
-            assertThat(results.columns().stream().map(ColumnInfo::type).toList(), contains("long", "long"));
+            assertThat(results.columns().stream().map(ColumnInfoImpl::type).toList(), contains(DataType.LONG, DataType.LONG));
             assertThat(getValuesList(results), containsInAnyOrder(List.of(1L, 1L), List.of(2L, 2L)));
         }
     }
@@ -444,7 +445,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | stats a = avg(count) by data | keep a")) {
             logger.info(results);
             assertThat(results.columns().stream().map(ColumnInfo::name).toList(), contains("a"));
-            assertThat(results.columns().stream().map(ColumnInfo::type).toList(), contains("double"));
+            assertThat(results.columns().stream().map(ColumnInfoImpl::type).toList(), contains(DataType.DOUBLE));
             assertThat(getValuesList(results), containsInAnyOrder(List.of(42d), List.of(44d)));
         }
     }
@@ -453,7 +454,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | stats a = avg(count) by data | rename a as b | keep b")) {
             logger.info(results);
             assertThat(results.columns().stream().map(ColumnInfo::name).toList(), contains("b"));
-            assertThat(results.columns().stream().map(ColumnInfo::type).toList(), contains("double"));
+            assertThat(results.columns().stream().map(ColumnInfoImpl::type).toList(), contains(DataType.DOUBLE));
             assertThat(getValuesList(results), containsInAnyOrder(List.of(42d), List.of(44d)));
         }
     }
@@ -462,7 +463,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | rename data as d | keep d, count | stats avg(count) by d")) {
             logger.info(results);
             assertThat(results.columns().stream().map(ColumnInfo::name).toList(), contains("avg(count)", "d"));
-            assertThat(results.columns().stream().map(ColumnInfo::type).toList(), contains("double", "long"));
+            assertThat(results.columns().stream().map(ColumnInfoImpl::type).toList(), contains(DataType.DOUBLE, DataType.LONG));
             assertThat(getValuesList(results), containsInAnyOrder(List.of(42d, 1L), List.of(44d, 2L)));
         }
     }
@@ -471,7 +472,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | rename count as c | keep c, data | stats avg(c) by data")) {
             logger.info(results);
             assertThat(results.columns().stream().map(ColumnInfo::name).toList(), contains("avg(c)", "data"));
-            assertThat(results.columns().stream().map(ColumnInfo::type).toList(), contains("double", "long"));
+            assertThat(results.columns().stream().map(ColumnInfoImpl::type).toList(), contains(DataType.DOUBLE, DataType.LONG));
             assertThat(getValuesList(results), containsInAnyOrder(List.of(42d, 1L), List.of(44d, 2L)));
         }
     }
@@ -482,7 +483,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             assertEquals(1, results.columns().size());
             assertEquals(1, getValuesList(results).size());
             assertEquals("avg(ratio)", results.columns().get(0).name());
-            assertEquals("double", results.columns().get(0).type());
+            assertEquals(DataType.DOUBLE, results.columns().get(0).type());
             assertEquals(1, getValuesList(results).get(0).size());
             assertEquals(0.034d, (double) getValuesList(results).get(0).get(0), 0.001d);
         }
@@ -494,7 +495,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             assertEquals(1, results.columns().size());
             assertEquals(1, getValuesList(results).size());
             assertEquals("count(*)", results.columns().get(0).name());
-            assertEquals("long", results.columns().get(0).type());
+            assertEquals(DataType.LONG, results.columns().get(0).type());
             var values = getValuesList(results).get(0);
             assertEquals(1, values.size());
             assertEquals(40, (long) values.get(0));
@@ -507,7 +508,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             assertEquals(1, results.columns().size());
             assertEquals(1, getValuesList(results).size());
             assertEquals("count(*)", results.columns().get(0).name());
-            assertEquals("long", results.columns().get(0).type());
+            assertEquals(DataType.LONG, results.columns().get(0).type());
             var values = getValuesList(results).get(0);
             assertEquals(1, values.size());
             assertEquals(20, (long) values.get(0));
@@ -520,9 +521,9 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             assertEquals(2, results.columns().size());
             assertEquals(1, getValuesList(results).size());
             assertEquals("count(*)", results.columns().get(0).name());
-            assertEquals("long", results.columns().get(0).type());
+            assertEquals(DataType.LONG, results.columns().get(0).type());
             assertEquals("data", results.columns().get(1).name());
-            assertEquals("long", results.columns().get(1).type());
+            assertEquals(DataType.LONG, results.columns().get(1).type());
             var values = getValuesList(results).get(0);
             assertEquals(2, values.size());
             assertEquals(20, (long) values.get(0));
@@ -536,10 +537,10 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             logger.info(results);
             assertEquals(1, getValuesList(results).size());
             assertEquals(2, getValuesList(results).get(0).size());
-            assertEquals(50, (double) getValuesList(results).get(0).get(results.columns().indexOf(new ColumnInfo("x", "double"))), 1d);
+            assertEquals(50, (double) getValuesList(results).get(0).get(results.columns().indexOf(new ColumnInfoImpl("x", "double"))), 1d);
             assertEquals(
                 43,
-                (double) getValuesList(results).get(0).get(results.columns().indexOf(new ColumnInfo("avg_count", "double"))),
+                (double) getValuesList(results).get(0).get(results.columns().indexOf(new ColumnInfoImpl("avg_count", "double"))),
                 1d
             );
         }
@@ -549,7 +550,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | where count > 40")) {
             logger.info(results);
             assertEquals(30, getValuesList(results).size());
-            var countIndex = results.columns().indexOf(new ColumnInfo("count", "long"));
+            var countIndex = results.columns().indexOf(new ColumnInfoImpl("count", "long"));
             for (List<Object> values : getValuesList(results)) {
                 assertThat((Long) values.get(countIndex), greaterThan(40L));
             }
@@ -560,7 +561,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | keep count | where count > 40")) {
             logger.info(results);
             assertEquals(30, getValuesList(results).size());
-            int countIndex = results.columns().indexOf(new ColumnInfo("count", "long"));
+            int countIndex = results.columns().indexOf(new ColumnInfoImpl("count", "long"));
             for (List<Object> values : getValuesList(results)) {
                 assertThat((Long) values.get(countIndex), greaterThan(40L));
             }
@@ -571,7 +572,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | eval x = count / 2 | where x > 20")) {
             logger.info(results);
             assertEquals(30, getValuesList(results).size());
-            int countIndex = results.columns().indexOf(new ColumnInfo("x", "long"));
+            int countIndex = results.columns().indexOf(new ColumnInfoImpl("x", "long"));
             for (List<Object> values : getValuesList(results)) {
                 assertThat((Long) values.get(countIndex), greaterThan(20L));
             }
@@ -589,7 +590,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | eval l = length(color)")) {
             logger.info(results);
             assertThat(getValuesList(results), hasSize(40));
-            int countIndex = results.columns().indexOf(new ColumnInfo("l", "integer"));
+            int countIndex = results.columns().indexOf(new ColumnInfoImpl("l", "integer"));
             for (List<Object> values : getValuesList(results)) {
                 assertThat((Integer) values.get(countIndex), greaterThanOrEqualTo(3));
             }
@@ -608,11 +609,11 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | eval newCount = count + 1 | where newCount > 1")) {
             logger.info(results);
             assertEquals(40, getValuesList(results).size());
-            assertThat(results.columns(), hasItem(equalTo(new ColumnInfo("count", "long"))));
-            assertThat(results.columns(), hasItem(equalTo(new ColumnInfo("count_d", "double"))));
-            assertThat(results.columns(), hasItem(equalTo(new ColumnInfo("data", "long"))));
-            assertThat(results.columns(), hasItem(equalTo(new ColumnInfo("data_d", "double"))));
-            assertThat(results.columns(), hasItem(equalTo(new ColumnInfo("time", "long"))));
+            assertThat(results.columns(), hasItem(equalTo(new ColumnInfoImpl("count", "long"))));
+            assertThat(results.columns(), hasItem(equalTo(new ColumnInfoImpl("count_d", "double"))));
+            assertThat(results.columns(), hasItem(equalTo(new ColumnInfoImpl("data", "long"))));
+            assertThat(results.columns(), hasItem(equalTo(new ColumnInfoImpl("data_d", "double"))));
+            assertThat(results.columns(), hasItem(equalTo(new ColumnInfoImpl("time", "long"))));
         }
     }
 
@@ -646,7 +647,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             assertEquals(40, getValuesList(results).size());
             assertEquals(1, results.columns().stream().filter(c -> c.name().equals("count")).count());
             int countIndex = results.columns().size() - 1;
-            assertEquals(new ColumnInfo("count", "long"), results.columns().get(countIndex));
+            assertEquals(new ColumnInfoImpl("count", "long"), results.columns().get(countIndex));
             for (List<Object> values : getValuesList(results)) {
                 assertThat((Long) values.get(countIndex), greaterThanOrEqualTo(42L));
             }
@@ -657,7 +658,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (var results = run("from test | eval y = count | rename count as x | keep x, y")) {
             logger.info(results);
             assertEquals(40, getValuesList(results).size());
-            assertThat(results.columns(), contains(new ColumnInfo("x", "long"), new ColumnInfo("y", "long")));
+            assertThat(results.columns(), contains(new ColumnInfoImpl("x", "long"), new ColumnInfoImpl("y", "long")));
             for (List<Object> values : getValuesList(results)) {
                 assertThat((Long) values.get(0), greaterThanOrEqualTo(40L));
                 assertThat(values.get(1), is(values.get(0)));
@@ -672,10 +673,10 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             assertThat(
                 results.columns(),
                 contains(
-                    new ColumnInfo("x", "long"),
-                    new ColumnInfo("y", "long"),
-                    new ColumnInfo("x2", "long"),
-                    new ColumnInfo("y2", "long")
+                    new ColumnInfoImpl("x", "long"),
+                    new ColumnInfoImpl("y", "long"),
+                    new ColumnInfoImpl("x2", "long"),
+                    new ColumnInfoImpl("y2", "long")
                 )
             );
             for (List<Object> values : getValuesList(results)) {
@@ -691,7 +692,10 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (var results = run("from test | eval y = count | rename count as x | keep x, y | eval z = x + y | keep x, y, z")) {
             logger.info(results);
             assertEquals(40, getValuesList(results).size());
-            assertThat(results.columns(), contains(new ColumnInfo("x", "long"), new ColumnInfo("y", "long"), new ColumnInfo("z", "long")));
+            assertThat(
+                results.columns(),
+                contains(new ColumnInfoImpl("x", "long"), new ColumnInfoImpl("y", "long"), new ColumnInfoImpl("z", "long"))
+            );
             for (List<Object> values : getValuesList(results)) {
                 assertThat((Long) values.get(0), greaterThanOrEqualTo(40L));
                 assertThat(values.get(1), is(values.get(0)));
@@ -704,7 +708,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (var results = run("from test | eval cnt = count | rename count as data | keep cnt, data")) {
             logger.info(results);
             assertEquals(40, getValuesList(results).size());
-            assertThat(results.columns(), contains(new ColumnInfo("cnt", "long"), new ColumnInfo("data", "long")));
+            assertThat(results.columns(), contains(new ColumnInfoImpl("cnt", "long"), new ColumnInfoImpl("data", "long")));
             for (List<Object> values : getValuesList(results)) {
                 assertThat(values.get(1), is(values.get(0)));
             }
@@ -865,7 +869,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             assertEquals(1, results.columns().size());
             assertEquals(1, getValuesList(results).size());
             assertEquals("avg(nullsum)", results.columns().get(0).name());
-            assertEquals("double", results.columns().get(0).type());
+            assertEquals(DataType.DOUBLE, results.columns().get(0).type());
             assertEquals(1, getValuesList(results).get(0).size());
             assertNull(getValuesList(results).get(0).get(0));
         }
@@ -874,7 +878,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
     public void testFromStatsLimit() {
         try (EsqlQueryResponse results = run("from test | stats ac = avg(count) by data | limit 1")) {
             logger.info(results);
-            assertThat(results.columns(), contains(new ColumnInfo("ac", "double"), new ColumnInfo("data", "long")));
+            assertThat(results.columns(), contains(new ColumnInfoImpl("ac", "double"), new ColumnInfoImpl("data", "long")));
             assertThat(getValuesList(results), contains(anyOf(contains(42.0, 1L), contains(44.0, 2L))));
         }
     }
@@ -882,7 +886,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
     public void testFromLimit() {
         try (EsqlQueryResponse results = run("from test | keep data | limit 2")) {
             logger.info(results);
-            assertThat(results.columns(), contains(new ColumnInfo("data", "long")));
+            assertThat(results.columns(), contains(new ColumnInfoImpl("data", "long")));
             assertThat(getValuesList(results), contains(anyOf(contains(1L), contains(2L)), anyOf(contains(1L), contains(2L))));
         }
     }
@@ -891,7 +895,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("from test | keep data | drop data | eval a = 1")) {
             logger.info(results);
             assertThat(results.columns(), hasSize(1));
-            assertThat(results.columns(), contains(new ColumnInfo("a", "integer")));
+            assertThat(results.columns(), contains(new ColumnInfoImpl("a", "integer")));
             assertThat(getValuesList(results), is(empty()));
         }
     }
@@ -1010,7 +1014,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
     public void testEmptyIndex() {
         assertAcked(client().admin().indices().prepareCreate("test_empty").setMapping("k", "type=keyword", "v", "type=long").get());
         try (EsqlQueryResponse results = run("from test_empty")) {
-            assertThat(results.columns(), equalTo(List.of(new ColumnInfo("k", "keyword"), new ColumnInfo("v", "long"))));
+            assertThat(results.columns(), equalTo(List.of(new ColumnInfoImpl("k", "keyword"), new ColumnInfoImpl("v", "long"))));
             assertThat(getValuesList(results), empty());
         }
     }
@@ -1019,7 +1023,13 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         try (EsqlQueryResponse results = run("show info")) {
             assertThat(
                 results.columns(),
-                equalTo(List.of(new ColumnInfo("version", "keyword"), new ColumnInfo("date", "keyword"), new ColumnInfo("hash", "keyword")))
+                equalTo(
+                    List.of(
+                        new ColumnInfoImpl("version", "keyword"),
+                        new ColumnInfoImpl("date", "keyword"),
+                        new ColumnInfoImpl("hash", "keyword")
+                    )
+                )
             );
             assertThat(getValuesList(results).size(), equalTo(1));
             assertThat(getValuesList(results).get(0).get(0), equalTo(Build.current().version()));
@@ -1034,16 +1044,16 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
                 results.columns(),
                 equalTo(
                     List.of(
-                        new ColumnInfo("name", "keyword"),
-                        new ColumnInfo("synopsis", "keyword"),
-                        new ColumnInfo("argNames", "keyword"),
-                        new ColumnInfo("argTypes", "keyword"),
-                        new ColumnInfo("argDescriptions", "keyword"),
-                        new ColumnInfo("returnType", "keyword"),
-                        new ColumnInfo("description", "keyword"),
-                        new ColumnInfo("optionalArgs", "boolean"),
-                        new ColumnInfo("variadic", "boolean"),
-                        new ColumnInfo("isAggregation", "boolean")
+                        new ColumnInfoImpl("name", "keyword"),
+                        new ColumnInfoImpl("synopsis", "keyword"),
+                        new ColumnInfoImpl("argNames", "keyword"),
+                        new ColumnInfoImpl("argTypes", "keyword"),
+                        new ColumnInfoImpl("argDescriptions", "keyword"),
+                        new ColumnInfoImpl("returnType", "keyword"),
+                        new ColumnInfoImpl("description", "keyword"),
+                        new ColumnInfoImpl("optionalArgs", "boolean"),
+                        new ColumnInfoImpl("variadic", "boolean"),
+                        new ColumnInfoImpl("isAggregation", "boolean")
                     )
                 )
             );
@@ -1053,7 +1063,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
     public void testInWithNullValue() {
         try (EsqlQueryResponse results = run("from test | where null in (data, 2) | keep data")) {
-            assertThat(results.columns(), equalTo(List.of(new ColumnInfo("data", "long"))));
+            assertThat(results.columns(), equalTo(List.of(new ColumnInfoImpl("data", "long"))));
             assertThat(getValuesList(results).size(), equalTo(0));
         }
     }
@@ -1088,11 +1098,11 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
             // assert column metadata
             assertEquals("data", results.columns().get(0).name());
-            assertEquals("long", results.columns().get(0).type());
+            assertEquals(DataType.LONG, results.columns().get(0).type());
             assertEquals("count", results.columns().get(1).name());
-            assertEquals("long", results.columns().get(1).type());
+            assertEquals(DataType.LONG, results.columns().get(1).type());
             assertEquals("color", results.columns().get(2).name());
-            assertEquals("keyword", results.columns().get(2).type());
+            assertEquals(DataType.KEYWORD, results.columns().get(2).type());
             record Group(Long data, Long count, String color) {
                 Group(Long data, Long count) {
                     this(data, count, "yellow");
@@ -1139,7 +1149,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
             // assert column metadata
             assertEquals("time", results.columns().get(0).name());
-            assertEquals("long", results.columns().get(0).type());
+            assertEquals(DataType.LONG, results.columns().get(0).type());
 
             boolean sortedDesc = "desc".equals(sortOrder);
             var expected = LongStream.range(0, 40)
@@ -1214,7 +1224,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
     public void testLoadId() {
         try (EsqlQueryResponse results = run("from test metadata _id | keep _id | sort _id ")) {
-            assertThat(results.columns(), equalTo(List.of(new ColumnInfo("_id", "keyword"))));
+            assertThat(results.columns(), equalTo(List.of(new ColumnInfoImpl("_id", "keyword"))));
             ListMatcher values = matchesList();
             for (int i = 10; i < 50; i++) {
                 values = values.item(List.of(Integer.toString(i)));
@@ -1420,12 +1430,12 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
         try (EsqlQueryResponse resp = run(from + "METADATA _source | EVAL x = 123")) {
             assertFalse(resp.values().hasNext());
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfo("_source", "_source"), new ColumnInfo("x", "integer"))));
+            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("_source", "_source"), new ColumnInfoImpl("x", "integer"))));
         }
 
         try (EsqlQueryResponse resp = run(from)) {
             assertFalse(resp.values().hasNext());
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfo("<no-fields>", "null"))));
+            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("<no-fields>", "null"))));
         }
     }
 
@@ -1450,32 +1460,38 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             assertFalse(resp.values().hasNext());
             assertThat(
                 resp.columns(),
-                equalTo(List.of(new ColumnInfo("name", "text"), new ColumnInfo("_source", "_source"), new ColumnInfo("x", "integer")))
+                equalTo(
+                    List.of(
+                        new ColumnInfoImpl("name", "text"),
+                        new ColumnInfoImpl("_source", "_source"),
+                        new ColumnInfoImpl("x", "integer")
+                    )
+                )
             );
         }
 
         try (EsqlQueryResponse resp = run(from)) {
             assertFalse(resp.values().hasNext());
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfo("name", "text"))));
+            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("name", "text"))));
         }
     }
 
     private void assertEmptyIndexQueries(String from) {
         try (EsqlQueryResponse resp = run(from + "METADATA _source | KEEP _source | LIMIT 1")) {
             assertFalse(resp.values().hasNext());
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfo("_source", "_source"))));
+            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("_source", "_source"))));
         }
 
         try (EsqlQueryResponse resp = run(from + "| EVAL y = 1 | KEEP y | LIMIT 1 | EVAL x = 1")) {
             assertFalse(resp.values().hasNext());
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfo("y", "integer"), new ColumnInfo("x", "integer"))));
+            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("y", "integer"), new ColumnInfoImpl("x", "integer"))));
         }
 
         try (EsqlQueryResponse resp = run(from + "| STATS c = count()")) {
             assertTrue(resp.values().hasNext());
             Iterator<Object> row = resp.values().next();
             assertThat(row.next(), equalTo((long) 0));
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfo("c", "long"))));
+            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("c", "long"))));
         }
 
         try (EsqlQueryResponse resp = run(from + "| STATS c = count() | EVAL x = 123")) {
@@ -1484,7 +1500,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             assertThat(row.next(), equalTo((long) 0));
             assertThat(row.next(), equalTo(123));
             assertFalse(row.hasNext());
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfo("c", "long"), new ColumnInfo("x", "integer"))));
+            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("c", "long"), new ColumnInfoImpl("x", "integer"))));
         }
     }
 
@@ -1561,7 +1577,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
 
     private void assertNoNestedDocuments(String query, int docsCount, long minValue, long maxValue) {
         try (EsqlQueryResponse results = run(query)) {
-            assertThat(results.columns(), contains(new ColumnInfo("data", "long")));
+            assertThat(results.columns(), contains(new ColumnInfoImpl("data", "long")));
             assertThat(results.columns().size(), is(1));
             assertThat(getValuesList(results).size(), is(docsCount));
             for (List<Object> row : getValuesList(results)) {

+ 42 - 36
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java

@@ -12,7 +12,6 @@ import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.junit.Before;
 
@@ -196,7 +195,7 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
             }
         }
         try (var resp = run("METRICS hosts sum(rate(request_count, 1second))")) {
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfo("sum(rate(request_count, 1second))", "double"))));
+            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count, 1second))", "double"))));
             List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
             assertThat(values, hasSize(1));
             assertThat(values.get(0), hasSize(1));
@@ -205,7 +204,12 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
         try (var resp = run("METRICS hosts max(rate(request_count)), min(rate(request_count))")) {
             assertThat(
                 resp.columns(),
-                equalTo(List.of(new ColumnInfo("max(rate(request_count))", "double"), new ColumnInfo("min(rate(request_count))", "double")))
+                equalTo(
+                    List.of(
+                        new ColumnInfoImpl("max(rate(request_count))", "double"),
+                        new ColumnInfoImpl("min(rate(request_count))", "double")
+                    )
+                )
             );
             List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
             assertThat(values, hasSize(1));
@@ -218,9 +222,9 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
                 resp.columns(),
                 equalTo(
                     List.of(
-                        new ColumnInfo("max(rate(request_count))", "double"),
-                        new ColumnInfo("avg(rate(request_count))", "double"),
-                        new ColumnInfo("max(rate(request_count, 1minute))", "double")
+                        new ColumnInfoImpl("max(rate(request_count))", "double"),
+                        new ColumnInfoImpl("avg(rate(request_count))", "double"),
+                        new ColumnInfoImpl("max(rate(request_count, 1minute))", "double")
                     )
                 )
             );
@@ -237,8 +241,8 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
                 resp.columns(),
                 equalTo(
                     List.of(
-                        new ColumnInfo("avg(rate(request_count))", "double"),
-                        new ColumnInfo("avg(rate(request_count, 1second))", "double")
+                        new ColumnInfoImpl("avg(rate(request_count))", "double"),
+                        new ColumnInfoImpl("avg(rate(request_count, 1second))", "double")
                     )
                 )
             );
@@ -292,7 +296,7 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
         try (var resp = run("METRICS hosts sum(rate(request_count)) BY cluster | SORT cluster")) {
             assertThat(
                 resp.columns(),
-                equalTo(List.of(new ColumnInfo("sum(rate(request_count))", "double"), new ColumnInfo("cluster", "keyword")))
+                equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count))", "double"), new ColumnInfoImpl("cluster", "keyword")))
             );
             List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
             assertThat(values, hasSize(bucketToRates.size()));
@@ -307,7 +311,7 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
         try (var resp = run("METRICS hosts avg(rate(request_count)) BY cluster | SORT cluster")) {
             assertThat(
                 resp.columns(),
-                equalTo(List.of(new ColumnInfo("avg(rate(request_count))", "double"), new ColumnInfo("cluster", "keyword")))
+                equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double"), new ColumnInfoImpl("cluster", "keyword")))
             );
             List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
             assertThat(values, hasSize(bucketToRates.size()));
@@ -330,9 +334,9 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
                 resp.columns(),
                 equalTo(
                     List.of(
-                        new ColumnInfo("avg(rate(request_count, 1minute))", "double"),
-                        new ColumnInfo("avg(rate(request_count))", "double"),
-                        new ColumnInfo("cluster", "keyword")
+                        new ColumnInfoImpl("avg(rate(request_count, 1minute))", "double"),
+                        new ColumnInfoImpl("avg(rate(request_count))", "double"),
+                        new ColumnInfoImpl("cluster", "keyword")
                     )
                 )
             );
@@ -376,7 +380,7 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
         try (var resp = run("METRICS hosts sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute) | SORT ts | LIMIT 5")) {
             assertThat(
                 resp.columns(),
-                equalTo(List.of(new ColumnInfo("sum(rate(request_count))", "double"), new ColumnInfo("ts", "date")))
+                equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count))", "double"), new ColumnInfoImpl("ts", "date")))
             );
             List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
             assertThat(values, hasSize(sortedKeys.size()));
@@ -396,7 +400,7 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
         try (var resp = run("METRICS hosts avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) | SORT ts | LIMIT 5")) {
             assertThat(
                 resp.columns(),
-                equalTo(List.of(new ColumnInfo("avg(rate(request_count))", "double"), new ColumnInfo("ts", "date")))
+                equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double"), new ColumnInfoImpl("ts", "date")))
             );
             List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
             assertThat(values, hasSize(sortedKeys.size()));
@@ -423,9 +427,9 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
                 resp.columns(),
                 equalTo(
                     List.of(
-                        new ColumnInfo("avg(rate(request_count, 1minute))", "double"),
-                        new ColumnInfo("avg(rate(request_count))", "double"),
-                        new ColumnInfo("ts", "date")
+                        new ColumnInfoImpl("avg(rate(request_count, 1minute))", "double"),
+                        new ColumnInfoImpl("avg(rate(request_count))", "double"),
+                        new ColumnInfoImpl("ts", "date")
                     )
                 )
             );
@@ -485,9 +489,9 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
                 resp.columns(),
                 equalTo(
                     List.of(
-                        new ColumnInfo("sum(rate(request_count))", "double"),
-                        new ColumnInfo("ts", "date"),
-                        new ColumnInfo("cluster", "keyword")
+                        new ColumnInfoImpl("sum(rate(request_count))", "double"),
+                        new ColumnInfoImpl("ts", "date"),
+                        new ColumnInfoImpl("cluster", "keyword")
                     )
                 )
             );
@@ -515,9 +519,9 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
                 resp.columns(),
                 equalTo(
                     List.of(
-                        new ColumnInfo("avg(rate(request_count))", "double"),
-                        new ColumnInfo("ts", "date"),
-                        new ColumnInfo("cluster", "keyword")
+                        new ColumnInfoImpl("avg(rate(request_count))", "double"),
+                        new ColumnInfoImpl("ts", "date"),
+                        new ColumnInfoImpl("cluster", "keyword")
                     )
                 )
             );
@@ -546,10 +550,10 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
                 resp.columns(),
                 equalTo(
                     List.of(
-                        new ColumnInfo("avg(rate(request_count, 1minute))", "double"),
-                        new ColumnInfo("avg(rate(request_count))", "double"),
-                        new ColumnInfo("ts", "date"),
-                        new ColumnInfo("cluster", "keyword")
+                        new ColumnInfoImpl("avg(rate(request_count, 1minute))", "double"),
+                        new ColumnInfoImpl("avg(rate(request_count))", "double"),
+                        new ColumnInfoImpl("ts", "date"),
+                        new ColumnInfoImpl("cluster", "keyword")
                     )
                 )
             );
@@ -588,11 +592,11 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
                 resp.columns(),
                 equalTo(
                     List.of(
-                        new ColumnInfo("avg_rate", "double"),
-                        new ColumnInfo("max(rate(request_count))", "double"),
-                        new ColumnInfo("avg(rate(request_count))", "double"),
-                        new ColumnInfo("ts", "date"),
-                        new ColumnInfo("cluster", "keyword")
+                        new ColumnInfoImpl("avg_rate", "double"),
+                        new ColumnInfoImpl("max(rate(request_count))", "double"),
+                        new ColumnInfoImpl("avg(rate(request_count))", "double"),
+                        new ColumnInfoImpl("ts", "date"),
+                        new ColumnInfoImpl("cluster", "keyword")
                     )
                 )
             );
@@ -711,14 +715,14 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
             }
         }
         try (var resp = run("METRICS hosts sum(abs(rate(request_count, 1second)))")) {
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfo("sum(abs(rate(request_count, 1second)))", "double"))));
+            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(abs(rate(request_count, 1second)))", "double"))));
             List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
             assertThat(values, hasSize(1));
             assertThat(values.get(0), hasSize(1));
             assertThat((double) values.get(0).get(0), closeTo(rates.stream().mapToDouble(d -> d).sum(), 0.1));
         }
         try (var resp = run("METRICS hosts sum(10.0 * rate(request_count, 1second))")) {
-            assertThat(resp.columns(), equalTo(List.of(new ColumnInfo("sum(10.0 * rate(request_count, 1second))", "double"))));
+            assertThat(resp.columns(), equalTo(List.of(new ColumnInfoImpl("sum(10.0 * rate(request_count, 1second))", "double"))));
             List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
             assertThat(values, hasSize(1));
             assertThat(values.get(0), hasSize(1));
@@ -728,7 +732,9 @@ public class TimeSeriesIT extends AbstractEsqlIntegTestCase {
             assertThat(
                 resp.columns(),
                 equalTo(
-                    List.of(new ColumnInfo("sum(20 * rate(request_count, 1second) + 10 * floor(rate(request_count, 1second)))", "double"))
+                    List.of(
+                        new ColumnInfoImpl("sum(20 * rate(request_count, 1second) + 10 * floor(rate(request_count, 1second)))", "double")
+                    )
                 )
             );
             List<List<Object>> values = EsqlTestUtils.getValuesList(resp);

+ 105 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfoImpl.java

@@ -0,0 +1,105 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.xcontent.InstantiatingObjectParser;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ParserConstructor;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
+
+public class ColumnInfoImpl implements ColumnInfo {
+
+    public static final InstantiatingObjectParser<ColumnInfoImpl, Void> PARSER;
+    static {
+        InstantiatingObjectParser.Builder<ColumnInfoImpl, Void> parser = InstantiatingObjectParser.builder(
+            "esql/column_info",
+            true,
+            ColumnInfoImpl.class
+        );
+        parser.declareString(constructorArg(), new ParseField("name"));
+        parser.declareString(constructorArg(), new ParseField("type"));
+        PARSER = parser.build();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if ((o instanceof ColumnInfoImpl that)) {
+            return Objects.equals(name, that.name) && Objects.equals(type, that.type);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type);
+    }
+
+    public static ColumnInfo fromXContent(XContentParser parser) {
+        return PARSER.apply(parser, null);
+    }
+
+    private String name;
+    private DataType type;
+
+    @ParserConstructor
+    public ColumnInfoImpl(String name, String type) {
+        this(name, DataType.fromEs(type));
+    }
+
+    public ColumnInfoImpl(String name, DataType type) {
+        this.name = name;
+        this.type = type;
+    }
+
+    public ColumnInfoImpl(StreamInput in) throws IOException {
+        this(in.readString(), in.readString());
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeString(name);
+        out.writeString(type.outputType());
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+        builder.startObject();
+        builder.field("name", name);
+        builder.field("type", type.outputType());
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public String outputType() {
+        return type.outputType();
+    }
+
+    public DataType type() {
+        return type;
+    }
+}

+ 8 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

@@ -25,8 +25,8 @@ import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.xcontent.ToXContent;
-import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.core.esql.action.EsqlResponse;
+import org.elasticsearch.xpack.esql.core.type.DataType;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -45,7 +45,7 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
 
     public static final String DROP_NULL_COLUMNS_OPTION = "drop_null_columns";
 
-    private final List<ColumnInfo> columns;
+    private final List<ColumnInfoImpl> columns;
     private final List<Page> pages;
     private final Profile profile;
     private final boolean columnar;
@@ -55,7 +55,7 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
     private final boolean isAsync;
 
     public EsqlQueryResponse(
-        List<ColumnInfo> columns,
+        List<ColumnInfoImpl> columns,
         List<Page> pages,
         @Nullable Profile profile,
         boolean columnar,
@@ -72,7 +72,7 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
         this.isAsync = isAsync;
     }
 
-    public EsqlQueryResponse(List<ColumnInfo> columns, List<Page> pages, @Nullable Profile profile, boolean columnar, boolean isAsync) {
+    public EsqlQueryResponse(List<ColumnInfoImpl> columns, List<Page> pages, @Nullable Profile profile, boolean columnar, boolean isAsync) {
         this(columns, pages, profile, columnar, null, false, isAsync);
     }
 
@@ -97,7 +97,7 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
             isRunning = in.readBoolean();
             isAsync = in.readBoolean();
         }
-        List<ColumnInfo> columns = in.readCollectionAsList(ColumnInfo::new);
+        List<ColumnInfoImpl> columns = in.readCollectionAsList(ColumnInfoImpl::new);
         List<Page> pages = in.readCollectionAsList(Page::new);
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
             profile = in.readOptionalWriteable(Profile::new);
@@ -121,7 +121,7 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
         out.writeBoolean(columnar);
     }
 
-    public List<ColumnInfo> columns() {
+    public List<ColumnInfoImpl> columns() {
         return columns;
     }
 
@@ -130,12 +130,12 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
     }
 
     public Iterator<Iterator<Object>> values() {
-        List<String> dataTypes = columns.stream().map(ColumnInfo::type).toList();
+        List<DataType> dataTypes = columns.stream().map(ColumnInfoImpl::type).toList();
         return ResponseValueUtils.pagesToValues(dataTypes, pages);
     }
 
     public Iterable<Iterable<Object>> rows() {
-        List<String> dataTypes = columns.stream().map(ColumnInfo::type).toList();
+        List<DataType> dataTypes = columns.stream().map(ColumnInfoImpl::type).toList();
         return ResponseValueUtils.valuesForRowsInPages(dataTypes, pages);
     }
 

+ 16 - 16
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java

@@ -22,7 +22,6 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.XContentType;
-import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 
 import java.io.IOException;
 
@@ -59,30 +58,30 @@ abstract class PositionToXContent {
     protected abstract XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
         throws IOException;
 
-    public static PositionToXContent positionToXContent(ColumnInfo columnInfo, Block block, BytesRef scratch) {
+    public static PositionToXContent positionToXContent(ColumnInfoImpl columnInfo, Block block, BytesRef scratch) {
         return switch (columnInfo.type()) {
-            case "long", "counter_long" -> new PositionToXContent(block) {
+            case LONG, COUNTER_LONG -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
                     return builder.value(((LongBlock) block).getLong(valueIndex));
                 }
             };
-            case "integer", "counter_integer" -> new PositionToXContent(block) {
+            case INTEGER, COUNTER_INTEGER -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
                     return builder.value(((IntBlock) block).getInt(valueIndex));
                 }
             };
-            case "double", "counter_double" -> new PositionToXContent(block) {
+            case DOUBLE, COUNTER_DOUBLE -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
                     return builder.value(((DoubleBlock) block).getDouble(valueIndex));
                 }
             };
-            case "unsigned_long" -> new PositionToXContent(block) {
+            case UNSIGNED_LONG -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
@@ -90,7 +89,7 @@ abstract class PositionToXContent {
                     return builder.value(unsignedLongAsNumber(l));
                 }
             };
-            case "keyword", "text" -> new PositionToXContent(block) {
+            case KEYWORD, TEXT -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
@@ -103,7 +102,7 @@ abstract class PositionToXContent {
                     return builder.utf8Value(val.bytes, val.offset, val.length);
                 }
             };
-            case "ip" -> new PositionToXContent(block) {
+            case IP -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
@@ -111,7 +110,7 @@ abstract class PositionToXContent {
                     return builder.value(ipToString(val));
                 }
             };
-            case "date" -> new PositionToXContent(block) {
+            case DATETIME -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
@@ -119,21 +118,21 @@ abstract class PositionToXContent {
                     return builder.value(dateTimeToString(longVal));
                 }
             };
-            case "geo_point", "geo_shape", "cartesian_point", "cartesian_shape" -> new PositionToXContent(block) {
+            case GEO_POINT, GEO_SHAPE, CARTESIAN_POINT, CARTESIAN_SHAPE -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
                     return builder.value(spatialToString(((BytesRefBlock) block).getBytesRef(valueIndex, scratch)));
                 }
             };
-            case "boolean" -> new PositionToXContent(block) {
+            case BOOLEAN -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
                     return builder.value(((BooleanBlock) block).getBoolean(valueIndex));
                 }
             };
-            case "version" -> new PositionToXContent(block) {
+            case VERSION -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
@@ -141,21 +140,21 @@ abstract class PositionToXContent {
                     return builder.value(versionToString(val));
                 }
             };
-            case "null" -> new PositionToXContent(block) {
+            case NULL -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
                     return builder.nullValue();
                 }
             };
-            case "unsupported" -> new PositionToXContent(block) {
+            case UNSUPPORTED -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
                     return builder.value(UnsupportedValueSource.UNSUPPORTED_OUTPUT);
                 }
             };
-            case "_source" -> new PositionToXContent(block) {
+            case SOURCE -> new PositionToXContent(block) {
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
@@ -166,7 +165,8 @@ abstract class PositionToXContent {
                     }
                 }
             };
-            default -> throw new IllegalArgumentException("can't convert values of type [" + columnInfo.type() + "]");
+            case DATE_PERIOD, TIME_DURATION, DOC_DATA_TYPE, TSID_DATA_TYPE, SHORT, BYTE, OBJECT, NESTED, FLOAT, HALF_FLOAT, SCALED_FLOAT ->
+                throw new IllegalArgumentException("can't convert values of type [" + columnInfo.type() + "]");
         };
     }
 }

+ 36 - 41
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java

@@ -25,7 +25,6 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.json.JsonXContent;
-import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -57,7 +56,7 @@ public final class ResponseValueUtils {
      * Returns an iterator of iterators over the values in the given pages. There is one iterator
      * for each block.
      */
-    public static Iterator<Iterator<Object>> pagesToValues(List<String> dataTypes, List<Page> pages) {
+    public static Iterator<Iterator<Object>> pagesToValues(List<DataType> dataTypes, List<Page> pages) {
         BytesRef scratch = new BytesRef();
         return Iterators.flatMap(
             pages.iterator(),
@@ -70,18 +69,18 @@ public final class ResponseValueUtils {
     }
 
     /** Returns an iterable of iterables over the values in the given pages. There is one iterables for each row. */
-    static Iterable<Iterable<Object>> valuesForRowsInPages(List<String> dataTypes, List<Page> pages) {
+    static Iterable<Iterable<Object>> valuesForRowsInPages(List<DataType> dataTypes, List<Page> pages) {
         BytesRef scratch = new BytesRef();
         return () -> Iterators.flatMap(pages.iterator(), page -> valuesForRowsInPage(dataTypes, page, scratch));
     }
 
     /** Returns an iterable of iterables over the values in the given page. There is one iterables for each row. */
-    static Iterator<Iterable<Object>> valuesForRowsInPage(List<String> dataTypes, Page page, BytesRef scratch) {
+    static Iterator<Iterable<Object>> valuesForRowsInPage(List<DataType> dataTypes, Page page, BytesRef scratch) {
         return Iterators.forRange(0, page.getPositionCount(), position -> valuesForRow(dataTypes, page, position, scratch));
     }
 
     /** Returns an iterable over the values in the given row in a page. */
-    static Iterable<Object> valuesForRow(List<String> dataTypes, Page page, int position, BytesRef scratch) {
+    static Iterable<Object> valuesForRow(List<DataType> dataTypes, Page page, int position, BytesRef scratch) {
         return () -> Iterators.forRange(
             0,
             page.getBlockCount(),
@@ -90,7 +89,7 @@ public final class ResponseValueUtils {
     }
 
     /**  Returns an iterator of values for the given column. */
-    static Iterator<Object> valuesForColumn(int columnIndex, String dataType, List<Page> pages) {
+    static Iterator<Object> valuesForColumn(int columnIndex, DataType dataType, List<Page> pages) {
         BytesRef scratch = new BytesRef();
         return Iterators.flatMap(
             pages.iterator(),
@@ -103,7 +102,7 @@ public final class ResponseValueUtils {
     }
 
     /** Returns the value that the position and with the given data type, in the block. */
-    static Object valueAtPosition(Block block, int position, String dataType, BytesRef scratch) {
+    static Object valueAtPosition(Block block, int position, DataType dataType, BytesRef scratch) {
         if (block.isNull(position)) {
             return null;
         }
@@ -120,28 +119,28 @@ public final class ResponseValueUtils {
         return values;
     }
 
-    private static Object valueAt(String dataType, Block block, int offset, BytesRef scratch) {
+    private static Object valueAt(DataType dataType, Block block, int offset, BytesRef scratch) {
         return switch (dataType) {
-            case "unsigned_long" -> unsignedLongAsNumber(((LongBlock) block).getLong(offset));
-            case "long", "counter_long" -> ((LongBlock) block).getLong(offset);
-            case "integer", "counter_integer" -> ((IntBlock) block).getInt(offset);
-            case "double", "counter_double" -> ((DoubleBlock) block).getDouble(offset);
-            case "keyword", "text" -> ((BytesRefBlock) block).getBytesRef(offset, scratch).utf8ToString();
-            case "ip" -> {
+            case UNSIGNED_LONG -> unsignedLongAsNumber(((LongBlock) block).getLong(offset));
+            case LONG, COUNTER_LONG -> ((LongBlock) block).getLong(offset);
+            case INTEGER, COUNTER_INTEGER -> ((IntBlock) block).getInt(offset);
+            case DOUBLE, COUNTER_DOUBLE -> ((DoubleBlock) block).getDouble(offset);
+            case KEYWORD, TEXT -> ((BytesRefBlock) block).getBytesRef(offset, scratch).utf8ToString();
+            case IP -> {
                 BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch);
                 yield ipToString(val);
             }
-            case "date" -> {
+            case DATETIME -> {
                 long longVal = ((LongBlock) block).getLong(offset);
                 yield dateTimeToString(longVal);
             }
-            case "boolean" -> ((BooleanBlock) block).getBoolean(offset);
-            case "version" -> versionToString(((BytesRefBlock) block).getBytesRef(offset, scratch));
-            case "geo_point", "geo_shape", "cartesian_point", "cartesian_shape" -> spatialToString(
+            case BOOLEAN -> ((BooleanBlock) block).getBoolean(offset);
+            case VERSION -> versionToString(((BytesRefBlock) block).getBytesRef(offset, scratch));
+            case GEO_POINT, GEO_SHAPE, CARTESIAN_POINT, CARTESIAN_SHAPE -> spatialToString(
                 ((BytesRefBlock) block).getBytesRef(offset, scratch)
             );
-            case "unsupported" -> UnsupportedValueSource.UNSUPPORTED_OUTPUT;
-            case "_source" -> {
+            case UNSUPPORTED -> UnsupportedValueSource.UNSUPPORTED_OUTPUT;
+            case SOURCE -> {
                 BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch);
                 try {
                     try (XContentParser parser = XContentHelper.createParser(XContentParserConfiguration.EMPTY, new BytesArray(val))) {
@@ -152,7 +151,8 @@ public final class ResponseValueUtils {
                     throw new UncheckedIOException(e);
                 }
             }
-            default -> throw EsqlIllegalArgumentException.illegalDataType(dataType);
+            case SHORT, BYTE, FLOAT, HALF_FLOAT, SCALED_FLOAT, OBJECT, NESTED, DATE_PERIOD, TIME_DURATION, DOC_DATA_TYPE, TSID_DATA_TYPE,
+                NULL -> throw EsqlIllegalArgumentException.illegalDataType(dataType);
         };
     }
 
@@ -160,10 +160,10 @@ public final class ResponseValueUtils {
      * Converts a list of values to Pages so that we can parse from xcontent. It's not
      * super efficient, but it doesn't really have to be.
      */
-    static Page valuesToPage(BlockFactory blockFactory, List<ColumnInfo> columns, List<List<Object>> values) {
-        List<String> dataTypes = columns.stream().map(ColumnInfo::type).toList();
+    static Page valuesToPage(BlockFactory blockFactory, List<ColumnInfoImpl> columns, List<List<Object>> values) {
+        List<DataType> dataTypes = columns.stream().map(ColumnInfoImpl::type).toList();
         List<Block.Builder> results = dataTypes.stream()
-            .map(c -> PlannerUtils.toElementType(DataType.fromEs(c)).newBlockBuilder(values.size(), blockFactory))
+            .map(c -> PlannerUtils.toElementType(c).newBlockBuilder(values.size(), blockFactory))
             .toList();
 
         for (List<Object> row : values) {
@@ -171,24 +171,20 @@ public final class ResponseValueUtils {
                 var builder = results.get(c);
                 var value = row.get(c);
                 switch (dataTypes.get(c)) {
-                    case "unsigned_long" -> ((LongBlock.Builder) builder).appendLong(
-                        longToUnsignedLong(((Number) value).longValue(), true)
-                    );
-                    case "long", "counter_long" -> ((LongBlock.Builder) builder).appendLong(((Number) value).longValue());
-                    case "integer", "counter_integer" -> ((IntBlock.Builder) builder).appendInt(((Number) value).intValue());
-                    case "double", "counter_double" -> ((DoubleBlock.Builder) builder).appendDouble(((Number) value).doubleValue());
-                    case "keyword", "text", "unsupported" -> ((BytesRefBlock.Builder) builder).appendBytesRef(
-                        new BytesRef(value.toString())
-                    );
-                    case "ip" -> ((BytesRefBlock.Builder) builder).appendBytesRef(stringToIP(value.toString()));
-                    case "date" -> {
+                    case UNSIGNED_LONG -> ((LongBlock.Builder) builder).appendLong(longToUnsignedLong(((Number) value).longValue(), true));
+                    case LONG, COUNTER_LONG -> ((LongBlock.Builder) builder).appendLong(((Number) value).longValue());
+                    case INTEGER, COUNTER_INTEGER -> ((IntBlock.Builder) builder).appendInt(((Number) value).intValue());
+                    case DOUBLE, COUNTER_DOUBLE -> ((DoubleBlock.Builder) builder).appendDouble(((Number) value).doubleValue());
+                    case KEYWORD, TEXT, UNSUPPORTED -> ((BytesRefBlock.Builder) builder).appendBytesRef(new BytesRef(value.toString()));
+                    case IP -> ((BytesRefBlock.Builder) builder).appendBytesRef(stringToIP(value.toString()));
+                    case DATETIME -> {
                         long longVal = dateTimeToLong(value.toString());
                         ((LongBlock.Builder) builder).appendLong(longVal);
                     }
-                    case "boolean" -> ((BooleanBlock.Builder) builder).appendBoolean(((Boolean) value));
-                    case "null" -> builder.appendNull();
-                    case "version" -> ((BytesRefBlock.Builder) builder).appendBytesRef(stringToVersion(new BytesRef(value.toString())));
-                    case "_source" -> {
+                    case BOOLEAN -> ((BooleanBlock.Builder) builder).appendBoolean(((Boolean) value));
+                    case NULL -> builder.appendNull();
+                    case VERSION -> ((BytesRefBlock.Builder) builder).appendBytesRef(stringToVersion(new BytesRef(value.toString())));
+                    case SOURCE -> {
                         @SuppressWarnings("unchecked")
                         Map<String, ?> o = (Map<String, ?>) value;
                         try {
@@ -200,12 +196,11 @@ public final class ResponseValueUtils {
                             throw new UncheckedIOException(e);
                         }
                     }
-                    case "geo_point", "geo_shape", "cartesian_point", "cartesian_shape" -> {
+                    case GEO_POINT, GEO_SHAPE, CARTESIAN_POINT, CARTESIAN_SHAPE -> {
                         // This just converts WKT to WKB, so does not need CRS knowledge, we could merge GEO and CARTESIAN here
                         BytesRef wkb = stringToSpatial(value.toString());
                         ((BytesRefBlock.Builder) builder).appendBytesRef(wkb);
                     }
-                    default -> throw EsqlIllegalArgumentException.illegalDataType(dataTypes.get(c));
                 }
             }
         }

+ 5 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java

@@ -27,7 +27,7 @@ final class ResponseXContentUtils {
     /**
      * Returns the column headings for the given columns.
      */
-    static Iterator<? extends ToXContent> allColumns(List<ColumnInfo> columns, String name) {
+    static Iterator<? extends ToXContent> allColumns(List<ColumnInfoImpl> columns, String name) {
         return ChunkedToXContentHelper.singleChunk((builder, params) -> {
             builder.startArray(name);
             for (ColumnInfo col : columns) {
@@ -41,7 +41,7 @@ final class ResponseXContentUtils {
      * Returns the column headings for the given columns, moving the heading
      * for always-null columns to a {@code null_columns} section.
      */
-    static Iterator<? extends ToXContent> nonNullColumns(List<ColumnInfo> columns, boolean[] nullColumns, String name) {
+    static Iterator<? extends ToXContent> nonNullColumns(List<ColumnInfoImpl> columns, boolean[] nullColumns, String name) {
         return ChunkedToXContentHelper.singleChunk((builder, params) -> {
             builder.startArray(name);
             for (int c = 0; c < columns.size(); c++) {
@@ -55,7 +55,7 @@ final class ResponseXContentUtils {
 
     /** Returns the column values for the given pages (described by the column infos). */
     static Iterator<? extends ToXContent> columnValues(
-        List<ColumnInfo> columns,
+        List<ColumnInfoImpl> columns,
         List<Page> pages,
         boolean columnar,
         boolean[] nullColumns
@@ -70,7 +70,7 @@ final class ResponseXContentUtils {
     }
 
     /** Returns a columnar based representation of the values in the given pages (described by the column infos). */
-    static Iterator<? extends ToXContent> columnarValues(List<ColumnInfo> columns, List<Page> pages, boolean[] nullColumns) {
+    static Iterator<? extends ToXContent> columnarValues(List<ColumnInfoImpl> columns, List<Page> pages, boolean[] nullColumns) {
         final BytesRef scratch = new BytesRef();
         return Iterators.flatMap(Iterators.forRange(0, columns.size(), column -> {
             if (nullColumns != null && nullColumns[column]) {
@@ -96,7 +96,7 @@ final class ResponseXContentUtils {
     }
 
     /** Returns a row based representation of the values in the given pages (described by the column infos). */
-    static Iterator<? extends ToXContent> rowValues(List<ColumnInfo> columns, List<Page> pages, boolean[] nullColumns) {
+    static Iterator<? extends ToXContent> rowValues(List<ColumnInfoImpl> columns, List<Page> pages, boolean[] nullColumns) {
         final BytesRef scratch = new BytesRef();
         return Iterators.flatMap(pages.iterator(), page -> {
             final int columnCount = columns.size();

+ 3 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -28,7 +28,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.async.AsyncExecutionId;
-import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
+import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
 import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
@@ -169,9 +169,9 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
                     physicalPlan,
                     configuration,
                     delegate.map(result -> {
-                        List<ColumnInfo> columns = physicalPlan.output()
+                        List<ColumnInfoImpl> columns = physicalPlan.output()
                             .stream()
-                            .map(c -> new ColumnInfo(c.qualifiedName(), c.dataType().outputType()))
+                            .map(c -> new ColumnInfoImpl(c.qualifiedName(), c.dataType().outputType()))
                             .toList();
                         EsqlQueryResponse.Profile profile = configuration.profile()
                             ? new EsqlQueryResponse.Profile(result.profiles())

+ 3 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -59,6 +59,7 @@ import org.elasticsearch.xpack.esql.core.expression.function.FunctionRegistry;
 import org.elasticsearch.xpack.esql.core.index.EsIndex;
 import org.elasticsearch.xpack.esql.core.index.IndexResolution;
 import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
@@ -413,10 +414,10 @@ public class CsvTests extends ESTestCase {
         }
 
         List<String> columnNames = Expressions.names(coordinatorPlan.output());
-        List<String> dataTypes = new ArrayList<>(columnNames.size());
+        List<DataType> dataTypes = new ArrayList<>(columnNames.size());
         List<Type> columnTypes = coordinatorPlan.output()
             .stream()
-            .peek(o -> dataTypes.add(o.dataType().outputType()))
+            .peek(o -> dataTypes.add(o.dataType()))
             .map(o -> Type.asType(o.dataType().nameUpper()))
             .toList();
 

+ 43 - 39
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java

@@ -44,7 +44,6 @@ import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.json.JsonXContent;
-import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.TestBlockFactory;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.planner.PlannerUtils;
@@ -109,7 +108,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
 
     EsqlQueryResponse randomResponseAsync(boolean columnar, EsqlQueryResponse.Profile profile, boolean async) {
         int noCols = randomIntBetween(1, 10);
-        List<ColumnInfo> columns = randomList(noCols, noCols, this::randomColumnInfo);
+        List<ColumnInfoImpl> columns = randomList(noCols, noCols, this::randomColumnInfo);
         int noPages = randomIntBetween(1, 20);
         List<Page> values = randomList(noPages, noPages, () -> randomPage(columns));
         String id = null;
@@ -121,12 +120,12 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         return new EsqlQueryResponse(columns, values, profile, columnar, id, isRunning, async);
     }
 
-    private ColumnInfo randomColumnInfo() {
+    private ColumnInfoImpl randomColumnInfo() {
         DataType type = randomValueOtherThanMany(
             t -> false == DataType.isPrimitive(t) || t == DataType.DATE_PERIOD || t == DataType.TIME_DURATION || t == DataType.PARTIAL_AGG,
             () -> randomFrom(DataType.types())
         ).widenSmallNumeric();
-        return new ColumnInfo(randomAlphaOfLength(10), type.esType());
+        return new ColumnInfoImpl(randomAlphaOfLength(10), type.esType());
     }
 
     private EsqlQueryResponse.Profile randomProfile() {
@@ -136,34 +135,34 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         return new EsqlQueryResponseProfileTests().createTestInstance();
     }
 
-    private Page randomPage(List<ColumnInfo> columns) {
+    private Page randomPage(List<ColumnInfoImpl> columns) {
         return new Page(columns.stream().map(c -> {
-            Block.Builder builder = PlannerUtils.toElementType(DataType.fromEs(c.type())).newBlockBuilder(1, blockFactory);
+            Block.Builder builder = PlannerUtils.toElementType(c.type()).newBlockBuilder(1, blockFactory);
             switch (c.type()) {
-                case "unsigned_long", "long", "counter_long" -> ((LongBlock.Builder) builder).appendLong(randomLong());
-                case "integer", "counter_integer" -> ((IntBlock.Builder) builder).appendInt(randomInt());
-                case "double", "counter_double" -> ((DoubleBlock.Builder) builder).appendDouble(randomDouble());
-                case "keyword" -> ((BytesRefBlock.Builder) builder).appendBytesRef(new BytesRef(randomAlphaOfLength(10)));
-                case "text" -> ((BytesRefBlock.Builder) builder).appendBytesRef(new BytesRef(randomAlphaOfLength(10000)));
-                case "ip" -> ((BytesRefBlock.Builder) builder).appendBytesRef(
+                case UNSIGNED_LONG, LONG, COUNTER_LONG -> ((LongBlock.Builder) builder).appendLong(randomLong());
+                case INTEGER, COUNTER_INTEGER -> ((IntBlock.Builder) builder).appendInt(randomInt());
+                case DOUBLE, COUNTER_DOUBLE -> ((DoubleBlock.Builder) builder).appendDouble(randomDouble());
+                case KEYWORD -> ((BytesRefBlock.Builder) builder).appendBytesRef(new BytesRef(randomAlphaOfLength(10)));
+                case TEXT -> ((BytesRefBlock.Builder) builder).appendBytesRef(new BytesRef(randomAlphaOfLength(10000)));
+                case IP -> ((BytesRefBlock.Builder) builder).appendBytesRef(
                     new BytesRef(InetAddressPoint.encode(randomIp(randomBoolean())))
                 );
-                case "date" -> ((LongBlock.Builder) builder).appendLong(randomInstant().toEpochMilli());
-                case "boolean" -> ((BooleanBlock.Builder) builder).appendBoolean(randomBoolean());
-                case "unsupported" -> ((BytesRefBlock.Builder) builder).appendBytesRef(
+                case DATETIME -> ((LongBlock.Builder) builder).appendLong(randomInstant().toEpochMilli());
+                case BOOLEAN -> ((BooleanBlock.Builder) builder).appendBoolean(randomBoolean());
+                case UNSUPPORTED -> ((BytesRefBlock.Builder) builder).appendBytesRef(
                     new BytesRef(UnsupportedValueSource.UNSUPPORTED_OUTPUT)
                 );
-                case "version" -> ((BytesRefBlock.Builder) builder).appendBytesRef(new Version(randomIdentifier()).toBytesRef());
-                case "geo_point" -> ((BytesRefBlock.Builder) builder).appendBytesRef(GEO.asWkb(GeometryTestUtils.randomPoint()));
-                case "cartesian_point" -> ((BytesRefBlock.Builder) builder).appendBytesRef(CARTESIAN.asWkb(ShapeTestUtils.randomPoint()));
-                case "geo_shape" -> ((BytesRefBlock.Builder) builder).appendBytesRef(
+                case VERSION -> ((BytesRefBlock.Builder) builder).appendBytesRef(new Version(randomIdentifier()).toBytesRef());
+                case GEO_POINT -> ((BytesRefBlock.Builder) builder).appendBytesRef(GEO.asWkb(GeometryTestUtils.randomPoint()));
+                case CARTESIAN_POINT -> ((BytesRefBlock.Builder) builder).appendBytesRef(CARTESIAN.asWkb(ShapeTestUtils.randomPoint()));
+                case GEO_SHAPE -> ((BytesRefBlock.Builder) builder).appendBytesRef(
                     GEO.asWkb(GeometryTestUtils.randomGeometry(randomBoolean()))
                 );
-                case "cartesian_shape" -> ((BytesRefBlock.Builder) builder).appendBytesRef(
+                case CARTESIAN_SHAPE -> ((BytesRefBlock.Builder) builder).appendBytesRef(
                     CARTESIAN.asWkb(ShapeTestUtils.randomGeometry(randomBoolean()))
                 );
-                case "null" -> builder.appendNull();
-                case "_source" -> {
+                case NULL -> builder.appendNull();
+                case SOURCE -> {
                     try {
                         ((BytesRefBlock.Builder) builder).appendBytesRef(
                             BytesReference.bytes(
@@ -177,7 +176,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                         throw new UncheckedIOException(e);
                     }
                 }
-                default -> throw new UnsupportedOperationException("unsupported data type [" + c + "]");
+                // default -> throw new UnsupportedOperationException("unsupported data type [" + c + "]");
             }
             return builder.build();
         }).toArray(Block[]::new));
@@ -186,17 +185,17 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
     @Override
     protected EsqlQueryResponse mutateInstance(EsqlQueryResponse instance) {
         boolean allNull = true;
-        for (ColumnInfo info : instance.columns()) {
-            if (false == info.type().equals("null")) {
+        for (ColumnInfoImpl info : instance.columns()) {
+            if (info.type() != DataType.NULL) {
                 allNull = false;
             }
         }
         return switch (allNull ? between(0, 2) : between(0, 3)) {
             case 0 -> {
                 int mutCol = between(0, instance.columns().size() - 1);
-                List<ColumnInfo> cols = new ArrayList<>(instance.columns());
+                List<ColumnInfoImpl> cols = new ArrayList<>(instance.columns());
                 // keep the type the same so the values are still valid but change the name
-                cols.set(mutCol, new ColumnInfo(cols.get(mutCol).name() + "mut", cols.get(mutCol).type()));
+                cols.set(mutCol, new ColumnInfoImpl(cols.get(mutCol).name() + "mut", cols.get(mutCol).type()));
                 yield new EsqlQueryResponse(cols, deepCopyOfPages(instance), instance.profile(), instance.columnar(), instance.isAsync());
             }
             case 1 -> new EsqlQueryResponse(
@@ -273,7 +272,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 IS_RUNNING,
                 ObjectParser.ValueType.BOOLEAN_OR_NULL
             );
-            parser.declareObjectArray(constructorArg(), (p, c) -> ColumnInfo.fromXContent(p), new ParseField("columns"));
+            parser.declareObjectArray(constructorArg(), (p, c) -> ColumnInfoImpl.fromXContent(p), new ParseField("columns"));
             parser.declareField(constructorArg(), (p, c) -> p.list(), new ParseField("values"), ObjectParser.ValueType.OBJECT_ARRAY);
             PARSER = parser.build();
         }
@@ -282,7 +281,12 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         private final EsqlQueryResponse response;
 
         @ParserConstructor
-        public ResponseBuilder(@Nullable String asyncExecutionId, Boolean isRunning, List<ColumnInfo> columns, List<List<Object>> values) {
+        public ResponseBuilder(
+            @Nullable String asyncExecutionId,
+            Boolean isRunning,
+            List<ColumnInfoImpl> columns,
+            List<List<Object>> values
+        ) {
             this.response = new EsqlQueryResponse(
                 columns,
                 List.of(valuesToPage(TestBlockFactory.getNonBreakingInstance(), columns, values)),
@@ -372,7 +376,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
     public void testBasicXContentIdAndRunning() {
         try (
             EsqlQueryResponse response = new EsqlQueryResponse(
-                List.of(new ColumnInfo("foo", "integer")),
+                List.of(new ColumnInfoImpl("foo", "integer")),
                 List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())),
                 null,
                 false,
@@ -389,7 +393,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
     public void testNullColumnsXContentDropNulls() {
         try (
             EsqlQueryResponse response = new EsqlQueryResponse(
-                List.of(new ColumnInfo("foo", "integer"), new ColumnInfo("all_null", "integer")),
+                List.of(new ColumnInfoImpl("foo", "integer"), new ColumnInfoImpl("all_null", "integer")),
                 List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock(), blockFactory.newConstantNullBlock(2))),
                 null,
                 false,
@@ -418,7 +422,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             b.appendNull();
             try (
                 EsqlQueryResponse response = new EsqlQueryResponse(
-                    List.of(new ColumnInfo("foo", "integer"), new ColumnInfo("all_null", "integer")),
+                    List.of(new ColumnInfoImpl("foo", "integer"), new ColumnInfoImpl("all_null", "integer")),
                     List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock(), b.build())),
                     null,
                     false,
@@ -444,7 +448,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
 
     private EsqlQueryResponse simple(boolean columnar, boolean async) {
         return new EsqlQueryResponse(
-            List.of(new ColumnInfo("foo", "integer")),
+            List.of(new ColumnInfoImpl("foo", "integer")),
             List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())),
             null,
             columnar,
@@ -455,7 +459,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
     public void testProfileXContent() {
         try (
             EsqlQueryResponse response = new EsqlQueryResponse(
-                List.of(new ColumnInfo("foo", "integer")),
+                List.of(new ColumnInfoImpl("foo", "integer")),
                 List.of(new Page(blockFactory.newIntArrayVector(new int[] { 40, 80 }, 2).asBlock())),
                 new EsqlQueryResponse.Profile(
                     List.of(
@@ -520,7 +524,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         var intBlk2 = blockFactory.newIntArrayVector(new int[] { 30, 40, 50 }, 3).asBlock();
         var longBlk1 = blockFactory.newLongArrayVector(new long[] { 100L, 200L }, 2).asBlock();
         var longBlk2 = blockFactory.newLongArrayVector(new long[] { 300L, 400L, 500L }, 3).asBlock();
-        var columnInfo = List.of(new ColumnInfo("foo", "integer"), new ColumnInfo("bar", "long"));
+        var columnInfo = List.of(new ColumnInfoImpl("foo", "integer"), new ColumnInfoImpl("bar", "long"));
         var pages = List.of(new Page(intBlk1, longBlk1), new Page(intBlk2, longBlk2));
         try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false)) {
             assertThat(columnValues(response.column(0)), contains(10, 20, 30, 40, 50));
@@ -532,7 +536,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
 
     public void testColumnsIllegalArg() {
         var intBlk1 = blockFactory.newIntArrayVector(new int[] { 10 }, 1).asBlock();
-        var columnInfo = List.of(new ColumnInfo("foo", "integer"));
+        var columnInfo = List.of(new ColumnInfoImpl("foo", "integer"));
         var pages = List.of(new Page(intBlk1));
         try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false)) {
             expectThrows(IllegalArgumentException.class, () -> response.column(-1));
@@ -551,7 +555,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             blk2 = bb2.appendInt(30).appendNull().appendNull().appendInt(60).build();
             blk3 = bb3.appendNull().appendInt(80).appendInt(90).appendNull().build();
         }
-        var columnInfo = List.of(new ColumnInfo("foo", "integer"));
+        var columnInfo = List.of(new ColumnInfoImpl("foo", "integer"));
         var pages = List.of(new Page(blk1), new Page(blk2), new Page(blk3));
         try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false)) {
             assertThat(columnValues(response.column(0)), contains(10, null, 30, null, null, 60, null, 80, 90, null));
@@ -571,7 +575,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             blk2 = bb2.beginPositionEntry().appendInt(40).appendInt(50).endPositionEntry().build();
             blk3 = bb3.appendNull().appendInt(70).appendInt(80).appendNull().build();
         }
-        var columnInfo = List.of(new ColumnInfo("foo", "integer"));
+        var columnInfo = List.of(new ColumnInfoImpl("foo", "integer"));
         var pages = List.of(new Page(blk1), new Page(blk2), new Page(blk3));
         try (var response = new EsqlQueryResponse(columnInfo, pages, null, false, null, false, false)) {
             assertThat(columnValues(response.column(0)), contains(List.of(10, 20), null, List.of(40, 50), null, 70, 80, null));
@@ -583,7 +587,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
     public void testRowValues() {
         for (int times = 0; times < 10; times++) {
             int numColumns = randomIntBetween(1, 10);
-            List<ColumnInfo> columns = randomList(numColumns, numColumns, this::randomColumnInfo);
+            List<ColumnInfoImpl> columns = randomList(numColumns, numColumns, this::randomColumnInfo);
             int noPages = randomIntBetween(1, 20);
             List<Page> pages = randomList(noPages, noPages, () -> randomPage(columns));
             try (var resp = new EsqlQueryResponse(columns, pages, null, false, "", false, false)) {

+ 8 - 8
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java

@@ -17,8 +17,8 @@ import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.rest.FakeRestRequest;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
-import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.TestBlockFactory;
+import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
 import org.elasticsearch.xpack.esql.core.util.StringUtils;
 
@@ -246,17 +246,17 @@ public class TextFormatTests extends ESTestCase {
     }
 
     private static EsqlQueryResponse emptyData() {
-        return new EsqlQueryResponse(singletonList(new ColumnInfo("name", "keyword")), emptyList(), null, false, false);
+        return new EsqlQueryResponse(singletonList(new ColumnInfoImpl("name", "keyword")), emptyList(), null, false, false);
     }
 
     private static EsqlQueryResponse regularData() {
         BlockFactory blockFactory = TestBlockFactory.getNonBreakingInstance();
         // headers
-        List<ColumnInfo> headers = asList(
-            new ColumnInfo("string", "keyword"),
-            new ColumnInfo("number", "integer"),
-            new ColumnInfo("location", "geo_point"),
-            new ColumnInfo("location2", "cartesian_point")
+        List<ColumnInfoImpl> headers = asList(
+            new ColumnInfoImpl("string", "keyword"),
+            new ColumnInfoImpl("number", "integer"),
+            new ColumnInfoImpl("location", "geo_point"),
+            new ColumnInfoImpl("location2", "cartesian_point")
         );
 
         BytesRefArray geoPoints = new BytesRefArray(2, BigArrays.NON_RECYCLING_INSTANCE);
@@ -283,7 +283,7 @@ public class TextFormatTests extends ESTestCase {
 
     private static EsqlQueryResponse escapedData() {
         // headers
-        List<ColumnInfo> headers = asList(new ColumnInfo("first", "keyword"), new ColumnInfo("\"special\"", "keyword"));
+        List<ColumnInfoImpl> headers = asList(new ColumnInfoImpl("first", "keyword"), new ColumnInfoImpl("\"special\"", "keyword"));
 
         // values
         List<Page> values = List.of(

+ 13 - 13
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java

@@ -15,8 +15,8 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.geometry.Point;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.xpack.core.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.TestBlockFactory;
+import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
 
 import java.util.Arrays;
@@ -32,17 +32,17 @@ public class TextFormatterTests extends ESTestCase {
 
     static BlockFactory blockFactory = TestBlockFactory.getNonBreakingInstance();
 
-    private final List<ColumnInfo> columns = Arrays.asList(
-        new ColumnInfo("foo", "keyword"),
-        new ColumnInfo("bar", "long"),
-        new ColumnInfo("15charwidename!", "double"),
-        new ColumnInfo("null_field1", "integer"),
-        new ColumnInfo("superduperwidename!!!", "double"),
-        new ColumnInfo("baz", "keyword"),
-        new ColumnInfo("date", "date"),
-        new ColumnInfo("location", "geo_point"),
-        new ColumnInfo("location2", "cartesian_point"),
-        new ColumnInfo("null_field2", "keyword")
+    private final List<ColumnInfoImpl> columns = Arrays.asList(
+        new ColumnInfoImpl("foo", "keyword"),
+        new ColumnInfoImpl("bar", "long"),
+        new ColumnInfoImpl("15charwidename!", "double"),
+        new ColumnInfoImpl("null_field1", "integer"),
+        new ColumnInfoImpl("superduperwidename!!!", "double"),
+        new ColumnInfoImpl("baz", "keyword"),
+        new ColumnInfoImpl("date", "date"),
+        new ColumnInfoImpl("location", "geo_point"),
+        new ColumnInfoImpl("location2", "cartesian_point"),
+        new ColumnInfoImpl("null_field2", "keyword")
     );
 
     private static final BytesRefArray geoPoints = new BytesRefArray(2, BigArrays.NON_RECYCLING_INSTANCE);
@@ -183,7 +183,7 @@ public class TextFormatterTests extends ESTestCase {
             getTextBodyContent(
                 new TextFormatter(
                     new EsqlQueryResponse(
-                        List.of(new ColumnInfo("foo", "keyword")),
+                        List.of(new ColumnInfoImpl("foo", "keyword")),
                         List.of(
                             new Page(
                                 blockFactory.newBytesRefBlockBuilder(2)