浏览代码

[ES|QL] Support implicit casting of aggregate_metric_double (#129108)

This commit adds support for implicit casting of aggregate_metric_double
when present with other numerics for a limited set of aggregation
functions:
- Max / MaxOverTime
- Min / MinOverTime
- Sum / SumOverTime
- Count / CountOverTime
- Avg / AvgOverTime

Attempting to use fields mapped to aggregate_metric_double in one index
but some other numeric in another index in any other context will still
require explicit casting with ToAggregateMetricDouble
Larisa Motova 3 月之前
父节点
当前提交
9fa06245a5

+ 2 - 0
x-pack/plugin/downsample/build.gradle

@@ -18,6 +18,8 @@ dependencies {
   compileOnly project(path: xpackModule('mapper-aggregate-metric'))
   testImplementation(testArtifact(project(xpackModule('core'))))
   testImplementation project(xpackModule('ccr'))
+  testImplementation project(xpackModule('esql'))
+  testImplementation project(xpackModule('esql-core'))
 }
 
 addQaCheckDependencies(project)

+ 164 - 0
x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java

@@ -7,24 +7,35 @@
 
 package org.elasticsearch.xpack.downsample;
 
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
 import org.elasticsearch.action.downsample.DownsampleAction;
 import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.action.support.SubscribableListener;
+import org.elasticsearch.cluster.metadata.DataStreamAction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
+import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
+import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
+import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
+import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
 
 import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.xpack.downsample.DownsampleDataStreamTests.TIMEOUT;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 
 public class DownsampleIT extends DownsamplingIntegTestCase {
 
@@ -96,4 +107,157 @@ public class DownsampleIT extends DownsamplingIntegTestCase {
 
         assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);
     }
+
+    public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
+        String dataStreamName = "metrics-foo";
+        Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host", "cluster")).build();
+        putTSDBIndexTemplate("my-template", List.of("metrics-foo"), settings, """
+            {
+              "properties": {
+                "host": {
+                  "type": "keyword",
+                  "time_series_dimension": true
+                },
+                "cluster" : {
+                  "type": "keyword",
+                  "time_series_dimension": true
+                },
+                "cpu": {
+                  "type": "double",
+                  "time_series_metric": "gauge"
+                }
+              }
+            }
+            """, null, null);
+
+        // Create data stream by indexing documents
+        final Instant now = Instant.now();
+        Supplier<XContentBuilder> sourceSupplier = () -> {
+            String ts = randomDateForRange(now.minusSeconds(60 * 60).toEpochMilli(), now.plusSeconds(60 * 29).toEpochMilli());
+            try {
+                return XContentFactory.jsonBuilder()
+                    .startObject()
+                    .field("@timestamp", ts)
+                    .field("host", randomFrom("host1", "host2", "host3"))
+                    .field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
+                    .field("cpu", randomDouble())
+                    .endObject();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+        bulkIndex(dataStreamName, sourceSupplier, 100);
+
+        // Rollover to ensure the index we will downsample is not the write index
+        assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
+        List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
+        String sourceIndex = backingIndices.get(0);
+        String interval = "5m";
+        String targetIndex = "downsample-" + interval + "-" + sourceIndex;
+        // Set the source index to read-only state
+        assertAcked(
+            indicesAdmin().prepareUpdateSettings(sourceIndex)
+                .setSettings(Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build())
+        );
+
+        DownsampleConfig downsampleConfig = new DownsampleConfig(new DateHistogramInterval(interval));
+        assertAcked(
+            client().execute(
+                DownsampleAction.INSTANCE,
+                new DownsampleAction.Request(TEST_REQUEST_TIMEOUT, sourceIndex, targetIndex, TIMEOUT, downsampleConfig)
+            )
+        );
+
+        // Wait for downsampling to complete
+        SubscribableListener<Void> listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
+            final var indexMetadata = clusterState.metadata().getProject().index(targetIndex);
+            if (indexMetadata == null) {
+                return false;
+            }
+            var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
+            return downsampleStatus == IndexMetadata.DownsampleTaskStatus.SUCCESS;
+        });
+        safeAwait(listener);
+
+        assertDownsampleIndexFieldsAndDimensions(sourceIndex, targetIndex, downsampleConfig);
+
+        // remove old backing index and replace with downsampled index and delete old so old is not queried
+        assertAcked(
+            client().execute(
+                ModifyDataStreamsAction.INSTANCE,
+                new ModifyDataStreamsAction.Request(
+                    TEST_REQUEST_TIMEOUT,
+                    TEST_REQUEST_TIMEOUT,
+                    List.of(
+                        DataStreamAction.removeBackingIndex(dataStreamName, sourceIndex),
+                        DataStreamAction.addBackingIndex(dataStreamName, targetIndex)
+                    )
+                )
+            ).actionGet()
+        );
+        assertAcked(client().execute(TransportDeleteIndexAction.TYPE, new DeleteIndexRequest(sourceIndex)).actionGet());
+
+        // index to the next backing index; random time between 31 and 59m in the future to because default look_ahead_time is 30m and we
+        // don't want to conflict with the previous backing index
+        Supplier<XContentBuilder> nextSourceSupplier = () -> {
+            String ts = randomDateForRange(now.plusSeconds(60 * 31).toEpochMilli(), now.plusSeconds(60 * 59).toEpochMilli());
+            try {
+                return XContentFactory.jsonBuilder()
+                    .startObject()
+                    .field("@timestamp", ts)
+                    .field("host", randomFrom("host1", "host2", "host3"))
+                    .field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
+                    .field("cpu", randomDouble())
+                    .endObject();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+        bulkIndex(dataStreamName, nextSourceSupplier, 100);
+
+        // Since the downsampled field (cpu) is downsampled in one index and not in the other, we want to confirm
+        // first that the field is unsupported and has 2 original types - double and aggregate_metric_double
+        try (var resp = esqlCommand("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu")) {
+            var columns = resp.columns();
+            assertThat(columns, hasSize(4));
+            assertThat(
+                resp.columns(),
+                equalTo(
+                    List.of(
+                        new ColumnInfoImpl("@timestamp", "date", null),
+                        new ColumnInfoImpl("host", "keyword", null),
+                        new ColumnInfoImpl("cluster", "keyword", null),
+                        new ColumnInfoImpl("cpu", "unsupported", List.of("aggregate_metric_double", "double"))
+                    )
+                )
+            );
+        }
+
+        // test _over_time commands with implicit casting of aggregate_metric_double
+        for (String innerCommand : List.of("min_over_time", "max_over_time", "avg_over_time", "count_over_time")) {
+            for (String outerCommand : List.of("min", "max", "sum", "count")) {
+                String command = outerCommand + " (" + innerCommand + "(cpu))";
+                String expectedType = innerCommand.equals("count_over_time") || outerCommand.equals("count") ? "long" : "double";
+                try (var resp = esqlCommand("TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")) {
+                    var columns = resp.columns();
+                    assertThat(columns, hasSize(3));
+                    assertThat(
+                        resp.columns(),
+                        equalTo(
+                            List.of(
+                                new ColumnInfoImpl(command, expectedType, null),
+                                new ColumnInfoImpl("cluster", "keyword", null),
+                                new ColumnInfoImpl("bucket(@timestamp, 1 hour)", "date", null)
+                            )
+                        )
+                    );
+                    // TODO: verify the numbers are accurate
+                }
+            }
+        }
+    }
+
+    private EsqlQueryResponse esqlCommand(String command) throws IOException {
+        return client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(command)).actionGet(30, TimeUnit.SECONDS);
+    }
 }

