Просмотр исходного кода

SQL: Enable the InnerAggregates inside PIVOT (#65792)

* Remove the limitation of not being able to use `InnerAggregate`
inside PIVOTs (aggregations using extended and matrix stats)
* The limitation was introduced as part of the original `PIVOT` 
implementation in #46489, but after #49693 it could be lifted.
* Test that the `PIVOT` results in the same query as the 
`GROUP BY`. This should hold across all the 
`AggregateFunction`s we have.
Andras Palinkas 4 лет назад
Родитель
Сommit
67704b09be

+ 46 - 0
x-pack/plugin/sql/qa/server/src/main/resources/pivot.csv-spec

@@ -201,6 +201,51 @@ null                 |10044          |Mingsen        |F              |1994-05-21
 // end::sumWithoutSubquery
 ;
 
+sumWithInnerAggregateSumOfSquares
+schema::birth_date:ts|emp_no:i|first_name:s|gender:s|hire_date:ts|last_name:s|1:d|2:d
+SELECT * FROM test_emp PIVOT (SUM_OF_SQUARES(salary) FOR languages IN (1, 2)) LIMIT 5;
+
+  birth_date         |    emp_no     |  first_name   |    gender     |       hire_date        |   last_name   |       1       |       2       
+---------------------+---------------+---------------+---------------+------------------------+---------------+---------------+---------------
+null                 |10041          |Uri            |F              |1989-11-12T00:00:00.000Z|Lenart         |3.182652225E9  |null           
+null                 |10043          |Yishay         |M              |1990-10-20T00:00:00.000Z|Tzvieli        |1.179304281E9  |null           
+null                 |10044          |Mingsen        |F              |1994-05-21T00:00:00.000Z|Casley         |1.578313984E9  |null           
+1952-04-19T00:00:00Z |10009          |Sumant         |F              |1985-02-18T00:00:00.000Z|Peac           |4.378998276E9  |null           
+1953-01-07T00:00:00Z |10067          |Claudi         |M              |1987-03-04T00:00:00.000Z|Stavenow       |null           |2.708577936E9  
+;
+
+sumWithInnerAggregateSumOfSquaresRound
+schema::birth_date:ts|emp_no:i|first_name:s|gender:s|hire_date:ts|last_name:s|1:d|2:d
+SELECT * FROM test_emp PIVOT (ROUND(SUM_OF_SQUARES(salary)/1E6, 2) FOR languages IN (1, 2)) LIMIT 5;
+
+  birth_date         |    emp_no     |  first_name   |    gender     |       hire_date        |   last_name   |       1       |       2       
+---------------------+---------------+---------------+---------------+------------------------+---------------+---------------+---------------
+null                 |10041          |Uri            |F              |1989-11-12T00:00:00.000Z|Lenart         |3182.65        |null           
+null                 |10043          |Yishay         |M              |1990-10-20T00:00:00.000Z|Tzvieli        |1179.30        |null           
+null                 |10044          |Mingsen        |F              |1994-05-21T00:00:00.000Z|Casley         |1578.31        |null           
+1952-04-19T00:00:00Z |10009          |Sumant         |F              |1985-02-18T00:00:00.000Z|Peac           |4379.00        |null           
+1953-01-07T00:00:00Z |10067          |Claudi         |M              |1987-03-04T00:00:00.000Z|Stavenow       |null           |2708.58        
+;
+
+sumWithInnerAggregateKurtosis
+schema::client_port:i|'OK':d|'Error':d
+SELECT * FROM (SELECT client_port, status, bytes_in FROM logs WHERE client_port IS NULL) PIVOT (KURTOSIS(bytes_in) FOR status IN ('OK', 'Error')) LIMIT 10;
+
+  client_port  |       'OK'       |    'Error'    
+---------------+------------------+---------------
+null           |2.0016153277578916|NaN            
+;
+
+sumWithInnerAggregateKurtosisRound
+schema::client_port:i|'OK':d|'Error':d
+SELECT * FROM (SELECT client_port, status, bytes_in FROM logs WHERE client_port IS NULL) PIVOT (ROUND(KURTOSIS(bytes_in), 3) FOR status IN ('OK', 'Error')) LIMIT 10;
+
+  client_port  |       'OK'       |    'Error'    
+---------------+------------------+---------------
+null           |2.002             |-0.0            
+;
+
+
 averageWithOneValueAndMath
 schema::languages:bt|'F':d
 SELECT * FROM (SELECT languages, gender, salary FROM test_emp) PIVOT (ROUND(AVG(salary) / 2) FOR gender IN ('F'));
@@ -214,3 +259,4 @@ null           |31070.0
 4              |24646.0        
 5              |23353.0        
 ;
+

+ 0 - 14
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/planner/Verifier.java

@@ -6,9 +6,7 @@
 package org.elasticsearch.xpack.sql.planner;
 
 import org.elasticsearch.xpack.ql.common.Failure;
-import org.elasticsearch.xpack.ql.expression.function.aggregate.InnerAggregate;
 import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
-import org.elasticsearch.xpack.sql.plan.physical.PivotExec;
 import org.elasticsearch.xpack.sql.plan.physical.Unexecutable;
 import org.elasticsearch.xpack.sql.plan.physical.UnplannedExec;
 
@@ -32,22 +30,10 @@ abstract class Verifier {
                 }
             });
         });
