Browse Source

ESQL: Use faster field caps (#105067)

The field capabilities has an internal-only option to produce un-merged
output. This expands that option to be available to any caller inside of
Elasticsearch and uses it in ES|QL to speed up queries across many
indices with many fields. Across 5,000 indices with a couple thousand
fields each (metricbeat) the `FROM *` query went from 600ms to 60ms.
Across 50,000 indices that went from 6600ms to 600ms. 600ms is still too
slow for such a simple query, but one step at a time!

This is faster because field capabilities wants to present a
field-centric result but ES|QL actually needs a different flavor of
field-centric result with some differences smoothed away. If we take
over the merging process we can use a few tools that the field caps API
uses internally to be fast - mostly the sha256 of the mapping - to save
on work — a saving that wasn't available in the merged view. Also, two merges
is more expensive than one.

That 90% reduction in runtime doesn't banish field caps from the
flamegraphs. You still see it, but it's now much less prominent. And you
don't see the merging process at all. Now it's all the data-node-side
operations of field caps.

Relates to #103369
Nik Everett 1 year ago
parent
commit
73a170bd4d
20 changed files with 2037 additions and 95 deletions
  1. 5 0
      docs/changelog/105067.yaml
  2. 2 2
      server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java
  3. 2 2
      server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java
  4. 2 2
      server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java
  5. 26 0
      x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/FieldExtractorIT.java
  6. 22 0
      x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/Clusters.java
  7. 1 7
      x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/EsqlSpecIT.java
  8. 26 0
      x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/FieldExtractorIT.java
  9. 26 0
      x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/FieldExtractorIT.java
  10. 1 2
      x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java
  11. 1456 0
      x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/FieldExtractorTestCase.java
  12. 5 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java
  13. 3 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java
  14. 252 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlIndexResolver.java
  15. 142 12
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
  16. 5 10
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java
  17. 44 11
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java
  18. 16 28
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeRegistryTests.java
  19. 0 16
      x-pack/plugin/esql/src/test/resources/empty_field_caps_response.json
  20. 1 1
      x-pack/plugin/mapper-unsigned-long/src/main/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongFieldMapper.java

+ 5 - 0
docs/changelog/105067.yaml

@@ -0,0 +1,5 @@
+pr: 105067
+summary: "ESQL: Use faster field caps"
+area: ES|QL
+type: enhancement
+issues: []

+ 2 - 2
server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java

@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-final class FieldCapabilitiesIndexResponse implements Writeable {
+public final class FieldCapabilitiesIndexResponse implements Writeable {
     private static final TransportVersion MAPPING_HASH_VERSION = TransportVersions.V_8_2_0;
 
     private final String indexName;
@@ -34,7 +34,7 @@ final class FieldCapabilitiesIndexResponse implements Writeable {
     private final boolean canMatch;
     private final transient TransportVersion originVersion;
 
-    FieldCapabilitiesIndexResponse(
+    public FieldCapabilitiesIndexResponse(
         String indexName,
         @Nullable String indexMappingHash,
         Map<String, IndexFieldCapabilities> responseMap,

+ 2 - 2
server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesRequest.java

@@ -75,7 +75,7 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
      * <p>
      * Note that when using the high-level REST client, results are always merged (this flag is always considered 'true').
      */
-    boolean isMergeResults() {
+    public boolean isMergeResults() {
         return mergeResults;
     }
 
@@ -85,7 +85,7 @@ public final class FieldCapabilitiesRequest extends ActionRequest implements Ind
      * <p>
      * Note that when using the high-level REST client, results are always merged (this flag is always considered 'true').
      */
-    void setMergeResults(boolean mergeResults) {
+    public void setMergeResults(boolean mergeResults) {
         this.mergeResults = mergeResults;
     }
 

+ 2 - 2
server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java

@@ -57,7 +57,7 @@ public class FieldCapabilitiesResponse extends ActionResponse implements Chunked
         this(indices, responseMap, Collections.emptyList(), Collections.emptyList());
     }
 
-    FieldCapabilitiesResponse(List<FieldCapabilitiesIndexResponse> indexResponses, List<FieldCapabilitiesFailure> failures) {
+    public FieldCapabilitiesResponse(List<FieldCapabilitiesIndexResponse> indexResponses, List<FieldCapabilitiesFailure> failures) {
         this(Strings.EMPTY_ARRAY, Collections.emptyMap(), indexResponses, failures);
     }
 
@@ -117,7 +117,7 @@ public class FieldCapabilitiesResponse extends ActionResponse implements Chunked
     /**
      * Returns the actual per-index field caps responses
      */
-    List<FieldCapabilitiesIndexResponse> getIndexResponses() {
+    public List<FieldCapabilitiesIndexResponse> getIndexResponses() {
         return indexResponses;
     }
 

+ 26 - 0
x-pack/plugin/esql/qa/server/mixed-cluster/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/mixed/FieldExtractorIT.java

@@ -0,0 +1,26 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.qa.mixed;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import org.elasticsearch.test.TestClustersThreadFilter;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.xpack.esql.qa.rest.FieldExtractorTestCase;
+import org.junit.ClassRule;
+
+@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
+public class FieldExtractorIT extends FieldExtractorTestCase {
+    @ClassRule
+    public static ElasticsearchCluster cluster = Clusters.mixedVersionCluster();
+
+    @Override
+    protected String getTestRestCluster() {
+        return cluster.getHttpAddresses();
+    }
+}

+ 22 - 0
x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/Clusters.java

@@ -0,0 +1,22 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.qa.multi_node;
+
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.local.distribution.DistributionType;
+
+public class Clusters {
+    public static ElasticsearchCluster testCluster() {
+        return ElasticsearchCluster.local()
+            .distribution(DistributionType.DEFAULT)
+            .nodes(2)
+            .setting("xpack.security.enabled", "false")
+            .setting("xpack.license.self_generated.type", "trial")
+            .build();
+    }
+}

+ 1 - 7
x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/EsqlSpecIT.java

@@ -8,19 +8,13 @@
 package org.elasticsearch.xpack.esql.qa.multi_node;
 
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
-import org.elasticsearch.test.cluster.local.distribution.DistributionType;
 import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase;
 import org.elasticsearch.xpack.ql.CsvSpecReader.CsvTestCase;
 import org.junit.ClassRule;
 
 public class EsqlSpecIT extends EsqlSpecTestCase {
     @ClassRule
-    public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
-        .distribution(DistributionType.DEFAULT)
-        .nodes(2)
-        .setting("xpack.security.enabled", "false")
-        .setting("xpack.license.self_generated.type", "trial")
-        .build();
+    public static ElasticsearchCluster cluster = Clusters.testCluster();
 
     @Override
     protected String getTestRestCluster() {

+ 26 - 0
x-pack/plugin/esql/qa/server/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/multi_node/FieldExtractorIT.java

@@ -0,0 +1,26 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.qa.multi_node;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import org.elasticsearch.test.TestClustersThreadFilter;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.xpack.esql.qa.rest.FieldExtractorTestCase;
+import org.junit.ClassRule;
+
+@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
+public class FieldExtractorIT extends FieldExtractorTestCase {
+    @ClassRule
+    public static ElasticsearchCluster cluster = Clusters.testCluster();
+
+    @Override
+    protected String getTestRestCluster() {
+        return cluster.getHttpAddresses();
+    }
+}

+ 26 - 0
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/FieldExtractorIT.java

@@ -0,0 +1,26 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.qa.single_node;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import org.elasticsearch.test.TestClustersThreadFilter;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.xpack.esql.qa.rest.FieldExtractorTestCase;
+import org.junit.ClassRule;
+
+@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
+public class FieldExtractorIT extends FieldExtractorTestCase {
+    @ClassRule
+    public static ElasticsearchCluster cluster = Clusters.testCluster();
+
+    @Override
+    protected String getTestRestCluster() {
+        return cluster.getHttpAddresses();
+    }
+}

+ 1 - 2
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

@@ -185,8 +185,7 @@ public class RestEsqlIT extends RestEsqlTestCase {
         assertException("from test_alias | where _size is not null | limit 1", "Unknown column [_size]");
         assertException(
             "from test_alias | where message.hash is not null | limit 1",
-            "Cannot use field [message.hash] due to ambiguities",
-            "incompatible types: [integer] in [index2], [murmur3] in [index1]"
+            "Cannot use field [message.hash] with unsupported type [murmur3]"
         );
         assertException(
             "from index1 | where message.hash is not null | limit 1",

+ 1456 - 0
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/FieldExtractorTestCase.java

@@ -0,0 +1,1456 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.qa.rest;
+
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.Version;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.network.NetworkAddress;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.geo.GeometryTestUtils;
+import org.elasticsearch.index.mapper.BlockLoader;
+import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.ListMatcher;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xcontent.json.JsonXContent;
+import org.hamcrest.Matcher;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+import static org.elasticsearch.test.ListMatcher.matchesList;
+import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.elasticsearch.test.MapMatcher.matchesMap;
+import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.entityToMap;
+import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.runEsqlSync;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.containsString;
+
+/**
+ * Creates indices with many different mappings and fetches values from them to make sure
+ * we can do it. Think of this as an integration test for {@link BlockLoader}
+ * implementations <strong>and</strong> an integration test for field resolution.
+ * This is a port of a test with the same name on the SQL side.
+ */
+public abstract class FieldExtractorTestCase extends ESRestTestCase {
+    private static final Logger logger = LogManager.getLogger(FieldExtractorTestCase.class);
+
+    public void testTextField() throws IOException {
+        textTest().test(randomAlphaOfLength(20));
+    }
+
+    private Test textTest() {
+        return new Test("text").randomStoreUnlessSynthetic();
+    }
+
+    public void testKeywordField() throws IOException {
+        Integer ignoreAbove = randomBoolean() ? null : between(10, 50);
+        int length = between(10, 50);
+
+        String value = randomAlphaOfLength(length);
+        keywordTest().ignoreAbove(ignoreAbove).test(value, ignoredByIgnoreAbove(ignoreAbove, length) ? null : value);
+    }
+
+    private Test keywordTest() {
+        return new Test("keyword").randomDocValuesAndStoreUnlessSynthetic();
+    }
+
+    public void testConstantKeywordField() throws IOException {
+        boolean specifyInMapping = randomBoolean();
+        boolean specifyInDocument = randomBoolean();
+
+        String value = randomAlphaOfLength(20);
+        new Test("constant_keyword").expectedType("keyword")
+            .value(specifyInMapping ? value : null)
+            .test(specifyInDocument ? value : null, specifyInMapping || specifyInDocument ? value : null);
+    }
+
+    public void testWildcardField() throws IOException {
+        Integer ignoreAbove = randomBoolean() ? null : between(10, 50);
+        int length = between(10, 50);
+
+        String value = randomAlphaOfLength(length);
+        new Test("wildcard").expectedType("keyword")
+            .ignoreAbove(ignoreAbove)
+            .test(value, ignoredByIgnoreAbove(ignoreAbove, length) ? null : value);
+    }
+
+    public void testLong() throws IOException {
+        long value = randomLong();
+        longTest().test(randomBoolean() ? Long.toString(value) : value, value);
+    }
+
+    public void testLongWithDecimalParts() throws IOException {
+        long value = randomLong();
+        int decimalPart = between(1, 99);
+        BigDecimal withDecimals = new BigDecimal(value + "." + decimalPart);
+        /*
+         * It's possible to pass the BigDecimal here without converting to a string
+         * but that rounds in a different way, and I'm not quite able to reproduce it
+         * at the time.
+         */
+        longTest().test(withDecimals.toString(), value);
+    }
+
+    public void testLongMalformed() throws IOException {
+        longTest().forceIgnoreMalformed().test(randomAlphaOfLength(5), null);
+    }
+
+    private Test longTest() {
+        return new Test("long").randomIgnoreMalformedUnlessSynthetic().randomDocValuesUnlessSynthetic();
+    }
+
+    public void testInt() throws IOException {
+        int value = randomInt();
+        intTest().test(randomBoolean() ? Integer.toString(value) : value, value);
+    }
+
+    public void testIntWithDecimalParts() throws IOException {
+        double value = randomDoubleBetween(Integer.MIN_VALUE, Integer.MAX_VALUE, true);
+        intTest().test(randomBoolean() ? Double.toString(value) : value, (int) value);
+    }
+
+    public void testIntMalformed() throws IOException {
+        intTest().forceIgnoreMalformed().test(randomAlphaOfLength(5), null);
+    }
+
+    private Test intTest() {
+        return new Test("integer").randomIgnoreMalformedUnlessSynthetic().randomDocValuesUnlessSynthetic();
+    }
+
+    public void testShort() throws IOException {
+        short value = randomShort();
+        shortTest().test(randomBoolean() ? Short.toString(value) : value, (int) value);
+    }
+
+    public void testShortWithDecimalParts() throws IOException {
+        double value = randomDoubleBetween(Short.MIN_VALUE, Short.MAX_VALUE, true);
+        shortTest().test(randomBoolean() ? Double.toString(value) : value, (int) value);
+    }
+
+    public void testShortMalformed() throws IOException {
+        shortTest().forceIgnoreMalformed().test(randomAlphaOfLength(5), null);
+    }
+
+    private Test shortTest() {
+        return new Test("short").expectedType("integer").randomIgnoreMalformedUnlessSynthetic().randomDocValuesUnlessSynthetic();
+    }
+
+    public void testByte() throws IOException {
+        byte value = randomByte();
+        byteTest().test(Byte.toString(value), (int) value);
+    }
+
+    public void testByteWithDecimalParts() throws IOException {
+        double value = randomDoubleBetween(Byte.MIN_VALUE, Byte.MAX_VALUE, true);
+        byteTest().test(randomBoolean() ? Double.toString(value) : value, (int) value);
+    }
+
+    public void testByteMalformed() throws IOException {
+        byteTest().forceIgnoreMalformed().test(randomAlphaOfLength(5), null);
+    }
+
+    private Test byteTest() {
+        return new Test("byte").expectedType("integer").randomIgnoreMalformedUnlessSynthetic().randomDocValuesUnlessSynthetic();
+    }
+
+    public void testUnsignedLong() throws IOException {
+        assumeTrue(
+            "order of fields in error message inconsistent before 8.14",
+            getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
+        );
+        BigInteger value = randomUnsignedLong();
+        new Test("unsigned_long").randomIgnoreMalformedUnlessSynthetic()
+            .randomDocValuesUnlessSynthetic()
+            .test(
+                randomBoolean() ? value.toString() : value,
+                value.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) <= 0 ? value.longValue() : value
+            );
+    }
+
+    public void testUnsignedLongMalformed() throws IOException {
+        new Test("unsigned_long").forceIgnoreMalformed().randomDocValuesUnlessSynthetic().test(randomAlphaOfLength(5), null);
+    }
+
+    public void testDouble() throws IOException {
+        double value = randomDouble();
+        new Test("double").randomIgnoreMalformedUnlessSynthetic()
+            .randomDocValuesUnlessSynthetic()
+            .test(randomBoolean() ? Double.toString(value) : value, value);
+    }
+
+    public void testFloat() throws IOException {
+        float value = randomFloat();
+        new Test("float").expectedType("double")
+            .randomIgnoreMalformedUnlessSynthetic()
+            .randomDocValuesUnlessSynthetic()
+            .test(randomBoolean() ? Float.toString(value) : value, (double) value);
+    }
+
+    public void testScaledFloat() throws IOException {
+        double value = randomBoolean() ? randomDoubleBetween(-Double.MAX_VALUE, Double.MAX_VALUE, true) : randomFloat();
+        double scalingFactor = randomDoubleBetween(0, Double.MAX_VALUE, false);
+        new Test("scaled_float").expectedType("double")
+            .randomIgnoreMalformedUnlessSynthetic()
+            .randomDocValuesUnlessSynthetic()
+            .scalingFactor(scalingFactor)
+            .test(randomBoolean() ? Double.toString(value) : value, scaledFloatMatcher(scalingFactor, value));
+    }
+
+    private Matcher<Double> scaledFloatMatcher(double scalingFactor, double d) {
+        long encoded = Math.round(d * scalingFactor);
+        double decoded = encoded / scalingFactor;
+        return closeTo(decoded, Math.ulp(decoded));
+    }
+
+    public void testBoolean() throws IOException {
+        boolean value = randomBoolean();
+        new Test("boolean").ignoreMalformed(randomBoolean())
+            .randomDocValuesUnlessSynthetic()
+            .test(randomBoolean() ? Boolean.toString(value) : value, value);
+    }
+
+    public void testIp() throws IOException {
+        ipTest().test(NetworkAddress.format(randomIp(randomBoolean())));
+    }
+
+    private Test ipTest() {
+        return new Test("ip").ignoreMalformed(randomBoolean());
+    }
+
+    public void testVersionField() throws IOException {
+        new Test("version").test(randomVersionString());
+    }
+
+    public void testGeoPoint() throws IOException {
+        assumeTrue(
+            "not supported until 8.13",
+            getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_13_0))
+        );
+        new Test("geo_point")
+            // TODO we should support loading geo_point from doc values if source isn't enabled
+            .sourceMode(randomValueOtherThanMany(s -> s.stored() == false, () -> randomFrom(SourceMode.values())))
+            .ignoreMalformed(randomBoolean())
+            .storeAndDocValues(randomBoolean(), randomBoolean())
+            .test(GeometryTestUtils.randomPoint(false).toString());
+    }
+
+    public void testGeoShape() throws IOException {
+        assumeTrue(
+            "not supported until 8.13",
+            getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_13_0))
+        );
+        new Test("geo_shape")
+            // TODO if source isn't enabled how can we load *something*? It's just triangles, right?
+            .sourceMode(randomValueOtherThanMany(s -> s.stored() == false, () -> randomFrom(SourceMode.values())))
+            .ignoreMalformed(randomBoolean())
+            .storeAndDocValues(randomBoolean(), randomBoolean())
+            // TODO pick supported random shapes
+            .test(GeometryTestUtils.randomPoint(false).toString());
+    }
+
+    public void testAliasToKeyword() throws IOException {
+        keywordTest().createAlias().test(randomAlphaOfLength(20));
+    }
+
+    public void testAliasToText() throws IOException {
+        textTest().createAlias().test(randomAlphaOfLength(20));
+    }
+
+    public void testAliasToInt() throws IOException {
+        intTest().createAlias().test(randomInt());
+    }
+
+    public void testFlattenedUnsupported() throws IOException {
+        new Test("flattened").createIndex("test", "flattened");
+        index("test", """
+            {"flattened": {"a": "foo"}}""");
+        Map<String, Object> result = runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | LIMIT 2"));
+
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("flattened", "unsupported")))
+                .entry("values", List.of(matchesList().item(null)))
+        );
+    }
+
+    public void testEmptyMapping() throws IOException {
+        createIndex("test", index -> {});
+        index("test", """
+            {}""");
+
+        ResponseException e = expectThrows(
+            ResponseException.class,
+            () -> runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | SORT missing | LIMIT 3"))
+        );
+        String err = EntityUtils.toString(e.getResponse().getEntity());
+        assertThat(err, containsString("Unknown column [missing]"));
+
+        // TODO this is broken in main too
+        // Map<String, Object> result = runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | LIMIT 2"));
+        // assertMap(
+        // result,
+        // matchesMap().entry("columns", List.of(columnInfo("f", "unsupported"), columnInfo("f.raw", "unsupported")))
+        // .entry("values", List.of(matchesList().item(null).item(null)))
+        // );
+    }
+
+    /**
+     * <pre>
+     * "text_field": {
+     *   "type": "text",
+     *   "fields": {
+     *     "raw": {
+     *       "type": "keyword",
+     *       "ignore_above": 10
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testTextFieldWithKeywordSubfield() throws IOException {
+        String value = randomAlphaOfLength(20);
+        Map<String, Object> result = new Test("text").storeAndDocValues(randomBoolean(), null).sub("raw", keywordTest()).roundTrip(value);
+
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("text_field", "text"), columnInfo("text_field.raw", "keyword")))
+                .entry("values", List.of(matchesList().item(value).item(value)))
+        );
+    }
+
+    /**
+     * <pre>
+     * "text_field": {
+     *   "type": "text",
+     *   "fields": {
+     *     "int": {
+     *       "type": "integer",
+     *       "ignore_malformed": true/false
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testTextFieldWithIntegerSubfield() throws IOException {
+        int value = randomInt();
+        Map<String, Object> result = textTest().sub("int", intTest()).roundTrip(value);
+
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("text_field", "text"), columnInfo("text_field.int", "integer")))
+                .entry("values", List.of(matchesList().item(Integer.toString(value)).item(value)))
+        );
+    }
+
+    /**
+     * <pre>
+     * "text_field": {
+     *   "type": "text",
+     *   "fields": {
+     *     "int": {
+     *       "type": "integer",
+     *       "ignore_malformed": true
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testTextFieldWithIntegerSubfieldMalformed() throws IOException {
+        String value = randomAlphaOfLength(5);
+        Map<String, Object> result = textTest().sourceMode(SourceMode.DEFAULT).sub("int", intTest().ignoreMalformed(true)).roundTrip(value);
+
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("text_field", "text"), columnInfo("text_field.int", "integer")))
+                .entry("values", List.of(matchesList().item(value).item(null)))
+        );
+    }
+
+    /**
+     * <pre>
+     * "text_field": {
+     *   "type": "text",
+     *   "fields": {
+     *     "ip": {
+     *       "type": "ip",
+     *       "ignore_malformed": true/false
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testTextFieldWithIpSubfield() throws IOException {
+        String value = NetworkAddress.format(randomIp(randomBoolean()));
+        Map<String, Object> result = textTest().sub("ip", ipTest()).roundTrip(value);
+
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("text_field", "text"), columnInfo("text_field.ip", "ip")))
+                .entry("values", List.of(matchesList().item(value).item(value)))
+        );
+    }
+
+    /**
+     * <pre>
+     * "text_field": {
+     *   "type": "text",
+     *   "fields": {
+     *     "ip": {
+     *       "type": "ip",
+     *       "ignore_malformed": true
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testTextFieldWithIpSubfieldMalformed() throws IOException {
+        String value = randomAlphaOfLength(10);
+        Map<String, Object> result = textTest().sourceMode(SourceMode.DEFAULT).sub("ip", ipTest().ignoreMalformed(true)).roundTrip(value);
+
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("text_field", "text"), columnInfo("text_field.ip", "ip")))
+                .entry("values", List.of(matchesList().item(value).item(null)))
+        );
+    }
+
+    /**
+     * <pre>
+     * "integer_field": {
+     *   "type": "integer",
+     *   "ignore_malformed": true/false,
+     *   "fields": {
+     *     "str": {
+     *       "type": "text/keyword"
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testIntFieldWithTextOrKeywordSubfield() throws IOException {
+        int value = randomInt();
+        boolean text = randomBoolean();
+        Map<String, Object> result = intTest().sub("str", text ? textTest() : keywordTest()).roundTrip(value);
+
+        assertMap(
+            result,
+            matchesMap().entry(
+                "columns",
+                List.of(columnInfo("integer_field", "integer"), columnInfo("integer_field.str", text ? "text" : "keyword"))
+            ).entry("values", List.of(matchesList().item(value).item(Integer.toString(value))))
+        );
+    }
+
+    /**
+     * <pre>
+     * "integer_field": {
+     *   "type": "integer",
+     *   "ignore_malformed": true,
+     *   "fields": {
+     *     "str": {
+     *       "type": "text/keyword"
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testIntFieldWithTextOrKeywordSubfieldMalformed() throws IOException {
+        String value = randomAlphaOfLength(5);
+        boolean text = randomBoolean();
+        Map<String, Object> result = intTest().forceIgnoreMalformed().sub("str", text ? textTest() : keywordTest()).roundTrip(value);
+
+        assertMap(
+            result,
+            matchesMap().entry(
+                "columns",
+                List.of(columnInfo("integer_field", "integer"), columnInfo("integer_field.str", text ? "text" : "keyword"))
+            ).entry("values", List.of(matchesList().item(null).item(value)))
+        );
+    }
+
+    /**
+     * <pre>
+     * "ip_field": {
+     *   "type": "ip",
+     *   "ignore_malformed": true/false,
+     *   "fields": {
+     *     "str": {
+     *       "type": "text/keyword"
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testIpFieldWithTextOrKeywordSubfield() throws IOException {
+        String value = NetworkAddress.format(randomIp(randomBoolean()));
+        boolean text = randomBoolean();
+        // A valid IP round-trips through both the ip parent and its string sub-field.
+        Map<String, Object> result = ipTest().sub("str", text ? textTest() : keywordTest()).roundTrip(value);
+
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("ip_field", "ip"), columnInfo("ip_field.str", text ? "text" : "keyword")))
+                .entry("values", List.of(matchesList().item(value).item(value)))
+        );
+    }
+
+    /**
+     * <pre>
+     * "ip_field": {
+     *   "type": "ip",
+     *   "ignore_malformed": true,
+     *   "fields": {
+     *     "str": {
+     *       "type": "text/keyword"
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testIpFieldWithTextOrKeywordSubfieldMalformed() throws IOException {
+        String value = randomAlphaOfLength(5);
+        boolean text = randomBoolean();
+        // The value isn't an IP so the parent ignores it (null); the string
+        // sub-field still keeps the raw text.
+        Map<String, Object> result = ipTest().forceIgnoreMalformed().sub("str", text ? textTest() : keywordTest()).roundTrip(value);
+
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("ip_field", "ip"), columnInfo("ip_field.str", text ? "text" : "keyword")))
+                .entry("values", List.of(matchesList().item(null).item(value)))
+        );
+    }
+
+    /**
+     * <pre>
+     * "integer_field": {
+     *   "type": "integer",
+     *   "ignore_malformed": true/false,
+     *   "fields": {
+     *     "byte": {
+     *       "type": "byte",
+     *       "ignore_malformed": true/false
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testIntFieldWithByteSubfield() throws IOException {
+        byte value = randomByte();
+        Map<String, Object> result = intTest().sub("byte", byteTest()).roundTrip(value);
+
+        // ES|QL reports the byte sub-field as "integer" - note both columns below.
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("integer_field", "integer"), columnInfo("integer_field.byte", "integer")))
+                .entry("values", List.of(matchesList().item((int) value).item((int) value)))
+        );
+    }
+
+    /**
+     * <pre>
+     * "integer_field": {
+     *   "type": "integer",
+     *   "ignore_malformed": true/false,
+     *   "fields": {
+     *     "byte": {
+     *       "type": "byte",
+     *       "ignore_malformed": true
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testIntFieldWithByteSubfieldTooBig() throws IOException {
+        // Pick a value that fits in an integer but overflows a byte.
+        int value = randomValueOtherThanMany((Integer v) -> (Byte.MIN_VALUE <= v) && (v <= Byte.MAX_VALUE), ESTestCase::randomInt);
+        Map<String, Object> result = intTest().sourceMode(SourceMode.DEFAULT)
+            .sub("byte", byteTest().ignoreMalformed(true))
+            .roundTrip(value);
+
+        // Parent keeps the value; the byte sub-field ignores it and returns null.
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("integer_field", "integer"), columnInfo("integer_field.byte", "integer")))
+                .entry("values", List.of(matchesList().item(value).item(null)))
+        );
+    }
+
+    /**
+     * <pre>
+     * "byte_field": {
+     *   "type": "byte",
+     *   "ignore_malformed": true/false,
+     *   "fields": {
+     *     "int": {
+     *       "type": "integer",
+     *       "ignore_malformed": true/false
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testByteFieldWithIntSubfield() throws IOException {
+        byte value = randomByte();
+        Map<String, Object> result = byteTest().sub("int", intTest()).roundTrip(value);
+
+        // Both the byte parent and the integer sub-field surface as "integer" columns.
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("byte_field", "integer"), columnInfo("byte_field.int", "integer")))
+                .entry("values", List.of(matchesList().item((int) value).item((int) value)))
+        );
+    }
+
+    /**
+     * <pre>
+     * "byte_field": {
+     *   "type": "byte",
+     *   "ignore_malformed": true,
+     *   "fields": {
+     *     "int": {
+     *       "type": "integer",
+     *       "ignore_malformed": true/false
+     *     }
+     *   }
+     * }
+     * </pre>
+     */
+    public void testByteFieldWithIntSubfieldTooBig() throws IOException {
+        // Pick a value that fits in an integer but overflows a byte.
+        int value = randomValueOtherThanMany((Integer v) -> (Byte.MIN_VALUE <= v) && (v <= Byte.MAX_VALUE), ESTestCase::randomInt);
+        Map<String, Object> result = byteTest().forceIgnoreMalformed().sub("int", intTest()).roundTrip(value);
+
+        // The byte parent ignores the overflowing value (null); the int sub-field keeps it.
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("byte_field", "integer"), columnInfo("byte_field.int", "integer")))
+                .entry("values", List.of(matchesList().item(null).item(value)))
+        );
+    }
+
+    /**
+     * Two indices, one with:
+     * <pre>
+     * "f": {
+     *     "type": "keyword"
+     * }
+     * </pre>
+     * and the other with
+     * <pre>
+     * "f": {
+     *     "type": "long"
+     * }
+     * </pre>.
+     */
+    public void testIncompatibleTypes() throws IOException {
+        keywordTest().createIndex("test1", "f");
+        index("test1", """
+            {"f": "f1"}""");
+        longTest().createIndex("test2", "f");
+        index("test2", """
+            {"f": 1}""");
+
+        // Merely fetching the conflicting field yields an "unsupported" column of nulls...
+        Map<String, Object> result = runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test*"));
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("f", "unsupported")))
+                .entry("values", List.of(matchesList().item(null), matchesList().item(null)))
+        );
+        // ...but actually *using* the field is an error naming both conflicting mappings.
+        ResponseException e = expectThrows(
+            ResponseException.class,
+            () -> runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | SORT f | LIMIT 3"))
+        );
+        String err = EntityUtils.toString(e.getResponse().getEntity());
+        assertThat(
+            deyaml(err),
+            containsString(
+                "Cannot use field [f] due to ambiguities being mapped as [2] incompatible types: [keyword] in [test1], [long] in [test2]"
+            )
+        );
+    }
+
+    /**
+     * Two indices, one with:
+     * <pre>
+     * "file": {
+     *     "type": "keyword"
+     * }
+     * </pre>
+     * and the other with
+     * <pre>
+     * "other": {
+     *     "type": "keyword"
+     * }
+     * </pre>.
+     */
+    public void testDistinctInEachIndex() throws IOException {
+        keywordTest().createIndex("test1", "file");
+        index("test1", """
+            {"file": "f1"}""");
+        keywordTest().createIndex("test2", "other");
+        index("test2", """
+            {"other": "o2"}""");
+
+        // Disjoint fields are unioned; each row nulls the column its index lacks.
+        Map<String, Object> result = runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | SORT file, other"));
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("file", "keyword"), columnInfo("other", "keyword")))
+                .entry("values", List.of(matchesList().item("f1").item(null), matchesList().item(null).item("o2")))
+        );
+    }
+
+    /**
+     * Two indices, one with:
+     * <pre>
+     * "file": {
+     *    "type": "keyword"
+     * }
+     * </pre>
+     * and the other with
+     * <pre>
+     * "file": {
+     *    "type": "object",
+     *    "properties": {
+     *       "raw": {
+     *          "type": "keyword"
+     *       }
+     *    }
+     * }
+     * </pre>.
+     */
+    public void testMergeKeywordAndObject() throws IOException {
+        assumeTrue(
+            "order of fields in error message inconsistent before 8.14",
+            getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
+        );
+        keywordTest().createIndex("test1", "file");
+        index("test1", """
+            {"file": "f1"}""");
+        createIndex("test2", index -> {
+            index.startObject("properties");
+            {
+                index.startObject("file");
+                {
+                    index.field("type", "object");
+                    index.startObject("properties");
+                    {
+                        index.startObject("raw").field("type", "keyword").endObject();
+                    }
+                    index.endObject();
+                }
+                index.endObject();
+            }
+            index.endObject();
+        });
+        index("test2", """
+            {"file": {"raw": "o2"}}""");
+
+        // Sorting on the keyword/object conflict is an error naming both mappings...
+        ResponseException e = expectThrows(
+            ResponseException.class,
+            () -> runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | SORT file, file.raw | LIMIT 3"))
+        );
+        String err = EntityUtils.toString(e.getResponse().getEntity());
+        assertThat(
+            deyaml(err),
+            containsString(
+                "Cannot use field [file] due to ambiguities"
+                    + " being mapped as [2] incompatible types: [keyword] in [test1], [object] in [test2]"
+            )
+        );
+
+        // ...but the non-conflicting sub-field is still usable on its own.
+        Map<String, Object> result = runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | SORT file.raw | LIMIT 2"));
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("file", "unsupported"), columnInfo("file.raw", "keyword")))
+                .entry("values", List.of(matchesList().item(null).item("o2"), matchesList().item(null).item(null)))
+        );
+    }
+
+    /**
+     * One index with an unsupported field and a supported sub-field. The supported sub-field
+     * is marked as unsupported <strong>because</strong> the parent is unsupported. Mapping:
+     * <pre>
+     * "f": {
+     *    "type": "ip_range"  ----- The type here doesn't matter, but it has to be one we don't support
+     *    "fields": {
+     *       "raw": {
+     *          "type": "keyword"
+     *       }
+     *    }
+     * }
+     * </pre>.
+     */
+    public void testPropagateUnsupportedToSubFields() throws IOException {
+        createIndex("test", index -> {
+            index.startObject("properties");
+            index.startObject("f");
+            {
+                index.field("type", "ip_range");
+                index.startObject("fields");
+                {
+                    index.startObject("raw").field("type", "keyword").endObject();
+                }
+                index.endObject();
+            }
+            index.endObject();
+            index.endObject();
+        });
+        index("test", """
+            {"f": "192.168.0.1/24"}""");
+
+        // Both the parent and the (otherwise supported) sub-field are rejected when used...
+        ResponseException e = expectThrows(
+            ResponseException.class,
+            () -> runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | SORT f, f.raw | LIMIT 3"))
+        );
+        String err = EntityUtils.toString(e.getResponse().getEntity());
+        assertThat(err, containsString("Cannot use field [f] with unsupported type [ip_range]"));
+        assertThat(err, containsString("Cannot use field [f.raw] with unsupported type [ip_range]"));
+
+        // ...and both surface as "unsupported" null columns when merely fetched.
+        Map<String, Object> result = runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | LIMIT 2"));
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("f", "unsupported"), columnInfo("f.raw", "unsupported")))
+                .entry("values", List.of(matchesList().item(null).item(null)))
+        );
+    }
+
+    /**
+     * Two indices, one with:
+     * <pre>
+     * "f": {
+     *    "type": "ip_range"  ----- The type here doesn't matter, but it has to be one we don't support
+     * }
+     * </pre>
+     * and the other with
+     * <pre>
+     * "f": {
+     *    "type": "object",
+     *    "properties": {
+     *       "raw": {
+     *          "type": "keyword"
+     *       }
+     *    }
+     * }
+     * </pre>.
+     */
+    public void testMergeUnsupportedAndObject() throws IOException {
+        assumeTrue(
+            "order of fields in error message inconsistent before 8.14",
+            getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
+        );
+        createIndex("test1", index -> {
+            index.startObject("properties");
+            index.startObject("f").field("type", "ip_range").endObject();
+            index.endObject();
+        });
+        index("test1", """
+            {"f": "192.168.0.1/24"}""");
+        createIndex("test2", index -> {
+            index.startObject("properties");
+            {
+                index.startObject("f");
+                {
+                    index.field("type", "object");
+                    index.startObject("properties");
+                    {
+                        index.startObject("raw").field("type", "keyword").endObject();
+                    }
+                    index.endObject();
+                }
+                index.endObject();
+            }
+            index.endObject();
+        });
+        index("test2", """
+            {"f": {"raw": "o2"}}""");
+
+        // The unsupported type "wins" the merge: both f and f.raw are unusable...
+        ResponseException e = expectThrows(
+            ResponseException.class,
+            () -> runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | SORT f, f.raw | LIMIT 3"))
+        );
+        String err = EntityUtils.toString(e.getResponse().getEntity());
+        assertThat(err, containsString("Cannot use field [f] with unsupported type [ip_range]"));
+        assertThat(err, containsString("Cannot use field [f.raw] with unsupported type [ip_range]"));
+
+        // ...and both come back as "unsupported" null columns when merely fetched.
+        Map<String, Object> result = runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | LIMIT 2"));
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("f", "unsupported"), columnInfo("f.raw", "unsupported")))
+                .entry("values", List.of(matchesList().item(null).item(null), matchesList().item(null).item(null)))
+        );
+    }
+
+    /**
+     * Two indices, one with:
+     * <pre>
+     * "emp_no": {
+     *     "type": "integer"
+     * }
+     * </pre>
+     * and the other with
+     * <pre>
+     * "emp_no": {
+     *     "type": "integer",
+     *     "doc_values": false
+     * }
+     * </pre>.
+     */
+    public void testIntegerDocValuesConflict() throws IOException {
+        assumeTrue(
+            "order of fields in error message inconsistent before 8.14",
+            getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
+        );
+        intTest().sourceMode(SourceMode.DEFAULT).storeAndDocValues(null, true).createIndex("test1", "emp_no");
+        index("test1", """
+            {"emp_no": 1}""");
+        intTest().sourceMode(SourceMode.DEFAULT).storeAndDocValues(null, false).createIndex("test2", "emp_no");
+        index("test2", """
+            {"emp_no": 2}""");
+
+        // Differing doc_values settings are not a type conflict - the query works.
+        Map<String, Object> result = runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | SORT emp_no | LIMIT 2"));
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("emp_no", "integer")))
+                .entry("values", List.of(matchesList().item(1), matchesList().item(2)))
+        );
+    }
+
+    /**
+     * Two indices, one with:
+     * <pre>
+     * "emp_no": {
+     *     "type": "long"
+     * }
+     * </pre>
+     * and the other with
+     * <pre>
+     * "emp_no": {
+     *     "type": "integer"
+     * }
+     * </pre>.
+     *
+     * In an ideal world we'd promote the {@code integer} to an {@code long} and just go.
+     */
+    public void testLongIntegerConflict() throws IOException {
+        assumeTrue(
+            "order of fields in error message inconsistent before 8.14",
+            getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
+        );
+        longTest().sourceMode(SourceMode.DEFAULT).createIndex("test1", "emp_no");
+        index("test1", """
+            {"emp_no": 1}""");
+        intTest().sourceMode(SourceMode.DEFAULT).createIndex("test2", "emp_no");
+        index("test2", """
+            {"emp_no": 2}""");
+
+        // No numeric widening today: long vs integer is treated as incompatible.
+        ResponseException e = expectThrows(
+            ResponseException.class,
+            () -> runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | SORT emp_no | LIMIT 3"))
+        );
+        String err = EntityUtils.toString(e.getResponse().getEntity());
+        assertThat(
+            deyaml(err),
+            containsString(
+                "Cannot use field [emp_no] due to ambiguities being "
+                    + "mapped as [2] incompatible types: [integer] in [test2], [long] in [test1]"
+            )
+        );
+
+        // Fetching without using the field still works, as an "unsupported" null column.
+        Map<String, Object> result = runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | LIMIT 2"));
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("emp_no", "unsupported")))
+                .entry("values", List.of(matchesList().item(null), matchesList().item(null)))
+        );
+    }
+
+    /**
+     * Two indices, one with:
+     * <pre>
+     * "emp_no": {
+     *     "type": "integer"
+     * }
+     * </pre>
+     * and the other with
+     * <pre>
+     * "emp_no": {
+     *     "type": "short"
+     * }
+     * </pre>.
+     *
+     * In an ideal world we'd promote the {@code short} to an {@code integer} and just go.
+     */
+    public void testIntegerShortConflict() throws IOException {
+        assumeTrue(
+            "order of fields in error message inconsistent before 8.14",
+            getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
+        );
+        intTest().sourceMode(SourceMode.DEFAULT).createIndex("test1", "emp_no");
+        index("test1", """
+            {"emp_no": 1}""");
+        shortTest().sourceMode(SourceMode.DEFAULT).createIndex("test2", "emp_no");
+        index("test2", """
+            {"emp_no": 2}""");
+
+        // No numeric widening today: integer vs short is treated as incompatible.
+        ResponseException e = expectThrows(
+            ResponseException.class,
+            () -> runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | SORT emp_no | LIMIT 3"))
+        );
+        String err = EntityUtils.toString(e.getResponse().getEntity());
+        assertThat(
+            deyaml(err),
+            containsString(
+                "Cannot use field [emp_no] due to ambiguities being "
+                    + "mapped as [2] incompatible types: [integer] in [test1], [short] in [test2]"
+            )
+        );
+
+        // Fetching without using the field still works, as an "unsupported" null column.
+        Map<String, Object> result = runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | LIMIT 2"));
+        assertMap(
+            result,
+            matchesMap().entry("columns", List.of(columnInfo("emp_no", "unsupported")))
+                .entry("values", List.of(matchesList().item(null), matchesList().item(null)))
+        );
+    }
+
+    /**
+     * Two indices, one with:
+     * <pre>
+     * "foo": {
+     *   "type": "object",
+     *   "properties": {
+     *     "emp_no": {
+     *       "type": "integer"
+     *     }
+     *   }
+     * }
+     * </pre>
+     * and the other with
+     * <pre>
+     * "foo": {
+     *   "type": "object",
+     *   "properties": {
+     *     "emp_no": {
+     *       "type": "keyword"
+     *     }
+     *   }
+     * }
+     * </pre>.
+     */
+    public void testTypeConflictInObject() throws IOException {
+        assumeTrue(
+            "order of fields in error message inconsistent before 8.14",
+            getCachedNodesVersions().stream().allMatch(v -> Version.fromString(v).onOrAfter(Version.V_8_14_0))
+        );
+        createIndex("test1", empNoInObject("integer"));
+        index("test1", """
+            {"foo": {"emp_no": 1}}""");
+        createIndex("test2", empNoInObject("keyword"));
+        index("test2", """
+            {"foo": {"emp_no": "cat"}}""");
+
+        // The conflict is reported on the full dotted path, not just the leaf name.
+        Map<String, Object> result = runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | LIMIT 3"));
+        assertMap(result, matchesMap().entry("columns", List.of(columnInfo("foo.emp_no", "unsupported"))).extraOk());
+
+        ResponseException e = expectThrows(
+            ResponseException.class,
+            () -> runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | SORT foo.emp_no | LIMIT 3"))
+        );
+        String err = EntityUtils.toString(e.getResponse().getEntity());
+        assertThat(
+            deyaml(err),
+            containsString(
+                "Cannot use field [foo.emp_no] due to ambiguities being "
+                    + "mapped as [2] incompatible types: [integer] in [test1], [keyword] in [test2]"
+            )
+        );
+    }
+
+    /**
+     * Builds a mapping with {@code foo} as an {@code object} containing a single
+     * {@code emp_no} sub-field of the given type.
+     */
+    private CheckedConsumer<XContentBuilder, IOException> empNoInObject(String empNoType) {
+        return index -> {
+            index.startObject("properties");
+            {
+                index.startObject("foo");
+                {
+                    index.field("type", "object");
+                    index.startObject("properties");
+                    {
+                        index.startObject("emp_no").field("type", empNoType).endObject();
+                    }
+                    index.endObject();
+                }
+                index.endObject();
+            }
+            index.endObject();
+        };
+    }
+
+    /**
+     * How {@code _source} is configured for the test index. Each mode writes its
+     * own {@code _source.mode} mapping (or none, for the default).
+     */
+    private enum SourceMode {
+        DEFAULT {
+            @Override
+            void sourceMapping(XContentBuilder builder) {}
+
+            @Override
+            boolean stored() {
+                return true;
+            }
+        },
+        STORED {
+            @Override
+            void sourceMapping(XContentBuilder builder) throws IOException {
+                builder.startObject(SourceFieldMapper.NAME).field("mode", "stored").endObject();
+            }
+
+            @Override
+            boolean stored() {
+                return true;
+            }
+        },
+        /* TODO add support to this test for disabling _source
+        DISABLED {
+            @Override
+            void sourceMapping(XContentBuilder builder) throws IOException {
+                builder.startObject(SourceFieldMapper.NAME).field("mode", "disabled").endObject();
+            }
+
+            @Override
+            boolean stored() {
+                return false;
+            }
+        },
+         */
+        SYNTHETIC {
+            @Override
+            void sourceMapping(XContentBuilder builder) throws IOException {
+                builder.startObject(SourceFieldMapper.NAME).field("mode", "synthetic").endObject();
+            }
+
+            @Override
+            boolean stored() {
+                return false;
+            }
+        };
+
+        /** Writes the {@code _source} portion of the index mapping, if any. */
+        abstract void sourceMapping(XContentBuilder builder) throws IOException;
+
+        /** Whether {@code _source} is fully stored ({@code false} for synthetic source). */
+        abstract boolean stored();
+    }
+
+    /** True when an {@code ignore_above} limit is configured and the value's length exceeds it. */
+    private boolean ignoredByIgnoreAbove(Integer ignoreAbove, int length) {
+        return ignoreAbove != null && length > ignoreAbove;
+    }
+
+    /** A random value in the unsigned 64-bit range: a non-negative long shifted left plus a random low bit. */
+    private BigInteger randomUnsignedLong() {
+        BigInteger big = BigInteger.valueOf(randomNonNegativeLong()).shiftLeft(1);
+        return big.add(randomBoolean() ? BigInteger.ONE : BigInteger.ZERO);
+    }
+
+    /** A random version like {@code 1.2.3} with an optional prerelease suffix. */
+    private static String randomVersionString() {
+        return randomVersionNumber() + (randomBoolean() ? "" : randomPrerelease());
+    }
+
+    /** One to three dot-separated numbers, each in {@code [0, 100]}. */
+    private static String randomVersionNumber() {
+        int numbers = between(1, 3);
+        String v = Integer.toString(between(0, 100));
+        for (int i = 1; i < numbers; i++) {
+            v += "." + between(0, 100);
+        }
+        return v;
+    }
+
+    /** A prerelease tag - usually a label plus numbers, rarely just a bare label. */
+    private static String randomPrerelease() {
+        if (rarely()) {
+            return randomFrom("alpha", "beta", "prerelease", "whatever");
+        }
+        return randomFrom("alpha", "beta", "") + randomVersionNumber();
+    }
+
+    /** The {@code store} / {@code doc_values} mapping settings; {@code null} means "leave unset". */
+    private record StoreAndDocValues(Boolean store, Boolean docValues) {}
+
+    /**
+     * Builder-style description of a single field mapping (type, source mode,
+     * {@code ignore_malformed}, {@code store}/{@code doc_values}, sub-fields, aliases)
+     * that can create the test index, index a value, and fetch it back through ES|QL.
+     */
+    private static class Test {
+        private final String type;
+        // Sub-fields keyed by name; TreeMap keeps iteration order stable.
+        private final Map<String, Test> subFields = new TreeMap<>();
+
+        private SourceMode sourceMode;
+        // The column type ES|QL is expected to report; defaults to the mapped type.
+        private String expectedType;
+        // These are functions of the source mode so the choice can be made lazily
+        // (and randomly) once the mode is known; see fieldMapping for memoization.
+        private Function<SourceMode, Boolean> ignoreMalformed;
+        private Function<SourceMode, StoreAndDocValues> storeAndDocValues = s -> new StoreAndDocValues(null, null);
+        private Double scalingFactor;
+        private Integer ignoreAbove;
+        private Object value;
+        private boolean createAlias;
+
+        Test(String type) {
+            this.type = type;
+            // Default the expected return type to the field type.
+            this.expectedType = type;
+        }
+
+        Test sourceMode(SourceMode sourceMode) {
+            this.sourceMode = sourceMode;
+            return this;
+        }
+
+        Test expectedType(String expectedType) {
+            this.expectedType = expectedType;
+            return this;
+        }
+
+        Test ignoreMalformed(boolean ignoreMalformed) {
+            this.ignoreMalformed = s -> ignoreMalformed;
+            return this;
+        }
+
+        /**
+         * Enable {@code ignore_malformed} and disable synthetic _source because
+         * most fields don't support ignore_malformed and synthetic _source.
+         */
+        Test forceIgnoreMalformed() {
+            return this.sourceMode(randomValueOtherThan(SourceMode.SYNTHETIC, () -> randomFrom(SourceMode.values()))).ignoreMalformed(true);
+        }
+
+        /** Randomize {@code ignore_malformed}, but force it off under synthetic _source. */
+        Test randomIgnoreMalformedUnlessSynthetic() {
+            this.ignoreMalformed = s -> s == SourceMode.SYNTHETIC ? false : randomBoolean();
+            return this;
+        }
+
+        Test storeAndDocValues(Boolean store, Boolean docValues) {
+            this.storeAndDocValues = s -> new StoreAndDocValues(store, docValues);
+            return this;
+        }
+
+        /** Randomize {@code store}, but force it on under synthetic _source. */
+        Test randomStoreUnlessSynthetic() {
+            this.storeAndDocValues = s -> new StoreAndDocValues(s == SourceMode.SYNTHETIC ? true : randomBoolean(), null);
+            return this;
+        }
+
+        /** Randomize both settings; under synthetic _source at least one of them must be on. */
+        Test randomDocValuesAndStoreUnlessSynthetic() {
+            this.storeAndDocValues = s -> {
+                if (s == SourceMode.SYNTHETIC) {
+                    boolean store = randomBoolean();
+                    return new StoreAndDocValues(store, store == false || randomBoolean());
+                }
+                return new StoreAndDocValues(randomBoolean(), randomBoolean());
+            };
+            return this;
+        }
+
+        /** Randomize {@code doc_values}, but force it on under synthetic _source. */
+        Test randomDocValuesUnlessSynthetic() {
+            this.storeAndDocValues = s -> new StoreAndDocValues(null, s == SourceMode.SYNTHETIC || randomBoolean());
+            return this;
+        }
+
+        Test scalingFactor(double scalingFactor) {
+            this.scalingFactor = scalingFactor;
+            return this;
+        }
+
+        Test ignoreAbove(Integer ignoreAbove) {
+            this.ignoreAbove = ignoreAbove;
+            return this;
+        }
+
+        Test value(Object value) {
+            this.value = value;
+            return this;
+        }
+
+        Test createAlias() {
+            this.createAlias = true;
+            return this;
+        }
+
+        Test sub(String name, Test sub) {
+            this.subFields.put(name, sub);
+            return this;
+        }
+
+        /**
+         * Creates the index (sometimes a second, identically mapped one too),
+         * indexes {@code value}, and fetches everything back through ES|QL.
+         */
+        Map<String, Object> roundTrip(Object value) throws IOException {
+            String fieldName = type + "_field";
+            createIndex("test", fieldName);
+            if (randomBoolean()) {
+                // Randomly add a second, identically mapped index to exercise merging.
+                createIndex("test2", fieldName);
+            }
+
+            if (value == null) {
+                logger.info("indexing empty doc");
+                index("test", "{}");
+            } else {
+                logger.info("indexing {}::{}", value, value.getClass().getName());
+                index("test", Strings.toString(JsonXContent.contentBuilder().startObject().field(fieldName, value).endObject()));
+            }
+
+            return fetchAll();
+        }
+
+        void test(Object value) throws IOException {
+            test(value, value);
+        }
+
+        /**
+         * Round trip the value through an index configured by the parameters
+         * of this test and assert that it matches the {@code expectedValues}
+         * which can be either the expected value or a subclass of {@link Matcher}.
+         */
+        void test(Object value, Object expectedValue) throws IOException {
+            Map<String, Object> result = roundTrip(value);
+
+            logger.info("expecting {}", expectedValue == null ? null : expectedValue + "::" + expectedValue.getClass().getName());
+
+            List<Map<String, Object>> columns = new ArrayList<>();
+            columns.add(columnInfo(type + "_field", expectedType));
+            if (createAlias) {
+                // Aliases resolve to the same type; columns are sorted by name below.
+                columns.add(columnInfo("a.b.c." + type + "_field_alias", expectedType));
+                columns.add(columnInfo(type + "_field_alias", expectedType));
+            }
+            Collections.sort(columns, Comparator.comparing(m -> (String) m.get("name")));
+
+            ListMatcher values = matchesList();
+            values = values.item(expectedValue);
+            if (createAlias) {
+                values = values.item(expectedValue);
+                values = values.item(expectedValue);
+            }
+
+            assertMap(result, matchesMap().entry("columns", columns).entry("values", List.of(values)));
+        }
+
+        void createIndex(String name, String fieldName) throws IOException {
+            if (sourceMode == null) {
+                // Pick a random source mode once; reused for any later createIndex call.
+                sourceMode(randomFrom(SourceMode.values()));
+            }
+            logger.info("source_mode: {}", sourceMode);
+
+            FieldExtractorTestCase.createIndex(name, index -> {
+                sourceMode.sourceMapping(index);
+                index.startObject("properties");
+                {
+                    index.startObject(fieldName);
+                    fieldMapping(index);
+                    index.endObject();
+
+                    if (createAlias) {
+                        // create two aliases - one within a hierarchy, the other just a simple field w/o hierarchy
+                        index.startObject(fieldName + "_alias");
+                        {
+                            index.field("type", "alias");
+                            index.field("path", fieldName);
+                        }
+                        index.endObject();
+                        index.startObject("a.b.c." + fieldName + "_alias");
+                        {
+                            index.field("type", "alias");
+                            index.field("path", fieldName);
+                        }
+                        index.endObject();
+                    }
+                }
+                index.endObject();
+            });
+        }
+
+        // Writes this field's mapping body. Random choices derived from the source
+        // mode are memoized so a second index gets an identical mapping.
+        private void fieldMapping(XContentBuilder builder) throws IOException {
+            builder.field("type", type);
+            if (ignoreMalformed != null) {
+                boolean v = ignoreMalformed.apply(sourceMode);
+                builder.field("ignore_malformed", v);
+                ignoreMalformed = m -> v;  // pin the random choice for later indices
+            }
+            StoreAndDocValues sd = storeAndDocValues.apply(sourceMode);
+            storeAndDocValues = m -> sd;  // pin the random choice for later indices
+            if (sd.docValues != null) {
+                builder.field("doc_values", sd.docValues);
+            }
+            if (sd.store != null) {
+                builder.field("store", sd.store);
+            }
+            if (scalingFactor != null) {
+                builder.field("scaling_factor", scalingFactor);
+            }
+            if (ignoreAbove != null) {
+                builder.field("ignore_above", ignoreAbove);
+            }
+            if (value != null) {
+                builder.field("value", value);
+            }
+
+            if (subFields.isEmpty() == false) {
+                builder.startObject("fields");
+                for (Map.Entry<String, Test> sub : subFields.entrySet()) {
+                    builder.startObject(sub.getKey());
+                    if (sub.getValue().sourceMode == null) {
+                        // Sub-fields inherit the parent's source mode.
+                        sub.getValue().sourceMode = sourceMode;
+                    } else if (sub.getValue().sourceMode != sourceMode) {
+                        throw new IllegalStateException("source_mode can't be configured on sub-fields");
+                    }
+                    sub.getValue().fieldMapping(builder);
+                    builder.endObject();
+                }
+                builder.endObject();
+            }
+        }
+
+        private Map<String, Object> fetchAll() throws IOException {
+            return runEsqlSync(new RestEsqlTestCase.RequestObjectBuilder().query("FROM test* | LIMIT 10"));
+        }
+    }
+
+    /** A column description as it appears in the ES|QL response's {@code columns} list. */
+    private static Map<String, Object> columnInfo(String name, String type) {
+        return Map.of("name", name, "type", type);
+    }
+
+    /**
+     * Bulk-indexes the given JSON docs into {@code name} with {@code refresh=true}
+     * so they are immediately searchable, and asserts the bulk had no errors.
+     */
+    private static void index(String name, String... docs) throws IOException {
+        Request request = new Request("POST", "/" + name + "/_bulk");
+        request.addParameter("refresh", "true");
+        StringBuilder bulk = new StringBuilder();
+        for (String doc : docs) {
+            bulk.append(String.format(Locale.ROOT, """
+                {"index":{}}
+                %s
+                """, doc));
+        }
+        request.setJsonEntity(bulk.toString());
+        Response response = client().performRequest(request);
+        Map<String, Object> result = entityToMap(response.getEntity(), XContentType.JSON);
+        assertMap(result, matchesMap().extraOk().entry("errors", false));
+    }
+
+    /**
+     * Creates index {@code name} with a single shard and no replicas (keeps results
+     * deterministic) and a mappings section written by the {@code mapping} callback.
+     */
+    private static void createIndex(String name, CheckedConsumer<XContentBuilder, IOException> mapping) throws IOException {
+        Request request = new Request("PUT", "/" + name);
+        XContentBuilder index = JsonXContent.contentBuilder().prettyPrint().startObject();
+        index.startObject("settings");
+        {
+            index.field("index.number_of_replicas", 0);
+            index.field("index.number_of_shards", 1);
+        }
+        index.endObject();
+        index.startObject("mappings");
+        mapping.accept(index);
+        index.endObject();
+        index.endObject();
+        String configStr = Strings.toString(index);
+        logger.info("index: {} {}", name, configStr);
+        request.setJsonEntity(configStr);
+        client().performRequest(request);
+    }
+
+    /**
+     * Yaml adds newlines and some indentation which we don't want to match.
+     * Strips backslash-newline-indent-backslash continuation sequences so error
+     * messages can be matched as a single string.
+     */
+    private String deyaml(String err) {
+        return err.replaceAll("\\\\\n\s+\\\\", "");
+    }
+}

