Переглянути джерело

Fix translation for two time-series aggregations (#129892)

This PR fixes the time-series translation for cases where two 
time-series aggregation functions are used within an outer aggregation
function, for example: STATS max(rate(r1) + rate(r2)).
Nhat Nguyen 4 місяців тому
батько
коміт
d3049e03e1

+ 4 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-mappings.json

@@ -30,6 +30,10 @@
           "type": "long",
           "time_series_metric": "counter"
         },
+        "total_bytes_out": {
+          "type": "long",
+          "time_series_metric": "counter"
+        },
         "cost": {
           "type": "double"
         },

+ 14 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec

@@ -336,3 +336,17 @@ distincts:long | distincts_imprecise:long | cluster:keyword | time_bucket:dateti
 2              |2              | staging         | 2024-05-10T00:18:00.000Z
 
 ;
+
+
+two_rates
+required_capability: metrics_command
+
+TS k8s | STATS cost_per_mb=max(rate(network.total_bytes_in) / 1024 * 1024 * rate(network.total_cost)) BY cluster, time_bucket = bucket(@timestamp,5minute) | SORT cost_per_mb DESC, cluster, time_bucket DESC | LIMIT 5;
+
+cost_per_mb:double | cluster:keyword | time_bucket:datetime
+5.119502189662629  | qa              | 2024-05-10T00:15:00.000Z
+4.1135056380088795 | qa              | 2024-05-10T00:05:00.000Z
+2.0974277092655393 | qa              | 2024-05-10T00:10:00.000Z
+2.071474095190272  | prod            | 2024-05-10T00:15:00.000Z
+1.59556462585034   | staging         | 2024-05-10T00:10:00.000Z
+;

+ 39 - 22
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java

@@ -51,25 +51,25 @@ import java.util.Map;
  * becomes
  *
  * TS k8s
- * | STATS rate(request) BY _tsid
- * | STATS max(`rate(request)`)
+ * | STATS rate_$1 = rate(request) BY _tsid
+ * | STATS max(rate_$1)
  *
  * TS k8s | STATS max(rate(request)) BY host
  *
  * becomes
  *
  * TS k8s
- * | STATS rate(request), VALUES(host) BY _tsid
- * | STATS max(`rate(request)`) BY host=`VALUES(host)`
+ * | STATS rate_$1=rate(request), VALUES(host) BY _tsid
+ * | STATS max(rate_$1) BY host=`VALUES(host)`
  *
  * TS k8s | STATS avg(rate(request)) BY host
  *
  * becomes
  *
  * TS k8s
