Browse Source

Support metrics counter types in ESQL (#107877)

This commit adds support for numeric metrics counter fields in ES|QL. 
These counter types, including counter_long, counter_integer, and
counter_double, are different from their parent types. Users will have
limited interaction with these counter types, restricted to:

- Retrieving values without any processing
- Casting to their root type (e.g., to_long(a_long_counter))
- Using them in the metrics rate aggregation

These restrictions are intentional to prevent misuse. If users want to 
use them as numeric values, explicit casting to their root types is
required.
Nhat Nguyen 1 year ago
parent
commit
22aad7b201
38 changed files with 476 additions and 110 deletions
  1. 5 0
      docs/changelog/107877.yaml
  2. 36 0
      docs/reference/esql/functions/kibana/definition/to_double.json
  3. 12 0
      docs/reference/esql/functions/kibana/definition/to_integer.json
  4. 24 0
      docs/reference/esql/functions/kibana/definition/to_long.json
  5. 3 0
      docs/reference/esql/functions/types/to_double.asciidoc
  6. 1 0
      docs/reference/esql/functions/types/to_integer.asciidoc
  7. 2 0
      docs/reference/esql/functions/types/to_long.asciidoc
  8. 0 4
      server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java
  9. 10 10
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/meta.csv-spec
  10. 30 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/tsdb-mapping.json
  11. 3 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java
  12. 6 6
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java
  13. 4 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java
  14. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Avg.java
  15. 3 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Count.java
  16. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinct.java
  17. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Median.java
  18. 2 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/NumericAggregate.java
  19. 17 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDouble.java
  20. 4 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToInteger.java
  21. 15 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToLong.java
  22. 2 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java
  23. 6 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
  24. 7 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFeatures.java
  25. 3 4
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeRegistry.java
  26. 25 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypes.java
  27. 3 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java
  28. 4 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java
  29. 38 38
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java
  30. 39 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java
  31. 54 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractFunctionTestCase.java
  32. 27 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDoubleTests.java
  33. 10 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToIntegerTests.java
  34. 18 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToLongTests.java
  35. 5 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeRegistryTests.java
  36. 7 1
      x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/type/Types.java
  37. 2 1
      x-pack/plugin/ql/src/test/java/org/elasticsearch/xpack/ql/type/TypesTests.java
  38. 46 14
      x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml

+ 5 - 0
docs/changelog/107877.yaml

@@ -0,0 +1,5 @@
+pr: 107877
+summary: Support metrics counter types in ESQL
+area: "ES|QL"
+type: enhancement
+issues: []

+ 36 - 0
docs/reference/esql/functions/kibana/definition/to_double.json

@@ -16,6 +16,42 @@
       "variadic" : false,
       "variadic" : false,
       "returnType" : "double"
       "returnType" : "double"
     },
     },