+ 5 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/execution/PlanExecutor.java

@@ -18,6 +18,7 @@ import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.Mapper;
 import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
+import org.elasticsearch.xpack.esql.session.EsqlIndexResolver;
 import org.elasticsearch.xpack.esql.session.EsqlSession;
 import org.elasticsearch.xpack.esql.stats.Metrics;
 import org.elasticsearch.xpack.esql.stats.QueryMetric;
@@ -29,14 +30,16 @@ import static org.elasticsearch.action.ActionListener.wrap;
 public class PlanExecutor {
 
     private final IndexResolver indexResolver;
+    private final EsqlIndexResolver esqlIndexResolver;
     private final PreAnalyzer preAnalyzer;
     private final FunctionRegistry functionRegistry;
     private final Mapper mapper;
     private final Metrics metrics;
     private final Verifier verifier;
 
-    public PlanExecutor(IndexResolver indexResolver) {
+    public PlanExecutor(IndexResolver indexResolver, EsqlIndexResolver esqlIndexResolver) {
         this.indexResolver = indexResolver;
+        this.esqlIndexResolver = esqlIndexResolver;
         this.preAnalyzer = new PreAnalyzer();
         this.functionRegistry = new EsqlFunctionRegistry();
         this.mapper = new Mapper(functionRegistry);
@@ -55,6 +58,7 @@ public class PlanExecutor {
             sessionId,
             cfg,
             indexResolver,
+            esqlIndexResolver,
             enrichPolicyResolver,
             preAnalyzer,
             functionRegistry,

+ 3 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -54,6 +54,7 @@ import org.elasticsearch.xpack.esql.action.RestEsqlGetAsyncResultAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlQueryAction;
 import org.elasticsearch.xpack.esql.execution.PlanExecutor;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
+import org.elasticsearch.xpack.esql.session.EsqlIndexResolver;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypeRegistry;
 import org.elasticsearch.xpack.ql.index.IndexResolver;
 
@@ -106,7 +107,8 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
                     services.clusterService().getClusterName().value(),
                     EsqlDataTypeRegistry.INSTANCE,
                     Set::of
-                )
+                ),
+                new EsqlIndexResolver(services.client(), EsqlDataTypeRegistry.INSTANCE)
             ),
             new ExchangeService(
                 services.clusterService().getSettings(),

+ 252 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlIndexResolver.java

@@ -0,0 +1,252 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+package org.elasticsearch.xpack.esql.session;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
+import org.elasticsearch.action.fieldcaps.IndexFieldCapabilities;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.index.mapper.TimeSeriesParams;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.ql.index.EsIndex;
+import org.elasticsearch.xpack.ql.index.IndexResolution;
+import org.elasticsearch.xpack.ql.index.IndexResolver;
+import org.elasticsearch.xpack.ql.type.DataType;
+import org.elasticsearch.xpack.ql.type.DataTypeRegistry;
+import org.elasticsearch.xpack.ql.type.DateEsField;
+import org.elasticsearch.xpack.ql.type.EsField;
+import org.elasticsearch.xpack.ql.type.InvalidMappedField;
+import org.elasticsearch.xpack.ql.type.KeywordEsField;
+import org.elasticsearch.xpack.ql.type.TextEsField;
+import org.elasticsearch.xpack.ql.type.UnsupportedEsField;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.elasticsearch.xpack.ql.type.DataTypes.DATETIME;
+import static org.elasticsearch.xpack.ql.type.DataTypes.KEYWORD;
+import static org.elasticsearch.xpack.ql.type.DataTypes.OBJECT;
+import static org.elasticsearch.xpack.ql.type.DataTypes.TEXT;
+import static org.elasticsearch.xpack.ql.type.DataTypes.UNSUPPORTED;
+
+public class EsqlIndexResolver {
+    private final Client client;
+    private final DataTypeRegistry typeRegistry;
+
+    public EsqlIndexResolver(Client client, DataTypeRegistry typeRegistry) {
+        this.client = client;
+        this.typeRegistry = typeRegistry;
+    }
+
+    /**
+     * Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping.
+     */
+    public void resolveAsMergedMapping(String indexWildcard, Set<String> fieldNames, ActionListener<IndexResolution> listener) {
+        client.fieldCaps(
+            createFieldCapsRequest(indexWildcard, fieldNames),
+            listener.delegateFailureAndWrap((l, response) -> l.onResponse(mergedMappings(indexWildcard, response)))
+        );
+    }
+
+    public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResponse fieldCapsResponse) {
+        assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION); // too expensive to run this on a transport worker
+        if (fieldCapsResponse.getIndexResponses().isEmpty()) {
+            return IndexResolution.notFound(indexPattern);
+        }
+
+        Map<String, List<IndexFieldCapabilities>> fieldsCaps = collectFieldCaps(fieldCapsResponse);
+
+        // Build hierarchical fields - it's easier to do it in sorted order so the object fields come first.
+        // TODO flattened is simpler - could we get away with that?
+        String[] names = fieldsCaps.keySet().toArray(new String[0]);
+        Arrays.sort(names);
+        Map<String, EsField> rootFields = new HashMap<>();
+        for (String name : names) {
+            Map<String, EsField> fields = rootFields;
+            String fullName = name;
+            boolean isAlias = false;
+            UnsupportedEsField firstUnsupportedParent = null;
+            while (true) {
+                int nextDot = name.indexOf('.');
+                if (nextDot < 0) {
+                    break;
+                }
+                String parent = name.substring(0, nextDot);
+                EsField obj = fields.get(parent);
+                if (obj == null) {
+                    obj = new EsField(parent, OBJECT, new HashMap<>(), false, true);
+                    isAlias = true;
+                    fields.put(parent, obj);
+                } else if (firstUnsupportedParent == null && obj instanceof UnsupportedEsField unsupportedParent) {
+                    firstUnsupportedParent = unsupportedParent;
+                }
+                fields = obj.getProperties();
+                name = name.substring(nextDot + 1);
+            }
+            // TODO we're careful to make isAlias match IndexResolver - but do we use it?
+            EsField field = firstUnsupportedParent == null
+                ? createField(fieldCapsResponse, name, fullName, fieldsCaps.get(fullName), isAlias)
+                : new UnsupportedEsField(
+                    fullName,
+                    firstUnsupportedParent.getOriginalType(),
+                    firstUnsupportedParent.getName(),
+                    new HashMap<>()
+                );
+            fields.put(name, field);
+        }
+
+        boolean allEmpty = true;
+        for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
+            allEmpty &= ir.get().isEmpty();
+        }
+        if (allEmpty) {
+            // If all the mappings are empty we return an empty set of resolved indices to line up with QL
+            return IndexResolution.valid(new EsIndex(indexPattern, rootFields, Set.of()));
+        }
+
+        Set<String> concreteIndices = new HashSet<>(fieldCapsResponse.getIndexResponses().size());
+        for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
+            concreteIndices.add(ir.getIndexName());
+        }
+        return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices));
+    }
+
+    private static Map<String, List<IndexFieldCapabilities>> collectFieldCaps(FieldCapabilitiesResponse fieldCapsResponse) {
+        Set<String> seenHashes = new HashSet<>();
+        Map<String, List<IndexFieldCapabilities>> fieldsCaps = new HashMap<>();
+        for (FieldCapabilitiesIndexResponse response : fieldCapsResponse.getIndexResponses()) {
+            if (seenHashes.add(response.getIndexMappingHash()) == false) {
+                continue;
+            }
+            for (IndexFieldCapabilities fc : response.get().values()) {
+                if (fc.isMetadatafield()) {
+                    // ESQL builds the metadata fields if they are asked for without using the resolution.
+                    continue;
+                }
+                List<IndexFieldCapabilities> all = fieldsCaps.computeIfAbsent(fc.name(), (_key) -> new ArrayList<>());
+                all.add(fc);
+            }
+        }
+        return fieldsCaps;
+    }
+
+    private EsField createField(
+        FieldCapabilitiesResponse fieldCapsResponse,
+        String name,
+        String fullName,
+        List<IndexFieldCapabilities> fcs,
+        boolean isAlias
+    ) {
+        IndexFieldCapabilities first = fcs.get(0);
+        List<IndexFieldCapabilities> rest = fcs.subList(1, fcs.size());
+        DataType type = typeRegistry.fromEs(first.type(), first.metricType());
+        boolean aggregatable = first.isAggregatable();
+        if (rest.isEmpty() == false) {
+            for (IndexFieldCapabilities fc : rest) {
+                if (first.metricType() != fc.metricType()) {
+                    return conflictingMetricTypes(name, fullName, fieldCapsResponse);
+                }
+            }
+            for (IndexFieldCapabilities fc : rest) {
+                if (type != typeRegistry.fromEs(fc.type(), fc.metricType())) {
+                    return conflictingTypes(name, fullName, fieldCapsResponse);
+                }
+            }
+            for (IndexFieldCapabilities fc : rest) {
+                aggregatable &= fc.isAggregatable();
+            }
+        }
+
+        // TODO I think we only care about unmapped fields if we're aggregating on them. do we even then?
+
+        if (type == TEXT) {
+            return new TextEsField(name, new HashMap<>(), false, isAlias);
+        }
+        if (type == KEYWORD) {
+            int length = Short.MAX_VALUE;
+            // TODO: to check whether isSearchable/isAggregateable takes into account the presence of the normalizer
+            boolean normalized = false;
+            return new KeywordEsField(name, new HashMap<>(), aggregatable, length, normalized, isAlias);
+        }
+        if (type == DATETIME) {
+            return DateEsField.dateEsField(name, new HashMap<>(), aggregatable);
+        }
+        if (type == UNSUPPORTED) {
+            return unsupported(name, first);
+        }
+
+        return new EsField(name, type, new HashMap<>(), aggregatable, isAlias);
+    }
+
+    private UnsupportedEsField unsupported(String name, IndexFieldCapabilities fc) {
+        String originalType = fc.metricType() == TimeSeriesParams.MetricType.COUNTER ? "counter" : fc.type();
+        return new UnsupportedEsField(name, originalType);
+    }
+
+    private EsField conflictingTypes(String name, String fullName, FieldCapabilitiesResponse fieldCapsResponse) {
+        Map<String, Set<String>> typesToIndices = new TreeMap<>();
+        for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
+            IndexFieldCapabilities fc = ir.get().get(fullName);
+            if (fc != null) {
+                DataType type = typeRegistry.fromEs(fc.type(), fc.metricType());
+                if (type == UNSUPPORTED) {
+                    return unsupported(name, fc);
+                }
+                typesToIndices.computeIfAbsent(type.esType(), _key -> new TreeSet<>()).add(ir.getIndexName());
+            }
+        }
+        StringBuilder errorMessage = new StringBuilder();
+        errorMessage.append("mapped as [");
+        errorMessage.append(typesToIndices.size());
+        errorMessage.append("] incompatible types: ");
+        boolean first = true;
+        for (Map.Entry<String, Set<String>> e : typesToIndices.entrySet()) {
+            if (first) {
+                first = false;
+            } else {
+                errorMessage.append(", ");
+            }
+            errorMessage.append("[");
+            errorMessage.append(e.getKey());
+            errorMessage.append("] in ");
+            errorMessage.append(e.getValue());
+        }
+        return new InvalidMappedField(name, errorMessage.toString());
+    }
+
+    private EsField conflictingMetricTypes(String name, String fullName, FieldCapabilitiesResponse fieldCapsResponse) {
+        TreeSet<String> indices = new TreeSet<>();
+        for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
+            IndexFieldCapabilities fc = ir.get().get(fullName);
+            if (fc != null) {
+                indices.add(ir.getIndexName());
+            }
+        }
+        return new InvalidMappedField(name, "mapped as different metric types in indices: " + indices);
+    }
+
+    private static FieldCapabilitiesRequest createFieldCapsRequest(String index, Set<String> fieldNames) {
+        FieldCapabilitiesRequest req = new FieldCapabilitiesRequest().indices(Strings.commaDelimitedListToStringArray(index));
+        req.fields(fieldNames.toArray(String[]::new));
+        req.includeUnmapped(true);
+        // lenient because we throw our own errors looking at the response e.g. if something was not resolved
+        // also because this way security doesn't throw authorization exceptions but rather honors ignore_unavailable
+        req.indicesOptions(IndexResolver.FIELD_CAPS_INDICES_OPTIONS);
+        req.setMergeResults(false);
+        return req;
+    }
+}