-        // verify Pivot
-        checkInnerAggsPivot(plan, failures);
 
         return failures;
     }
 
-    private static void checkInnerAggsPivot(PhysicalPlan plan, List<Failure> failures) {
-        plan.forEachDown(p -> {
-            p.pivot().aggregates().forEach(agg -> agg.forEachDown(e -> {
-                if (e instanceof InnerAggregate) {
-                    failures.add(fail(e, "Aggregation [{}] not supported (yet) by PIVOT", e.sourceText()));
-                }
-            }));
-        }, PivotExec.class);
-    }
-
     static List<Failure> verifyExecutingPlan(PhysicalPlan plan) {
         List<Failure> failures = new ArrayList<>();
 

+ 0 - 77
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/planner/PostOptimizerVerifierTests.java

@@ -1,77 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-
-package org.elasticsearch.xpack.sql.planner;
-
-import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.xpack.ql.index.EsIndex;
-import org.elasticsearch.xpack.ql.index.IndexResolution;
-import org.elasticsearch.xpack.ql.type.EsField;
-import org.elasticsearch.xpack.sql.SqlTestUtils;
-import org.elasticsearch.xpack.sql.analysis.analyzer.Analyzer;
-import org.elasticsearch.xpack.sql.analysis.analyzer.Verifier;
-import org.elasticsearch.xpack.sql.expression.function.SqlFunctionRegistry;
-import org.elasticsearch.xpack.sql.optimizer.Optimizer;
-import org.elasticsearch.xpack.sql.parser.SqlParser;
-import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
-import org.elasticsearch.xpack.sql.stats.Metrics;
-import org.elasticsearch.xpack.sql.types.SqlTypesTests;
-import org.junit.After;
-import org.junit.Before;
-
-import java.util.Map;
-
-public class PostOptimizerVerifierTests extends ESTestCase {
-
-    private SqlParser parser;
-    private Analyzer analyzer;
-    private Optimizer optimizer;
-    private Planner planner;
-    private IndexResolution indexResolution;
-
-    @Before
-    public void init() {
-        parser = new SqlParser();
-
-        Map<String, EsField> mapping = SqlTypesTests.loadMapping("mapping-multi-field-variation.json");
-        EsIndex test = new EsIndex("test", mapping);
-        indexResolution = IndexResolution.valid(test);
-        analyzer = new Analyzer(SqlTestUtils.TEST_CFG, new SqlFunctionRegistry(), indexResolution, new Verifier(new Metrics()));
-        optimizer = new Optimizer();
-        planner = new Planner();
-    }
-
-    @After
-    public void destroy() {
-        parser = null;
-        analyzer = null;
-    }
-
-    private PhysicalPlan plan(String sql) {
-        return planner.plan(optimizer.optimize(analyzer.analyze(parser.createStatement(sql), true)), true);
-    }
-
-    private String error(String sql) {
-        return error(indexResolution, sql);
-    }
-
-    private String error(IndexResolution getIndexResult, String sql) {
-        PlanningException e = expectThrows(PlanningException.class, () -> plan(sql));
-        assertTrue(e.getMessage().startsWith("Found "));
-        String header = "Found 1 problem\nline ";
-        return e.getMessage().substring(header.length());
-    }
-
-    public void testPivotInnerAgg() {
-        assertEquals("1:59: Aggregation [SUM_OF_SQUARES(int)] not supported (yet) by PIVOT",
-                error("SELECT * FROM (SELECT int, keyword, bool FROM test) " + "PIVOT(SUM_OF_SQUARES(int) FOR keyword IN ('bla'))"));
-    }
-
-    public void testPivotNestedInnerAgg() {
-        assertEquals("1:65: Aggregation [SUM_OF_SQUARES(int)] not supported (yet) by PIVOT",
-                error("SELECT * FROM (SELECT int, keyword, bool FROM test) " + "PIVOT(ROUND(SUM_OF_SQUARES(int)) FOR keyword IN ('bla'))"));
-    }
-}

+ 33 - 4
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/planner/QueryFolderTests.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.sql.planner;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ql.expression.Expressions;
 import org.elasticsearch.xpack.ql.expression.ReferenceAttribute;
+import org.elasticsearch.xpack.ql.expression.function.aggregate.AggregateFunction;
 import org.elasticsearch.xpack.ql.index.EsIndex;
 import org.elasticsearch.xpack.ql.index.IndexResolution;
 import org.elasticsearch.xpack.ql.type.EsField;
@@ -20,6 +21,7 @@ import org.elasticsearch.xpack.sql.parser.SqlParser;
 import org.elasticsearch.xpack.sql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.sql.plan.physical.LocalExec;
 import org.elasticsearch.xpack.sql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.sql.querydsl.container.QueryContainer;
 import org.elasticsearch.xpack.sql.session.EmptyExecutable;
 import org.elasticsearch.xpack.sql.session.SingletonExecutable;
 import org.elasticsearch.xpack.sql.stats.Metrics;
@@ -27,9 +29,11 @@ import org.elasticsearch.xpack.sql.types.SqlTypesTests;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
-import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toList;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.endsWith;
 import static org.hamcrest.Matchers.startsWith;
@@ -449,7 +453,7 @@ public class QueryFolderTests extends ESTestCase {
         assertEquals(EsQueryExec.class, p.getClass());
         EsQueryExec ee = (EsQueryExec) p;
         assertEquals(2, ee.output().size());
-        assertEquals(Arrays.asList("1", "MAX(int)"), Expressions.names(ee.output()));
+        assertEquals(asList("1", "MAX(int)"), Expressions.names(ee.output()));
         assertThat(ee.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
                 containsString("\"max\":{\"field\":\"int\""));
 
@@ -457,7 +461,7 @@ public class QueryFolderTests extends ESTestCase {
         assertEquals(EsQueryExec.class, p.getClass());
         ee = (EsQueryExec) p;
         assertEquals(2, ee.output().size());
-        assertEquals(Arrays.asList("1", "count(*)"), Expressions.names(ee.output()));
+        assertEquals(asList("1", "count(*)"), Expressions.names(ee.output()));
         assertThat(ee.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
                 containsString("\"terms\":{\"field\":\"int\""));
     }
@@ -495,7 +499,7 @@ public class QueryFolderTests extends ESTestCase {
         assertEquals(EsQueryExec.class, p.getClass());
         EsQueryExec ee = (EsQueryExec) p;
         assertEquals(3, ee.output().size());
-        assertEquals(Arrays.asList("bool", "'A'", "'B'"), Expressions.names(ee.output()));
+        assertEquals(asList("bool", "'A'", "'B'"), Expressions.names(ee.output()));
         String q = ee.toString().replaceAll("\\s+", "");
         assertThat(q, containsString("\"query\":{\"terms\":{\"keyword\":[\"A\",\"B\"]"));
         String a = ee.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", "");
@@ -503,6 +507,31 @@ public class QueryFolderTests extends ESTestCase {
         assertThat(a, containsString("\"terms\":{\"field\":\"keyword\""));
         assertThat(a, containsString("{\"avg\":{\"field\":\"int\"}"));
     }
+    
+    public void testPivotHasSameQueryAsGroupBy() {
+        final Map<String, String> aggFnsWithMultipleArguments = Map.of(
+            "PERCENTILE", "PERCENTILE(int, 0)",
+            "PERCENTILE_RANK", "PERCENTILE_RANK(int, 0)"
+        );
+        List<String> aggregations = new SqlFunctionRegistry().listFunctions()
+                .stream()
+                .filter(def -> AggregateFunction.class.isAssignableFrom(def.clazz()))
+                .map(def -> aggFnsWithMultipleArguments.getOrDefault(def.name(), def.name() + "(int)"))
+                .collect(toList());
+        for (String aggregationStr : aggregations) {
+            PhysicalPlan pivotPlan = plan("SELECT * FROM (SELECT some.dotted.field, bool, keyword, int FROM test) " +
+                "PIVOT(" + aggregationStr + " FOR keyword IN ('A', 'B'))");
+            PhysicalPlan groupByPlan = plan("SELECT some.dotted.field, bool, keyword, " + aggregationStr + " " +
+                "FROM test WHERE keyword IN ('A', 'B') GROUP BY some.dotted.field, bool, keyword");
+            assertEquals(EsQueryExec.class, pivotPlan.getClass());
+            assertEquals(EsQueryExec.class, groupByPlan.getClass());
+            QueryContainer pivotQueryContainer = ((EsQueryExec) pivotPlan).queryContainer();
+            QueryContainer groupByQueryContainer = ((EsQueryExec) groupByPlan).queryContainer();
+            assertEquals(pivotQueryContainer.query(), groupByQueryContainer.query());
+            assertEquals(pivotQueryContainer.aggs(), groupByQueryContainer.aggs());
+            assertEquals(pivotPlan.toString(), groupByPlan.toString());
+        }
+    }
 
     private static String randomOrderByAndLimit(int noOfSelectArgs) {
         return SqlTestUtils.randomOrderByAndLimit(noOfSelectArgs, random());