+    {
+      "params" : [
+        {
+          "name" : "field",
+          "type" : "counter_double",
+          "optional" : false,
+          "description" : "Input value. The input can be a single- or multi-valued column or an expression."
+        }
+      ],
+      "variadic" : false,
+      "returnType" : "double"
+    },
+    {
+      "params" : [
+        {
+          "name" : "field",
+          "type" : "counter_integer",
+          "optional" : false,
+          "description" : "Input value. The input can be a single- or multi-valued column or an expression."
+        }
+      ],
+      "variadic" : false,
+      "returnType" : "double"
+    },
+    {
+      "params" : [
+        {
+          "name" : "field",
+          "type" : "counter_long",
+          "optional" : false,
+          "description" : "Input value. The input can be a single- or multi-valued column or an expression."
+        }
+      ],
+      "variadic" : false,
+      "returnType" : "double"
+    },
     {
     {
       "params" : [
       "params" : [
         {
         {

+ 12 - 0
docs/reference/esql/functions/kibana/definition/to_integer.json

@@ -16,6 +16,18 @@
       "variadic" : false,
       "variadic" : false,
       "returnType" : "integer"
       "returnType" : "integer"
     },
     },
+    {
+      "params" : [
+        {
+          "name" : "field",
+          "type" : "counter_integer",
+          "optional" : false,
+          "description" : "Input value. The input can be a single- or multi-valued column or an expression."
+        }
+      ],
+      "variadic" : false,
+      "returnType" : "integer"
+    },
     {
     {
       "params" : [
       "params" : [
         {
         {

+ 24 - 0
docs/reference/esql/functions/kibana/definition/to_long.json

@@ -16,6 +16,30 @@
       "variadic" : false,
       "variadic" : false,
       "returnType" : "long"
       "returnType" : "long"
     },
     },
+    {
+      "params" : [
+        {
+          "name" : "field",
+          "type" : "counter_integer",
+          "optional" : false,
+          "description" : "Input value. The input can be a single- or multi-valued column or an expression."
+        }
+      ],
+      "variadic" : false,
+      "returnType" : "long"
+    },
+    {
+      "params" : [
+        {
+          "name" : "field",
+          "type" : "counter_long",
+          "optional" : false,
+          "description" : "Input value. The input can be a single- or multi-valued column or an expression."
+        }
+      ],
+      "variadic" : false,
+      "returnType" : "long"
+    },
     {
     {
       "params" : [
       "params" : [
         {
         {

+ 3 - 0
docs/reference/esql/functions/types/to_double.asciidoc

@@ -6,6 +6,9 @@
 |===
 |===
 field | result
 field | result
 boolean | double
 boolean | double
+counter_double | double
+counter_integer | double
+counter_long | double
 datetime | double
 datetime | double
 double | double
 double | double
 integer | double
 integer | double

+ 1 - 0
docs/reference/esql/functions/types/to_integer.asciidoc

@@ -6,6 +6,7 @@
 |===
 |===
 field | result
 field | result
 boolean | integer
 boolean | integer
+counter_integer | integer
 datetime | integer
 datetime | integer
 double | integer
 double | integer
 integer | integer
 integer | integer

+ 2 - 0
docs/reference/esql/functions/types/to_long.asciidoc

@@ -6,6 +6,8 @@
 |===
 |===
 field | result
 field | result
 boolean | long
 boolean | long
+counter_integer | long
+counter_long | long
 datetime | long
 datetime | long
 double | long
 double | long
 integer | long
 integer | long

+ 0 - 4
server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java

@@ -1709,10 +1709,6 @@ public class NumberFieldMapper extends FieldMapper {
 
 
         @Override
         @Override
         public BlockLoader blockLoader(BlockLoaderContext blContext) {
         public BlockLoader blockLoader(BlockLoaderContext blContext) {
-            if (indexMode == IndexMode.TIME_SERIES && metricType == TimeSeriesParams.MetricType.COUNTER) {
-                // Counters are not supported by ESQL so we load them in null
-                return BlockLoader.CONSTANT_NULLS;
-            }
             if (hasDocValues()) {
             if (hasDocValues()) {
                 return type.blockLoaderFromDocValues(name());
                 return type.blockLoaderFromDocValues(name());
             }
             }

+ 10 - 10
x-pack/plugin/esql/qa/testFixtures/src/main/resources/meta.csv-spec

@@ -85,16 +85,16 @@ double tau()
 "cartesian_point to_cartesianpoint(field:cartesian_point|keyword|text)"
 "cartesian_point to_cartesianpoint(field:cartesian_point|keyword|text)"
 "cartesian_shape to_cartesianshape(field:cartesian_point|cartesian_shape|keyword|text)"
 "cartesian_shape to_cartesianshape(field:cartesian_point|cartesian_shape|keyword|text)"
 "date to_datetime(field:date|keyword|text|double|long|unsigned_long|integer)"
 "date to_datetime(field:date|keyword|text|double|long|unsigned_long|integer)"
-"double to_dbl(field:boolean|date|keyword|text|double|long|unsigned_long|integer)"
+"double to_dbl(field:boolean|date|keyword|text|double|long|unsigned_long|integer|counter_double|counter_integer|counter_long)"
 "double to_degrees(number:double|integer|long|unsigned_long)"
 "double to_degrees(number:double|integer|long|unsigned_long)"
-"double to_double(field:boolean|date|keyword|text|double|long|unsigned_long|integer)"
+"double to_double(field:boolean|date|keyword|text|double|long|unsigned_long|integer|counter_double|counter_integer|counter_long)"
 "date to_dt(field:date|keyword|text|double|long|unsigned_long|integer)"
 "date to_dt(field:date|keyword|text|double|long|unsigned_long|integer)"
 "geo_point to_geopoint(field:geo_point|keyword|text)"
 "geo_point to_geopoint(field:geo_point|keyword|text)"
 "geo_shape to_geoshape(field:geo_point|geo_shape|keyword|text)"
 "geo_shape to_geoshape(field:geo_point|geo_shape|keyword|text)"
-"integer to_int(field:boolean|date|keyword|text|double|long|unsigned_long|integer)"
-"integer to_integer(field:boolean|date|keyword|text|double|long|unsigned_long|integer)"
+"integer to_int(field:boolean|date|keyword|text|double|long|unsigned_long|integer|counter_integer)"
+"integer to_integer(field:boolean|date|keyword|text|double|long|unsigned_long|integer|counter_integer)"
 "ip to_ip(field:ip|keyword|text)"
 "ip to_ip(field:ip|keyword|text)"
-"long to_long(field:boolean|date|keyword|text|double|long|unsigned_long|integer)"
+"long to_long(field:boolean|date|keyword|text|double|long|unsigned_long|integer|counter_integer|counter_long)"
 "keyword|text to_lower(str:keyword|text)"
 "keyword|text to_lower(str:keyword|text)"
 "double to_radians(number:double|integer|long|unsigned_long)"
 "double to_radians(number:double|integer|long|unsigned_long)"
 "keyword to_str(field:boolean|cartesian_point|cartesian_shape|date|double|geo_point|geo_shape|integer|ip|keyword|long|text|unsigned_long|version)"
 "keyword to_str(field:boolean|cartesian_point|cartesian_shape|date|double|geo_point|geo_shape|integer|ip|keyword|long|text|unsigned_long|version)"
@@ -198,16 +198,16 @@ to_boolean    |field                               |"boolean|keyword|text|double
 to_cartesianpo|field                               |"cartesian_point|keyword|text"                                                                                                    |Input value. The input can be a single- or multi-valued column or an expression.
 to_cartesianpo|field                               |"cartesian_point|keyword|text"                                                                                                    |Input value. The input can be a single- or multi-valued column or an expression.
 to_cartesiansh|field                               |"cartesian_point|cartesian_shape|keyword|text"                                                                                    |Input value. The input can be a single- or multi-valued column or an expression.
 to_cartesiansh|field                               |"cartesian_point|cartesian_shape|keyword|text"                                                                                    |Input value. The input can be a single- or multi-valued column or an expression.
 to_datetime   |field                               |"date|keyword|text|double|long|unsigned_long|integer"                                                                             |Input value. The input can be a single- or multi-valued column or an expression.
 to_datetime   |field                               |"date|keyword|text|double|long|unsigned_long|integer"                                                                             |Input value. The input can be a single- or multi-valued column or an expression.
-to_dbl        |field                               |"boolean|date|keyword|text|double|long|unsigned_long|integer"                                                                     |Input value. The input can be a single- or multi-valued column or an expression.
+to_dbl        |field                               |"boolean|date|keyword|text|double|long|unsigned_long|integer|counter_double|counter_integer|counter_long"                         |Input value. The input can be a single- or multi-valued column or an expression.
 to_degrees    |number                              |"double|integer|long|unsigned_long"                                                                                               |Input value. The input can be a single- or multi-valued column or an expression.
 to_degrees    |number                              |"double|integer|long|unsigned_long"                                                                                               |Input value. The input can be a single- or multi-valued column or an expression.
-to_double     |field                               |"boolean|date|keyword|text|double|long|unsigned_long|integer"                                                                     |Input value. The input can be a single- or multi-valued column or an expression.
+to_double     |field                               |"boolean|date|keyword|text|double|long|unsigned_long|integer|counter_double|counter_integer|counter_long"                         |Input value. The input can be a single- or multi-valued column or an expression.
 to_dt         |field                               |"date|keyword|text|double|long|unsigned_long|integer"                                                                             |Input value. The input can be a single- or multi-valued column or an expression.
 to_dt         |field                               |"date|keyword|text|double|long|unsigned_long|integer"                                                                             |Input value. The input can be a single- or multi-valued column or an expression.
 to_geopoint   |field                               |"geo_point|keyword|text"                                                                                                          |Input value. The input can be a single- or multi-valued column or an expression.
 to_geopoint   |field                               |"geo_point|keyword|text"                                                                                                          |Input value. The input can be a single- or multi-valued column or an expression.
 to_geoshape   |field                               |"geo_point|geo_shape|keyword|text"                                                                                                |Input value. The input can be a single- or multi-valued column or an expression.
 to_geoshape   |field                               |"geo_point|geo_shape|keyword|text"                                                                                                |Input value. The input can be a single- or multi-valued column or an expression.
-to_int        |field                               |"boolean|date|keyword|text|double|long|unsigned_long|integer"                                                                     |Input value. The input can be a single- or multi-valued column or an expression.
-to_integer    |field                               |"boolean|date|keyword|text|double|long|unsigned_long|integer"                                                                     |Input value. The input can be a single- or multi-valued column or an expression.
+to_int        |field                               |"boolean|date|keyword|text|double|long|unsigned_long|integer|counter_integer"                                                     |Input value. The input can be a single- or multi-valued column or an expression.
+to_integer    |field                               |"boolean|date|keyword|text|double|long|unsigned_long|integer|counter_integer"                                                     |Input value. The input can be a single- or multi-valued column or an expression.
 to_ip         |field                               |"ip|keyword|text"                                                                                                                 |Input value. The input can be a single- or multi-valued column or an expression.
 to_ip         |field                               |"ip|keyword|text"                                                                                                                 |Input value. The input can be a single- or multi-valued column or an expression.
-to_long       |field                               |"boolean|date|keyword|text|double|long|unsigned_long|integer"                                                                     |Input value. The input can be a single- or multi-valued column or an expression.
+to_long       |field                               |"boolean|date|keyword|text|double|long|unsigned_long|integer|counter_integer|counter_long"                                        |Input value. The input can be a single- or multi-valued column or an expression.
 to_lower      |str                                 |"keyword|text"                                                                                                                    |String expression. If `null`, the function returns `null`.
 to_lower      |str                                 |"keyword|text"                                                                                                                    |String expression. If `null`, the function returns `null`.
 to_radians    |number                              |"double|integer|long|unsigned_long"                                                                                               |Input value. The input can be a single- or multi-valued column or an expression.
 to_radians    |number                              |"double|integer|long|unsigned_long"                                                                                               |Input value. The input can be a single- or multi-valued column or an expression.
 to_str        |field                               |"boolean|cartesian_point|cartesian_shape|date|double|geo_point|geo_shape|integer|ip|keyword|long|text|unsigned_long|version"      |Input value. The input can be a single- or multi-valued column or an expression.
 to_str        |field                               |"boolean|cartesian_point|cartesian_shape|date|double|geo_point|geo_shape|integer|ip|keyword|long|text|unsigned_long|version"      |Input value. The input can be a single- or multi-valued column or an expression.

+ 30 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/tsdb-mapping.json

@@ -0,0 +1,30 @@
+{
+  "properties": {
+    "@timestamp": {
+      "type": "date"
+    },
+    "metricset": {
+      "type": "keyword",
+      "time_series_dimension": true
+    },
+    "name": {
+      "type": "keyword"
+    },
+    "network": {
+      "properties": {
+        "connections": {
+          "type": "long",
+          "time_series_metric": "gauge"
+        },
+        "bytes_in": {
+          "type": "long",
+          "time_series_metric": "counter"
+        },
+        "bytes_out": {
+          "type": "long",
+          "time_series_metric": "counter"
+        }
+      }
+    }
+  }
+}

+ 3 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java

@@ -61,21 +61,21 @@ abstract class PositionToXContent {
 
 
     public static PositionToXContent positionToXContent(ColumnInfo columnInfo, Block block, BytesRef scratch) {
     public static PositionToXContent positionToXContent(ColumnInfo columnInfo, Block block, BytesRef scratch) {
         return switch (columnInfo.type()) {
         return switch (columnInfo.type()) {
-            case "long" -> new PositionToXContent(block) {
+            case "long", "counter_long" -> new PositionToXContent(block) {
                 @Override
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
                     throws IOException {
                     return builder.value(((LongBlock) block).getLong(valueIndex));
                     return builder.value(((LongBlock) block).getLong(valueIndex));
                 }
                 }
             };
             };
-            case "integer" -> new PositionToXContent(block) {
+            case "integer", "counter_integer" -> new PositionToXContent(block) {
                 @Override
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
                     throws IOException {
                     return builder.value(((IntBlock) block).getInt(valueIndex));
                     return builder.value(((IntBlock) block).getInt(valueIndex));
                 }
                 }
             };
             };
-            case "double" -> new PositionToXContent(block) {
+            case "double", "counter_double" -> new PositionToXContent(block) {
                 @Override
                 @Override
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                 protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
                     throws IOException {
                     throws IOException {

+ 6 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java

@@ -123,9 +123,9 @@ public final class ResponseValueUtils {
     private static Object valueAt(String dataType, Block block, int offset, BytesRef scratch) {
     private static Object valueAt(String dataType, Block block, int offset, BytesRef scratch) {
         return switch (dataType) {
         return switch (dataType) {
             case "unsigned_long" -> unsignedLongAsNumber(((LongBlock) block).getLong(offset));
             case "unsigned_long" -> unsignedLongAsNumber(((LongBlock) block).getLong(offset));
-            case "long" -> ((LongBlock) block).getLong(offset);
-            case "integer" -> ((IntBlock) block).getInt(offset);
-            case "double" -> ((DoubleBlock) block).getDouble(offset);
+            case "long", "counter_long" -> ((LongBlock) block).getLong(offset);
+            case "integer", "counter_integer" -> ((IntBlock) block).getInt(offset);
+            case "double", "counter_double" -> ((DoubleBlock) block).getDouble(offset);
             case "keyword", "text" -> ((BytesRefBlock) block).getBytesRef(offset, scratch).utf8ToString();
             case "keyword", "text" -> ((BytesRefBlock) block).getBytesRef(offset, scratch).utf8ToString();
             case "ip" -> {
             case "ip" -> {
                 BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch);
                 BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch);
@@ -174,9 +174,9 @@ public final class ResponseValueUtils {
                     case "unsigned_long" -> ((LongBlock.Builder) builder).appendLong(
                     case "unsigned_long" -> ((LongBlock.Builder) builder).appendLong(
                         longToUnsignedLong(((Number) value).longValue(), true)
                         longToUnsignedLong(((Number) value).longValue(), true)
                     );
                     );
-                    case "long" -> ((LongBlock.Builder) builder).appendLong(((Number) value).longValue());
-                    case "integer" -> ((IntBlock.Builder) builder).appendInt(((Number) value).intValue());
-                    case "double" -> ((DoubleBlock.Builder) builder).appendDouble(((Number) value).doubleValue());
+                    case "long", "counter_long" -> ((LongBlock.Builder) builder).appendLong(((Number) value).longValue());
+                    case "integer", "counter_integer" -> ((IntBlock.Builder) builder).appendInt(((Number) value).intValue());
+                    case "double", "counter_double" -> ((DoubleBlock.Builder) builder).appendDouble(((Number) value).doubleValue());
                     case "keyword", "text", "unsupported" -> ((BytesRefBlock.Builder) builder).appendBytesRef(
                     case "keyword", "text", "unsupported" -> ((BytesRefBlock.Builder) builder).appendBytesRef(
                         new BytesRef(value.toString())
                         new BytesRef(value.toString())
                     );
                     );

+ 4 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java

@@ -28,6 +28,7 @@ import org.elasticsearch.xpack.ql.expression.AttributeMap;
 import org.elasticsearch.xpack.ql.expression.AttributeSet;
 import org.elasticsearch.xpack.ql.expression.AttributeSet;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.Expressions;
 import org.elasticsearch.xpack.ql.expression.Expressions;
+import org.elasticsearch.xpack.ql.expression.FieldAttribute;
 import org.elasticsearch.xpack.ql.expression.NamedExpression;
 import org.elasticsearch.xpack.ql.expression.NamedExpression;
 import org.elasticsearch.xpack.ql.expression.TypeResolutions;
 import org.elasticsearch.xpack.ql.expression.TypeResolutions;
 import org.elasticsearch.xpack.ql.expression.function.aggregate.AggregateFunction;
 import org.elasticsearch.xpack.ql.expression.function.aggregate.AggregateFunction;
@@ -193,6 +194,9 @@ public class Verifier {
                 if (attr != null) {
                 if (attr != null) {
                     groupRefs.add(attr);
                     groupRefs.add(attr);
                 }
                 }
+                if (e instanceof FieldAttribute f && EsqlDataTypes.isCounterType(f.dataType())) {
+                    failures.add(fail(e, "cannot group by on [{}] type for grouping [{}]", f.dataType().typeName(), e.sourceText()));
+                }
             });
             });
 
 
             // check aggregates - accept only aggregate functions or expressions over grouping
             // check aggregates - accept only aggregate functions or expressions over grouping

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Avg.java

@@ -38,7 +38,7 @@ public class Avg extends AggregateFunction implements SurrogateExpression {
             dt -> dt.isNumeric() && dt != DataTypes.UNSIGNED_LONG,
             dt -> dt.isNumeric() && dt != DataTypes.UNSIGNED_LONG,
             sourceText(),
             sourceText(),
             DEFAULT,
             DEFAULT,
-            "numeric except unsigned_long"
+            "numeric except unsigned_long or counter types"
         );
         );
     }
     }
 
 

+ 3 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Count.java

@@ -9,7 +9,6 @@ package org.elasticsearch.xpack.esql.expression.function.aggregate;
 
 
 import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
 import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
 import org.elasticsearch.compute.aggregation.CountAggregatorFunction;
 import org.elasticsearch.compute.aggregation.CountAggregatorFunction;
-import org.elasticsearch.xpack.esql.expression.EsqlTypeResolutions;
 import org.elasticsearch.xpack.esql.expression.SurrogateExpression;
 import org.elasticsearch.xpack.esql.expression.SurrogateExpression;
 import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
 import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
 import org.elasticsearch.xpack.esql.expression.function.Param;
 import org.elasticsearch.xpack.esql.expression.function.Param;
@@ -17,6 +16,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvCoun
 import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
 import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
 import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Mul;
 import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Mul;
 import org.elasticsearch.xpack.esql.planner.ToAggregator;
 import org.elasticsearch.xpack.esql.planner.ToAggregator;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.Literal;
 import org.elasticsearch.xpack.ql.expression.Literal;
 import org.elasticsearch.xpack.ql.expression.Nullability;
 import org.elasticsearch.xpack.ql.expression.Nullability;
@@ -31,6 +31,7 @@ import org.elasticsearch.xpack.ql.util.StringUtils;
 import java.util.List;
 import java.util.List;
 
 
 import static org.elasticsearch.xpack.ql.expression.TypeResolutions.ParamOrdinal.DEFAULT;
 import static org.elasticsearch.xpack.ql.expression.TypeResolutions.ParamOrdinal.DEFAULT;
+import static org.elasticsearch.xpack.ql.expression.TypeResolutions.isType;
 
 
 public class Count extends AggregateFunction implements EnclosedAgg, ToAggregator, SurrogateExpression {
 public class Count extends AggregateFunction implements EnclosedAgg, ToAggregator, SurrogateExpression {
 
 
@@ -91,7 +92,7 @@ public class Count extends AggregateFunction implements EnclosedAgg, ToAggregato
 
 
     @Override
     @Override
     protected TypeResolution resolveType() {
     protected TypeResolution resolveType() {
-        return EsqlTypeResolutions.isExact(field(), sourceText(), DEFAULT);
+        return isType(field(), dt -> EsqlDataTypes.isCounterType(dt) == false, sourceText(), DEFAULT, "any type except counter types");
     }
     }
 
 
     @Override
     @Override

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinct.java

@@ -90,7 +90,7 @@ public class CountDistinct extends AggregateFunction implements OptionalArgument
             dt -> resolved && dt != DataTypes.UNSIGNED_LONG,
             dt -> resolved && dt != DataTypes.UNSIGNED_LONG,
             sourceText(),
             sourceText(),
             DEFAULT,
             DEFAULT,
-            "any exact type except unsigned_long"
+            "any exact type except unsigned_long or counter types"
         );
         );
         if (resolution.unresolved() || precision == null) {
         if (resolution.unresolved() || precision == null) {
             return resolution;
             return resolution;

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Median.java

@@ -44,7 +44,7 @@ public class Median extends AggregateFunction implements SurrogateExpression {
             dt -> dt.isNumeric() && dt != DataTypes.UNSIGNED_LONG,
             dt -> dt.isNumeric() && dt != DataTypes.UNSIGNED_LONG,
             sourceText(),
             sourceText(),
             DEFAULT,
             DEFAULT,
-            "numeric except unsigned_long"
+            "numeric except unsigned_long or counter types"
         );
         );
     }
     }
 
 

+ 2 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/NumericAggregate.java

@@ -40,7 +40,7 @@ public abstract class NumericAggregate extends AggregateFunction implements ToAg
                 sourceText(),
                 sourceText(),
                 DEFAULT,
                 DEFAULT,
                 "datetime",
                 "datetime",
-                "numeric except unsigned_long"
+                "numeric except unsigned_long or counter types"
             );
             );
         }
         }
         return isType(
         return isType(
@@ -48,7 +48,7 @@ public abstract class NumericAggregate extends AggregateFunction implements ToAg
             dt -> dt.isNumeric() && dt != DataTypes.UNSIGNED_LONG,
             dt -> dt.isNumeric() && dt != DataTypes.UNSIGNED_LONG,
             sourceText(),
             sourceText(),
             DEFAULT,
             DEFAULT,
-            "numeric except unsigned_long"
+            "numeric except unsigned_long or counter types"
         );
         );
     }
     }
 
 

+ 17 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDouble.java

@@ -12,6 +12,7 @@ import org.elasticsearch.compute.ann.ConvertEvaluator;
 import org.elasticsearch.xpack.esql.expression.function.Example;
 import org.elasticsearch.xpack.esql.expression.function.Example;
 import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
 import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
 import org.elasticsearch.xpack.esql.expression.function.Param;
 import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.InvalidArgumentException;
 import org.elasticsearch.xpack.ql.InvalidArgumentException;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.tree.NodeInfo;
 import org.elasticsearch.xpack.ql.tree.NodeInfo;
@@ -42,7 +43,10 @@ public class ToDouble extends AbstractConvertFunction {
         Map.entry(TEXT, ToDoubleFromStringEvaluator.Factory::new),
         Map.entry(TEXT, ToDoubleFromStringEvaluator.Factory::new),
         Map.entry(UNSIGNED_LONG, ToDoubleFromUnsignedLongEvaluator.Factory::new),
         Map.entry(UNSIGNED_LONG, ToDoubleFromUnsignedLongEvaluator.Factory::new),
         Map.entry(LONG, ToDoubleFromLongEvaluator.Factory::new), // CastLongToDoubleEvaluator would be a candidate, but not MV'd
         Map.entry(LONG, ToDoubleFromLongEvaluator.Factory::new), // CastLongToDoubleEvaluator would be a candidate, but not MV'd
-        Map.entry(INTEGER, ToDoubleFromIntEvaluator.Factory::new) // CastIntToDoubleEvaluator would be a candidate, but not MV'd
+        Map.entry(INTEGER, ToDoubleFromIntEvaluator.Factory::new), // CastIntToDoubleEvaluator would be a candidate, but not MV'd
+        Map.entry(EsqlDataTypes.COUNTER_DOUBLE, (field, source) -> field),
+        Map.entry(EsqlDataTypes.COUNTER_INTEGER, ToDoubleFromIntEvaluator.Factory::new),
+        Map.entry(EsqlDataTypes.COUNTER_LONG, ToDoubleFromLongEvaluator.Factory::new)
     );
     );
 
 
     @FunctionInfo(
     @FunctionInfo(
@@ -65,7 +69,18 @@ public class ToDouble extends AbstractConvertFunction {
         Source source,
         Source source,
         @Param(
         @Param(
             name = "field",
             name = "field",
-            type = { "boolean", "date", "keyword", "text", "double", "long", "unsigned_long", "integer" },
+            type = {
+                "boolean",
+                "date",
+                "keyword",
+                "text",
+                "double",
+                "long",
+                "unsigned_long",
+                "integer",
+                "counter_double",
+                "counter_integer",
+                "counter_long" },
             description = "Input value. The input can be a single- or multi-valued column or an expression."
             description = "Input value. The input can be a single- or multi-valued column or an expression."
         ) Expression field
         ) Expression field
     ) {
     ) {

+ 4 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToInteger.java

@@ -12,6 +12,7 @@ import org.elasticsearch.compute.ann.ConvertEvaluator;
 import org.elasticsearch.xpack.esql.expression.function.Example;
 import org.elasticsearch.xpack.esql.expression.function.Example;
 import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
 import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
 import org.elasticsearch.xpack.esql.expression.function.Param;
 import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.InvalidArgumentException;
 import org.elasticsearch.xpack.ql.InvalidArgumentException;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.tree.NodeInfo;
 import org.elasticsearch.xpack.ql.tree.NodeInfo;
@@ -43,7 +44,8 @@ public class ToInteger extends AbstractConvertFunction {
         Map.entry(TEXT, ToIntegerFromStringEvaluator.Factory::new),
         Map.entry(TEXT, ToIntegerFromStringEvaluator.Factory::new),
         Map.entry(DOUBLE, ToIntegerFromDoubleEvaluator.Factory::new),
         Map.entry(DOUBLE, ToIntegerFromDoubleEvaluator.Factory::new),
         Map.entry(UNSIGNED_LONG, ToIntegerFromUnsignedLongEvaluator.Factory::new),
         Map.entry(UNSIGNED_LONG, ToIntegerFromUnsignedLongEvaluator.Factory::new),
-        Map.entry(LONG, ToIntegerFromLongEvaluator.Factory::new)
+        Map.entry(LONG, ToIntegerFromLongEvaluator.Factory::new),
+        Map.entry(EsqlDataTypes.COUNTER_INTEGER, (fieldEval, source) -> fieldEval)
     );
     );
 
 
     @FunctionInfo(
     @FunctionInfo(
@@ -68,7 +70,7 @@ public class ToInteger extends AbstractConvertFunction {
         Source source,
         Source source,
         @Param(
         @Param(
             name = "field",
             name = "field",
-            type = { "boolean", "date", "keyword", "text", "double", "long", "unsigned_long", "integer" },
+            type = { "boolean", "date", "keyword", "text", "double", "long", "unsigned_long", "integer", "counter_integer" },
             description = "Input value. The input can be a single- or multi-valued column or an expression."
             description = "Input value. The input can be a single- or multi-valued column or an expression."
         ) Expression field
         ) Expression field
     ) {
     ) {

+ 15 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToLong.java

@@ -12,6 +12,7 @@ import org.elasticsearch.compute.ann.ConvertEvaluator;
 import org.elasticsearch.xpack.esql.expression.function.Example;
 import org.elasticsearch.xpack.esql.expression.function.Example;
 import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
 import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
 import org.elasticsearch.xpack.esql.expression.function.Param;
 import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.InvalidArgumentException;
 import org.elasticsearch.xpack.ql.InvalidArgumentException;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.tree.NodeInfo;
 import org.elasticsearch.xpack.ql.tree.NodeInfo;
@@ -43,7 +44,9 @@ public class ToLong extends AbstractConvertFunction {
         Map.entry(TEXT, ToLongFromStringEvaluator.Factory::new),
         Map.entry(TEXT, ToLongFromStringEvaluator.Factory::new),
         Map.entry(DOUBLE, ToLongFromDoubleEvaluator.Factory::new),
         Map.entry(DOUBLE, ToLongFromDoubleEvaluator.Factory::new),
         Map.entry(UNSIGNED_LONG, ToLongFromUnsignedLongEvaluator.Factory::new),
         Map.entry(UNSIGNED_LONG, ToLongFromUnsignedLongEvaluator.Factory::new),
-        Map.entry(INTEGER, ToLongFromIntEvaluator.Factory::new) // CastIntToLongEvaluator would be a candidate, but not MV'd
+        Map.entry(INTEGER, ToLongFromIntEvaluator.Factory::new), // CastIntToLongEvaluator would be a candidate, but not MV'd
+        Map.entry(EsqlDataTypes.COUNTER_LONG, (field, source) -> field),
+        Map.entry(EsqlDataTypes.COUNTER_INTEGER, ToLongFromIntEvaluator.Factory::new)
     );
     );
 
 
     @FunctionInfo(
     @FunctionInfo(
@@ -67,7 +70,17 @@ public class ToLong extends AbstractConvertFunction {
         Source source,
         Source source,
         @Param(
         @Param(
             name = "field",
             name = "field",
-            type = { "boolean", "date", "keyword", "text", "double", "long", "unsigned_long", "integer" },
+            type = {
+                "boolean",
+                "date",
+                "keyword",
+                "text",
+                "double",
+                "long",
+                "unsigned_long",
+                "integer",
+                "counter_integer",
+                "counter_long" },
             description = "Input value. The input can be a single- or multi-valued column or an expression."
             description = "Input value. The input can be a single- or multi-valued column or an expression."
         ) Expression field
         ) Expression field
     ) {
     ) {

+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

@@ -344,7 +344,8 @@ public class LocalExecutionPlanner {
                 case "version" -> TopNEncoder.VERSION;
                 case "version" -> TopNEncoder.VERSION;
                 case "boolean", "null", "byte", "short", "integer", "long", "double", "float", "half_float", "datetime", "date_period",
                 case "boolean", "null", "byte", "short", "integer", "long", "double", "float", "half_float", "datetime", "date_period",
                     "time_duration", "object", "nested", "scaled_float", "unsigned_long", "_doc" -> TopNEncoder.DEFAULT_SORTABLE;
                     "time_duration", "object", "nested", "scaled_float", "unsigned_long", "_doc" -> TopNEncoder.DEFAULT_SORTABLE;
-                case "geo_point", "cartesian_point", "geo_shape", "cartesian_shape" -> TopNEncoder.DEFAULT_UNSORTABLE;
+                case "geo_point", "cartesian_point", "geo_shape", "cartesian_shape", "counter_long", "counter_integer", "counter_double" ->
+                    TopNEncoder.DEFAULT_UNSORTABLE;
                 // unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point
                 // unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point
                 case "unsupported" -> TopNEncoder.UNSUPPORTED;
                 case "unsupported" -> TopNEncoder.UNSUPPORTED;
                 default -> throw new EsqlIllegalArgumentException("No TopN sorting encoder for type " + inverse.get(channel).type());
                 default -> throw new EsqlIllegalArgumentException("No TopN sorting encoder for type " + inverse.get(channel).type());

+ 6 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

@@ -251,13 +251,16 @@ public class PlannerUtils {
      * For example, spatial types can be extracted into doc-values under specific conditions, otherwise they extract as BytesRef.
      * For example, spatial types can be extracted into doc-values under specific conditions, otherwise they extract as BytesRef.
      */
      */
     public static ElementType toElementType(DataType dataType, MappedFieldType.FieldExtractPreference fieldExtractPreference) {
     public static ElementType toElementType(DataType dataType, MappedFieldType.FieldExtractPreference fieldExtractPreference) {
-        if (dataType == DataTypes.LONG || dataType == DataTypes.DATETIME || dataType == DataTypes.UNSIGNED_LONG) {
+        if (dataType == DataTypes.LONG
+            || dataType == DataTypes.DATETIME
+            || dataType == DataTypes.UNSIGNED_LONG
+            || dataType == EsqlDataTypes.COUNTER_LONG) {
             return ElementType.LONG;
             return ElementType.LONG;
         }
         }
-        if (dataType == DataTypes.INTEGER) {
+        if (dataType == DataTypes.INTEGER || dataType == EsqlDataTypes.COUNTER_INTEGER) {
             return ElementType.INT;
             return ElementType.INT;
         }
         }
-        if (dataType == DataTypes.DOUBLE) {
+        if (dataType == DataTypes.DOUBLE || dataType == EsqlDataTypes.COUNTER_DOUBLE) {
             return ElementType.DOUBLE;
             return ElementType.DOUBLE;
         }
         }
         // unsupported fields are passed through as a BytesRef
         // unsupported fields are passed through as a BytesRef

+ 7 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFeatures.java

@@ -121,6 +121,11 @@ public class EsqlFeatures implements FeatureSpecification {
      */
      */
     public static final NodeFeature MV_ORDERING_SORTED_ASCENDING = new NodeFeature("esql.mv_ordering_sorted_ascending");
     public static final NodeFeature MV_ORDERING_SORTED_ASCENDING = new NodeFeature("esql.mv_ordering_sorted_ascending");
 
 
+    /**
+     * Support for metrics counter fields
+     */
+    public static final NodeFeature METRICS_COUNTER_FIELDS = new NodeFeature("esql.metrics_counter_fields");
+
     @Override
     @Override
     public Set<NodeFeature> getFeatures() {
     public Set<NodeFeature> getFeatures() {
         return Set.of(
         return Set.of(
@@ -139,7 +144,8 @@ public class EsqlFeatures implements FeatureSpecification {
             ST_DISJOINT,
             ST_DISJOINT,
             STRING_LITERAL_AUTO_CASTING,
             STRING_LITERAL_AUTO_CASTING,
             CASTING_OPERATOR,
             CASTING_OPERATOR,
-            MV_ORDERING_SORTED_ASCENDING
+            MV_ORDERING_SORTED_ASCENDING,
+            METRICS_COUNTER_FIELDS
         );
         );
     }
     }
 
 

+ 3 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeRegistry.java

@@ -10,7 +10,6 @@ package org.elasticsearch.xpack.esql.type;
 import org.elasticsearch.index.mapper.TimeSeriesParams;
 import org.elasticsearch.index.mapper.TimeSeriesParams;
 import org.elasticsearch.xpack.ql.type.DataType;
 import org.elasticsearch.xpack.ql.type.DataType;
 import org.elasticsearch.xpack.ql.type.DataTypeRegistry;
 import org.elasticsearch.xpack.ql.type.DataTypeRegistry;
-import org.elasticsearch.xpack.ql.type.DataTypes;
 
 
 import java.util.Collection;
 import java.util.Collection;
 
 
@@ -37,10 +36,10 @@ public class EsqlDataTypeRegistry implements DataTypeRegistry {
     @Override
     @Override
     public DataType fromEs(String typeName, TimeSeriesParams.MetricType metricType) {
     public DataType fromEs(String typeName, TimeSeriesParams.MetricType metricType) {
         if (metricType == TimeSeriesParams.MetricType.COUNTER) {
         if (metricType == TimeSeriesParams.MetricType.COUNTER) {
-            // Counter fields will be a counter type, for now they are unsupported
-            return DataTypes.UNSUPPORTED;
+            return EsqlDataTypes.getCounterType(typeName);
+        } else {
+            return EsqlDataTypes.fromName(typeName);
         }
         }
-        return EsqlDataTypes.fromName(typeName);
     }
     }
 
 
     @Override
     @Override

+ 25 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypes.java

@@ -51,6 +51,17 @@ public final class EsqlDataTypes {
     public static final DataType GEO_SHAPE = new DataType("geo_shape", Integer.MAX_VALUE, false, false, true);
     public static final DataType GEO_SHAPE = new DataType("geo_shape", Integer.MAX_VALUE, false, false, true);
     public static final DataType CARTESIAN_SHAPE = new DataType("cartesian_shape", Integer.MAX_VALUE, false, false, true);
     public static final DataType CARTESIAN_SHAPE = new DataType("cartesian_shape", Integer.MAX_VALUE, false, false, true);
 
 
+    /**
+     * These are numeric fields labeled as metric counters in time-series indices. Although stored
+     * internally as numeric fields, they represent cumulative metrics and must not be treated as regular
+     * numeric fields. Therefore, we define them differently and separately from their parent numeric field.
+     * These fields are strictly for use in retrieval from indices, rate aggregation, and casting to their
+     * parent numeric type.
+     */
+    public static final DataType COUNTER_LONG = new DataType("counter_long", LONG.size(), false, false, LONG.hasDocValues());
+    public static final DataType COUNTER_INTEGER = new DataType("counter_integer", INTEGER.size(), false, false, INTEGER.hasDocValues());
+    public static final DataType COUNTER_DOUBLE = new DataType("counter_double", DOUBLE.size(), false, false, DOUBLE.hasDocValues());
+
     private static final Collection<DataType> TYPES = Stream.of(
     private static final Collection<DataType> TYPES = Stream.of(
         BOOLEAN,
         BOOLEAN,
         UNSUPPORTED,
         UNSUPPORTED,
@@ -77,7 +88,10 @@ public final class EsqlDataTypes {
         GEO_POINT,
         GEO_POINT,
         CARTESIAN_POINT,
         CARTESIAN_POINT,
         CARTESIAN_SHAPE,
         CARTESIAN_SHAPE,
-        GEO_SHAPE
+        GEO_SHAPE,
+        COUNTER_LONG,
+        COUNTER_INTEGER,
+        COUNTER_DOUBLE
     ).sorted(Comparator.comparing(DataType::typeName)).toList();
     ).sorted(Comparator.comparing(DataType::typeName)).toList();
 
 
     private static final Map<String, DataType> NAME_TO_TYPE = TYPES.stream().collect(toUnmodifiableMap(DataType::typeName, t -> t));
     private static final Map<String, DataType> NAME_TO_TYPE = TYPES.stream().collect(toUnmodifiableMap(DataType::typeName, t -> t));
@@ -212,7 +226,8 @@ public final class EsqlDataTypes {
             && t != FLOAT
             && t != FLOAT
             && t != SCALED_FLOAT
             && t != SCALED_FLOAT
             && t != SOURCE
             && t != SOURCE
-            && t != HALF_FLOAT;
+            && t != HALF_FLOAT
+            && isCounterType(t) == false;
     }
     }
 
 
     public static boolean areCompatible(DataType left, DataType right) {
     public static boolean areCompatible(DataType left, DataType right) {
@@ -232,4 +247,12 @@ public final class EsqlDataTypes {
         }
         }
         return type;
         return type;
     }
     }
+
+    public static DataType getCounterType(String typeName) {
+        return fromTypeName("counter_" + typeName);
+    }
+
+    public static boolean isCounterType(DataType dt) {
+        return dt == COUNTER_LONG || dt == COUNTER_INTEGER || dt == COUNTER_DOUBLE;
+    }
 }
 }

+ 3 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java

@@ -143,9 +143,9 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         return new Page(columns.stream().map(c -> {
         return new Page(columns.stream().map(c -> {
             Block.Builder builder = PlannerUtils.toElementType(EsqlDataTypes.fromName(c.type())).newBlockBuilder(1, blockFactory);
             Block.Builder builder = PlannerUtils.toElementType(EsqlDataTypes.fromName(c.type())).newBlockBuilder(1, blockFactory);
             switch (c.type()) {
             switch (c.type()) {
-                case "unsigned_long", "long" -> ((LongBlock.Builder) builder).appendLong(randomLong());
-                case "integer" -> ((IntBlock.Builder) builder).appendInt(randomInt());
-                case "double" -> ((DoubleBlock.Builder) builder).appendDouble(randomDouble());
+                case "unsigned_long", "long", "counter_long" -> ((LongBlock.Builder) builder).appendLong(randomLong());
+                case "integer", "counter_integer" -> ((IntBlock.Builder) builder).appendInt(randomInt());
+                case "double", "counter_double" -> ((DoubleBlock.Builder) builder).appendDouble(randomDouble());
                 case "keyword" -> ((BytesRefBlock.Builder) builder).appendBytesRef(new BytesRef(randomAlphaOfLength(10)));
                 case "keyword" -> ((BytesRefBlock.Builder) builder).appendBytesRef(new BytesRef(randomAlphaOfLength(10)));
                 case "text" -> ((BytesRefBlock.Builder) builder).appendBytesRef(new BytesRef(randomAlphaOfLength(10000)));
                 case "text" -> ((BytesRefBlock.Builder) builder).appendBytesRef(new BytesRef(randomAlphaOfLength(10000)));
                 case "ip" -> ((BytesRefBlock.Builder) builder).appendBytesRef(
                 case "ip" -> ((BytesRefBlock.Builder) builder).appendBytesRef(

+ 4 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

@@ -134,4 +134,8 @@ public final class AnalyzerTestUtils {
     public static void loadEnrichPolicyResolution(EnrichResolution enrich, String policy, String field, String index, String mapping) {
     public static void loadEnrichPolicyResolution(EnrichResolution enrich, String policy, String field, String index, String mapping) {
         loadEnrichPolicyResolution(enrich, EnrichPolicy.MATCH_TYPE, policy, field, index, mapping);
         loadEnrichPolicyResolution(enrich, EnrichPolicy.MATCH_TYPE, policy, field, index, mapping);
     }
     }
+
+    public static IndexResolution tsdbIndexResolution() {
+        return loadMapping("tsdb-mapping.json", "test");
+    }
 }
 }

+ 38 - 38
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java

@@ -1777,45 +1777,45 @@ public class AnalyzerTests extends ESTestCase {
     }
     }
 
 
     public void testUnsupportedTypesInStats() {
     public void testUnsupportedTypesInStats() {
-        verifyUnsupported(
-            """
-                  row x = to_unsigned_long(\"10\")
-                  | stats  avg(x), count_distinct(x), max(x), median(x), median_absolute_deviation(x), min(x), percentile(x, 10), sum(x)
-                """,
-            "Found 8 problems\n"
-                + "line 2:12: argument of [avg(x)] must be [numeric except unsigned_long], found value [x] type [unsigned_long]\n"
-                + "line 2:20: argument of [count_distinct(x)] must be [any exact type except unsigned_long], "
-                + "found value [x] type [unsigned_long]\n"
-                + "line 2:39: argument of [max(x)] must be [datetime or numeric except unsigned_long], "
-                + "found value [max(x)] type [unsigned_long]\n"
-                + "line 2:47: argument of [median(x)] must be [numeric except unsigned_long], found value [x] type [unsigned_long]\n"
-                + "line 2:58: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long], "
-                + "found value [x] type [unsigned_long]\n"
-                + "line 2:88: argument of [min(x)] must be [datetime or numeric except unsigned_long], "
-                + "found value [min(x)] type [unsigned_long]\n"
-                + "line 2:96: first argument of [percentile(x, 10)] must be [numeric except unsigned_long], "
-                + "found value [x] type [unsigned_long]\n"
-                + "line 2:115: argument of [sum(x)] must be [numeric except unsigned_long], found value [x] type [unsigned_long]"
-        );
+        verifyUnsupported("""
+              row x = to_unsigned_long(\"10\")
+              | stats  avg(x), count_distinct(x), max(x), median(x), median_absolute_deviation(x), min(x), percentile(x, 10), sum(x)
+            """, """
+            Found 8 problems
+            line 2:12: argument of [avg(x)] must be [numeric except unsigned_long or counter types],\
+             found value [x] type [unsigned_long]
+            line 2:20: argument of [count_distinct(x)] must be [any exact type except unsigned_long or counter types],\
+             found value [x] type [unsigned_long]
+            line 2:39: argument of [max(x)] must be [datetime or numeric except unsigned_long or counter types],\
+             found value [max(x)] type [unsigned_long]
+            line 2:47: argument of [median(x)] must be [numeric except unsigned_long or counter types],\
+             found value [x] type [unsigned_long]
+            line 2:58: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long or counter types],\
+             found value [x] type [unsigned_long]
+            line 2:88: argument of [min(x)] must be [datetime or numeric except unsigned_long or counter types],\
+             found value [min(x)] type [unsigned_long]
+            line 2:96: first argument of [percentile(x, 10)] must be [numeric except unsigned_long],\
+             found value [x] type [unsigned_long]
+            line 2:115: argument of [sum(x)] must be [numeric except unsigned_long or counter types],\
+             found value [x] type [unsigned_long]""");
 
 
-        verifyUnsupported(
-            """
-                row x = to_version("1.2")
-                | stats  avg(x), max(x), median(x), median_absolute_deviation(x), min(x), percentile(x, 10), sum(x)
-                """,
-            "Found 7 problems\n"
-                + "line 2:10: argument of [avg(x)] must be [numeric except unsigned_long], found value [x] type [version]\n"
-                + "line 2:18: argument of [max(x)] must be [datetime or numeric except unsigned_long], "
-                + "found value [max(x)] type [version]\n"
-                + "line 2:26: argument of [median(x)] must be [numeric except unsigned_long], found value [x] type [version]\n"
-                + "line 2:37: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long], "
-                + "found value [x] type [version]\n"
-                + "line 2:67: argument of [min(x)] must be [datetime or numeric except unsigned_long], "
-                + "found value [min(x)] type [version]\n"
-                + "line 2:75: first argument of [percentile(x, 10)] must be [numeric except unsigned_long], "
-                + "found value [x] type [version]\n"
-                + "line 2:94: argument of [sum(x)] must be [numeric except unsigned_long], found value [x] type [version]"
-        );
+        verifyUnsupported("""
+            row x = to_version("1.2")
+            | stats  avg(x), max(x), median(x), median_absolute_deviation(x), min(x), percentile(x, 10), sum(x)
+            """, """
+            Found 7 problems
+            line 2:10: argument of [avg(x)] must be [numeric except unsigned_long or counter types],\
+             found value [x] type [version]
+            line 2:18: argument of [max(x)] must be [datetime or numeric except unsigned_long or counter types],\
+             found value [max(x)] type [version]
+            line 2:26: argument of [median(x)] must be [numeric except unsigned_long or counter types],\
+             found value [x] type [version]
+            line 2:37: argument of [median_absolute_deviation(x)] must be [numeric except unsigned_long or counter types],\
+             found value [x] type [version]
+            line 2:67: argument of [min(x)] must be [datetime or numeric except unsigned_long or counter types],\
+             found value [min(x)] type [version]
+            line 2:75: first argument of [percentile(x, 10)] must be [numeric except unsigned_long], found value [x] type [version]
+            line 2:94: argument of [sum(x)] must be [numeric except unsigned_long or counter types], found value [x] type [version]""");
     }
     }
 
 
     public void testInOnText() {
     public void testInOnText() {

+ 39 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

@@ -21,12 +21,14 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning
 import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
 import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.loadMapping;
 import static org.elasticsearch.xpack.ql.type.DataTypes.UNSIGNED_LONG;
 import static org.elasticsearch.xpack.ql.type.DataTypes.UNSIGNED_LONG;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 
 
 //@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.compute:TRACE", reason = "debug")
 //@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.compute:TRACE", reason = "debug")
 public class VerifierTests extends ESTestCase {
 public class VerifierTests extends ESTestCase {
 
 
     private static final EsqlParser parser = new EsqlParser();
     private static final EsqlParser parser = new EsqlParser();
     private final Analyzer defaultAnalyzer = AnalyzerTestUtils.expandedDefaultAnalyzer();
     private final Analyzer defaultAnalyzer = AnalyzerTestUtils.expandedDefaultAnalyzer();
+    private final Analyzer tsdb = AnalyzerTestUtils.analyzer(AnalyzerTestUtils.tsdbIndexResolution());
 
 
     public void testIncompatibleTypesInMathOperation() {
     public void testIncompatibleTypesInMathOperation() {
         assertEquals(
         assertEquals(
@@ -72,7 +74,8 @@ public class VerifierTests extends ESTestCase {
             error("from test | stats max(max(salary)) by first_name")
             error("from test | stats max(max(salary)) by first_name")
         );
         );
         assertEquals(
         assertEquals(
-            "1:25: argument of [avg(first_name)] must be [numeric except unsigned_long], found value [first_name] type [keyword]",
+            "1:25: argument of [avg(first_name)] must be [numeric except unsigned_long or counter types],"
+                + " found value [first_name] type [keyword]",
             error("from test | stats count(avg(first_name)) by first_name")
             error("from test | stats count(avg(first_name)) by first_name")
         );
         );
         assertEquals(
         assertEquals(
@@ -378,7 +381,8 @@ public class VerifierTests extends ESTestCase {
 
 
     public void testSumOnDate() {
     public void testSumOnDate() {
         assertEquals(
         assertEquals(
-            "1:19: argument of [sum(hire_date)] must be [numeric except unsigned_long], found value [hire_date] type [datetime]",
+            "1:19: argument of [sum(hire_date)] must be [numeric except unsigned_long or counter types],"
+                + " found value [hire_date] type [datetime]",
             error("from test | stats sum(hire_date)")
             error("from test | stats sum(hire_date)")
         );
         );
     }
     }
@@ -480,6 +484,39 @@ public class VerifierTests extends ESTestCase {
         assertEquals("1:5: argument of [false::ip] must be [ip or string], found value [false] type [boolean]", error("ROW false::ip"));
         assertEquals("1:5: argument of [false::ip] must be [ip or string], found value [false] type [boolean]", error("ROW false::ip"));
     }
     }
 
 
+    public void testAggregateOnCounter() {
+        assertThat(
+            error("FROM tests | STATS min(network.bytes_in)", tsdb),
+            equalTo(
+                "1:20: argument of [min(network.bytes_in)] must be [datetime or numeric except unsigned_long or counter types],"
+                    + " found value [min(network.bytes_in)] type [counter_long]"
+            )
+        );
+
+        assertThat(
+            error("FROM tests | STATS max(network.bytes_in)", tsdb),
+            equalTo(
+                "1:20: argument of [max(network.bytes_in)] must be [datetime or numeric except unsigned_long or counter types],"
+                    + " found value [max(network.bytes_in)] type [counter_long]"
+            )
+        );
+
+        assertThat(
+            error("FROM tests | STATS count(network.bytes_out)", tsdb),
+            equalTo(
+                "1:20: argument of [count(network.bytes_out)] must be [any type except counter types],"
+                    + " found value [network.bytes_out] type [counter_long]"
+            )
+        );
+    }
+
+    public void testGroupByCounter() {
+        assertThat(
+            error("FROM tests | STATS count(*) BY network.bytes_in", tsdb),
+            equalTo("1:32: cannot group by on [counter_long] type for grouping [network.bytes_in]")
+        );
+    }
+
     private String error(String query) {
     private String error(String query) {
         return error(query, defaultAnalyzer);
         return error(query, defaultAnalyzer);
     }
     }

+ 54 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractFunctionTestCase.java

@@ -121,11 +121,11 @@ public abstract class AbstractFunctionTestCase extends ESTestCase {
             case "boolean" -> randomBoolean();
             case "boolean" -> randomBoolean();
             case "byte" -> randomByte();
             case "byte" -> randomByte();
             case "short" -> randomShort();
             case "short" -> randomShort();
-            case "integer" -> randomInt();
-            case "unsigned_long", "long" -> randomLong();
+            case "integer", "counter_integer" -> randomInt();
+            case "unsigned_long", "long", "counter_long" -> randomLong();
             case "date_period" -> Period.of(randomIntBetween(-1000, 1000), randomIntBetween(-13, 13), randomIntBetween(-32, 32));
             case "date_period" -> Period.of(randomIntBetween(-1000, 1000), randomIntBetween(-13, 13), randomIntBetween(-32, 32));
             case "datetime" -> randomMillisUpToYear9999();
             case "datetime" -> randomMillisUpToYear9999();
-            case "double", "scaled_float" -> randomDouble();
+            case "double", "scaled_float", "counter_double" -> randomDouble();
             case "float" -> randomFloat();
             case "float" -> randomFloat();
             case "half_float" -> HalfFloatPoint.sortableShortToHalfFloat(HalfFloatPoint.halfFloatToSortableShort(randomFloat()));
             case "half_float" -> HalfFloatPoint.sortableShortToHalfFloat(HalfFloatPoint.halfFloatToSortableShort(randomFloat()));
             case "keyword" -> new BytesRef(randomAlphaOfLength(5));
             case "keyword" -> new BytesRef(randomAlphaOfLength(5));
@@ -946,6 +946,57 @@ public abstract class AbstractFunctionTestCase extends ESTestCase {
             ),
             ),
             "boolean or datetime or numeric or string"
             "boolean or datetime or numeric or string"
         ),
         ),
+        // to_int
+        Map.entry(
+            Set.of(
+                DataTypes.BOOLEAN,
+                EsqlDataTypes.COUNTER_INTEGER,
+                DataTypes.DATETIME,
+                DataTypes.DOUBLE,
+                DataTypes.INTEGER,
+                DataTypes.KEYWORD,
+                DataTypes.LONG,
+                DataTypes.TEXT,
+                DataTypes.UNSIGNED_LONG,
+                DataTypes.NULL
+            ),
+            "boolean or counter_integer or datetime or numeric or string"
+        ),
+        // to_long
+        Map.entry(
+            Set.of(
+                DataTypes.BOOLEAN,
+                EsqlDataTypes.COUNTER_INTEGER,
+                EsqlDataTypes.COUNTER_LONG,
+                DataTypes.DATETIME,
+                DataTypes.DOUBLE,
+                DataTypes.INTEGER,
+                DataTypes.KEYWORD,
+                DataTypes.LONG,
+                DataTypes.TEXT,
+                DataTypes.UNSIGNED_LONG,
+                DataTypes.NULL
+            ),
+            "boolean or counter_integer or counter_long or datetime or numeric or string"
+        ),
+        // to_double
+        Map.entry(
+            Set.of(
+                DataTypes.BOOLEAN,
+                EsqlDataTypes.COUNTER_DOUBLE,
+                EsqlDataTypes.COUNTER_INTEGER,
+                EsqlDataTypes.COUNTER_LONG,
+                DataTypes.DATETIME,
+                DataTypes.DOUBLE,
+                DataTypes.INTEGER,
+                DataTypes.KEYWORD,
+                DataTypes.LONG,
+                DataTypes.TEXT,
+                DataTypes.UNSIGNED_LONG,
+                DataTypes.NULL
+            ),
+            "boolean or counter_double or counter_integer or counter_long or datetime or numeric or string"
+        ),
         Map.entry(
         Map.entry(
             Set.of(
             Set.of(
                 DataTypes.BOOLEAN,
                 DataTypes.BOOLEAN,

+ 27 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToDoubleTests.java

@@ -11,9 +11,11 @@ import com.carrotsearch.randomizedtesting.annotations.Name;
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
 
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.esql.expression.function.AbstractFunctionTestCase;
 import org.elasticsearch.xpack.esql.expression.function.AbstractFunctionTestCase;
 import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
 import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.InvalidArgumentException;
 import org.elasticsearch.xpack.ql.InvalidArgumentException;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.tree.Source;
 import org.elasticsearch.xpack.ql.tree.Source;
@@ -113,6 +115,31 @@ public class ToDoubleTests extends AbstractFunctionTestCase {
             List.of()
             List.of()
         );
         );
 
 
+        TestCaseSupplier.unary(
+            suppliers,
+            "Attribute[channel=0]",
+            List.of(new TestCaseSupplier.TypedDataSupplier("counter", ESTestCase::randomDouble, EsqlDataTypes.COUNTER_DOUBLE)),
+            DataTypes.DOUBLE,
+            l -> l,
+            List.of()
+        );
+        TestCaseSupplier.unary(
+            suppliers,
+            evaluatorName.apply("Integer"),
+            List.of(new TestCaseSupplier.TypedDataSupplier("counter", () -> randomInt(1000), EsqlDataTypes.COUNTER_INTEGER)),
+            DataTypes.DOUBLE,
+            l -> ((Integer) l).doubleValue(),
+            List.of()
+        );
+        TestCaseSupplier.unary(
+            suppliers,
+            evaluatorName.apply("Long"),
+            List.of(new TestCaseSupplier.TypedDataSupplier("counter", () -> randomLongBetween(1, 1000), EsqlDataTypes.COUNTER_LONG)),
+            DataTypes.DOUBLE,
+            l -> ((Long) l).doubleValue(),
+            List.of()
+        );
+
         return parameterSuppliersFromTypedData(errorsForCasesWithoutExamples(anyNullIsNull(true, suppliers)));
         return parameterSuppliersFromTypedData(errorsForCasesWithoutExamples(anyNullIsNull(true, suppliers)));
     }
     }
 
 

+ 10 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToIntegerTests.java

@@ -14,6 +14,7 @@ import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.esql.expression.function.AbstractFunctionTestCase;
 import org.elasticsearch.xpack.esql.expression.function.AbstractFunctionTestCase;
 import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
 import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.tree.Source;
 import org.elasticsearch.xpack.ql.tree.Source;
 import org.elasticsearch.xpack.ql.type.DataTypes;
 import org.elasticsearch.xpack.ql.type.DataTypes;
@@ -257,6 +258,15 @@ public class ToIntegerTests extends AbstractFunctionTestCase {
             )
             )
         );
         );
 
 
+        TestCaseSupplier.unary(
+            suppliers,
+            "Attribute[channel=0]",
+            List.of(new TestCaseSupplier.TypedDataSupplier("counter", ESTestCase::randomInt, EsqlDataTypes.COUNTER_INTEGER)),
+            DataTypes.INTEGER,
+            l -> l,
+            List.of()
+        );
+
         return parameterSuppliersFromTypedData(errorsForCasesWithoutExamples(anyNullIsNull(true, suppliers)));
         return parameterSuppliersFromTypedData(errorsForCasesWithoutExamples(anyNullIsNull(true, suppliers)));
     }
     }
 
 

+ 18 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToLongTests.java

@@ -11,8 +11,10 @@ import com.carrotsearch.randomizedtesting.annotations.Name;
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
 
 
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.esql.expression.function.AbstractFunctionTestCase;
 import org.elasticsearch.xpack.esql.expression.function.AbstractFunctionTestCase;
 import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
 import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.tree.Source;
 import org.elasticsearch.xpack.ql.tree.Source;
 import org.elasticsearch.xpack.ql.type.DataTypes;
 import org.elasticsearch.xpack.ql.type.DataTypes;
@@ -208,6 +210,22 @@ public class ToLongTests extends AbstractFunctionTestCase {
             )
             )
         );
         );
 
 
+        TestCaseSupplier.unary(
+            suppliers,
+            "Attribute[channel=0]",
+            List.of(new TestCaseSupplier.TypedDataSupplier("counter", ESTestCase::randomNonNegativeLong, EsqlDataTypes.COUNTER_LONG)),
+            DataTypes.LONG,
+            l -> l,
+            List.of()
+        );
+        TestCaseSupplier.unary(
+            suppliers,
+            evaluatorName.apply("Integer"),
+            List.of(new TestCaseSupplier.TypedDataSupplier("counter", ESTestCase::randomInt, EsqlDataTypes.COUNTER_INTEGER)),
+            DataTypes.LONG,
+            l -> ((Integer) l).longValue(),
+            List.of()
+        );
         return parameterSuppliersFromTypedData(errorsForCasesWithoutExamples(anyNullIsNull(true, suppliers)));
         return parameterSuppliersFromTypedData(errorsForCasesWithoutExamples(anyNullIsNull(true, suppliers)));
     }
     }
 
 

+ 5 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeRegistryTests.java

@@ -23,8 +23,12 @@ import java.util.Map;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
 
 
 public class EsqlDataTypeRegistryTests extends ESTestCase {
 public class EsqlDataTypeRegistryTests extends ESTestCase {
+
     public void testCounter() {
     public void testCounter() {
-        resolve("long", TimeSeriesParams.MetricType.COUNTER, DataTypes.UNSUPPORTED);
+        resolve("long", TimeSeriesParams.MetricType.COUNTER, EsqlDataTypes.COUNTER_LONG);
+        resolve("integer", TimeSeriesParams.MetricType.COUNTER, EsqlDataTypes.COUNTER_INTEGER);
+        resolve("double", TimeSeriesParams.MetricType.COUNTER, EsqlDataTypes.COUNTER_DOUBLE);
+
     }
     }
 
 
     public void testGauge() {
     public void testGauge() {

+ 7 - 1
x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/type/Types.java

@@ -53,7 +53,13 @@ public abstract class Types {
             if ("constant_keyword".equals(typeName) || "wildcard".equals(typeName)) {
             if ("constant_keyword".equals(typeName) || "wildcard".equals(typeName)) {
                 return KEYWORD;
                 return KEYWORD;
             }
             }
-            TimeSeriesParams.MetricType metricType = (TimeSeriesParams.MetricType) content.get(TimeSeriesParams.TIME_SERIES_METRIC_PARAM);
+            final Object metricsTypeParameter = content.get(TimeSeriesParams.TIME_SERIES_METRIC_PARAM);
+            final TimeSeriesParams.MetricType metricType;
+            if (metricsTypeParameter instanceof String str) {
+                metricType = TimeSeriesParams.MetricType.fromString(str);
+            } else {
+                metricType = (TimeSeriesParams.MetricType) metricsTypeParameter;
+            }
             try {
             try {
                 return typeRegistry.fromEs(typeName, metricType);
                 return typeRegistry.fromEs(typeName, metricType);
             } catch (IllegalArgumentException ex) {
             } catch (IllegalArgumentException ex) {

+ 2 - 1
x-pack/plugin/ql/src/test/java/org/elasticsearch/xpack/ql/type/TypesTests.java

@@ -238,7 +238,8 @@ public class TypesTests extends ESTestCase {
     private static Map<String, EsField> loadMapping(DataTypeRegistry registry, InputStream stream, Boolean ordered) {
     private static Map<String, EsField> loadMapping(DataTypeRegistry registry, InputStream stream, Boolean ordered) {
         boolean order = ordered != null ? ordered.booleanValue() : randomBoolean();
         boolean order = ordered != null ? ordered.booleanValue() : randomBoolean();
         try (InputStream in = stream) {
         try (InputStream in = stream) {
-            return Types.fromEs(registry, XContentHelper.convertToMap(JsonXContent.jsonXContent, in, order));
+            Map<String, Object> map = XContentHelper.convertToMap(JsonXContent.jsonXContent, in, order);
+            return Types.fromEs(registry, map);
         } catch (IOException ex) {
         } catch (IOException ex) {
             throw new RuntimeException(ex);
             throw new RuntimeException(ex);
         }
         }

+ 46 - 14
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/40_tsdb.yml

@@ -1,7 +1,7 @@
 setup:
 setup:
   - requires:
   - requires:
-      cluster_features: ["gte_v8.11.0"]
-      reason: "ESQL is available in 8.11+"
+      cluster_features: ["esql.metrics_counter_fields"]
+      reason: "require metrics counter fields"
       test_runner_features: allowed_warnings_regex
       test_runner_features: allowed_warnings_regex
   - do:
   - do:
       indices.create:
       indices.create:
@@ -38,7 +38,7 @@ setup:
                               type: long
                               type: long
                               time_series_metric: counter
                               time_series_metric: counter
                             rx:
                             rx:
-                              type: long
+                              type: integer
                               time_series_metric: counter
                               time_series_metric: counter
   - do:
   - do:
       bulk:
       bulk:
@@ -112,7 +112,6 @@ load everything:
       reason: "_source is available in 8.13+"
       reason: "_source is available in 8.13+"
   - do:
   - do:
       allowed_warnings_regex:
       allowed_warnings_regex:
-        - "Field \\[.*\\] cannot be retrieved, it is unsupported or not indexed; returning null"
         - "No limit defined, adding default limit of \\[.*\\]"
         - "No limit defined, adding default limit of \\[.*\\]"
       esql.query:
       esql.query:
         body:
         body:
@@ -126,9 +125,9 @@ load everything:
   - match: {columns.2.name: "k8s.pod.name"}
   - match: {columns.2.name: "k8s.pod.name"}
   - match: {columns.2.type: "keyword"}
   - match: {columns.2.type: "keyword"}
   - match: {columns.3.name: "k8s.pod.network.rx"}
   - match: {columns.3.name: "k8s.pod.network.rx"}
-  - match: {columns.3.type: "unsupported"}
+  - match: {columns.3.type: "counter_integer"}
   - match: {columns.4.name: "k8s.pod.network.tx"}
   - match: {columns.4.name: "k8s.pod.network.tx"}
-  - match: {columns.4.type: "unsupported"}
+  - match: {columns.4.type: "counter_long"}
   - match: {columns.5.name: "k8s.pod.uid"}
   - match: {columns.5.name: "k8s.pod.uid"}
   - match: {columns.5.type: "keyword"}
   - match: {columns.5.type: "keyword"}
   - match: {columns.6.name: "metricset"}
   - match: {columns.6.name: "metricset"}
@@ -139,7 +138,6 @@ load everything:
 load a document:
 load a document:
   - do:
   - do:
       allowed_warnings_regex:
       allowed_warnings_regex:
-        - "Field \\[.*\\] cannot be retrieved, it is unsupported or not indexed; returning null"
         - "No limit defined, adding default limit of \\[.*\\]"
         - "No limit defined, adding default limit of \\[.*\\]"
       esql.query:
       esql.query:
         body:
         body:
@@ -151,25 +149,60 @@ load a document:
   - match: {values.0.0: "2021-04-28T18:50:23.142Z"}
   - match: {values.0.0: "2021-04-28T18:50:23.142Z"}
   - match: {values.0.1: "10.10.55.3"}
   - match: {values.0.1: "10.10.55.3"}
   - match: {values.0.2: "dog"}
   - match: {values.0.2: "dog"}
-  - match: {values.0.3: null }
-  - match: {values.0.4: null }
+  - match: {values.0.3: 530600088 }
+  - match: {values.0.4: 1434577921 }
   - match: {values.0.5: "df3145b3-0563-4d3b-a0f7-897eb2876ea9"}
   - match: {values.0.5: "df3145b3-0563-4d3b-a0f7-897eb2876ea9"}
   - match: {values.0.6: "pod"}
   - match: {values.0.6: "pod"}
 
 
 ---
 ---
-filter on counter:
+filter on counter without cast:
   - do:
   - do:
-      catch: /Cannot use field \[k8s.pod.network.tx\] with unsupported type \[counter\]/
+      catch: bad_request
       esql.query:
       esql.query:
         body:
         body:
           query: 'from test | where k8s.pod.network.tx == 1434577921'
           query: 'from test | where k8s.pod.network.tx == 1434577921'
           version: 2024.04.01
           version: 2024.04.01
 
 
+---
+cast counter then filter:
+  - do:
+      esql.query:
+        body:
+          query: 'from test | where k8s.pod.network.tx::long == 2005177954 and k8s.pod.network.rx::integer == 801479970 | sort @timestamp | limit 10'
+          version: 2024.04.01
+  - length: {values: 1}
+  - length: {values.0: 7}
+  - match: {values.0.0: "2021-04-28T18:50:24.467Z"}
+  - match: {values.0.1: "10.10.55.1"}
+  - match: {values.0.2: "cat"}
+  - match: {values.0.3: 801479970 }
+  - match: {values.0.4: 2005177954 }
+  - match: {values.0.5: "947e4ced-1786-4e53-9e0c-5c447e959507"}
+  - match: {values.0.6: "pod"}
+
+---
+sort on counter without cast:
+  - do:
+      catch: bad_request
+      esql.query:
+        body:
+          query: 'from test |  KEEP k8s.pod.network.tx | sort @k8s.pod.network.tx | limit 1'
+          version: 2024.04.01
+
+---
+cast then sort on counter:
+  - do:
+      esql.query:
+        body:
+          query: 'from test | KEEP k8s.pod.network.tx | EVAL tx=to_long(k8s.pod.network.tx) | sort tx | limit 1'
+          version: 2024.04.01
+  - length: {values: 1}
+  - match: {values.0.0: 1434521831 }
+
 ---
 ---
 from doc with aggregate_metric_double:
 from doc with aggregate_metric_double:
   - do:
   - do:
       allowed_warnings_regex:
       allowed_warnings_regex:
-        - "Field \\[.*\\] cannot be retrieved, it is unsupported or not indexed; returning null"
         - "No limit defined, adding default limit of \\[.*\\]"
         - "No limit defined, adding default limit of \\[.*\\]"
       esql.query:
       esql.query:
         body:
         body:
@@ -201,7 +234,6 @@ stats on aggregate_metric_double:
 from index pattern unsupported counter:
 from index pattern unsupported counter:
   - do:
   - do:
       allowed_warnings_regex:
       allowed_warnings_regex:
-        - "Field \\[.*\\] cannot be retrieved, it is unsupported or not indexed; returning null"
         - "No limit defined, adding default limit of \\[.*\\]"
         - "No limit defined, adding default limit of \\[.*\\]"
       esql.query:
       esql.query:
         body:
         body:
@@ -219,7 +251,7 @@ from index pattern unsupported counter:
   - match: {columns.4.name: "k8s.pod.name"}
   - match: {columns.4.name: "k8s.pod.name"}
   - match: {columns.4.type: "keyword"}
   - match: {columns.4.type: "keyword"}
   - match: {columns.5.name: "k8s.pod.network.rx"}
   - match: {columns.5.name: "k8s.pod.network.rx"}
-  - match: {columns.5.type: "unsupported"}
+  - match: {columns.5.type: "counter_integer"}
   - match: {columns.6.name: "k8s.pod.network.tx"}
   - match: {columns.6.name: "k8s.pod.network.tx"}
   - match: {columns.6.type: "unsupported"}
   - match: {columns.6.type: "unsupported"}
   - match: {columns.7.name: "k8s.pod.uid"}
   - match: {columns.7.name: "k8s.pod.uid"}