Use SearchStats instead of field.isAggregatable in data node planning (#115744) (#116800)

Since ES|QL makes use of field-caps and only considers `isAggregatable` during Lucene pushdown, turning off doc-values disables Lucene pushdown. This is incorrect. The physical planning decision for Lucene pushdown is made during local planning on the data node, at which point `SearchStats` are known, and both `isIndexed` and `hasDocValues` are separately knowable. The Lucene pushdown should happen for `isIndexed` and not consider `hasDocValues` at all.

This PR adds `hasDocValues` to `SearchStats` and then uses `isIndexed` and `hasDocValues` separately during local physical planning on the data nodes. This immediately cleared up one issue for spatial data, which could not push down a Lucene query when doc-values were disabled.

Summary of what `isAggregatable` means for different implementations of `MappedFieldType`:
* Default implementation of `isAggregatable` in `MappedFieldType` is `hasDocValues`, and does not consider `isIndexed`
* All classes that extend `AbstractScriptFieldType` (e.g. `LongScriptFieldType`) hard-code `isAggregatable` to `true`. This presumably means Lucene is happy to mimic having doc-values.
* `TextFieldType`, and classes that extend it, return the value of `fielddata`, so the field is considered aggregatable if there is field-data.
* `AggregateDoubleMetricFieldType` and `ConstantFieldType` are hard-coded to `true`
* `DenseVectorFieldType` is hard-coded to `false`
* `IdFieldType` returns the value of `fieldDataEnabled.getAsBoolean()`

In no case is `isIndexed` used for `isAggregatable`. However, for our Lucene pushdown of filters, `isIndexed` would make a lot more sense. But for pushdown of TopN, `hasDocValues` makes more sense.

Summarising the results of the various options for the various field types, where `?` means configurable:

| Class | isAggregatable | isIndexed | isStored | hasDocValues |
| --- | --- | --- | --- | --- |
| AbstractScriptFieldType | true | false | false | false |
| AggregateDoubleMetricFieldType | true | true | false | false |
| DenseVectorFieldType | false | ? | false | !indexed |
| IdFieldType | fieldData | true | true | false |
| TsidExtractingIdField | false | true | true | false |
| TextFieldType | fieldData | ? | ? | false |
| ? (the rest) | hasDocValues | ? | ? | ? |

It has also been observed that we cannot push filters to source without checking `hasDocValues` when we use the `SingleValueQuery`. So this leads to three groups of conditions:

| Category | require `indexed` | require `docValues` |
| --- | --- | --- |
| Filters(single-value) | true | true |
| Filters(multi-value) | true | false |
| TopN | true | true |

And for all cases we will also consider `isAggregatable` as a disjunction to cover the script field types, leading to two possible combinations:

* `fa.isAggregatable() || searchStats.isIndexed(fa.name()) && searchStats.hasDocValues(fa.name())`
* `fa.isAggregatable() || searchStats.isIndexed(fa.name())`
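
The two combinations above can be sketched as a pair of predicates. This is a minimal illustration, not the code from this PR: `PushdownSketch`, `Stats`, `Field` and `statsOf` are hypothetical stand-ins for `SearchStats` and the field attribute `fa`, reduced to the three flags the description talks about.

```java
import java.util.Set;

// Hypothetical stand-ins for illustration only; these are not the real ES|QL classes.
final class PushdownSketch {
    /** Minimal stand-in for the per-shard statistics known on the data node. */
    interface Stats {
        boolean isIndexed(String field);

        boolean hasDocValues(String field);
    }

    /** Minimal stand-in for a field attribute (`fa` in the description). */
    record Field(String name, boolean aggregatable) {}

    // Single-value filters and TopN: require both the index and doc-values.
    static boolean indexedAndHasDocValues(Field fa, Stats stats) {
        return fa.aggregatable() || stats.isIndexed(fa.name()) && stats.hasDocValues(fa.name());
    }

    // Multi-value filters: the index alone is enough.
    static boolean indexed(Field fa, Stats stats) {
        return fa.aggregatable() || stats.isIndexed(fa.name());
    }

    /** Builds a Stats instance from two sets of field names. */
    static Stats statsOf(Set<String> indexedFields, Set<String> docValueFields) {
        return new Stats() {
            @Override
            public boolean isIndexed(String field) {
                return indexedFields.contains(field);
            }

            @Override
            public boolean hasDocValues(String field) {
                return docValueFields.contains(field);
            }
        };
    }

    public static void main(String[] args) {
        // A field that is indexed but has doc-values turned off.
        Stats stats = statsOf(Set.of("location"), Set.of());
        Field location = new Field("location", false);
        System.out.println(indexed(location, stats));                // true
        System.out.println(indexedAndHasDocValues(location, stats)); // false
    }
}
```

In this sketch, a field with `aggregatable` set to `true` passes both predicates regardless of the statistics, which is how the disjunction covers the script field types.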
Craig Taverner 11 months ago
parent
commit
f5246cda55
26 changed files with 1049 additions and 565 deletions
  1. 6 0
      docs/changelog/115744.yaml
  2. 14 2
      server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java
  3. 1 1
      test/framework/src/main/java/org/elasticsearch/index/mapper/TextFieldFamilySyntheticSourceTestSetup.java
  4. 7 0
      x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java
  5. 93 9
      x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java
  6. 30 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-airports_no_doc_values.json
  7. 30 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-airports_not_indexed.json
  8. 0 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-airports_not_indexed_nor_doc_values.json
  9. 36 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/spatial.csv-spec
  10. 38 40
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/spatial/SpatialPushDownPointsTestCase.java
  11. 72 19
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/spatial/SpatialPushDownTestCase.java
  12. 14 9
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/EnableSpatialDistancePushdown.java
  13. 0 37
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/LucenePushDownUtils.java
  14. 111 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/LucenePushdownPredicates.java
  15. 29 21
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java
  16. 7 8
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java
  17. 14 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SpatialDocValuesExtraction.java
  18. 12 12
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
  19. 1 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
  20. 357 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/stats/SearchContextStats.java
  21. 49 322
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/stats/SearchStats.java
  22. 2 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java
  23. 105 61
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
  24. 2 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java
  25. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java
  26. 18 9
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/DisabledSearchStats.java

+ 6 - 0
docs/changelog/115744.yaml

@@ -0,0 +1,6 @@
+pr: 115744
+summary: Use `SearchStats` instead of field.isAggregatable in data node planning
+area: ES|QL
+type: bug
+issues:
+ - 115737

+ 14 - 2
server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java

@@ -968,15 +968,27 @@ public final class TextFieldMapper extends FieldMapper {
             return fielddata;
         }
 
-        public boolean canUseSyntheticSourceDelegateForQuerying() {
+        /**
+         * Returns true if the delegate sub-field can be used for loading and querying (ie. either isIndexed or isStored is true)
+         */
+        public boolean canUseSyntheticSourceDelegateForLoading() {
             return syntheticSourceDelegate != null
                 && syntheticSourceDelegate.ignoreAbove() == Integer.MAX_VALUE
                 && (syntheticSourceDelegate.isIndexed() || syntheticSourceDelegate.isStored());
         }
 
+        /**
+         * Returns true if the delegate sub-field can be used for querying only (ie. isIndexed must be true)
+         */
+        public boolean canUseSyntheticSourceDelegateForQuerying() {
+            return syntheticSourceDelegate != null
+                && syntheticSourceDelegate.ignoreAbove() == Integer.MAX_VALUE
+                && syntheticSourceDelegate.isIndexed();
+        }
+
         @Override
         public BlockLoader blockLoader(BlockLoaderContext blContext) {
-            if (canUseSyntheticSourceDelegateForQuerying()) {
+            if (canUseSyntheticSourceDelegateForLoading()) {
                 return new BlockLoader.Delegating(syntheticSourceDelegate.blockLoader(blContext)) {
                     @Override
                     protected String delegatingTo() {
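
The split between the two methods in this hunk can be shown with a small standalone sketch. `DelegateSketch` and its `Delegate` record are hypothetical stand-ins for the synthetic-source delegate sub-field; only the three properties checked in the hunk are modelled.

```java
// Hypothetical stand-in for the sub-field that a text field may delegate to.
final class DelegateSketch {
    record Delegate(boolean indexed, boolean stored, int ignoreAbove) {}

    // Loading values works from either the index or stored fields.
    static boolean canUseForLoading(Delegate d) {
        return d != null && d.ignoreAbove() == Integer.MAX_VALUE && (d.indexed() || d.stored());
    }

    // Querying needs the index itself; a stored-only delegate is not enough.
    static boolean canUseForQuerying(Delegate d) {
        return d != null && d.ignoreAbove() == Integer.MAX_VALUE && d.indexed();
    }

    public static void main(String[] args) {
        Delegate storedOnly = new Delegate(false, true, Integer.MAX_VALUE);
        System.out.println(canUseForLoading(storedOnly));  // true
        System.out.println(canUseForQuerying(storedOnly)); // false
    }
}
```

A delegate that is stored but not indexed is the case the old single method could not distinguish: it is usable by the block loader but not for a pushed-down query.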

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/mapper/TextFieldFamilySyntheticSourceTestSetup.java

@@ -39,7 +39,7 @@ public final class TextFieldFamilySyntheticSourceTestSetup {
             TextFieldMapper.TextFieldType text = (TextFieldMapper.TextFieldType) ft;
             boolean supportsColumnAtATimeReader = text.syntheticSourceDelegate() != null
                 && text.syntheticSourceDelegate().hasDocValues()
-                && text.canUseSyntheticSourceDelegateForQuerying();
+                && text.canUseSyntheticSourceDelegateForLoading();
             return new MapperTestCase.BlockReaderSupport(supportsColumnAtATimeReader, mapper, loaderFieldName);
         }
         MappedFieldType parent = mapper.fieldType(parentName);

+ 7 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java

@@ -72,6 +72,10 @@ public class CsvTestsDataLoader {
     private static final TestsDataset DECADES = new TestsDataset("decades");
     private static final TestsDataset AIRPORTS = new TestsDataset("airports");
     private static final TestsDataset AIRPORTS_MP = AIRPORTS.withIndex("airports_mp").withData("airports_mp.csv");
+    private static final TestsDataset AIRPORTS_NO_DOC_VALUES = new TestsDataset("airports_no_doc_values").withData("airports.csv");
+    private static final TestsDataset AIRPORTS_NOT_INDEXED = new TestsDataset("airports_not_indexed").withData("airports.csv");
+    private static final TestsDataset AIRPORTS_NOT_INDEXED_NOR_DOC_VALUES = new TestsDataset("airports_not_indexed_nor_doc_values")
+        .withData("airports.csv");
     private static final TestsDataset AIRPORTS_WEB = new TestsDataset("airports_web");
     private static final TestsDataset DATE_NANOS = new TestsDataset("date_nanos");
     private static final TestsDataset COUNTRIES_BBOX = new TestsDataset("countries_bbox");
@@ -105,6 +109,9 @@ public class CsvTestsDataLoader {
         Map.entry(DECADES.indexName, DECADES),
         Map.entry(AIRPORTS.indexName, AIRPORTS),
         Map.entry(AIRPORTS_MP.indexName, AIRPORTS_MP),
+        Map.entry(AIRPORTS_NO_DOC_VALUES.indexName, AIRPORTS_NO_DOC_VALUES),
+        Map.entry(AIRPORTS_NOT_INDEXED.indexName, AIRPORTS_NOT_INDEXED),
+        Map.entry(AIRPORTS_NOT_INDEXED_NOR_DOC_VALUES.indexName, AIRPORTS_NOT_INDEXED_NOR_DOC_VALUES),
         Map.entry(AIRPORTS_WEB.indexName, AIRPORTS_WEB),
         Map.entry(COUNTRIES_BBOX.indexName, COUNTRIES_BBOX),
         Map.entry(COUNTRIES_BBOX_WEB.indexName, COUNTRIES_BBOX_WEB),

+ 93 - 9
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

@@ -89,6 +89,8 @@ import java.time.Duration;
 import java.time.Period;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -206,9 +208,30 @@ public final class EsqlTestUtils {
         return new EsRelation(EMPTY, new EsIndex(randomAlphaOfLength(8), emptyMap()), IndexMode.STANDARD, randomBoolean());
     }
 
-    public static class TestSearchStats extends SearchStats {
-        public TestSearchStats() {
-            super(emptyList());
+    /**
+     * This version of SearchStats always returns true for all fields for all boolean methods.
+     * For custom behaviour either use {@link TestConfigurableSearchStats} or override the specific methods.
+     */
+    public static class TestSearchStats implements SearchStats {
+
+        @Override
+        public boolean exists(String field) {
+            return true;
+        }
+
+        @Override
+        public boolean isIndexed(String field) {
+            return exists(field);
+        }
+
+        @Override
+        public boolean hasDocValues(String field) {
+            return exists(field);
+        }
+
+        @Override
+        public boolean hasExactSubfield(String field) {
+            return exists(field);
         }
 
         @Override
@@ -226,11 +249,6 @@ public final class EsqlTestUtils {
             return exists(field) ? -1 : 0;
         }
 
-        @Override
-        public boolean exists(String field) {
-            return true;
-        }
-
         @Override
         public byte[] min(String field, DataType dataType) {
             return null;
@@ -245,10 +263,76 @@ public final class EsqlTestUtils {
         public boolean isSingleValue(String field) {
             return false;
         }
+    }
+
+    /**
+     * This version of SearchStats can be preconfigured to return true/false for various combinations of the four field settings:
+     * <ol>
+     *     <li>exists</li>
+     *     <li>isIndexed</li>
+     *     <li>hasDocValues</li>
+     *     <li>hasExactSubfield</li>
+     * </ol>
+     * The default will return true for all fields. The include/exclude methods can be used to configure the settings for specific fields.
+     * If you call 'include' with no fields, it will switch to return false for all fields.
+     */
+    public static class TestConfigurableSearchStats extends TestSearchStats {
+        public enum Config {
+            EXISTS,
+            INDEXED,
+            DOC_VALUES,
+            EXACT_SUBFIELD
+        }
+
+        private final Map<Config, Set<String>> includes = new HashMap<>();
+        private final Map<Config, Set<String>> excludes = new HashMap<>();
+
+        public TestConfigurableSearchStats include(Config key, String... fields) {
+            // If this method is called with no fields, it is interpreted to mean include none, so we include a dummy field
+            for (String field : fields.length == 0 ? new String[] { "-" } : fields) {
+                includes.computeIfAbsent(key, k -> new HashSet<>()).add(field);
+                excludes.computeIfAbsent(key, k -> new HashSet<>()).remove(field);
+            }
+            return this;
+        }
+
+        public TestConfigurableSearchStats exclude(Config key, String... fields) {
+            for (String field : fields) {
+                includes.computeIfAbsent(key, k -> new HashSet<>()).remove(field);
+                excludes.computeIfAbsent(key, k -> new HashSet<>()).add(field);
+            }
+            return this;
+        }
+
+        private boolean isConfigationSet(Config config, String field) {
+            Set<String> in = includes.getOrDefault(config, Set.of());
+            Set<String> ex = excludes.getOrDefault(config, Set.of());
+            return (in.isEmpty() || in.contains(field)) && ex.contains(field) == false;
+        }
+
+        @Override
+        public boolean exists(String field) {
+            return isConfigationSet(Config.EXISTS, field);
+        }
 
         @Override
         public boolean isIndexed(String field) {
-            return exists(field);
+            return isConfigationSet(Config.INDEXED, field);
+        }
+
+        @Override
+        public boolean hasDocValues(String field) {
+            return isConfigationSet(Config.DOC_VALUES, field);
+        }
+
+        @Override
+        public boolean hasExactSubfield(String field) {
+            return isConfigationSet(Config.EXACT_SUBFIELD, field);
+        }
+
+        @Override
+        public String toString() {
+            return "TestConfigurableSearchStats{" + "includes=" + includes + ", excludes=" + excludes + '}';
         }
     }
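
The include/exclude semantics of `TestConfigurableSearchStats` can be easier to follow in isolation. The sketch below is a simplified, hypothetical reduction to a single setting (the class in the hunk keeps one pair of sets per `Config` value); `IncludeExcludeSketch` and `isSet` are names made up for this illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical single-setting reduction of the include/exclude logic in the hunk above.
final class IncludeExcludeSketch {
    private final Set<String> includes = new HashSet<>();
    private final Set<String> excludes = new HashSet<>();

    IncludeExcludeSketch include(String... fields) {
        // No fields means "include none": a dummy entry makes the include set non-empty.
        for (String field : fields.length == 0 ? new String[] { "-" } : fields) {
            includes.add(field);
            excludes.remove(field);
        }
        return this;
    }

    IncludeExcludeSketch exclude(String... fields) {
        for (String field : fields) {
            includes.remove(field);
            excludes.add(field);
        }
        return this;
    }

    // An empty include set means "everything is included" unless explicitly excluded.
    boolean isSet(String field) {
        return (includes.isEmpty() || includes.contains(field)) && excludes.contains(field) == false;
    }

    public static void main(String[] args) {
        System.out.println(new IncludeExcludeSketch().isSet("location"));                     // true
        System.out.println(new IncludeExcludeSketch().include("location").isSet("name"));     // false
        System.out.println(new IncludeExcludeSketch().include().isSet("location"));           // false
        System.out.println(new IncludeExcludeSketch().exclude("location").isSet("location")); // false
    }
}
```

The dummy `"-"` entry is what turns a bare `include()` call into "false for all fields", as the Javadoc in the hunk describes.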
 

+ 30 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-airports_no_doc_values.json

@@ -0,0 +1,30 @@
+{
+  "properties": {
+    "abbrev": {
+      "type": "keyword"
+    },
+    "name": {
+      "type": "text"
+    },
+    "scalerank": {
+      "type": "integer"
+    },
+    "type": {
+      "type": "keyword"
+    },
+    "location": {
+      "type": "geo_point",
+      "index": true,
+      "doc_values": false
+    },
+    "country": {
+      "type": "keyword"
+    },
+    "city": {
+      "type": "keyword"
+    },
+    "city_location": {
+      "type": "geo_point"
+    }
+  }
+}

+ 30 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-airports_not_indexed.json

@@ -0,0 +1,30 @@
+{
+  "properties": {
+    "abbrev": {
+      "type": "keyword"
+    },
+    "name": {
+      "type": "text"
+    },
+    "scalerank": {
+      "type": "integer"
+    },
+    "type": {
+      "type": "keyword"
+    },
+    "location": {
+      "type": "geo_point",
+      "index": false,
+      "doc_values": true
+    },
+    "country": {
+      "type": "keyword"
+    },
+    "city": {
+      "type": "keyword"
+    },
+    "city_location": {
+      "type": "geo_point"
+    }
+  }
+}

+ 0 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-airports-no-doc-values.json → x-pack/plugin/esql/qa/testFixtures/src/main/resources/mapping-airports_not_indexed_nor_doc_values.json


+ 36 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/spatial.csv-spec

@@ -484,6 +484,42 @@ centroid:geo_point                            | count:long
 POINT (42.97109629958868 14.7552534006536)    | 1
 ;
 
+centroidFromAirportsAfterIntersectsCompoundPredicateNoDocValues
+required_capability: st_intersects
+
+FROM airports_no_doc_values
+| WHERE scalerank == 9 AND ST_INTERSECTS(location, TO_GEOSHAPE("POLYGON((42 14, 43 14, 43 15, 42 15, 42 14))")) AND country == "Yemen"
+| STATS centroid=ST_CENTROID_AGG(location), count=COUNT()
+;
+
+centroid:geo_point                            | count:long
+POINT (42.97109629958868 14.7552534006536)    | 1
+;
+
+centroidFromAirportsAfterIntersectsCompoundPredicateNotIndexedNorDocValues
+required_capability: st_intersects
+
+FROM airports_not_indexed_nor_doc_values
+| WHERE scalerank == 9 AND ST_INTERSECTS(location, TO_GEOSHAPE("POLYGON((42 14, 43 14, 43 15, 42 15, 42 14))")) AND country == "Yemen"
+| STATS centroid=ST_CENTROID_AGG(location), count=COUNT()
+;
+
+centroid:geo_point                            | count:long
+POINT (42.97109629958868 14.7552534006536)    | 1
+;
+
+centroidFromAirportsAfterIntersectsCompoundPredicateNotIndexed
+required_capability: st_intersects
+
+FROM airports_not_indexed
+| WHERE scalerank == 9 AND ST_INTERSECTS(location, TO_GEOSHAPE("POLYGON((42 14, 43 14, 43 15, 42 15, 42 14))")) AND country == "Yemen"
+| STATS centroid=ST_CENTROID_AGG(location), count=COUNT()
+;
+
+centroid:geo_point                            | count:long
+POINT (42.97109629958868 14.7552534006536)    | 1
+;
+
 ###############################################
 # Tests for ST_INTERSECTS on GEO_POINT type
 

+ 38 - 40
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/spatial/SpatialPushDownPointsTestCase.java

@@ -12,8 +12,6 @@ import org.elasticsearch.geometry.Point;
 import org.elasticsearch.geometry.utils.GeometryValidator;
 import org.elasticsearch.geometry.utils.WellKnownText;
 import org.elasticsearch.lucene.spatial.CentroidCalculator;
-import org.elasticsearch.xpack.core.esql.action.EsqlQueryRequestBuilder;
-import org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
@@ -22,6 +20,7 @@ import java.io.IOException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Locale;
 
 import static org.hamcrest.Matchers.closeTo;
@@ -62,8 +61,7 @@ public abstract class SpatialPushDownPointsTestCase extends SpatialPushDownTestC
         CentroidCalculator withinCentroid = new CentroidCalculator();
         CentroidCalculator disjointCentroid = new CentroidCalculator();
         for (int i = 0; i < data.size(); i++) {
-            index("indexed", i + "", "{\"location\" : " + data.get(i).data + " }");
-            index("not-indexed", i + "", "{\"location\" : " + data.get(i).data + " }");
+            addToIndexes(i, data.get(i).data, "indexed", "not-indexed", "not-indexed-nor-doc-values", "no-doc-values");
             if (data.get(i).intersects) {
                 expectedIntersects++;
                 data.get(i).centroid.addTo(intersectsCentroid);
@@ -76,7 +74,7 @@ public abstract class SpatialPushDownPointsTestCase extends SpatialPushDownTestC
                 data.get(i).centroid.addTo(withinCentroid);
             }
         }
-        refresh("indexed", "not-indexed");
+        refresh("indexed", "not-indexed", "not-indexed-nor-doc-values", "no-doc-values");
 
         for (String polygon : new String[] {
             "POLYGON ((-10 -10, -10 10, 10 10, 10 -10, -10 -10))",
@@ -89,24 +87,28 @@ public abstract class SpatialPushDownPointsTestCase extends SpatialPushDownTestC
 
     protected void assertFunction(String spatialFunction, String wkt, long expected, CentroidCalculator centroid) throws IOException,
         ParseException {
-        final String query1 = String.format(Locale.ROOT, """
+        List<String> queries = getQueries(String.format(Locale.ROOT, """
             FROM indexed | WHERE %s(location, %s("%s")) | STATS COUNT(*), ST_CENTROID_AGG(location)
-            """, spatialFunction, castingFunction(), wkt);
-        final String query2 = String.format(Locale.ROOT, """
-             FROM not-indexed | WHERE %s(location, %s("%s")) | STATS COUNT(*), ST_CENTROID_AGG(location)
-            """, spatialFunction, castingFunction(), wkt);
-        try (
-            EsqlQueryResponse response1 = EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query1).get();
-            EsqlQueryResponse response2 = EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query2).get();
-        ) {
-            Object indexedCount = response1.response().column(0).iterator().next();
-            Object notIndexedCount = response2.response().column(0).iterator().next();
-            assertEquals(spatialFunction + "[expected=" + expected + "]", expected, indexedCount);
-            assertEquals(spatialFunction + "[expected=" + expected + "]", expected, notIndexedCount);
-            Object indexedCentroid = response1.response().column(1).iterator().next();
-            Object notIndexedCentroid = response2.response().column(1).iterator().next();
-            assertThat(spatialFunction + "[expected=" + toString(centroid) + "]", centroid, matchesCentroid(indexedCentroid));
-            assertThat(spatialFunction + "[expected=" + toString(centroid) + "]", centroid, matchesCentroid(notIndexedCentroid));
+            """, spatialFunction, castingFunction(), wkt));
+        try (TestQueryResponseCollection responses = new TestQueryResponseCollection(queries)) {
+            for (int i = 0; i < ALL_INDEXES.length; i++) {
+                Object resultCount = responses.getResponse(i, 0);
+                Object resultCentroid = responses.getResponse(i, 1);
+                assertEquals(spatialFunction + "[expected=" + expected + "] for " + ALL_INDEXES[i], expected, resultCount);
+                assertThat(
+                    spatialFunction + "[expected=" + toString(centroid) + "] for " + ALL_INDEXES[i],
+                    centroid,
+                    matchesCentroid(resultCentroid)
+                );
+            }
+            long allIndexesCount = (long) responses.getResponse(ALL_INDEXES.length, 0);
+            assertEquals(spatialFunction + "[expected=" + expected + "] for all indexes", expected * 4, allIndexesCount);
+            Object allIndexesCentroid = responses.getResponse(ALL_INDEXES.length, 1);
+            assertThat(
+                spatialFunction + "[expected=" + toString(centroid) + "] for all indexes",
+                centroid,
+                matchesCentroid(allIndexesCentroid)
+            );
         }
     }
 
@@ -126,16 +128,14 @@ public abstract class SpatialPushDownPointsTestCase extends SpatialPushDownTestC
                 for (int j = 0; j < values.length; j++) {
                     values[j] = "\"" + WellKnownText.toWKT(getIndexGeometry()) + "\"";
                 }
-                index("indexed", i + "", "{\"location\" : " + Arrays.toString(values) + " }");
-                index("not-indexed", i + "", "{\"location\" : " + Arrays.toString(values) + " }");
+                addToIndexes(i, Arrays.toString(values), "indexed", "not-indexed", "not-indexed-nor-doc-values", "no-doc-values");
             } else {
                 final String value = WellKnownText.toWKT(getIndexGeometry());
-                index("indexed", i + "", "{\"location\" : \"" + value + "\" }");
-                index("not-indexed", i + "", "{\"location\" : \"" + value + "\" }");
+                addToIndexes(i, "\"" + value + "\"", "indexed", "not-indexed", "not-indexed-nor-doc-values", "no-doc-values");
             }
         }
 
-        refresh("indexed", "not-indexed");
+        refresh("indexed", "not-indexed", "not-indexed-nor-doc-values", "no-doc-values");
 
         for (int i = 0; i < 10; i++) {
             final Geometry geometry = getIndexGeometry();
@@ -149,19 +149,17 @@ public abstract class SpatialPushDownPointsTestCase extends SpatialPushDownTestC
     protected void assertDistanceFunction(String wkt) {
         String spatialFunction = "ST_DISTANCE";
         String castingFunction = castingFunction().replaceAll("SHAPE", "POINT");
-        final String query1 = String.format(Locale.ROOT, """
-            FROM indexed | WHERE %s(location, %s("%s")) < %.1f | STATS COUNT(*)
-            """, spatialFunction, castingFunction, wkt, searchDistance());
-        final String query2 = String.format(Locale.ROOT, """
-            FROM not-indexed | WHERE %s(location, %s("%s")) < %.1f | STATS COUNT(*)
-            """, spatialFunction, castingFunction, wkt, searchDistance());
-        try (
-            EsqlQueryResponse response1 = EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query1).get();
-            EsqlQueryResponse response2 = EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query2).get();
-        ) {
-            Object indexedResult = response1.response().column(0).iterator().next();
-            Object notIndexedResult = response2.response().column(0).iterator().next();
-            assertEquals(spatialFunction, indexedResult, notIndexedResult);
+        List<String> queries = getQueries(String.format(Locale.ROOT, """
+            FROM index | WHERE %s(location, %s("%s")) < %.1f | STATS COUNT(*)
+            """, spatialFunction, castingFunction, wkt, searchDistance()));
+        try (TestQueryResponseCollection responses = new TestQueryResponseCollection(queries)) {
+            Object indexedResult = responses.getResponse(0, 0);
+            for (int i = 1; i < ALL_INDEXES.length; i++) {
+                Object result = responses.getResponse(i, 0);
+                assertEquals(spatialFunction + " for " + ALL_INDEXES[i], indexedResult, result);
+            }
+            long allIndexesResult = (long) responses.getResponse(ALL_INDEXES.length, 0);
+            assertEquals(spatialFunction + " for all indexes", (long) indexedResult * 4, allIndexesResult);
         }
     }
 

+ 72 - 19
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/spatial/SpatialPushDownTestCase.java

@@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.elasticsearch.xpack.spatial.SpatialPlugin;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -35,6 +36,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
  */
 public abstract class SpatialPushDownTestCase extends ESIntegTestCase {
 
+    protected static final String[] ALL_INDEXES = new String[] { "indexed", "not-indexed", "not-indexed-nor-doc-values", "no-doc-values" };
+
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return List.of(EsqlPlugin.class, SpatialPlugin.class);
     }
@@ -78,12 +81,34 @@ public abstract class SpatialPushDownTestCase extends ESIntegTestCase {
             """, fieldType())));
 
         assertAcked(prepareCreate("not-indexed").setMapping(String.format(Locale.ROOT, """
+            {
+              "properties" : {
+               "location": { "type" : "%s",  "index" : false, "doc_values" : true }
+              }
+            }
+            """, fieldType())));
+
+        assertAcked(prepareCreate("not-indexed-nor-doc-values").setMapping(String.format(Locale.ROOT, """
             {
               "properties" : {
                "location": { "type" : "%s",  "index" : false, "doc_values" : false }
               }
             }
             """, fieldType())));
+
+        assertAcked(prepareCreate("no-doc-values").setMapping(String.format(Locale.ROOT, """
+            {
+              "properties" : {
+               "location": { "type" : "%s",  "index" : true, "doc_values" : false }
+              }
+            }
+            """, fieldType())));
+    }
+
+    protected void addToIndexes(int id, String values, String... indexes) {
+        for (String index : indexes) {
+            index(index, id + "", "{\"location\" : " + values + " }");
+        }
     }
 
     private void assertPushedDownQueries(boolean multiValue) throws RuntimeException {
@@ -94,16 +119,14 @@ public abstract class SpatialPushDownTestCase extends ESIntegTestCase {
                 for (int j = 0; j < values.length; j++) {
                     values[j] = "\"" + WellKnownText.toWKT(getIndexGeometry()) + "\"";
                 }
-                index("indexed", i + "", "{\"location\" : " + Arrays.toString(values) + " }");
-                index("not-indexed", i + "", "{\"location\" : " + Arrays.toString(values) + " }");
+                addToIndexes(i, Arrays.toString(values), ALL_INDEXES);
             } else {
                 final String value = WellKnownText.toWKT(getIndexGeometry());
-                index("indexed", i + "", "{\"location\" : \"" + value + "\" }");
-                index("not-indexed", i + "", "{\"location\" : \"" + value + "\" }");
+                addToIndexes(i, "\"" + value + "\"", ALL_INDEXES);
             }
         }
 
-        refresh("indexed", "not-indexed");
+        refresh(ALL_INDEXES);
 
         String smallRectangleCW = "POLYGON ((-10 -10, -10 10, 10 10, 10 -10, -10 -10))";
         assertFunction("ST_WITHIN", smallRectangleCW);
@@ -115,27 +138,57 @@ public abstract class SpatialPushDownTestCase extends ESIntegTestCase {
             assertFunction("ST_INTERSECTS", wkt);
             assertFunction("ST_DISJOINT", wkt);
             assertFunction("ST_CONTAINS", wkt);
-            // within and lines are not globally supported so we avoid it here
+            // within and lines are not globally supported, so we avoid it here
             if (containsLine(geometry) == false) {
                 assertFunction("ST_WITHIN", wkt);
             }
         }
     }
 
+    protected List<String> getQueries(String query) {
+        ArrayList<String> queries = new ArrayList<>();
+        Arrays.stream(ALL_INDEXES).forEach(index -> queries.add(query.replaceAll("FROM (\\w+) \\|", "FROM " + index + " |")));
+        queries.add(query.replaceAll("FROM (\\w+) \\|", "FROM " + String.join(",", ALL_INDEXES) + " |"));
+        return queries;
+    }
+
     protected void assertFunction(String spatialFunction, String wkt) {
-        final String query1 = String.format(Locale.ROOT, """
-            FROM indexed | WHERE %s(location, %s("%s")) | STATS COUNT(*)
-            """, spatialFunction, castingFunction(), wkt);
-        final String query2 = String.format(Locale.ROOT, """
-             FROM not-indexed | WHERE %s(location, %s("%s")) | STATS COUNT(*)
-            """, spatialFunction, castingFunction(), wkt);
-        try (
-            EsqlQueryResponse response1 = EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query1).get();
-            EsqlQueryResponse response2 = EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query2).get();
-        ) {
-            Object indexedResult = response1.response().column(0).iterator().next();
-            Object notIndexedResult = response2.response().column(0).iterator().next();
-            assertEquals(spatialFunction, indexedResult, notIndexedResult);
+        List<String> queries = getQueries(String.format(Locale.ROOT, """
+            FROM index | WHERE %s(location, %s("%s")) | STATS COUNT(*)
+            """, spatialFunction, castingFunction(), wkt));
+        try (TestQueryResponseCollection responses = new TestQueryResponseCollection(queries)) {
+            Object indexedResult = responses.getResponse(0, 0);
+            for (int i = 1; i < ALL_INDEXES.length; i++) {
+                Object result = responses.getResponse(i, 0);
+                assertEquals(spatialFunction + " for " + ALL_INDEXES[i], indexedResult, result);
+            }
+            long allIndexesResult = (long) responses.getResponse(ALL_INDEXES.length, 0);
+            assertEquals(spatialFunction + " for all indexes", (long) indexedResult * 4, allIndexesResult);
+        }
+    }
+
+    protected static class TestQueryResponseCollection implements AutoCloseable {
+        private final List<? extends EsqlQueryResponse> responses;
+
+        public TestQueryResponseCollection(List<String> queries) {
+            this.responses = queries.stream().map(query -> {
+                try {
+                    return EsqlQueryRequestBuilder.newRequestBuilder(client()).query(query).get();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }).toList();
+        }
+
+        protected Object getResponse(int index, int column) {
+            return responses.get(index).response().column(column).iterator().next();
+        }
+
+        @Override
+        public void close() {
+            for (EsqlQueryResponse response : responses) {
+                response.close();
+            }
         }
     }
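
The `getQueries` helper above turns one template query into one query per index plus a final query over all indexes joined with commas. A minimal standalone sketch of that rewriting follows; the index names are hypothetical, since the real `ALL_INDEXES` array is defined outside this hunk, and the class name `QueryFanOutSketch` is illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

class QueryFanOutSketch {
    // Hypothetical index names; the real ALL_INDEXES is not visible in this hunk.
    static final String[] ALL_INDEXES = { "indexed", "not-indexed" };

    // Replace the placeholder index in "FROM <name> |" with each real index,
    // then add one more query that targets every index at once.
    static List<String> getQueries(String query) {
        List<String> queries = new ArrayList<>();
        for (String index : ALL_INDEXES) {
            queries.add(query.replaceAll("FROM (\\w+) \\|", "FROM " + index + " |"));
        }
        queries.add(query.replaceAll("FROM (\\w+) \\|", "FROM " + String.join(",", ALL_INDEXES) + " |"));
        return queries;
    }

    public static void main(String[] args) {
        getQueries("FROM index | WHERE x > 1 | STATS COUNT(*)").forEach(System.out::println);
    }
}
```

Because the last query covers every index, its count should equal the single-index count multiplied by the number of indexes, which appears to be what the `* 4` in `assertFunction` relies on.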
 

+ 14 - 9
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/EnableSpatialDistancePushdown.java

@@ -76,15 +76,15 @@ public class EnableSpatialDistancePushdown extends PhysicalOptimizerRules.Parame
     protected PhysicalPlan rule(FilterExec filterExec, LocalPhysicalOptimizerContext ctx) {
         PhysicalPlan plan = filterExec;
         if (filterExec.child() instanceof EsQueryExec esQueryExec) {
-            plan = rewrite(filterExec, esQueryExec);
+            plan = rewrite(filterExec, esQueryExec, LucenePushdownPredicates.from(ctx.searchStats()));
         } else if (filterExec.child() instanceof EvalExec evalExec && evalExec.child() instanceof EsQueryExec esQueryExec) {
-            plan = rewriteBySplittingFilter(filterExec, evalExec, esQueryExec);
+            plan = rewriteBySplittingFilter(filterExec, evalExec, esQueryExec, LucenePushdownPredicates.from(ctx.searchStats()));
         }
 
         return plan;
     }
 
-    private FilterExec rewrite(FilterExec filterExec, EsQueryExec esQueryExec) {
+    private FilterExec rewrite(FilterExec filterExec, EsQueryExec esQueryExec, LucenePushdownPredicates lucenePushdownPredicates) {
         // Find and rewrite any binary comparisons that involve a distance function and a literal
         var rewritten = filterExec.condition().transformDown(EsqlBinaryComparison.class, comparison -> {
             ComparisonType comparisonType = ComparisonType.from(comparison.getFunctionType());
@@ -95,7 +95,7 @@ public class EnableSpatialDistancePushdown extends PhysicalOptimizerRules.Parame
             }
             return comparison;
         });
-        if (rewritten.equals(filterExec.condition()) == false && canPushToSource(rewritten, x -> false)) {
+        if (rewritten.equals(filterExec.condition()) == false && canPushToSource(rewritten, lucenePushdownPredicates)) {
             return new FilterExec(filterExec.source(), esQueryExec, rewritten);
         }
         return filterExec;
@@ -119,9 +119,14 @@ public class EnableSpatialDistancePushdown extends PhysicalOptimizerRules.Parame
      *     | WHERE other &gt; 10
      * </pre>
      */
-    private PhysicalPlan rewriteBySplittingFilter(FilterExec filterExec, EvalExec evalExec, EsQueryExec esQueryExec) {
+    private PhysicalPlan rewriteBySplittingFilter(
+        FilterExec filterExec,
+        EvalExec evalExec,
+        EsQueryExec esQueryExec,
+        LucenePushdownPredicates lucenePushdownPredicates
+    ) {
         // Find all pushable distance functions in the EVAL
-        Map<NameId, StDistance> distances = getPushableDistances(evalExec.fields());
+        Map<NameId, StDistance> distances = getPushableDistances(evalExec.fields(), lucenePushdownPredicates);
 
         // Don't do anything if there are no distances to push down
         if (distances.isEmpty()) {
@@ -139,7 +144,7 @@ public class EnableSpatialDistancePushdown extends PhysicalOptimizerRules.Parame
             // Find and rewrite any binary comparisons that involve a distance function and a literal
             var rewritten = rewriteDistanceFilters(resExp, distances);
             // If all pushable StDistance functions were found and re-written, we need to re-write the FILTER/EVAL combination
-            if (rewritten.equals(resExp) == false && canPushToSource(rewritten, x -> false)) {
+            if (rewritten.equals(resExp) == false && canPushToSource(rewritten, lucenePushdownPredicates)) {
                 pushable.add(rewritten);
             } else {
                 nonPushable.add(exp);
@@ -163,10 +168,10 @@ public class EnableSpatialDistancePushdown extends PhysicalOptimizerRules.Parame
         }
     }
 
-    private Map<NameId, StDistance> getPushableDistances(List<Alias> aliases) {
+    private Map<NameId, StDistance> getPushableDistances(List<Alias> aliases, LucenePushdownPredicates lucenePushdownPredicates) {
         Map<NameId, StDistance> distances = new LinkedHashMap<>();
         aliases.forEach(alias -> {
-            if (alias.child() instanceof StDistance distance && canPushSpatialFunctionToSource(distance)) {
+            if (alias.child() instanceof StDistance distance && canPushSpatialFunctionToSource(distance, lucenePushdownPredicates)) {
                 distances.put(alias.id(), distance);
             } else if (alias.child() instanceof ReferenceAttribute ref && distances.containsKey(ref.id())) {
                 StDistance distance = distances.get(ref.id());

+ 0 - 37
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/LucenePushDownUtils.java

@@ -1,37 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
-
-import org.elasticsearch.xpack.esql.core.expression.Expression;
-import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
-import org.elasticsearch.xpack.esql.core.type.DataType;
-import org.elasticsearch.xpack.esql.stats.SearchStats;
-
-import java.util.function.Predicate;
-
-class LucenePushDownUtils {
-    /**
-     * this method is supposed to be used to define if a field can be used for exact push down (eg. sort or filter).
-     * "aggregatable" is the most accurate information we can have from field_caps as of now.
-     * Pushing down operations on fields that are not aggregatable would result in an error.
-     */
-    public static boolean isAggregatable(FieldAttribute f) {
-        return f.exactAttribute().field().isAggregatable();
-    }
-
-    public static boolean hasIdenticalDelegate(FieldAttribute attr, SearchStats stats) {
-        return stats.hasIdenticalDelegate(attr.name());
-    }
-
-    public static boolean isPushableFieldAttribute(Expression exp, Predicate<FieldAttribute> hasIdenticalDelegate) {
-        if (exp instanceof FieldAttribute fa && fa.getExactInfo().hasExact() && isAggregatable(fa)) {
-            return fa.dataType() != DataType.TEXT || hasIdenticalDelegate.test(fa);
-        }
-        return false;
-    }
-}

+ 111 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/LucenePushdownPredicates.java

@@ -0,0 +1,111 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
+
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.stats.SearchStats;
+
+/**
+ * When deciding if a filter or topN can be pushed down to Lucene, we need to check a few things on the field.
+ * Exactly what is checked depends on the type of field and the query. For example, we have the following possible combinations:
+ * <ol>
+ *     <li>A normal filter on a normal field will be pushed down using SingleValueQuery to remove multi-valued results,
+ *         and this requires knowing if the field is indexed and has doc-values.</li>
+ *     <li>A filter using a spatial function will allow multi-valued fields and we only need to know if the field is indexed,
+ *         and do not need doc values.</li>
+ *     <li>A TopN will be pushed down if the field is indexed and has doc values.</li>
+ *     <li>Filters with TEXT fields can only be pushed down if the TEXT field has a nested KEYWORD field,
+ *         referred to here as ExactSubfield. Note that this is related to normal ES|QL predicates,
+ *         not the full-text search provided by the MATCH and QSTR functions, which are pushed down separately.</li>
+ * </ol>
+ */
+public interface LucenePushdownPredicates {
+    /**
+     * For TEXT fields, we need to check if the field has a subfield of type KEYWORD that can be used instead.
+     */
+    boolean hasExactSubfield(FieldAttribute attr);
+
+    /**
+     * For pushing down TopN and for pushing down filters with SingleValueQuery,
+     * we need to check if the field is indexed and has doc values.
+     */
+    boolean isIndexedAndHasDocValues(FieldAttribute attr);
+
+    /**
+     * For pushing down filters when multi-value results are allowed (spatial functions like ST_INTERSECTS),
+     * we only need to know if the field is indexed.
+     */
+    boolean isIndexed(FieldAttribute attr);
+
+    /**
+     * We see fields as pushable if either they are aggregatable or they are indexed and have doc values.
+     * This covers non-indexed cases like <code>AbstractScriptFieldType</code> which hard-coded <code>isAggregatable</code> to true,
+     * as well as normal <code>FieldAttribute</code>s which can only be pushed down if they are indexed.
+     * The reason we don't just rely entirely on <code>isAggregatable</code> is because this is often false for normal fields, and could
+     * also differ from node to node, and we can physically plan each node separately, allowing Lucene pushdown on the nodes that
+     * support it, and relying on the compute engine for the nodes that do not.
+     */
+    default boolean isPushableFieldAttribute(Expression exp) {
+        if (exp instanceof FieldAttribute fa && fa.getExactInfo().hasExact() && isIndexedAndHasDocValues(fa)) {
+            return (fa.dataType() != DataType.TEXT && fa.dataType() != DataType.SEMANTIC_TEXT) || hasExactSubfield(fa);
+        }
+        return false;
+    }
+
+    /**
+     * The default implementation of this has no access to SearchStats, so it can only make decisions based on the FieldAttribute itself.
+     * In particular, it assumes TEXT fields have no exact subfields (underlying keyword field),
+     * and that isAggregatable means the field is indexed and has doc values.
+     */
+    LucenePushdownPredicates DEFAULT = new LucenePushdownPredicates() {
+        @Override
+        public boolean hasExactSubfield(FieldAttribute attr) {
+            return false;
+        }
+
+        @Override
+        public boolean isIndexedAndHasDocValues(FieldAttribute attr) {
+            // Is the FieldType.isAggregatable() check correct here? In FieldType isAggregatable usually only means hasDocValues
+            return attr.field().isAggregatable();
+        }
+
+        @Override
+        public boolean isIndexed(FieldAttribute attr) {
+            // TODO: This is the original behaviour, but is it correct? In FieldType isAggregatable usually only means hasDocValues
+            return attr.field().isAggregatable();
+        }
+    };
+
+    /**
+     * If we have access to SearchStats over a collection of shards, we can make more fine-grained decisions about what can be pushed down.
+     * This should open up more opportunities for lucene pushdown.
+     */
+    static LucenePushdownPredicates from(SearchStats stats) {
+        return new LucenePushdownPredicates() {
+            @Override
+            public boolean hasExactSubfield(FieldAttribute attr) {
+                return stats.hasExactSubfield(attr.name());
+            }
+
+            @Override
+            public boolean isIndexedAndHasDocValues(FieldAttribute attr) {
+                // We still consider the value of isAggregatable here, because some fields like ScriptFieldTypes are always aggregatable
+                // But this could hide issues with fields that are not indexed but are aggregatable
+                // This is the original behaviour for ES|QL, but is it correct?
+                return attr.field().isAggregatable() || stats.isIndexed(attr.name()) && stats.hasDocValues(attr.name());
+            }
+
+            @Override
+            public boolean isIndexed(FieldAttribute attr) {
+                return stats.isIndexed(attr.name());
+            }
+        };
+    }
+}
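
To see how the two predicates diverge, here is a minimal standalone sketch, not the ES|QL implementation. The `FieldStats` record and the class name are hypothetical stand-ins for what `SearchStats` and `FieldAttribute` provide on a data node; the boolean logic follows the `from(SearchStats)` variant above.

```java
class PushdownDecisionSketch {
    // Hypothetical per-field summary; in the real code these values come from
    // FieldAttribute.field().isAggregatable() and from SearchStats.
    record FieldStats(boolean aggregatable, boolean indexed, boolean hasDocValues) {}

    // Used for TopN and for filters wrapped in SingleValueQuery.
    static boolean isIndexedAndHasDocValues(FieldStats f) {
        return f.aggregatable() || (f.indexed() && f.hasDocValues());
    }

    // Used for spatial filters, where multi-valued fields are allowed.
    static boolean isIndexed(FieldStats f) {
        return f.indexed();
    }

    public static void main(String[] args) {
        // A spatial field with doc-values disabled: indexed, but not aggregatable.
        FieldStats noDocValues = new FieldStats(false, true, false);
        // A script field: reported as aggregatable, but not indexed.
        FieldStats scriptField = new FieldStats(true, false, false);

        System.out.println("spatial filter on noDocValues: " + isIndexed(noDocValues));
        System.out.println("TopN on noDocValues: " + isIndexedAndHasDocValues(noDocValues));
        System.out.println("TopN on scriptField: " + isIndexedAndHasDocValues(scriptField));
    }
}
```

The first case is the one the commit message describes: a spatial filter stays pushable when doc-values is turned off, while TopN on the same field does not.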

+ 29 - 21
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java

@@ -55,11 +55,9 @@ import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.function.Predicate;
 
 import static java.util.Arrays.asList;
 import static org.elasticsearch.xpack.esql.core.expression.predicate.Predicates.splitAnd;
-import static org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushDownUtils.isAggregatable;
 
 public class PushFiltersToSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<FilterExec, LocalPhysicalOptimizerContext> {
 
@@ -78,7 +76,7 @@ public class PushFiltersToSource extends PhysicalOptimizerRules.ParameterizedOpt
         List<Expression> pushable = new ArrayList<>();
         List<Expression> nonPushable = new ArrayList<>();
         for (Expression exp : splitAnd(filterExec.condition())) {
-            (canPushToSource(exp, x -> LucenePushDownUtils.hasIdenticalDelegate(x, ctx.searchStats())) ? pushable : nonPushable).add(exp);
+            (canPushToSource(exp, LucenePushdownPredicates.from(ctx.searchStats())) ? pushable : nonPushable).add(exp);
         }
         return rewrite(filterExec, queryExec, pushable, nonPushable, List.of());
     }
@@ -94,9 +92,7 @@ public class PushFiltersToSource extends PhysicalOptimizerRules.ParameterizedOpt
         List<Expression> nonPushable = new ArrayList<>();
         for (Expression exp : splitAnd(filterExec.condition())) {
             Expression resExp = exp.transformUp(ReferenceAttribute.class, r -> aliasReplacedBy.resolve(r, r));
-            (canPushToSource(resExp, x -> LucenePushDownUtils.hasIdenticalDelegate(x, ctx.searchStats())) ? pushable : nonPushable).add(
-                exp
-            );
+            (canPushToSource(resExp, LucenePushdownPredicates.from(ctx.searchStats())) ? pushable : nonPushable).add(exp);
         }
         // Replace field references with their actual field attributes
         pushable.replaceAll(e -> e.transformDown(ReferenceAttribute.class, r -> aliasReplacedBy.resolve(r, r)));
@@ -222,17 +218,27 @@ public class PushFiltersToSource extends PhysicalOptimizerRules.ParameterizedOpt
         return changed ? CollectionUtils.combine(others, bcs, ranges) : pushable;
     }
 
-    public static boolean canPushToSource(Expression exp, Predicate<FieldAttribute> hasIdenticalDelegate) {
+    /**
+     * Check if the given expression can be pushed down to the source.
+     * This version of the check is called when we do not have SearchStats available. It assumes no exact subfields for TEXT fields,
+     * and makes the indexed/doc-values check using the isAggregatable flag only, which comes from field-caps, represents the field state
+     * over the entire cluster (is not node specific), and has risks for indexed=false/doc_values=true fields.
+     */
+    public static boolean canPushToSource(Expression exp) {
+        return canPushToSource(exp, LucenePushdownPredicates.DEFAULT);
+    }
+
+    static boolean canPushToSource(Expression exp, LucenePushdownPredicates lucenePushdownPredicates) {
         if (exp instanceof BinaryComparison bc) {
-            return isAttributePushable(bc.left(), bc, hasIdenticalDelegate) && bc.right().foldable();
+            return isAttributePushable(bc.left(), bc, lucenePushdownPredicates) && bc.right().foldable();
         } else if (exp instanceof InsensitiveBinaryComparison bc) {
-            return isAttributePushable(bc.left(), bc, hasIdenticalDelegate) && bc.right().foldable();
+            return isAttributePushable(bc.left(), bc, lucenePushdownPredicates) && bc.right().foldable();
         } else if (exp instanceof BinaryLogic bl) {
-            return canPushToSource(bl.left(), hasIdenticalDelegate) && canPushToSource(bl.right(), hasIdenticalDelegate);
+            return canPushToSource(bl.left(), lucenePushdownPredicates) && canPushToSource(bl.right(), lucenePushdownPredicates);
         } else if (exp instanceof In in) {
-            return isAttributePushable(in.value(), null, hasIdenticalDelegate) && Expressions.foldable(in.list());
+            return isAttributePushable(in.value(), null, lucenePushdownPredicates) && Expressions.foldable(in.list());
         } else if (exp instanceof Not not) {
-            return canPushToSource(not.field(), hasIdenticalDelegate);
+            return canPushToSource(not.field(), lucenePushdownPredicates);
         } else if (exp instanceof UnaryScalarFunction usf) {
             if (usf instanceof RegexMatch<?> || usf instanceof IsNull || usf instanceof IsNotNull) {
                 if (usf instanceof IsNull || usf instanceof IsNotNull) {
@@ -240,12 +246,13 @@ public class PushFiltersToSource extends PhysicalOptimizerRules.ParameterizedOpt
                         return true;
                     }
                 }
-                return isAttributePushable(usf.field(), usf, hasIdenticalDelegate);
+                return isAttributePushable(usf.field(), usf, lucenePushdownPredicates);
             }
         } else if (exp instanceof CIDRMatch cidrMatch) {
-            return isAttributePushable(cidrMatch.ipField(), cidrMatch, hasIdenticalDelegate) && Expressions.foldable(cidrMatch.matches());
+            return isAttributePushable(cidrMatch.ipField(), cidrMatch, lucenePushdownPredicates)
+                && Expressions.foldable(cidrMatch.matches());
         } else if (exp instanceof SpatialRelatesFunction spatial) {
-            return canPushSpatialFunctionToSource(spatial);
+            return canPushSpatialFunctionToSource(spatial, lucenePushdownPredicates);
         } else if (exp instanceof StringQueryPredicate) {
             return true;
         } else if (exp instanceof QueryString) {
@@ -259,23 +266,24 @@ public class PushFiltersToSource extends PhysicalOptimizerRules.ParameterizedOpt
     /**
      * Push-down to Lucene is only possible if one field is an indexed spatial field, and the other is a constant spatial or string column.
      */
-    public static boolean canPushSpatialFunctionToSource(BinarySpatialFunction s) {
+    public static boolean canPushSpatialFunctionToSource(BinarySpatialFunction s, LucenePushdownPredicates lucenePushdownPredicates) {
         // The use of foldable here instead of SpatialEvaluatorFieldKey.isConstant is intentional to match the behavior of the
         // Lucene pushdown code in EsqlTranslationHandler::SpatialRelatesTranslator
         // We could enhance both places to support ReferenceAttributes that refer to constants, but that is a larger change
-        return isPushableSpatialAttribute(s.left()) && s.right().foldable() || isPushableSpatialAttribute(s.right()) && s.left().foldable();
+        return isPushableSpatialAttribute(s.left(), lucenePushdownPredicates) && s.right().foldable()
+            || isPushableSpatialAttribute(s.right(), lucenePushdownPredicates) && s.left().foldable();
     }
 
-    private static boolean isPushableSpatialAttribute(Expression exp) {
-        return exp instanceof FieldAttribute fa && fa.getExactInfo().hasExact() && isAggregatable(fa) && DataType.isSpatial(fa.dataType());
+    private static boolean isPushableSpatialAttribute(Expression exp, LucenePushdownPredicates p) {
+        return exp instanceof FieldAttribute fa && DataType.isSpatial(fa.dataType()) && fa.getExactInfo().hasExact() && p.isIndexed(fa);
     }
 
     private static boolean isAttributePushable(
         Expression expression,
         Expression operation,
-        Predicate<FieldAttribute> hasIdenticalDelegate
+        LucenePushdownPredicates lucenePushdownPredicates
     ) {
-        if (LucenePushDownUtils.isPushableFieldAttribute(expression, hasIdenticalDelegate)) {
+        if (lucenePushdownPredicates.isPushableFieldAttribute(expression)) {
             return true;
         }
         if (expression instanceof MetadataAttribute ma && ma.searchable()) {

+ 7 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSource.java

@@ -30,7 +30,6 @@ import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.function.Predicate;
 
 /**
  * We handle two main scenarios here:
@@ -60,7 +59,7 @@ import java.util.function.Predicate;
 public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<TopNExec, LocalPhysicalOptimizerContext> {
     @Override
     protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx) {
-        Pushable pushable = evaluatePushable(topNExec, x -> LucenePushDownUtils.hasIdenticalDelegate(x, ctx.searchStats()));
+        Pushable pushable = evaluatePushable(topNExec, LucenePushdownPredicates.from(ctx.searchStats()));
         return pushable.rewrite(topNExec);
     }
 
@@ -121,11 +120,11 @@ public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimi
         }
     }
 
-    private static Pushable evaluatePushable(TopNExec topNExec, Predicate<FieldAttribute> hasIdenticalDelegate) {
+    private static Pushable evaluatePushable(TopNExec topNExec, LucenePushdownPredicates lucenePushdownPredicates) {
         PhysicalPlan child = topNExec.child();
         if (child instanceof EsQueryExec queryExec
             && queryExec.canPushSorts()
-            && canPushDownOrders(topNExec.order(), hasIdenticalDelegate)) {
+            && canPushDownOrders(topNExec.order(), lucenePushdownPredicates)) {
             // With the simplest case of `FROM index | SORT ...` we only allow pushing down if the sort is on a field
             return new PushableQueryExec(queryExec);
         }
@@ -148,7 +147,7 @@ public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimi
 
             List<EsQueryExec.Sort> pushableSorts = new ArrayList<>();
             for (Order order : orders) {
-                if (LucenePushDownUtils.isPushableFieldAttribute(order.child(), hasIdenticalDelegate)) {
+                if (lucenePushdownPredicates.isPushableFieldAttribute(order.child())) {
                     pushableSorts.add(
                         new EsQueryExec.FieldSort(
                             ((FieldAttribute) order.child()).exactAttribute(),
@@ -169,7 +168,7 @@ public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimi
                             break;
                         }
                     } else if (aliasReplacedBy.resolve(referenceAttribute, referenceAttribute) instanceof FieldAttribute fieldAttribute
-                        && LucenePushDownUtils.isPushableFieldAttribute(fieldAttribute, hasIdenticalDelegate)) {
+                        && lucenePushdownPredicates.isPushableFieldAttribute(fieldAttribute)) {
                             // If the SORT refers to a reference to a pushable field, we can push it down
                             pushableSorts.add(
                                 new EsQueryExec.FieldSort(fieldAttribute.exactAttribute(), order.direction(), order.nullsPosition())
@@ -192,9 +191,9 @@ public class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimi
         return NO_OP;
     }
 
-    private static boolean canPushDownOrders(List<Order> orders, Predicate<FieldAttribute> hasIdenticalDelegate) {
+    private static boolean canPushDownOrders(List<Order> orders, LucenePushdownPredicates lucenePushdownPredicates) {
         // allow only exact FieldAttributes (no expressions) for sorting
-        return orders.stream().allMatch(o -> LucenePushDownUtils.isPushableFieldAttribute(o.child(), hasIdenticalDelegate));
+        return orders.stream().allMatch(o -> lucenePushdownPredicates.isPushableFieldAttribute(o.child()));
     }
 
     private static List<EsQueryExec.Sort> buildFieldSorts(List<Order> orders) {

+ 14 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SpatialDocValuesExtraction.java

@@ -15,6 +15,7 @@ import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.SpatialAggregateFunction;
 import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.BinarySpatialFunction;
 import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.SpatialRelatesFunction;
+import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
 import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
 import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
@@ -22,6 +23,7 @@ import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
 import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
+import org.elasticsearch.xpack.esql.stats.SearchStats;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -63,9 +65,11 @@ import java.util.Set;
  * is the only place where this information is available. This also means that the knowledge of the usage of doc-values does not need
  * to be serialized between nodes, and is only used locally.
  */
-public class SpatialDocValuesExtraction extends PhysicalOptimizerRules.OptimizerRule<AggregateExec> {
+public class SpatialDocValuesExtraction extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
+    AggregateExec,
+    LocalPhysicalOptimizerContext> {
     @Override
-    protected PhysicalPlan rule(AggregateExec aggregate) {
+    protected PhysicalPlan rule(AggregateExec aggregate, LocalPhysicalOptimizerContext ctx) {
         var foundAttributes = new HashSet<FieldAttribute>();
 
         PhysicalPlan plan = aggregate.transformDown(UnaryExec.class, exec -> {
@@ -75,7 +79,7 @@ public class SpatialDocValuesExtraction extends PhysicalOptimizerRules.Optimizer
                 for (NamedExpression aggExpr : agg.aggregates()) {
                     if (aggExpr instanceof Alias as && as.child() instanceof SpatialAggregateFunction af) {
                         if (af.field() instanceof FieldAttribute fieldAttribute
-                            && allowedForDocValues(fieldAttribute, agg, foundAttributes)) {
+                            && allowedForDocValues(fieldAttribute, ctx.searchStats(), agg, foundAttributes)) {
                             // We need to both mark the field to load differently, and change the spatial function to know to use it
                             foundAttributes.add(fieldAttribute);
                             changedAggregates = true;
@@ -153,8 +157,13 @@ public class SpatialDocValuesExtraction extends PhysicalOptimizerRules.Optimizer
      * This function disallows the use of more than one field for doc-values extraction in the same spatial relation function.
      * This is because comparing two doc-values fields is not supported in the current implementation.
      */
-    private boolean allowedForDocValues(FieldAttribute fieldAttribute, AggregateExec agg, Set<FieldAttribute> foundAttributes) {
-        if (fieldAttribute.field().isAggregatable() == false) {
+    private boolean allowedForDocValues(
+        FieldAttribute fieldAttribute,
+        SearchStats stats,
+        AggregateExec agg,
+        Set<FieldAttribute> foundAttributes
+    ) {
+        if (stats.hasDocValues(fieldAttribute.fieldName()) == false) {
             return false;
         }
         var candidateDocValuesAttributes = new HashSet<>(foundAttributes);

+ 12 - 12
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

@@ -20,7 +20,6 @@ import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
-import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.expression.predicate.Predicates;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -52,13 +51,13 @@ import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
 import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
 import org.elasticsearch.xpack.esql.session.Configuration;
+import org.elasticsearch.xpack.esql.stats.SearchContextStats;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
 
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.function.Predicate;
 
 import static java.util.Arrays.asList;
 import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
@@ -138,7 +137,7 @@ public class PlannerUtils {
     }
 
     public static PhysicalPlan localPlan(List<SearchExecutionContext> searchContexts, Configuration configuration, PhysicalPlan plan) {
-        return localPlan(configuration, plan, new SearchStats(searchContexts));
+        return localPlan(configuration, plan, SearchContextStats.from(searchContexts));
     }
 
     public static PhysicalPlan localPlan(Configuration configuration, PhysicalPlan plan, SearchStats searchStats) {
@@ -174,17 +173,18 @@ public class PlannerUtils {
     }
 
     /**
-     * Extracts the ES query provided by the filter parameter
-     * @param plan
-     * @param hasIdenticalDelegate a lambda that given a field attribute sayis if it has
-     *                             a synthetic source delegate with the exact same value
-     * @return
+     * Extracts the ES query for the <code>@timestamp</code> field for the passed plan.
      */
-    public static QueryBuilder requestFilter(PhysicalPlan plan, Predicate<FieldAttribute> hasIdenticalDelegate) {
-        return detectFilter(plan, "@timestamp", hasIdenticalDelegate);
+    public static QueryBuilder requestTimestampFilter(PhysicalPlan plan) {
+        return detectFilter(plan, "@timestamp");
     }
 
-    static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName, Predicate<FieldAttribute> hasIdenticalDelegate) {
+    /**
+     * Note that since this filter does not have access to SearchStats, it cannot detect if the field is a text field with a delegate.
+     * We currently only use this filter for the @timestamp field, which is always a date field. Any tests that wish to use this should
+     * take care to not use it with TEXT fields.
+     */
+    static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
         // first position is the REST filter, the second the query filter
         var requestFilter = new QueryBuilder[] { null, null };
 
@@ -205,7 +205,7 @@ public class PlannerUtils {
                         boolean matchesField = refs.removeIf(e -> fieldName.equals(e.name()));
                         // the expression only contains the target reference
                         // and the expression is pushable (functions can be fully translated)
-                        if (matchesField && refs.isEmpty() && canPushToSource(exp, hasIdenticalDelegate)) {
+                        if (matchesField && refs.isEmpty() && canPushToSource(exp)) {
                             matches.add(exp);
                         }
                     }

+ 1 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -309,11 +309,7 @@ public class ComputeService {
                 return reductionNode == null ? f : f.withReducer(reductionNode);
             });
 
-        // The lambda is to say if a TEXT field has an identical exact subfield
-        // We cannot use SearchContext because we don't have it yet.
-        // Since it's used only for @timestamp, it is relatively safe to assume it's not needed
-        // but it would be better to have a proper impl.
-        QueryBuilder requestFilter = PlannerUtils.requestFilter(planWithReducer, x -> true);
+        QueryBuilder requestFilter = PlannerUtils.requestTimestampFilter(planWithReducer);
         var lookupListener = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink());
         // SearchShards API can_match is done in lookupDataNodes
         lookupDataNodes(parentTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(dataNodeResult -> {

+ 357 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/stats/SearchContextStats.java

@@ -0,0 +1,357 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.stats;
+
+import org.apache.lucene.index.DocValuesType;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.PointValues;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.index.mapper.ConstantFieldType;
+import org.elasticsearch.index.mapper.DocCountFieldMapper.DocCountFieldType;
+import org.elasticsearch.index.mapper.IdFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper.NumberFieldType;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
+import org.elasticsearch.index.mapper.TextFieldMapper;
+import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper.TimestampFieldType;
+import static org.elasticsearch.index.mapper.DateFieldMapper.DateFieldType;
+import static org.elasticsearch.index.mapper.KeywordFieldMapper.KeywordFieldType;
+
+/**
+ * This class provides <code>SearchStats</code> from a list of <code>SearchExecutionContext</code> instances.
+ * It contains primarily a cache of <code>FieldStats</code> which is dynamically updated as needed.
+ * Each <code>FieldStats</code> contains <code>FieldConfig</code> information which is populated once at creation time.
+ * The remaining statistics are lazily computed and cached only on demand.
+ * This cache is not thread-safe.
+ */
+public class SearchContextStats implements SearchStats {
+
+    private final List<SearchExecutionContext> contexts;
+
+    private record FieldConfig(boolean exists, boolean hasExactSubfield, boolean indexed, boolean hasDocValues) {}
+
+    private static class FieldStats {
+        private Long count;
+        private Object min, max;
+        private Boolean singleValue;
+        private FieldConfig config;
+    }
+
+    private static final int CACHE_SIZE = 32;
+
+    // simple non-thread-safe cache for avoiding unnecessary IO (which while fast is still I/O)
+    private final Map<String, FieldStats> cache = new LinkedHashMap<>(CACHE_SIZE, 0.75f, true) {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<String, FieldStats> eldest) {
+            return size() > CACHE_SIZE;
+        }
+    };
+
+    public static SearchStats from(List<SearchExecutionContext> contexts) {
+        if (contexts == null || contexts.isEmpty()) {
+            return SearchStats.EMPTY;
+        }
+        return new SearchContextStats(contexts);
+    }
+
+    private SearchContextStats(List<SearchExecutionContext> contexts) {
+        this.contexts = contexts;
+        assert contexts != null && contexts.isEmpty() == false;
+    }
+
+    public boolean exists(String field) {
+        var stat = cache.computeIfAbsent(field, this::makeFieldStats);
+        return stat.config.exists;
+    }
+
+    private FieldStats makeFieldStats(String field) {
+        var stat = new FieldStats();
+        stat.config = makeFieldConfig(field);
+        return stat;
+    }
+
+    private FieldConfig makeFieldConfig(String field) {
+        boolean exists = false;
+        boolean hasExactSubfield = true;
+        boolean indexed = true;
+        boolean hasDocValues = true;
+        // even if there are deleted documents, check the existence of a field
+        // since if it's missing, deleted documents won't change that
+        for (SearchExecutionContext context : contexts) {
+            if (context.isFieldMapped(field)) {
+                exists = true;
+                MappedFieldType type = context.getFieldType(field);
+                indexed = indexed && type.isIndexed();
+                hasDocValues = hasDocValues && type.hasDocValues();
+                if (type instanceof TextFieldMapper.TextFieldType t) {
+                    hasExactSubfield = hasExactSubfield && t.canUseSyntheticSourceDelegateForQuerying();
+                } else {
+                    hasExactSubfield = false;
+                }
+            } else {
+                indexed = false;
+                hasDocValues = false;
+                hasExactSubfield = false;
+            }
+        }
+        if (exists == false) {
+            // if it does not exist on any context, no other settings are valid
+            return new FieldConfig(false, false, false, false);
+        } else {
+            return new FieldConfig(exists, hasExactSubfield, indexed, hasDocValues);
+        }
+    }
+
+    public boolean isIndexed(String field) {
+        var stat = cache.computeIfAbsent(field, this::makeFieldStats);
+        return stat.config.indexed;
+    }
+
+    public boolean hasDocValues(String field) {
+        var stat = cache.computeIfAbsent(field, this::makeFieldStats);
+        return stat.config.hasDocValues;
+    }
+
+    public boolean hasExactSubfield(String field) {
+        var stat = cache.computeIfAbsent(field, this::makeFieldStats);
+        return stat.config.hasExactSubfield;
+    }
+
+    public long count() {
+        var count = new long[] { 0 };
+        boolean completed = doWithContexts(r -> {
+            count[0] += r.numDocs();
+            return true;
+        }, false);
+        return completed ? count[0] : -1;
+    }
+
+    public long count(String field) {
+        var stat = cache.computeIfAbsent(field, this::makeFieldStats);
+        if (stat.count == null) {
+            var count = new long[] { 0 };
+            boolean completed = doWithContexts(r -> {
+                count[0] += countEntries(r, field);
+                return true;
+            }, false);
+            stat.count = completed ? count[0] : -1;
+        }
+        return stat.count;
+    }
+
+    public long count(String field, BytesRef value) {
+        var count = new long[] { 0 };
+        Term term = new Term(field, value);
+        boolean completed = doWithContexts(r -> {
+            count[0] += r.docFreq(term);
+            return true;
+        }, false);
+        return completed ? count[0] : -1;
+    }
+
+    public byte[] min(String field, DataType dataType) {
+        var stat = cache.computeIfAbsent(field, this::makeFieldStats);
+        if (stat.min == null) {
+            var min = new byte[][] { null };
+            doWithContexts(r -> {
+                byte[] localMin = PointValues.getMinPackedValue(r, field);
+                // TODO: how to compare with the previous min
+                if (localMin != null) {
+                    if (min[0] == null) {
+                        min[0] = localMin;
+                    } else {
+                        throw new EsqlIllegalArgumentException("Don't know how to compare with previous min");
+                    }
+                }
+                return true;
+            }, true);
+            stat.min = min[0];
+        }
+        // return stat.min;
+        return null;
+    }
+
+    public byte[] max(String field, DataType dataType) {
+        var stat = cache.computeIfAbsent(field, this::makeFieldStats);
+        if (stat.max == null) {
+            var max = new byte[][] { null };
+            doWithContexts(r -> {
+                byte[] localMax = PointValues.getMaxPackedValue(r, field);
+                // TODO: how to compare with the previous max
+                if (localMax != null) {
+                    if (max[0] == null) {
+                        max[0] = localMax;
+                    } else {
+                        throw new EsqlIllegalArgumentException("Don't know how to compare with previous max");
+                    }
+                }
+                return true;
+            }, true);
+            stat.max = max[0];
+        }
+        // return stat.max;
+        return null;
+    }
+
+    public boolean isSingleValue(String field) {
+        var stat = cache.computeIfAbsent(field, this::makeFieldStats);
+        if (stat.singleValue == null) {
+            // there's no such field so no need to worry about multi-value fields
+            if (exists(field) == false) {
+                stat.singleValue = true;
+            } else {
+                // fields are MV per default
+                var sv = new boolean[] { false };
+                for (SearchExecutionContext context : contexts) {
+                    MappedFieldType mappedType = context.isFieldMapped(field) ? context.getFieldType(field) : null;
+                    if (mappedType != null) {
+                        sv[0] = true;
+                        doWithContexts(r -> {
+                            sv[0] &= detectSingleValue(r, mappedType, field);
+                            return sv[0];
+                        }, true);
+                        break;
+                    }
+                }
+                stat.singleValue = sv[0];
+            }
+        }
+        return stat.singleValue;
+    }
+
+    private boolean detectSingleValue(IndexReader r, MappedFieldType fieldType, String name) throws IOException {
+        // types that are always single value (and are accessible through instanceof)
+        if (fieldType instanceof ConstantFieldType || fieldType instanceof DocCountFieldType || fieldType instanceof TimestampFieldType) {
+            return true;
+        }
+
+        var typeName = fieldType.typeName();
+
+        // non-visible fields, check their names
+        boolean found = switch (typeName) {
+            case IdFieldMapper.NAME, SeqNoFieldMapper.NAME -> true;
+            default -> false;
+        };
+
+        if (found) {
+            return true;
+        }
+
+        // check against doc size
+        DocCountTester tester = null;
+        if (fieldType instanceof DateFieldType || fieldType instanceof NumberFieldType) {
+            tester = lr -> {
+                PointValues values = lr.getPointValues(name);
+                return values == null || values.size() == values.getDocCount();
+            };
+        } else if (fieldType instanceof KeywordFieldType) {
+            tester = lr -> {
+                Terms terms = lr.terms(name);
+                return terms == null || terms.size() == terms.getDocCount();
+            };
+        }
+
+        if (tester != null) {
+            // check each leaf
+            for (LeafReaderContext context : r.leaves()) {
+                if (tester.test(context.reader()) == false) {
+                    return false;
+                }
+            }
+            // field is missing or single value
+            return true;
+        }
+
+        // unsupported type - default to MV
+        return false;
+    }
+
+    private interface DocCountTester {
+        Boolean test(LeafReader leafReader) throws IOException;
+    }
+
+    //
+    // @see org.elasticsearch.search.query.QueryPhaseCollectorManager#shortcutTotalHitCount(IndexReader, Query)
+    //
+    private static long countEntries(IndexReader indexReader, String field) {
+        long count = 0;
+        try {
+            for (LeafReaderContext context : indexReader.leaves()) {
+                LeafReader reader = context.reader();
+                FieldInfos fieldInfos = reader.getFieldInfos();
+                FieldInfo fieldInfo = fieldInfos.fieldInfo(field);
+
+                if (fieldInfo != null) {
+                    if (fieldInfo.getDocValuesType() == DocValuesType.NONE) {
+                        // no shortcut possible: it's a text field, empty values are counted as no value.
+                        return -1;
+                    }
+                    if (fieldInfo.getPointIndexDimensionCount() > 0) {
+                        PointValues points = reader.getPointValues(field);
+                        if (points != null) {
+                            count += points.size();
+                        }
+                    } else if (fieldInfo.getIndexOptions() != IndexOptions.NONE) {
+                        Terms terms = reader.terms(field);
+                        if (terms != null) {
+                            count += terms.getSumTotalTermFreq();
+                        }
+                    } else {
+                        return -1; // no shortcut possible for fields that are not indexed
+                    }
+                }
+            }
+        } catch (IOException ex) {
+            throw new EsqlIllegalArgumentException("Cannot access data storage", ex);
+        }
+        return count;
+    }
+
+    private interface IndexReaderConsumer {
+        /**
+         * Returns true if the consumer should keep on going, false otherwise.
+         */
+        boolean consume(IndexReader reader) throws IOException;
+    }
+
+    private boolean doWithContexts(IndexReaderConsumer consumer, boolean acceptsDeletions) {
+        try {
+            for (SearchExecutionContext context : contexts) {
+                for (LeafReaderContext leafContext : context.searcher().getLeafContexts()) {
+                    var reader = leafContext.reader();
+                    if (acceptsDeletions == false && reader.hasDeletions()) {
+                        return false;
+                    }
+                    // check if the looping continues or not
+                    if (consumer.consume(reader) == false) {
+                        return false;
+                    }
+                }
+            }
+            return true;
+        } catch (IOException ex) {
+            throw new EsqlIllegalArgumentException("Cannot access data storage", ex);
+        }
+    }
+}
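
The way `makeFieldConfig` combines per-context mappings can be illustrated outside Elasticsearch. The following is a minimal sketch, not the committed code: `FieldMapping`, `aggregate`, and the use of a plain `Map` per shard are hypothetical stand-ins for `SearchExecutionContext` and `MappedFieldType`. It assumes the same rule as the diff above: a flag is true only if it holds in every context, and a context where the field is unmapped turns the flag off.

```java
import java.util.List;
import java.util.Map;

class FieldConfigSketch {
    // Hypothetical stand-in for the per-shard mapping of one field.
    record FieldMapping(boolean indexed, boolean hasDocValues) {}

    // Simplified version of the FieldConfig record (the exact-subfield flag is omitted).
    record FieldConfig(boolean exists, boolean indexed, boolean hasDocValues) {}

    // Each shard is modelled as a map from field name to mapping (an assumption of this sketch).
    static FieldConfig aggregate(String field, List<Map<String, FieldMapping>> shards) {
        boolean exists = false;
        boolean indexed = true;
        boolean hasDocValues = true;
        for (Map<String, FieldMapping> shard : shards) {
            FieldMapping mapping = shard.get(field);
            if (mapping != null) {
                exists = true;
                indexed = indexed && mapping.indexed();
                hasDocValues = hasDocValues && mapping.hasDocValues();
            } else {
                // An unmapped field in any shard disables both flags.
                indexed = false;
                hasDocValues = false;
            }
        }
        // If the field is mapped nowhere, no other setting is meaningful.
        return exists ? new FieldConfig(true, indexed, hasDocValues) : new FieldConfig(false, false, false);
    }

    public static void main(String[] args) {
        Map<String, FieldMapping> shardA = Map.of("location", new FieldMapping(true, false));
        Map<String, FieldMapping> shardB = Map.of();
        System.out.println(aggregate("location", List.of(shardA)));
        System.out.println(aggregate("location", List.of(shardA, shardB)));
    }
}
```

Keeping `indexed` and `hasDocValues` as separate flags is what lets the local planner push a filter down for a field that is indexed but has doc-values disabled, which is the spatial case described in the commit message.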

+ 49 - 322
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/stats/SearchStats.java

@@ -7,363 +7,90 @@
 
 package org.elasticsearch.xpack.esql.stats;
 
-import org.apache.lucene.index.DocValuesType;
-import org.apache.lucene.index.FieldInfo;
-import org.apache.lucene.index.FieldInfos;
-import org.apache.lucene.index.IndexOptions;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.PointValues;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.Terms;
 import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.index.mapper.AbstractScriptFieldType;
-import org.elasticsearch.index.mapper.ConstantFieldType;
-import org.elasticsearch.index.mapper.DocCountFieldMapper.DocCountFieldType;
-import org.elasticsearch.index.mapper.IdFieldMapper;
-import org.elasticsearch.index.mapper.MappedFieldType;
-import org.elasticsearch.index.mapper.NumberFieldMapper.NumberFieldType;
-import org.elasticsearch.index.mapper.SeqNoFieldMapper;
-import org.elasticsearch.index.mapper.TextFieldMapper;
-import org.elasticsearch.index.query.SearchExecutionContext;
-import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+/**
+ * Interface for determining information about fields in the index.
+ * This is used by the optimizer to make decisions about how to optimize queries.
+ */
+public interface SearchStats {
+    SearchStats EMPTY = new EmptySearchStats();
 
-import static org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper.TimestampFieldType;
-import static org.elasticsearch.index.mapper.DateFieldMapper.DateFieldType;
-import static org.elasticsearch.index.mapper.KeywordFieldMapper.KeywordFieldType;
+    boolean exists(String field);
 
-public class SearchStats {
+    boolean isIndexed(String field);
 
-    private final List<SearchExecutionContext> contexts;
+    boolean hasDocValues(String field);
 
-    private static class FieldStat {
-        private Long count;
-        private Object min, max;
-        // TODO: use a multi-bitset instead
-        private Boolean exists;
-        private Boolean singleValue;
-        private Boolean hasIdenticalDelegate;
-        private Boolean indexed;
-        private Boolean runtime;
-    }
+    boolean hasExactSubfield(String field);
 
-    private static final int CACHE_SIZE = 32;
+    long count();
 
-    // simple non-thread-safe cache for avoiding unnecessary IO (which while fast it still I/O)
-    private final Map<String, FieldStat> cache = new LinkedHashMap<>(CACHE_SIZE, 0.75f, true) {
-        @Override
-        protected boolean removeEldestEntry(Map.Entry<String, FieldStat> eldest) {
-            return size() > CACHE_SIZE;
-        }
-    };
+    long count(String field);
 
-    public SearchStats(List<SearchExecutionContext> contexts) {
-        this.contexts = contexts;
-    }
+    long count(String field, BytesRef value);
 
-    public long count() {
-        var count = new long[] { 0 };
-        boolean completed = doWithContexts(r -> {
-            count[0] += r.numDocs();
-            return true;
-        }, false);
-        return completed ? count[0] : -1;
-    }
+    byte[] min(String field, DataType dataType);
 
-    public long count(String field) {
-        var stat = cache.computeIfAbsent(field, s -> new FieldStat());
-        if (stat.count == null) {
-            var count = new long[] { 0 };
-            boolean completed = doWithContexts(r -> {
-                count[0] += countEntries(r, field);
-                return true;
-            }, false);
-            stat.count = completed ? count[0] : -1;
-        }
-        return stat.count;
-    }
+    byte[] max(String field, DataType dataType);
 
-    public long count(String field, BytesRef value) {
-        var count = new long[] { 0 };
-        Term term = new Term(field, value);
-        boolean completed = doWithContexts(r -> {
-            count[0] += r.docFreq(term);
-            return true;
-        }, false);
-        return completed ? count[0] : -1;
-    }
+    boolean isSingleValue(String field);
 
-    public boolean exists(String field) {
-        var stat = cache.computeIfAbsent(field, s -> new FieldStat());
-        if (stat.exists == null) {
-            stat.exists = false;
-            // even if there are deleted documents, check the existence of a field
-            // since if it's missing, deleted documents won't change that
-            for (SearchExecutionContext context : contexts) {
-                if (context.isFieldMapped(field)) {
-                    stat.exists = true;
-                    break;
-                }
-            }
+    /**
+     * When there are no search stats available, for example when there are no search contexts, we have static results.
+     */
+    record EmptySearchStats() implements SearchStats {
 
-            // populate additional properties to save on the lookups
-            if (stat.exists == false) {
-                stat.indexed = false;
-                stat.singleValue = true;
-            }
-        }
-        return stat.exists;
-    }
-
-    public boolean hasIdenticalDelegate(String field) {
-        var stat = cache.computeIfAbsent(field, s -> new FieldStat());
-        if (stat.hasIdenticalDelegate == null) {
-            stat.hasIdenticalDelegate = true;
-            for (SearchExecutionContext context : contexts) {
-                if (context.isFieldMapped(field)) {
-                    MappedFieldType type = context.getFieldType(field);
-                    if (type instanceof TextFieldMapper.TextFieldType t) {
-                        if (t.canUseSyntheticSourceDelegateForQuerying() == false) {
-                            stat.hasIdenticalDelegate = false;
-                            break;
-                        }
-                    } else {
-                        stat.hasIdenticalDelegate = false;
-                        break;
-                    }
-                }
-            }
+        @Override
+        public boolean exists(String field) {
+            return false;
         }
-        return stat.hasIdenticalDelegate;
-    }
 
-    public byte[] min(String field, DataType dataType) {
-        var stat = cache.computeIfAbsent(field, s -> new FieldStat());
-        if (stat.min == null) {
-            var min = new byte[][] { null };
-            doWithContexts(r -> {
-                byte[] localMin = PointValues.getMinPackedValue(r, field);
-                // TODO: how to compare with the previous min
-                if (localMin != null) {
-                    if (min[0] == null) {
-                        min[0] = localMin;
-                    } else {
-                        throw new EsqlIllegalArgumentException("Don't know how to compare with previous min");
-                    }
-                }
-                return true;
-            }, true);
-            stat.min = min[0];
+        @Override
+        public boolean isIndexed(String field) {
+            return false;
         }
-        // return stat.min;
-        return null;
-    }
 
-    public byte[] max(String field, DataType dataType) {
-        var stat = cache.computeIfAbsent(field, s -> new FieldStat());
-        if (stat.max == null) {
-            var max = new byte[][] { null };
-            doWithContexts(r -> {
-                byte[] localMax = PointValues.getMaxPackedValue(r, field);
-                // TODO: how to compare with the previous max
-                if (localMax != null) {
-                    if (max[0] == null) {
-                        max[0] = localMax;
-                    } else {
-                        throw new EsqlIllegalArgumentException("Don't know how to compare with previous max");
-                    }
-                }
-                return true;
-            }, true);
-            stat.max = max[0];
+        @Override
+        public boolean hasDocValues(String field) {
+            return false;
         }
-        // return stat.max;
-        return null;
-    }
 
-    public boolean isSingleValue(String field) {
-        var stat = cache.computeIfAbsent(field, s -> new FieldStat());
-        if (stat.singleValue == null) {
-            // there's no such field so no need to worry about multi-value fields
-            if (exists(field) == false) {
-                stat.singleValue = true;
-            } else {
-                // fields are MV per default
-                var sv = new boolean[] { false };
-                for (SearchExecutionContext context : contexts) {
-                    MappedFieldType mappedType = context.isFieldMapped(field) ? context.getFieldType(field) : null;
-                    if (mappedType != null) {
-                        sv[0] = true;
-                        doWithContexts(r -> {
-                            sv[0] &= detectSingleValue(r, mappedType, field);
-                            return sv[0];
-                        }, true);
-                        break;
-                    }
-                }
-                stat.singleValue = sv[0];
-            }
+        @Override
+        public boolean hasExactSubfield(String field) {
+            return false;
         }
-        return stat.singleValue;
-    }
 
-    public boolean isRuntimeField(String field) {
-        var stat = cache.computeIfAbsent(field, s -> new FieldStat());
-        if (stat.runtime == null) {
-            stat.runtime = false;
-            if (exists(field)) {
-                for (SearchExecutionContext context : contexts) {
-                    if (context.isFieldMapped(field)) {
-                        if (context.getFieldType(field) instanceof AbstractScriptFieldType<?>) {
-                            stat.runtime = true;
-                            break;
-                        }
-                    }
-                }
-            }
+        @Override
+        public long count() {
+            return 0;
         }
-        return stat.runtime;
-    }
 
-    public boolean isIndexed(String field) {
-        var stat = cache.computeIfAbsent(field, s -> new FieldStat());
-        if (stat.indexed == null) {
-            stat.indexed = false;
-            if (exists(field)) {
-                boolean indexed = true;
-                for (SearchExecutionContext context : contexts) {
-                    if (context.isFieldMapped(field)) {
-                        if (context.getFieldType(field).isIndexed() == false) {
-                            indexed = false;
-                            break;
-                        }
-                    }
-                }
-                stat.indexed = indexed;
-            }
+        @Override
+        public long count(String field) {
+            return 0;
         }
-        return stat.indexed;
-    }
 
-    private boolean detectSingleValue(IndexReader r, MappedFieldType fieldType, String name) throws IOException {
-        // types that are always single value (and are accessible through instanceof)
-        if (fieldType instanceof ConstantFieldType || fieldType instanceof DocCountFieldType || fieldType instanceof TimestampFieldType) {
-            return true;
+        @Override
+        public long count(String field, BytesRef value) {
+            return 0;
         }
 
-        var typeName = fieldType.typeName();
-
-        // non-visible fields, check their names
-        boolean found = switch (typeName) {
-            case IdFieldMapper.NAME, SeqNoFieldMapper.NAME -> true;
-            default -> false;
-        };
-
-        if (found) {
-            return true;
+        @Override
+        public byte[] min(String field, DataType dataType) {
+            return null;
         }
 
-        // check against doc size
-        DocCountTester tester = null;
-        if (fieldType instanceof DateFieldType || fieldType instanceof NumberFieldType) {
-            tester = lr -> {
-                PointValues values = lr.getPointValues(name);
-                return values == null || values.size() == values.getDocCount();
-            };
-        } else if (fieldType instanceof KeywordFieldType) {
-            tester = lr -> {
-                Terms terms = lr.terms(name);
-                return terms == null || terms.size() == terms.getDocCount();
-            };
+        @Override
+        public byte[] max(String field, DataType dataType) {
+            return null;
         }
 
-        if (tester != null) {
-            // check each leaf
-            for (LeafReaderContext context : r.leaves()) {
-                if (tester.test(context.reader()) == false) {
-                    return false;
-                }
-            }
-            // field is missing or single value
+        @Override
+        public boolean isSingleValue(String field) {
             return true;
         }
 
-        // unsupported type - default to MV
-        return false;
-    }
-
-    private interface DocCountTester {
-        Boolean test(LeafReader leafReader) throws IOException;
-    }
-
-    //
-    // @see org.elasticsearch.search.query.QueryPhaseCollectorManager#shortcutTotalHitCount(IndexReader, Query)
-    //
-    private static long countEntries(IndexReader indexReader, String field) {
-        long count = 0;
-        try {
-            for (LeafReaderContext context : indexReader.leaves()) {
-                LeafReader reader = context.reader();
-                FieldInfos fieldInfos = reader.getFieldInfos();
-                FieldInfo fieldInfo = fieldInfos.fieldInfo(field);
-
-                if (fieldInfo != null) {
-                    if (fieldInfo.getDocValuesType() == DocValuesType.NONE) {
-                        // no shortcut possible: it's a text field, empty values are counted as no value.
-                        return -1;
-                    }
-                    if (fieldInfo.getPointIndexDimensionCount() > 0) {
-                        PointValues points = reader.getPointValues(field);
-                        if (points != null) {
-                            count += points.size();
-                        }
-                    } else if (fieldInfo.getIndexOptions() != IndexOptions.NONE) {
-                        Terms terms = reader.terms(field);
-                        if (terms != null) {
-                            count += terms.getSumTotalTermFreq();
-                        }
-                    } else {
-                        return -1; // no shortcut possible for fields that are not indexed
-                    }
-                }
-            }
-        } catch (IOException ex) {
-            throw new EsqlIllegalArgumentException("Cannot access data storage", ex);
-        }
-        return count;
-    }
-
-    private interface IndexReaderConsumer {
-        /**
-         * Returns true if the consumer should keep on going, false otherwise.
-         */
-        boolean consume(IndexReader reader) throws IOException;
-    }
-
-    private boolean doWithContexts(IndexReaderConsumer consumer, boolean acceptsDeletions) {
-        try {
-            for (SearchExecutionContext context : contexts) {
-                for (LeafReaderContext leafContext : context.searcher().getLeafContexts()) {
-                    var reader = leafContext.reader();
-                    if (acceptsDeletions == false && reader.hasDeletions()) {
-                        return false;
-                    }
-                    // check if the looping continues or not
-                    if (consumer.consume(reader) == false) {
-                        return false;
-                    }
-                }
-            }
-            return true;
-        } catch (IOException ex) {
-            throw new EsqlIllegalArgumentException("Cannot access data storage", ex);
-        }
     }
 }

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java

@@ -59,6 +59,7 @@ import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
 import org.elasticsearch.xpack.esql.session.Configuration;
 import org.elasticsearch.xpack.esql.stats.Metrics;
+import org.elasticsearch.xpack.esql.stats.SearchContextStats;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
 import org.junit.Before;
 
@@ -325,7 +326,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
         }, directoryReader -> {
             IndexSearcher searcher = newSearcher(directoryReader);
             SearchExecutionContext ctx = createSearchExecutionContext(mapperService, searcher);
-            plan.set(plannerOptimizer.plan(query, new SearchStats(List.of(ctx))));
+            plan.set(plannerOptimizer.plan(query, SearchContextStats.from(List.of(ctx))));
         });
 
         return plan.get();

+ 105 - 61
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

@@ -33,6 +33,8 @@ import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
+import org.elasticsearch.xpack.esql.EsqlTestUtils.TestConfigurableSearchStats;
+import org.elasticsearch.xpack.esql.EsqlTestUtils.TestConfigurableSearchStats.Config;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
 import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
@@ -141,6 +143,7 @@ import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
 import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
 import static org.elasticsearch.test.ListMatcher.matchesList;
 import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_SEARCH_STATS;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
@@ -189,14 +192,16 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
     private TestDataSource testData;
     private int allFieldRowSize;    // TODO: Move this into testDataSource so tests that load other indexes can also assert on this
     private TestDataSource airports;
-    private TestDataSource airportsNoDocValues;
-    private TestDataSource airportsWeb;
-    private TestDataSource countriesBbox;
-    private TestDataSource countriesBboxWeb;
+    private TestDataSource airportsNoDocValues; // Test when spatial field is indexed but has no doc values
+    private TestDataSource airportsNotIndexed;  // Test when spatial field has doc values but is not indexed
+    private TestDataSource airportsNotIndexedNorDocValues;  // Test when spatial field is neither indexed nor has doc-values
+    private TestDataSource airportsWeb;         // Cartesian point field tests
+    private TestDataSource countriesBbox;       // geo_shape field tests
+    private TestDataSource countriesBboxWeb;    // cartesian_shape field tests
 
     private final Configuration config;
 
-    private record TestDataSource(Map<String, EsField> mapping, EsIndex index, Analyzer analyzer) {}
+    private record TestDataSource(Map<String, EsField> mapping, EsIndex index, Analyzer analyzer, SearchStats stats) {}
 
     @ParametersFactory(argumentFormatting = PARAM_FORMATTING)
     public static List<Object[]> readScriptSpec() {
@@ -240,9 +245,24 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         this.airports = makeTestDataSource("airports", "mapping-airports.json", functionRegistry, enrichResolution);
         this.airportsNoDocValues = makeTestDataSource(
             "airports-no-doc-values",
-            "mapping-airports-no-doc-values.json",
+            "mapping-airports_no_doc_values.json",
             functionRegistry,
-            enrichResolution
+            enrichResolution,
+            new TestConfigurableSearchStats().exclude(Config.DOC_VALUES, "location")
+        );
+        this.airportsNotIndexed = makeTestDataSource(
+            "airports-not-indexed",
+            "mapping-airports_not_indexed.json",
+            functionRegistry,
+            enrichResolution,
+            new TestConfigurableSearchStats().exclude(Config.INDEXED, "location")
+        );
+        this.airportsNotIndexedNorDocValues = makeTestDataSource(
+            "airports-not-indexed-nor-doc-values",
+            "mapping-airports_not_indexed_nor_doc_values.json",
+            functionRegistry,
+            enrichResolution,
+            new TestConfigurableSearchStats().exclude(Config.INDEXED, "location").exclude(Config.DOC_VALUES, "location")
         );
         this.airportsWeb = makeTestDataSource("airports_web", "mapping-airports_web.json", functionRegistry, enrichResolution);
         this.countriesBbox = makeTestDataSource("countriesBbox", "mapping-countries_bbox.json", functionRegistry, enrichResolution);
@@ -258,13 +278,23 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
         String indexName,
         String mappingFileName,
         EsqlFunctionRegistry functionRegistry,
-        EnrichResolution enrichResolution
+        EnrichResolution enrichResolution,
+        SearchStats stats
     ) {
         Map<String, EsField> mapping = loadMapping(mappingFileName);
         EsIndex index = new EsIndex(indexName, mapping, Map.of("test", IndexMode.STANDARD));
         IndexResolution getIndexResult = IndexResolution.valid(index);
         Analyzer analyzer = new Analyzer(new AnalyzerContext(config, functionRegistry, getIndexResult, enrichResolution), TEST_VERIFIER);
-        return new TestDataSource(mapping, index, analyzer);
+        return new TestDataSource(mapping, index, analyzer, stats);
+    }
+
+    TestDataSource makeTestDataSource(
+        String indexName,
+        String mappingFileName,
+        EsqlFunctionRegistry functionRegistry,
+        EnrichResolution enrichResolution
+    ) {
+        return makeTestDataSource(indexName, mappingFileName, functionRegistry, enrichResolution, TEST_SEARCH_STATS);
     }
 
     private static EnrichResolution setupEnrichResolution() {
@@ -2132,7 +2162,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             | where long_noidx == 1
             """);
 
-        var optimized = optimizedPlan(plan);
+        var optimized = optimizedPlan(plan, statsWithIndexedFields());
         var limit = as(optimized, LimitExec.class);
         var exchange = asRemoteExchange(limit.child());
         var project = as(exchange.child(), ProjectExec.class);
@@ -2183,7 +2213,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             | sort long_noidx
             """);
 
-        var optimized = optimizedPlan(plan);
+        var optimized = optimizedPlan(plan, statsWithIndexedFields());
         var topN = as(optimized, TopNExec.class);
         var exchange = as(topN.child(), ExchangeExec.class);
         var project = as(exchange.child(), ProjectExec.class);
@@ -2656,7 +2686,8 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             "from airports | stats centroid = st_centroid_agg(to_geopoint(location))",
             "from airports | eval location = to_geopoint(location) | stats centroid = st_centroid_agg(location)" }) {
             for (boolean withDocValues : new boolean[] { false, true }) {
-                var plan = withDocValues ? physicalPlan(query, airports) : physicalPlan(query, airportsNoDocValues);
+                var testData = withDocValues ? airports : airportsNoDocValues;
+                var plan = physicalPlan(query, testData);
 
                 var limit = as(plan, LimitExec.class);
                 var agg = as(limit.child(), AggregateExec.class);
@@ -2669,7 +2700,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
                 as(fAgg.child(), EsRelation.class);
 
                 // Now optimize the plan and assert the aggregation uses doc-values
-                var optimized = optimizedPlan(plan);
+                var optimized = optimizedPlan(plan, testData.stats);
                 limit = as(optimized, LimitExec.class);
                 agg = as(limit.child(), AggregateExec.class);
                 // Above the exchange (in coordinator) the aggregation is not using doc-values
@@ -2943,11 +2974,12 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
      * Note the FieldExtractExec has 'location' set for stats: FieldExtractExec[location{f}#9][location{f}#9]
      */
     public void testSpatialTypesAndStatsUseDocValuesMultiAggregationsGrouped() {
-        for (boolean useDocValues : new boolean[] { true, false }) {
+        for (boolean useDocValues : new boolean[] { false }) {
+            var testData = useDocValues ? airports : airportsNoDocValues;
             var plan = this.physicalPlan("""
                 FROM airports
                 | STATS centroid=ST_CENTROID_AGG(location), count=COUNT() BY scalerank
-                """, useDocValues ? airports : airportsNoDocValues);
+                """, testData);
 
             var limit = as(plan, LimitExec.class);
             var agg = as(limit.child(), AggregateExec.class);
@@ -2964,7 +2996,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             as(fAgg.child(), EsRelation.class);
 
             // Now optimize the plan and assert the aggregation uses doc-values
-            var optimized = optimizedPlan(plan);
+            var optimized = optimizedPlan(plan, testData.stats);
             limit = as(optimized, LimitExec.class);
             agg = as(limit.child(), AggregateExec.class);
             att = as(agg.groupings().get(0), Attribute.class);
@@ -3519,44 +3551,63 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             | STATS centroid=ST_CENTROID_AGG(location), count=COUNT()
             """ }) {
 
-            for (boolean useDocValues : new boolean[] { true, false }) {
-                var plan = this.physicalPlan(query, useDocValues ? airports : airportsNoDocValues);
-                var limit = as(plan, LimitExec.class);
-                var agg = as(limit.child(), AggregateExec.class);
-                assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
-                // Before optimization the aggregation does not use doc-values
-                assertAggregation(agg, "count", Count.class);
-                assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, false);
-
-                var exchange = as(agg.child(), ExchangeExec.class);
-                var fragment = as(exchange.child(), FragmentExec.class);
-                var fAgg = as(fragment.fragment(), Aggregate.class);
-                var filter = as(fAgg.child(), Filter.class);
-                assertThat("filter contains ST_INTERSECTS", filter.condition(), instanceOf(SpatialIntersects.class));
+            for (boolean isIndexed : new boolean[] { true, false }) {
+                for (boolean useDocValues : new boolean[] { true, false }) {
+                    var testData = useDocValues
+                        ? (isIndexed ? airports : airportsNotIndexed)
+                        : (isIndexed ? airportsNoDocValues : airportsNotIndexedNorDocValues);
+                    var plan = this.physicalPlan(query, testData);
+                    var limit = as(plan, LimitExec.class);
+                    var agg = as(limit.child(), AggregateExec.class);
+                    assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
+                    // Before optimization the aggregation does not use doc-values
+                    assertAggregation(agg, "count", Count.class);
+                    assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, false);
 
-                // Now verify that optimization re-writes the ExchangeExec and pushed down the filter into the Lucene query
-                var optimized = optimizedPlan(plan);
-                limit = as(optimized, LimitExec.class);
-                agg = as(limit.child(), AggregateExec.class);
-                // Above the exchange (in coordinator) the aggregation is not using doc-values
-                assertAggregation(agg, "count", Count.class);
-                assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, false);
-                exchange = as(agg.child(), ExchangeExec.class);
-                agg = as(exchange.child(), AggregateExec.class);
-                assertThat("Aggregation is PARTIAL", agg.getMode(), equalTo(INITIAL));
-                // below the exchange (in data node) the aggregation is using doc-values
-                assertAggregation(agg, "count", Count.class);
-                assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, useDocValues);
-                var source = assertChildIsGeoPointExtract(useDocValues ? agg : as(agg.child(), FilterExec.class), useDocValues);
-                if (useDocValues) {
-                    // Query is only pushed to lucene if indexing/doc-values are enabled
-                    var condition = as(source.query(), SpatialRelatesQuery.ShapeQueryBuilder.class);
-                    assertThat("Geometry field name", condition.fieldName(), equalTo("location"));
-                    assertThat("Spatial relationship", condition.relation(), equalTo(ShapeRelation.INTERSECTS));
-                    assertThat("Geometry is Polygon", condition.shape().type(), equalTo(ShapeType.POLYGON));
-                    var polygon = as(condition.shape(), Polygon.class);
-                    assertThat("Polygon shell length", polygon.getPolygon().length(), equalTo(5));
-                    assertThat("Polygon holes", polygon.getNumberOfHoles(), equalTo(0));
+                    var exchange = as(agg.child(), ExchangeExec.class);
+                    var fragment = as(exchange.child(), FragmentExec.class);
+                    var fAgg = as(fragment.fragment(), Aggregate.class);
+                    var filter = as(fAgg.child(), Filter.class);
+                    assertThat("filter contains ST_INTERSECTS", filter.condition(), instanceOf(SpatialIntersects.class));
+
+                    // Now verify that optimization rewrites the ExchangeExec and pushes the filter down into the Lucene query
+                    var optimized = optimizedPlan(plan, testData.stats);
+                    limit = as(optimized, LimitExec.class);
+                    agg = as(limit.child(), AggregateExec.class);
+                    // Above the exchange (in coordinator) the aggregation is not using doc-values
+                    assertAggregation(agg, "count", Count.class);
+                    assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, false);
+                    exchange = as(agg.child(), ExchangeExec.class);
+                    agg = as(exchange.child(), AggregateExec.class);
+                    assertThat("Aggregation is INITIAL", agg.getMode(), equalTo(INITIAL));
+                    // Below the exchange (on the data node) the aggregation uses doc-values only when the field has them
+                    assertAggregation(agg, "count", Count.class);
+                    assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, useDocValues);
+                    if (isIndexed) {
+                        var source = assertChildIsGeoPointExtract(agg, useDocValues);
+                        // Query is pushed to lucene if field is indexed (and does not require doc-values or isAggregatable)
+                        var condition = as(source.query(), SpatialRelatesQuery.ShapeQueryBuilder.class);
+                        assertThat("Geometry field name", condition.fieldName(), equalTo("location"));
+                        assertThat("Spatial relationship", condition.relation(), equalTo(ShapeRelation.INTERSECTS));
+                        assertThat("Geometry is Polygon", condition.shape().type(), equalTo(ShapeType.POLYGON));
+                        var polygon = as(condition.shape(), Polygon.class);
+                        assertThat("Polygon shell length", polygon.getPolygon().length(), equalTo(5));
+                        assertThat("Polygon holes", polygon.getNumberOfHoles(), equalTo(0));
+                    } else {
+                        // If the field is not indexed, we cannot push the filter down to source, so assert that we need to have an explicit
+                        // filter as well as extract the field needed for that filter.
+                        var filterExec = as(agg.child(), FilterExec.class);
+                        assertThat("filter contains ST_INTERSECTS", filterExec.condition(), instanceOf(SpatialIntersects.class));
+                        var fieldExtractLocation = as(filterExec.child(), FieldExtractExec.class);
+                        assertThat("location field is extracted", fieldExtractLocation.attributesToExtract().size(), equalTo(1));
+                        assertThat(
+                            "location field is extracted",
+                            fieldExtractLocation.attributesToExtract().get(0).name(),
+                            equalTo("location")
+                        );
+                        var source = source(fieldExtractLocation.child());
+                        assertThat("source query is null", source.query(), equalTo(null));
+                    }
                 }
             }
         }
@@ -6554,14 +6605,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
     }
 
     static SearchStats statsWithIndexedFields(String... names) {
-        return new EsqlTestUtils.TestSearchStats() {
-            private final Set<String> indexedFields = Set.of(names);
-
-            @Override
-            public boolean isIndexed(String field) {
-                return indexedFields.contains(field);
-            }
-        };
+        return new TestConfigurableSearchStats().include(Config.INDEXED, names);
     }
 
     static PhysicalPlan localRelationshipAlignment(PhysicalPlan l) {

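The test changes above combine two independent switches per field, "indexed" and "has doc-values", through `include(...)` and `exclude(...)` calls on `TestConfigurableSearchStats`. The real class lives in `EsqlTestUtils` and is not shown in this part of the diff, so the following is only a minimal sketch of how such a test double could behave. `ConfigurableStatsSketch` and `expectedFilterPlacement`, along with its return strings, are hypothetical names, and the allow-list/deny-list semantics are an assumption inferred from how the tests call it (for example, `include(Config.INDEXED)` with no names is assumed to mean "no field is indexed", matching the old `Set.of(names)` behaviour).

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a configurable SearchStats test double; NOT the real EsqlTestUtils class. */
final class ConfigurableStatsSketch {
    enum Config { INDEXED, DOC_VALUES }

    // Allow-lists: once include(...) is called for a config, only the listed fields have that property.
    private final Map<Config, Set<String>> includes = new EnumMap<>(Config.class);
    // Deny-lists: the listed fields never have that property.
    private final Map<Config, Set<String>> excludes = new EnumMap<>(Config.class);

    ConfigurableStatsSketch include(Config config, String... fields) {
        includes.computeIfAbsent(config, k -> new HashSet<>()).addAll(Arrays.asList(fields));
        return this;
    }

    ConfigurableStatsSketch exclude(Config config, String... fields) {
        excludes.computeIfAbsent(config, k -> new HashSet<>()).addAll(Arrays.asList(fields));
        return this;
    }

    private boolean isSet(Config config, String field) {
        Set<String> allowed = includes.get(config); // null means no allow-list was configured
        Set<String> denied = excludes.getOrDefault(config, Set.of());
        return (allowed == null || allowed.contains(field)) && denied.contains(field) == false;
    }

    boolean isIndexed(String field) {
        return isSet(Config.INDEXED, field);
    }

    boolean hasDocValues(String field) {
        return isSet(Config.DOC_VALUES, field);
    }

    /** Filter placement the spatial test asserts on the data node; the labels are illustrative only. */
    String expectedFilterPlacement(String field) {
        // Only isIndexed decides filter pushdown; doc-values are not consulted here.
        return isIndexed(field) ? "pushed to Lucene query" : "FilterExec above FieldExtractExec";
    }
}
```

With this sketch, `new ConfigurableStatsSketch().exclude(Config.DOC_VALUES, "location")` still reports `location` as indexed, which corresponds to the `airportsNoDocValues` case where the spatial filter is expected to be pushed down even though the centroid aggregation cannot read doc-values.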
+ 2 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushTopNToSourceTests.java

@@ -34,7 +34,7 @@ import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
-import org.elasticsearch.xpack.esql.stats.DisabledSearchStats;
+import org.elasticsearch.xpack.esql.stats.SearchStats;
 
 import java.io.IOException;
 import java.nio.ByteOrder;
@@ -256,8 +256,7 @@ public class PushTopNToSourceTests extends ESTestCase {
 
     private static PhysicalPlan pushTopNToSource(TopNExec topNExec) {
         var configuration = EsqlTestUtils.configuration("from test");
-        var searchStats = new DisabledSearchStats();
-        var ctx = new LocalPhysicalOptimizerContext(configuration, searchStats);
+        var ctx = new LocalPhysicalOptimizerContext(configuration, SearchStats.EMPTY);
         var pushTopNToSource = new PushTopNToSource();
         return pushTopNToSource.rule(topNExec, ctx);
     }

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java

@@ -318,7 +318,7 @@ public class FilterTests extends ESTestCase {
     }
 
     private QueryBuilder filterQueryForTransportNodes(PhysicalPlan plan) {
-        return PlannerUtils.detectFilter(plan, EMP_NO, x -> true);
+        return PlannerUtils.detectFilter(plan, EMP_NO);
     }
 
     @Override

+ 18 - 9
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/DisabledSearchStats.java

@@ -10,12 +10,26 @@ package org.elasticsearch.xpack.esql.stats;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 
-import static java.util.Collections.emptyList;
+public class DisabledSearchStats implements SearchStats {
 
-public class DisabledSearchStats extends SearchStats {
+    @Override
+    public boolean exists(String field) {
+        return true;
+    }
+
+    @Override
+    public boolean isIndexed(String field) {
+        return true;
+    }
 
-    public DisabledSearchStats() {
-        super(emptyList());
+    @Override
+    public boolean hasDocValues(String field) {
+        return true;
+    }
+
+    @Override
+    public boolean hasExactSubfield(String field) {
+        return true;
     }
 
     @Override
@@ -33,11 +47,6 @@ public class DisabledSearchStats extends SearchStats {
         return -1;
     }
 
-    @Override
-    public boolean exists(String field) {
-        return true;
-    }
-
     @Override
     public byte[] min(String field, DataType dataType) {
         return null;