+ 142 - 12
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.fieldcaps.FieldCapabilities;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.core.Assertions;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
@@ -51,11 +52,14 @@ import org.elasticsearch.xpack.ql.plan.TableIdentifier;
 import org.elasticsearch.xpack.ql.plan.logical.Aggregate;
 import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.ql.plan.logical.Project;
+import org.elasticsearch.xpack.ql.type.DataTypes;
+import org.elasticsearch.xpack.ql.type.EsField;
 import org.elasticsearch.xpack.ql.type.InvalidMappedField;
 import org.elasticsearch.xpack.ql.util.Holder;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -75,6 +79,7 @@ public class EsqlSession {
     private final String sessionId;
     private final EsqlConfiguration configuration;
     private final IndexResolver indexResolver;
+    private final EsqlIndexResolver esqlIndexResolver;
     private final EnrichPolicyResolver enrichPolicyResolver;
 
     private final PreAnalyzer preAnalyzer;
@@ -89,6 +94,7 @@ public class EsqlSession {
         String sessionId,
         EsqlConfiguration configuration,
         IndexResolver indexResolver,
+        EsqlIndexResolver esqlIndexResolver,
         EnrichPolicyResolver enrichPolicyResolver,
         PreAnalyzer preAnalyzer,
         FunctionRegistry functionRegistry,
@@ -99,6 +105,7 @@ public class EsqlSession {
         this.sessionId = sessionId;
         this.configuration = configuration;
         this.indexResolver = indexResolver;
+        this.esqlIndexResolver = esqlIndexResolver;
         this.enrichPolicyResolver = enrichPolicyResolver;
         this.preAnalyzer = preAnalyzer;
         this.verifier = verifier;
@@ -201,18 +208,11 @@ public class EsqlSession {
             TableIdentifier table = tableInfo.id();
             var fieldNames = fieldNames(parsed, enrichPolicyMatchFields);
 
-            indexResolver.resolveAsMergedMapping(
-                table.index(),
-                fieldNames,
-                false,
-                Map.of(),
-                listener,
-                EsqlSession::specificValidity,
-                IndexResolver.PRESERVE_PROPERTIES,
-                // TODO no matter what metadata fields are asked in a query, the "allowedMetadataFields" is always _index, does it make
-                // sense to reflect the actual list of metadata fields instead?
-                IndexResolver.INDEX_METADATA_FIELD
-            );
+            if (Assertions.ENABLED) {
+                resolveMergedMappingAgainstBothResolvers(table.index(), fieldNames, listener);
+            } else {
+                esqlIndexResolver.resolveAsMergedMapping(table.index(), fieldNames, listener);
+            }
         } else {
             try {
                 // occurs when dealing with local relations (row a = 1)
@@ -223,6 +223,136 @@ public class EsqlSession {
         }
     }
 
+    /**
+     * Resolves the mapping against both the new, fast {@link #esqlIndexResolver}
+     * and the older, known correct {@link #indexResolver}. We then assert that they
+     * produce the same output.
+     */
+    private void resolveMergedMappingAgainstBothResolvers(
+        String indexWildcard,
+        Set<String> fieldNames,
+        ActionListener<IndexResolution> listener
+    ) {
+        indexResolver.resolveAsMergedMapping(indexWildcard, fieldNames, false, Map.of(), new ActionListener<>() {
+            @Override
+            public void onResponse(IndexResolution fromQl) {
+                esqlIndexResolver.resolveAsMergedMapping(indexWildcard, fieldNames, new ActionListener<>() {
+                    @Override
+                    public void onResponse(IndexResolution fromEsql) {
+                        if (fromQl.isValid() == false) {
+                            if (fromEsql.isValid()) {
+                                throw new IllegalArgumentException(
+                                    "ql and esql didn't make the same resolution: validity differs " + fromQl + " != " + fromEsql
+                                );
+                            }
+                        } else {
+                            assertSameMappings("", fromQl.get().mapping(), fromEsql.get().mapping());
+                            if (fromQl.get().concreteIndices().equals(fromEsql.get().concreteIndices()) == false) {
+                                throw new IllegalArgumentException(
+                                    "ql and esql didn't make the same resolution: concrete indices differ "
+                                        + fromQl.get().concreteIndices()
+                                        + " != "
+                                        + fromEsql.get().concreteIndices()
+                                );
+                            }
+                        }
+                        listener.onResponse(fromEsql);
+                    }
+
+                    private void assertSameMappings(String prefix, Map<String, EsField> fromQl, Map<String, EsField> fromEsql) {
+                        List<String> qlFields = new ArrayList<>();
+                        qlFields.addAll(fromQl.keySet());
+                        Collections.sort(qlFields);
+
+                        List<String> esqlFields = new ArrayList<>();
+                        esqlFields.addAll(fromEsql.keySet());
+                        Collections.sort(esqlFields);
+                        if (qlFields.equals(esqlFields) == false) {
+                            throw new IllegalArgumentException(
+                                prefix + ": ql and esql didn't make the same resolution: fields differ \n" + qlFields + " !=\n" + esqlFields
+                            );
+                        }
+
+                        for (int f = 0; f < qlFields.size(); f++) {
+                            String name = qlFields.get(f);
+                            EsField qlField = fromQl.get(name);
+                            EsField esqlField = fromEsql.get(name);
+
+                            if (qlField.getProperties().isEmpty() == false || esqlField.getProperties().isEmpty() == false) {
+                                assertSameMappings(
+                                    prefix.equals("") ? name : prefix + "." + name,
+                                    qlField.getProperties(),
+                                    esqlField.getProperties()
+                                );
+                            }
+
+                            /*
+                             * Check that the field itself is the same, skipping isAlias because
+                             * we don't actually use it in ESQL and the EsqlIndexResolver doesn't
+                             * produce exactly the same result.
+                             */
+                            if (qlField.getDataType().equals(DataTypes.UNSUPPORTED) == false
+                                && qlField.getName().equals(esqlField.getName()) == false
+                            // QL uses full paths for unsupported fields. ESQL does not. This particular difference is fine.
+                            ) {
+                                throw new IllegalArgumentException(
+                                    prefix
+                                        + "."
+                                        + name
+                                        + ": ql and esql didn't make the same resolution: names differ ["
+                                        + qlField.getName()
+                                        + "] != ["
+                                        + esqlField.getName()
+                                        + "]"
+                                );
+                            }
+                            if (qlField.getDataType() != esqlField.getDataType()) {
+                                throw new IllegalArgumentException(
+                                    prefix
+                                        + "."
+                                        + name
+                                        + ": ql and esql didn't make the same resolution: types differ ["
+                                        + qlField.getDataType()
+                                        + "] != ["
+                                        + esqlField.getDataType()
+                                        + "]"
+                                );
+                            }
+                            if (qlField.isAggregatable() != esqlField.isAggregatable()) {
+                                throw new IllegalArgumentException(
+                                    prefix
+                                        + "."
+                                        + name
+                                        + ": ql and esql didn't make the same resolution: aggregability differ ["
+                                        + qlField.isAggregatable()
+                                        + "] != ["
+                                        + esqlField.isAggregatable()
+                                        + "]"
+                                );
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        listener.onFailure(e);
+                    }
+                });
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                listener.onFailure(e);
+            }
+        },
+            EsqlSession::specificValidity,
+            IndexResolver.PRESERVE_PROPERTIES,
+            // TODO no matter what metadata fields are asked in a query, the "allowedMetadataFields" is always _index, does it make
+            // sense to reflect the actual list of metadata fields instead?
+            IndexResolver.INDEX_METADATA_FIELD
+        );
+    }
+
     static Set<String> fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchFields) {
         if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) {
             // no explicit columns selection, for example "from employees"

+ 5 - 10
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql.analysis;
 
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
@@ -28,7 +29,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Eval;
 import org.elasticsearch.xpack.esql.plan.logical.Row;
 import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
-import org.elasticsearch.xpack.esql.session.EsqlSession;
+import org.elasticsearch.xpack.esql.session.EsqlIndexResolver;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypeRegistry;
 import org.elasticsearch.xpack.ql.expression.Alias;
 import org.elasticsearch.xpack.ql.expression.Attribute;
@@ -40,7 +41,6 @@ import org.elasticsearch.xpack.ql.expression.ReferenceAttribute;
 import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
 import org.elasticsearch.xpack.ql.index.EsIndex;
 import org.elasticsearch.xpack.ql.index.IndexResolution;
-import org.elasticsearch.xpack.ql.index.IndexResolver;
 import org.elasticsearch.xpack.ql.plan.TableIdentifier;
 import org.elasticsearch.xpack.ql.plan.logical.Aggregate;
 import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
@@ -1767,14 +1767,9 @@ public class AnalyzerTests extends ESTestCase {
     }
 
     private static LogicalPlan analyzeWithEmptyFieldCapsResponse(String query) throws IOException {
-        IndexResolution resolution = IndexResolver.mergedMappings(
-            EsqlDataTypeRegistry.INSTANCE,
-            "test*",
-            readFieldCapsResponse("empty_field_caps_response.json"),
-            EsqlSession::specificValidity,
-            IndexResolver.PRESERVE_PROPERTIES,
-            IndexResolver.INDEX_METADATA_FIELD
-        );
+        List<FieldCapabilitiesIndexResponse> idxResponses = List.of(new FieldCapabilitiesIndexResponse("idx", "idx", Map.of(), true));
+        FieldCapabilitiesResponse caps = new FieldCapabilitiesResponse(idxResponses, List.of());
+        IndexResolution resolution = new EsqlIndexResolver(null, EsqlDataTypeRegistry.INSTANCE).mergedMappings("test*", caps);
         var analyzer = analyzer(resolution, TEST_VERIFIER, configuration(query));
         return analyze(query, analyzer);
     }

+ 44 - 11
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java

@@ -9,7 +9,9 @@ package org.elasticsearch.xpack.esql.stats;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.fieldcaps.FieldCapabilities;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
+import org.elasticsearch.action.fieldcaps.IndexFieldCapabilities;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
@@ -21,19 +23,19 @@ import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
 import org.elasticsearch.xpack.esql.execution.PlanExecutor;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.session.EsqlIndexResolver;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypeRegistry;
 import org.elasticsearch.xpack.ql.index.IndexResolver;
 import org.junit.After;
 import org.junit.Before;
 import org.mockito.stubbing.Answer;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.singletonMap;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.mockito.ArgumentMatchers.any;
@@ -68,11 +70,10 @@ public class PlanExecutorMetricsTests extends ESTestCase {
     }
 
     public void testFailedMetric() {
-        Client client = mock(Client.class);
-        IndexResolver idxResolver = new IndexResolver(client, randomAlphaOfLength(10), EsqlDataTypeRegistry.INSTANCE, Set::of);
-        var planExecutor = new PlanExecutor(idxResolver);
         String[] indices = new String[] { "test" };
-        var enrichResolver = mockEnrichResolver();
+
+        Client qlClient = mock(Client.class);
+        IndexResolver idxResolver = new IndexResolver(qlClient, randomAlphaOfLength(10), EsqlDataTypeRegistry.INSTANCE, Set::of);
         // simulate a valid field_caps response so we can parse and correctly analyze the query
         FieldCapabilitiesResponse fieldCapabilitiesResponse = mock(FieldCapabilitiesResponse.class);
         when(fieldCapabilitiesResponse.getIndices()).thenReturn(indices);
@@ -80,9 +81,23 @@ public class PlanExecutorMetricsTests extends ESTestCase {
         doAnswer((Answer<Void>) invocation -> {
             @SuppressWarnings("unchecked")
             ActionListener<FieldCapabilitiesResponse> listener = (ActionListener<FieldCapabilitiesResponse>) invocation.getArguments()[1];
+            // simulate a valid field_caps response so we can parse and correctly analyze the query
             listener.onResponse(fieldCapabilitiesResponse);
             return null;
-        }).when(client).fieldCaps(any(), any());
+        }).when(qlClient).fieldCaps(any(), any());
+
+        Client esqlClient = mock(Client.class);
+        EsqlIndexResolver esqlIndexResolver = new EsqlIndexResolver(esqlClient, EsqlDataTypeRegistry.INSTANCE);
+        doAnswer((Answer<Void>) invocation -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<FieldCapabilitiesResponse> listener = (ActionListener<FieldCapabilitiesResponse>) invocation.getArguments()[1];
+            // simulate a valid field_caps response so we can parse and correctly analyze the query
+            listener.onResponse(new FieldCapabilitiesResponse(indexFieldCapabilities(indices), List.of()));
+            return null;
+        }).when(esqlClient).fieldCaps(any(), any());
+
+        var planExecutor = new PlanExecutor(idxResolver, esqlIndexResolver);
+        var enrichResolver = mockEnrichResolver();
 
         var request = new EsqlQueryRequest();
         // test a failed query: xyz field doesn't exist
@@ -122,12 +137,30 @@ public class PlanExecutorMetricsTests extends ESTestCase {
         assertEquals(1, planExecutor.metrics().stats().get("features.stats"));
     }
 
+    private List<FieldCapabilitiesIndexResponse> indexFieldCapabilities(String[] indices) {
+        List<FieldCapabilitiesIndexResponse> responses = new ArrayList<>();
+        for (String idx : indices) {
+            responses.add(
+                new FieldCapabilitiesIndexResponse(
+                    idx,
+                    idx,
+                    Map.ofEntries(
+                        Map.entry("foo", new IndexFieldCapabilities("foo", "integer", false, true, true, false, null, Map.of())),
+                        Map.entry("bar", new IndexFieldCapabilities("bar", "long", false, true, true, false, null, Map.of()))
+                    ),
+                    true
+                )
+            );
+        }
+        return responses;
+    }
+
     private Map<String, Map<String, FieldCapabilities>> fields(String[] indices) {
-        FieldCapabilities fooField = new FieldCapabilities("foo", "integer", false, true, true, indices, null, null, emptyMap());
-        FieldCapabilities barField = new FieldCapabilities("bar", "long", false, true, true, indices, null, null, emptyMap());
+        FieldCapabilities fooField = new FieldCapabilities("foo", "integer", false, true, true, indices, null, null, Map.of());
+        FieldCapabilities barField = new FieldCapabilities("bar", "long", false, true, true, indices, null, null, Map.of());
         Map<String, Map<String, FieldCapabilities>> fields = new HashMap<>();
-        fields.put(fooField.getName(), singletonMap(fooField.getName(), fooField));
-        fields.put(barField.getName(), singletonMap(barField.getName(), barField));
+        fields.put(fooField.getName(), Map.of(fooField.getName(), fooField));
+        fields.put(barField.getName(), Map.of(barField.getName(), barField));
         return fields;
     }
 

+ 16 - 28
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeRegistryTests.java

@@ -6,17 +6,18 @@
  */
 package org.elasticsearch.xpack.esql.type;
 
-import org.elasticsearch.action.fieldcaps.FieldCapabilities;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
+import org.elasticsearch.action.fieldcaps.IndexFieldCapabilities;
 import org.elasticsearch.index.mapper.TimeSeriesParams;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.xpack.esql.session.EsqlSession;
+import org.elasticsearch.xpack.esql.session.EsqlIndexResolver;
 import org.elasticsearch.xpack.ql.index.IndexResolution;
-import org.elasticsearch.xpack.ql.index.IndexResolver;
 import org.elasticsearch.xpack.ql.type.DataType;
 import org.elasticsearch.xpack.ql.type.DataTypes;
 import org.elasticsearch.xpack.ql.type.EsField;
 
+import java.util.List;
 import java.util.Map;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -35,33 +36,20 @@ public class EsqlDataTypeRegistryTests extends ESTestCase {
     }
 
     private void resolve(String esTypeName, TimeSeriesParams.MetricType metricType, DataType expected) {
-        String[] indices = new String[] { "idx-" + randomAlphaOfLength(5) };
-        FieldCapabilities fieldCap = new FieldCapabilities(
-            randomAlphaOfLength(3),
-            esTypeName,
-            false,
-            true,
-            true,
-            false,
-            metricType,
-            indices,
-            null,
-            null,
-            null,
-            null,
-            Map.of()
-        );
-        FieldCapabilitiesResponse caps = new FieldCapabilitiesResponse(indices, Map.of(fieldCap.getName(), Map.of(esTypeName, fieldCap)));
-        IndexResolution resolution = IndexResolver.mergedMappings(
-            EsqlDataTypeRegistry.INSTANCE,
-            "idx-*",
-            caps,
-            EsqlSession::specificValidity,
-            IndexResolver.PRESERVE_PROPERTIES,
-            null
+        String idx = "idx-" + randomAlphaOfLength(5);
+        String field = "f" + randomAlphaOfLength(3);
+        List<FieldCapabilitiesIndexResponse> idxResponses = List.of(
+            new FieldCapabilitiesIndexResponse(
+                idx,
+                idx,
+                Map.of(field, new IndexFieldCapabilities(field, esTypeName, false, true, true, false, metricType, Map.of())),
+                true
+            )
         );
 
-        EsField f = resolution.get().mapping().get(fieldCap.getName());
+        FieldCapabilitiesResponse caps = new FieldCapabilitiesResponse(idxResponses, List.of());
+        IndexResolution resolution = new EsqlIndexResolver(null, EsqlDataTypeRegistry.INSTANCE).mergedMappings("idx-*", caps);
+        EsField f = resolution.get().mapping().get(field);
         assertThat(f.getDataType(), equalTo(expected));
     }
 }

+ 0 - 16
x-pack/plugin/esql/src/test/resources/empty_field_caps_response.json

@@ -1,16 +0,0 @@
-{
-  "indices": [
-    "test1",
-    "test2"
-  ],
-  "fields": {
-    "_index": {
-      "_index": {
-        "type": "_index",
-        "metadata_field": true,
-        "searchable": true,
-        "aggregatable": true
-      }
-    }
-  }
-}

+ 1 - 1
x-pack/plugin/mapper-unsigned-long/src/main/java/org/elasticsearch/xpack/unsignedlong/UnsignedLongFieldMapper.java

@@ -333,7 +333,7 @@ public class UnsignedLongFieldMapper extends FieldMapper {
                     if (value.equals("")) {
                         return nullValueFormatted;
                     }
-                    return parseUnsignedLong(value);
+                    return unsignedToSortableSignedLong(parseUnsignedLong(value));
                 }
             };
             BlockSourceReader.LeafIteratorLookup lookup = isStored() || isIndexed()