Browse Source

ESQL: fix COUNT filter pushdown (#117503) (#117648)

* ESQL: fix COUNT filter pushdown (#117503)

If `COUNT` agg has a filter applied, this must also be push down to source. This currently does not happen, but this issue is masked currently by two factors:
* a logical optimisation, `ExtractAggregateCommonFilter` that extracts the filter out of the STATS entirely (and pushes it to source then from a `WHERE`);
* the phisical plan optimisation implementing the  push down, `PushStatsToSource`, currently only applies if there's just one agg function to push down.

However, this fix needs to be applied since:
* it's still present in versions prior to `ExtractAggregateCommonFilter` introduction;
* the defect might resurface when the restriction in `PushStatsToSource` is lifted.

Fixes #115522.

(cherry picked from commit 560e0c5d0441a165f4588f8af869053b5202999f)

* revert merge artefact

* 8.x adaptation
Bogdan Pintea 10 months ago
parent
commit
9dafb30dbf

+ 6 - 0
docs/changelog/117503.yaml

@@ -0,0 +1,6 @@
+pr: 117503
+summary: Fix COUNT filter pushdown
+area: ES|QL
+type: bug
+issues:
+ - 115522

+ 51 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec

@@ -2678,6 +2678,57 @@ c2:l |c2_f:l |m2:i |m2_f:i |c:l
 1    |1      |5    |5      |21
 ;
 
+simpleCountOnFieldWithFilteringAndNoGrouping
+required_capability: per_agg_filtering
+from employees
+| stats c1 = count(emp_no) where emp_no < 10042
+;
+
+c1:long
+41
+;
+
+simpleCountOnFieldWithFilteringOnDifferentFieldAndNoGrouping
+required_capability: per_agg_filtering
+from employees
+| stats c1 = count(hire_date) where emp_no < 10042
+;
+
+c1:long
+41
+;
+
+simpleCountOnStarWithFilteringAndNoGrouping
+required_capability: per_agg_filtering
+from employees
+| stats c1 = count(*) where emp_no < 10042
+;
+
+c1:long
+41
+;
+
+simpleCountWithFilteringAndNoGroupingOnFieldWithNulls
+required_capability: per_agg_filtering
+from employees
+| stats c1 = count(birth_date) where emp_no <= 10050
+;
+
+c1:long
+40
+;
+
+
+simpleCountWithFilteringAndNoGroupingOnFieldWithMultivalues
+required_capability: per_agg_filtering
+from employees
+| stats c1 = count(job_positions) where emp_no <= 10003
+;
+
+c1:long
+3
+;
+
 commonFilterExtractionWithAliasing
 required_capability: per_agg_filtering
 from employees

+ 11 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java

@@ -16,6 +16,7 @@ import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
+import org.elasticsearch.xpack.esql.core.util.Queries;
 import org.elasticsearch.xpack.esql.core.util.StringUtils;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
 import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
@@ -25,12 +26,15 @@ import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
+import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 
 import java.util.ArrayList;
 import java.util.List;
 
+import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
+import static org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource.canPushToSource;
 import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType.COUNT;
 
 /**
@@ -98,6 +102,13 @@ public class PushStatsToSource extends PhysicalOptimizerRules.ParameterizedOptim
                                 }
                             }
                             if (fieldName != null) {
+                                if (count.hasFilter()) {
+                                    if (canPushToSource(count.filter()) == false) {
+                                        return null; // can't push down
+                                    }
+                                    var countFilter = PlannerUtils.TRANSLATOR_HANDLER.asQuery(count.filter());
+                                    query = Queries.combine(Queries.Clause.MUST, asList(countFilter.asBuilder(), query));
+                                }
                                 return new EsStatsQueryExec.Stat(fieldName, COUNT, query);
                             }
                         }

+ 66 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java

@@ -41,7 +41,9 @@ import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.index.EsIndex;
 import org.elasticsearch.xpack.esql.index.IndexResolution;
+import org.elasticsearch.xpack.esql.optimizer.rules.logical.ExtractAggregateCommonFilter;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
+import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec;
@@ -58,6 +60,7 @@ import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 import org.elasticsearch.xpack.esql.planner.FilterTests;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
+import org.elasticsearch.xpack.esql.rule.Rule;
 import org.elasticsearch.xpack.esql.session.Configuration;
 import org.elasticsearch.xpack.esql.stats.Metrics;
 import org.elasticsearch.xpack.esql.stats.SearchContextStats;
@@ -66,9 +69,11 @@ import org.elasticsearch.xpack.kql.query.KqlQueryBuilder;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.function.Function;
 
 import static java.util.Arrays.asList;
 import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL;
@@ -375,6 +380,67 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
         assertThat(plan.anyMatch(EsQueryExec.class::isInstance), is(true));
     }
 
+    @SuppressWarnings("unchecked")
+    public void testSingleCountWithStatsFilter() {
+        // an optimizer that filters out the ExtractAggregateCommonFilter rule
+        var logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config)) {
+            @Override
+            protected List<Batch<LogicalPlan>> batches() {
+                var oldBatches = super.batches();
+                List<Batch<LogicalPlan>> newBatches = new ArrayList<>(oldBatches.size());
+                for (var batch : oldBatches) {
+                    List<Rule<?, LogicalPlan>> rules = new ArrayList<>(List.of(batch.rules()));
+                    rules.removeIf(r -> r instanceof ExtractAggregateCommonFilter);
+                    newBatches.add(batch.with(rules.toArray(Rule[]::new)));
+                }
+                return newBatches;
+            }
+        };
+        var analyzer = makeAnalyzer("mapping-default.json", new EnrichResolution());
+        var plannerOptimizer = new TestPlannerOptimizer(config, analyzer, logicalOptimizer);
+        var plan = plannerOptimizer.plan("""
+            from test
+            | stats c = count(hire_date) where emp_no < 10042
+            """, IS_SV_STATS);
+
+        var limit = as(plan, LimitExec.class);
+        var agg = as(limit.child(), AggregateExec.class);
+        assertThat(agg.getMode(), is(FINAL));
+        var exchange = as(agg.child(), ExchangeExec.class);
+        var esStatsQuery = as(exchange.child(), EsStatsQueryExec.class);
+
+        Function<String, String> compact = s -> s.replaceAll("\\s+", "");
+        assertThat(compact.apply(esStatsQuery.query().toString()), is(compact.apply("""
+            {
+                "bool": {
+                    "must": [
+                        {
+                            "exists": {
+                                "field": "hire_date",
+                                "boost": 1.0
+                            }
+                        },
+                        {
+                            "esql_single_value": {
+                                "field": "emp_no",
+                                "next": {
+                                    "range": {
+                                        "emp_no": {
+                                            "lt": 10042,
+                                            "boost": 1.0
+                                        }
+                                    }
+                                },
+                                "source": "emp_no < 10042@2:36"
+                            }
+                        }
+                    ],
+                    "boost": 1.0
+                }
+            }
+            """)));
+    }
+
     /**
      * Expecting
      * LimitExec[1000[INTEGER]]

+ 6 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java

@@ -9,7 +9,6 @@ package org.elasticsearch.xpack.esql.optimizer;
 
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
-import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.parser.EsqlParser;
 import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -23,19 +22,22 @@ public class TestPlannerOptimizer {
     private final Analyzer analyzer;
     private final LogicalPlanOptimizer logicalOptimizer;
     private final PhysicalPlanOptimizer physicalPlanOptimizer;
-    private final EsqlFunctionRegistry functionRegistry;
     private final Mapper mapper;
     private final Configuration config;
 
     public TestPlannerOptimizer(Configuration config, Analyzer analyzer) {
+        this(config, analyzer, new LogicalPlanOptimizer(new LogicalOptimizerContext(config)));
+    }
+
+    public TestPlannerOptimizer(Configuration config, Analyzer analyzer, LogicalPlanOptimizer logicalOptimizer) {
         this.analyzer = analyzer;
         this.config = config;
+        this.logicalOptimizer = logicalOptimizer;
 
         parser = new EsqlParser();
-        logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config));
         physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(config));
-        functionRegistry = new EsqlFunctionRegistry();
         mapper = new Mapper();
+
     }
 
     public PhysicalPlan plan(String query) {