- * | STATS rate(request), VALUES(host) BY _tsid
- * | STATS sum=sum(`rate(request)`), count(`rate(request)`) BY host=`VALUES(host)`
- * | EVAL `avg(rate(request))` = `sum(rate(request))` / `count(rate(request))`
+ * | STATS rate_$1=rate(request), VALUES(host) BY _tsid
+ * | STATS sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`
+ * | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
  * | KEEP `avg(rate(request))`, host
  *
  * TS k8s | STATS avg(rate(request)) BY host, bucket(@timestamp, 1minute)
@@ -78,9 +78,9 @@ import java.util.Map;
  *
  * TS k8s
  * | EVAL  `bucket(@timestamp, 1minute)`=datetrunc(@timestamp, 1minute)
- * | STATS rate(request), VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
- * | STATS sum=sum(`rate(request)`), count(`rate(request)`) BY host=`VALUES(host)`, `bucket(@timestamp, 1minute)`
- * | EVAL `avg(rate(request))` = `sum(rate(request))` / `count(rate(request))`
+ * | STATS rate_$1=rate(request), VALUES(host) BY _tsid,`bucket(@timestamp, 1minute)`
+ * | STATS sum=sum(rate_$1), count(rate_$1) BY host=`VALUES(host)`, `bucket(@timestamp, 1minute)`
+ * | EVAL `avg(rate(request))` = `sum(rate_$1)` / `count(rate_$1)`
  * | KEEP `avg(rate(request))`, host, `bucket(@timestamp, 1minute)`
  * </pre>
  *
@@ -93,16 +93,16 @@ import java.util.Map;
  * TS k8s | STATS max(rate(request)), max(memory_used) becomes:
  *
  * TS k8s
- * | STATS rate(request), $p1=to_partial(max(memory_used)) BY _tsid
- * | STATS max(`rate(request)`), `max(memory_used)` = from_partial($p1, max($_))
+ * | STATS rate_$1=rate(request), $p1=to_partial(max(memory_used)) BY _tsid
+ * | STATS max(rate_$1), `max(memory_used)` = from_partial($p1, max($_))
  *
  * TS k8s | STATS max(rate(request)) avg(memory_used) BY host
  *
  * becomes
  *
  * TS k8s
- * | STATS rate(request), $p1=to_partial(sum(memory_used)), $p2=to_partial(count(memory_used)), VALUES(host) BY _tsid
- * | STATS max(`rate(request)`), $sum=from_partial($p1, sum($_)), $count=from_partial($p2, count($_)) BY host=`VALUES(host)`
+ * | STATS rate_$1=rate(request), $p1=to_partial(sum(memory_used)), $p2=to_partial(count(memory_used)), VALUES(host) BY _tsid
+ * | STATS max(rate_$1), $sum=from_partial($p1, sum($_)), $count=from_partial($p2, count($_)) BY host=`VALUES(host)`
  * | EVAL `avg(memory_used)` = $sum / $count
  * | KEEP `max(rate(request))`, `avg(memory_used)`, host
  *
@@ -112,9 +112,9 @@ import java.util.Map;
  *
  * TS k8s
  * | EVAL `bucket(@timestamp, 5m)` = datetrunc(@timestamp, '5m')
- * | STATS rate(request), $p1=to_partial(min(memory_used)), VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
- * | STATS sum(`rate(request)`), `min(memory_used)` = from_partial($p1, min($)) BY pod=`VALUES(pod)`, `bucket(@timestamp, 5m)`
- * | KEEP `min(memory_used)`, `sum(rate(request))`, pod, `bucket(@timestamp, 5m)`
+ * | STATS rate_$1=rate(request), $p1=to_partial(min(memory_used)), VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
+ * | STATS sum(rate_$1), `min(memory_used)` = from_partial($p1, min($)) BY pod=`VALUES(pod)`, `bucket(@timestamp, 5m)`
+ * | KEEP `min(memory_used)`, `sum(rate_$1)`, pod, `bucket(@timestamp, 5m)`
  *
  * {agg}_over_time time-series aggregation will be rewritten in the similar way
  *
@@ -123,8 +123,8 @@ import java.util.Map;
  * becomes
  *
  * FROM k8s
- * | STATS max_memory_usage = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
- * | STATS sum(max_memory_usage) BY host_values, time_bucket
+ * | STATS max_over_time_$1 = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
+ * | STATS sum(max_over_time_$1) BY host_values, time_bucket
  *
  *
  * TS k8s | STATS sum(avg_over_time(memory_usage)) BY host, bucket(@timestamp, 1minute)
@@ -132,9 +132,16 @@ import java.util.Map;
  * becomes
  *
  * FROM k8s
- * | STATS avg_memory_usage = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
- * | STATS sum(avg_memory_usage) BY host_values, time_bucket
+ * | STATS avg_over_time_$1 = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
+ * | STATS sum(avg_over_time_$1) BY host_values, time_bucket
  *
+ * TS k8s | STATS max(rate(post_requests) + rate(get_requests)) BY host, bucket(@timestamp, 1minute)
+ *
+ * becomes
+ *
+ * FROM k8s
+ * | STATS rate_$1=rate(post_requests), rate_$2=rate(get_requests), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
+ * | STATS max(rate_$1 + rate_$2) BY host_values, time_bucket
  * </pre>
  */
 public final class TranslateTimeSeriesAggregate extends OptimizerRules.OptimizerRule<Aggregate> {
@@ -157,6 +164,7 @@ public final class TranslateTimeSeriesAggregate extends OptimizerRules.Optimizer
         List<NamedExpression> firstPassAggs = new ArrayList<>();
         List<NamedExpression> secondPassAggs = new ArrayList<>();
         Holder<Boolean> hasRateAggregates = new Holder<>(Boolean.FALSE);
+        var internalNames = new InternalNames();
         for (NamedExpression agg : aggregate.aggregates()) {
             if (agg instanceof Alias alias && alias.child() instanceof AggregateFunction af) {
                 Holder<Boolean> changed = new Holder<>(Boolean.FALSE);
@@ -167,7 +175,7 @@ public final class TranslateTimeSeriesAggregate extends OptimizerRules.Optimizer
                     }
                     AggregateFunction firstStageFn = tsAgg.perTimeSeriesAggregation();
                     Alias newAgg = timeSeriesAggs.computeIfAbsent(firstStageFn, k -> {
-                        Alias firstStageAlias = new Alias(tsAgg.source(), agg.name(), firstStageFn);
+                        Alias firstStageAlias = new Alias(tsAgg.source(), internalNames.next(tsAgg.functionName()), firstStageFn);
                         firstPassAggs.add(firstStageAlias);
                         return firstStageAlias;
                     });
@@ -269,4 +277,13 @@ public final class TranslateTimeSeriesAggregate extends OptimizerRules.Optimizer
         groupings.forEach(g -> merged.add(Expressions.attribute(g)));
         return merged;
     }
+
+    private static class InternalNames {
+        final Map<String, Integer> next = new HashMap<>();
+
+        String next(String prefix) {
+            int id = next.merge(prefix, 1, Integer::sum);
+            return prefix + "_$" + id;
+        }
+    }
 }

+ 21 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

@@ -6900,6 +6900,27 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
         assertThat(Expressions.attribute(clusterValues.field()).name(), equalTo("cluster"));
     }
 
+    public void testTranslateSumOfTwoRates() {
+        assumeTrue("requires snapshot builds", Build.current().isSnapshot());
+        var query = """
+            TS k8s
+            | STATS max(rate(network.total_bytes_in) + rate(network.total_bytes_out)) BY pod, bucket(@timestamp, 5 minute), cluster
+            | SORT cluster
+            | LIMIT 10
+            """;
+        var plan = logicalOptimizer.optimize(metricsAnalyzer.analyze(parser.createStatement(query)));
+        TopN topN = as(plan, TopN.class);
+        Aggregate finalAgg = as(topN.child(), Aggregate.class);
+        Eval eval = as(finalAgg.child(), Eval.class);
+        assertThat(eval.fields(), hasSize(1));
+        Add sum = as(Alias.unwrap(eval.fields().get(0)), Add.class);
+        assertThat(Expressions.name(sum.left()), equalTo("RATE_$1"));
+        assertThat(Expressions.name(sum.right()), equalTo("RATE_$2"));
+        TimeSeriesAggregate aggsByTsid = as(eval.child(), TimeSeriesAggregate.class);
+        assertThat(Expressions.name(aggsByTsid.aggregates().get(0)), equalTo("RATE_$1"));
+        assertThat(Expressions.name(aggsByTsid.aggregates().get(1)), equalTo("RATE_$2"));
+    }
+
     public void testTranslateMixedAggsGroupedByTimeBucketAndDimensions() {
         assumeTrue("requires snapshot builds", Build.current().isSnapshot());
         var query = """