+ 8 - 1
x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsamplingIntegTestCase.java

@@ -45,6 +45,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentFactory;
 import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
 import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 
 import java.io.IOException;
 import java.time.LocalDateTime;
@@ -82,7 +83,13 @@ public abstract class DownsamplingIntegTestCase extends ESIntegTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return List.of(DataStreamsPlugin.class, LocalStateCompositeXPackPlugin.class, Downsample.class, AggregateMetricMapperPlugin.class);
+        return List.of(
+            DataStreamsPlugin.class,
+            LocalStateCompositeXPackPlugin.class,
+            Downsample.class,
+            AggregateMetricMapperPlugin.class,
+            EsqlPlugin.class
+        );
     }
 
     /**

+ 25 - 20
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -890,6 +890,31 @@ public class EsqlCapabilities {
          */
         AGGREGATE_METRIC_DOUBLE_PARTIAL_SUBMETRICS(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
 
+        /**
+         * Support for rendering aggregate_metric_double type
+         */
+        AGGREGATE_METRIC_DOUBLE_RENDERING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
+
+        /**
+         * Support for to_aggregate_metric_double function
+         */
+        AGGREGATE_METRIC_DOUBLE_CONVERT_TO(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
+
+        /**
+         * Support for sorting when aggregate_metric_doubles are present
+         */
+        AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
+
+        /**
+         * Support avg with aggregate metric doubles
+         */
+        AGGREGATE_METRIC_DOUBLE_AVG(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
+
+        /**
+         * Support for implicit casting of aggregate metric double when run in aggregations
+         */
+        AGGREGATE_METRIC_DOUBLE_IMPLICIT_CASTING_IN_AGGS(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
+
         /**
          * Support change point detection "CHANGE_POINT".
          */
@@ -913,11 +938,6 @@ public class EsqlCapabilities {
          */
         SUPPORT_PARTIAL_RESULTS,
 
-        /**
-         * Support for rendering aggregate_metric_double type
-         */
-        AGGREGATE_METRIC_DOUBLE_RENDERING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
-
         /**
          * Support for RERANK command
          */
@@ -964,11 +984,6 @@ public class EsqlCapabilities {
          */
         NON_FULL_TEXT_FUNCTIONS_SCORING,
 
-        /**
-         * Support for to_aggregate_metric_double function
-         */
-        AGGREGATE_METRIC_DOUBLE_CONVERT_TO(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
-
         /**
          * The {@code _query} API now reports the original types.
          */
@@ -995,11 +1010,6 @@ public class EsqlCapabilities {
          */
         MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT,
 
-        /**
-         * Support for sorting when aggregate_metric_doubles are present
-         */
-        AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
-
         /**
          * Supercedes {@link Cap#MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT}.
          */
@@ -1260,11 +1270,6 @@ public class EsqlCapabilities {
          */
         LIKE_ON_INDEX_FIELDS,
 
-        /**
-         * Support avg with aggregate metric doubles
-         */
-        AGGREGATE_METRIC_DOUBLE_AVG(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
-
         /**
          * Forbid usage of brackets in unquoted index and enrich policy names
          * https://github.com/elastic/elasticsearch/issues/130378

+ 129 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.esql.analysis;
 import org.elasticsearch.common.logging.HeaderWarning;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.lucene.BytesRefs;
+import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.index.IndexMode;
@@ -53,6 +54,17 @@ import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.expression.function.FunctionDefinition;
 import org.elasticsearch.xpack.esql.expression.function.UnresolvedFunction;
 import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Avg;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.CountOverTime;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.MaxOverTime;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.MinOverTime;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.SumOverTime;
 import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
 import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction;
 import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Case;
@@ -61,6 +73,8 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Least
 import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction;
 import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ConvertFunction;
 import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FoldablesConvertFunction;
+import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromAggregateMetricDouble;
+import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToAggregateMetricDouble;
 import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToDateNanos;
 import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToDouble;
 import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToInteger;
@@ -135,6 +149,7 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
 import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.IMPLICIT_CASTING_DATE_AND_DATE_NANOS;
+import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
 import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
 import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME;
 import static org.elasticsearch.xpack.esql.core.type.DataType.DATE_NANOS;
@@ -182,7 +197,8 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
             "Resolution",
             new ResolveRefs(),
             new ImplicitCasting(),
-            new ResolveUnionTypes()  // Must be after ResolveRefs, so union types can be found
+            new ResolveUnionTypes(),  // Must be after ResolveRefs, so union types can be found
+            new ImplicitCastAggregateMetricDoubles()
         ),
         new Batch<>("Finish Analysis", Limiter.ONCE, new AddImplicitLimit(), new AddImplicitForkLimit(), new UnionTypesCleanup())
     );
@@ -1688,9 +1704,15 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
                 return plan;
             }
 
-            // And add generated fields to EsRelation, so these new attributes will appear in the OutputExec of the Fragment
-            // and thereby get used in FieldExtractExec
-            plan = plan.transformDown(EsRelation.class, esr -> {
+            return addGeneratedFieldsToEsRelations(plan, unionFieldAttributes);
+        }
+
+        /**
+         * Add generated fields to EsRelation, so these new attributes will appear in the OutputExec of the Fragment
+         * and thereby get used in FieldExtractExec
+         */
+        private static LogicalPlan addGeneratedFieldsToEsRelations(LogicalPlan plan, List<FieldAttribute> unionFieldAttributes) {
+            return plan.transformDown(EsRelation.class, esr -> {
                 List<Attribute> missing = new ArrayList<>();
                 for (FieldAttribute fa : unionFieldAttributes) {
                     // Using outputSet().contains looks by NameId, resp. uses semanticEquals.
@@ -1710,7 +1732,6 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
                 }
                 return esr;
             });
-            return plan;
         }
 
         private Expression resolveConvertFunction(ConvertFunction convert, List<FieldAttribute> unionFieldAttributes) {
@@ -1838,7 +1859,10 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
                 originalFieldAttr.id(),
                 true
             );
-            Expression e = ((Expression) convert).replaceChildren(Collections.singletonList(resolvedAttr));
+            Expression fn = (Expression) convert;
+            List<Expression> children = new ArrayList<>(fn.children());
+            children.set(0, resolvedAttr);
+            Expression e = ((Expression) convert).replaceChildren(children);
             /*
              * Resolve surrogates immediately because these type specific conversions are serialized
              * and SurrogateExpressions are expected to be resolved on the coordinating node. At least,
@@ -1957,4 +1981,103 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
         var concreteConvert = ResolveUnionTypes.typeSpecificConvert(convert, fieldAttribute.source(), type, imf);
         typeResolutions.put(key, concreteConvert);
     }
+
+    /**
+     * Take InvalidMappedFields in specific aggregations (min, max, sum, count, and avg) and if all original data types
+     * are aggregate metric double + any combination of numerics, implicitly cast them to the same type: aggregate metric
+     * double for count, and double for min, max, and sum. Avg gets replaced with its surrogate (Div(Sum, Count))
+     */
+    private static class ImplicitCastAggregateMetricDoubles extends Rule<LogicalPlan, LogicalPlan> {
+
+        @Override
+        public LogicalPlan apply(LogicalPlan plan) {
+            return plan.transformUp(Aggregate.class, p -> p.childrenResolved() == false ? p : doRule(p));
+        }
+
+        private LogicalPlan doRule(Aggregate plan) {
+            Map<String, FieldAttribute> unionFields = new HashMap<>();
+            Holder<Boolean> aborted = new Holder<>(Boolean.FALSE);
+            var newPlan = plan.transformExpressionsOnly(AggregateFunction.class, aggFunc -> {
+                if (aggFunc.field() instanceof FieldAttribute fa && fa.field() instanceof InvalidMappedField mtf) {
+                    if (mtf.types().contains(AGGREGATE_METRIC_DOUBLE) == false
+                        || mtf.types().stream().allMatch(f -> f == AGGREGATE_METRIC_DOUBLE || f.isNumeric()) == false) {
+                        aborted.set(Boolean.TRUE);
+                        return aggFunc;
+                    }
+                    Map<String, Expression> typeConverters = typeConverters(aggFunc, fa, mtf);
+                    if (typeConverters == null) {
+                        aborted.set(Boolean.TRUE);
+                        return aggFunc;
+                    }
+                    var newField = unionFields.computeIfAbsent(
+                        Attribute.rawTemporaryName(fa.name(), aggFunc.functionName(), aggFunc.sourceText()),
+                        newName -> new FieldAttribute(
+                            fa.source(),
+                            fa.parentName(),
+                            newName,
+                            MultiTypeEsField.resolveFrom(mtf, typeConverters),
+                            fa.nullable(),
+                            null,
+                            true
+                        )
+                    );
+                    List<Expression> children = new ArrayList<>(aggFunc.children());
+                    children.set(0, newField);
+                    return aggFunc.replaceChildren(children);
+                }
+                return aggFunc;
+            });
+            if (unionFields.isEmpty() || aborted.get()) {
+                return plan;
+            }
+            return ResolveUnionTypes.addGeneratedFieldsToEsRelations(newPlan, unionFields.values().stream().toList());
+        }
+
+        private Map<String, Expression> typeConverters(AggregateFunction aggFunc, FieldAttribute fa, InvalidMappedField mtf) {
+            var metric = getMetric(aggFunc);
+            if (metric == null) {
+                return null;
+            }
+            Map<String, Expression> typeConverter = new HashMap<>();
+            for (DataType type : mtf.types()) {
+                final ConvertFunction convert;
+                // Counting on aggregate metric double has unique behavior in that we cannot just provide the number of
+                // documents, instead we have to look inside the aggregate metric double's count field and sum those together.
+                // Grabbing the count value with FromAggregateMetricDouble the same way we do with min/max/sum would result in
+                // a single Int field, and incorrectly be treated as 1 document (instead of however many originally went into
+                // the aggregate metric double).
+                if (metric == AggregateMetricDoubleBlockBuilder.Metric.COUNT) {
+                    convert = new ToAggregateMetricDouble(fa.source(), fa);
+                } else if (type == AGGREGATE_METRIC_DOUBLE) {
+                    convert = FromAggregateMetricDouble.withMetric(aggFunc.source(), fa, metric);
+                } else if (type.isNumeric()) {
+                    convert = new ToDouble(fa.source(), fa);
+                } else {
+                    return null;
+                }
+                Expression expression = ResolveUnionTypes.typeSpecificConvert(convert, fa.source(), type, mtf);
+                typeConverter.put(type.typeName(), expression);
+            }
+            return typeConverter;
+        }
+
+        private static AggregateMetricDoubleBlockBuilder.Metric getMetric(AggregateFunction aggFunc) {
+            if (aggFunc instanceof Max || aggFunc instanceof MaxOverTime) {
+                return AggregateMetricDoubleBlockBuilder.Metric.MAX;
+            }
+            if (aggFunc instanceof Min || aggFunc instanceof MinOverTime) {
+                return AggregateMetricDoubleBlockBuilder.Metric.MIN;
+            }
+            if (aggFunc instanceof Sum || aggFunc instanceof SumOverTime) {
+                return AggregateMetricDoubleBlockBuilder.Metric.SUM;
+            }
+            if (aggFunc instanceof Count || aggFunc instanceof CountOverTime) {
+                return AggregateMetricDoubleBlockBuilder.Metric.COUNT;
+            }
+            if (aggFunc instanceof Avg || aggFunc instanceof AvgOverTime) {
+                return AggregateMetricDoubleBlockBuilder.Metric.COUNT;
+            }
+            return null;
+        }
+    }
 }

+ 10 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java

@@ -14,12 +14,14 @@ import org.elasticsearch.xpack.esql.core.expression.Literal;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.expression.SurrogateExpression;
 import org.elasticsearch.xpack.esql.expression.function.Example;
 import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesTo;
 import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesToLifecycle;
 import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
 import org.elasticsearch.xpack.esql.expression.function.FunctionType;
 import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Div;
 
 import java.io.IOException;
 import java.util.List;
@@ -29,7 +31,7 @@ import static java.util.Collections.emptyList;
 /**
  * Similar to {@link Avg}, but it is used to calculate the average value over a time series of values from the given field.
  */
-public class AvgOverTime extends TimeSeriesAggregateFunction {
+public class AvgOverTime extends TimeSeriesAggregateFunction implements SurrogateExpression {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
         Expression.class,
         "AvgOverTime",
@@ -93,6 +95,13 @@ public class AvgOverTime extends TimeSeriesAggregateFunction {
         return new AvgOverTime(source(), field(), filter);
     }
 
+    @Override
+    public Expression surrogate() {
+        Source s = source();
+        Expression f = field();
+        return new Div(s, new SumOverTime(s, f, filter()), new CountOverTime(s, f, filter()), dataType());
+    }
+
     @Override
     public AggregateFunction perTimeSeriesAggregation() {
         return new Avg(source(), field(), filter());

+ 13 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateMetricDouble.java

@@ -32,14 +32,16 @@ import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 
 import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
 import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
+import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
 import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE;
 import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
 import static org.elasticsearch.xpack.esql.core.type.DataType.NULL;
 
-public class FromAggregateMetricDouble extends EsqlScalarFunction {
+public class FromAggregateMetricDouble extends EsqlScalarFunction implements ConvertFunction {
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
         Expression.class,
         "FromAggregateMetricDouble",
@@ -169,4 +171,14 @@ public class FromAggregateMetricDouble extends EsqlScalarFunction {
             }
         };
     }
+
+    @Override
+    public Expression field() {
+        return field;
+    }
+
+    @Override
+    public Set<DataType> supportedTypes() {
+        return Set.of(AGGREGATE_METRIC_DOUBLE);
+    }
 }

+ 4 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

@@ -65,7 +65,7 @@ import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.KeywordEsField;
 import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField;
 import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField;
-import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction;
+import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort;
 import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
@@ -199,7 +199,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
             Expression conversion = unionTypes.getConversionExpressionForIndex(indexName);
             return conversion == null
                 ? BlockLoader.CONSTANT_NULLS
-                : new TypeConvertingBlockLoader(blockLoader, (AbstractConvertFunction) conversion);
+                : new TypeConvertingBlockLoader(blockLoader, (EsqlScalarFunction) conversion);
         }
         return blockLoader;
     }
@@ -506,9 +506,9 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
         private final BlockLoader delegate;
         private final TypeConverter typeConverter;
 
-        protected TypeConvertingBlockLoader(BlockLoader delegate, AbstractConvertFunction convertFunction) {
+        protected TypeConvertingBlockLoader(BlockLoader delegate, EsqlScalarFunction convertFunction) {
             this.delegate = delegate;
-            this.typeConverter = TypeConverter.fromConvertFunction(convertFunction);
+            this.typeConverter = TypeConverter.fromScalarFunction(convertFunction);
         }
 
         @Override

+ 2 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/TypeConverter.java

@@ -17,7 +17,7 @@ import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.FoldContext;
 import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper;
-import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction;
+import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction;
 
 class TypeConverter {
     private final String evaluatorName;
@@ -28,7 +28,7 @@ class TypeConverter {
         this.convertEvaluator = convertEvaluator;
     }
 
-    public static TypeConverter fromConvertFunction(AbstractConvertFunction convertFunction) {
+    public static TypeConverter fromScalarFunction(EsqlScalarFunction convertFunction) {
         DriverContext driverContext1 = new DriverContext(
             BigArrays.NON_RECYCLING_INSTANCE,
             new org.elasticsearch.compute.data.BlockFactory(

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java

@@ -305,7 +305,7 @@ public class TestPhysicalOperationProviders extends AbstractPhysicalOperationPro
         }
         return switch (extractBlockForSingleDoc(indexDoc, ((FieldAttribute) conversion.field()).fieldName().string(), blockCopier)) {
             case BlockResultMissing unused -> getNullsBlock(indexDoc);
-            case BlockResultSuccess success -> TypeConverter.fromConvertFunction(conversion).convert(success.block);
+            case BlockResultSuccess success -> TypeConverter.fromScalarFunction(conversion).convert(success.block);
         };
     }
 

+ 378 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/46_downsample.yml

@@ -251,3 +251,381 @@ setup:
   - match: {values.0.1: 800479.0}
   - match: {values.0.2: 4812452.0}
   - match: {values.0.3: 6}
+
+---
+"Stats from downsampled and non-downsampled index simultaneously with implicit casting":
+  - requires:
+      test_runner_features: [capabilities]
+      capabilities:
+        - method: POST
+          path: /_query
+          parameters: []
+          capabilities: [aggregate_metric_double_implicit_casting_in_aggs]
+      reason: "Support for casting aggregate metric double implicitly when present in aggregations"
+
+  - do:
+      indices.downsample:
+        index: test
+        target_index: test-downsample
+        body: >
+          {
+            "fixed_interval": "1h"
+          }
+  - is_true: acknowledged
+
+  - do:
+      indices.create:
+        index: test-2
+        body:
+          settings:
+            number_of_shards: 1
+            index:
+              mode: time_series
+              routing_path: [ metricset, k8s.pod.uid ]
+              time_series:
+                start_time: 2021-04-29T00:00:00Z
+                end_time: 2021-04-30T00:00:00Z
+          mappings:
+            properties:
+              "@timestamp":
+                type: date
+              metricset:
+                type: keyword
+                time_series_dimension: true
+              k8s:
+                properties:
+                  pod:
+                    properties:
+                      uid:
+                        type: keyword
+                        time_series_dimension: true
+                      name:
+                        type: keyword
+                      created_at:
+                        type: date_nanos
+                      running:
+                        type: boolean
+                      number_of_containers:
+                        type: integer
+                      ip:
+                        type: ip
+                      tags:
+                        type: keyword
+                      values:
+                        type: integer
+                      network:
+                        properties:
+                          tx:
+                            type: long
+                            time_series_metric: gauge
+                          rx:
+                            type: long
+                            time_series_metric: gauge
+
+  - do:
+      bulk:
+        refresh: true
+        index: test-2
+        body:
+          - '{"index": {}}'
+          - '{"@timestamp": "2021-04-29T21:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001810, "rx": 802339}, "created_at": "2021-04-28T19:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}'
+          - '{"index": {}}'
+          - '{"@timestamp": "2021-04-29T21:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.26", "network": {"tx": 2000177, "rx": 800479}, "created_at": "2021-04-28T19:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}'
+          - '{"index": {}}'
+
+  - do:
+      esql.query:
+        body:
+          query: "FROM test-* |
+          WHERE k8s.pod.uid == \"947e4ced-1786-4e53-9e0c-5c447e959507\" |
+          STATS max(k8s.pod.network.rx), min(k8s.pod.network.rx), sum(k8s.pod.network.rx), count(k8s.pod.network.rx), avg(k8s.pod.network.rx) |
+          LIMIT 100"
+
+  - length: {values: 1}
+  - length: {values.0: 5}
+  - match: {columns.0.name: "max(k8s.pod.network.rx)"}
+  - match: {columns.0.type: "double"}
+  - match: {columns.1.name: "min(k8s.pod.network.rx)"}
+  - match: {columns.1.type: "double"}
+  - match: {columns.2.name: "sum(k8s.pod.network.rx)"}
+  - match: {columns.2.type: "double"}
+  - match: {columns.3.name: "count(k8s.pod.network.rx)"}
+  - match: {columns.3.type: "long"}
+  - match: {columns.4.name: "avg(k8s.pod.network.rx)"}
+  - match: {columns.4.type: "double"}
+  - match: {values.0.0: 803685.0}
+  - match: {values.0.1: 800479.0}
+  - match: {values.0.2: 4812452.0}
+  - match: {values.0.3: 6}
+  - match: {values.0.4: 802075.3333333334}
+
+  - do:
+      esql.query:
+        body:
+          query: "TS test-* | STATS max = max(k8s.pod.network.rx)  | LIMIT 100"
+  - length: {values: 1}
+  - length: {values.0: 1}
+  - match: {columns.0.name: "max"}
+  - match: {columns.0.type: "double"}
+  - match: {values.0.0: 803685.0}
+
+---
+"Over time functions from downsampled and non-downsampled indices simultaneously, no grouping":
+  - requires:
+      test_runner_features: [capabilities]
+      capabilities:
+        - method: POST
+          path: /_query
+          parameters: []
+          capabilities: [aggregate_metric_double_implicit_casting_in_aggs]
+      reason: "Support for casting aggregate metric double implicitly when present in aggregations"
+
+  - do:
+      indices.downsample:
+        index: test
+        target_index: test-downsample
+        body: >
+          {
+            "fixed_interval": "1h"
+          }
+  - is_true: acknowledged
+
+  - do:
+      indices.create:
+        index: test-2
+        body:
+          settings:
+            number_of_shards: 1
+            index:
+              mode: time_series
+              routing_path: [ metricset, k8s.pod.uid ]
+              time_series:
+                start_time: 2021-04-29T00:00:00Z
+                end_time: 2021-04-30T00:00:00Z
+          mappings:
+            properties:
+              "@timestamp":
+                type: date
+              metricset:
+                type: keyword
+                time_series_dimension: true
+              k8s:
+                properties:
+                  pod:
+                    properties:
+                      uid:
+                        type: keyword
+                        time_series_dimension: true
+                      name:
+                        type: keyword
+                      created_at:
+                        type: date_nanos
+                      running:
+                        type: boolean
+                      number_of_containers:
+                        type: integer
+                      ip:
+                        type: ip
+                      tags:
+                        type: keyword
+                      values:
+                        type: integer
+                      network:
+                        properties:
+                          tx:
+                            type: long
+                            time_series_metric: gauge
+                          rx:
+                            type: long
+                            time_series_metric: gauge
+
+  - do:
+      bulk:
+        refresh: true
+        index: test-2
+        body:
+          - '{"index": {}}'
+          - '{"@timestamp": "2021-04-29T21:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.10", "network": {"tx": 2005820, "rx": 802339}, "created_at": "2021-04-29T21:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}'
+          - '{"index": {}}'
+          - '{"@timestamp": "2021-04-29T21:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.28", "network": {"tx": 2000481, "rx": 800479}, "created_at": "2021-04-29T21:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}'
+          - '{"index": {}}'
+          - '{"@timestamp": "2021-04-29T21:50:14.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.192", "network": {"tx": 1458377, "rx": 530184}, "created_at": "2021-04-29T21:36:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "test"], "values": [3, 3, 1]}}}'
+          - '{"index": {}}'
+          - '{"@timestamp": "2021-04-29T21:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.206", "network": {"tx": 1434104, "rx": 535020}, "created_at": "2021-04-29T21:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west2"], "values": [4, 1, 3]}}}'
+          - '{"index": {}}'
+
+  - do:
+      esql.query:
+        body:
+          query: "TS test-* |
+          STATS avg = sum(avg_over_time(k8s.pod.network.rx)),
+                count = sum(count_over_time(k8s.pod.network.rx)),
+                sum = sum(sum_over_time(k8s.pod.network.rx))
+                BY time_bucket = bucket(@timestamp, 1 hour) |
+          SORT time_bucket | LIMIT 10"
+
+  - length: {values: 4}
+  - length: {values.0: 4}
+  - match: {columns.0.name: "avg"}
+  - match: {columns.0.type: "double"}
+  - match: {columns.1.name: "count"}
+  - match: {columns.1.type: "long"}
+  - match: {columns.2.name: "sum"}
+  - match: {columns.2.type: "double"}
+  - match: {columns.3.name: "time_bucket"}
+  - match: {columns.3.type: "date"}
+  - match: {values.0.0: 1332393.5}
+  - match: {values.0.1: 4}
+  - match: {values.0.2: 2664787.0}
+  - match: {values.0.3: "2021-04-28T18:00:00.000Z"}
+  - match: {values.1.0: 530604.5}
+  - match: {values.1.1: 2}
+  - match: {values.1.2: 1061209.0}
+  - match: {values.1.3: "2021-04-28T19:00:00.000Z"}
+  - match: {values.2.0: 803011.0}
+  - match: {values.2.1: 2}
+  - match: {values.2.2: 1606022.0}
+  - match: {values.2.3: "2021-04-28T20:00:00.000Z"}
+  - match: {values.3.0: 1334011.0}
+  - match: {values.3.1: 4}
+  - match: {values.3.2: 2668022.0}
+  - match: {values.3.3: "2021-04-29T21:00:00.000Z"}
+
+---
+"Over time functions from downsampled and non-downsampled indices simultaneously, with grouping":
+  - requires:
+      test_runner_features: [capabilities]
+      capabilities:
+        - method: POST
+          path: /_query
+          parameters: []
+          capabilities: [aggregate_metric_double_implicit_casting_in_aggs]
+      reason: "Support for casting aggregate metric double implicitly when present in aggregations"
+
+  - do:
+      indices.downsample:
+        index: test
+        target_index: test-downsample
+        body: >
+          {
+            "fixed_interval": "1h"
+          }
+  - is_true: acknowledged
+
+  - do:
+      indices.create:
+        index: test-2
+        body:
+          settings:
+            number_of_shards: 1
+            index:
+              mode: time_series
+              routing_path: [ metricset, k8s.pod.uid ]
+              time_series:
+                start_time: 2021-04-29T00:00:00Z
+                end_time: 2021-04-30T00:00:00Z
+          mappings:
+            properties:
+              "@timestamp":
+                type: date
+              metricset:
+                type: keyword
+                time_series_dimension: true
+              k8s:
+                properties:
+                  pod:
+                    properties:
+                      uid:
+                        type: keyword
+                        time_series_dimension: true
+                      name:
+                        type: keyword
+                      created_at:
+                        type: date_nanos
+                      running:
+                        type: boolean
+                      number_of_containers:
+                        type: integer
+                      ip:
+                        type: ip
+                      tags:
+                        type: keyword
+                      values:
+                        type: integer
+                      network:
+                        properties:
+                          tx:
+                            type: long
+                            time_series_metric: gauge
+                          rx:
+                            type: long
+                            time_series_metric: gauge
+
+  - do:
+      bulk:
+        refresh: true
+        index: test-2
+        body:
+          - '{"index": {}}'
+          - '{"@timestamp": "2021-04-29T21:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.10", "network": {"tx": 2005820, "rx": 802339}, "created_at": "2021-04-29T21:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}'
+          - '{"index": {}}'
+          - '{"@timestamp": "2021-04-29T21:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.28", "network": {"tx": 2000481, "rx": 800479}, "created_at": "2021-04-29T21:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}'
+          - '{"index": {}}'
+          - '{"@timestamp": "2021-04-29T21:50:14.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.192", "network": {"tx": 1458377, "rx": 530184}, "created_at": "2021-04-29T21:36:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "test"], "values": [3, 3, 1]}}}'
+          - '{"index": {}}'
+          - '{"@timestamp": "2021-04-29T21:50:44.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.206", "network": {"tx": 1434104, "rx": 535020}, "created_at": "2021-04-29T21:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west2"], "values": [4, 1, 3]}}}'
+          - '{"index": {}}'
+
+  - do:
+      esql.query:
+        body:
+          query: "TS test-* |
+          STATS avg = sum(avg_over_time(k8s.pod.network.rx)),
+                count = sum(count_over_time(k8s.pod.network.rx)),
+                sum = sum(sum_over_time(k8s.pod.network.rx))
+          BY k8s.pod.name, time_bucket = bucket(@timestamp, 1 hour) |
+          SORT time_bucket, k8s.pod.name |
+          LIMIT 10"
+
+  - length: {values: 6}
+  - length: {values.0: 5}
+  - match: {columns.0.name: "avg"}
+  - match: {columns.0.type: "double"}
+  - match: {columns.1.name: "count"}
+  - match: {columns.1.type: "long"}
+  - match: {columns.2.name: "sum"}
+  - match: {columns.2.type: "double"}
+  - match: {columns.3.name: "k8s.pod.name"}
+  - match: {columns.3.type: "keyword"}
+  - match: {columns.4.name: "time_bucket"}
+  - match: {columns.4.type: "date"}
+  - match: {values.0.0: 801806.0}
+  - match: {values.0.1: 2}
+  - match: {values.0.2: 1603612.0}
+  - match: {values.0.3: "cat"}
+  - match: {values.0.4: "2021-04-28T18:00:00.000Z"}
+  - match: {values.1.0: 530587.5}
+  - match: {values.1.1: 2}
+  - match: {values.1.2: 1061175.0}
+  - match: {values.1.3: "dog"}
+  - match: {values.1.4: "2021-04-28T18:00:00.000Z"}
+  - match: {values.2.0: 530604.5}
+  - match: {values.2.1: 2}
+  - match: {values.2.2: 1061209.0}
+  - match: {values.2.3: "dog"}
+  - match: {values.2.4: "2021-04-28T19:00:00.000Z"}
+  - match: {values.3.0: 803011.0}
+  - match: {values.3.1: 2}
+  - match: {values.3.2: 1606022.0}
+  - match: {values.3.3: "cat"}
+  - match: {values.3.4: "2021-04-28T20:00:00.000Z"}
+  - match: {values.4.0: 801409.0}
+  - match: {values.4.1: 2}
+  - match: {values.4.2: 1602818.0}
+  - match: {values.4.3: "cat"}
+  - match: {values.4.4: "2021-04-29T21:00:00.000Z"}
+  - match: {values.5.0: 532602.0}
+  - match: {values.5.1: 2}
+  - match: {values.5.2: 1065204.0}
+  - match: {values.5.3: "dog"}
+  - match: {values.5.4: "2021-04-29T21:00:00.000Z"}